diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-08-02 13:22:07 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-08-02 13:22:07 +0300 |
commit | f23f985899760fafd853e993d9023b1339f09533 (patch) | |
tree | b1896aa49683d809481351249352dbb18b59cd97 /src/nxt_runtime.c | |
parent | 021a84019f48c9027254f15c9b9db7ea12535dd0 (diff) | |
download | unit-f23f985899760fafd853e993d9023b1339f09533.tar.gz unit-f23f985899760fafd853e993d9023b1339f09533.tar.bz2 |
Runtime processes protected with mutex.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_runtime.c | 124 |
1 files changed, 92 insertions, 32 deletions
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 31829225..5f4ec6f4 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -48,6 +48,11 @@ static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt, nxt_runtime_cont_t cont); #endif +static void nxt_runtime_process_destroy(nxt_runtime_t *rt, + nxt_process_t *process); +static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt, + nxt_pid_t pid); + nxt_int_t nxt_runtime_create(nxt_task_t *task) @@ -71,6 +76,8 @@ nxt_runtime_create(nxt_task_t *task) task->thread->runtime = rt; rt->mem_pool = mp; + nxt_thread_mutex_create(&rt->processes_mutex); + rt->prefix = nxt_current_directory(mp); if (nxt_slow_path(rt->prefix == NULL)) { goto fail; @@ -557,6 +564,8 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) } nxt_runtime_process_loop; + nxt_thread_mutex_destroy(&rt->processes_mutex); + nxt_mp_destroy(rt->mem_pool); nxt_debug(task, "exit"); @@ -1488,10 +1497,11 @@ nxt_runtime_process_new(nxt_runtime_t *rt) } -void +static void nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) { nxt_assert(process->port_cleanups == 0); + nxt_assert(process->registered == 0); nxt_port_mmaps_destroy(process->incoming, 1); nxt_port_mmaps_destroy(process->outgoing, 1); @@ -1533,23 +1543,38 @@ static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { }; +nxt_inline void +nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) +{ + lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid)); + lhq->key.length = sizeof(*pid); + lhq->key.start = (u_char *) pid; + lhq->proto = &lvlhsh_processes_proto; +} + + nxt_process_t * nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) { + nxt_process_t *process; nxt_lvlhsh_query_t lhq; - lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid)); - lhq.key.length = sizeof(pid); - lhq.key.start = (u_char *) &pid; - lhq.proto = &lvlhsh_processes_proto; + process = NULL; + + nxt_runtime_process_lhq_pid(&lhq, &pid); + + nxt_thread_mutex_lock(&rt->processes_mutex); if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { - return lhq.value; + process = lhq.value; + + } else { + nxt_thread_log_debug("process %PI not found", pid); } - nxt_thread_log_debug("process %PI not found", pid); + nxt_thread_mutex_unlock(&rt->processes_mutex); - return NULL; + return process; } @@ -1559,13 +1584,14 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) nxt_process_t *process; nxt_lvlhsh_query_t lhq; - lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid)); - lhq.key.length = sizeof(pid); - lhq.key.start = (u_char *) &pid; - lhq.proto = &lvlhsh_processes_proto; + nxt_runtime_process_lhq_pid(&lhq, &pid); + + nxt_thread_mutex_lock(&rt->processes_mutex); if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { nxt_thread_log_debug("process %PI found", pid); + + nxt_thread_mutex_unlock(&rt->processes_mutex); return lhq.value; } @@ -1589,6 +1615,8 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) rt->nprocesses++; + process->registered = 1; + nxt_thread_log_debug("process %PI insert", pid); break; @@ -1597,6 +1625,8 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) break; } + nxt_thread_mutex_unlock(&rt->processes_mutex); + return process; } @@ -1607,14 +1637,16 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) nxt_port_t *port; nxt_lvlhsh_query_t lhq; - lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid)); - lhq.key.length = sizeof(process->pid); - lhq.key.start = (u_char *) &process->pid; - lhq.proto = &lvlhsh_processes_proto; + nxt_assert(process->registered == 0); + + nxt_runtime_process_lhq_pid(&lhq, &process->pid); + lhq.replace = 0; lhq.value = process; lhq.pool = rt->mem_pool; + nxt_thread_mutex_lock(&rt->processes_mutex); + switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { case NXT_OK: @@ -1632,37 +1664,70 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) } nxt_process_port_loop; + process->registered = 1; + + nxt_thread_log_debug("process %PI added", process->pid); break; default: + nxt_thread_log_debug("process %PI failed to add", process->pid); break; } + + nxt_thread_mutex_unlock(&rt->processes_mutex); } -void -nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) +static nxt_process_t * +nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid) { - nxt_port_t *port; + nxt_process_t *process; nxt_lvlhsh_query_t lhq; - lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid)); - lhq.key.length = sizeof(process->pid); - lhq.key.start = (u_char *) &process->pid; - lhq.proto = &lvlhsh_processes_proto; - lhq.replace = 0; - lhq.value = process; + process = NULL; + + nxt_runtime_process_lhq_pid(&lhq, &pid); + lhq.pool = rt->mem_pool; + nxt_thread_mutex_lock(&rt->processes_mutex); + switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { case NXT_OK: rt->nprocesses--; - if (process->port_cleanups == 0) { - nxt_runtime_process_destroy(rt, process); + process = lhq.value; + + process->registered = 0; + + nxt_thread_log_debug("process %PI removed", pid); + break; + + default: + nxt_thread_log_debug("process %PI remove failed", pid); + break; + } + + nxt_thread_mutex_unlock(&rt->processes_mutex); + + return process; +} + + +void +nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) +{ + nxt_port_t *port; + + if (process->port_cleanups == 0) { + if (process->registered == 1) { + nxt_runtime_process_remove_pid(rt, process->pid); } + nxt_runtime_process_destroy(rt, process); + + } else { nxt_process_port_each(process, port) { nxt_runtime_port_remove(rt, port); @@ -1670,11 +1735,6 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) nxt_port_release(port); } nxt_process_port_loop; - - break; - - default: - break; } } |