diff options
author | Tiago Natel de Moura <t.nateldemoura@f5.com> | 2021-11-09 15:48:44 +0300 |
---|---|---|
committer | Tiago Natel de Moura <t.nateldemoura@f5.com> | 2021-11-09 15:48:44 +0300 |
commit | 1de660b6df93c09719361e364211c7c6388c01ce (patch) | |
tree | 8333001eb6c335f9c08836c7e2293eea5912b95c /src | |
parent | ff6a7053f500414dc74568a4e49adbac7f0cf634 (diff) | |
download | unit-1de660b6df93c09719361e364211c7c6388c01ce.tar.gz unit-1de660b6df93c09719361e364211c7c6388c01ce.tar.bz2 |
Changed nxt_process_* for reuse.
This enables the reuse of process creation functions.
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_main_process.c | 271 | ||||
-rw-r--r-- | src/nxt_port.c | 47 | ||||
-rw-r--r-- | src/nxt_port.h | 1 | ||||
-rw-r--r-- | src/nxt_process.c | 160 | ||||
-rw-r--r-- | src/nxt_process.h | 6 | ||||
-rw-r--r-- | src/nxt_runtime.c | 29 | ||||
-rw-r--r-- | src/nxt_runtime.h | 2 |
7 files changed, 255 insertions, 261 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 3ca46202..eff96e14 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -34,11 +34,6 @@ typedef struct { static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_title(nxt_task_t *task); -static nxt_int_t nxt_main_process_create(nxt_task_t *task, - const nxt_process_init_t init); -static nxt_int_t nxt_main_start_process(nxt_task_t *task, - nxt_process_t *process); -static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data); static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, @@ -49,7 +44,7 @@ static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data); static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid); +static void nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process); static void nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, @@ -97,7 +92,7 @@ nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, * nxt_main_port_modules_handler() which starts the controller * and router processes. */ - return nxt_main_process_create(task, nxt_discovery_process); + return nxt_process_init_start(task, nxt_discovery_process); } @@ -368,11 +363,17 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - process = nxt_main_process_new(task, rt); + process = nxt_process_new(rt); if (nxt_slow_path(process == NULL)) { return; } + process->mem_pool = nxt_mp_create(1024, 128, 256, 32); + if (process->mem_pool == NULL) { + nxt_process_use(task, process, -1); + return; + } + init = nxt_process_init(process); *init = nxt_app_process; @@ -475,7 +476,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) process->stream = msg->port_msg.stream; process->data.app = app_conf; - ret = nxt_main_start_process(task, process); + ret = nxt_process_start(task, process); if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) { return; } @@ -617,139 +618,6 @@ nxt_main_process_title(nxt_task_t *task) } -static nxt_int_t -nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init) -{ - nxt_int_t ret; - nxt_runtime_t *rt; - nxt_process_t *process; - nxt_process_init_t *pinit; - - rt = task->thread->runtime; - - process = nxt_main_process_new(task, rt); - if (nxt_slow_path(process == NULL)) { - return NXT_ERROR; - } - - process->name = init.name; - process->user_cred = &rt->user_cred; - - pinit = nxt_process_init(process); - *pinit = init; - - ret = nxt_main_start_process(task, process); - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_process_use(task, process, -1); - } - - return ret; -} - - -static nxt_process_t * -nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt) -{ - nxt_process_t *process; - - process = nxt_runtime_process_new(rt); - if (nxt_slow_path(process == NULL)) { - return NULL; - } - - process->mem_pool = nxt_mp_create(1024, 128, 256, 32); - if (process->mem_pool == NULL) { - nxt_process_use(task, process, -1); - return NULL; - } - - return process; -} - - -static nxt_int_t -nxt_main_start_process(nxt_task_t *task, nxt_process_t *process) -{ - nxt_mp_t *tmp_mp; - nxt_int_t ret; - nxt_pid_t pid; - nxt_port_t *port; - nxt_process_init_t *init; - - init = nxt_process_init(process); - - port = nxt_port_new(task, 0, 0, init->type); - if (nxt_slow_path(port == NULL)) { - return NXT_ERROR; - } - - nxt_process_port_add(task, process, port); - - ret = nxt_port_socket_init(task, port, 0); - if (nxt_slow_path(ret != NXT_OK)) { - goto free_port; - } - - tmp_mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(tmp_mp == NULL)) { - ret = NXT_ERROR; - - goto close_port; - } - - if (init->prefork) { - ret = init->prefork(task, process, tmp_mp); - if (nxt_slow_path(ret != NXT_OK)) { - goto free_mempool; - } - } - - pid = nxt_process_create(task, process); - - switch (pid) { - - case -1: - ret = NXT_ERROR; - break; - - case 0: - /* The child process: return to the event engine work queue loop. */ - - nxt_process_use(task, process, -1); - - ret = NXT_AGAIN; - break; - - default: - /* The main process created a new process. */ - - nxt_process_use(task, process, -1); - - nxt_port_read_close(port); - nxt_port_write_enable(task, port); - - ret = NXT_OK; - break; - } - -free_mempool: - - nxt_mp_destroy(tmp_mp); - -close_port: - - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_port_close(task, port); - } - -free_port: - - nxt_port_use(task, port, -1); - - return ret; -} - - static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) { @@ -879,13 +747,19 @@ fail: static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) { - int status; - nxt_err_t err; - nxt_pid_t pid; + int status; + nxt_int_t ret; + nxt_err_t err; + nxt_pid_t pid; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t init; nxt_debug(task, "sigchld handler signo:%d (%s)", (int) (uintptr_t) obj, data); + rt = task->thread->runtime; + for ( ;; ) { pid = waitpid(-1, &status, WNOHANG); @@ -926,7 +800,36 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) pid, WEXITSTATUS(status)); } - nxt_main_cleanup_process(task, pid); + process = nxt_runtime_process_find(rt, pid); + + if (process != NULL) { + nxt_main_process_cleanup(task, process); + + if (process->state == NXT_PROCESS_STATE_READY) { + process->stream = 0; + } + + if (nxt_exiting) { + if (rt->nprocesses <= 1) { + nxt_runtime_quit(task, 0); + } + + return; + } + + nxt_port_remove_notify_others(task, process); + + init = *(nxt_process_init_t *) nxt_process_init(process); + + nxt_process_close_ports(task, process); + + if (init.restart) { + ret = nxt_process_init_start(task, init); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_alert(task, "failed to restart %s", init.name); + } + } + } } } @@ -940,81 +843,11 @@ nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) static void -nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid) +nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process) { - int stream; - nxt_int_t ret; - nxt_buf_t *buf; - nxt_port_t *port; - const char *name; - nxt_runtime_t *rt; - nxt_process_t *process; - nxt_process_init_t init; - - rt = task->thread->runtime; - - process = nxt_runtime_process_find(rt, pid); - if (!process) { - return; - } - if (process->isolation.cleanup != NULL) { process->isolation.cleanup(task, process); } - - name = process->name; - stream = process->stream; - init = *((nxt_process_init_t *) nxt_process_init(process)); - - if (process->state == NXT_PROCESS_STATE_READY) { - process->stream = 0; - } - - nxt_process_close_ports(task, process); - - if (nxt_exiting) { - if (rt->nprocesses <= 1) { - nxt_runtime_quit(task, 0); - } - - return; - } - - nxt_runtime_process_each(rt, process) { - - if (process->pid == nxt_pid - || process->pid == pid - || nxt_queue_is_empty(&process->ports)) - { - continue; - } - - port = nxt_process_port_first(process); - - if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) { - continue; - } - - buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(pid)); - - if (nxt_slow_path(buf == NULL)) { - continue; - } - - buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); - - nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, - stream, 0, buf); - - } nxt_runtime_process_loop; - - if (init.restart) { - ret = nxt_main_process_create(task, init); - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_alert(task, "failed to restart %s", name); - } - } } @@ -1407,9 +1240,9 @@ fail: nxt_mp_destroy(mp); - ret = nxt_main_process_create(task, nxt_controller_process); + ret = nxt_process_init_start(task, nxt_controller_process); if (ret == NXT_OK) { - ret = nxt_main_process_create(task, nxt_router_process); + ret = nxt_process_init_start(task, nxt_router_process); } if (nxt_slow_path(ret == NXT_ERROR)) { diff --git a/src/nxt_port.c b/src/nxt_port.c index d4e46564..d6b33f71 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -448,6 +448,53 @@ nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void +nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process) +{ + nxt_pid_t pid; + nxt_buf_t *buf; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_process_t *p; + nxt_process_type_t ptype; + + pid = process->pid; + + ptype = nxt_process_type(process); + + rt = task->thread->runtime; + + nxt_runtime_process_each(rt, p) { + + if (p->pid == nxt_pid + || p->pid == pid + || nxt_queue_is_empty(&p->ports)) + { + continue; + } + + port = nxt_process_port_first(p); + + if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) { + continue; + } + + buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(pid)); + + if (nxt_slow_path(buf == NULL)) { + continue; + } + + buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, + process->stream, 0, buf); + + } nxt_runtime_process_loop; +} + + +void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_buf_t *buf; diff --git a/src/nxt_port.h b/src/nxt_port.h index d7fc8d73..1f24c5da 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -339,6 +339,7 @@ nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, uint32_t stream); void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, nxt_fd_t fd); +void nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process); void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); diff --git a/src/nxt_process.c b/src/nxt_process.c index 87419313..59dd9180 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -17,6 +17,7 @@ #include <sys/prctl.h> #endif +static nxt_pid_t nxt_process_create(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_process_setup(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process); @@ -59,6 +60,152 @@ nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { }; +nxt_process_t * +nxt_process_new(nxt_runtime_t *rt) +{ + nxt_process_t *process; + + process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t) + + sizeof(nxt_process_init_t)); + + if (nxt_slow_path(process == NULL)) { + return NULL; + } + + nxt_queue_init(&process->ports); + + nxt_thread_mutex_create(&process->incoming.mutex); + + process->use_count = 1; + + return process; +} + + +void +nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) +{ + process->use_count += i; + + if (process->use_count == 0) { + nxt_runtime_process_release(task->thread->runtime, process); + } +} + + +nxt_int_t +nxt_process_init_start(nxt_task_t *task, nxt_process_init_t init) +{ + nxt_int_t ret; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t *pinit; + + rt = task->thread->runtime; + + process = nxt_process_new(rt); + if (nxt_slow_path(process == NULL)) { + return NXT_ERROR; + } + + process->name = init.name; + process->user_cred = &rt->user_cred; + + pinit = nxt_process_init(process); + *pinit = init; + + ret = nxt_process_start(task, process); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_process_use(task, process, -1); + } + + return ret; +} + + +nxt_int_t +nxt_process_start(nxt_task_t *task, nxt_process_t *process) +{ + nxt_mp_t *tmp_mp; + nxt_int_t ret; + nxt_pid_t pid; + nxt_port_t *port; + nxt_process_init_t *init; + + init = nxt_process_init(process); + + port = nxt_port_new(task, 0, 0, init->type); + if (nxt_slow_path(port == NULL)) { + return NXT_ERROR; + } + + nxt_process_port_add(task, process, port); + + ret = nxt_port_socket_init(task, port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + goto free_port; + } + + tmp_mp = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(tmp_mp == NULL)) { + ret = NXT_ERROR; + + goto close_port; + } + + if (init->prefork) { + ret = init->prefork(task, process, tmp_mp); + if (nxt_slow_path(ret != NXT_OK)) { + goto free_mempool; + } + } + + pid = nxt_process_create(task, process); + + switch (pid) { + + case -1: + ret = NXT_ERROR; + break; + + case 0: + /* The child process: return to the event engine work queue loop. */ + + nxt_process_use(task, process, -1); + + ret = NXT_AGAIN; + break; + + default: + /* The main process created a new process. */ + + nxt_process_use(task, process, -1); + + nxt_port_read_close(port); + nxt_port_write_enable(task, port); + + ret = NXT_OK; + break; + } + +free_mempool: + + nxt_mp_destroy(tmp_mp); + +close_port: + + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_port_close(task, port); + } + +free_port: + + nxt_port_use(task, port, -1); + + return ret; +} + + static nxt_int_t nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process) { @@ -139,7 +286,7 @@ nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process) } -nxt_pid_t +static nxt_pid_t nxt_process_create(nxt_task_t *task, nxt_process_t *process) { nxt_int_t ret; @@ -680,17 +827,6 @@ nxt_nanosleep(nxt_nsec_t ns) void -nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) -{ - process->use_count += i; - - if (process->use_count == 0) { - nxt_runtime_process_release(task->thread->runtime, process); - } -} - - -void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) { nxt_assert(port->process == NULL); diff --git a/src/nxt_process.h b/src/nxt_process.h index 4f24b179..e4d3c93a 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -148,8 +148,6 @@ extern nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX]; extern nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX]; -NXT_EXPORT nxt_pid_t nxt_process_create(nxt_task_t *task, - nxt_process_t *process); NXT_EXPORT nxt_pid_t nxt_process_execute(nxt_task_t *task, char *name, char **argv, char **envp); NXT_EXPORT nxt_int_t nxt_process_daemon(nxt_task_t *task); @@ -176,6 +174,10 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, #define nxt_process_port_loop \ nxt_queue_loop +nxt_process_t *nxt_process_new(nxt_runtime_t *rt); +void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); +nxt_int_t nxt_process_init_start(nxt_task_t *task, nxt_process_init_t init); +nxt_int_t nxt_process_start(nxt_task_t *task, nxt_process_t *process); nxt_process_type_t nxt_process_type(nxt_process_t *process); void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index a30d332b..d6cc728f 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -580,6 +580,7 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) nxt_runtime_process_each(rt, process) { + nxt_runtime_process_remove(rt, process); nxt_process_close_ports(task, process); } nxt_runtime_process_loop; @@ -1385,30 +1386,6 @@ nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) } -nxt_process_t * -nxt_runtime_process_new(nxt_runtime_t *rt) -{ - nxt_process_t *process; - - /* TODO: memory failures. */ - - process = nxt_mp_zalloc(rt->mem_pool, - sizeof(nxt_process_t) + sizeof(nxt_process_init_t)); - - if (nxt_slow_path(process == NULL)) { - return NULL; - } - - nxt_queue_init(&process->ports); - - nxt_thread_mutex_create(&process->incoming.mutex); - - process->use_count = 1; - - return process; -} - - void nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) { @@ -1512,7 +1489,7 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) return process; } - process = nxt_runtime_process_new(rt); + process = nxt_process_new(rt); if (nxt_slow_path(process == NULL)) { nxt_thread_mutex_unlock(&rt->processes_mutex); @@ -1601,7 +1578,7 @@ nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) } -static void +void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) { nxt_pid_t pid; diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 2037fc5d..f40b9389 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -94,8 +94,6 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, nxt_uint_t max_threads, nxt_nsec_t timeout); -nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt); - void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process); nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid); |