diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_main_process.c | 18 | ||||
-rw-r--r-- | src/nxt_port.c | 20 | ||||
-rw-r--r-- | src/nxt_process.c | 11 | ||||
-rw-r--r-- | src/nxt_process.h | 2 | ||||
-rw-r--r-- | src/nxt_runtime.c | 71 | ||||
-rw-r--r-- | src/nxt_runtime.h | 10 |
6 files changed, 65 insertions, 67 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index b86fb1c1..cfe0341f 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -397,31 +397,19 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) nxt_port_t *port; nxt_process_t *process; - process = nxt_runtime_process_get(rt, nxt_pid); - if (nxt_slow_path(process == NULL)) { - return NXT_ERROR; - } - - port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN); + port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0, + NXT_PROCESS_MAIN); if (nxt_slow_path(port == NULL)) { - nxt_process_use(task, process, -1); return NXT_ERROR; } - nxt_process_port_add(task, process, port); - - nxt_process_use(task, process, -1); + process = port->process; ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { - nxt_port_use(task, port, -1); return ret; } - nxt_runtime_port_add(task, port); - - nxt_port_use(task, port, -1); - /* * A main process port. A write port is not closed * since it should be inherited by worker processes. diff --git a/src/nxt_port.c b/src/nxt_port.c index 9029353a..8d14a5e7 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -238,7 +238,6 @@ void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_t *port; - nxt_process_t *process; nxt_runtime_t *rt; nxt_port_msg_new_port_t *new_port_msg; @@ -261,22 +260,13 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - process = nxt_runtime_process_get(rt, new_port_msg->pid); - if (nxt_slow_path(process == NULL)) { - return; - } - - port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid, - new_port_msg->type); + port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid, + new_port_msg->id, + new_port_msg->type); if (nxt_slow_path(port == NULL)) { - nxt_process_use(task, process, -1); return; } - nxt_process_port_add(task, process, port); - - nxt_process_use(task, process, -1); - nxt_fd_nonblocking(task, msg->fd); port->pair[0] = -1; @@ -286,10 +276,6 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port->socket.task = task; - nxt_runtime_port_add(task, port); - - nxt_port_use(task, port, -1); - nxt_port_write_enable(task, port); msg->u.new_port = port; diff --git a/src/nxt_process.c b/src/nxt_process.c index 4b557b73..b246a58c 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -784,6 +784,17 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) void +nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) +{ + process->use_count += i; + + if (process->use_count == 0) { + nxt_runtime_process_release(task->thread->runtime, process); + } +} + + +void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) { nxt_assert(port->process == NULL); diff --git a/src/nxt_process.h b/src/nxt_process.h index df9ca038..d67573f1 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -114,6 +114,8 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_process_type_t nxt_process_type(nxt_process_t *process); +void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); + void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process); void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port); diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index cf03c888..096aabc4 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -37,10 +37,10 @@ static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, static void nxt_runtime_thread_pool_init(void); static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data); -static void nxt_runtime_process_destroy(nxt_runtime_t *rt, +static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); +static void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process); -static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt, - nxt_pid_t pid); +static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); nxt_int_t @@ -1298,11 +1298,15 @@ nxt_runtime_process_new(nxt_runtime_t *rt) } -static void -nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) +void +nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) { nxt_port_t *port; + if (process->registered == 1) { + nxt_runtime_process_remove(rt, process); + } + nxt_assert(process->use_count == 0); nxt_assert(process->registered == 0); @@ -1385,7 +1389,7 @@ nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) } -nxt_process_t * +static nxt_process_t * nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) { nxt_process_t *process; @@ -1495,13 +1499,13 @@ nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) } -static nxt_process_t * -nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) +static void +nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) { - nxt_process_t *process; + nxt_pid_t pid; nxt_lvlhsh_query_t lhq; - process = NULL; + pid = process->pid; nxt_runtime_process_lhq_pid(&lhq, &pid); @@ -1527,40 +1531,49 @@ nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) } nxt_thread_mutex_unlock(&rt->processes_mutex); - - return process; } -void -nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) +nxt_process_t * +nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) { - nxt_runtime_t *rt; + nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto); - process->use_count += i; + return nxt_runtime_process_next(rt, lhe); +} - if (process->use_count == 0) { - rt = task->thread->runtime; - if (process->registered == 1) { - nxt_runtime_process_remove_pid(rt, process->pid); - } +nxt_port_t * +nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type) +{ + nxt_port_t *port; + nxt_process_t *process; - nxt_runtime_process_destroy(rt, process); + process = nxt_runtime_process_get(rt, pid); + if (nxt_slow_path(process == NULL)) { + return NULL; } -} + port = nxt_port_new(task, id, pid, type); + if (nxt_slow_path(port == NULL)) { + nxt_process_use(task, process, -1); + return NULL; + } -nxt_process_t * -nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) -{ - nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto); + nxt_process_port_add(task, process, port); - return nxt_runtime_process_next(rt, lhe); + nxt_process_use(task, process, -1); + + nxt_runtime_port_add(task, port); + + nxt_port_use(task, port, -1); + + return port; } -void +static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port) { nxt_int_t res; diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 0791f8e7..d5b340b6 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -93,22 +93,20 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt); -nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); - void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process); nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid); -void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); - nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe); +void nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process); + #define nxt_runtime_process_next(rt, lhe) \ nxt_lvlhsh_each(&rt->processes, lhe) - -void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); +nxt_port_t *nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type); void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port); |