diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-10-19 17:37:19 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-10-19 17:37:19 +0300 |
commit | b3aab8c66fa7269147a6696c98a52971020be6e8 (patch) | |
tree | 30fb2198ab3293f7248b6ae65692f792a590501e | |
parent | 6031c63225838d6bc266c4f015d9a1085f600324 (diff) | |
download | unit-b3aab8c66fa7269147a6696c98a52971020be6e8.tar.gz unit-b3aab8c66fa7269147a6696c98a52971020be6e8.tar.bz2 |
Filtering process to keep connection.
- Main process should be connected to all other processes.
- Controller should be connected to Router.
- Router should be connected to Controller and all Workers.
- Workers should be connected to Router worker thread ports only.
This filtering helps to avoid unnecessary communication and various errors
during massive application workers stop / restart.
-rw-r--r-- | src/nxt_main_process.c | 7 | ||||
-rw-r--r-- | src/nxt_port.c | 7 | ||||
-rw-r--r-- | src/nxt_port.h | 2 | ||||
-rw-r--r-- | src/nxt_process.c | 49 | ||||
-rw-r--r-- | src/nxt_process.h | 6 |
5 files changed, 58 insertions, 13 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 819b797b..d6743b68 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -764,6 +764,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) nxt_port_t *port; nxt_runtime_t *rt; nxt_process_t *process; + nxt_process_type_t ptype; nxt_process_init_t *init; rt = task->thread->runtime; @@ -773,6 +774,8 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) if (process) { init = process->init; + ptype = nxt_process_type(process); + nxt_process_close_ports(task, process); if (!nxt_exiting) { @@ -787,6 +790,10 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) port = nxt_process_port_first(process); + if (nxt_proc_remove_notify_martix[ptype][port->type] == 0) { + continue; + } + buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, sizeof(pid)); buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); diff --git a/src/nxt_port.c b/src/nxt_port.c index 52144759..c63ea01b 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -177,7 +177,7 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } -void +nxt_inline void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_t *new_port, uint32_t stream) { @@ -195,10 +195,7 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, port = nxt_process_port_first(process); - if (port->type == NXT_PROCESS_MAIN - || port->type == NXT_PROCESS_CONTROLLER - || port->type == NXT_PROCESS_ROUTER) - { + if (nxt_proc_conn_martix[port->type][new_port->type]) { (void) nxt_port_send_port(task, port, new_port, stream); } diff --git a/src/nxt_port.h b/src/nxt_port.h index bade2adb..0d913df5 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -227,8 +227,6 @@ nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, void nxt_port_enable(nxt_task_t *task, nxt_port_t *port, nxt_port_handlers_t *handlers); -void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, - nxt_port_t *port, uint32_t stream); nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, uint32_t stream); void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, diff --git a/src/nxt_process.c b/src/nxt_process.c index 1b01713d..4d0241ec 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -18,13 +18,31 @@ nxt_pid_t nxt_pid; /* An original parent process pid. */ nxt_pid_t nxt_ppid; +nxt_bool_t nxt_proc_conn_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { + { 0, 0, 0, 0, 0, 0 }, + { 0, 1, 1, 1, 1, 1 }, + { 0, 1, 0, 0, 0, 0 }, + { 0, 1, 0, 0, 1, 0 }, + { 0, 1, 0, 1, 0, 1 }, + { 0, 1, 0, 0, 0, 0 }, +}; + +nxt_bool_t nxt_proc_remove_notify_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { + { 0, 0, 0, 0, 0, 0 }, + { 0, 0, 0, 0, 0, 0 }, + { 0, 0, 0, 0, 0, 0 }, + { 0, 0, 0, 0, 1, 0 }, + { 0, 0, 0, 1, 0, 1 }, + { 0, 0, 0, 0, 1, 0 }, +}; nxt_pid_t nxt_process_create(nxt_task_t *task, nxt_process_t *process) { - nxt_pid_t pid; - nxt_process_t *p; - nxt_runtime_t *rt; + nxt_pid_t pid; + nxt_process_t *p; + nxt_runtime_t *rt; + nxt_process_type_t ptype; rt = task->thread->runtime; @@ -48,6 +66,8 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) rt->types = 0; + ptype = process->init->type; + nxt_port_reset_next_id(); nxt_event_engine_thread_adopt(task->thread->engine); @@ -55,16 +75,25 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) /* Remove not ready processes */ nxt_runtime_process_each(rt, p) { + if (nxt_proc_conn_martix[ptype][nxt_process_type(p)] == 0) { + nxt_debug(task, "remove not required process %PI", p->pid); + + nxt_process_close_ports(task, p); + + continue; + } + if (!p->ready) { nxt_debug(task, "remove not ready process %PI", p->pid); nxt_process_close_ports(task, p); - } else { - nxt_port_mmaps_destroy(&p->incoming, 0); - nxt_port_mmaps_destroy(&p->outgoing, 0); + continue; } + nxt_port_mmaps_destroy(&p->incoming, 0); + nxt_port_mmaps_destroy(&p->outgoing, 0); + } nxt_runtime_process_loop; nxt_runtime_process_add(task, process); @@ -598,6 +627,14 @@ nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) } +nxt_process_type_t +nxt_process_type(nxt_process_t *process) +{ + return nxt_queue_is_empty(&process->ports) ? 0 : + (nxt_process_port_first(process))->type; +} + + void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process) { diff --git a/src/nxt_process.h b/src/nxt_process.h index ae0e1661..c2a011ef 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -70,6 +70,10 @@ typedef struct { } nxt_process_t; +extern nxt_bool_t nxt_proc_conn_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX]; +extern nxt_bool_t + nxt_proc_remove_notify_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX]; + NXT_EXPORT nxt_pid_t nxt_process_create(nxt_task_t *task, nxt_process_t *process); NXT_EXPORT nxt_pid_t nxt_process_execute(nxt_task_t *task, char *name, @@ -95,6 +99,8 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, #define nxt_process_port_loop \ nxt_queue_loop +nxt_process_type_t nxt_process_type(nxt_process_t *process); + void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process); void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port); |