diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-10-19 17:37:19 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-10-19 17:37:19 +0300 |
commit | b3aab8c66fa7269147a6696c98a52971020be6e8 (patch) | |
tree | 30fb2198ab3293f7248b6ae65692f792a590501e /src/nxt_process.c | |
parent | 6031c63225838d6bc266c4f015d9a1085f600324 (diff) | |
download | unit-b3aab8c66fa7269147a6696c98a52971020be6e8.tar.gz unit-b3aab8c66fa7269147a6696c98a52971020be6e8.tar.bz2 |
Filtering process to keep connection.
- Main process should be connected to all other processes.
- Controller should be connected to Router.
- Router should be connected to Controller and all Workers.
- Workers should be connected to Router worker thread ports only.
This filtering helps to avoid unnecessary communication and various errors
during massive application workers stop / restart.
Diffstat (limited to 'src/nxt_process.c')
-rw-r--r-- | src/nxt_process.c | 49 |
1 files changed, 43 insertions, 6 deletions
diff --git a/src/nxt_process.c b/src/nxt_process.c index 1b01713d..4d0241ec 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -18,13 +18,31 @@ nxt_pid_t nxt_pid; /* An original parent process pid. */ nxt_pid_t nxt_ppid; +nxt_bool_t nxt_proc_conn_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { + { 0, 0, 0, 0, 0, 0 }, + { 0, 1, 1, 1, 1, 1 }, + { 0, 1, 0, 0, 0, 0 }, + { 0, 1, 0, 0, 1, 0 }, + { 0, 1, 0, 1, 0, 1 }, + { 0, 1, 0, 0, 0, 0 }, +}; + +nxt_bool_t nxt_proc_remove_notify_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { + { 0, 0, 0, 0, 0, 0 }, + { 0, 0, 0, 0, 0, 0 }, + { 0, 0, 0, 0, 0, 0 }, + { 0, 0, 0, 0, 1, 0 }, + { 0, 0, 0, 1, 0, 1 }, + { 0, 0, 0, 0, 1, 0 }, +}; nxt_pid_t nxt_process_create(nxt_task_t *task, nxt_process_t *process) { - nxt_pid_t pid; - nxt_process_t *p; - nxt_runtime_t *rt; + nxt_pid_t pid; + nxt_process_t *p; + nxt_runtime_t *rt; + nxt_process_type_t ptype; rt = task->thread->runtime; @@ -48,6 +66,8 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) rt->types = 0; + ptype = process->init->type; + nxt_port_reset_next_id(); nxt_event_engine_thread_adopt(task->thread->engine); @@ -55,16 +75,25 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) /* Remove not ready processes */ nxt_runtime_process_each(rt, p) { + if (nxt_proc_conn_martix[ptype][nxt_process_type(p)] == 0) { + nxt_debug(task, "remove not required process %PI", p->pid); + + nxt_process_close_ports(task, p); + + continue; + } + if (!p->ready) { nxt_debug(task, "remove not ready process %PI", p->pid); nxt_process_close_ports(task, p); - } else { - nxt_port_mmaps_destroy(&p->incoming, 0); - nxt_port_mmaps_destroy(&p->outgoing, 0); + continue; } + nxt_port_mmaps_destroy(&p->incoming, 0); + nxt_port_mmaps_destroy(&p->outgoing, 0); + } nxt_runtime_process_loop; nxt_runtime_process_add(task, process); @@ -598,6 +627,14 @@ nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) } +nxt_process_type_t +nxt_process_type(nxt_process_t *process) +{ + return nxt_queue_is_empty(&process->ports) ? 0 : + (nxt_process_port_first(process))->type; +} + + void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process) { |