diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_application.c | 451 |
1 files changed, 438 insertions, 13 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index f42905eb..589821fb 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -42,9 +42,23 @@ static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task, const char *name); +static nxt_int_t nxt_proto_setup(nxt_task_t *task, nxt_process_t *process); +static nxt_int_t nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data); static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment); +static void nxt_proto_start_process_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +static void nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_proto_process_created_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +static void nxt_proto_quit_children(nxt_task_t *task); +static nxt_process_t *nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid); +static void nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process); +static nxt_process_t *nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid); static u_char *nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src); +static void nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data); nxt_str_t nxt_server = nxt_string(NXT_SERVER); @@ -55,7 +69,12 @@ static uint32_t compat[] = { }; -static nxt_app_module_t *nxt_app; +static nxt_lvlhsh_t nxt_proto_processes; +static nxt_queue_t nxt_proto_children; +static nxt_bool_t nxt_proto_exiting; + +static nxt_app_module_t *nxt_app; +static nxt_common_app_conf_t *nxt_app_conf; static const nxt_port_handlers_t nxt_discovery_process_port_handlers = { @@ -70,6 +89,29 @@ static const nxt_port_handlers_t nxt_discovery_process_port_handlers = { }; +const nxt_sig_event_t nxt_prototype_signals[] = { + nxt_event_signal(SIGHUP, nxt_proto_signal_handler), + nxt_event_signal(SIGINT, nxt_proto_sigterm_handler), + nxt_event_signal(SIGQUIT, nxt_proto_sigterm_handler), + nxt_event_signal(SIGTERM, nxt_proto_sigterm_handler), + nxt_event_signal(SIGCHLD, nxt_proto_sigchld_handler), + nxt_event_signal_end, +}; + + +static const nxt_port_handlers_t nxt_proto_process_port_handlers = { + .quit = nxt_proto_quit_handler, + .change_file = nxt_port_change_log_file_handler, + .new_port = nxt_port_new_port_handler, + .process_created = nxt_proto_process_created_handler, + .process_ready = nxt_port_process_ready_handler, + .remove_pid = nxt_port_remove_pid_handler, + .start_process = nxt_proto_start_process_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, +}; + + static const nxt_port_handlers_t nxt_app_process_port_handlers = { .quit = nxt_signal_quit_handler, .rpc_ready = nxt_port_rpc_handler, @@ -89,12 +131,23 @@ const nxt_process_init_t nxt_discovery_process = { }; +const nxt_process_init_t nxt_proto_process = { + .type = NXT_PROCESS_PROTOTYPE, + .prefork = nxt_isolation_main_prefork, + .restart = 0, + .setup = nxt_proto_setup, + .start = nxt_proto_start, + .port_handlers = &nxt_proto_process_port_handlers, + .signals = nxt_prototype_signals, +}; + + const nxt_process_init_t nxt_app_process = { .type = NXT_PROCESS_APP, .setup = nxt_app_setup, - .prefork = nxt_isolation_main_prefork, + .start = NULL, + .prefork = NULL, .restart = 0, - .start = NULL, /* set to module->start */ .port_handlers = &nxt_app_process_port_handlers, .signals = nxt_process_signals, }; @@ -443,15 +496,18 @@ nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) static nxt_int_t -nxt_app_setup(nxt_task_t *task, nxt_process_t *process) +nxt_proto_setup(nxt_task_t *task, nxt_process_t *process) { nxt_int_t ret; - nxt_process_init_t *init; nxt_app_lang_module_t *lang; nxt_common_app_conf_t *app_conf; app_conf = process->data.app; + nxt_queue_init(&nxt_proto_children); + + nxt_app_conf = app_conf; + lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type); if (nxt_slow_path(lang == NULL)) { nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type); @@ -479,7 +535,6 @@ nxt_app_setup(nxt_task_t *task, nxt_process_t *process) if (nxt_app->setup != NULL) { ret = nxt_app->setup(task, process, app_conf); - if (nxt_slow_path(ret != NXT_OK)) { return ret; } @@ -514,13 +569,251 @@ nxt_app_setup(nxt_task_t *task, nxt_process_t *process) } } + process->state = NXT_PROCESS_STATE_CREATED; + + return NXT_OK; +} + + +static nxt_int_t +nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data) +{ + nxt_debug(task, "prototype waiting for clone messages"); + + return NXT_OK; +} + + +static void +nxt_proto_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + u_char *p; + nxt_int_t ret; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t *init; + + rt = task->thread->runtime; + + process = nxt_process_new(rt); + if (nxt_slow_path(process == NULL)) { + goto failed; + } + + process->mem_pool = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(process->mem_pool == NULL)) { + nxt_process_use(task, process, -1); + goto failed; + } + + process->parent_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE]; + init = nxt_process_init(process); + *init = nxt_app_process; + + process->name = nxt_mp_alloc(process->mem_pool, nxt_app_conf->name.length + + sizeof("\"\" application") + 1); + + if (nxt_slow_path(process->name == NULL)) { + nxt_process_use(task, process, -1); + + goto failed; + } init->start = nxt_app->start; + init->name = (const char *) nxt_app_conf->name.start; + + p = (u_char *) process->name; + *p++ = '"'; + p = nxt_cpymem(p, nxt_app_conf->name.start, nxt_app_conf->name.length); + p = nxt_cpymem(p, "\" application", 13); + *p = '\0'; + + process->user_cred = &rt->user_cred; + + process->data.app = nxt_app_conf; + process->stream = msg->port_msg.stream; + + ret = nxt_process_start(task, process); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_process_use(task, process, -1); + + goto failed; + } + + nxt_proto_process_add(task, process); + + return; + +failed: + + port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + + if (nxt_fast_path(port != NULL)) { + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); + } +} + + +static void +nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_debug(task, "prototype quit handler"); + + nxt_proto_quit_children(task); + + nxt_proto_exiting = 1; + + if (nxt_queue_is_empty(&nxt_proto_children)) { + nxt_process_quit(task, 0); + } +} + + +static void +nxt_proto_quit_children(nxt_task_t *task) +{ + nxt_port_t *port; + nxt_process_t *process; + + nxt_queue_each(process, &nxt_proto_children, nxt_process_t, link) { + port = nxt_process_port_first(process); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + } + nxt_queue_loop; +} + + +static void +nxt_proto_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_pid_t isolated_pid, pid; + nxt_process_t *process; + + isolated_pid = nxt_recv_msg_cmsg_pid(msg); + + process = nxt_proto_process_find(task, isolated_pid); + if (nxt_slow_path(process == NULL)) { + return; + } + process->state = NXT_PROCESS_STATE_CREATED; - return NXT_OK; + pid = msg->port_msg.pid; + + if (process->pid != pid) { + nxt_debug(task, "app process %PI (aka %PI) is created", isolated_pid, + pid); + + nxt_runtime_process_remove(task->thread->runtime, process); + + process->pid = pid; + + nxt_runtime_process_add(task, process); + + } else { + nxt_debug(task, "app process %PI is created", isolated_pid); + } +} + + +static void +nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_trace(task, "signal signo:%d (%s) received, ignored", + (int) (uintptr_t) obj, data); +} + + +static void +nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_trace(task, "signal signo:%d (%s) received", + (int) (uintptr_t) obj, data); + + nxt_proto_quit_children(task); + + nxt_proto_exiting = 1; + + if (nxt_queue_is_empty(&nxt_proto_children)) { + nxt_process_quit(task, 0); + } +} + + +static void +nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data) +{ + int status; + nxt_err_t err; + nxt_pid_t pid; + nxt_process_t *process; + + nxt_debug(task, "proto sigchld handler signo:%d (%s)", + (int) (uintptr_t) obj, data); + + for ( ;; ) { + pid = waitpid(-1, &status, WNOHANG); + + if (pid == -1) { + + switch (err = nxt_errno) { + + case NXT_ECHILD: + return; + + case NXT_EINTR: + continue; + + default: + nxt_alert(task, "waitpid() failed: %E", err); + return; + } + } + + nxt_debug(task, "waitpid(): %PI", pid); + + if (pid == 0) { + return; + } + + if (WTERMSIG(status)) { +#ifdef WCOREDUMP + nxt_alert(task, "app process (isolated %PI) exited on signal %d%s", + pid, WTERMSIG(status), + WCOREDUMP(status) ? " (core dumped)" : ""); +#else + nxt_alert(task, "app process (isolated %PI) exited on signal %d", + pid, WTERMSIG(status)); +#endif + + } else { + nxt_trace(task, "app process (isolated %PI) exited with code %d", + pid, WEXITSTATUS(status)); + } + + process = nxt_proto_process_remove(task, pid); + if (process == NULL) { + continue; + } + + if (process->state != NXT_PROCESS_STATE_CREATING) { + nxt_port_remove_notify_others(task, process); + } + + nxt_process_close_ports(task, process); + + if (nxt_proto_exiting && nxt_queue_is_empty(&nxt_proto_children)) { + nxt_process_quit(task, 0); + return; + } + } } @@ -619,6 +912,19 @@ nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src) } +static nxt_int_t +nxt_app_setup(nxt_task_t *task, nxt_process_t *process) +{ + nxt_process_init_t *init; + + process->state = NXT_PROCESS_STATE_CREATED; + + init = nxt_process_init(process); + + return init->start(task, &process->data); +} + + nxt_app_lang_module_t * nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name) { @@ -707,15 +1013,15 @@ nxt_int_t nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init, nxt_common_app_conf_t *conf) { - nxt_port_t *my_port, *main_port, *router_port; + nxt_port_t *my_port, *proto_port, *router_port; nxt_runtime_t *rt; nxt_memzero(init, sizeof(nxt_unit_init_t)); rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - if (nxt_slow_path(main_port == NULL)) { + proto_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE]; + if (nxt_slow_path(proto_port == NULL)) { return NXT_ERROR; } @@ -729,10 +1035,10 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init, return NXT_ERROR; } - init->ready_port.id.pid = main_port->pid; - init->ready_port.id.id = main_port->id; + init->ready_port.id.pid = proto_port->pid; + init->ready_port.id.id = proto_port->id; init->ready_port.in_fd = -1; - init->ready_port.out_fd = main_port->pair[1]; + init->ready_port.out_fd = proto_port->pair[1]; init->ready_stream = my_port->process->stream; @@ -753,3 +1059,122 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init, return NXT_OK; } + + +static nxt_int_t +nxt_proto_lvlhsh_isolated_pid_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + nxt_pid_t *qpid; + nxt_process_t *process; + + process = data; + qpid = (nxt_pid_t *) lhq->key.start; + + if (*qpid == process->isolated_pid) { + return NXT_OK; + } + + return NXT_DECLINED; +} + + +static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + nxt_proto_lvlhsh_isolated_pid_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +nxt_inline void +nxt_proto_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) +{ + lhq->key_hash = nxt_murmur_hash2(pid, sizeof(nxt_pid_t)); + lhq->key.length = sizeof(nxt_pid_t); + lhq->key.start = (u_char *) pid; + lhq->proto = &lvlhsh_processes_proto; +} + + +static void +nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process) +{ + nxt_runtime_t *rt; + nxt_lvlhsh_query_t lhq; + + rt = task->thread->runtime; + + nxt_proto_process_lhq_pid(&lhq, &process->isolated_pid); + + lhq.replace = 0; + lhq.value = process; + lhq.pool = rt->mem_pool; + + switch (nxt_lvlhsh_insert(&nxt_proto_processes, &lhq)) { + + case NXT_OK: + nxt_debug(task, "process (isolated %PI) added", process->isolated_pid); + + nxt_queue_insert_tail(&nxt_proto_children, &process->link); + break; + + default: + nxt_debug(task, "process (isolated %PI) failed to add", + process->isolated_pid); + break; + } +} + + +static nxt_process_t * +nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid) +{ + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_lvlhsh_query_t lhq; + + nxt_proto_process_lhq_pid(&lhq, &pid); + + rt = task->thread->runtime; + + lhq.pool = rt->mem_pool; + + switch (nxt_lvlhsh_delete(&nxt_proto_processes, &lhq)) { + + case NXT_OK: + nxt_debug(task, "process (isolated %PI) removed", pid); + + process = lhq.value; + + nxt_queue_remove(&process->link); + break; + + default: + nxt_debug(task, "process (isolated %PI) remove failed", pid); + process = NULL; + break; + } + + return process; +} + + +static nxt_process_t * +nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid) +{ + nxt_process_t *process; + nxt_lvlhsh_query_t lhq; + + nxt_proto_process_lhq_pid(&lhq, &pid); + + if (nxt_lvlhsh_find(&nxt_proto_processes, &lhq) == NXT_OK) { + process = lhq.value; + + } else { + nxt_debug(task, "process (isolated %PI) not found", pid); + + process = NULL; + } + + return process; +} |