diff options
Diffstat (limited to 'src/nxt_main_process.c')
-rw-r--r-- | src/nxt_main_process.c | 547 |
1 files changed, 292 insertions, 255 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 16c6a297..a5a20d3d 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -10,6 +10,7 @@ #include <nxt_main_process.h> #include <nxt_conf.h> #include <nxt_router.h> +#include <nxt_port_queue.h> #if (NXT_TLS) #include <nxt_cert.h> #endif @@ -31,18 +32,9 @@ typedef struct { } nxt_conf_app_map_t; -extern nxt_port_handlers_t nxt_controller_process_port_handlers; -extern nxt_port_handlers_t nxt_router_process_port_handlers; - - static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_title(nxt_task_t *task); -static nxt_int_t nxt_main_process_create(nxt_task_t *task, - const nxt_process_init_t init); -static nxt_int_t nxt_main_start_process(nxt_task_t *task, - nxt_process_t *process); -static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data); static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, @@ -53,7 +45,7 @@ static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data); static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid); +static void nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process); static void nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, @@ -61,8 +53,12 @@ static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, static void nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); +static void nxt_main_process_whoami_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name, + const char *name, u_char *buf, size_t size); static void nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); @@ -77,6 +73,8 @@ const nxt_sig_event_t nxt_main_process_signals[] = { }; +nxt_uint_t nxt_conf_ver; + static nxt_bool_t nxt_exiting; @@ -97,7 +95,7 @@ nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, * nxt_main_port_modules_handler() which starts the controller * and router processes. */ - return nxt_main_process_create(task, nxt_discovery_process); + return nxt_process_init_start(task, nxt_discovery_process); } @@ -154,6 +152,12 @@ static nxt_conf_map_t nxt_common_app_limits_conf[] = { offsetof(nxt_common_app_conf_t, shm_limit), }, + { + nxt_string("requests"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, request_limit), + }, + }; @@ -325,7 +329,7 @@ static nxt_conf_app_map_t nxt_app_maps[] = { static void -nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_debug(task, "main data: %*s", nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); @@ -333,7 +337,33 @@ nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void -nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + void *mem; + nxt_port_t *port; + + nxt_port_new_port_handler(task, msg); + + port = msg->u.new_port; + + if (port != NULL + && port->type == NXT_PROCESS_APP + && msg->fd[1] != -1) + { + mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0); + if (nxt_fast_path(mem != MAP_FAILED)) { + port->queue = mem; + } + + nxt_fd_close(msg->fd[1]); + msg->fd[1] = -1; + } +} + + +static void +nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { u_char *start, *p, ch; size_t type_len; @@ -349,21 +379,42 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - process = nxt_main_process_new(task, rt); + port = rt->port_by_type[NXT_PROCESS_ROUTER]; + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "router port not found"); + return; + } + + if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) { + nxt_alert(task, "process %PI cannot start processes", + nxt_recv_msg_cmsg_pid(msg)); + + return; + } + + process = nxt_process_new(rt); if (nxt_slow_path(process == NULL)) { return; } + process->mem_pool = nxt_mp_create(1024, 128, 256, 32); + if (process->mem_pool == NULL) { + nxt_process_use(task, process, -1); + return; + } + + process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN]; + init = nxt_process_init(process); - *init = nxt_app_process; + *init = nxt_proto_process; b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size); if (b == NULL) { goto failed; } - nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos, + nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos, b->mem.pos); app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t)); @@ -379,7 +430,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) init->name = (const char *) start; process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length - + sizeof("\"\" application") + 1); + + sizeof("\"\" prototype") + 1); if (nxt_slow_path(process->name == NULL)) { goto failed; @@ -388,10 +439,11 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) p = (u_char *) process->name; *p++ = '"'; p = nxt_cpymem(p, init->name, app_conf->name.length); - p = nxt_cpymem(p, "\" application", 13); + p = nxt_cpymem(p, "\" prototype", 11); *p = '\0'; app_conf->shm_limit = 100 * 1024 * 1024; + app_conf->request_limit = 0; start += app_conf->name.length + 1; @@ -455,7 +507,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) process->stream = msg->port_msg.stream; process->data.app = app_conf; - ret = nxt_main_start_process(task, process); + ret = nxt_process_start(task, process); if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) { return; } @@ -483,21 +535,17 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - process = nxt_runtime_process_find(rt, msg->port_msg.pid); - if (nxt_slow_path(process == NULL)) { - return; - } - - nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); - port = nxt_runtime_port_find(rt, msg->port_msg.pid, msg->port_msg.reply_port); - - if (nxt_slow_path(port == NULL)) { return; } + process = port->process; + + nxt_assert(process != NULL); + nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); + #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { if (nxt_slow_path(nxt_clone_credential_map(task, process->pid, @@ -521,10 +569,13 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static nxt_port_handlers_t nxt_main_process_port_handlers = { - .data = nxt_port_main_data_handler, + .data = nxt_main_data_handler, + .new_port = nxt_main_new_port_handler, .process_created = nxt_main_process_created_handler, .process_ready = nxt_port_process_ready_handler, - .start_process = nxt_port_main_start_process_handler, + .whoami = nxt_main_process_whoami_handler, + .remove_pid = nxt_port_remove_pid_handler, + .start_process = nxt_main_start_process_handler, .socket = nxt_main_port_socket_handler, .modules = nxt_main_port_modules_handler, .conf_store = nxt_main_port_conf_store_handler, @@ -538,6 +589,88 @@ static nxt_port_handlers_t nxt_main_process_port_handlers = { }; +static void +nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_buf_t *buf; + nxt_pid_t pid, ppid; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_process_t *pprocess; + + nxt_assert(msg->port_msg.reply_port == 0); + + if (nxt_slow_path(msg->buf == NULL + || nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t))) + { + nxt_alert(task, "whoami: buffer is NULL or unexpected size"); + goto fail; + } + + nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t)); + + rt = task->thread->runtime; + + pprocess = nxt_runtime_process_find(rt, ppid); + if (nxt_slow_path(pprocess == NULL)) { + nxt_alert(task, "whoami: parent process %PI not found", ppid); + goto fail; + } + + pid = nxt_recv_msg_cmsg_pid(msg); + + nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid, + msg->fd[0]); + + if (msg->fd[0] != -1) { + port = nxt_runtime_process_port_create(task, rt, pid, 0, + NXT_PROCESS_APP); + if (nxt_slow_path(port == NULL)) { + goto fail; + } + + nxt_fd_nonblocking(task, msg->fd[0]); + + port->pair[0] = -1; + port->pair[1] = msg->fd[0]; + msg->fd[0] = -1; + + port->max_size = 16 * 1024; + port->max_share = 64 * 1024; + port->socket.task = task; + + nxt_port_write_enable(task, port); + + } else { + port = nxt_runtime_port_find(rt, pid, 0); + if (nxt_slow_path(port == NULL)) { + goto fail; + } + } + + if (ppid != nxt_pid) { + nxt_queue_insert_tail(&pprocess->children, &port->process->link); + } + + buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool, + sizeof(nxt_pid_t), 0); + if (nxt_slow_path(buf == NULL)) { + goto fail; + } + + buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t)); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1, + msg->port_msg.stream, 0, buf); + +fail: + + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); + } +} + + static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) { @@ -597,139 +730,6 @@ nxt_main_process_title(nxt_task_t *task) } -static nxt_int_t -nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init) -{ - nxt_int_t ret; - nxt_runtime_t *rt; - nxt_process_t *process; - nxt_process_init_t *pinit; - - rt = task->thread->runtime; - - process = nxt_main_process_new(task, rt); - if (nxt_slow_path(process == NULL)) { - return NXT_ERROR; - } - - process->name = init.name; - process->user_cred = &rt->user_cred; - - pinit = nxt_process_init(process); - *pinit = init; - - ret = nxt_main_start_process(task, process); - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_process_use(task, process, -1); - } - - return ret; -} - - -static nxt_process_t * -nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt) -{ - nxt_process_t *process; - - process = nxt_runtime_process_new(rt); - if (nxt_slow_path(process == NULL)) { - return NULL; - } - - process->mem_pool = nxt_mp_create(1024, 128, 256, 32); - if (process->mem_pool == NULL) { - nxt_process_use(task, process, -1); - return NULL; - } - - return process; -} - - -static nxt_int_t -nxt_main_start_process(nxt_task_t *task, nxt_process_t *process) -{ - nxt_mp_t *tmp_mp; - nxt_int_t ret; - nxt_pid_t pid; - nxt_port_t *port; - nxt_process_init_t *init; - - init = nxt_process_init(process); - - port = nxt_port_new(task, 0, 0, init->type); - if (nxt_slow_path(port == NULL)) { - return NXT_ERROR; - } - - nxt_process_port_add(task, process, port); - - ret = nxt_port_socket_init(task, port, 0); - if (nxt_slow_path(ret != NXT_OK)) { - goto free_port; - } - - tmp_mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(tmp_mp == NULL)) { - ret = NXT_ERROR; - - goto close_port; - } - - if (init->prefork) { - ret = init->prefork(task, process, tmp_mp); - if (nxt_slow_path(ret != NXT_OK)) { - goto free_mempool; - } - } - - pid = nxt_process_create(task, process); - - switch (pid) { - - case -1: - ret = NXT_ERROR; - break; - - case 0: - /* The child process: return to the event engine work queue loop. */ - - nxt_process_use(task, process, -1); - - ret = NXT_AGAIN; - break; - - default: - /* The main process created a new process. */ - - nxt_process_use(task, process, -1); - - nxt_port_read_close(port); - nxt_port_write_enable(task, port); - - ret = NXT_OK; - break; - } - -free_mempool: - - nxt_mp_destroy(tmp_mp); - -close_port: - - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_port_close(task, port); - } - -free_port: - - nxt_port_use(task, port, -1); - - return ret; -} - - static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) { @@ -859,13 +859,21 @@ fail: static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) { - int status; - nxt_err_t err; - nxt_pid_t pid; + int status; + nxt_int_t ret; + nxt_err_t err; + nxt_pid_t pid; + nxt_port_t *port; + nxt_queue_t children; + nxt_runtime_t *rt; + nxt_process_t *process, *child; + nxt_process_init_t init; nxt_debug(task, "sigchld handler signo:%d (%s)", (int) (uintptr_t) obj, data); + rt = task->thread->runtime; + for ( ;; ) { pid = waitpid(-1, &status, WNOHANG); @@ -906,94 +914,86 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) pid, WEXITSTATUS(status)); } - nxt_main_cleanup_process(task, pid); - } -} + process = nxt_runtime_process_find(rt, pid); + if (process != NULL) { + nxt_main_process_cleanup(task, process); -static void -nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_trace(task, "signal signo:%d (%s) recevied, ignored", - (int) (uintptr_t) obj, data); -} + if (process->state == NXT_PROCESS_STATE_READY) { + process->stream = 0; + } + nxt_queue_init(&children); -static void -nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid) -{ - int stream; - nxt_int_t ret; - nxt_buf_t *buf; - nxt_port_t *port; - const char *name; - nxt_runtime_t *rt; - nxt_process_t *process; - nxt_process_init_t init; + if (!nxt_queue_is_empty(&process->children)) { + nxt_queue_add(&children, &process->children); - rt = task->thread->runtime; + nxt_queue_init(&process->children); - process = nxt_runtime_process_find(rt, pid); - if (!process) { - return; - } + nxt_queue_each(child, &children, nxt_process_t, link) { + port = nxt_process_port_first(child); - if (process->isolation.cleanup != NULL) { - process->isolation.cleanup(task, process); - } + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + } nxt_queue_loop; + } - name = process->name; - stream = process->stream; - init = *((nxt_process_init_t *) nxt_process_init(process)); + if (nxt_exiting) { + nxt_process_close_ports(task, process); - if (process->state == NXT_PROCESS_STATE_READY) { - process->stream = 0; - } + nxt_queue_each(child, &children, nxt_process_t, link) { + nxt_queue_remove(&child->link); + child->link.next = NULL; - nxt_process_close_ports(task, process); + nxt_process_close_ports(task, child); + } nxt_queue_loop; - if (nxt_exiting) { - if (rt->nprocesses <= 1) { - nxt_runtime_quit(task, 0); - } + if (rt->nprocesses <= 1) { + nxt_runtime_quit(task, 0); + } - return; - } + return; + } - nxt_runtime_process_each(rt, process) { + nxt_port_remove_notify_others(task, process); - if (process->pid == nxt_pid - || process->pid == pid - || nxt_queue_is_empty(&process->ports)) - { - continue; - } + nxt_queue_each(child, &children, nxt_process_t, link) { + nxt_port_remove_notify_others(task, child); - port = nxt_process_port_first(process); + nxt_queue_remove(&child->link); + child->link.next = NULL; - if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) { - continue; - } + nxt_process_close_ports(task, child); + } nxt_queue_loop; + + init = *(nxt_process_init_t *) nxt_process_init(process); - buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(pid)); + nxt_process_close_ports(task, process); - if (nxt_slow_path(buf == NULL)) { - continue; + if (init.restart) { + ret = nxt_process_init_start(task, init); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_alert(task, "failed to restart %s", init.name); + } + } } + } +} - buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); - nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, - stream, 0, buf); +static void +nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_trace(task, "signal signo:%d (%s) recevied, ignored", + (int) (uintptr_t) obj, data); +} - } nxt_runtime_process_loop; - if (init.restart) { - ret = nxt_main_process_create(task, init); - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_alert(task, "failed to restart %s", name); - } +static void +nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process) +{ + if (process->isolation.cleanup != NULL) { + process->isolation.cleanup(task, process); } } @@ -1016,6 +1016,13 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } + if (nxt_slow_path(port->type != NXT_PROCESS_ROUTER)) { + nxt_alert(task, "process %PI cannot create listener sockets", + msg->port_msg.pid); + + return; + } + b = msg->buf; sa = (nxt_sockaddr_t *) b->mem.pos; @@ -1259,6 +1266,7 @@ nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) { + nxt_alert(task, "process %PI cannot send modules", msg->port_msg.pid); return; } @@ -1379,9 +1387,9 @@ fail: nxt_mp_destroy(mp); - ret = nxt_main_process_create(task, nxt_controller_process); + ret = nxt_process_init_start(task, nxt_controller_process); if (ret == NXT_OK) { - ret = nxt_main_process_create(task, nxt_router_process); + ret = nxt_process_init_start(task, nxt_router_process); } if (nxt_slow_path(ret == NXT_ERROR)) { @@ -1419,11 +1427,20 @@ static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { void *p; - size_t size; - ssize_t n; + size_t n, size; nxt_int_t ret; - nxt_file_t file; + nxt_port_t *ctl_port; nxt_runtime_t *rt; + u_char ver[NXT_INT_T_LEN]; + + rt = task->thread->runtime; + + ctl_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; + + if (nxt_slow_path(msg->port_msg.pid != ctl_port->pid)) { + nxt_alert(task, "process %PI cannot store conf", msg->port_msg.pid); + return; + } p = MAP_FAILED; @@ -1457,29 +1474,18 @@ nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p); - nxt_memzero(&file, sizeof(nxt_file_t)); - - rt = task->thread->runtime; - - file.name = (nxt_file_name_t *) rt->conf_tmp; + if (nxt_conf_ver != NXT_VERNUM) { + n = nxt_sprintf(ver, ver + NXT_INT_T_LEN, "%d", NXT_VERNUM) - ver; - if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY, - NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS) - != NXT_OK)) - { - goto error; - } - - n = nxt_file_write(&file, p, size, 0); - - nxt_file_close(task, &file); + ret = nxt_main_file_store(task, rt->ver_tmp, rt->ver, ver, n); + if (nxt_slow_path(ret != NXT_OK)) { + goto error; + } - if (nxt_slow_path(n != (ssize_t) size)) { - (void) nxt_file_delete(file.name); - goto error; + nxt_conf_ver = NXT_VERNUM; } - ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf); + ret = nxt_main_file_store(task, rt->conf_tmp, rt->conf, p, size); if (nxt_fast_path(ret == NXT_OK)) { goto cleanup; @@ -1502,6 +1508,37 @@ cleanup: } +static nxt_int_t +nxt_main_file_store(nxt_task_t *task, const char *tmp_name, const char *name, + u_char *buf, size_t size) +{ + ssize_t n; + nxt_int_t ret; + nxt_file_t file; + + nxt_memzero(&file, sizeof(nxt_file_t)); + + file.name = (nxt_file_name_t *) name; + + ret = nxt_file_open(task, &file, NXT_FILE_WRONLY, NXT_FILE_TRUNCATE, + NXT_FILE_OWNER_ACCESS); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + n = nxt_file_write(&file, buf, size, 0); + + nxt_file_close(task, &file); + + if (nxt_slow_path(n != (ssize_t) size)) { + (void) nxt_file_delete(file.name); + return NXT_ERROR; + } + + return nxt_file_rename(file.name, (nxt_file_name_t *) name); +} + + static void nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { |