summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_main_process.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_main_process.c')
-rw-r--r--src/nxt_main_process.c183
1 files changed, 165 insertions, 18 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c
index eff96e14..a5a20d3d 100644
--- a/src/nxt_main_process.c
+++ b/src/nxt_main_process.c
@@ -10,6 +10,7 @@
#include <nxt_main_process.h>
#include <nxt_conf.h>
#include <nxt_router.h>
+#include <nxt_port_queue.h>
#if (NXT_TLS)
#include <nxt_cert.h>
#endif
@@ -52,6 +53,8 @@ static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
static void nxt_main_port_modules_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
+static void nxt_main_process_whoami_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
static void nxt_main_port_conf_store_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name,
@@ -326,7 +329,7 @@ static nxt_conf_app_map_t nxt_app_maps[] = {
static void
-nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_debug(task, "main data: %*s",
nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
@@ -334,7 +337,33 @@ nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
static void
-nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ void *mem;
+ nxt_port_t *port;
+
+ nxt_port_new_port_handler(task, msg);
+
+ port = msg->u.new_port;
+
+ if (port != NULL
+ && port->type == NXT_PROCESS_APP
+ && msg->fd[1] != -1)
+ {
+ mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0);
+ if (nxt_fast_path(mem != MAP_FAILED)) {
+ port->queue = mem;
+ }
+
+ nxt_fd_close(msg->fd[1]);
+ msg->fd[1] = -1;
+ }
+}
+
+
+static void
+nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
u_char *start, *p, ch;
size_t type_len;
@@ -374,16 +403,18 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
+ process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN];
+
init = nxt_process_init(process);
- *init = nxt_app_process;
+ *init = nxt_proto_process;
b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size);
if (b == NULL) {
goto failed;
}
- nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos,
+ nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos,
b->mem.pos);
app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t));
@@ -399,7 +430,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
init->name = (const char *) start;
process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length
- + sizeof("\"\" application") + 1);
+ + sizeof("\"\" prototype") + 1);
if (nxt_slow_path(process->name == NULL)) {
goto failed;
@@ -408,7 +439,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
p = (u_char *) process->name;
*p++ = '"';
p = nxt_cpymem(p, init->name, app_conf->name.length);
- p = nxt_cpymem(p, "\" application", 13);
+ p = nxt_cpymem(p, "\" prototype", 11);
*p = '\0';
app_conf->shm_limit = 100 * 1024 * 1024;
@@ -504,21 +535,17 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
rt = task->thread->runtime;
- process = nxt_runtime_process_find(rt, msg->port_msg.pid);
- if (nxt_slow_path(process == NULL)) {
- return;
- }
-
- nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
-
port = nxt_runtime_port_find(rt, msg->port_msg.pid,
msg->port_msg.reply_port);
-
-
if (nxt_slow_path(port == NULL)) {
return;
}
+ process = port->process;
+
+ nxt_assert(process != NULL);
+ nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
+
#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
if (nxt_slow_path(nxt_clone_credential_map(task, process->pid,
@@ -542,10 +569,13 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
static nxt_port_handlers_t nxt_main_process_port_handlers = {
- .data = nxt_port_main_data_handler,
+ .data = nxt_main_data_handler,
+ .new_port = nxt_main_new_port_handler,
.process_created = nxt_main_process_created_handler,
.process_ready = nxt_port_process_ready_handler,
- .start_process = nxt_port_main_start_process_handler,
+ .whoami = nxt_main_process_whoami_handler,
+ .remove_pid = nxt_port_remove_pid_handler,
+ .start_process = nxt_main_start_process_handler,
.socket = nxt_main_port_socket_handler,
.modules = nxt_main_port_modules_handler,
.conf_store = nxt_main_port_conf_store_handler,
@@ -559,6 +589,88 @@ static nxt_port_handlers_t nxt_main_process_port_handlers = {
};
+static void
+nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_buf_t *buf;
+ nxt_pid_t pid, ppid;
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+ nxt_process_t *pprocess;
+
+ nxt_assert(msg->port_msg.reply_port == 0);
+
+ if (nxt_slow_path(msg->buf == NULL
+ || nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t)))
+ {
+ nxt_alert(task, "whoami: buffer is NULL or unexpected size");
+ goto fail;
+ }
+
+ nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t));
+
+ rt = task->thread->runtime;
+
+ pprocess = nxt_runtime_process_find(rt, ppid);
+ if (nxt_slow_path(pprocess == NULL)) {
+ nxt_alert(task, "whoami: parent process %PI not found", ppid);
+ goto fail;
+ }
+
+ pid = nxt_recv_msg_cmsg_pid(msg);
+
+ nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid,
+ msg->fd[0]);
+
+ if (msg->fd[0] != -1) {
+ port = nxt_runtime_process_port_create(task, rt, pid, 0,
+ NXT_PROCESS_APP);
+ if (nxt_slow_path(port == NULL)) {
+ goto fail;
+ }
+
+ nxt_fd_nonblocking(task, msg->fd[0]);
+
+ port->pair[0] = -1;
+ port->pair[1] = msg->fd[0];
+ msg->fd[0] = -1;
+
+ port->max_size = 16 * 1024;
+ port->max_share = 64 * 1024;
+ port->socket.task = task;
+
+ nxt_port_write_enable(task, port);
+
+ } else {
+ port = nxt_runtime_port_find(rt, pid, 0);
+ if (nxt_slow_path(port == NULL)) {
+ goto fail;
+ }
+ }
+
+ if (ppid != nxt_pid) {
+ nxt_queue_insert_tail(&pprocess->children, &port->process->link);
+ }
+
+ buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
+ sizeof(nxt_pid_t), 0);
+ if (nxt_slow_path(buf == NULL)) {
+ goto fail;
+ }
+
+ buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t));
+
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1,
+ msg->port_msg.stream, 0, buf);
+
+fail:
+
+ if (msg->fd[0] != -1) {
+ nxt_fd_close(msg->fd[0]);
+ }
+}
+
+
static nxt_int_t
nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
{
@@ -751,8 +863,10 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
nxt_int_t ret;
nxt_err_t err;
nxt_pid_t pid;
+ nxt_port_t *port;
+ nxt_queue_t children;
nxt_runtime_t *rt;
- nxt_process_t *process;
+ nxt_process_t *process, *child;
nxt_process_init_t init;
nxt_debug(task, "sigchld handler signo:%d (%s)",
@@ -809,7 +923,31 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
process->stream = 0;
}
+ nxt_queue_init(&children);
+
+ if (!nxt_queue_is_empty(&process->children)) {
+ nxt_queue_add(&children, &process->children);
+
+ nxt_queue_init(&process->children);
+
+ nxt_queue_each(child, &children, nxt_process_t, link) {
+ port = nxt_process_port_first(child);
+
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
+ -1, 0, 0, NULL);
+ } nxt_queue_loop;
+ }
+
if (nxt_exiting) {
+ nxt_process_close_ports(task, process);
+
+ nxt_queue_each(child, &children, nxt_process_t, link) {
+ nxt_queue_remove(&child->link);
+ child->link.next = NULL;
+
+ nxt_process_close_ports(task, child);
+ } nxt_queue_loop;
+
if (rt->nprocesses <= 1) {
nxt_runtime_quit(task, 0);
}
@@ -819,6 +957,15 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
nxt_port_remove_notify_others(task, process);
+ nxt_queue_each(child, &children, nxt_process_t, link) {
+ nxt_port_remove_notify_others(task, child);
+
+ nxt_queue_remove(&child->link);
+ child->link.next = NULL;
+
+ nxt_process_close_ports(task, child);
+ } nxt_queue_loop;
+
init = *(nxt_process_init_t *) nxt_process_init(process);
nxt_process_close_ports(task, process);