summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_process.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-10-19 17:37:19 +0300
committerMax Romanov <max.romanov@nginx.com>2017-10-19 17:37:19 +0300
commitb3aab8c66fa7269147a6696c98a52971020be6e8 (patch)
tree30fb2198ab3293f7248b6ae65692f792a590501e /src/nxt_process.c
parent6031c63225838d6bc266c4f015d9a1085f600324 (diff)
downloadunit-b3aab8c66fa7269147a6696c98a52971020be6e8.tar.gz
unit-b3aab8c66fa7269147a6696c98a52971020be6e8.tar.bz2
Filtering process to keep connection.
- Main process should be connected to all other processes. - Controller should be connected to Router. - Router should be connected to Controller and all Workers. - Workers should be connected to Router worker thread ports only. This filtering helps to avoid unnecessary communication and various errors during massive application workers stop / restart.
Diffstat (limited to 'src/nxt_process.c')
-rw-r--r--src/nxt_process.c49
1 files changed, 43 insertions, 6 deletions
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 1b01713d..4d0241ec 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -18,13 +18,31 @@ nxt_pid_t nxt_pid;
/* An original parent process pid. */
nxt_pid_t nxt_ppid;
+nxt_bool_t nxt_proc_conn_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
+ { 0, 0, 0, 0, 0, 0 },
+ { 0, 1, 1, 1, 1, 1 },
+ { 0, 1, 0, 0, 0, 0 },
+ { 0, 1, 0, 0, 1, 0 },
+ { 0, 1, 0, 1, 0, 1 },
+ { 0, 1, 0, 0, 0, 0 },
+};
+
+nxt_bool_t nxt_proc_remove_notify_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
+ { 0, 0, 0, 0, 0, 0 },
+ { 0, 0, 0, 0, 0, 0 },
+ { 0, 0, 0, 0, 0, 0 },
+ { 0, 0, 0, 0, 1, 0 },
+ { 0, 0, 0, 1, 0, 1 },
+ { 0, 0, 0, 0, 1, 0 },
+};
nxt_pid_t
nxt_process_create(nxt_task_t *task, nxt_process_t *process)
{
- nxt_pid_t pid;
- nxt_process_t *p;
- nxt_runtime_t *rt;
+ nxt_pid_t pid;
+ nxt_process_t *p;
+ nxt_runtime_t *rt;
+ nxt_process_type_t ptype;
rt = task->thread->runtime;
@@ -48,6 +66,8 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
rt->types = 0;
+ ptype = process->init->type;
+
nxt_port_reset_next_id();
nxt_event_engine_thread_adopt(task->thread->engine);
@@ -55,16 +75,25 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
/* Remove not ready processes */
nxt_runtime_process_each(rt, p) {
+ if (nxt_proc_conn_martix[ptype][nxt_process_type(p)] == 0) {
+ nxt_debug(task, "remove not required process %PI", p->pid);
+
+ nxt_process_close_ports(task, p);
+
+ continue;
+ }
+
if (!p->ready) {
nxt_debug(task, "remove not ready process %PI", p->pid);
nxt_process_close_ports(task, p);
- } else {
- nxt_port_mmaps_destroy(&p->incoming, 0);
- nxt_port_mmaps_destroy(&p->outgoing, 0);
+ continue;
}
+ nxt_port_mmaps_destroy(&p->incoming, 0);
+ nxt_port_mmaps_destroy(&p->outgoing, 0);
+
} nxt_runtime_process_loop;
nxt_runtime_process_add(task, process);
@@ -598,6 +627,14 @@ nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
}
+nxt_process_type_t
+nxt_process_type(nxt_process_t *process)
+{
+ return nxt_queue_is_empty(&process->ports) ? 0 :
+ (nxt_process_port_first(process))->type;
+}
+
+
void
nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
{