summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_main_process.c18
-rw-r--r--src/nxt_port.c20
-rw-r--r--src/nxt_process.c11
-rw-r--r--src/nxt_process.h2
-rw-r--r--src/nxt_runtime.c71
-rw-r--r--src/nxt_runtime.h10
6 files changed, 65 insertions, 67 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c
index b86fb1c1..cfe0341f 100644
--- a/src/nxt_main_process.c
+++ b/src/nxt_main_process.c
@@ -397,31 +397,19 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
nxt_port_t *port;
nxt_process_t *process;
- process = nxt_runtime_process_get(rt, nxt_pid);
- if (nxt_slow_path(process == NULL)) {
- return NXT_ERROR;
- }
-
- port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN);
+ port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
+ NXT_PROCESS_MAIN);
if (nxt_slow_path(port == NULL)) {
- nxt_process_use(task, process, -1);
return NXT_ERROR;
}
- nxt_process_port_add(task, process, port);
-
- nxt_process_use(task, process, -1);
+ process = port->process;
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
- nxt_port_use(task, port, -1);
return ret;
}
- nxt_runtime_port_add(task, port);
-
- nxt_port_use(task, port, -1);
-
/*
* A main process port. A write port is not closed
* since it should be inherited by worker processes.
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 9029353a..8d14a5e7 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -238,7 +238,6 @@ void
nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_port_t *port;
- nxt_process_t *process;
nxt_runtime_t *rt;
nxt_port_msg_new_port_t *new_port_msg;
@@ -261,22 +260,13 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- process = nxt_runtime_process_get(rt, new_port_msg->pid);
- if (nxt_slow_path(process == NULL)) {
- return;
- }
-
- port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid,
- new_port_msg->type);
+ port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid,
+ new_port_msg->id,
+ new_port_msg->type);
if (nxt_slow_path(port == NULL)) {
- nxt_process_use(task, process, -1);
return;
}
- nxt_process_port_add(task, process, port);
-
- nxt_process_use(task, process, -1);
-
nxt_fd_nonblocking(task, msg->fd);
port->pair[0] = -1;
@@ -286,10 +276,6 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
port->socket.task = task;
- nxt_runtime_port_add(task, port);
-
- nxt_port_use(task, port, -1);
-
nxt_port_write_enable(task, port);
msg->u.new_port = port;
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 4b557b73..b246a58c 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -784,6 +784,17 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
void
+nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
+{
+ process->use_count += i;
+
+ if (process->use_count == 0) {
+ nxt_runtime_process_release(task->thread->runtime, process);
+ }
+}
+
+
+void
nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
{
nxt_assert(port->process == NULL);
diff --git a/src/nxt_process.h b/src/nxt_process.h
index df9ca038..d67573f1 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -114,6 +114,8 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process,
nxt_process_type_t nxt_process_type(nxt_process_t *process);
+void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
+
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index cf03c888..096aabc4 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -37,10 +37,10 @@ static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
static void nxt_runtime_thread_pool_init(void);
static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
void *data);
-static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
+static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
+static void nxt_runtime_process_remove(nxt_runtime_t *rt,
nxt_process_t *process);
-static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
- nxt_pid_t pid);
+static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
nxt_int_t
@@ -1298,11 +1298,15 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
}
-static void
-nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
+void
+nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
{
nxt_port_t *port;
+ if (process->registered == 1) {
+ nxt_runtime_process_remove(rt, process);
+ }
+
nxt_assert(process->use_count == 0);
nxt_assert(process->registered == 0);
@@ -1385,7 +1389,7 @@ nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
}
-nxt_process_t *
+static nxt_process_t *
nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
{
nxt_process_t *process;
@@ -1495,13 +1499,13 @@ nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
}
-static nxt_process_t *
-nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
+static void
+nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
{
- nxt_process_t *process;
+ nxt_pid_t pid;
nxt_lvlhsh_query_t lhq;
- process = NULL;
+ pid = process->pid;
nxt_runtime_process_lhq_pid(&lhq, &pid);
@@ -1527,40 +1531,49 @@ nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
}
nxt_thread_mutex_unlock(&rt->processes_mutex);
-
- return process;
}
-void
-nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
+nxt_process_t *
+nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
{
- nxt_runtime_t *rt;
+ nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
- process->use_count += i;
+ return nxt_runtime_process_next(rt, lhe);
+}
- if (process->use_count == 0) {
- rt = task->thread->runtime;
- if (process->registered == 1) {
- nxt_runtime_process_remove_pid(rt, process->pid);
- }
+nxt_port_t *
+nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
+ nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
+{
+ nxt_port_t *port;
+ nxt_process_t *process;
- nxt_runtime_process_destroy(rt, process);
+ process = nxt_runtime_process_get(rt, pid);
+ if (nxt_slow_path(process == NULL)) {
+ return NULL;
}
-}
+ port = nxt_port_new(task, id, pid, type);
+ if (nxt_slow_path(port == NULL)) {
+ nxt_process_use(task, process, -1);
+ return NULL;
+ }
-nxt_process_t *
-nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
-{
- nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
+ nxt_process_port_add(task, process, port);
- return nxt_runtime_process_next(rt, lhe);
+ nxt_process_use(task, process, -1);
+
+ nxt_runtime_port_add(task, port);
+
+ nxt_port_use(task, port, -1);
+
+ return port;
}
-void
+static void
nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
{
nxt_int_t res;
diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h
index 0791f8e7..d5b340b6 100644
--- a/src/nxt_runtime.h
+++ b/src/nxt_runtime.h
@@ -93,22 +93,20 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt);
-nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
-
void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process);
nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid);
-void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
-
nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt,
nxt_lvlhsh_each_t *lhe);
+void nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process);
+
#define nxt_runtime_process_next(rt, lhe) \
nxt_lvlhsh_each(&rt->processes, lhe)
-
-void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
+nxt_port_t *nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
+ nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type);
void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port);