summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_master_process.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_master_process.c177
1 files changed, 165 insertions, 12 deletions
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c
index 3e72ce85..8a4f8564 100644
--- a/src/nxt_master_process.c
+++ b/src/nxt_master_process.c
@@ -27,6 +27,8 @@ static nxt_int_t nxt_master_start_controller_process(nxt_task_t *task,
nxt_runtime_t *rt);
static nxt_int_t nxt_master_start_router_process(nxt_task_t *task,
nxt_runtime_t *rt);
+static nxt_int_t nxt_master_start_discovery_process(nxt_task_t *task,
+ nxt_runtime_t *rt);
static nxt_int_t nxt_master_start_worker_process(nxt_task_t *task,
nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream);
static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task,
@@ -44,6 +46,9 @@ static void nxt_master_port_socket_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static nxt_int_t nxt_master_listening_socket(nxt_sockaddr_t *sa,
nxt_listening_socket_t *ls);
+static void nxt_master_port_modules_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
+static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
const nxt_sig_event_t nxt_master_process_signals[] = {
@@ -63,8 +68,6 @@ nxt_int_t
nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task,
nxt_runtime_t *rt)
{
- nxt_int_t ret;
-
rt->types |= (1U << NXT_PROCESS_MASTER);
if (nxt_master_process_port_create(task, rt) != NXT_OK) {
@@ -73,12 +76,12 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task,
nxt_master_process_title(task);
- ret = nxt_master_start_controller_process(task, rt);
- if (ret != NXT_OK) {
- return ret;
- }
-
- return nxt_master_start_router_process(task, rt);
+ /*
+ * The dicsovery process will send a message processed by
+ * nxt_master_port_modules_handler() which starts the controller
+ * and router processes.
+ */
+ return nxt_master_start_discovery_process(task, rt);
}
@@ -197,8 +200,6 @@ nxt_port_master_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- app_conf.type_id = nxt_app_parse_type(&app_conf.type);
-
ret = nxt_master_start_worker_process(task, task->thread->runtime,
&app_conf, msg->port_msg.stream);
@@ -216,6 +217,7 @@ static nxt_port_handler_t nxt_master_process_port_handlers[] = {
nxt_port_ready_handler,
nxt_port_master_start_worker_handler,
nxt_master_port_socket_handler,
+ nxt_master_port_modules_handler,
nxt_port_rpc_handler,
nxt_port_rpc_handler,
};
@@ -289,7 +291,7 @@ nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_process_init_t *init;
- init = nxt_mp_get(rt->mem_pool, sizeof(nxt_process_init_t));
+ init = nxt_malloc(sizeof(nxt_process_init_t));
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
@@ -309,11 +311,35 @@ nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
static nxt_int_t
+nxt_master_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt)
+{
+ nxt_process_init_t *init;
+
+ init = nxt_malloc(sizeof(nxt_process_init_t));
+ if (nxt_slow_path(init == NULL)) {
+ return NXT_ERROR;
+ }
+
+ init->start = nxt_discovery_start;
+ init->name = "discovery";
+ init->user_cred = &rt->user_cred;
+ init->port_handlers = nxt_discovery_process_port_handlers;
+ init->signals = nxt_worker_process_signals;
+ init->type = NXT_PROCESS_DISCOVERY;
+ init->data = rt;
+ init->stream = 0;
+ init->restart = 0;
+
+ return nxt_master_create_worker_process(task, rt, init);
+}
+
+
+static nxt_int_t
nxt_master_start_router_process(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_process_init_t *init;
- init = nxt_mp_get(rt->mem_pool, sizeof(nxt_process_init_t));
+ init = nxt_malloc(sizeof(nxt_process_init_t));
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
@@ -885,3 +911,130 @@ fail:
return NXT_ERROR;
}
+
+
+static nxt_conf_map_t nxt_app_lang_module_map[] = {
+ {
+ nxt_string("type"),
+ NXT_CONF_MAP_STR_COPY,
+ offsetof(nxt_app_lang_module_t, type),
+ },
+
+ {
+ nxt_string("version"),
+ NXT_CONF_MAP_STR_COPY,
+ offsetof(nxt_app_lang_module_t, version),
+ },
+
+ {
+ nxt_string("file"),
+ NXT_CONF_MAP_CSTRZ,
+ offsetof(nxt_app_lang_module_t, file),
+ },
+};
+
+
+static void
+nxt_master_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ uint32_t index;
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_buf_t *b;
+ nxt_runtime_t *rt;
+ nxt_conf_value_t *conf, *root, *value;
+ nxt_app_lang_module_t *lang;
+
+ static nxt_str_t root_path = nxt_string("/");
+
+ rt = task->thread->runtime;
+
+ if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
+ return;
+ }
+
+ b = msg->buf;
+
+ if (b == NULL) {
+ return;
+ }
+
+ nxt_debug(task, "application languages: \"%*s\"",
+ b->mem.free - b->mem.pos, b->mem.pos);
+
+ mp = nxt_mp_create(1024, 128, 256, 32);
+ if (mp == NULL) {
+ return;
+ }
+
+ conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
+ if (conf == NULL) {
+ goto fail;
+ }
+
+ root = nxt_conf_get_path(conf, &root_path);
+ if (root == NULL) {
+ goto fail;
+ }
+
+ for (index = 0; /* void */ ; index++) {
+ value = nxt_conf_get_array_element(root, index);
+ if (value == NULL) {
+ break;
+ }
+
+ lang = nxt_array_add(rt->languages);
+ if (lang == NULL) {
+ goto fail;
+ }
+
+ lang->module = NULL;
+
+ ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
+ nxt_nitems(nxt_app_lang_module_map), lang);
+
+ if (ret != NXT_OK) {
+ goto fail;
+ }
+
+ nxt_debug(task, "lang %V %V \"%s\"",
+ &lang->type, &lang->version, lang->file);
+ }
+
+ qsort(rt->languages->elts, rt->languages->nelts,
+ sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
+
+fail:
+
+ nxt_mp_destroy(mp);
+
+ ret = nxt_master_start_controller_process(task, rt);
+
+ if (ret == NXT_OK) {
+ (void) nxt_master_start_router_process(task, rt);
+ }
+}
+
+
+static int nxt_cdecl
+nxt_app_lang_compare(const void *v1, const void *v2)
+{
+ int n;
+ size_t length;
+ const nxt_app_lang_module_t *lang1, *lang2;
+
+ lang1 = v1;
+ lang2 = v2;
+
+ length = nxt_min(lang1->version.length, lang2->version.length);
+
+ n = nxt_strncmp(lang1->version.start, lang2->version.start, length);
+
+ if (n == 0) {
+ n = lang1->version.length - lang2->version.length;
+ }
+
+ /* Negate result to move higher versions to the beginning. */
+
+ return -n;
+}