summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorTiago Natel de Moura <t.nateldemoura@f5.com>2021-11-09 15:48:44 +0300
committerTiago Natel de Moura <t.nateldemoura@f5.com>2021-11-09 15:48:44 +0300
commit1de660b6df93c09719361e364211c7c6388c01ce (patch)
tree8333001eb6c335f9c08836c7e2293eea5912b95c
parentff6a7053f500414dc74568a4e49adbac7f0cf634 (diff)
downloadunit-1de660b6df93c09719361e364211c7c6388c01ce.tar.gz
unit-1de660b6df93c09719361e364211c7c6388c01ce.tar.bz2
Changed nxt_process_* for reuse.
This enables the reuse of process creation functions.
Diffstat (limited to '')
-rw-r--r--src/nxt_main_process.c271
-rw-r--r--src/nxt_port.c47
-rw-r--r--src/nxt_port.h1
-rw-r--r--src/nxt_process.c160
-rw-r--r--src/nxt_process.h6
-rw-r--r--src/nxt_runtime.c29
-rw-r--r--src/nxt_runtime.h2
7 files changed, 255 insertions, 261 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c
index 3ca46202..eff96e14 100644
--- a/src/nxt_main_process.c
+++ b/src/nxt_main_process.c
@@ -34,11 +34,6 @@ typedef struct {
static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
nxt_runtime_t *rt);
static void nxt_main_process_title(nxt_task_t *task);
-static nxt_int_t nxt_main_process_create(nxt_task_t *task,
- const nxt_process_init_t init);
-static nxt_int_t nxt_main_start_process(nxt_task_t *task,
- nxt_process_t *process);
-static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt);
static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
@@ -49,7 +44,7 @@ static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
void *data);
-static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid);
+static void nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process);
static void nxt_main_port_socket_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
@@ -97,7 +92,7 @@ nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
* nxt_main_port_modules_handler() which starts the controller
* and router processes.
*/
- return nxt_main_process_create(task, nxt_discovery_process);
+ return nxt_process_init_start(task, nxt_discovery_process);
}
@@ -368,11 +363,17 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- process = nxt_main_process_new(task, rt);
+ process = nxt_process_new(rt);
if (nxt_slow_path(process == NULL)) {
return;
}
+ process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
+ if (process->mem_pool == NULL) {
+ nxt_process_use(task, process, -1);
+ return;
+ }
+
init = nxt_process_init(process);
*init = nxt_app_process;
@@ -475,7 +476,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
process->stream = msg->port_msg.stream;
process->data.app = app_conf;
- ret = nxt_main_start_process(task, process);
+ ret = nxt_process_start(task, process);
if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
return;
}
@@ -617,139 +618,6 @@ nxt_main_process_title(nxt_task_t *task)
}
-static nxt_int_t
-nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init)
-{
- nxt_int_t ret;
- nxt_runtime_t *rt;
- nxt_process_t *process;
- nxt_process_init_t *pinit;
-
- rt = task->thread->runtime;
-
- process = nxt_main_process_new(task, rt);
- if (nxt_slow_path(process == NULL)) {
- return NXT_ERROR;
- }
-
- process->name = init.name;
- process->user_cred = &rt->user_cred;
-
- pinit = nxt_process_init(process);
- *pinit = init;
-
- ret = nxt_main_start_process(task, process);
- if (nxt_slow_path(ret == NXT_ERROR)) {
- nxt_process_use(task, process, -1);
- }
-
- return ret;
-}
-
-
-static nxt_process_t *
-nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt)
-{
- nxt_process_t *process;
-
- process = nxt_runtime_process_new(rt);
- if (nxt_slow_path(process == NULL)) {
- return NULL;
- }
-
- process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
- if (process->mem_pool == NULL) {
- nxt_process_use(task, process, -1);
- return NULL;
- }
-
- return process;
-}
-
-
-static nxt_int_t
-nxt_main_start_process(nxt_task_t *task, nxt_process_t *process)
-{
- nxt_mp_t *tmp_mp;
- nxt_int_t ret;
- nxt_pid_t pid;
- nxt_port_t *port;
- nxt_process_init_t *init;
-
- init = nxt_process_init(process);
-
- port = nxt_port_new(task, 0, 0, init->type);
- if (nxt_slow_path(port == NULL)) {
- return NXT_ERROR;
- }
-
- nxt_process_port_add(task, process, port);
-
- ret = nxt_port_socket_init(task, port, 0);
- if (nxt_slow_path(ret != NXT_OK)) {
- goto free_port;
- }
-
- tmp_mp = nxt_mp_create(1024, 128, 256, 32);
- if (nxt_slow_path(tmp_mp == NULL)) {
- ret = NXT_ERROR;
-
- goto close_port;
- }
-
- if (init->prefork) {
- ret = init->prefork(task, process, tmp_mp);
- if (nxt_slow_path(ret != NXT_OK)) {
- goto free_mempool;
- }
- }
-
- pid = nxt_process_create(task, process);
-
- switch (pid) {
-
- case -1:
- ret = NXT_ERROR;
- break;
-
- case 0:
- /* The child process: return to the event engine work queue loop. */
-
- nxt_process_use(task, process, -1);
-
- ret = NXT_AGAIN;
- break;
-
- default:
- /* The main process created a new process. */
-
- nxt_process_use(task, process, -1);
-
- nxt_port_read_close(port);
- nxt_port_write_enable(task, port);
-
- ret = NXT_OK;
- break;
- }
-
-free_mempool:
-
- nxt_mp_destroy(tmp_mp);
-
-close_port:
-
- if (nxt_slow_path(ret == NXT_ERROR)) {
- nxt_port_close(task, port);
- }
-
-free_port:
-
- nxt_port_use(task, port, -1);
-
- return ret;
-}
-
-
static void
nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
{
@@ -879,13 +747,19 @@ fail:
static void
nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
{
- int status;
- nxt_err_t err;
- nxt_pid_t pid;
+ int status;
+ nxt_int_t ret;
+ nxt_err_t err;
+ nxt_pid_t pid;
+ nxt_runtime_t *rt;
+ nxt_process_t *process;
+ nxt_process_init_t init;
nxt_debug(task, "sigchld handler signo:%d (%s)",
(int) (uintptr_t) obj, data);
+ rt = task->thread->runtime;
+
for ( ;; ) {
pid = waitpid(-1, &status, WNOHANG);
@@ -926,7 +800,36 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
pid, WEXITSTATUS(status));
}
- nxt_main_cleanup_process(task, pid);
+ process = nxt_runtime_process_find(rt, pid);
+
+ if (process != NULL) {
+ nxt_main_process_cleanup(task, process);
+
+ if (process->state == NXT_PROCESS_STATE_READY) {
+ process->stream = 0;
+ }
+
+ if (nxt_exiting) {
+ if (rt->nprocesses <= 1) {
+ nxt_runtime_quit(task, 0);
+ }
+
+ return;
+ }
+
+ nxt_port_remove_notify_others(task, process);
+
+ init = *(nxt_process_init_t *) nxt_process_init(process);
+
+ nxt_process_close_ports(task, process);
+
+ if (init.restart) {
+ ret = nxt_process_init_start(task, init);
+ if (nxt_slow_path(ret == NXT_ERROR)) {
+ nxt_alert(task, "failed to restart %s", init.name);
+ }
+ }
+ }
}
}
@@ -940,81 +843,11 @@ nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
static void
-nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid)
+nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process)
{
- int stream;
- nxt_int_t ret;
- nxt_buf_t *buf;
- nxt_port_t *port;
- const char *name;
- nxt_runtime_t *rt;
- nxt_process_t *process;
- nxt_process_init_t init;
-
- rt = task->thread->runtime;
-
- process = nxt_runtime_process_find(rt, pid);
- if (!process) {
- return;
- }
-
if (process->isolation.cleanup != NULL) {
process->isolation.cleanup(task, process);
}
-
- name = process->name;
- stream = process->stream;
- init = *((nxt_process_init_t *) nxt_process_init(process));
-
- if (process->state == NXT_PROCESS_STATE_READY) {
- process->stream = 0;
- }
-
- nxt_process_close_ports(task, process);
-
- if (nxt_exiting) {
- if (rt->nprocesses <= 1) {
- nxt_runtime_quit(task, 0);
- }
-
- return;
- }
-
- nxt_runtime_process_each(rt, process) {
-
- if (process->pid == nxt_pid
- || process->pid == pid
- || nxt_queue_is_empty(&process->ports))
- {
- continue;
- }
-
- port = nxt_process_port_first(process);
-
- if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) {
- continue;
- }
-
- buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
- sizeof(pid));
-
- if (nxt_slow_path(buf == NULL)) {
- continue;
- }
-
- buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
-
- nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
- stream, 0, buf);
-
- } nxt_runtime_process_loop;
-
- if (init.restart) {
- ret = nxt_main_process_create(task, init);
- if (nxt_slow_path(ret == NXT_ERROR)) {
- nxt_alert(task, "failed to restart %s", name);
- }
- }
}
@@ -1407,9 +1240,9 @@ fail:
nxt_mp_destroy(mp);
- ret = nxt_main_process_create(task, nxt_controller_process);
+ ret = nxt_process_init_start(task, nxt_controller_process);
if (ret == NXT_OK) {
- ret = nxt_main_process_create(task, nxt_router_process);
+ ret = nxt_process_init_start(task, nxt_router_process);
}
if (nxt_slow_path(ret == NXT_ERROR)) {
diff --git a/src/nxt_port.c b/src/nxt_port.c
index d4e46564..d6b33f71 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -448,6 +448,53 @@ nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
+nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process)
+{
+ nxt_pid_t pid;
+ nxt_buf_t *buf;
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+ nxt_process_t *p;
+ nxt_process_type_t ptype;
+
+ pid = process->pid;
+
+ ptype = nxt_process_type(process);
+
+ rt = task->thread->runtime;
+
+ nxt_runtime_process_each(rt, p) {
+
+ if (p->pid == nxt_pid
+ || p->pid == pid
+ || nxt_queue_is_empty(&p->ports))
+ {
+ continue;
+ }
+
+ port = nxt_process_port_first(p);
+
+ if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) {
+ continue;
+ }
+
+ buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
+ sizeof(pid));
+
+ if (nxt_slow_path(buf == NULL)) {
+ continue;
+ }
+
+ buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
+ process->stream, 0, buf);
+
+ } nxt_runtime_process_loop;
+}
+
+
+void
nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_buf_t *buf;
diff --git a/src/nxt_port.h b/src/nxt_port.h
index d7fc8d73..1f24c5da 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -339,6 +339,7 @@ nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
nxt_port_t *new_port, uint32_t stream);
void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
nxt_uint_t slot, nxt_fd_t fd);
+void nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process);
void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 87419313..59dd9180 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -17,6 +17,7 @@
#include <sys/prctl.h>
#endif
+static nxt_pid_t nxt_process_create(nxt_task_t *task, nxt_process_t *process);
static nxt_int_t nxt_process_setup(nxt_task_t *task, nxt_process_t *process);
static nxt_int_t nxt_process_child_fixup(nxt_task_t *task,
nxt_process_t *process);
@@ -59,6 +60,152 @@ nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
};
+nxt_process_t *
+nxt_process_new(nxt_runtime_t *rt)
+{
+ nxt_process_t *process;
+
+ process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t)
+ + sizeof(nxt_process_init_t));
+
+ if (nxt_slow_path(process == NULL)) {
+ return NULL;
+ }
+
+ nxt_queue_init(&process->ports);
+
+ nxt_thread_mutex_create(&process->incoming.mutex);
+
+ process->use_count = 1;
+
+ return process;
+}
+
+
+void
+nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
+{
+ process->use_count += i;
+
+ if (process->use_count == 0) {
+ nxt_runtime_process_release(task->thread->runtime, process);
+ }
+}
+
+
+nxt_int_t
+nxt_process_init_start(nxt_task_t *task, nxt_process_init_t init)
+{
+ nxt_int_t ret;
+ nxt_runtime_t *rt;
+ nxt_process_t *process;
+ nxt_process_init_t *pinit;
+
+ rt = task->thread->runtime;
+
+ process = nxt_process_new(rt);
+ if (nxt_slow_path(process == NULL)) {
+ return NXT_ERROR;
+ }
+
+ process->name = init.name;
+ process->user_cred = &rt->user_cred;
+
+ pinit = nxt_process_init(process);
+ *pinit = init;
+
+ ret = nxt_process_start(task, process);
+ if (nxt_slow_path(ret == NXT_ERROR)) {
+ nxt_process_use(task, process, -1);
+ }
+
+ return ret;
+}
+
+
+nxt_int_t
+nxt_process_start(nxt_task_t *task, nxt_process_t *process)
+{
+ nxt_mp_t *tmp_mp;
+ nxt_int_t ret;
+ nxt_pid_t pid;
+ nxt_port_t *port;
+ nxt_process_init_t *init;
+
+ init = nxt_process_init(process);
+
+ port = nxt_port_new(task, 0, 0, init->type);
+ if (nxt_slow_path(port == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nxt_process_port_add(task, process, port);
+
+ ret = nxt_port_socket_init(task, port, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto free_port;
+ }
+
+ tmp_mp = nxt_mp_create(1024, 128, 256, 32);
+ if (nxt_slow_path(tmp_mp == NULL)) {
+ ret = NXT_ERROR;
+
+ goto close_port;
+ }
+
+ if (init->prefork) {
+ ret = init->prefork(task, process, tmp_mp);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto free_mempool;
+ }
+ }
+
+ pid = nxt_process_create(task, process);
+
+ switch (pid) {
+
+ case -1:
+ ret = NXT_ERROR;
+ break;
+
+ case 0:
+ /* The child process: return to the event engine work queue loop. */
+
+ nxt_process_use(task, process, -1);
+
+ ret = NXT_AGAIN;
+ break;
+
+ default:
+ /* The main process created a new process. */
+
+ nxt_process_use(task, process, -1);
+
+ nxt_port_read_close(port);
+ nxt_port_write_enable(task, port);
+
+ ret = NXT_OK;
+ break;
+ }
+
+free_mempool:
+
+ nxt_mp_destroy(tmp_mp);
+
+close_port:
+
+ if (nxt_slow_path(ret == NXT_ERROR)) {
+ nxt_port_close(task, port);
+ }
+
+free_port:
+
+ nxt_port_use(task, port, -1);
+
+ return ret;
+}
+
+
static nxt_int_t
nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process)
{
@@ -139,7 +286,7 @@ nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process)
}
-nxt_pid_t
+static nxt_pid_t
nxt_process_create(nxt_task_t *task, nxt_process_t *process)
{
nxt_int_t ret;
@@ -680,17 +827,6 @@ nxt_nanosleep(nxt_nsec_t ns)
void
-nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
-{
- process->use_count += i;
-
- if (process->use_count == 0) {
- nxt_runtime_process_release(task->thread->runtime, process);
- }
-}
-
-
-void
nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
{
nxt_assert(port->process == NULL);
diff --git a/src/nxt_process.h b/src/nxt_process.h
index 4f24b179..e4d3c93a 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -148,8 +148,6 @@ extern nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX];
extern nxt_bool_t
nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX];
-NXT_EXPORT nxt_pid_t nxt_process_create(nxt_task_t *task,
- nxt_process_t *process);
NXT_EXPORT nxt_pid_t nxt_process_execute(nxt_task_t *task, char *name,
char **argv, char **envp);
NXT_EXPORT nxt_int_t nxt_process_daemon(nxt_task_t *task);
@@ -176,6 +174,10 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process,
#define nxt_process_port_loop \
nxt_queue_loop
+nxt_process_t *nxt_process_new(nxt_runtime_t *rt);
+void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
+nxt_int_t nxt_process_init_start(nxt_task_t *task, nxt_process_init_t init);
+nxt_int_t nxt_process_start(nxt_task_t *task, nxt_process_t *process);
nxt_process_type_t nxt_process_type(nxt_process_t *process);
void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index a30d332b..d6cc728f 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -580,6 +580,7 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
nxt_runtime_process_each(rt, process) {
+ nxt_runtime_process_remove(rt, process);
nxt_process_close_ports(task, process);
} nxt_runtime_process_loop;
@@ -1385,30 +1386,6 @@ nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
}
-nxt_process_t *
-nxt_runtime_process_new(nxt_runtime_t *rt)
-{
- nxt_process_t *process;
-
- /* TODO: memory failures. */
-
- process = nxt_mp_zalloc(rt->mem_pool,
- sizeof(nxt_process_t) + sizeof(nxt_process_init_t));
-
- if (nxt_slow_path(process == NULL)) {
- return NULL;
- }
-
- nxt_queue_init(&process->ports);
-
- nxt_thread_mutex_create(&process->incoming.mutex);
-
- process->use_count = 1;
-
- return process;
-}
-
-
void
nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
{
@@ -1512,7 +1489,7 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
return process;
}
- process = nxt_runtime_process_new(rt);
+ process = nxt_process_new(rt);
if (nxt_slow_path(process == NULL)) {
nxt_thread_mutex_unlock(&rt->processes_mutex);
@@ -1601,7 +1578,7 @@ nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
}
-static void
+void
nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
{
nxt_pid_t pid;
diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h
index 2037fc5d..f40b9389 100644
--- a/src/nxt_runtime.h
+++ b/src/nxt_runtime.h
@@ -94,8 +94,6 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
nxt_uint_t max_threads, nxt_nsec_t timeout);
-nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt);
-
void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process);
nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid);