summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_master_process.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-07-12 20:32:16 +0300
committerMax Romanov <max.romanov@nginx.com>2017-07-12 20:32:16 +0300
commitb0c1e740cf404f8fed5eed75fddb205ca74314e0 (patch)
tree08dcefc827c5dfb1570b682ea8d1e9abf17a31dc /src/nxt_master_process.c
parentc38bcb7d70729434893ae4d5f2f58a78a36d2bd5 (diff)
downloadunit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.gz
unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.bz2
New process port exchange changed. READY message type introduced.
Application process start request DATA message from router to master. Master notifies router via NEW_PORT message after worker process become ready.
Diffstat (limited to '')
-rw-r--r--src/nxt_master_process.c203
1 files changed, 166 insertions, 37 deletions
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c
index 3b638389..c5699572 100644
--- a/src/nxt_master_process.c
+++ b/src/nxt_master_process.c
@@ -8,6 +8,8 @@
#include <nxt_runtime.h>
#include <nxt_port.h>
#include <nxt_master_process.h>
+#include <nxt_conf.h>
+#include <nxt_application.h>
static nxt_int_t nxt_master_process_port_create(nxt_task_t *task,
@@ -17,8 +19,8 @@ static nxt_int_t nxt_master_start_controller_process(nxt_task_t *task,
nxt_runtime_t *rt);
static nxt_int_t nxt_master_start_router_process(nxt_task_t *task,
nxt_runtime_t *rt);
-static nxt_int_t nxt_master_start_worker_processes(nxt_task_t *task,
- nxt_runtime_t *rt);
+static nxt_int_t nxt_master_start_worker_process(nxt_task_t *task,
+ nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream);
static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task,
nxt_runtime_t *rt, nxt_process_init_t *init);
static void nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj,
@@ -64,15 +66,135 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task,
return ret;
}
- ret = nxt_master_start_router_process(task, rt);
+ return nxt_master_start_router_process(task, rt);
+}
+
+
+static nxt_conf_map_t nxt_common_app_conf[] = {
+ {
+ nxt_string("type"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_common_app_conf_t, type),
+ },
+
+ {
+ nxt_string("user"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_common_app_conf_t, user),
+ },
+
+ {
+ nxt_string("group"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_common_app_conf_t, group),
+ },
+
+ {
+ nxt_string("workers"),
+ NXT_CONF_MAP_INT32,
+ offsetof(nxt_common_app_conf_t, workers),
+ },
+
+ {
+ nxt_string("path"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_common_app_conf_t, u.python.path),
+ },
+
+ {
+ nxt_string("module"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_common_app_conf_t, u.python.module),
+ },
+
+ {
+ nxt_string("root"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_common_app_conf_t, u.php.root),
+ },
+
+ {
+ nxt_string("script"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_common_app_conf_t, u.php.script),
+ },
+
+ {
+ nxt_string("index"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_common_app_conf_t, u.php.index),
+ },
+
+ {
+ nxt_string("executable"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_common_app_conf_t, u.go.executable),
+ },
+};
+
+
+static void
+nxt_port_master_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ size_t dump_size;
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_buf_t *b;
+ nxt_conf_value_t *conf;
+ nxt_common_app_conf_t app_conf;
+
+ static nxt_str_t nobody = nxt_string("nobody");
+
+ b = msg->buf;
+ dump_size = b->mem.free - b->mem.pos;
+
+ if (dump_size > 300) {
+ dump_size = 300;
+ }
+
+ nxt_debug(task, "master data: %*s", dump_size, b->mem.pos);
+
+ mp = nxt_mp_create(1024, 128, 256, 32);
+
+ conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free);
+ b->mem.pos = b->mem.free;
+
+ if (conf == NULL) {
+ nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
+ return;
+ }
+
+ nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t));
+
+ app_conf.user = nobody;
+
+ ret = nxt_conf_map_object(conf, nxt_common_app_conf,
+ nxt_nitems(nxt_common_app_conf), &app_conf);
if (ret != NXT_OK) {
- return ret;
+ nxt_log(task, NXT_LOG_CRIT, "root map error");
+ return;
}
- return nxt_master_start_worker_processes(task, rt);
+ app_conf.type_id = nxt_app_parse_type(&app_conf.type);
+
+ ret = nxt_master_start_worker_process(task, task->thread->runtime,
+ &app_conf, msg->port_msg.stream);
+
+ nxt_mp_destroy(mp);
}
+static nxt_port_handler_t nxt_master_process_port_handlers[] = {
+ NULL,
+ nxt_port_new_port_handler,
+ NULL,
+ nxt_port_mmap_handler,
+ nxt_port_master_data_handler,
+ NULL,
+ nxt_port_ready_handler,
+};
+
+
static nxt_int_t
nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
{
@@ -85,7 +207,8 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
return NXT_ERROR;
}
- port = nxt_process_port_new(process);
+ port = nxt_process_port_new(rt, process, nxt_port_get_next_id(),
+ NXT_PROCESS_MASTER);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
@@ -95,16 +218,15 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
return ret;
}
- port->engine = 0;
- port->type = NXT_PROCESS_MASTER;
-
nxt_runtime_port_add(rt, port);
/*
* A master process port. A write port is not closed
* since it should be inherited by worker processes.
*/
- nxt_port_read_enable(task, port);
+ nxt_port_enable(task, port, nxt_master_process_port_handlers);
+
+ process->ready = 1;
return NXT_OK;
}
@@ -148,6 +270,9 @@ nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
init->port_handlers = nxt_controller_process_port_handlers;
init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_CONTROLLER;
+ init->data = rt;
+ init->stream = 0;
+ init->restart = 1;
return nxt_master_create_worker_process(task, rt, init);
}
@@ -169,41 +294,45 @@ nxt_master_start_router_process(nxt_task_t *task, nxt_runtime_t *rt)
init->port_handlers = nxt_router_process_port_handlers;
init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_ROUTER;
+ init->data = rt;
+ init->stream = 0;
+ init->restart = 1;
return nxt_master_create_worker_process(task, rt, init);
}
static nxt_int_t
-nxt_master_start_worker_processes(nxt_task_t *task, nxt_runtime_t *rt)
+nxt_master_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
+ nxt_common_app_conf_t *app_conf, uint32_t stream)
{
- nxt_int_t ret;
- nxt_uint_t n;
nxt_process_init_t *init;
- init = nxt_mp_get(rt->mem_pool, sizeof(nxt_process_init_t));
+ init = nxt_malloc(sizeof(nxt_process_init_t) +
+ sizeof(nxt_user_cred_t));
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
+ init->user_cred = (nxt_user_cred_t *) (init + 1);
+
init->start = nxt_app_start;
init->name = "worker process";
- init->user_cred = &rt->user_cred;
+
+ init->user_cred->user = (char *) app_conf->user.start;
+ if (nxt_user_cred_get(task, init->user_cred,
+ (char *) app_conf->group.start) != NXT_OK) {
+ return NXT_ERROR;
+ }
+
init->port_handlers = nxt_app_process_port_handlers;
init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_WORKER;
+ init->data = app_conf;
+ init->stream = stream;
+ init->restart = 0;
- n = rt->worker_processes;
-
- while (n-- != 0) {
- ret = nxt_master_create_worker_process(task, rt, init);
-
- if (ret != NXT_OK) {
- return ret;
- }
- }
-
- return NXT_OK;
+ return nxt_master_create_worker_process(task, rt, init);
}
@@ -214,7 +343,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
nxt_int_t ret;
nxt_pid_t pid;
nxt_port_t *port;
- nxt_process_t *process, *master_process;
+ nxt_process_t *process;
/*
* TODO: remove process, init, ports from array on memory and fork failures.
@@ -226,24 +355,20 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
}
process->init = init;
- master_process = rt->mprocess;
- init->master_port = nxt_process_port_first(master_process);
- port = nxt_process_port_new(process);
+ port = nxt_process_port_new(rt, process, 0, init->type);
if (nxt_slow_path(port == NULL)) {
+ nxt_runtime_process_destroy(rt, process);
return NXT_ERROR;
}
- init->port = port;
-
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_mp_free(rt->mem_pool, port);
+ nxt_runtime_process_destroy(rt, process);
return ret;
}
- port->engine = 0;
- port->type = init->type;
-
pid = nxt_process_create(task, process);
switch (pid) {
@@ -261,7 +386,6 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
nxt_port_read_close(port);
nxt_port_write_enable(task, port);
- nxt_port_send_new_port(task, rt, port);
return NXT_OK;
}
}
@@ -503,7 +627,12 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
}
} else if (init != NULL) {
- (void) nxt_master_create_worker_process(task, rt, init);
+ if (init->restart != 0) {
+ (void) nxt_master_create_worker_process(task, rt, init);
+
+ } else {
+ nxt_free(init);
+ }
}
}
}