diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-07-12 20:32:16 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-07-12 20:32:16 +0300 |
commit | b0c1e740cf404f8fed5eed75fddb205ca74314e0 (patch) | |
tree | 08dcefc827c5dfb1570b682ea8d1e9abf17a31dc /src/nxt_master_process.c | |
parent | c38bcb7d70729434893ae4d5f2f58a78a36d2bd5 (diff) | |
download | unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.gz unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.bz2 |
New process port exchange changed. READY message type introduced.
Application process start request DATA message from router to master.
Master notifies router via NEW_PORT message after worker process become ready.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_master_process.c | 203 |
1 files changed, 166 insertions, 37 deletions
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index 3b638389..c5699572 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -8,6 +8,8 @@ #include <nxt_runtime.h> #include <nxt_port.h> #include <nxt_master_process.h> +#include <nxt_conf.h> +#include <nxt_application.h> static nxt_int_t nxt_master_process_port_create(nxt_task_t *task, @@ -17,8 +19,8 @@ static nxt_int_t nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt); static nxt_int_t nxt_master_start_router_process(nxt_task_t *task, nxt_runtime_t *rt); -static nxt_int_t nxt_master_start_worker_processes(nxt_task_t *task, - nxt_runtime_t *rt); +static nxt_int_t nxt_master_start_worker_process(nxt_task_t *task, + nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream); static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_process_init_t *init); static void nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj, @@ -64,15 +66,135 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task, return ret; } - ret = nxt_master_start_router_process(task, rt); + return nxt_master_start_router_process(task, rt); +} + + +static nxt_conf_map_t nxt_common_app_conf[] = { + { + nxt_string("type"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, type), + }, + + { + nxt_string("user"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, user), + }, + + { + nxt_string("group"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, group), + }, + + { + nxt_string("workers"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, workers), + }, + + { + nxt_string("path"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.python.path), + }, + + { + nxt_string("module"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.python.module), + }, + + { + nxt_string("root"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.php.root), + }, + + { + nxt_string("script"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.php.script), + }, + + { + nxt_string("index"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.php.index), + }, + + { + nxt_string("executable"), + NXT_CONF_MAP_STR, + offsetof(nxt_common_app_conf_t, u.go.executable), + }, +}; + + +static void +nxt_port_master_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + size_t dump_size; + nxt_mp_t *mp; + nxt_int_t ret; + nxt_buf_t *b; + nxt_conf_value_t *conf; + nxt_common_app_conf_t app_conf; + + static nxt_str_t nobody = nxt_string("nobody"); + + b = msg->buf; + dump_size = b->mem.free - b->mem.pos; + + if (dump_size > 300) { + dump_size = 300; + } + + nxt_debug(task, "master data: %*s", dump_size, b->mem.pos); + + mp = nxt_mp_create(1024, 128, 256, 32); + + conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free); + b->mem.pos = b->mem.free; + + if (conf == NULL) { + nxt_log(task, NXT_LOG_CRIT, "configuration parsing error"); + return; + } + + nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t)); + + app_conf.user = nobody; + + ret = nxt_conf_map_object(conf, nxt_common_app_conf, + nxt_nitems(nxt_common_app_conf), &app_conf); if (ret != NXT_OK) { - return ret; + nxt_log(task, NXT_LOG_CRIT, "root map error"); + return; } - return nxt_master_start_worker_processes(task, rt); + app_conf.type_id = nxt_app_parse_type(&app_conf.type); + + ret = nxt_master_start_worker_process(task, task->thread->runtime, + &app_conf, msg->port_msg.stream); + + nxt_mp_destroy(mp); } +static nxt_port_handler_t nxt_master_process_port_handlers[] = { + NULL, + nxt_port_new_port_handler, + NULL, + nxt_port_mmap_handler, + nxt_port_master_data_handler, + NULL, + nxt_port_ready_handler, +}; + + static nxt_int_t nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) { @@ -85,7 +207,8 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) return NXT_ERROR; } - port = nxt_process_port_new(process); + port = nxt_process_port_new(rt, process, nxt_port_get_next_id(), + NXT_PROCESS_MASTER); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } @@ -95,16 +218,15 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) return ret; } - port->engine = 0; - port->type = NXT_PROCESS_MASTER; - nxt_runtime_port_add(rt, port); /* * A master process port. A write port is not closed * since it should be inherited by worker processes. */ - nxt_port_read_enable(task, port); + nxt_port_enable(task, port, nxt_master_process_port_handlers); + + process->ready = 1; return NXT_OK; } @@ -148,6 +270,9 @@ nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) init->port_handlers = nxt_controller_process_port_handlers; init->signals = nxt_worker_process_signals; init->type = NXT_PROCESS_CONTROLLER; + init->data = rt; + init->stream = 0; + init->restart = 1; return nxt_master_create_worker_process(task, rt, init); } @@ -169,41 +294,45 @@ nxt_master_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) init->port_handlers = nxt_router_process_port_handlers; init->signals = nxt_worker_process_signals; init->type = NXT_PROCESS_ROUTER; + init->data = rt; + init->stream = 0; + init->restart = 1; return nxt_master_create_worker_process(task, rt, init); } static nxt_int_t -nxt_master_start_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) +nxt_master_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, + nxt_common_app_conf_t *app_conf, uint32_t stream) { - nxt_int_t ret; - nxt_uint_t n; nxt_process_init_t *init; - init = nxt_mp_get(rt->mem_pool, sizeof(nxt_process_init_t)); + init = nxt_malloc(sizeof(nxt_process_init_t) + + sizeof(nxt_user_cred_t)); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } + init->user_cred = (nxt_user_cred_t *) (init + 1); + init->start = nxt_app_start; init->name = "worker process"; - init->user_cred = &rt->user_cred; + + init->user_cred->user = (char *) app_conf->user.start; + if (nxt_user_cred_get(task, init->user_cred, + (char *) app_conf->group.start) != NXT_OK) { + return NXT_ERROR; + } + init->port_handlers = nxt_app_process_port_handlers; init->signals = nxt_worker_process_signals; init->type = NXT_PROCESS_WORKER; + init->data = app_conf; + init->stream = stream; + init->restart = 0; - n = rt->worker_processes; - - while (n-- != 0) { - ret = nxt_master_create_worker_process(task, rt, init); - - if (ret != NXT_OK) { - return ret; - } - } - - return NXT_OK; + return nxt_master_create_worker_process(task, rt, init); } @@ -214,7 +343,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_int_t ret; nxt_pid_t pid; nxt_port_t *port; - nxt_process_t *process, *master_process; + nxt_process_t *process; /* * TODO: remove process, init, ports from array on memory and fork failures. @@ -226,24 +355,20 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, } process->init = init; - master_process = rt->mprocess; - init->master_port = nxt_process_port_first(master_process); - port = nxt_process_port_new(process); + port = nxt_process_port_new(rt, process, 0, init->type); if (nxt_slow_path(port == NULL)) { + nxt_runtime_process_destroy(rt, process); return NXT_ERROR; } - init->port = port; - ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { + nxt_mp_free(rt->mem_pool, port); + nxt_runtime_process_destroy(rt, process); return ret; } - port->engine = 0; - port->type = init->type; - pid = nxt_process_create(task, process); switch (pid) { @@ -261,7 +386,6 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_read_close(port); nxt_port_write_enable(task, port); - nxt_port_send_new_port(task, rt, port); return NXT_OK; } } @@ -503,7 +627,12 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) } } else if (init != NULL) { - (void) nxt_master_create_worker_process(task, rt, init); + if (init->restart != 0) { + (void) nxt_master_create_worker_process(task, rt, init); + + } else { + nxt_free(init); + } } } } |