diff options
-rw-r--r-- | src/nxt_main_process.c | 9 | ||||
-rw-r--r-- | src/nxt_port.c | 11 | ||||
-rw-r--r-- | src/nxt_process.c | 35 | ||||
-rw-r--r-- | src/nxt_process.h | 3 | ||||
-rw-r--r-- | src/nxt_runtime.c | 30 | ||||
-rw-r--r-- | src/nxt_runtime.h | 2 |
6 files changed, 52 insertions, 38 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 9c27a89b..128c0fde 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -257,11 +257,14 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN); if (nxt_slow_path(port == NULL)) { + nxt_process_use(task, process, -1); return NXT_ERROR; } nxt_process_port_add(task, process, port); + nxt_process_use(task, process, -1); + ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_use(task, port, -1); @@ -511,12 +514,14 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, port = nxt_port_new(task, 0, 0, init->type); if (nxt_slow_path(port == NULL)) { - nxt_runtime_process_remove(task, process); + nxt_process_use(task, process, -1); return NXT_ERROR; } nxt_process_port_add(task, process, port); + nxt_process_use(task, process, -1); + ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_use(task, port, -1); @@ -760,7 +765,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) if (process) { init = process->init; - nxt_runtime_process_remove(task, process); + nxt_process_close_ports(task, process); if (!nxt_exiting) { nxt_runtime_process_each(rt, process) { diff --git a/src/nxt_port.c b/src/nxt_port.c index 948e4de6..df652ac4 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -112,7 +112,11 @@ nxt_port_release(nxt_task_t *task, nxt_port_t *port) } if (port->link.next != NULL) { + nxt_assert(port->process != NULL); + nxt_process_port_remove(port); + + nxt_process_use(task, port->process, -1); } nxt_mp_release(port->mem_pool, NULL); @@ -265,11 +269,14 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid, new_port_msg->type); if (nxt_slow_path(port == NULL)) { + nxt_process_use(task, process, -1); return; } nxt_process_port_add(task, process, port); + nxt_process_use(task, process, -1); + port->pair[0] = -1; port->pair[1] = msg->fd; port->max_size = new_port_msg->max_size; @@ -298,7 +305,7 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_assert(nxt_runtime_is_main(rt)); - process = nxt_runtime_process_get(rt, msg->port_msg.pid); + process = nxt_runtime_process_find(rt, msg->port_msg.pid); if (nxt_slow_path(process == NULL)) { return; } @@ -452,7 +459,7 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) process = nxt_runtime_process_find(rt, pid); if (process) { - nxt_runtime_process_remove(task, process); + nxt_process_close_ports(task, process); } } diff --git a/src/nxt_process.c b/src/nxt_process.c index d3ec36ed..272837b6 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -58,7 +58,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) if (!p->ready) { nxt_debug(task, "remove not ready process %PI", p->pid); - nxt_runtime_process_remove(task, p); + nxt_process_close_ports(task, p); } else { nxt_port_mmaps_destroy(p->incoming, 0); @@ -586,29 +586,30 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) } -static void -nxt_process_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) +void +nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) { - nxt_process_t *process; - - process = obj; + nxt_assert(port->process == NULL); - process->port_cleanups--; + port->process = process; + nxt_queue_insert_tail(&process->ports, &port->link); - if (process->port_cleanups == 0) { - nxt_runtime_process_remove(task, process); - } + nxt_process_use(task, process, 1); } + void -nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) +nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process) { - port->process = process; - nxt_queue_insert_tail(&process->ports, &port->link); + nxt_port_t *port; + + nxt_process_port_each(process, port) { - nxt_mp_cleanup(port->mem_pool, nxt_process_port_mp_cleanup, task, process, - NULL); - process->port_cleanups++; + nxt_port_close(task, port); + + nxt_runtime_port_remove(task, port); + + } nxt_process_port_loop; } @@ -622,6 +623,7 @@ nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port) nxt_thread_mutex_unlock(&process->cp_mutex); } + void nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port) { @@ -632,6 +634,7 @@ nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port) nxt_thread_mutex_unlock(&process->cp_mutex); } + nxt_port_t * nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid, nxt_port_id_t port_id) diff --git a/src/nxt_process.h b/src/nxt_process.h index c3ffc702..26e7de18 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -47,7 +47,7 @@ typedef struct { nxt_queue_t ports; /* of nxt_port_t */ nxt_bool_t ready; nxt_bool_t registered; - nxt_uint_t port_cleanups; + nxt_int_t use_count; nxt_process_init_t *init; @@ -87,6 +87,7 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, #define nxt_process_port_loop \ nxt_queue_loop +void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process); void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port); diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index be88fc30..b1b80865 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -553,7 +553,7 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) nxt_runtime_process_each(rt, process) { - nxt_runtime_process_remove(task, process); + nxt_process_close_ports(task, process); } nxt_runtime_process_loop; @@ -1580,6 +1580,8 @@ nxt_runtime_process_new(nxt_runtime_t *rt) nxt_thread_mutex_create(&process->outgoing_mutex); nxt_thread_mutex_create(&process->cp_mutex); + process->use_count = 1; + return process; } @@ -1590,7 +1592,7 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) nxt_port_t *port; nxt_lvlhsh_each_t lhe; - nxt_assert(process->port_cleanups == 0); + nxt_assert(process->use_count == 0); nxt_assert(process->registered == 0); nxt_port_mmaps_destroy(process->incoming, 1); @@ -1685,7 +1687,11 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) nxt_thread_log_debug("process %PI found", pid); nxt_thread_mutex_unlock(&rt->processes_mutex); - return lhq.value; + + process = lhq.value; + process->use_count++; + + return process; } process = nxt_runtime_process_new(rt); @@ -1812,28 +1818,20 @@ nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) void -nxt_runtime_process_remove(nxt_task_t *task, nxt_process_t *process) +nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) { - nxt_port_t *port; nxt_runtime_t *rt; - rt = task->thread->runtime; + process->use_count += i; + + if (process->use_count == 0) { + rt = task->thread->runtime; - if (process->port_cleanups == 0) { if (process->registered == 1) { nxt_runtime_process_remove_pid(rt, process->pid); } nxt_runtime_process_destroy(rt, process); - - } else { - nxt_process_port_each(process, port) { - - nxt_port_close(task, port); - - nxt_runtime_port_remove(task, port); - - } nxt_process_port_loop; } } diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 5aa897dc..af105c6b 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -106,7 +106,7 @@ void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process); nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid); -void nxt_runtime_process_remove(nxt_task_t *task, nxt_process_t *process); +void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe); |