diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-07-12 20:32:16 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-07-12 20:32:16 +0300 |
commit | b0c1e740cf404f8fed5eed75fddb205ca74314e0 (patch) | |
tree | 08dcefc827c5dfb1570b682ea8d1e9abf17a31dc /src/nxt_process.c | |
parent | c38bcb7d70729434893ae4d5f2f58a78a36d2bd5 (diff) | |
download | unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.gz unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.bz2 |
New process port exchange changed. READY message type introduced.
Application process start request DATA message from router to master.
Master notifies router via NEW_PORT message after worker process become ready.
Diffstat (limited to 'src/nxt_process.c')
-rw-r--r-- | src/nxt_process.c | 132 |
1 files changed, 99 insertions, 33 deletions
diff --git a/src/nxt_process.c b/src/nxt_process.c index 7d5af92d..5899ce30 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -8,7 +8,7 @@ #include <nxt_master_process.h> -static void nxt_process_start(nxt_task_t *task, nxt_process_init_t *process); +static void nxt_process_start(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc); @@ -23,6 +23,7 @@ nxt_pid_t nxt_process_create(nxt_task_t *task, nxt_process_t *process) { nxt_pid_t pid; + nxt_process_t *p; nxt_runtime_t *rt; rt = task->thread->runtime; @@ -44,13 +45,31 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) task->thread->tid = 0; process->pid = nxt_pid; - process->init->port->pid = nxt_pid; rt->types = 0; + nxt_port_reset_next_id(); + + /* Remove not ready processes */ + nxt_runtime_process_each(rt, p) { + + if (!p->ready) { + nxt_debug(task, "remove not ready process %PI", p->pid); + + nxt_runtime_process_remove(rt, p); + } else { + nxt_port_mmaps_destroy(p->incoming, 0); + nxt_port_mmaps_destroy(p->outgoing, 0); + } + + } nxt_runtime_process_loop; + nxt_runtime_process_add(rt, process); - nxt_process_start(task, process->init); + nxt_process_start(task, process); + + process->ready = 1; + break; default: @@ -58,7 +77,6 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) nxt_debug(task, "fork(\"%s\"): %PI", process->init->name, pid); process->pid = pid; - process->init->port->pid = pid; nxt_runtime_process_add(rt, process); @@ -70,26 +88,30 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) static void -nxt_process_start(nxt_task_t *task, nxt_process_init_t *process) +nxt_process_start(nxt_task_t *task, nxt_process_t *process) { nxt_int_t ret; + nxt_port_t *port, *master_port; nxt_thread_t *thread; nxt_runtime_t *rt; + nxt_process_init_t *init; nxt_event_engine_t *engine; const nxt_event_interface_t *interface; - nxt_log(task, NXT_LOG_INFO, "%s started", process->name); + init = process->init; - nxt_process_title(task, "nginext: %s", process->name); + nxt_log(task, NXT_LOG_INFO, "%s started", init->name); + + nxt_process_title(task, "nginext: %s", init->name); thread = task->thread; nxt_random_init(&thread->random); - if (process->user_cred != NULL && getuid() == 0) { + if (init->user_cred != NULL && getuid() == 0) { /* Super-user. */ - ret = nxt_user_cred_set(task, process->user_cred); + ret = nxt_user_cred_set(task, init->user_cred); if (ret != NXT_OK) { goto fail; } @@ -97,15 +119,15 @@ nxt_process_start(nxt_task_t *task, nxt_process_init_t *process) rt = thread->runtime; - rt->types |= (1U << process->type); + rt->types |= (1U << init->type); engine = thread->engine; /* Update inherited master process event engine and signals processing. */ - engine->signals->sigev = process->signals; + engine->signals->sigev = init->signals; interface = nxt_service_get(rt->services, "engine", rt->engine); - if (interface == NULL) { + if (nxt_slow_path(interface == NULL)) { goto fail; } @@ -113,25 +135,40 @@ nxt_process_start(nxt_task_t *task, nxt_process_init_t *process) goto fail; } - nxt_port_read_close(process->master_port); - nxt_port_write_enable(task, process->master_port); - - /* A worker process port. */ - nxt_port_write_close(process->port); - nxt_port_create(task, process->port, process->port_handlers); - ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads, 60000 * 1000000LL); - if (ret != NXT_OK) { + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + master_port = rt->port_by_type[NXT_PROCESS_MASTER]; + + nxt_port_read_close(master_port); + nxt_port_write_enable(task, master_port); + + port = nxt_process_port_first(process); + + nxt_port_write_close(port); + + ret = init->start(task, init->data); + + if (nxt_slow_path(ret != NXT_OK)) { goto fail; } - ret = process->start(task, rt); + nxt_port_enable(task, port, init->port_handlers); - if (ret == NXT_OK) { - return; + ret = nxt_port_socket_write(task, master_port, NXT_PORT_MSG_READY, + -1, init->stream, 0, NULL); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log(task, NXT_LOG_ERR, "failed to send READY message to master"); + + goto fail; } + return; + fail: exit(1); @@ -527,15 +564,24 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) nxt_port_t * -nxt_process_port_new(nxt_process_t *process) +nxt_process_port_new(nxt_runtime_t *rt, nxt_process_t *process, + nxt_port_id_t id, nxt_process_type_t type) { + size_t size; nxt_port_t *port; - port = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_port_t)); + size = sizeof(nxt_port_t); + if (size == NXT_PROCESS_WORKER) { + size += sizeof(nxt_work_t); + } + + port = nxt_mp_zalloc(rt->mem_pool, size); + if (nxt_fast_path(port != NULL)) { - port->id = process->last_port_id++; + port->id = id; port->pid = process->pid; port->process = process; + port->type = type; nxt_process_port_add(process, port); } @@ -547,25 +593,45 @@ nxt_process_port_new(nxt_process_t *process) void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port) { - /* TODO lock ports */ + nxt_thread_mutex_lock(&process->cp_mutex); + + if (process->cp_mem_pool == NULL) { + process->cp_mem_pool = nxt_mp_create(1024, 128, 256, 32); + } - nxt_port_hash_add(&process->connected_ports, process->mem_pool, port); + nxt_mp_thread_adopt(process->cp_mem_pool); + + nxt_port_hash_add(&process->connected_ports, process->cp_mem_pool, port); + + nxt_thread_mutex_unlock(&process->cp_mutex); } void nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port) { - /* TODO lock ports */ + nxt_thread_mutex_lock(&process->cp_mutex); + + if (process->cp_mem_pool != NULL) { + nxt_mp_thread_adopt(process->cp_mem_pool); + + nxt_port_hash_remove(&process->connected_ports, process->cp_mem_pool, + port); + } - nxt_port_hash_remove(&process->connected_ports, process->mem_pool, port); + nxt_thread_mutex_unlock(&process->cp_mutex); } nxt_port_t * nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid, nxt_port_id_t port_id) { - /* TODO lock ports */ + nxt_port_t *res; - return nxt_port_hash_find(&process->connected_ports, pid, port_id); -} + nxt_thread_mutex_lock(&process->cp_mutex); + + res = nxt_port_hash_find(&process->connected_ports, pid, port_id); + nxt_thread_mutex_unlock(&process->cp_mutex); + + return res; +} |