summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_runtime.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_runtime.c')
-rw-r--r--src/nxt_runtime.c124
1 files changed, 92 insertions, 32 deletions
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index 31829225..5f4ec6f4 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -48,6 +48,11 @@ static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
nxt_runtime_cont_t cont);
#endif
+static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
+ nxt_process_t *process);
+static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
+ nxt_pid_t pid);
+
nxt_int_t
nxt_runtime_create(nxt_task_t *task)
@@ -71,6 +76,8 @@ nxt_runtime_create(nxt_task_t *task)
task->thread->runtime = rt;
rt->mem_pool = mp;
+ nxt_thread_mutex_create(&rt->processes_mutex);
+
rt->prefix = nxt_current_directory(mp);
if (nxt_slow_path(rt->prefix == NULL)) {
goto fail;
@@ -557,6 +564,8 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
} nxt_runtime_process_loop;
+ nxt_thread_mutex_destroy(&rt->processes_mutex);
+
nxt_mp_destroy(rt->mem_pool);
nxt_debug(task, "exit");
@@ -1488,10 +1497,11 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
}
-void
+static void
nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
{
nxt_assert(process->port_cleanups == 0);
+ nxt_assert(process->registered == 0);
nxt_port_mmaps_destroy(process->incoming, 1);
nxt_port_mmaps_destroy(process->outgoing, 1);
@@ -1533,23 +1543,38 @@ static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
};
+nxt_inline void
+nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
+{
+ lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
+ lhq->key.length = sizeof(*pid);
+ lhq->key.start = (u_char *) pid;
+ lhq->proto = &lvlhsh_processes_proto;
+}
+
+
nxt_process_t *
nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
{
+ nxt_process_t *process;
nxt_lvlhsh_query_t lhq;
- lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid));
- lhq.key.length = sizeof(pid);
- lhq.key.start = (u_char *) &pid;
- lhq.proto = &lvlhsh_processes_proto;
+ process = NULL;
+
+ nxt_runtime_process_lhq_pid(&lhq, &pid);
+
+ nxt_thread_mutex_lock(&rt->processes_mutex);
if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
- return lhq.value;
+ process = lhq.value;
+
+ } else {
+ nxt_thread_log_debug("process %PI not found", pid);
}
- nxt_thread_log_debug("process %PI not found", pid);
+ nxt_thread_mutex_unlock(&rt->processes_mutex);
- return NULL;
+ return process;
}
@@ -1559,13 +1584,14 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
nxt_process_t *process;
nxt_lvlhsh_query_t lhq;
- lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid));
- lhq.key.length = sizeof(pid);
- lhq.key.start = (u_char *) &pid;
- lhq.proto = &lvlhsh_processes_proto;
+ nxt_runtime_process_lhq_pid(&lhq, &pid);
+
+ nxt_thread_mutex_lock(&rt->processes_mutex);
if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
nxt_thread_log_debug("process %PI found", pid);
+
+ nxt_thread_mutex_unlock(&rt->processes_mutex);
return lhq.value;
}
@@ -1589,6 +1615,8 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
rt->nprocesses++;
+ process->registered = 1;
+
nxt_thread_log_debug("process %PI insert", pid);
break;
@@ -1597,6 +1625,8 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
break;
}
+ nxt_thread_mutex_unlock(&rt->processes_mutex);
+
return process;
}
@@ -1607,14 +1637,16 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
nxt_port_t *port;
nxt_lvlhsh_query_t lhq;
- lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid));
- lhq.key.length = sizeof(process->pid);
- lhq.key.start = (u_char *) &process->pid;
- lhq.proto = &lvlhsh_processes_proto;
+ nxt_assert(process->registered == 0);
+
+ nxt_runtime_process_lhq_pid(&lhq, &process->pid);
+
lhq.replace = 0;
lhq.value = process;
lhq.pool = rt->mem_pool;
+ nxt_thread_mutex_lock(&rt->processes_mutex);
+
switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
case NXT_OK:
@@ -1632,37 +1664,70 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
} nxt_process_port_loop;
+ process->registered = 1;
+
+ nxt_thread_log_debug("process %PI added", process->pid);
break;
default:
+ nxt_thread_log_debug("process %PI failed to add", process->pid);
break;
}
+
+ nxt_thread_mutex_unlock(&rt->processes_mutex);
}
-void
-nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
+static nxt_process_t *
+nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
{
- nxt_port_t *port;
+ nxt_process_t *process;
nxt_lvlhsh_query_t lhq;
- lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid));
- lhq.key.length = sizeof(process->pid);
- lhq.key.start = (u_char *) &process->pid;
- lhq.proto = &lvlhsh_processes_proto;
- lhq.replace = 0;
- lhq.value = process;
+ process = NULL;
+
+ nxt_runtime_process_lhq_pid(&lhq, &pid);
+
lhq.pool = rt->mem_pool;
+ nxt_thread_mutex_lock(&rt->processes_mutex);
+
switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
case NXT_OK:
rt->nprocesses--;
- if (process->port_cleanups == 0) {
- nxt_runtime_process_destroy(rt, process);
+ process = lhq.value;
+
+ process->registered = 0;
+
+ nxt_thread_log_debug("process %PI removed", pid);
+ break;
+
+ default:
+ nxt_thread_log_debug("process %PI remove failed", pid);
+ break;
+ }
+
+ nxt_thread_mutex_unlock(&rt->processes_mutex);
+
+ return process;
+}
+
+
+void
+nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
+{
+ nxt_port_t *port;
+
+ if (process->port_cleanups == 0) {
+ if (process->registered == 1) {
+ nxt_runtime_process_remove_pid(rt, process->pid);
}
+ nxt_runtime_process_destroy(rt, process);
+
+ } else {
nxt_process_port_each(process, port) {
nxt_runtime_port_remove(rt, port);
@@ -1670,11 +1735,6 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
nxt_port_release(port);
} nxt_process_port_loop;
-
- break;
-
- default:
- break;
}
}