summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_process.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-07-12 20:32:16 +0300
committerMax Romanov <max.romanov@nginx.com>2017-07-12 20:32:16 +0300
commitb0c1e740cf404f8fed5eed75fddb205ca74314e0 (patch)
tree08dcefc827c5dfb1570b682ea8d1e9abf17a31dc /src/nxt_process.c
parentc38bcb7d70729434893ae4d5f2f58a78a36d2bd5 (diff)
downloadunit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.gz
unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.bz2
New process port exchange changed. READY message type introduced.
Application process start request DATA message from router to master. Master notifies router via NEW_PORT message after worker process become ready.
Diffstat (limited to '')
-rw-r--r--src/nxt_process.c132
1 files changed, 99 insertions, 33 deletions
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 7d5af92d..5899ce30 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -8,7 +8,7 @@
#include <nxt_master_process.h>
-static void nxt_process_start(nxt_task_t *task, nxt_process_init_t *process);
+static void nxt_process_start(nxt_task_t *task, nxt_process_t *process);
static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc);
@@ -23,6 +23,7 @@ nxt_pid_t
nxt_process_create(nxt_task_t *task, nxt_process_t *process)
{
nxt_pid_t pid;
+ nxt_process_t *p;
nxt_runtime_t *rt;
rt = task->thread->runtime;
@@ -44,13 +45,31 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
task->thread->tid = 0;
process->pid = nxt_pid;
- process->init->port->pid = nxt_pid;
rt->types = 0;
+ nxt_port_reset_next_id();
+
+ /* Remove not ready processes */
+ nxt_runtime_process_each(rt, p) {
+
+ if (!p->ready) {
+ nxt_debug(task, "remove not ready process %PI", p->pid);
+
+ nxt_runtime_process_remove(rt, p);
+ } else {
+ nxt_port_mmaps_destroy(p->incoming, 0);
+ nxt_port_mmaps_destroy(p->outgoing, 0);
+ }
+
+ } nxt_runtime_process_loop;
+
nxt_runtime_process_add(rt, process);
- nxt_process_start(task, process->init);
+ nxt_process_start(task, process);
+
+ process->ready = 1;
+
break;
default:
@@ -58,7 +77,6 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
nxt_debug(task, "fork(\"%s\"): %PI", process->init->name, pid);
process->pid = pid;
- process->init->port->pid = pid;
nxt_runtime_process_add(rt, process);
@@ -70,26 +88,30 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
static void
-nxt_process_start(nxt_task_t *task, nxt_process_init_t *process)
+nxt_process_start(nxt_task_t *task, nxt_process_t *process)
{
nxt_int_t ret;
+ nxt_port_t *port, *master_port;
nxt_thread_t *thread;
nxt_runtime_t *rt;
+ nxt_process_init_t *init;
nxt_event_engine_t *engine;
const nxt_event_interface_t *interface;
- nxt_log(task, NXT_LOG_INFO, "%s started", process->name);
+ init = process->init;
- nxt_process_title(task, "nginext: %s", process->name);
+ nxt_log(task, NXT_LOG_INFO, "%s started", init->name);
+
+ nxt_process_title(task, "nginext: %s", init->name);
thread = task->thread;
nxt_random_init(&thread->random);
- if (process->user_cred != NULL && getuid() == 0) {
+ if (init->user_cred != NULL && getuid() == 0) {
/* Super-user. */
- ret = nxt_user_cred_set(task, process->user_cred);
+ ret = nxt_user_cred_set(task, init->user_cred);
if (ret != NXT_OK) {
goto fail;
}
@@ -97,15 +119,15 @@ nxt_process_start(nxt_task_t *task, nxt_process_init_t *process)
rt = thread->runtime;
- rt->types |= (1U << process->type);
+ rt->types |= (1U << init->type);
engine = thread->engine;
/* Update inherited master process event engine and signals processing. */
- engine->signals->sigev = process->signals;
+ engine->signals->sigev = init->signals;
interface = nxt_service_get(rt->services, "engine", rt->engine);
- if (interface == NULL) {
+ if (nxt_slow_path(interface == NULL)) {
goto fail;
}
@@ -113,25 +135,40 @@ nxt_process_start(nxt_task_t *task, nxt_process_init_t *process)
goto fail;
}
- nxt_port_read_close(process->master_port);
- nxt_port_write_enable(task, process->master_port);
-
- /* A worker process port. */
- nxt_port_write_close(process->port);
- nxt_port_create(task, process->port, process->port_handlers);
-
ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads,
60000 * 1000000LL);
- if (ret != NXT_OK) {
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto fail;
+ }
+
+ master_port = rt->port_by_type[NXT_PROCESS_MASTER];
+
+ nxt_port_read_close(master_port);
+ nxt_port_write_enable(task, master_port);
+
+ port = nxt_process_port_first(process);
+
+ nxt_port_write_close(port);
+
+ ret = init->start(task, init->data);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
- ret = process->start(task, rt);
+ nxt_port_enable(task, port, init->port_handlers);
- if (ret == NXT_OK) {
- return;
+ ret = nxt_port_socket_write(task, master_port, NXT_PORT_MSG_READY,
+ -1, init->stream, 0, NULL);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_log(task, NXT_LOG_ERR, "failed to send READY message to master");
+
+ goto fail;
}
+ return;
+
fail:
exit(1);
@@ -527,15 +564,24 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
nxt_port_t *
-nxt_process_port_new(nxt_process_t *process)
+nxt_process_port_new(nxt_runtime_t *rt, nxt_process_t *process,
+ nxt_port_id_t id, nxt_process_type_t type)
{
+ size_t size;
nxt_port_t *port;
- port = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_port_t));
+ size = sizeof(nxt_port_t);
+ if (size == NXT_PROCESS_WORKER) {
+ size += sizeof(nxt_work_t);
+ }
+
+ port = nxt_mp_zalloc(rt->mem_pool, size);
+
if (nxt_fast_path(port != NULL)) {
- port->id = process->last_port_id++;
+ port->id = id;
port->pid = process->pid;
port->process = process;
+ port->type = type;
nxt_process_port_add(process, port);
}
@@ -547,25 +593,45 @@ nxt_process_port_new(nxt_process_t *process)
void
nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port)
{
- /* TODO lock ports */
+ nxt_thread_mutex_lock(&process->cp_mutex);
+
+ if (process->cp_mem_pool == NULL) {
+ process->cp_mem_pool = nxt_mp_create(1024, 128, 256, 32);
+ }
- nxt_port_hash_add(&process->connected_ports, process->mem_pool, port);
+ nxt_mp_thread_adopt(process->cp_mem_pool);
+
+ nxt_port_hash_add(&process->connected_ports, process->cp_mem_pool, port);
+
+ nxt_thread_mutex_unlock(&process->cp_mutex);
}
void
nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
{
- /* TODO lock ports */
+ nxt_thread_mutex_lock(&process->cp_mutex);
+
+ if (process->cp_mem_pool != NULL) {
+ nxt_mp_thread_adopt(process->cp_mem_pool);
+
+ nxt_port_hash_remove(&process->connected_ports, process->cp_mem_pool,
+ port);
+ }
- nxt_port_hash_remove(&process->connected_ports, process->mem_pool, port);
+ nxt_thread_mutex_unlock(&process->cp_mutex);
}
nxt_port_t *
nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid,
nxt_port_id_t port_id)
{
- /* TODO lock ports */
+ nxt_port_t *res;
- return nxt_port_hash_find(&process->connected_ports, pid, port_id);
-}
+ nxt_thread_mutex_lock(&process->cp_mutex);
+
+ res = nxt_port_hash_find(&process->connected_ports, pid, port_id);
+ nxt_thread_mutex_unlock(&process->cp_mutex);
+
+ return res;
+}