diff options
author | Tiago Natel de Moura <t.nateldemoura@f5.com> | 2021-11-09 15:48:44 +0300 |
---|---|---|
committer | Tiago Natel de Moura <t.nateldemoura@f5.com> | 2021-11-09 15:48:44 +0300 |
commit | e207415a78ae67b937faf7e5bcd6e5192993180a (patch) | |
tree | 8f26521322b194af7c22134ebd8fdc393e649718 | |
parent | 1de660b6df93c09719361e364211c7c6388c01ce (diff) | |
download | unit-e207415a78ae67b937faf7e5bcd6e5192993180a.tar.gz unit-e207415a78ae67b937faf7e5bcd6e5192993180a.tar.bz2 |
Introducing application prototype processes.
-rw-r--r-- | docs/changes.xml | 6 | ||||
-rw-r--r-- | src/nxt_application.c | 451 | ||||
-rw-r--r-- | src/nxt_external.c | 10 | ||||
-rw-r--r-- | src/nxt_main_process.c | 183 | ||||
-rw-r--r-- | src/nxt_main_process.h | 1 | ||||
-rw-r--r-- | src/nxt_port.c | 30 | ||||
-rw-r--r-- | src/nxt_port.h | 3 | ||||
-rw-r--r-- | src/nxt_process.c | 272 | ||||
-rw-r--r-- | src/nxt_process.h | 11 | ||||
-rw-r--r-- | src/nxt_process_type.h | 1 | ||||
-rw-r--r-- | src/nxt_router.c | 279 | ||||
-rw-r--r-- | src/nxt_router.h | 2 | ||||
-rw-r--r-- | src/nxt_runtime.c | 19 | ||||
-rw-r--r-- | src/nxt_runtime.h | 2 | ||||
-rw-r--r-- | src/nxt_unit.c | 10 |
15 files changed, 1098 insertions, 182 deletions
diff --git a/docs/changes.xml b/docs/changes.xml index 3bf0abc2..bbcc72ad 100644 --- a/docs/changes.xml +++ b/docs/changes.xml @@ -95,6 +95,12 @@ request routing by the query string. </para> </change> +<change type="feature"> +<para> +PHP opcache is shared between application processes. +</para> +</change> + <change type="bugfix"> <para> fixed building with glibc 2.34, notably Fedora 35. diff --git a/src/nxt_application.c b/src/nxt_application.c index f42905eb..589821fb 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -42,9 +42,23 @@ static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task, const char *name); +static nxt_int_t nxt_proto_setup(nxt_task_t *task, nxt_process_t *process); +static nxt_int_t nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data); static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment); +static void nxt_proto_start_process_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +static void nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_proto_process_created_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +static void nxt_proto_quit_children(nxt_task_t *task); +static nxt_process_t *nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid); +static void nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process); +static nxt_process_t *nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid); static u_char *nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src); +static void nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data); nxt_str_t nxt_server = nxt_string(NXT_SERVER); @@ -55,7 +69,12 @@ static uint32_t compat[] = { }; -static nxt_app_module_t *nxt_app; +static nxt_lvlhsh_t nxt_proto_processes; +static nxt_queue_t nxt_proto_children; +static nxt_bool_t nxt_proto_exiting; + +static nxt_app_module_t *nxt_app; +static nxt_common_app_conf_t *nxt_app_conf; static const nxt_port_handlers_t nxt_discovery_process_port_handlers = { @@ -70,6 +89,29 @@ static const nxt_port_handlers_t nxt_discovery_process_port_handlers = { }; +const nxt_sig_event_t nxt_prototype_signals[] = { + nxt_event_signal(SIGHUP, nxt_proto_signal_handler), + nxt_event_signal(SIGINT, nxt_proto_sigterm_handler), + nxt_event_signal(SIGQUIT, nxt_proto_sigterm_handler), + nxt_event_signal(SIGTERM, nxt_proto_sigterm_handler), + nxt_event_signal(SIGCHLD, nxt_proto_sigchld_handler), + nxt_event_signal_end, +}; + + +static const nxt_port_handlers_t nxt_proto_process_port_handlers = { + .quit = nxt_proto_quit_handler, + .change_file = nxt_port_change_log_file_handler, + .new_port = nxt_port_new_port_handler, + .process_created = nxt_proto_process_created_handler, + .process_ready = nxt_port_process_ready_handler, + .remove_pid = nxt_port_remove_pid_handler, + .start_process = nxt_proto_start_process_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, +}; + + static const nxt_port_handlers_t nxt_app_process_port_handlers = { .quit = nxt_signal_quit_handler, .rpc_ready = nxt_port_rpc_handler, @@ -89,12 +131,23 @@ const nxt_process_init_t nxt_discovery_process = { }; +const nxt_process_init_t nxt_proto_process = { + .type = NXT_PROCESS_PROTOTYPE, + .prefork = nxt_isolation_main_prefork, + .restart = 0, + .setup = nxt_proto_setup, + .start = nxt_proto_start, + .port_handlers = &nxt_proto_process_port_handlers, + .signals = nxt_prototype_signals, +}; + + const nxt_process_init_t nxt_app_process = { .type = NXT_PROCESS_APP, .setup = nxt_app_setup, - .prefork = nxt_isolation_main_prefork, + .start = NULL, + .prefork = NULL, .restart = 0, - .start = NULL, /* set to module->start */ .port_handlers = &nxt_app_process_port_handlers, .signals = nxt_process_signals, }; @@ -443,15 +496,18 @@ nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) static nxt_int_t -nxt_app_setup(nxt_task_t *task, nxt_process_t *process) +nxt_proto_setup(nxt_task_t *task, nxt_process_t *process) { nxt_int_t ret; - nxt_process_init_t *init; nxt_app_lang_module_t *lang; nxt_common_app_conf_t *app_conf; app_conf = process->data.app; + nxt_queue_init(&nxt_proto_children); + + nxt_app_conf = app_conf; + lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type); if (nxt_slow_path(lang == NULL)) { nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type); @@ -479,7 +535,6 @@ nxt_app_setup(nxt_task_t *task, nxt_process_t *process) if (nxt_app->setup != NULL) { ret = nxt_app->setup(task, process, app_conf); - if (nxt_slow_path(ret != NXT_OK)) { return ret; } @@ -514,13 +569,251 @@ nxt_app_setup(nxt_task_t *task, nxt_process_t *process) } } + process->state = NXT_PROCESS_STATE_CREATED; + + return NXT_OK; +} + + +static nxt_int_t +nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data) +{ + nxt_debug(task, "prototype waiting for clone messages"); + + return NXT_OK; +} + + +static void +nxt_proto_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + u_char *p; + nxt_int_t ret; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t *init; + + rt = task->thread->runtime; + + process = nxt_process_new(rt); + if (nxt_slow_path(process == NULL)) { + goto failed; + } + + process->mem_pool = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(process->mem_pool == NULL)) { + nxt_process_use(task, process, -1); + goto failed; + } + + process->parent_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE]; + init = nxt_process_init(process); + *init = nxt_app_process; + + process->name = nxt_mp_alloc(process->mem_pool, nxt_app_conf->name.length + + sizeof("\"\" application") + 1); + + if (nxt_slow_path(process->name == NULL)) { + nxt_process_use(task, process, -1); + + goto failed; + } init->start = nxt_app->start; + init->name = (const char *) nxt_app_conf->name.start; + + p = (u_char *) process->name; + *p++ = '"'; + p = nxt_cpymem(p, nxt_app_conf->name.start, nxt_app_conf->name.length); + p = nxt_cpymem(p, "\" application", 13); + *p = '\0'; + + process->user_cred = &rt->user_cred; + + process->data.app = nxt_app_conf; + process->stream = msg->port_msg.stream; + + ret = nxt_process_start(task, process); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_process_use(task, process, -1); + + goto failed; + } + + nxt_proto_process_add(task, process); + + return; + +failed: + + port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + + if (nxt_fast_path(port != NULL)) { + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); + } +} + + +static void +nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_debug(task, "prototype quit handler"); + + nxt_proto_quit_children(task); + + nxt_proto_exiting = 1; + + if (nxt_queue_is_empty(&nxt_proto_children)) { + nxt_process_quit(task, 0); + } +} + + +static void +nxt_proto_quit_children(nxt_task_t *task) +{ + nxt_port_t *port; + nxt_process_t *process; + + nxt_queue_each(process, &nxt_proto_children, nxt_process_t, link) { + port = nxt_process_port_first(process); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + } + nxt_queue_loop; +} + + +static void +nxt_proto_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_pid_t isolated_pid, pid; + nxt_process_t *process; + + isolated_pid = nxt_recv_msg_cmsg_pid(msg); + + process = nxt_proto_process_find(task, isolated_pid); + if (nxt_slow_path(process == NULL)) { + return; + } + process->state = NXT_PROCESS_STATE_CREATED; - return NXT_OK; + pid = msg->port_msg.pid; + + if (process->pid != pid) { + nxt_debug(task, "app process %PI (aka %PI) is created", isolated_pid, + pid); + + nxt_runtime_process_remove(task->thread->runtime, process); + + process->pid = pid; + + nxt_runtime_process_add(task, process); + + } else { + nxt_debug(task, "app process %PI is created", isolated_pid); + } +} + + +static void +nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_trace(task, "signal signo:%d (%s) received, ignored", + (int) (uintptr_t) obj, data); +} + + +static void +nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_trace(task, "signal signo:%d (%s) received", + (int) (uintptr_t) obj, data); + + nxt_proto_quit_children(task); + + nxt_proto_exiting = 1; + + if (nxt_queue_is_empty(&nxt_proto_children)) { + nxt_process_quit(task, 0); + } +} + + +static void +nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data) +{ + int status; + nxt_err_t err; + nxt_pid_t pid; + nxt_process_t *process; + + nxt_debug(task, "proto sigchld handler signo:%d (%s)", + (int) (uintptr_t) obj, data); + + for ( ;; ) { + pid = waitpid(-1, &status, WNOHANG); + + if (pid == -1) { + + switch (err = nxt_errno) { + + case NXT_ECHILD: + return; + + case NXT_EINTR: + continue; + + default: + nxt_alert(task, "waitpid() failed: %E", err); + return; + } + } + + nxt_debug(task, "waitpid(): %PI", pid); + + if (pid == 0) { + return; + } + + if (WTERMSIG(status)) { +#ifdef WCOREDUMP + nxt_alert(task, "app process (isolated %PI) exited on signal %d%s", + pid, WTERMSIG(status), + WCOREDUMP(status) ? " (core dumped)" : ""); +#else + nxt_alert(task, "app process (isolated %PI) exited on signal %d", + pid, WTERMSIG(status)); +#endif + + } else { + nxt_trace(task, "app process (isolated %PI) exited with code %d", + pid, WEXITSTATUS(status)); + } + + process = nxt_proto_process_remove(task, pid); + if (process == NULL) { + continue; + } + + if (process->state != NXT_PROCESS_STATE_CREATING) { + nxt_port_remove_notify_others(task, process); + } + + nxt_process_close_ports(task, process); + + if (nxt_proto_exiting && nxt_queue_is_empty(&nxt_proto_children)) { + nxt_process_quit(task, 0); + return; + } + } } @@ -619,6 +912,19 @@ nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src) } +static nxt_int_t +nxt_app_setup(nxt_task_t *task, nxt_process_t *process) +{ + nxt_process_init_t *init; + + process->state = NXT_PROCESS_STATE_CREATED; + + init = nxt_process_init(process); + + return init->start(task, &process->data); +} + + nxt_app_lang_module_t * nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name) { @@ -707,15 +1013,15 @@ nxt_int_t nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init, nxt_common_app_conf_t *conf) { - nxt_port_t *my_port, *main_port, *router_port; + nxt_port_t *my_port, *proto_port, *router_port; nxt_runtime_t *rt; nxt_memzero(init, sizeof(nxt_unit_init_t)); rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - if (nxt_slow_path(main_port == NULL)) { + proto_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE]; + if (nxt_slow_path(proto_port == NULL)) { return NXT_ERROR; } @@ -729,10 +1035,10 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init, return NXT_ERROR; } - init->ready_port.id.pid = main_port->pid; - init->ready_port.id.id = main_port->id; + init->ready_port.id.pid = proto_port->pid; + init->ready_port.id.id = proto_port->id; init->ready_port.in_fd = -1; - init->ready_port.out_fd = main_port->pair[1]; + init->ready_port.out_fd = proto_port->pair[1]; init->ready_stream = my_port->process->stream; @@ -753,3 +1059,122 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init, return NXT_OK; } + + +static nxt_int_t +nxt_proto_lvlhsh_isolated_pid_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + nxt_pid_t *qpid; + nxt_process_t *process; + + process = data; + qpid = (nxt_pid_t *) lhq->key.start; + + if (*qpid == process->isolated_pid) { + return NXT_OK; + } + + return NXT_DECLINED; +} + + +static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + nxt_proto_lvlhsh_isolated_pid_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +nxt_inline void +nxt_proto_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) +{ + lhq->key_hash = nxt_murmur_hash2(pid, sizeof(nxt_pid_t)); + lhq->key.length = sizeof(nxt_pid_t); + lhq->key.start = (u_char *) pid; + lhq->proto = &lvlhsh_processes_proto; +} + + +static void +nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process) +{ + nxt_runtime_t *rt; + nxt_lvlhsh_query_t lhq; + + rt = task->thread->runtime; + + nxt_proto_process_lhq_pid(&lhq, &process->isolated_pid); + + lhq.replace = 0; + lhq.value = process; + lhq.pool = rt->mem_pool; + + switch (nxt_lvlhsh_insert(&nxt_proto_processes, &lhq)) { + + case NXT_OK: + nxt_debug(task, "process (isolated %PI) added", process->isolated_pid); + + nxt_queue_insert_tail(&nxt_proto_children, &process->link); + break; + + default: + nxt_debug(task, "process (isolated %PI) failed to add", + process->isolated_pid); + break; + } +} + + +static nxt_process_t * +nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid) +{ + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_lvlhsh_query_t lhq; + + nxt_proto_process_lhq_pid(&lhq, &pid); + + rt = task->thread->runtime; + + lhq.pool = rt->mem_pool; + + switch (nxt_lvlhsh_delete(&nxt_proto_processes, &lhq)) { + + case NXT_OK: + nxt_debug(task, "process (isolated %PI) removed", pid); + + process = lhq.value; + + nxt_queue_remove(&process->link); + break; + + default: + nxt_debug(task, "process (isolated %PI) remove failed", pid); + process = NULL; + break; + } + + return process; +} + + +static nxt_process_t * +nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid) +{ + nxt_process_t *process; + nxt_lvlhsh_query_t lhq; + + nxt_proto_process_lhq_pid(&lhq, &pid); + + if (nxt_lvlhsh_find(&nxt_proto_processes, &lhq) == NXT_OK) { + process = lhq.value; + + } else { + nxt_debug(task, "process (isolated %PI) not found", pid); + + process = NULL; + } + + return process; +} diff --git a/src/nxt_external.c b/src/nxt_external.c index 1275aa87..b41ca51b 100644 --- a/src/nxt_external.c +++ b/src/nxt_external.c @@ -67,7 +67,7 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) nxt_str_t str; nxt_int_t rc; nxt_uint_t i, argc; - nxt_port_t *my_port, *main_port, *router_port; + nxt_port_t *my_port, *proto_port, *router_port; nxt_runtime_t *rt; nxt_conf_value_t *value; nxt_common_app_conf_t *conf; @@ -76,17 +76,17 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) rt = task->thread->runtime; conf = data->app; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + proto_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE]; router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; my_port = nxt_runtime_port_find(rt, nxt_pid, 0); - if (nxt_slow_path(main_port == NULL || my_port == NULL + if (nxt_slow_path(proto_port == NULL || my_port == NULL || router_port == NULL)) { return NXT_ERROR; } - rc = nxt_external_fd_no_cloexec(task, main_port->pair[1]); + rc = nxt_external_fd_no_cloexec(task, proto_port->pair[1]); if (nxt_slow_path(rc != NXT_OK)) { return NXT_ERROR; } @@ -115,7 +115,7 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) "%PI,%ud,%d,%d;" "%d,%z,%uD,%Z", NXT_VERSION, my_port->process->stream, - main_port->pid, main_port->id, main_port->pair[1], + proto_port->pid, proto_port->id, proto_port->pair[1], router_port->pid, router_port->id, router_port->pair[1], my_port->pid, my_port->id, my_port->pair[0], my_port->pair[1], diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index eff96e14..a5a20d3d 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -10,6 +10,7 @@ #include <nxt_main_process.h> #include <nxt_conf.h> #include <nxt_router.h> +#include <nxt_port_queue.h> #if (NXT_TLS) #include <nxt_cert.h> #endif @@ -52,6 +53,8 @@ static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, static void nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); +static void nxt_main_process_whoami_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name, @@ -326,7 +329,7 @@ static nxt_conf_app_map_t nxt_app_maps[] = { static void -nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_debug(task, "main data: %*s", nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); @@ -334,7 +337,33 @@ nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void -nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + void *mem; + nxt_port_t *port; + + nxt_port_new_port_handler(task, msg); + + port = msg->u.new_port; + + if (port != NULL + && port->type == NXT_PROCESS_APP + && msg->fd[1] != -1) + { + mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0); + if (nxt_fast_path(mem != MAP_FAILED)) { + port->queue = mem; + } + + nxt_fd_close(msg->fd[1]); + msg->fd[1] = -1; + } +} + + +static void +nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { u_char *start, *p, ch; size_t type_len; @@ -374,16 +403,18 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } + process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN]; + init = nxt_process_init(process); - *init = nxt_app_process; + *init = nxt_proto_process; b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size); if (b == NULL) { goto failed; } - nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos, + nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos, b->mem.pos); app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t)); @@ -399,7 +430,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) init->name = (const char *) start; process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length - + sizeof("\"\" application") + 1); + + sizeof("\"\" prototype") + 1); if (nxt_slow_path(process->name == NULL)) { goto failed; @@ -408,7 +439,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) p = (u_char *) process->name; *p++ = '"'; p = nxt_cpymem(p, init->name, app_conf->name.length); - p = nxt_cpymem(p, "\" application", 13); + p = nxt_cpymem(p, "\" prototype", 11); *p = '\0'; app_conf->shm_limit = 100 * 1024 * 1024; @@ -504,21 +535,17 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - process = nxt_runtime_process_find(rt, msg->port_msg.pid); - if (nxt_slow_path(process == NULL)) { - return; - } - - nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); - port = nxt_runtime_port_find(rt, msg->port_msg.pid, msg->port_msg.reply_port); - - if (nxt_slow_path(port == NULL)) { return; } + process = port->process; + + nxt_assert(process != NULL); + nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); + #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { if (nxt_slow_path(nxt_clone_credential_map(task, process->pid, @@ -542,10 +569,13 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static nxt_port_handlers_t nxt_main_process_port_handlers = { - .data = nxt_port_main_data_handler, + .data = nxt_main_data_handler, + .new_port = nxt_main_new_port_handler, .process_created = nxt_main_process_created_handler, .process_ready = nxt_port_process_ready_handler, - .start_process = nxt_port_main_start_process_handler, + .whoami = nxt_main_process_whoami_handler, + .remove_pid = nxt_port_remove_pid_handler, + .start_process = nxt_main_start_process_handler, .socket = nxt_main_port_socket_handler, .modules = nxt_main_port_modules_handler, .conf_store = nxt_main_port_conf_store_handler, @@ -559,6 +589,88 @@ static nxt_port_handlers_t nxt_main_process_port_handlers = { }; +static void +nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_buf_t *buf; + nxt_pid_t pid, ppid; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_process_t *pprocess; + + nxt_assert(msg->port_msg.reply_port == 0); + + if (nxt_slow_path(msg->buf == NULL + || nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t))) + { + nxt_alert(task, "whoami: buffer is NULL or unexpected size"); + goto fail; + } + + nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t)); + + rt = task->thread->runtime; + + pprocess = nxt_runtime_process_find(rt, ppid); + if (nxt_slow_path(pprocess == NULL)) { + nxt_alert(task, "whoami: parent process %PI not found", ppid); + goto fail; + } + + pid = nxt_recv_msg_cmsg_pid(msg); + + nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid, + msg->fd[0]); + + if (msg->fd[0] != -1) { + port = nxt_runtime_process_port_create(task, rt, pid, 0, + NXT_PROCESS_APP); + if (nxt_slow_path(port == NULL)) { + goto fail; + } + + nxt_fd_nonblocking(task, msg->fd[0]); + + port->pair[0] = -1; + port->pair[1] = msg->fd[0]; + msg->fd[0] = -1; + + port->max_size = 16 * 1024; + port->max_share = 64 * 1024; + port->socket.task = task; + + nxt_port_write_enable(task, port); + + } else { + port = nxt_runtime_port_find(rt, pid, 0); + if (nxt_slow_path(port == NULL)) { + goto fail; + } + } + + if (ppid != nxt_pid) { + nxt_queue_insert_tail(&pprocess->children, &port->process->link); + } + + buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool, + sizeof(nxt_pid_t), 0); + if (nxt_slow_path(buf == NULL)) { + goto fail; + } + + buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t)); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1, + msg->port_msg.stream, 0, buf); + +fail: + + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); + } +} + + static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) { @@ -751,8 +863,10 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) nxt_int_t ret; nxt_err_t err; nxt_pid_t pid; + nxt_port_t *port; + nxt_queue_t children; nxt_runtime_t *rt; - nxt_process_t *process; + nxt_process_t *process, *child; nxt_process_init_t init; nxt_debug(task, "sigchld handler signo:%d (%s)", @@ -809,7 +923,31 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) process->stream = 0; } + nxt_queue_init(&children); + + if (!nxt_queue_is_empty(&process->children)) { + nxt_queue_add(&children, &process->children); + + nxt_queue_init(&process->children); + + nxt_queue_each(child, &children, nxt_process_t, link) { + port = nxt_process_port_first(child); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + } nxt_queue_loop; + } + if (nxt_exiting) { + nxt_process_close_ports(task, process); + + nxt_queue_each(child, &children, nxt_process_t, link) { + nxt_queue_remove(&child->link); + child->link.next = NULL; + + nxt_process_close_ports(task, child); + } nxt_queue_loop; + if (rt->nprocesses <= 1) { nxt_runtime_quit(task, 0); } @@ -819,6 +957,15 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) nxt_port_remove_notify_others(task, process); + nxt_queue_each(child, &children, nxt_process_t, link) { + nxt_port_remove_notify_others(task, child); + + nxt_queue_remove(&child->link); + child->link.next = NULL; + + nxt_process_close_ports(task, child); + } nxt_queue_loop; + init = *(nxt_process_init_t *) nxt_process_init(process); nxt_process_close_ports(task, process); diff --git a/src/nxt_main_process.h b/src/nxt_main_process.h index 80429b6c..ef083d63 100644 --- a/src/nxt_main_process.h +++ b/src/nxt_main_process.h @@ -27,6 +27,7 @@ NXT_EXPORT extern nxt_uint_t nxt_conf_ver; NXT_EXPORT extern const nxt_process_init_t nxt_discovery_process; NXT_EXPORT extern const nxt_process_init_t nxt_controller_process; NXT_EXPORT extern const nxt_process_init_t nxt_router_process; +NXT_EXPORT extern const nxt_process_init_t nxt_proto_process; NXT_EXPORT extern const nxt_process_init_t nxt_app_process; extern const nxt_sig_event_t nxt_main_process_signals[]; diff --git a/src/nxt_port.c b/src/nxt_port.c index d6b33f71..1e8fa28a 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -12,6 +12,8 @@ #include <nxt_port_queue.h> +static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg, + nxt_pid_t pid); static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_atomic_uint_t nxt_port_last_id = 1; @@ -274,6 +276,8 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, new_port_msg->id); + msg->u.new_port = port; + nxt_fd_close(msg->fd[0]); msg->fd[0] = -1; return; @@ -384,14 +388,13 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, port = nxt_process_port_first(process); - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(nxt_port_data_t)); + b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, + sizeof(nxt_uint_t), 0); if (nxt_slow_path(b == NULL)) { continue; } - *(nxt_uint_t *) b->mem.pos = slot; - b->mem.free += sizeof(nxt_uint_t); + b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t)); (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, fd, 0, 0, b); @@ -497,16 +500,25 @@ nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process) void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_buf_t *buf; - nxt_pid_t pid; - nxt_runtime_t *rt; - nxt_process_t *process; + nxt_pid_t pid; + nxt_buf_t *buf; buf = msg->buf; nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); - nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); + nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t)); + + nxt_port_remove_pid(task, msg, pid); +} + + +static void +nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg, + nxt_pid_t pid) +{ + nxt_runtime_t *rt; + nxt_process_t *process; msg->u.removed_pid = pid; diff --git a/src/nxt_port.h b/src/nxt_port.h index 1f24c5da..3b66edfd 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -33,6 +33,7 @@ struct nxt_port_handlers_s { /* New process */ nxt_port_handler_t process_created; nxt_port_handler_t process_ready; + nxt_port_handler_t whoami; /* Process exit/crash notification. */ nxt_port_handler_t remove_pid; @@ -92,6 +93,7 @@ typedef enum { _NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created), _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready), + _NXT_PORT_MSG_WHOAMI = nxt_port_handler_idx(whoami), _NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid), _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit), @@ -131,6 +133,7 @@ typedef enum { NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED), NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY), + NXT_PORT_MSG_WHOAMI = nxt_msg_last(_NXT_PORT_MSG_WHOAMI), NXT_PORT_MSG_QUIT = nxt_msg_last(_NXT_PORT_MSG_QUIT), NXT_PORT_MSG_REMOVE_PID = nxt_msg_last(_NXT_PORT_MSG_REMOVE_PID), diff --git a/src/nxt_process.c b/src/nxt_process.c index 59dd9180..fca197eb 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -5,7 +5,6 @@ */ #include <nxt_main.h> -#include <nxt_main_process.h> #if (NXT_HAVE_CLONE) #include <nxt_clone.h> @@ -17,10 +16,26 @@ #include <sys/prctl.h> #endif + +#if (NXT_HAVE_CLONE) && (NXT_HAVE_CLONE_NEWPID) +#define nxt_is_pid_isolated(process) \ + nxt_is_clone_flag_set(process->isolation.clone.flags, NEWPID) +#else +#define nxt_is_pid_isolated(process) \ + (0) +#endif + + static nxt_pid_t nxt_process_create(nxt_task_t *task, nxt_process_t *process); +static nxt_int_t nxt_process_do_start(nxt_task_t *task, nxt_process_t *process); +static nxt_int_t nxt_process_whoami(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_process_setup(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process); +static void nxt_process_whoami_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data); +static void nxt_process_whoami_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data); static nxt_int_t nxt_process_send_created(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_process_send_ready(nxt_task_t *task, @@ -44,19 +59,28 @@ nxt_uid_t nxt_euid; nxt_gid_t nxt_egid; nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { - { 1, 1, 1, 1, 1 }, - { 1, 0, 0, 0, 0 }, - { 1, 0, 0, 1, 0 }, - { 1, 0, 1, 0, 1 }, - { 1, 0, 0, 1, 0 }, + { 1, 1, 1, 1, 1, 1 }, + { 1, 0, 0, 0, 0, 0 }, + { 1, 0, 0, 1, 0, 0 }, + { 1, 0, 1, 1, 1, 1 }, + { 1, 0, 0, 1, 0, 0 }, + { 1, 0, 0, 1, 0, 0 }, }; nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { - { 0, 0, 0, 0, 0 }, - { 0, 0, 0, 0, 0 }, - { 0, 0, 0, 1, 0 }, - { 0, 0, 1, 0, 1 }, - { 0, 0, 0, 1, 0 }, + { 0, 0, 0, 0, 0, 0 }, + { 0, 0, 0, 0, 0, 0 }, + { 0, 0, 0, 1, 0, 0 }, + { 0, 0, 1, 0, 1, 1 }, + { 0, 0, 0, 1, 0, 0 }, + { 1, 0, 0, 1, 0, 0 }, +}; + + +static const nxt_port_handlers_t nxt_process_whoami_port_handlers = { + .quit = nxt_signal_quit_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, }; @@ -78,6 +102,8 @@ nxt_process_new(nxt_runtime_t *rt) process->use_count = 1; + nxt_queue_init(&process->children); + return process; } @@ -108,6 +134,8 @@ nxt_process_init_start(nxt_task_t *task, nxt_process_init_t init) return NXT_ERROR; } + process->parent_port = rt->port_by_type[rt->type]; + process->name = init.name; process->user_cred = &rt->user_cred; @@ -177,7 +205,7 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process) break; default: - /* The main process created a new process. */ + /* The parent process created a new process. */ nxt_process_use(task, process, -1); @@ -216,41 +244,16 @@ nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process) init = nxt_process_init(process); + nxt_ppid = nxt_pid; + nxt_pid = nxt_getpid(); process->pid = nxt_pid; + process->isolated_pid = nxt_pid; /* Clean inherited cached thread tid. */ task->thread->tid = 0; -#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWPID) - if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWPID)) { - ssize_t pidsz; - char procpid[10]; - - nxt_debug(task, "%s isolated pid is %d", process->name, nxt_pid); - - pidsz = readlink("/proc/self", procpid, sizeof(procpid)); - - if (nxt_slow_path(pidsz < 0 || pidsz >= (ssize_t) sizeof(procpid))) { - nxt_alert(task, "failed to read real pid from /proc/self"); - return NXT_ERROR; - } - - procpid[pidsz] = '\0'; - - nxt_pid = (nxt_pid_t) strtol(procpid, NULL, 10); - - nxt_assert(nxt_pid > 0 && nxt_errno != ERANGE); - - process->pid = nxt_pid; - task->thread->tid = nxt_pid; - - nxt_debug(task, "%s real pid is %d", process->name, nxt_pid); - } - -#endif - ptype = init->type; nxt_port_reset_next_id(); @@ -262,7 +265,9 @@ nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process) /* Remove not ready processes. */ nxt_runtime_process_each(rt, p) { - if (nxt_proc_conn_matrix[ptype][nxt_process_type(p)] == 0) { + if (nxt_proc_conn_matrix[ptype][nxt_process_type(p)] == 0 + && p->pid != nxt_ppid) /* Always keep parent's port. */ + { nxt_debug(task, "remove not required process %PI", p->pid); nxt_process_close_ports(task, p); @@ -315,9 +320,8 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) return -1; } - nxt_runtime_process_add(task, process); - - if (nxt_slow_path(nxt_process_setup(task, process) != NXT_OK)) { + ret = nxt_process_setup(task, process); + if (nxt_slow_path(ret != NXT_OK)) { nxt_process_quit(task, 1); } @@ -337,6 +341,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) #endif process->pid = pid; + process->isolated_pid = pid; nxt_runtime_process_add(task, process); @@ -348,7 +353,6 @@ static nxt_int_t nxt_process_setup(nxt_task_t *task, nxt_process_t *process) { nxt_int_t ret; - nxt_port_t *port, *main_port; nxt_thread_t *thread; nxt_runtime_t *rt; nxt_process_init_t *init; @@ -388,17 +392,45 @@ nxt_process_setup(nxt_task_t *task, nxt_process_t *process) return NXT_ERROR; } - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + nxt_port_read_close(process->parent_port); + nxt_port_write_enable(task, process->parent_port); + + /* + * If the parent process is already isolated, rt->pid_isolation is already + * set to 1 at this point. + */ + if (nxt_is_pid_isolated(process)) { + rt->is_pid_isolated = 1; + } + + if (rt->is_pid_isolated + || process->parent_port != rt->port_by_type[NXT_PROCESS_MAIN]) + { + ret = nxt_process_whoami(task, process); + + } else { + ret = nxt_process_do_start(task, process); + } + + return ret; +} + + +static nxt_int_t +nxt_process_do_start(nxt_task_t *task, nxt_process_t *process) +{ + nxt_int_t ret; + nxt_port_t *port; + nxt_process_init_t *init; - nxt_port_read_close(main_port); - nxt_port_write_enable(task, main_port); + nxt_runtime_process_add(task, process); + init = nxt_process_init(process); port = nxt_process_port_first(process); nxt_port_enable(task, port, init->port_handlers); ret = init->setup(task, process); - if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } @@ -435,6 +467,113 @@ nxt_process_setup(nxt_task_t *task, nxt_process_t *process) static nxt_int_t +nxt_process_whoami(nxt_task_t *task, nxt_process_t *process) +{ + uint32_t stream; + nxt_fd_t fd; + nxt_buf_t *buf; + nxt_int_t ret; + nxt_port_t *my_port, *main_port; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + + my_port = nxt_process_port_first(process); + main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + + nxt_assert(my_port != NULL && main_port != NULL); + + nxt_port_enable(task, my_port, &nxt_process_whoami_port_handlers); + + buf = nxt_buf_mem_alloc(main_port->mem_pool, sizeof(nxt_pid_t), 0); + if (nxt_slow_path(buf == NULL)) { + return NXT_ERROR; + } + + buf->mem.free = nxt_cpymem(buf->mem.free, &nxt_ppid, sizeof(nxt_pid_t)); + + stream = nxt_port_rpc_register_handler(task, my_port, + nxt_process_whoami_ok, + nxt_process_whoami_error, + main_port->pid, process); + if (nxt_slow_path(stream == 0)) { + nxt_mp_free(main_port->mem_pool, buf); + + return NXT_ERROR; + } + + fd = (process->parent_port != main_port) ? my_port->pair[1] : -1; + + ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_WHOAMI, + fd, stream, my_port->id, buf); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "%s failed to send WHOAMI message", process->name); + nxt_port_rpc_cancel(task, my_port, stream); + nxt_mp_free(main_port->mem_pool, buf); + + return NXT_ERROR; + } + + return NXT_OK; +} + + +static void +nxt_process_whoami_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +{ + nxt_pid_t pid, isolated_pid; + nxt_buf_t *buf; + nxt_port_t *port; + nxt_process_t *process; + nxt_runtime_t *rt; + + process = data; + + buf = msg->buf; + + nxt_assert(nxt_buf_used_size(buf) == sizeof(nxt_pid_t)); + + nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t)); + + isolated_pid = nxt_pid; + + if (isolated_pid != pid) { + nxt_pid = pid; + process->pid = pid; + + nxt_process_port_each(process, port) { + port->pid = pid; + } nxt_process_port_loop; + } + + rt = task->thread->runtime; + + if (process->parent_port != rt->port_by_type[NXT_PROCESS_MAIN]) { + port = process->parent_port; + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_PROCESS_CREATED, + -1, 0, 0, NULL); + + nxt_log(task, NXT_LOG_INFO, "%s started", process->name); + } + + if (nxt_slow_path(nxt_process_do_start(task, process) != NXT_OK)) { + nxt_process_quit(task, 1); + } +} + + +static void +nxt_process_whoami_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +{ + nxt_alert(task, "WHOAMI error"); + + nxt_process_quit(task, 1); +} + + +static nxt_int_t nxt_process_send_created(nxt_task_t *task, nxt_process_t *process) { uint32_t stream; @@ -483,6 +622,9 @@ nxt_process_created_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) nxt_process_init_t *init; process = data; + + process->state = NXT_PROCESS_STATE_READY; + init = nxt_process_init(process); ret = nxt_process_apply_creds(task, process); @@ -492,11 +634,23 @@ nxt_process_created_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) nxt_log(task, NXT_LOG_INFO, "%s started", process->name); + ret = nxt_process_send_ready(task, process); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + ret = init->start(task, &process->data); -fail: + if (nxt_process_type(process) != NXT_PROCESS_PROTOTYPE) { + nxt_port_write_close(nxt_process_port_first(process)); + } - nxt_process_quit(task, ret == NXT_OK ? 0 : 1); + if (nxt_fast_path(ret == NXT_OK)) { + return; + } + +fail: + nxt_process_quit(task, 1); } @@ -584,7 +738,8 @@ nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process) #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) if (!cap_setid - && nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { + && nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) + { cap_setid = 1; } #endif @@ -617,17 +772,10 @@ nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process) static nxt_int_t nxt_process_send_ready(nxt_task_t *task, nxt_process_t *process) { - nxt_int_t ret; - nxt_port_t *main_port; - nxt_runtime_t *rt; - - rt = task->thread->runtime; - - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - - nxt_assert(main_port != NULL); + nxt_int_t ret; - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY, + ret = nxt_port_socket_write(task, process->parent_port, + NXT_PORT_MSG_PROCESS_READY, -1, process->stream, 0, NULL); if (nxt_slow_path(ret != NXT_OK)) { diff --git a/src/nxt_process.h b/src/nxt_process.h index e4d3c93a..c92eebd8 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -99,19 +99,26 @@ typedef struct { struct nxt_process_s { nxt_pid_t pid; - const char *name; - nxt_queue_t ports; /* of nxt_port_t */ + nxt_queue_t ports; /* of nxt_port_t.link */ nxt_process_state_t state; nxt_bool_t registered; nxt_int_t use_count; nxt_port_mmaps_t incoming; + + nxt_pid_t isolated_pid; + const char *name; + nxt_port_t *parent_port; + uint32_t stream; nxt_mp_t *mem_pool; nxt_credential_t *user_cred; + nxt_queue_t children; /* of nxt_process_t.link */ + nxt_queue_link_t link; /* for nxt_process_t.children */ + nxt_process_data_t data; nxt_process_isolation_t isolation; diff --git a/src/nxt_process_type.h b/src/nxt_process_type.h index 14deda19..d0093431 100644 --- a/src/nxt_process_type.h +++ b/src/nxt_process_type.h @@ -13,6 +13,7 @@ typedef enum { NXT_PROCESS_DISCOVERY, NXT_PROCESS_CONTROLLER, NXT_PROCESS_ROUTER, + NXT_PROCESS_PROTOTYPE, NXT_PROCESS_APP, NXT_PROCESS_MAX, diff --git a/src/nxt_router.c b/src/nxt_router.c index 1dd7f8e5..7623ccbb 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -65,12 +65,14 @@ typedef struct { typedef struct { nxt_app_t *app; nxt_router_temp_conf_t *temp_conf; + uint8_t proto; /* 1 bit */ } nxt_app_rpc_t; typedef struct { nxt_app_joint_t *app_joint; uint32_t generation; + uint8_t proto; /* 1 bit */ } nxt_app_joint_rpc_t; @@ -392,32 +394,52 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, { size_t size; uint32_t stream; - nxt_mp_t *mp; nxt_int_t ret; nxt_app_t *app; nxt_buf_t *b; - nxt_port_t *main_port; + nxt_port_t *dport; nxt_runtime_t *rt; nxt_app_joint_rpc_t *app_joint_rpc; app = data; - rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + nxt_thread_mutex_lock(&app->mutex); - nxt_debug(task, "app '%V' %p start process", &app->name, app); + dport = app->proto_port; - size = app->name.length + 1 + app->conf.length; + nxt_thread_mutex_unlock(&app->mutex); - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); + if (dport != NULL) { + nxt_debug(task, "app '%V' %p start process", &app->name, app); - if (nxt_slow_path(b == NULL)) { - goto failed; - } + b = NULL; - nxt_buf_cpystr(b, &app->name); - *b->mem.free++ = '\0'; - nxt_buf_cpystr(b, &app->conf); + } else { + if (app->proto_port_requests > 0) { + nxt_debug(task, "app '%V' %p wait for prototype process", + &app->name, app); + + app->proto_port_requests++; + + goto skip; + } + + nxt_debug(task, "app '%V' %p start prototype process", &app->name, app); + + rt = task->thread->runtime; + dport = rt->port_by_type[NXT_PROCESS_MAIN]; + + size = app->name.length + 1 + app->conf.length; + + b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + goto failed; + } + + nxt_buf_cpystr(b, &app->name); + *b->mem.free++ = '\0'; + nxt_buf_cpystr(b, &app->conf); + } app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, nxt_router_app_port_ready, @@ -429,7 +451,7 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, stream = nxt_port_rpc_ex_stream(app_joint_rpc); - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, + ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS, -1, stream, port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); @@ -439,26 +461,23 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, app_joint_rpc->app_joint = app->joint; app_joint_rpc->generation = app->generation; + app_joint_rpc->proto = (b != NULL); - nxt_router_app_joint_use(task, app->joint, 1); + if (b != NULL) { + app->proto_port_requests++; - nxt_router_app_use(task, app, -1); + b = NULL; + } - return; + nxt_router_app_joint_use(task, app->joint, 1); failed: if (b != NULL) { - mp = b->data; - nxt_mp_free(mp, b); - nxt_mp_release(mp); + nxt_mp_free(b->data, b); } - nxt_thread_mutex_lock(&app->mutex); - - app->pending_processes--; - - nxt_thread_mutex_unlock(&app->mutex); +skip: nxt_router_app_use(task, app, -1); } @@ -658,6 +677,12 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_router_greet_controller(task, msg->u.new_port); } + if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) { + nxt_port_rpc_handler(task, msg); + + return; + } + if (port == NULL || port->type != NXT_PROCESS_APP) { if (msg->port_msg.stream == 0) { @@ -683,6 +708,8 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } + nxt_debug(task, "new port id %d (%d)", port->id, port->type); + /* * Port with "id == 0" is application 'main' port and it always * should come with non-zero stream. @@ -819,7 +846,8 @@ nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_app_t *app; nxt_int_t ret; nxt_str_t app_name; - nxt_port_t *port, *reply_port, *shared_port, *old_shared_port; + nxt_port_t *reply_port, *shared_port, *old_shared_port; + nxt_port_t *proto_port; nxt_port_msg_type_t reply; reply_port = nxt_runtime_port_find(task->thread->runtime, @@ -862,12 +890,15 @@ nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_thread_mutex_lock(&app->mutex); - nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { + proto_port = app->proto_port; - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, - 0, 0, NULL); + if (proto_port != NULL) { + nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, + proto_port->pid); - } nxt_queue_loop; + app->proto_port = NULL; + proto_port->app = NULL; + } app->generation++; @@ -883,6 +914,15 @@ nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_port_close(task, old_shared_port); nxt_port_use(task, old_shared_port, -1); + if (proto_port != NULL) { + (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + + nxt_port_close(task, proto_port); + + nxt_port_use(task, proto_port, -1); + } + reply = NXT_PORT_MSG_RPC_READY_LAST; } else { @@ -2735,54 +2775,67 @@ nxt_router_app_rpc_create(nxt_task_t *task, uint32_t stream; nxt_int_t ret; nxt_buf_t *b; - nxt_port_t *main_port, *router_port; + nxt_port_t *router_port, *dport; nxt_runtime_t *rt; nxt_app_rpc_t *rpc; - rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); - if (rpc == NULL) { - goto fail; - } + rt = task->thread->runtime; - rpc->app = app; - rpc->temp_conf = tmcf; + dport = app->proto_port; - nxt_debug(task, "app '%V' prefork", &app->name); + if (dport == NULL) { + nxt_debug(task, "app '%V' prototype prefork", &app->name); - size = app->name.length + 1 + app->conf.length; + size = app->name.length + 1 + app->conf.length; - b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); - if (nxt_slow_path(b == NULL)) { - goto fail; - } + b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + goto fail; + } - b->completion_handler = nxt_buf_dummy_completion; + b->completion_handler = nxt_buf_dummy_completion; - nxt_buf_cpystr(b, &app->name); - *b->mem.free++ = '\0'; - nxt_buf_cpystr(b, &app->conf); + nxt_buf_cpystr(b, &app->name); + *b->mem.free++ = '\0'; + nxt_buf_cpystr(b, &app->conf); + + dport = rt->port_by_type[NXT_PROCESS_MAIN]; + + } else { + nxt_debug(task, "app '%V' prefork", &app->name); + + b = NULL; + } - rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; - stream = nxt_port_rpc_register_handler(task, router_port, + rpc = nxt_port_rpc_register_handler_ex(task, router_port, nxt_router_app_prefork_ready, nxt_router_app_prefork_error, - -1, rpc); - if (nxt_slow_path(stream == 0)) { + sizeof(nxt_app_rpc_t)); + if (nxt_slow_path(rpc == NULL)) { goto fail; } - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, - -1, stream, router_port->id, b); + rpc->app = app; + rpc->temp_conf = tmcf; + rpc->proto = (b != NULL); + + stream = nxt_port_rpc_ex_stream(rpc); + ret = nxt_port_socket_write(task, dport, + NXT_PORT_MSG_START_PROCESS, + -1, stream, router_port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, router_port, stream); goto fail; } - app->pending_processes++; + if (b == NULL) { + nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid); + + app->pending_processes++; + } return; @@ -2807,9 +2860,24 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, port = msg->u.new_port; nxt_assert(port != NULL); - nxt_assert(port->type == NXT_PROCESS_APP); nxt_assert(port->id == 0); + if (rpc->proto) { + nxt_assert(app->proto_port == NULL); + nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); + + nxt_port_inc_use(port); + + app->proto_port = port; + port->app = app; + + nxt_router_app_rpc_create(task, rpc->temp_conf, app); + + return; + } + + nxt_assert(port->type == NXT_PROCESS_APP); + port->app = app; port->main_app_port = port; @@ -2851,10 +2919,16 @@ nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, app = rpc->app; tmcf = rpc->temp_conf; - nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", - &app->name); + if (rpc->proto) { + nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"", + &app->name); - app->pending_processes--; + } else { + nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", + &app->name); + + app->pending_processes--; + } nxt_router_conf_error(task, tmcf); } @@ -4413,8 +4487,9 @@ static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { + uint32_t n; nxt_app_t *app; - nxt_bool_t start_process; + nxt_bool_t start_process, restarted; nxt_port_t *port; nxt_app_joint_t *app_joint; nxt_app_joint_rpc_t *app_joint_rpc; @@ -4427,7 +4502,6 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_assert(app_joint != NULL); nxt_assert(port != NULL); - nxt_assert(port->type == NXT_PROCESS_APP); nxt_assert(port->id == 0); app = app_joint->app; @@ -4444,11 +4518,51 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_thread_mutex_lock(&app->mutex); + restarted = (app->generation != app_joint_rpc->generation); + + if (app_joint_rpc->proto) { + nxt_assert(app->proto_port == NULL); + nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); + + n = app->proto_port_requests; + app->proto_port_requests = 0; + + if (nxt_slow_path(restarted)) { + nxt_thread_mutex_unlock(&app->mutex); + + nxt_debug(task, "proto port ready for restarted app, send QUIT"); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, + NULL); + + } else { + port->app = app; + app->proto_port = port; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_port_use(task, port, 1); + } + + port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER]; + + while (n > 0) { + nxt_router_app_use(task, app, 1); + + nxt_router_start_app_process_handler(task, port, app); + + n--; + } + + return; + } + + nxt_assert(port->type == NXT_PROCESS_APP); nxt_assert(app->pending_processes != 0); app->pending_processes--; - if (nxt_slow_path(app->generation != app_joint_rpc->generation)) { + if (nxt_slow_path(restarted)) { nxt_debug(task, "new port ready for restarted app, send QUIT"); start_process = !task->thread->engine->shutdown @@ -4591,7 +4705,6 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, } - nxt_inline nxt_port_t * nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app) { @@ -4791,6 +4904,20 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) nxt_thread_mutex_lock(&app->mutex); + if (port == app->proto_port) { + app->proto_port = NULL; + port->app = NULL; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name, + port->pid); + + nxt_port_use(task, port, -1); + + return; + } + nxt_port_hash_remove(&app->port_hash, port); app->port_hash_count--; @@ -4979,7 +5106,7 @@ static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data) { nxt_app_t *app; - nxt_port_t *port; + nxt_port_t *port, *proto_port; nxt_app_joint_t *app_joint; app_joint = obj; @@ -4991,10 +5118,6 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) break; } - nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); - - nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); - nxt_port_use(task, port, -1); } @@ -5015,8 +5138,28 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) nxt_port_use(task, port, -1); } + proto_port = app->proto_port; + + if (proto_port != NULL) { + nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, + proto_port->pid); + + app->proto_port = NULL; + proto_port->app = NULL; + } + nxt_thread_mutex_unlock(&app->mutex); + if (proto_port != NULL) { + nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + + nxt_port_close(task, proto_port); + + nxt_port_use(task, proto_port, -1); + } + + nxt_assert(app->proto_port == NULL); nxt_assert(app->processes == 0); nxt_assert(app->active_requests == 0); nxt_assert(app->port_hash_count == 0); diff --git a/src/nxt_router.h b/src/nxt_router.h index 011663c3..7e337d27 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -126,6 +126,7 @@ struct nxt_app_s { uint32_t max_pending_processes; uint32_t generation; + uint32_t proto_port_requests; nxt_msec_t timeout; nxt_msec_t idle_timeout; @@ -144,6 +145,7 @@ struct nxt_app_s { nxt_app_joint_t *joint; nxt_port_t *shared_port; + nxt_port_t *proto_port; nxt_port_mmaps_t outgoing; }; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index d6cc728f..46955f1c 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -40,8 +40,6 @@ static void nxt_runtime_thread_pool_init(void); static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data); static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); -static void nxt_runtime_process_remove(nxt_runtime_t *rt, - nxt_process_t *process); static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); @@ -504,7 +502,9 @@ nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt) init = nxt_process_init(process); - if (init->type == NXT_PROCESS_APP) { + if (init->type == NXT_PROCESS_APP + || init->type == NXT_PROCESS_PROTOTYPE) + { nxt_process_port_each(process, port) { @@ -528,6 +528,8 @@ nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) nxt_process_port_each(process, port) { + nxt_debug(task, "%d sending quit to %PI", rt->type, port->pid); + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); @@ -1389,10 +1391,21 @@ nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) void nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) { + nxt_process_t *child; + if (process->registered == 1) { nxt_runtime_process_remove(rt, process); } + if (process->link.next != NULL) { + nxt_queue_remove(&process->link); + } + + nxt_queue_each(child, &process->children, nxt_process_t, link) { + nxt_queue_remove(&child->link); + child->link.next = NULL; + } nxt_queue_loop; + nxt_assert(process->use_count == 0); nxt_assert(process->registered == 0); diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index f40b9389..d7fe2f38 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -54,6 +54,7 @@ struct nxt_runtime_s { uint8_t daemon; uint8_t batch; uint8_t status; + uint8_t is_pid_isolated; const char *engine; uint32_t engine_connections; @@ -95,6 +96,7 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process); +void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process); nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid); diff --git a/src/nxt_unit.c b/src/nxt_unit.c index a29c3fb9..06ad1636 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -418,6 +418,9 @@ typedef struct { } nxt_unit_port_hash_id_t; +static pid_t nxt_unit_pid; + + nxt_unit_ctx_t * nxt_unit_init(nxt_unit_init_t *init) { @@ -428,6 +431,8 @@ nxt_unit_init(nxt_unit_init_t *init) nxt_unit_impl_t *lib; nxt_unit_port_t ready_port, router_port, read_port; + nxt_unit_pid = getpid(); + lib = nxt_unit_create(init); if (nxt_slow_path(lib == NULL)) { return NULL; @@ -471,6 +476,7 @@ nxt_unit_init(nxt_unit_init_t *init) } lib->pid = read_port.id.pid; + nxt_unit_pid = lib->pid; ctx = &lib->main_ctx.ctx; @@ -6571,7 +6577,7 @@ nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...) log_fd = lib->log_fd; } else { - pid = getpid(); + pid = nxt_unit_pid; log_fd = STDERR_FILENO; } @@ -6615,7 +6621,7 @@ nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...) log_fd = lib->log_fd; } else { - pid = getpid(); + pid = nxt_unit_pid; log_fd = STDERR_FILENO; } |