diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-05-12 20:32:41 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-05-12 20:32:41 +0300 |
commit | f7b4bdfd892a0b479dc946896435a3ba7f9615dd (patch) | |
tree | a6f0c4ebaeed2d9f0fcb1c07178b52a684a53280 /src/nxt_master_process.c | |
parent | 1782c771fab999b37a8c04ed72760e3528205be7 (diff) | |
download | unit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.gz unit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.bz2 |
Using shared memory to send data via nxt_port.
Usage:
b = nxt_port_mmap_get_buf(task, port, size);
b->mem.free = nxt_cpymem(b->mem.free, data, size);
nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, b);
Diffstat (limited to '')
-rw-r--r-- | src/nxt_master_process.c | 81 |
1 files changed, 37 insertions, 44 deletions
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index e3d5382a..ed39130b 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -80,14 +80,12 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) nxt_port_t *port; nxt_process_t *process; - process = nxt_runtime_new_process(rt); + process = nxt_runtime_process_get(rt, nxt_pid); if (nxt_slow_path(process == NULL)) { return NXT_ERROR; } - process->pid = nxt_pid; - - port = nxt_array_zero_add(process->ports); + port = nxt_process_port_new(process); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } @@ -97,8 +95,10 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) return ret; } - port->pid = nxt_pid; port->engine = 0; + port->type = NXT_PROCESS_MASTER; + + nxt_runtime_port_add(rt, port); /* * A master process port. A write port is not closed @@ -220,17 +220,16 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, * TODO: remove process, init, ports from array on memory and fork failures. */ - process = nxt_runtime_new_process(rt); + process = nxt_runtime_process_new(rt); if (nxt_slow_path(process == NULL)) { return NXT_ERROR; } process->init = init; + master_process = rt->mprocess; + init->master_port = nxt_process_port_first(master_process); - master_process = rt->processes->elts; - init->master_port = master_process->ports->elts; - - port = nxt_array_zero_add(process->ports); + port = nxt_process_port_new(process); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } @@ -243,6 +242,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, } port->engine = 0; + port->type = init->type; pid = nxt_process_create(task, init); @@ -253,6 +253,11 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, case 0: /* A worker process, return to the event engine work queue loop. */ + process->pid = nxt_pid; + port->pid = nxt_pid; + + nxt_runtime_process_add(rt, process); + return NXT_AGAIN; default: @@ -260,6 +265,8 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, process->pid = pid; port->pid = pid; + nxt_runtime_process_add(rt, process); + nxt_port_read_close(port); nxt_port_write_enable(task, port); @@ -272,27 +279,23 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, void nxt_master_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) { - nxt_uint_t i, n, nprocesses, nports; nxt_port_t *port; nxt_process_t *process; - process = rt->processes->elts; - nprocesses = rt->processes->nelts; + nxt_runtime_process_each(rt, process) + { + if (nxt_pid != process->pid) { + process->init = NULL; - for (i = 0; i < nprocesses; i++) { + nxt_process_port_each(process, port) { - if (nxt_pid != process[i].pid) { - process[i].init = NULL; + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); - port = process[i].ports->elts; - nports = process[i].ports->nelts; - - for (n = 0; n < nports; n++) { - (void) nxt_port_socket_write(task, &port[n], NXT_PORT_MSG_QUIT, - -1, 0, NULL); - } + } nxt_process_port_loop; } } + nxt_runtime_process_loop; } @@ -331,7 +334,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) nxt_int_t ret; nxt_uint_t n; nxt_file_t *file, *new_file; - nxt_runtime_t *rt; + nxt_runtime_t *rt; nxt_array_t *new_files; nxt_mem_pool_t *mp; @@ -473,39 +476,29 @@ nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) static void nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) { - nxt_uint_t i, n; nxt_runtime_t *rt; nxt_process_t *process; nxt_process_init_t *init; rt = task->thread->runtime; - process = rt->processes->elts; - n = rt->processes->nelts; - - /* A process[0] is the master process. */ + process = nxt_runtime_process_find(rt, pid); - for (i = 1; i < n; i++) { + if (process) { + init = process->init; - if (pid == process[i].pid) { - init = process[i].init; + /* TODO: free ports fds. */ - /* TODO: free ports fds. */ + nxt_runtime_process_remove(rt, process); - nxt_array_remove(rt->processes, &process[i]); + if (nxt_exiting) { - if (nxt_exiting) { - nxt_debug(task, "processes %d", n); - - if (n == 2) { - nxt_runtime_quit(task); - } - - } else if (init != NULL) { - (void) nxt_master_create_worker_process(task, rt, init); + if (rt->nprocesses == 2) { + nxt_runtime_quit(task); } - return; + } else if (init != NULL) { + (void) nxt_master_create_worker_process(task, rt, init); } } } |