summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_main_process.c7
-rw-r--r--src/nxt_port.c7
-rw-r--r--src/nxt_port.h2
-rw-r--r--src/nxt_process.c49
-rw-r--r--src/nxt_process.h6
5 files changed, 58 insertions, 13 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c
index 819b797b..d6743b68 100644
--- a/src/nxt_main_process.c
+++ b/src/nxt_main_process.c
@@ -764,6 +764,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
nxt_port_t *port;
nxt_runtime_t *rt;
nxt_process_t *process;
+ nxt_process_type_t ptype;
nxt_process_init_t *init;
rt = task->thread->runtime;
@@ -773,6 +774,8 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
if (process) {
init = process->init;
+ ptype = nxt_process_type(process);
+
nxt_process_close_ports(task, process);
if (!nxt_exiting) {
@@ -787,6 +790,10 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
port = nxt_process_port_first(process);
+ if (nxt_proc_remove_notify_martix[ptype][port->type] == 0) {
+ continue;
+ }
+
buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
sizeof(pid));
buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 52144759..c63ea01b 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -177,7 +177,7 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
-void
+nxt_inline void
nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
nxt_port_t *new_port, uint32_t stream)
{
@@ -195,10 +195,7 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
port = nxt_process_port_first(process);
- if (port->type == NXT_PROCESS_MAIN
- || port->type == NXT_PROCESS_CONTROLLER
- || port->type == NXT_PROCESS_ROUTER)
- {
+ if (nxt_proc_conn_martix[port->type][new_port->type]) {
(void) nxt_port_send_port(task, port, new_port, stream);
}
diff --git a/src/nxt_port.h b/src/nxt_port.h
index bade2adb..0d913df5 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -227,8 +227,6 @@ nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
nxt_port_handlers_t *handlers);
-void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
- nxt_port_t *port, uint32_t stream);
nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
nxt_port_t *new_port, uint32_t stream);
void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 1b01713d..4d0241ec 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -18,13 +18,31 @@ nxt_pid_t nxt_pid;
/* An original parent process pid. */
nxt_pid_t nxt_ppid;
+nxt_bool_t nxt_proc_conn_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
+ { 0, 0, 0, 0, 0, 0 },
+ { 0, 1, 1, 1, 1, 1 },
+ { 0, 1, 0, 0, 0, 0 },
+ { 0, 1, 0, 0, 1, 0 },
+ { 0, 1, 0, 1, 0, 1 },
+ { 0, 1, 0, 0, 0, 0 },
+};
+
+nxt_bool_t nxt_proc_remove_notify_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
+ { 0, 0, 0, 0, 0, 0 },
+ { 0, 0, 0, 0, 0, 0 },
+ { 0, 0, 0, 0, 0, 0 },
+ { 0, 0, 0, 0, 1, 0 },
+ { 0, 0, 0, 1, 0, 1 },
+ { 0, 0, 0, 0, 1, 0 },
+};
nxt_pid_t
nxt_process_create(nxt_task_t *task, nxt_process_t *process)
{
- nxt_pid_t pid;
- nxt_process_t *p;
- nxt_runtime_t *rt;
+ nxt_pid_t pid;
+ nxt_process_t *p;
+ nxt_runtime_t *rt;
+ nxt_process_type_t ptype;
rt = task->thread->runtime;
@@ -48,6 +66,8 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
rt->types = 0;
+ ptype = process->init->type;
+
nxt_port_reset_next_id();
nxt_event_engine_thread_adopt(task->thread->engine);
@@ -55,16 +75,25 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
/* Remove not ready processes */
nxt_runtime_process_each(rt, p) {
+ if (nxt_proc_conn_martix[ptype][nxt_process_type(p)] == 0) {
+ nxt_debug(task, "remove not required process %PI", p->pid);
+
+ nxt_process_close_ports(task, p);
+
+ continue;
+ }
+
if (!p->ready) {
nxt_debug(task, "remove not ready process %PI", p->pid);
nxt_process_close_ports(task, p);
- } else {
- nxt_port_mmaps_destroy(&p->incoming, 0);
- nxt_port_mmaps_destroy(&p->outgoing, 0);
+ continue;
}
+ nxt_port_mmaps_destroy(&p->incoming, 0);
+ nxt_port_mmaps_destroy(&p->outgoing, 0);
+
} nxt_runtime_process_loop;
nxt_runtime_process_add(task, process);
@@ -598,6 +627,14 @@ nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
}
+nxt_process_type_t
+nxt_process_type(nxt_process_t *process)
+{
+ return nxt_queue_is_empty(&process->ports) ? 0 :
+ (nxt_process_port_first(process))->type;
+}
+
+
void
nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
{
diff --git a/src/nxt_process.h b/src/nxt_process.h
index ae0e1661..c2a011ef 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -70,6 +70,10 @@ typedef struct {
} nxt_process_t;
+extern nxt_bool_t nxt_proc_conn_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX];
+extern nxt_bool_t
+ nxt_proc_remove_notify_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX];
+
NXT_EXPORT nxt_pid_t nxt_process_create(nxt_task_t *task,
nxt_process_t *process);
NXT_EXPORT nxt_pid_t nxt_process_execute(nxt_task_t *task, char *name,
@@ -95,6 +99,8 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process,
#define nxt_process_port_loop \
nxt_queue_loop
+nxt_process_type_t nxt_process_type(nxt_process_t *process);
+
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);