diff options
author | Tiago Natel de Moura <t.nateldemoura@f5.com> | 2020-03-09 16:28:25 +0000 |
---|---|---|
committer | Tiago Natel de Moura <t.nateldemoura@f5.com> | 2020-03-09 16:28:25 +0000 |
commit | e9e5ddd5a5d9ce99768833137eac2551a710becf (patch) | |
tree | 940e591004b16216d5122cd12b367f3ebf15e0d3 /src/nxt_main_process.c | |
parent | aacf11152c314efb1895b6d44ba72dc9f1801c7d (diff) | |
download | unit-e9e5ddd5a5d9ce99768833137eac2551a710becf.tar.gz unit-e9e5ddd5a5d9ce99768833137eac2551a710becf.tar.bz2 |
Refactor of process management.
The process abstraction has changed to:
setup(task, process)
start(task, process_data)
prefork(task, process, mp)
The prefork() occurs in the main process right before fork.
The file src/nxt_main_process.c is completely free of process
specific logic.
The creation of a process now supports a PROCESS_CREATED state. The
The setup() function of each process can set its state to either
created or ready. If created, a MSG_PROCESS_CREATED is sent to main
process, where external setup can be done (required for rootfs under
container).
The core processes (discovery, controller and router) doesn't need
external setup, then they all proceeds to their start() function
straight away.
In the case of applications, the load of the module happens at the
process setup() time and The module's init() function has changed
to be the start() of the process.
The module API has changed to:
setup(task, process, conf)
start(task, data)
As a direct benefit of the PROCESS_CREATED message, the clone(2) of
processes using pid namespaces now doesn't need to create a pipe
to make the child block until parent setup uid/gid mappings nor it
needs to receive the child pid.
Diffstat (limited to 'src/nxt_main_process.c')
-rw-r--r-- | src/nxt_main_process.c | 1049 |
1 files changed, 251 insertions, 798 deletions
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index c35954c0..0dff050b 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -36,20 +36,11 @@ extern nxt_port_handlers_t nxt_router_process_port_handlers; static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_title(nxt_task_t *task); -static nxt_int_t nxt_main_start_controller_process(nxt_task_t *task, - nxt_runtime_t *rt); -static nxt_int_t nxt_main_create_controller_process(nxt_task_t *task, - nxt_runtime_t *rt, nxt_process_init_t *init); -static nxt_int_t nxt_main_create_router_process(nxt_task_t *task, nxt_runtime_t *rt, - nxt_process_init_t *init); -static nxt_int_t nxt_main_start_router_process(nxt_task_t *task, - nxt_runtime_t *rt); -static nxt_int_t nxt_main_start_discovery_process(nxt_task_t *task, - nxt_runtime_t *rt); -static nxt_int_t nxt_main_start_worker_process(nxt_task_t *task, - nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream); -static nxt_int_t nxt_main_create_worker_process(nxt_task_t *task, - nxt_runtime_t *rt, nxt_process_init_t *init); +static nxt_int_t nxt_main_process_create(nxt_task_t *task, + const nxt_process_init_t init); +static nxt_int_t nxt_main_start_process(nxt_task_t *task, + nxt_process_t *process); +static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data); static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, @@ -60,8 +51,7 @@ static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data); static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid); -static void nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt); +static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid); static void nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, @@ -73,29 +63,6 @@ static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); -static nxt_process_init_t *nxt_process_init_create(nxt_task_t *task, - nxt_process_type_t type, const nxt_str_t *name); -static nxt_int_t nxt_process_init_name_set(nxt_process_init_t *init, - nxt_process_type_t type, const nxt_str_t *name); -static nxt_int_t nxt_process_init_creds_set(nxt_task_t *task, - nxt_process_init_t *init, nxt_str_t *user, nxt_str_t *group); - -static nxt_int_t nxt_init_isolation(nxt_task_t *task, - nxt_conf_value_t *isolation, nxt_process_init_t *init); -#if (NXT_HAVE_CLONE) -static nxt_int_t nxt_init_clone_flags(nxt_task_t *task, - nxt_conf_value_t *namespaces, nxt_process_init_t *init); -#endif - -#if (NXT_HAVE_CLONE_NEWUSER) -static nxt_int_t nxt_init_isolation_creds(nxt_task_t *task, - nxt_conf_value_t *isolation, nxt_process_init_t *init); -static nxt_int_t nxt_init_vldt_isolation_creds(nxt_task_t *task, - nxt_process_init_t *init); -static nxt_int_t nxt_init_isolation_credential_map(nxt_task_t *task, - nxt_mp_t *mem_pool, nxt_conf_value_t *map_array, - nxt_clone_credential_map_t *map); -#endif const nxt_sig_event_t nxt_main_process_signals[] = { nxt_event_signal(SIGHUP, nxt_main_process_signal_handler), @@ -108,54 +75,6 @@ const nxt_sig_event_t nxt_main_process_signals[] = { }; -static const nxt_port_handlers_t nxt_app_process_port_handlers = { - .new_port = nxt_port_new_port_handler, - .change_file = nxt_port_change_log_file_handler, - .mmap = nxt_port_mmap_handler, - .remove_pid = nxt_port_remove_pid_handler, -}; - - -static const nxt_port_handlers_t nxt_discovery_process_port_handlers = { - .quit = nxt_worker_process_quit_handler, - .new_port = nxt_port_new_port_handler, - .change_file = nxt_port_change_log_file_handler, - .mmap = nxt_port_mmap_handler, - .data = nxt_port_data_handler, - .remove_pid = nxt_port_remove_pid_handler, - .rpc_ready = nxt_port_rpc_handler, - .rpc_error = nxt_port_rpc_handler, -}; - - -static const nxt_port_handlers_t *nxt_process_port_handlers[NXT_PROCESS_MAX] = -{ - NULL, - &nxt_discovery_process_port_handlers, - &nxt_controller_process_port_handlers, - &nxt_router_process_port_handlers, - &nxt_app_process_port_handlers -}; - - -static const nxt_process_start_t nxt_process_starts[NXT_PROCESS_MAX] = { - NULL, - nxt_discovery_start, - nxt_controller_start, - nxt_router_start, - nxt_app_start -}; - - -static const nxt_process_restart_t nxt_process_restarts[NXT_PROCESS_MAX] = { - NULL, - NULL, - &nxt_main_create_controller_process, - &nxt_main_create_router_process, - NULL -}; - - static nxt_bool_t nxt_exiting; @@ -172,11 +91,11 @@ nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_main_process_title(task); /* - * The dicsovery process will send a message processed by + * The discovery process will send a message processed by * nxt_main_port_modules_handler() which starts the controller * and router processes. */ - return nxt_main_start_discovery_process(task, rt); + return nxt_main_process_create(task, nxt_discovery_process); } @@ -350,48 +269,71 @@ nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void -nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - u_char *start, ch; + u_char *start, *p, ch; size_t type_len; - nxt_mp_t *mp; nxt_int_t ret; nxt_buf_t *b; nxt_port_t *port; nxt_runtime_t *rt; + nxt_process_t *process; nxt_app_type_t idx; nxt_conf_value_t *conf; - nxt_common_app_conf_t app_conf; + nxt_process_init_t *init; + nxt_common_app_conf_t *app_conf; ret = NXT_ERROR; - mp = nxt_mp_create(1024, 128, 256, 32); + rt = task->thread->runtime; - if (nxt_slow_path(mp == NULL)) { + process = nxt_main_process_new(task, rt); + if (nxt_slow_path(process == NULL)) { return; } - b = nxt_buf_chk_make_plain(mp, msg->buf, msg->size); + init = nxt_process_init(process); + + *init = nxt_app_process; + b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size); if (b == NULL) { - return; + goto failed; } - nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos, + nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos, b->mem.pos); - nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t)); + app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t)); + if (nxt_slow_path(app_conf == NULL)) { + goto failed; + } start = b->mem.pos; - app_conf.name.start = start; - app_conf.name.length = nxt_strlen(start); - app_conf.shm_limit = 100 * 1024 * 1024; + app_conf->name.start = start; + app_conf->name.length = nxt_strlen(start); + + init->name = (const char *) start; - start += app_conf.name.length + 1; + process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length + + sizeof("\"\" application") + 1); - conf = nxt_conf_json_parse(mp, start, b->mem.free, NULL); + if (nxt_slow_path(process->name == NULL)) { + goto failed; + } + + p = (u_char *) process->name; + *p++ = '"'; + p = nxt_cpymem(p, init->name, app_conf->name.length); + p = nxt_cpymem(p, "\" application", 13); + *p = '\0'; + app_conf->shm_limit = 100 * 1024 * 1024; + + start += app_conf->name.length + 1; + + conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL); if (conf == NULL) { nxt_alert(task, "router app configuration parsing error"); @@ -400,44 +342,45 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - app_conf.user.start = (u_char*)rt->user_cred.user; - app_conf.user.length = nxt_strlen(rt->user_cred.user); + app_conf->user.start = (u_char*)rt->user_cred.user; + app_conf->user.length = nxt_strlen(rt->user_cred.user); + + ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf, + nxt_nitems(nxt_common_app_conf), app_conf); - ret = nxt_conf_map_object(mp, conf, nxt_common_app_conf, - nxt_nitems(nxt_common_app_conf), &app_conf); if (ret != NXT_OK) { nxt_alert(task, "failed to map common app conf received from router"); goto failed; } - for (type_len = 0; type_len != app_conf.type.length; type_len++) { - ch = app_conf.type.start[type_len]; + for (type_len = 0; type_len != app_conf->type.length; type_len++) { + ch = app_conf->type.start[type_len]; if (ch == ' ' || nxt_isdigit(ch)) { break; } } - idx = nxt_app_parse_type(app_conf.type.start, type_len); + idx = nxt_app_parse_type(app_conf->type.start, type_len); if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) { nxt_alert(task, "invalid app type %d received from router", (int) idx); goto failed; } - ret = nxt_conf_map_object(mp, conf, nxt_app_maps[idx].map, - nxt_app_maps[idx].size, &app_conf); + ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map, + nxt_app_maps[idx].size, app_conf); if (nxt_slow_path(ret != NXT_OK)) { nxt_alert(task, "failed to map app conf received from router"); goto failed; } - if (app_conf.limits != NULL) { - ret = nxt_conf_map_object(mp, app_conf.limits, + if (app_conf->limits != NULL) { + ret = nxt_conf_map_object(process->mem_pool, app_conf->limits, nxt_common_app_limits_conf, nxt_nitems(nxt_common_app_limits_conf), - &app_conf); + app_conf); if (nxt_slow_path(ret != NXT_OK)) { nxt_alert(task, "failed to map app limits received from router"); @@ -445,40 +388,91 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } } - app_conf.self = conf; + app_conf->self = conf; - ret = nxt_main_start_worker_process(task, task->thread->runtime, - &app_conf, msg->port_msg.stream); + process->stream = msg->port_msg.stream; + process->data.app = app_conf; + + ret = nxt_main_start_process(task, process); + if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) { + return; + } failed: - if (ret == NXT_ERROR) { - port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, - msg->port_msg.reply_port); - if (nxt_fast_path(port != NULL)) { - nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, - -1, msg->port_msg.stream, 0, NULL); - } + nxt_process_use(task, process, -1); + + port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + + if (nxt_fast_path(port != NULL)) { + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); } +} - nxt_mp_destroy(mp); + +static void +nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_port_t *port; + nxt_process_t *process; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + + process = nxt_runtime_process_find(rt, msg->port_msg.pid); + if (nxt_slow_path(process == NULL)) { + return; + } + + nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); + + port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + + + if (nxt_slow_path(port == NULL)) { + return; + } + +#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) + if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { + if (nxt_slow_path(nxt_clone_credential_map(task, process->pid, + process->user_cred, + &process->isolation.clone) + != NXT_OK)) + { + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); + return; + } + } + +#endif + + process->state = NXT_PROCESS_STATE_CREATED; + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, + -1, msg->port_msg.stream, 0, NULL); } static nxt_port_handlers_t nxt_main_process_port_handlers = { - .data = nxt_port_main_data_handler, - .process_ready = nxt_port_process_ready_handler, - .start_worker = nxt_port_main_start_worker_handler, - .socket = nxt_main_port_socket_handler, - .modules = nxt_main_port_modules_handler, - .conf_store = nxt_main_port_conf_store_handler, + .data = nxt_port_main_data_handler, + .process_created = nxt_main_process_created_handler, + .process_ready = nxt_port_process_ready_handler, + .start_process = nxt_port_main_start_process_handler, + .socket = nxt_main_port_socket_handler, + .modules = nxt_main_port_modules_handler, + .conf_store = nxt_main_port_conf_store_handler, #if (NXT_TLS) - .cert_get = nxt_cert_store_get_handler, - .cert_delete = nxt_cert_store_delete_handler, + .cert_get = nxt_cert_store_get_handler, + .cert_delete = nxt_cert_store_delete_handler, #endif - .access_log = nxt_main_port_access_log_handler, - .rpc_ready = nxt_port_rpc_handler, - .rpc_error = nxt_port_rpc_handler, + .access_log = nxt_main_port_access_log_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, }; @@ -499,16 +493,17 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); return ret; } /* * A main process port. A write port is not closed - * since it should be inherited by worker processes. + * since it should be inherited by processes. */ nxt_port_enable(task, port, &nxt_main_process_port_handlers); - process->ready = 1; + process->state = NXT_PROCESS_STATE_READY; return NXT_OK; } @@ -541,234 +536,68 @@ nxt_main_process_title(nxt_task_t *task) static nxt_int_t -nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) +nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init) { - nxt_process_init_t *init; + nxt_int_t ret; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t *pinit; - static const nxt_str_t name = nxt_string("controller"); + rt = task->thread->runtime; - init = nxt_process_init_create(task, NXT_PROCESS_CONTROLLER, &name); - if (nxt_slow_path(init == NULL)) { + process = nxt_main_process_new(task, rt); + if (nxt_slow_path(process == NULL)) { return NXT_ERROR; } - return nxt_main_create_controller_process(task, rt, init);; -} + process->name = init.name; + process->user_cred = &rt->user_cred; + pinit = nxt_process_init(process); + *pinit = init; -static nxt_int_t -nxt_main_create_controller_process(nxt_task_t *task, nxt_runtime_t *rt, - nxt_process_init_t *init) -{ - ssize_t n; - nxt_int_t ret; - nxt_str_t *conf; - nxt_file_t file; - nxt_file_info_t fi; - nxt_controller_init_t ctrl_init; - - nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t)); - - conf = &ctrl_init.conf; - - nxt_memzero(&file, sizeof(nxt_file_t)); - - file.name = (nxt_file_name_t *) rt->conf; - - ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0); - - if (ret == NXT_OK) { - ret = nxt_file_info(&file, &fi); - - if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) { - conf->length = nxt_file_size(&fi); - conf->start = nxt_malloc(conf->length); - - if (nxt_slow_path(conf->start == NULL)) { - nxt_file_close(task, &file); - return NXT_ERROR; - } - - n = nxt_file_read(&file, conf->start, conf->length, 0); - - if (nxt_slow_path(n != (ssize_t) conf->length)) { - nxt_free(conf->start); - conf->start = NULL; - - nxt_alert(task, "failed to restore previous configuration: " - "cannot read the file"); - } - } - - nxt_file_close(task, &file); - } - -#if (NXT_TLS) - ctrl_init.certs = nxt_cert_store_load(task); -#endif - - init->data = &ctrl_init; - - ret = nxt_main_create_worker_process(task, rt, init); - - if (ret == NXT_OK) { - if (conf->start != NULL) { - nxt_free(conf->start); - } - -#if (NXT_TLS) - if (ctrl_init.certs != NULL) { - nxt_cert_store_release(ctrl_init.certs); - } -#endif + ret = nxt_main_start_process(task, process); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_process_use(task, process, -1); } return ret; } -static nxt_int_t -nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt) +static nxt_process_t * +nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt) { - nxt_process_init_t *init; - - static const nxt_str_t name = nxt_string("discovery"); + nxt_process_t *process; - init = nxt_process_init_create(task, NXT_PROCESS_DISCOVERY, &name); - if (nxt_slow_path(init == NULL)) { - return NXT_ERROR; + process = nxt_runtime_process_new(rt); + if (nxt_slow_path(process == NULL)) { + return NULL; } - return nxt_main_create_worker_process(task, rt, init); -} - - -static nxt_int_t -nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) -{ - nxt_process_init_t *init; - - static const nxt_str_t name = nxt_string("router"); - - init = nxt_process_init_create(task, NXT_PROCESS_ROUTER, &name); - if (nxt_slow_path(init == NULL)) { - return NXT_ERROR; + process->mem_pool = nxt_mp_create(1024, 128, 256, 32); + if (process->mem_pool == NULL) { + nxt_process_use(task, process, -1); + return NULL; } - return nxt_main_create_router_process(task, rt, init); + return process; } static nxt_int_t -nxt_main_create_router_process(nxt_task_t *task, nxt_runtime_t *rt, - nxt_process_init_t *init) +nxt_main_start_process(nxt_task_t *task, nxt_process_t *process) { - nxt_main_stop_worker_processes(task, rt); - - return nxt_main_create_worker_process(task, rt, init); -} - - -static nxt_int_t -nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, - nxt_common_app_conf_t *app_conf, uint32_t stream) -{ - nxt_int_t cap_setid; + nxt_mp_t *tmp_mp; nxt_int_t ret; + nxt_pid_t pid; + nxt_port_t *port; nxt_process_init_t *init; - init = nxt_process_init_create(task, NXT_PROCESS_WORKER, &app_conf->name); - if (nxt_slow_path(init == NULL)) { - return NXT_ERROR; - } - - cap_setid = rt->capabilities.setid; - - if (app_conf->isolation != NULL) { - ret = nxt_init_isolation(task, app_conf->isolation, init); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - } - -#if (NXT_HAVE_CLONE_NEWUSER) - if (NXT_CLONE_USER(init->isolation.clone.flags)) { - cap_setid = 1; - } -#endif - - if (cap_setid) { - ret = nxt_process_init_creds_set(task, init, &app_conf->user, - &app_conf->group); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - - } else { - if (!nxt_str_eq(&app_conf->user, (u_char *) rt->user_cred.user, - nxt_strlen(rt->user_cred.user))) - { - nxt_alert(task, "cannot set user \"%V\" for app \"%V\": " - "missing capabilities", &app_conf->user, &app_conf->name); - goto fail; - } - - if (app_conf->group.length > 0 - && !nxt_str_eq(&app_conf->group, (u_char *) rt->group, - nxt_strlen(rt->group))) - { - nxt_alert(task, "cannot set group \"%V\" for app \"%V\": " - "missing capabilities", &app_conf->group, - &app_conf->name); - goto fail; - } - } - - init->data = app_conf; - init->stream = stream; - -#if (NXT_HAVE_CLONE_NEWUSER) - ret = nxt_init_vldt_isolation_creds(task, init); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } -#endif - - return nxt_main_create_worker_process(task, rt, init); - -fail: - - nxt_mp_destroy(init->mem_pool); - - return NXT_ERROR; -} - - -nxt_int_t -nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, - nxt_process_init_t *init) -{ - nxt_int_t ret; - nxt_pid_t pid; - nxt_port_t *port; - nxt_process_t *process; - - /* - * TODO: remove process, init, ports from array on memory and fork failures. - */ - - process = nxt_runtime_process_new(rt); - if (nxt_slow_path(process == NULL)) { - nxt_mp_destroy(init->mem_pool); - - return NXT_ERROR; - } - - process->init = init; + init = nxt_process_init(process); port = nxt_port_new(task, 0, 0, init->type); if (nxt_slow_path(port == NULL)) { - nxt_process_use(task, process, -1); return NXT_ERROR; } @@ -776,10 +605,24 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_process_use(task, process, -1); + ret = NXT_ERROR; + tmp_mp = NULL; + ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { - nxt_port_use(task, port, -1); - return ret; + goto fail; + } + + tmp_mp = nxt_mp_create(1024, 128, 256, 32); + if (tmp_mp == NULL) { + goto fail; + } + + if (init->prefork) { + ret = init->prefork(task, process, tmp_mp); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } } pid = nxt_process_create(task, process); @@ -788,15 +631,13 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, case -1: nxt_port_close(task, port); - nxt_port_use(task, port, -1); - - return NXT_ERROR; + break; case 0: - /* A worker process, return to the event engine work queue loop. */ - nxt_port_use(task, port, -1); + /* The child process: return to the event engine work queue loop. */ - return NXT_AGAIN; + ret = NXT_AGAIN; + break; default: /* The main process created a new process. */ @@ -804,35 +645,22 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_read_close(port); nxt_port_write_enable(task, port); - nxt_port_use(task, port, -1); - - return NXT_OK; + ret = NXT_OK; + break; } -} - -void -nxt_main_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) -{ - nxt_port_t *port; - nxt_process_t *process; - - nxt_runtime_process_each(rt, process) { - - if (nxt_pid != process->pid) { - nxt_process_port_each(process, port) { +fail: - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, - -1, 0, 0, NULL); + nxt_port_use(task, port, -1); - } nxt_process_port_loop; - } + if (nxt_fast_path(tmp_mp != NULL)) { + nxt_mp_destroy(tmp_mp); + } - } nxt_runtime_process_loop; + return ret; } - static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) { @@ -1009,7 +837,7 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) pid, WEXITSTATUS(status)); } - nxt_main_cleanup_worker_process(task, pid); + nxt_main_cleanup_process(task, pid); } } @@ -1023,102 +851,81 @@ nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) static void -nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) +nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid) { - nxt_buf_t *buf; - nxt_port_t *port; - nxt_runtime_t *rt; - nxt_process_t *process; - nxt_process_type_t ptype; - nxt_process_init_t *init; - nxt_process_restart_t restart; + int stream; + nxt_int_t ret; + nxt_buf_t *buf; + nxt_port_t *port; + const char *name; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t init; rt = task->thread->runtime; process = nxt_runtime_process_find(rt, pid); + if (!process) { + return; + } - if (process) { - init = process->init; - process->init = NULL; + name = process->name; + stream = process->stream; + init = *((nxt_process_init_t *) nxt_process_init(process)); - ptype = nxt_process_type(process); - restart = nxt_process_restarts[ptype]; + if (process->state == NXT_PROCESS_STATE_READY) { + process->stream = 0; + } - if (process->ready) { - init->stream = 0; - } + nxt_process_close_ports(task, process); - nxt_process_close_ports(task, process); + if (nxt_exiting) { + if (rt->nprocesses <= 1) { + nxt_runtime_quit(task, 0); + } - if (nxt_exiting) { - nxt_mp_destroy(init->mem_pool); + return; + } - if (rt->nprocesses <= 2) { - nxt_runtime_quit(task, 0); - } + nxt_runtime_process_each(rt, process) { - return; + if (process->pid == nxt_pid + || process->pid == pid + || nxt_queue_is_empty(&process->ports)) + { + continue; } - nxt_runtime_process_each(rt, process) { + port = nxt_process_port_first(process); - if (process->pid == nxt_pid - || process->pid == pid - || nxt_queue_is_empty(&process->ports)) - { - continue; - } - - port = nxt_process_port_first(process); + if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) { + continue; + } - if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) { - continue; - } + buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(pid)); - buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(pid)); - if (nxt_slow_path(buf == NULL)) { - continue; - } + if (nxt_slow_path(buf == NULL)) { + continue; + } - buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); + buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); - nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, - -1, init->stream, 0, buf); - } nxt_runtime_process_loop; + nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, + stream, 0, buf); - if (restart != NULL) { - restart(task, rt, init); + } nxt_runtime_process_loop; - } else { - nxt_mp_destroy(init->mem_pool); + if (init.restart) { + ret = nxt_main_process_create(task, init); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_alert(task, "failed to restart %s", name); } } } static void -nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) -{ - nxt_port_t *port; - nxt_process_t *process; - - nxt_runtime_process_each(rt, process) { - - nxt_process_port_each(process, port) { - - if (port->type == NXT_PROCESS_WORKER) { - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, - -1, 0, 0, NULL); - } - - } nxt_process_port_loop; - - } nxt_runtime_process_loop; -} - - -static void nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { size_t size; @@ -1419,10 +1226,15 @@ fail: nxt_mp_destroy(mp); - ret = nxt_main_start_controller_process(task, rt); - + ret = nxt_main_process_create(task, nxt_controller_process); if (ret == NXT_OK) { - (void) nxt_main_start_router_process(task, rt); + ret = nxt_main_process_create(task, nxt_router_process); + } + + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_exiting = 1; + + nxt_runtime_quit(task, 1); } } @@ -1534,362 +1346,3 @@ nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) msg->port_msg.stream, 0, NULL); } } - - -static nxt_int_t -nxt_init_isolation(nxt_task_t *task, nxt_conf_value_t *isolation, - nxt_process_init_t *init) -{ -#if (NXT_HAVE_CLONE) - nxt_int_t ret; - nxt_conf_value_t *obj; - - static nxt_str_t nsname = nxt_string("namespaces"); - - obj = nxt_conf_get_object_member(isolation, &nsname, NULL); - if (obj != NULL) { - ret = nxt_init_clone_flags(task, obj, init); - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - } -#endif - -#if (NXT_HAVE_CLONE_NEWUSER) - ret = nxt_init_isolation_creds(task, isolation, init); - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } -#endif - - return NXT_OK; -} - - -#if (NXT_HAVE_CLONE_NEWUSER) - -static nxt_int_t -nxt_init_isolation_creds(nxt_task_t *task, nxt_conf_value_t *isolation, - nxt_process_init_t *init) -{ - nxt_int_t ret; - nxt_clone_t *clone; - nxt_conf_value_t *array; - - static nxt_str_t uidname = nxt_string("uidmap"); - static nxt_str_t gidname = nxt_string("gidmap"); - - clone = &init->isolation.clone; - - array = nxt_conf_get_object_member(isolation, &uidname, NULL); - if (array != NULL) { - ret = nxt_init_isolation_credential_map(task, init->mem_pool, array, - &clone->uidmap); - - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - } - - array = nxt_conf_get_object_member(isolation, &gidname, NULL); - if (array != NULL) { - ret = nxt_init_isolation_credential_map(task, init->mem_pool, array, - &clone->gidmap); - - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_init_vldt_isolation_creds(nxt_task_t *task, nxt_process_init_t *init) -{ - nxt_int_t ret; - nxt_clone_t *clone; - - clone = &init->isolation.clone; - - if (clone->uidmap.size == 0 && clone->gidmap.size == 0) { - return NXT_OK; - } - - if (!NXT_CLONE_USER(clone->flags)) { - if (nxt_slow_path(clone->uidmap.size > 0)) { - nxt_log(task, NXT_LOG_ERR, "\"uidmap\" is set but " - "\"isolation.namespaces.credential\" is false or unset"); - - return NXT_ERROR; - } - - if (nxt_slow_path(clone->gidmap.size > 0)) { - nxt_log(task, NXT_LOG_ERR, "\"gidmap\" is set but " - "\"isolation.namespaces.credential\" is false or unset"); - - return NXT_ERROR; - } - - return NXT_OK; - } - - ret = nxt_clone_vldt_credential_uidmap(task, &clone->uidmap, - init->user_cred); - - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - - return nxt_clone_vldt_credential_gidmap(task, &clone->gidmap, - init->user_cred); -} - - -static nxt_int_t -nxt_init_isolation_credential_map(nxt_task_t *task, nxt_mp_t *mem_pool, - nxt_conf_value_t *map_array, nxt_clone_credential_map_t *map) -{ - nxt_int_t ret; - nxt_uint_t i; - nxt_conf_value_t *obj; - - static nxt_conf_map_t nxt_clone_map_entry_conf[] = { - { - nxt_string("container"), - NXT_CONF_MAP_INT, - offsetof(nxt_clone_map_entry_t, container), - }, - - { - nxt_string("host"), - NXT_CONF_MAP_INT, - offsetof(nxt_clone_map_entry_t, host), - }, - - { - nxt_string("size"), - NXT_CONF_MAP_INT, - offsetof(nxt_clone_map_entry_t, size), - }, - }; - - map->size = nxt_conf_array_elements_count(map_array); - - if (map->size == 0) { - return NXT_OK; - } - - map->map = nxt_mp_alloc(mem_pool, - map->size * sizeof(nxt_clone_map_entry_t)); - if (nxt_slow_path(map->map == NULL)) { - return NXT_ERROR; - } - - for (i = 0; i < map->size; i++) { - obj = nxt_conf_get_array_element(map_array, i); - - ret = nxt_conf_map_object(mem_pool, obj, nxt_clone_map_entry_conf, - nxt_nitems(nxt_clone_map_entry_conf), - map->map + i); - if (nxt_slow_path(ret != NXT_OK)) { - nxt_alert(task, "clone map entry map error"); - return NXT_ERROR; - } - } - - return NXT_OK; -} - -#endif - -#if (NXT_HAVE_CLONE) - -static nxt_int_t -nxt_init_clone_flags(nxt_task_t *task, nxt_conf_value_t *namespaces, - nxt_process_init_t *init) -{ - uint32_t index; - nxt_str_t name; - nxt_int_t flag; - nxt_conf_value_t *value; - - index = 0; - - for ( ;; ) { - value = nxt_conf_next_object_member(namespaces, &name, &index); - - if (value == NULL) { - break; - } - - flag = 0; - -#if (NXT_HAVE_CLONE_NEWUSER) - if (nxt_str_eq(&name, "credential", 10)) { - flag = CLONE_NEWUSER; - } -#endif - -#if (NXT_HAVE_CLONE_NEWPID) - if (nxt_str_eq(&name, "pid", 3)) { - flag = CLONE_NEWPID; - } -#endif - -#if (NXT_HAVE_CLONE_NEWNET) - if (nxt_str_eq(&name, "network", 7)) { - flag = CLONE_NEWNET; - } -#endif - -#if (NXT_HAVE_CLONE_NEWUTS) - if (nxt_str_eq(&name, "uname", 5)) { - flag = CLONE_NEWUTS; - } -#endif - -#if (NXT_HAVE_CLONE_NEWNS) - if (nxt_str_eq(&name, "mount", 5)) { - flag = CLONE_NEWNS; - } -#endif - -#if (NXT_HAVE_CLONE_NEWCGROUP) - if (nxt_str_eq(&name, "cgroup", 6)) { - flag = CLONE_NEWCGROUP; - } -#endif - - if (!flag) { - nxt_alert(task, "unknown namespace flag: \"%V\"", &name); - return NXT_ERROR; - } - - if (nxt_conf_get_boolean(value)) { - init->isolation.clone.flags |= flag; - } - } - - return NXT_OK; -} - -#endif - - -static nxt_process_init_t * -nxt_process_init_create(nxt_task_t *task, nxt_process_type_t type, - const nxt_str_t *name) -{ - nxt_mp_t *mp; - nxt_int_t ret; - nxt_runtime_t *rt; - nxt_process_init_t *init; - - mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(mp == NULL)) { - return NULL; - } - - init = nxt_mp_zalloc(mp, sizeof(nxt_process_init_t)); - if (nxt_slow_path(init == NULL)) { - goto fail; - } - - init->mem_pool = mp; - - ret = nxt_process_init_name_set(init, type, name); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - - rt = task->thread->runtime; - - init->type = type; - init->start = nxt_process_starts[type]; - init->port_handlers = nxt_process_port_handlers[type]; - init->signals = nxt_worker_process_signals; - init->user_cred = &rt->user_cred; - init->data = &rt; - - return init; - -fail: - - nxt_mp_destroy(mp); - - return NULL; -} - - -static nxt_int_t -nxt_process_init_name_set(nxt_process_init_t *init, nxt_process_type_t type, - const nxt_str_t *name) -{ - u_char *str, *end; - size_t size; - const char *fmt; - - size = name->length + 1; - - if (type == NXT_PROCESS_WORKER) { - size += nxt_length("\"\" application"); - fmt = "\"%V\" application%Z"; - - } else { - fmt = "%V%Z"; - } - - str = nxt_mp_alloc(init->mem_pool, size); - if (nxt_slow_path(str == NULL)) { - return NXT_ERROR; - } - - end = str + size; - - nxt_sprintf(str, end, fmt, name); - - init->name = (char *) str; - - return NXT_OK; -} - - -static nxt_int_t -nxt_process_init_creds_set(nxt_task_t *task, nxt_process_init_t *init, - nxt_str_t *user, nxt_str_t *group) -{ - char *str; - - init->user_cred = nxt_mp_zalloc(init->mem_pool, sizeof(nxt_credential_t)); - - if (nxt_slow_path(init->user_cred == NULL)) { - return NXT_ERROR; - } - - str = nxt_mp_zalloc(init->mem_pool, user->length + 1); - if (nxt_slow_path(str == NULL)) { - return NXT_ERROR; - } - - nxt_memcpy(str, user->start, user->length); - str[user->length] = '\0'; - - init->user_cred->user = str; - - if (group->start != NULL) { - str = nxt_mp_zalloc(init->mem_pool, group->length + 1); - if (nxt_slow_path(str == NULL)) { - return NXT_ERROR; - } - - nxt_memcpy(str, group->start, group->length); - str[group->length] = '\0'; - - } else { - str = NULL; - } - - return nxt_credential_get(task, init->mem_pool, init->user_cred, str); -} |