summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_application.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_application.c451
1 files changed, 438 insertions, 13 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c
index f42905eb..589821fb 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -42,9 +42,23 @@ static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data);
static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
const char *name);
+static nxt_int_t nxt_proto_setup(nxt_task_t *task, nxt_process_t *process);
+static nxt_int_t nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data);
static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process);
static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
+static void nxt_proto_start_process_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
+static void nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+static void nxt_proto_process_created_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
+static void nxt_proto_quit_children(nxt_task_t *task);
+static nxt_process_t *nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid);
+static void nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process);
+static nxt_process_t *nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid);
static u_char *nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src);
+static void nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data);
nxt_str_t nxt_server = nxt_string(NXT_SERVER);
@@ -55,7 +69,12 @@ static uint32_t compat[] = {
};
-static nxt_app_module_t *nxt_app;
+static nxt_lvlhsh_t nxt_proto_processes;
+static nxt_queue_t nxt_proto_children;
+static nxt_bool_t nxt_proto_exiting;
+
+static nxt_app_module_t *nxt_app;
+static nxt_common_app_conf_t *nxt_app_conf;
static const nxt_port_handlers_t nxt_discovery_process_port_handlers = {
@@ -70,6 +89,29 @@ static const nxt_port_handlers_t nxt_discovery_process_port_handlers = {
};
+const nxt_sig_event_t nxt_prototype_signals[] = {
+ nxt_event_signal(SIGHUP, nxt_proto_signal_handler),
+ nxt_event_signal(SIGINT, nxt_proto_sigterm_handler),
+ nxt_event_signal(SIGQUIT, nxt_proto_sigterm_handler),
+ nxt_event_signal(SIGTERM, nxt_proto_sigterm_handler),
+ nxt_event_signal(SIGCHLD, nxt_proto_sigchld_handler),
+ nxt_event_signal_end,
+};
+
+
+static const nxt_port_handlers_t nxt_proto_process_port_handlers = {
+ .quit = nxt_proto_quit_handler,
+ .change_file = nxt_port_change_log_file_handler,
+ .new_port = nxt_port_new_port_handler,
+ .process_created = nxt_proto_process_created_handler,
+ .process_ready = nxt_port_process_ready_handler,
+ .remove_pid = nxt_port_remove_pid_handler,
+ .start_process = nxt_proto_start_process_handler,
+ .rpc_ready = nxt_port_rpc_handler,
+ .rpc_error = nxt_port_rpc_handler,
+};
+
+
static const nxt_port_handlers_t nxt_app_process_port_handlers = {
.quit = nxt_signal_quit_handler,
.rpc_ready = nxt_port_rpc_handler,
@@ -89,12 +131,23 @@ const nxt_process_init_t nxt_discovery_process = {
};
+const nxt_process_init_t nxt_proto_process = {
+ .type = NXT_PROCESS_PROTOTYPE,
+ .prefork = nxt_isolation_main_prefork,
+ .restart = 0,
+ .setup = nxt_proto_setup,
+ .start = nxt_proto_start,
+ .port_handlers = &nxt_proto_process_port_handlers,
+ .signals = nxt_prototype_signals,
+};
+
+
const nxt_process_init_t nxt_app_process = {
.type = NXT_PROCESS_APP,
.setup = nxt_app_setup,
- .prefork = nxt_isolation_main_prefork,
+ .start = NULL,
+ .prefork = NULL,
.restart = 0,
- .start = NULL, /* set to module->start */
.port_handlers = &nxt_app_process_port_handlers,
.signals = nxt_process_signals,
};
@@ -443,15 +496,18 @@ nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
static nxt_int_t
-nxt_app_setup(nxt_task_t *task, nxt_process_t *process)
+nxt_proto_setup(nxt_task_t *task, nxt_process_t *process)
{
nxt_int_t ret;
- nxt_process_init_t *init;
nxt_app_lang_module_t *lang;
nxt_common_app_conf_t *app_conf;
app_conf = process->data.app;
+ nxt_queue_init(&nxt_proto_children);
+
+ nxt_app_conf = app_conf;
+
lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
if (nxt_slow_path(lang == NULL)) {
nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type);
@@ -479,7 +535,6 @@ nxt_app_setup(nxt_task_t *task, nxt_process_t *process)
if (nxt_app->setup != NULL) {
ret = nxt_app->setup(task, process, app_conf);
-
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
@@ -514,13 +569,251 @@ nxt_app_setup(nxt_task_t *task, nxt_process_t *process)
}
}
+ process->state = NXT_PROCESS_STATE_CREATED;
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data)
+{
+ nxt_debug(task, "prototype waiting for clone messages");
+
+ return NXT_OK;
+}
+
+
+static void
+nxt_proto_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ u_char *p;
+ nxt_int_t ret;
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+ nxt_process_t *process;
+ nxt_process_init_t *init;
+
+ rt = task->thread->runtime;
+
+ process = nxt_process_new(rt);
+ if (nxt_slow_path(process == NULL)) {
+ goto failed;
+ }
+
+ process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
+ if (nxt_slow_path(process->mem_pool == NULL)) {
+ nxt_process_use(task, process, -1);
+ goto failed;
+ }
+
+ process->parent_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE];
+
init = nxt_process_init(process);
+ *init = nxt_app_process;
+
+ process->name = nxt_mp_alloc(process->mem_pool, nxt_app_conf->name.length
+ + sizeof("\"\" application") + 1);
+
+ if (nxt_slow_path(process->name == NULL)) {
+ nxt_process_use(task, process, -1);
+
+ goto failed;
+ }
init->start = nxt_app->start;
+ init->name = (const char *) nxt_app_conf->name.start;
+
+ p = (u_char *) process->name;
+ *p++ = '"';
+ p = nxt_cpymem(p, nxt_app_conf->name.start, nxt_app_conf->name.length);
+ p = nxt_cpymem(p, "\" application", 13);
+ *p = '\0';
+
+ process->user_cred = &rt->user_cred;
+
+ process->data.app = nxt_app_conf;
+ process->stream = msg->port_msg.stream;
+
+ ret = nxt_process_start(task, process);
+ if (nxt_slow_path(ret == NXT_ERROR)) {
+ nxt_process_use(task, process, -1);
+
+ goto failed;
+ }
+
+ nxt_proto_process_add(task, process);
+
+ return;
+
+failed:
+
+ port = nxt_runtime_port_find(rt, msg->port_msg.pid,
+ msg->port_msg.reply_port);
+
+ if (nxt_fast_path(port != NULL)) {
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
+ -1, msg->port_msg.stream, 0, NULL);
+ }
+}
+
+
+static void
+nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_debug(task, "prototype quit handler");
+
+ nxt_proto_quit_children(task);
+
+ nxt_proto_exiting = 1;
+
+ if (nxt_queue_is_empty(&nxt_proto_children)) {
+ nxt_process_quit(task, 0);
+ }
+}
+
+
+static void
+nxt_proto_quit_children(nxt_task_t *task)
+{
+ nxt_port_t *port;
+ nxt_process_t *process;
+
+ nxt_queue_each(process, &nxt_proto_children, nxt_process_t, link) {
+ port = nxt_process_port_first(process);
+
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
+ -1, 0, 0, NULL);
+ }
+ nxt_queue_loop;
+}
+
+
+static void
+nxt_proto_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_pid_t isolated_pid, pid;
+ nxt_process_t *process;
+
+ isolated_pid = nxt_recv_msg_cmsg_pid(msg);
+
+ process = nxt_proto_process_find(task, isolated_pid);
+ if (nxt_slow_path(process == NULL)) {
+ return;
+ }
+
process->state = NXT_PROCESS_STATE_CREATED;
- return NXT_OK;
+ pid = msg->port_msg.pid;
+
+ if (process->pid != pid) {
+ nxt_debug(task, "app process %PI (aka %PI) is created", isolated_pid,
+ pid);
+
+ nxt_runtime_process_remove(task->thread->runtime, process);
+
+ process->pid = pid;
+
+ nxt_runtime_process_add(task, process);
+
+ } else {
+ nxt_debug(task, "app process %PI is created", isolated_pid);
+ }
+}
+
+
+static void
+nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_trace(task, "signal signo:%d (%s) received, ignored",
+ (int) (uintptr_t) obj, data);
+}
+
+
+static void
+nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_trace(task, "signal signo:%d (%s) received",
+ (int) (uintptr_t) obj, data);
+
+ nxt_proto_quit_children(task);
+
+ nxt_proto_exiting = 1;
+
+ if (nxt_queue_is_empty(&nxt_proto_children)) {
+ nxt_process_quit(task, 0);
+ }
+}
+
+
+static void
+nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data)
+{
+ int status;
+ nxt_err_t err;
+ nxt_pid_t pid;
+ nxt_process_t *process;
+
+ nxt_debug(task, "proto sigchld handler signo:%d (%s)",
+ (int) (uintptr_t) obj, data);
+
+ for ( ;; ) {
+ pid = waitpid(-1, &status, WNOHANG);
+
+ if (pid == -1) {
+
+ switch (err = nxt_errno) {
+
+ case NXT_ECHILD:
+ return;
+
+ case NXT_EINTR:
+ continue;
+
+ default:
+ nxt_alert(task, "waitpid() failed: %E", err);
+ return;
+ }
+ }
+
+ nxt_debug(task, "waitpid(): %PI", pid);
+
+ if (pid == 0) {
+ return;
+ }
+
+ if (WTERMSIG(status)) {
+#ifdef WCOREDUMP
+ nxt_alert(task, "app process (isolated %PI) exited on signal %d%s",
+ pid, WTERMSIG(status),
+ WCOREDUMP(status) ? " (core dumped)" : "");
+#else
+ nxt_alert(task, "app process (isolated %PI) exited on signal %d",
+ pid, WTERMSIG(status));
+#endif
+
+ } else {
+ nxt_trace(task, "app process (isolated %PI) exited with code %d",
+ pid, WEXITSTATUS(status));
+ }
+
+ process = nxt_proto_process_remove(task, pid);
+ if (process == NULL) {
+ continue;
+ }
+
+ if (process->state != NXT_PROCESS_STATE_CREATING) {
+ nxt_port_remove_notify_others(task, process);
+ }
+
+ nxt_process_close_ports(task, process);
+
+ if (nxt_proto_exiting && nxt_queue_is_empty(&nxt_proto_children)) {
+ nxt_process_quit(task, 0);
+ return;
+ }
+ }
}
@@ -619,6 +912,19 @@ nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src)
}
+static nxt_int_t
+nxt_app_setup(nxt_task_t *task, nxt_process_t *process)
+{
+ nxt_process_init_t *init;
+
+ process->state = NXT_PROCESS_STATE_CREATED;
+
+ init = nxt_process_init(process);
+
+ return init->start(task, &process->data);
+}
+
+
nxt_app_lang_module_t *
nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
{
@@ -707,15 +1013,15 @@ nxt_int_t
nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init,
nxt_common_app_conf_t *conf)
{
- nxt_port_t *my_port, *main_port, *router_port;
+ nxt_port_t *my_port, *proto_port, *router_port;
nxt_runtime_t *rt;
nxt_memzero(init, sizeof(nxt_unit_init_t));
rt = task->thread->runtime;
- main_port = rt->port_by_type[NXT_PROCESS_MAIN];
- if (nxt_slow_path(main_port == NULL)) {
+ proto_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE];
+ if (nxt_slow_path(proto_port == NULL)) {
return NXT_ERROR;
}
@@ -729,10 +1035,10 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init,
return NXT_ERROR;
}
- init->ready_port.id.pid = main_port->pid;
- init->ready_port.id.id = main_port->id;
+ init->ready_port.id.pid = proto_port->pid;
+ init->ready_port.id.id = proto_port->id;
init->ready_port.in_fd = -1;
- init->ready_port.out_fd = main_port->pair[1];
+ init->ready_port.out_fd = proto_port->pair[1];
init->ready_stream = my_port->process->stream;
@@ -753,3 +1059,122 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init,
return NXT_OK;
}
+
+
+static nxt_int_t
+nxt_proto_lvlhsh_isolated_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ nxt_pid_t *qpid;
+ nxt_process_t *process;
+
+ process = data;
+ qpid = (nxt_pid_t *) lhq->key.start;
+
+ if (*qpid == process->isolated_pid) {
+ return NXT_OK;
+ }
+
+ return NXT_DECLINED;
+}
+
+
+static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
+ NXT_LVLHSH_DEFAULT,
+ nxt_proto_lvlhsh_isolated_pid_test,
+ nxt_lvlhsh_alloc,
+ nxt_lvlhsh_free,
+};
+
+
+nxt_inline void
+nxt_proto_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
+{
+ lhq->key_hash = nxt_murmur_hash2(pid, sizeof(nxt_pid_t));
+ lhq->key.length = sizeof(nxt_pid_t);
+ lhq->key.start = (u_char *) pid;
+ lhq->proto = &lvlhsh_processes_proto;
+}
+
+
+static void
+nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process)
+{
+ nxt_runtime_t *rt;
+ nxt_lvlhsh_query_t lhq;
+
+ rt = task->thread->runtime;
+
+ nxt_proto_process_lhq_pid(&lhq, &process->isolated_pid);
+
+ lhq.replace = 0;
+ lhq.value = process;
+ lhq.pool = rt->mem_pool;
+
+ switch (nxt_lvlhsh_insert(&nxt_proto_processes, &lhq)) {
+
+ case NXT_OK:
+ nxt_debug(task, "process (isolated %PI) added", process->isolated_pid);
+
+ nxt_queue_insert_tail(&nxt_proto_children, &process->link);
+ break;
+
+ default:
+ nxt_debug(task, "process (isolated %PI) failed to add",
+ process->isolated_pid);
+ break;
+ }
+}
+
+
+static nxt_process_t *
+nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid)
+{
+ nxt_runtime_t *rt;
+ nxt_process_t *process;
+ nxt_lvlhsh_query_t lhq;
+
+ nxt_proto_process_lhq_pid(&lhq, &pid);
+
+ rt = task->thread->runtime;
+
+ lhq.pool = rt->mem_pool;
+
+ switch (nxt_lvlhsh_delete(&nxt_proto_processes, &lhq)) {
+
+ case NXT_OK:
+ nxt_debug(task, "process (isolated %PI) removed", pid);
+
+ process = lhq.value;
+
+ nxt_queue_remove(&process->link);
+ break;
+
+ default:
+ nxt_debug(task, "process (isolated %PI) remove failed", pid);
+ process = NULL;
+ break;
+ }
+
+ return process;
+}
+
+
+static nxt_process_t *
+nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid)
+{
+ nxt_process_t *process;
+ nxt_lvlhsh_query_t lhq;
+
+ nxt_proto_process_lhq_pid(&lhq, &pid);
+
+ if (nxt_lvlhsh_find(&nxt_proto_processes, &lhq) == NXT_OK) {
+ process = lhq.value;
+
+ } else {
+ nxt_debug(task, "process (isolated %PI) not found", pid);
+
+ process = NULL;
+ }
+
+ return process;
+}