diff options
Diffstat (limited to 'src/nxt_main_process.c')
-rw-r--r-- | src/nxt_main_process.c | 183 |
1 files changed, 165 insertions, 18 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index eff96e14..a5a20d3d 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -10,6 +10,7 @@ #include <nxt_main_process.h> #include <nxt_conf.h> #include <nxt_router.h> +#include <nxt_port_queue.h> #if (NXT_TLS) #include <nxt_cert.h> #endif @@ -52,6 +53,8 @@ static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, static void nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); +static void nxt_main_process_whoami_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name, @@ -326,7 +329,7 @@ static nxt_conf_app_map_t nxt_app_maps[] = { static void -nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_debug(task, "main data: %*s", nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); @@ -334,7 +337,33 @@ nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void -nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + void *mem; + nxt_port_t *port; + + nxt_port_new_port_handler(task, msg); + + port = msg->u.new_port; + + if (port != NULL + && port->type == NXT_PROCESS_APP + && msg->fd[1] != -1) + { + mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0); + if (nxt_fast_path(mem != MAP_FAILED)) { + port->queue = mem; + } + + nxt_fd_close(msg->fd[1]); + msg->fd[1] = -1; + } +} + + +static void +nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { u_char *start, *p, ch; size_t type_len; @@ -374,16 +403,18 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } + process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN]; + init = nxt_process_init(process); - *init = nxt_app_process; + *init = nxt_proto_process; b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size); if (b == NULL) { goto failed; } - nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos, + nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos, b->mem.pos); app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t)); @@ -399,7 +430,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) init->name = (const char *) start; process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length - + sizeof("\"\" application") + 1); + + sizeof("\"\" prototype") + 1); if (nxt_slow_path(process->name == NULL)) { goto failed; @@ -408,7 +439,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) p = (u_char *) process->name; *p++ = '"'; p = nxt_cpymem(p, init->name, app_conf->name.length); - p = nxt_cpymem(p, "\" application", 13); + p = nxt_cpymem(p, "\" prototype", 11); *p = '\0'; app_conf->shm_limit = 100 * 1024 * 1024; @@ -504,21 +535,17 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - process = nxt_runtime_process_find(rt, msg->port_msg.pid); - if (nxt_slow_path(process == NULL)) { - return; - } - - nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); - port = nxt_runtime_port_find(rt, msg->port_msg.pid, msg->port_msg.reply_port); - - if (nxt_slow_path(port == NULL)) { return; } + process = port->process; + + nxt_assert(process != NULL); + nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); + #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { if (nxt_slow_path(nxt_clone_credential_map(task, process->pid, @@ -542,10 +569,13 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static nxt_port_handlers_t nxt_main_process_port_handlers = { - .data = nxt_port_main_data_handler, + .data = nxt_main_data_handler, + .new_port = nxt_main_new_port_handler, .process_created = nxt_main_process_created_handler, .process_ready = nxt_port_process_ready_handler, - .start_process = nxt_port_main_start_process_handler, + .whoami = nxt_main_process_whoami_handler, + .remove_pid = nxt_port_remove_pid_handler, + .start_process = nxt_main_start_process_handler, .socket = nxt_main_port_socket_handler, .modules = nxt_main_port_modules_handler, .conf_store = nxt_main_port_conf_store_handler, @@ -559,6 +589,88 @@ static nxt_port_handlers_t nxt_main_process_port_handlers = { }; +static void +nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_buf_t *buf; + nxt_pid_t pid, ppid; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_process_t *pprocess; + + nxt_assert(msg->port_msg.reply_port == 0); + + if (nxt_slow_path(msg->buf == NULL + || nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t))) + { + nxt_alert(task, "whoami: buffer is NULL or unexpected size"); + goto fail; + } + + nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t)); + + rt = task->thread->runtime; + + pprocess = nxt_runtime_process_find(rt, ppid); + if (nxt_slow_path(pprocess == NULL)) { + nxt_alert(task, "whoami: parent process %PI not found", ppid); + goto fail; + } + + pid = nxt_recv_msg_cmsg_pid(msg); + + nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid, + msg->fd[0]); + + if (msg->fd[0] != -1) { + port = nxt_runtime_process_port_create(task, rt, pid, 0, + NXT_PROCESS_APP); + if (nxt_slow_path(port == NULL)) { + goto fail; + } + + nxt_fd_nonblocking(task, msg->fd[0]); + + port->pair[0] = -1; + port->pair[1] = msg->fd[0]; + msg->fd[0] = -1; + + port->max_size = 16 * 1024; + port->max_share = 64 * 1024; + port->socket.task = task; + + nxt_port_write_enable(task, port); + + } else { + port = nxt_runtime_port_find(rt, pid, 0); + if (nxt_slow_path(port == NULL)) { + goto fail; + } + } + + if (ppid != nxt_pid) { + nxt_queue_insert_tail(&pprocess->children, &port->process->link); + } + + buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool, + sizeof(nxt_pid_t), 0); + if (nxt_slow_path(buf == NULL)) { + goto fail; + } + + buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t)); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1, + msg->port_msg.stream, 0, buf); + +fail: + + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); + } +} + + static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) { @@ -751,8 +863,10 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) nxt_int_t ret; nxt_err_t err; nxt_pid_t pid; + nxt_port_t *port; + nxt_queue_t children; nxt_runtime_t *rt; - nxt_process_t *process; + nxt_process_t *process, *child; nxt_process_init_t init; nxt_debug(task, "sigchld handler signo:%d (%s)", @@ -809,7 +923,31 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) process->stream = 0; } + nxt_queue_init(&children); + + if (!nxt_queue_is_empty(&process->children)) { + nxt_queue_add(&children, &process->children); + + nxt_queue_init(&process->children); + + nxt_queue_each(child, &children, nxt_process_t, link) { + port = nxt_process_port_first(child); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + } nxt_queue_loop; + } + if (nxt_exiting) { + nxt_process_close_ports(task, process); + + nxt_queue_each(child, &children, nxt_process_t, link) { + nxt_queue_remove(&child->link); + child->link.next = NULL; + + nxt_process_close_ports(task, child); + } nxt_queue_loop; + if (rt->nprocesses <= 1) { nxt_runtime_quit(task, 0); } @@ -819,6 +957,15 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) nxt_port_remove_notify_others(task, process); + nxt_queue_each(child, &children, nxt_process_t, link) { + nxt_port_remove_notify_others(task, child); + + nxt_queue_remove(&child->link); + child->link.next = NULL; + + nxt_process_close_ports(task, child); + } nxt_queue_loop; + init = *(nxt_process_init_t *) nxt_process_init(process); nxt_process_close_ports(task, process); |