diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-07-12 20:32:16 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-07-12 20:32:16 +0300 |
commit | b0c1e740cf404f8fed5eed75fddb205ca74314e0 (patch) | |
tree | 08dcefc827c5dfb1570b682ea8d1e9abf17a31dc /src/nxt_runtime.c | |
parent | c38bcb7d70729434893ae4d5f2f58a78a36d2bd5 (diff) | |
download | unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.gz unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.bz2 |
New process port exchange changed. READY message type introduced.
Application process start request DATA message from router to master.
Master notifies router via NEW_PORT message after worker process become ready.
Diffstat (limited to 'src/nxt_runtime.c')
-rw-r--r-- | src/nxt_runtime.c | 70 |
1 files changed, 36 insertions, 34 deletions
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 91c4ba70..6ddf27c1 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -9,6 +9,7 @@ #include <nxt_runtime.h> #include <nxt_port.h> #include <nxt_master_process.h> +#include <nxt_router.h> static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, @@ -1479,16 +1480,34 @@ nxt_runtime_process_new(nxt_runtime_t *rt) nxt_queue_init(&process->ports); - /* TODO each process should have it's own mem_pool for ports allocation */ - process->mem_pool = rt->mem_pool; - nxt_thread_mutex_create(&process->incoming_mutex); nxt_thread_mutex_create(&process->outgoing_mutex); + nxt_thread_mutex_create(&process->cp_mutex); return process; } +void +nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) +{ + nxt_port_mmaps_destroy(process->incoming, 1); + nxt_port_mmaps_destroy(process->outgoing, 1); + + if (process->cp_mem_pool != NULL) { + nxt_mp_thread_adopt(process->cp_mem_pool); + + nxt_mp_destroy(process->cp_mem_pool); + } + + nxt_thread_mutex_destroy(&process->incoming_mutex); + nxt_thread_mutex_destroy(&process->outgoing_mutex); + nxt_thread_mutex_destroy(&process->cp_mutex); + + nxt_mp_free(rt->mem_pool, process); +} + + static nxt_int_t nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) { @@ -1606,6 +1625,8 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) nxt_process_port_each(process, port) { + port->pid = process->pid; + nxt_runtime_port_add(rt, port); } nxt_process_port_loop; @@ -1621,9 +1642,7 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) { - uint32_t i; nxt_port_t *port; - nxt_port_mmap_t *port_mmap; nxt_lvlhsh_query_t lhq; lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid)); @@ -1645,35 +1664,8 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) } nxt_process_port_loop; - if (process->incoming) { - nxt_mp_thread_adopt(process->incoming_mp); - - port_mmap = process->incoming->elts; - - for (i = 0; i < process->incoming->nelts; i++) { - nxt_port_mmap_destroy(port_mmap); - } - - nxt_thread_mutex_destroy(&process->incoming_mutex); - - nxt_mp_destroy(process->incoming_mp); - } - - if (process->outgoing) { - nxt_mp_thread_adopt(process->outgoing_mp); - - port_mmap = process->outgoing->elts; - - for (i = 0; i < process->outgoing->nelts; i++) { - nxt_port_mmap_destroy(port_mmap); - } - - nxt_thread_mutex_destroy(&process->outgoing_mutex); + nxt_runtime_process_destroy(rt, process); - nxt_mp_destroy(process->outgoing_mp); - } - - nxt_mp_free(rt->mem_pool, process); break; default: @@ -1704,6 +1696,8 @@ void nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port) { nxt_port_hash_add(&rt->ports, rt->mem_pool, port); + + rt->port_by_type[port->type] = port; } @@ -1712,6 +1706,10 @@ nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) { nxt_port_hash_remove(&rt->ports, rt->mem_pool, port); + if (rt->port_by_type[port->type] == port) { + rt->port_by_type[port->type] = NULL; + } + if (port->pair[0] != -1) { nxt_fd_close(port->pair[0]); } @@ -1720,11 +1718,15 @@ nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) nxt_fd_close(port->pair[1]); } + if (port->type == NXT_PROCESS_WORKER) { + nxt_router_app_remove_port(port); + } + if (port->mem_pool) { nxt_mp_destroy(port->mem_pool); } - nxt_mp_free(port->process->mem_pool, port); + nxt_mp_free(rt->mem_pool, port); } |