diff options
-rw-r--r-- | auto/sources | 2 | ||||
-rw-r--r-- | src/nxt_application.c | 363 | ||||
-rw-r--r-- | src/nxt_application.h | 11 | ||||
-rw-r--r-- | src/nxt_cert.c | 11 | ||||
-rw-r--r-- | src/nxt_cert.h | 2 | ||||
-rw-r--r-- | src/nxt_clone.h | 14 | ||||
-rw-r--r-- | src/nxt_controller.c | 142 | ||||
-rw-r--r-- | src/nxt_external.c | 11 | ||||
-rw-r--r-- | src/nxt_java.c | 61 | ||||
-rw-r--r-- | src/nxt_main_process.c | 1049 | ||||
-rw-r--r-- | src/nxt_main_process.h | 19 | ||||
-rw-r--r-- | src/nxt_php_sapi.c | 32 | ||||
-rw-r--r-- | src/nxt_port.c | 4 | ||||
-rw-r--r-- | src/nxt_port.h | 141 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 2 | ||||
-rw-r--r-- | src/nxt_process.c | 534 | ||||
-rw-r--r-- | src/nxt_process.h | 137 | ||||
-rw-r--r-- | src/nxt_process_type.h | 2 | ||||
-rw-r--r-- | src/nxt_python_wsgi.c | 13 | ||||
-rw-r--r-- | src/nxt_router.c | 44 | ||||
-rw-r--r-- | src/nxt_runtime.c | 60 | ||||
-rw-r--r-- | src/nxt_runtime.h | 1 | ||||
-rw-r--r-- | src/nxt_signal_handlers.c | 67 | ||||
-rw-r--r-- | src/nxt_unit.c | 2 | ||||
-rw-r--r-- | src/nxt_worker_process.c | 118 | ||||
-rw-r--r-- | src/perl/nxt_perl_psgi.c | 11 | ||||
-rw-r--r-- | src/ruby/nxt_ruby.c | 20 | ||||
-rw-r--r-- | test/test_java_application.py | 1 |
28 files changed, 1543 insertions, 1331 deletions
diff --git a/auto/sources b/auto/sources index c6b34bbc..4ac132dd 100644 --- a/auto/sources +++ b/auto/sources @@ -78,7 +78,7 @@ NXT_LIB_SRCS=" \ src/nxt_conf.c \ src/nxt_conf_validation.c \ src/nxt_main_process.c \ - src/nxt_worker_process.c \ + src/nxt_signal_handlers.c \ src/nxt_controller.c \ src/nxt_router.c \ src/nxt_h1proto.c \ diff --git a/src/nxt_application.c b/src/nxt_application.c index bebe3907..6de82257 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -25,6 +25,8 @@ typedef struct { } nxt_module_t; +static nxt_int_t nxt_discovery_start(nxt_task_t *task, + nxt_process_data_t *data); static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path); static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules, const char *name); @@ -34,7 +36,27 @@ static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task, const char *name); +static nxt_int_t nxt_app_prefork(nxt_task_t *task, nxt_process_t *process, + nxt_mp_t *mp); +static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment); +static nxt_int_t nxt_app_isolation(nxt_task_t *task, + nxt_conf_value_t *isolation, nxt_process_t *process); + +#if (NXT_HAVE_CLONE) +static nxt_int_t nxt_app_clone_flags(nxt_task_t *task, + nxt_conf_value_t *namespaces, nxt_clone_t *clone); +#endif + +#if (NXT_HAVE_CLONE_NEWUSER) +static nxt_int_t nxt_app_isolation_creds(nxt_task_t *task, + nxt_conf_value_t *isolation, nxt_process_t *process); +static nxt_int_t nxt_app_isolation_credential_map(nxt_task_t *task, + nxt_mp_t *mem_pool, nxt_conf_value_t *map_array, + nxt_clone_credential_map_t *map); +#endif + +nxt_str_t nxt_server = nxt_string(NXT_SERVER); static uint32_t compat[] = { @@ -42,14 +64,53 @@ static uint32_t compat[] = { }; -nxt_str_t nxt_server = nxt_string(NXT_SERVER); +static nxt_app_module_t *nxt_app; -static nxt_app_module_t *nxt_app; +static const nxt_port_handlers_t nxt_discovery_process_port_handlers = { + .quit = nxt_signal_quit_handler, + .new_port = nxt_port_new_port_handler, + .change_file = nxt_port_change_log_file_handler, + .mmap = nxt_port_mmap_handler, + .data = nxt_port_data_handler, + .remove_pid = nxt_port_remove_pid_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, +}; -nxt_int_t -nxt_discovery_start(nxt_task_t *task, void *data) +static const nxt_port_handlers_t nxt_app_process_port_handlers = { + .quit = nxt_signal_quit_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, +}; + + +const nxt_process_init_t nxt_discovery_process = { + .name = "discovery", + .type = NXT_PROCESS_DISCOVERY, + .prefork = NULL, + .restart = 0, + .setup = nxt_process_core_setup, + .start = nxt_discovery_start, + .port_handlers = &nxt_discovery_process_port_handlers, + .signals = nxt_process_signals, +}; + + +const nxt_process_init_t nxt_app_process = { + .type = NXT_PROCESS_APP, + .setup = nxt_app_setup, + .prefork = nxt_app_prefork, + .restart = 0, + .start = NULL, /* set to module->start */ + .port_handlers = &nxt_app_process_port_handlers, + .signals = nxt_process_signals, +}; + + +static nxt_int_t +nxt_discovery_start(nxt_task_t *task, nxt_process_data_t *data) { uint32_t stream; nxt_buf_t *b; @@ -57,7 +118,7 @@ nxt_discovery_start(nxt_task_t *task, void *data) nxt_port_t *main_port, *discovery_port; nxt_runtime_t *rt; - nxt_debug(task, "DISCOVERY"); + nxt_log(task, NXT_LOG_INFO, "discovery started"); rt = task->thread->runtime; @@ -301,18 +362,85 @@ nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data) static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_worker_process_quit_handler(task, msg); + nxt_signal_quit_handler(task, msg); } -nxt_int_t -nxt_app_start(nxt_task_t *task, void *data) +static nxt_int_t +nxt_app_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) +{ + nxt_int_t cap_setid; + nxt_int_t ret; + nxt_runtime_t *rt; + nxt_common_app_conf_t *app_conf; + + rt = task->thread->runtime; + app_conf = process->data.app; + cap_setid = rt->capabilities.setid; + + if (app_conf->isolation != NULL) { + ret = nxt_app_isolation(task, app_conf->isolation, process); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + } + +#if (NXT_HAVE_CLONE_NEWUSER) + if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { + cap_setid = 1; + } +#endif + + if (cap_setid) { + ret = nxt_process_creds_set(task, process, &app_conf->user, + &app_conf->group); + + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + } else { + if (!nxt_str_eq(&app_conf->user, (u_char *) rt->user_cred.user, + nxt_strlen(rt->user_cred.user))) + { + nxt_alert(task, "cannot set user \"%V\" for app \"%V\": " + "missing capabilities", &app_conf->user, &app_conf->name); + + return NXT_ERROR; + } + + if (app_conf->group.length > 0 + && !nxt_str_eq(&app_conf->group, (u_char *) rt->group, + nxt_strlen(rt->group))) + { + nxt_alert(task, "cannot set group \"%V\" for app \"%V\": " + "missing capabilities", &app_conf->group, + &app_conf->name); + + return NXT_ERROR; + } + } + +#if (NXT_HAVE_CLONE_NEWUSER) + ret = nxt_process_vldt_isolation_creds(task, process); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } +#endif + + return NXT_OK; +} + + +static nxt_int_t +nxt_app_setup(nxt_task_t *task, nxt_process_t *process) { nxt_int_t ret; + nxt_process_init_t *init; nxt_app_lang_module_t *lang; nxt_common_app_conf_t *app_conf; - app_conf = data; + app_conf = process->data.app; lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type); if (nxt_slow_path(lang == NULL)) { @@ -332,8 +460,8 @@ nxt_app_start(nxt_task_t *task, void *data) } } - if (nxt_app->pre_init != NULL) { - ret = nxt_app->pre_init(task, data); + if (nxt_app->setup != NULL) { + ret = nxt_app->setup(task, process, app_conf); if (nxt_slow_path(ret != NXT_OK)) { return ret; @@ -360,16 +488,13 @@ nxt_app_start(nxt_task_t *task, void *data) return NXT_ERROR; } - ret = nxt_app->init(task, data); + init = nxt_process_init(process); - if (nxt_slow_path(ret != NXT_OK)) { - nxt_debug(task, "application init failed"); + init->start = nxt_app->start; - } else { - nxt_debug(task, "application init done"); - } + process->state = NXT_PROCESS_STATE_CREATED; - return ret; + return NXT_OK; } @@ -429,6 +554,206 @@ nxt_app_set_environment(nxt_conf_value_t *environment) } +static nxt_int_t +nxt_app_isolation(nxt_task_t *task, nxt_conf_value_t *isolation, + nxt_process_t *process) +{ +#if (NXT_HAVE_CLONE) + nxt_int_t ret; + nxt_conf_value_t *obj; + + static nxt_str_t nsname = nxt_string("namespaces"); + + obj = nxt_conf_get_object_member(isolation, &nsname, NULL); + if (obj != NULL) { + ret = nxt_app_clone_flags(task, obj, &process->isolation.clone); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } +#endif + +#if (NXT_HAVE_CLONE_NEWUSER) + ret = nxt_app_isolation_creds(task, isolation, process); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } +#endif + + return NXT_OK; +} + + +#if (NXT_HAVE_CLONE_NEWUSER) + +static nxt_int_t +nxt_app_isolation_creds(nxt_task_t *task, nxt_conf_value_t *isolation, + nxt_process_t *process) +{ + nxt_int_t ret; + nxt_clone_t *clone; + nxt_conf_value_t *array; + + static nxt_str_t uidname = nxt_string("uidmap"); + static nxt_str_t gidname = nxt_string("gidmap"); + + clone = &process->isolation.clone; + + array = nxt_conf_get_object_member(isolation, &uidname, NULL); + if (array != NULL) { + ret = nxt_app_isolation_credential_map(task, process->mem_pool, array, + &clone->uidmap); + + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + array = nxt_conf_get_object_member(isolation, &gidname, NULL); + if (array != NULL) { + ret = nxt_app_isolation_credential_map(task, process->mem_pool, array, + &clone->gidmap); + + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_app_isolation_credential_map(nxt_task_t *task, nxt_mp_t *mp, + nxt_conf_value_t *map_array, nxt_clone_credential_map_t *map) +{ + nxt_int_t ret; + nxt_uint_t i; + nxt_conf_value_t *obj; + + static nxt_conf_map_t nxt_clone_map_entry_conf[] = { + { + nxt_string("container"), + NXT_CONF_MAP_INT, + offsetof(nxt_clone_map_entry_t, container), + }, + + { + nxt_string("host"), + NXT_CONF_MAP_INT, + offsetof(nxt_clone_map_entry_t, host), + }, + + { + nxt_string("size"), + NXT_CONF_MAP_INT, + offsetof(nxt_clone_map_entry_t, size), + }, + }; + + map->size = nxt_conf_array_elements_count(map_array); + + if (map->size == 0) { + return NXT_OK; + } + + map->map = nxt_mp_alloc(mp, map->size * sizeof(nxt_clone_map_entry_t)); + if (nxt_slow_path(map->map == NULL)) { + return NXT_ERROR; + } + + for (i = 0; i < map->size; i++) { + obj = nxt_conf_get_array_element(map_array, i); + + ret = nxt_conf_map_object(mp, obj, nxt_clone_map_entry_conf, + nxt_nitems(nxt_clone_map_entry_conf), + map->map + i); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "clone map entry map error"); + return NXT_ERROR; + } + } + + return NXT_OK; +} + +#endif + +#if (NXT_HAVE_CLONE) + +static nxt_int_t +nxt_app_clone_flags(nxt_task_t *task, nxt_conf_value_t *namespaces, + nxt_clone_t *clone) +{ + uint32_t index; + nxt_str_t name; + nxt_int_t flag; + nxt_conf_value_t *value; + + index = 0; + + for ( ;; ) { + value = nxt_conf_next_object_member(namespaces, &name, &index); + + if (value == NULL) { + break; + } + + flag = 0; + +#if (NXT_HAVE_CLONE_NEWUSER) + if (nxt_str_eq(&name, "credential", 10)) { + flag = CLONE_NEWUSER; + } +#endif + +#if (NXT_HAVE_CLONE_NEWPID) + if (nxt_str_eq(&name, "pid", 3)) { + flag = CLONE_NEWPID; + } +#endif + +#if (NXT_HAVE_CLONE_NEWNET) + if (nxt_str_eq(&name, "network", 7)) { + flag = CLONE_NEWNET; + } +#endif + +#if (NXT_HAVE_CLONE_NEWUTS) + if (nxt_str_eq(&name, "uname", 5)) { + flag = CLONE_NEWUTS; + } +#endif + +#if (NXT_HAVE_CLONE_NEWNS) + if (nxt_str_eq(&name, "mount", 5)) { + flag = CLONE_NEWNS; + } +#endif + +#if (NXT_HAVE_CLONE_NEWCGROUP) + if (nxt_str_eq(&name, "cgroup", 6)) { + flag = CLONE_NEWCGROUP; + } +#endif + + if (!flag) { + nxt_alert(task, "unknown namespace flag: \"%V\"", &name); + return NXT_ERROR; + } + + if (nxt_conf_get_boolean(value)) { + clone->flags |= flag; + } + } + + return NXT_OK; +} + +#endif + + + nxt_app_lang_module_t * nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name) { @@ -539,7 +864,7 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init) nxt_fd_blocking(task, main_port->pair[1]); - init->ready_stream = my_port->process->init->stream; + init->ready_stream = my_port->process->stream; init->read_port.id.pid = my_port->pid; init->read_port.id.id = my_port->id; diff --git a/src/nxt_application.h b/src/nxt_application.h index 972a712b..b4231e3b 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -27,6 +27,8 @@ typedef enum { typedef struct nxt_app_module_s nxt_app_module_t; +typedef nxt_int_t (*nxt_application_setup_t)(nxt_task_t *task, + nxt_process_t *process, nxt_common_app_conf_t *conf); typedef struct { @@ -37,9 +39,6 @@ typedef struct { } nxt_app_lang_module_t; -typedef struct nxt_common_app_conf_s nxt_common_app_conf_t; - - typedef struct { char *executable; nxt_conf_value_t *arguments; @@ -111,10 +110,8 @@ struct nxt_app_module_s { nxt_str_t type; const char *version; - nxt_int_t (*pre_init)(nxt_task_t *task, - nxt_common_app_conf_t *conf); - nxt_int_t (*init)(nxt_task_t *task, - nxt_common_app_conf_t *conf); + nxt_application_setup_t setup; + nxt_process_start_t start; }; diff --git a/src/nxt_cert.c b/src/nxt_cert.c index ee258646..9e825d80 100644 --- a/src/nxt_cert.c +++ b/src/nxt_cert.c @@ -797,12 +797,11 @@ nxt_cert_info_delete(nxt_str_t *name) nxt_array_t * -nxt_cert_store_load(nxt_task_t *task) +nxt_cert_store_load(nxt_task_t *task, nxt_mp_t *mp) { DIR *dir; size_t size, alloc; u_char *buf, *p; - nxt_mp_t *mp; nxt_str_t name; nxt_int_t ret; nxt_file_t file; @@ -818,14 +817,8 @@ nxt_cert_store_load(nxt_task_t *task) return NULL; } - mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(mp == NULL)) { - return NULL; - } - certs = nxt_array_create(mp, 16, sizeof(nxt_cert_item_t)); if (nxt_slow_path(certs == NULL)) { - nxt_mp_destroy(mp); return NULL; } @@ -933,7 +926,7 @@ nxt_cert_store_release(nxt_array_t *certs) nxt_fd_close(items[i].fd); } - nxt_mp_destroy(certs->mem_pool); + nxt_array_destroy(certs); } diff --git a/src/nxt_cert.h b/src/nxt_cert.h index 319d5d3c..dbaddcf9 100644 --- a/src/nxt_cert.h +++ b/src/nxt_cert.h @@ -19,7 +19,7 @@ nxt_conf_value_t *nxt_cert_info_get(nxt_str_t *name); nxt_conf_value_t *nxt_cert_info_get_all(nxt_mp_t *mp); nxt_int_t nxt_cert_info_delete(nxt_str_t *name); -nxt_array_t *nxt_cert_store_load(nxt_task_t *task); +nxt_array_t *nxt_cert_store_load(nxt_task_t *task, nxt_mp_t *mem_pool); void nxt_cert_store_release(nxt_array_t *certs); void nxt_cert_store_get(nxt_task_t *task, nxt_str_t *name, nxt_mp_t *mp, diff --git a/src/nxt_clone.h b/src/nxt_clone.h index dcccf1db..c2066ce6 100644 --- a/src/nxt_clone.h +++ b/src/nxt_clone.h @@ -3,8 +3,8 @@ * Copyright (C) NGINX, Inc. */ -#ifndef _NXT_CLONE_INCLUDED_ -#define _NXT_CLONE_INCLUDED_ +#ifndef _NXT_CLONE_H_INCLUDED_ +#define _NXT_CLONE_H_INCLUDED_ #if (NXT_HAVE_CLONE_NEWUSER) @@ -36,10 +36,11 @@ typedef struct { pid_t nxt_clone(nxt_int_t flags); -#if (NXT_HAVE_CLONE_NEWUSER) +#define nxt_is_clone_flag_set(flags, test) \ + ((flags & CLONE_##test) == CLONE_##test) + -#define NXT_CLONE_USER(flags) \ - ((flags & CLONE_NEWUSER) == CLONE_NEWUSER) +#if (NXT_HAVE_CLONE_NEWUSER) NXT_EXPORT nxt_int_t nxt_clone_credential_map(nxt_task_t *task, pid_t pid, nxt_credential_t *creds, nxt_clone_t *clone); @@ -50,4 +51,5 @@ NXT_EXPORT nxt_int_t nxt_clone_vldt_credential_gidmap(nxt_task_t *task, #endif -#endif /* _NXT_CLONE_INCLUDED_ */ + +#endif /* _NXT_CLONE_H_INCLUDED_ */ diff --git a/src/nxt_controller.c b/src/nxt_controller.c index ea70cf78..a61c127d 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -39,11 +39,17 @@ typedef struct { } nxt_controller_response_t; +static nxt_int_t nxt_controller_prefork(nxt_task_t *task, + nxt_process_t *process, nxt_mp_t *mp); +static nxt_int_t nxt_controller_start(nxt_task_t *task, + nxt_process_data_t *data); static void nxt_controller_process_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_controller_send_current_conf(nxt_task_t *task); static void nxt_controller_router_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_controller_remove_pid_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); static nxt_int_t nxt_controller_conf_default(void); static void nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -83,6 +89,8 @@ static void nxt_controller_process_cert(nxt_task_t *task, static void nxt_controller_process_cert_save(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static nxt_bool_t nxt_controller_cert_in_use(nxt_str_t *name); +static void nxt_controller_cert_cleanup(nxt_task_t *task, void *obj, + void *data); #endif static void nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -114,21 +122,117 @@ static const nxt_event_conn_state_t nxt_controller_conn_write_state; static const nxt_event_conn_state_t nxt_controller_conn_close_state; -nxt_port_handlers_t nxt_controller_process_port_handlers = { - .quit = nxt_worker_process_quit_handler, +static const nxt_port_handlers_t nxt_controller_process_port_handlers = { + .quit = nxt_signal_quit_handler, .new_port = nxt_controller_process_new_port_handler, .change_file = nxt_port_change_log_file_handler, .mmap = nxt_port_mmap_handler, .process_ready = nxt_controller_router_ready_handler, .data = nxt_port_data_handler, - .remove_pid = nxt_port_remove_pid_handler, + .remove_pid = nxt_controller_remove_pid_handler, .rpc_ready = nxt_port_rpc_handler, .rpc_error = nxt_port_rpc_handler, }; -nxt_int_t -nxt_controller_start(nxt_task_t *task, void *data) +const nxt_process_init_t nxt_controller_process = { + .name = "controller", + .type = NXT_PROCESS_CONTROLLER, + .prefork = nxt_controller_prefork, + .restart = 1, + .setup = nxt_process_core_setup, + .start = nxt_controller_start, + .port_handlers = &nxt_controller_process_port_handlers, + .signals = nxt_process_signals, +}; + + +static nxt_int_t +nxt_controller_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) +{ + ssize_t n; + nxt_int_t ret; + nxt_str_t *conf; + nxt_file_t file; + nxt_runtime_t *rt; + nxt_file_info_t fi; + nxt_controller_init_t ctrl_init; + + nxt_log(task, NXT_LOG_INFO, "controller started"); + + rt = task->thread->runtime; + + nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t)); + + conf = &ctrl_init.conf; + + nxt_memzero(&file, sizeof(nxt_file_t)); + + file.name = (nxt_file_name_t *) rt->conf; + + ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0); + + if (ret == NXT_OK) { + ret = nxt_file_info(&file, &fi); + + if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) { + conf->length = nxt_file_size(&fi); + conf->start = nxt_mp_alloc(mp, conf->length); + if (nxt_slow_path(conf->start == NULL)) { + nxt_file_close(task, &file); + return NXT_ERROR; + } + + n = nxt_file_read(&file, conf->start, conf->length, 0); + + if (nxt_slow_path(n != (ssize_t) conf->length)) { + conf->start = NULL; + conf->length = 0; + + nxt_alert(task, "failed to restore previous configuration: " + "cannot read the file"); + } + } + + nxt_file_close(task, &file); + } + +#if (NXT_TLS) + ctrl_init.certs = nxt_cert_store_load(task, mp); + + nxt_mp_cleanup(mp, nxt_controller_cert_cleanup, task, ctrl_init.certs, rt); +#endif + + process->data.controller = ctrl_init; + + return NXT_OK; +} + + +#if (NXT_TLS) + +static void +nxt_controller_cert_cleanup(nxt_task_t *task, void *obj, void *data) +{ + pid_t main_pid; + nxt_array_t *certs; + nxt_runtime_t *rt; + + certs = obj; + rt = data; + + main_pid = rt->port_by_type[NXT_PROCESS_MAIN]->pid; + + if (nxt_pid == main_pid && certs != NULL) { + nxt_cert_store_release(certs); + } +} + +#endif + + +static nxt_int_t +nxt_controller_start(nxt_task_t *task, nxt_process_data_t *data) { nxt_mp_t *mp; nxt_int_t ret; @@ -147,15 +251,13 @@ nxt_controller_start(nxt_task_t *task, void *data) nxt_queue_init(&nxt_controller_waiting_requests); - init = data; + init = &data->controller; #if (NXT_TLS) - if (init->certs != NULL) { nxt_cert_info_init(task, init->certs); nxt_cert_store_release(init->certs); } - #endif json = &init->conf; @@ -170,8 +272,6 @@ nxt_controller_start(nxt_task_t *task, void *data) } conf = nxt_conf_json_parse_str(mp, json); - nxt_free(json->start); - if (nxt_slow_path(conf == NULL)) { nxt_alert(task, "failed to restore previous configuration: " "file is corrupted or not enough memory"); @@ -295,6 +395,28 @@ nxt_controller_router_ready_handler(nxt_task_t *task, } +static void +nxt_controller_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_pid_t pid; + nxt_process_t *process; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + + nxt_assert(nxt_buf_used_size(msg->buf) == sizeof(pid)); + + nxt_memcpy(&pid, msg->buf->mem.pos, sizeof(pid)); + + process = nxt_runtime_process_find(rt, pid); + if (process != NULL && nxt_process_type(process) == NXT_PROCESS_ROUTER) { + nxt_controller_router_ready = 0; + } + + nxt_port_remove_pid_handler(task, msg); +} + + static nxt_int_t nxt_controller_conf_default(void) { diff --git a/src/nxt_external.c b/src/nxt_external.c index e498a938..58523525 100644 --- a/src/nxt_external.c +++ b/src/nxt_external.c @@ -9,8 +9,7 @@ #include <nxt_unit.h> -static nxt_int_t nxt_external_init(nxt_task_t *task, - nxt_common_app_conf_t *conf); +static nxt_int_t nxt_external_start(nxt_task_t *task, nxt_process_data_t *data); nxt_app_module_t nxt_external_module = { @@ -19,7 +18,7 @@ nxt_app_module_t nxt_external_module = { nxt_string("external"), "*", NULL, - nxt_external_init, + nxt_external_start, }; @@ -58,7 +57,7 @@ nxt_external_fd_no_cloexec(nxt_task_t *task, nxt_socket_t fd) static nxt_int_t -nxt_external_init(nxt_task_t *task, nxt_common_app_conf_t *conf) +nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) { char **argv; u_char buf[256]; @@ -71,9 +70,11 @@ nxt_external_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_port_t *my_port, *main_port; nxt_runtime_t *rt; nxt_conf_value_t *value; + nxt_common_app_conf_t *conf; nxt_external_app_conf_t *c; rt = task->thread->runtime; + conf = data->app; main_port = rt->port_by_type[NXT_PROCESS_MAIN]; my_port = nxt_runtime_port_find(rt, nxt_pid, 0); @@ -99,7 +100,7 @@ nxt_external_init(nxt_task_t *task, nxt_common_app_conf_t *conf) "%PI,%ud,%d;" "%PI,%ud,%d;" "%d,%z,%Z", - NXT_VERSION, my_port->process->init->stream, + NXT_VERSION, my_port->process->stream, main_port->pid, main_port->id, main_port->pair[1], my_port->pid, my_port->id, my_port->pair[0], 2, conf->shm_limit); diff --git a/src/nxt_java.c b/src/nxt_java.c index 004907d6..c4145c1d 100644 --- a/src/nxt_java.c +++ b/src/nxt_java.c @@ -27,9 +27,10 @@ #include "nxt_jars.h" -static nxt_int_t nxt_java_pre_init(nxt_task_t *task, +static nxt_int_t nxt_java_setup(nxt_task_t *task, nxt_process_t *process, nxt_common_app_conf_t *conf); -static nxt_int_t nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf); +static nxt_int_t nxt_java_start(nxt_task_t *task, + nxt_process_data_t *data); static void nxt_java_request_handler(nxt_unit_request_info_t *req); static void nxt_java_websocket_handler(nxt_unit_websocket_frame_t *ws); static void nxt_java_close_handler(nxt_unit_request_info_t *req); @@ -49,8 +50,8 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = { compat, nxt_string("java"), NXT_STRING(NXT_JAVA_VERSION), - nxt_java_pre_init, - nxt_java_init, + nxt_java_setup, + nxt_java_start, }; typedef struct { @@ -60,7 +61,8 @@ typedef struct { static nxt_int_t -nxt_java_pre_init(nxt_task_t *task, nxt_common_app_conf_t *conf) +nxt_java_setup(nxt_task_t *task, nxt_process_t *process, + nxt_common_app_conf_t *conf) { const char *unit_jars; @@ -115,24 +117,26 @@ nxt_java_module_jars(const char *jars[], int jar_count) static nxt_int_t -nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf) +nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) { - jint rc; - char *opt, *real_path; - char **classpath_arr, **unit_jars, **system_jars; - JavaVM *jvm; - JNIEnv *env; - jobject cl, classpath; - nxt_str_t str; - nxt_int_t opt_len, real_path_len; - nxt_uint_t i, unit_jars_count, classpath_count, system_jars_count; - JavaVMOption *jvm_opt; - JavaVMInitArgs jvm_args; - nxt_unit_ctx_t *ctx; - nxt_unit_init_t java_init; - nxt_java_data_t data; - nxt_conf_value_t *value; - nxt_java_app_conf_t *c; + jint rc; + char *opt, *real_path; + char **classpath_arr, **unit_jars, **system_jars; + JavaVM *jvm; + JNIEnv *env; + jobject cl, classpath; + nxt_str_t str; + nxt_int_t opt_len, real_path_len; + nxt_uint_t i, unit_jars_count, classpath_count; + nxt_uint_t system_jars_count; + JavaVMOption *jvm_opt; + JavaVMInitArgs jvm_args; + nxt_unit_ctx_t *ctx; + nxt_unit_init_t java_init; + nxt_java_data_t java_data; + nxt_conf_value_t *value; + nxt_java_app_conf_t *c; + nxt_common_app_conf_t *app_conf; //setenv("ASAN_OPTIONS", "handle_segv=0", 1); @@ -140,7 +144,8 @@ nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf) jvm_args.nOptions = 0; jvm_args.ignoreUnrecognized = 0; - c = &conf->u.java; + app_conf = data->app; + c = &app_conf->u.java; if (c->options != NULL) { jvm_args.nOptions += nxt_conf_array_elements_count(c->options); @@ -338,8 +343,8 @@ nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf) goto env_failed; } - data.env = env; - data.ctx = nxt_java_startContext(env, c->webapp, classpath); + java_data.env = env; + java_data.ctx = nxt_java_startContext(env, c->webapp, classpath); if ((*env)->ExceptionCheck(env)) { nxt_alert(task, "Unhandled exception in application start"); @@ -353,8 +358,8 @@ nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf) java_init.callbacks.websocket_handler = nxt_java_websocket_handler; java_init.callbacks.close_handler = nxt_java_close_handler; java_init.request_data_size = sizeof(nxt_java_request_data_t); - java_init.data = &data; - java_init.shm_limit = conf->shm_limit; + java_init.data = &java_data; + java_init.shm_limit = app_conf->shm_limit; ctx = nxt_unit_init(&java_init); if (nxt_slow_path(ctx == NULL)) { @@ -367,7 +372,7 @@ nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf) /* TODO report error */ } - nxt_java_stopContext(env, data.ctx); + nxt_java_stopContext(env, java_data.ctx); if ((*env)->ExceptionCheck(env)) { (*env)->ExceptionDescribe(env); diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index c35954c0..0dff050b 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -36,20 +36,11 @@ extern nxt_port_handlers_t nxt_router_process_port_handlers; static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_title(nxt_task_t *task); -static nxt_int_t nxt_main_start_controller_process(nxt_task_t *task, - nxt_runtime_t *rt); -static nxt_int_t nxt_main_create_controller_process(nxt_task_t *task, - nxt_runtime_t *rt, nxt_process_init_t *init); -static nxt_int_t nxt_main_create_router_process(nxt_task_t *task, nxt_runtime_t *rt, - nxt_process_init_t *init); -static nxt_int_t nxt_main_start_router_process(nxt_task_t *task, - nxt_runtime_t *rt); -static nxt_int_t nxt_main_start_discovery_process(nxt_task_t *task, - nxt_runtime_t *rt); -static nxt_int_t nxt_main_start_worker_process(nxt_task_t *task, - nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream); -static nxt_int_t nxt_main_create_worker_process(nxt_task_t *task, - nxt_runtime_t *rt, nxt_process_init_t *init); +static nxt_int_t nxt_main_process_create(nxt_task_t *task, + const nxt_process_init_t init); +static nxt_int_t nxt_main_start_process(nxt_task_t *task, + nxt_process_t *process); +static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data); static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, @@ -60,8 +51,7 @@ static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data); static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid); -static void nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt); +static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid); static void nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, @@ -73,29 +63,6 @@ static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); -static nxt_process_init_t *nxt_process_init_create(nxt_task_t *task, - nxt_process_type_t type, const nxt_str_t *name); -static nxt_int_t nxt_process_init_name_set(nxt_process_init_t *init, - nxt_process_type_t type, const nxt_str_t *name); -static nxt_int_t nxt_process_init_creds_set(nxt_task_t *task, - nxt_process_init_t *init, nxt_str_t *user, nxt_str_t *group); - -static nxt_int_t nxt_init_isolation(nxt_task_t *task, - nxt_conf_value_t *isolation, nxt_process_init_t *init); -#if (NXT_HAVE_CLONE) -static nxt_int_t nxt_init_clone_flags(nxt_task_t *task, - nxt_conf_value_t *namespaces, nxt_process_init_t *init); -#endif - -#if (NXT_HAVE_CLONE_NEWUSER) -static nxt_int_t nxt_init_isolation_creds(nxt_task_t *task, - nxt_conf_value_t *isolation, nxt_process_init_t *init); -static nxt_int_t nxt_init_vldt_isolation_creds(nxt_task_t *task, - nxt_process_init_t *init); -static nxt_int_t nxt_init_isolation_credential_map(nxt_task_t *task, - nxt_mp_t *mem_pool, nxt_conf_value_t *map_array, - nxt_clone_credential_map_t *map); -#endif const nxt_sig_event_t nxt_main_process_signals[] = { nxt_event_signal(SIGHUP, nxt_main_process_signal_handler), @@ -108,54 +75,6 @@ const nxt_sig_event_t nxt_main_process_signals[] = { }; -static const nxt_port_handlers_t nxt_app_process_port_handlers = { - .new_port = nxt_port_new_port_handler, - .change_file = nxt_port_change_log_file_handler, - .mmap = nxt_port_mmap_handler, - .remove_pid = nxt_port_remove_pid_handler, -}; - - -static const nxt_port_handlers_t nxt_discovery_process_port_handlers = { - .quit = nxt_worker_process_quit_handler, - .new_port = nxt_port_new_port_handler, - .change_file = nxt_port_change_log_file_handler, - .mmap = nxt_port_mmap_handler, - .data = nxt_port_data_handler, - .remove_pid = nxt_port_remove_pid_handler, - .rpc_ready = nxt_port_rpc_handler, - .rpc_error = nxt_port_rpc_handler, -}; - - -static const nxt_port_handlers_t *nxt_process_port_handlers[NXT_PROCESS_MAX] = -{ - NULL, - &nxt_discovery_process_port_handlers, - &nxt_controller_process_port_handlers, - &nxt_router_process_port_handlers, - &nxt_app_process_port_handlers -}; - - -static const nxt_process_start_t nxt_process_starts[NXT_PROCESS_MAX] = { - NULL, - nxt_discovery_start, - nxt_controller_start, - nxt_router_start, - nxt_app_start -}; - - -static const nxt_process_restart_t nxt_process_restarts[NXT_PROCESS_MAX] = { - NULL, - NULL, - &nxt_main_create_controller_process, - &nxt_main_create_router_process, - NULL -}; - - static nxt_bool_t nxt_exiting; @@ -172,11 +91,11 @@ nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_main_process_title(task); /* - * The dicsovery process will send a message processed by + * The discovery process will send a message processed by * nxt_main_port_modules_handler() which starts the controller * and router processes. */ - return nxt_main_start_discovery_process(task, rt); + return nxt_main_process_create(task, nxt_discovery_process); } @@ -350,48 +269,71 @@ nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void -nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - u_char *start, ch; + u_char *start, *p, ch; size_t type_len; - nxt_mp_t *mp; nxt_int_t ret; nxt_buf_t *b; nxt_port_t *port; nxt_runtime_t *rt; + nxt_process_t *process; nxt_app_type_t idx; nxt_conf_value_t *conf; - nxt_common_app_conf_t app_conf; + nxt_process_init_t *init; + nxt_common_app_conf_t *app_conf; ret = NXT_ERROR; - mp = nxt_mp_create(1024, 128, 256, 32); + rt = task->thread->runtime; - if (nxt_slow_path(mp == NULL)) { + process = nxt_main_process_new(task, rt); + if (nxt_slow_path(process == NULL)) { return; } - b = nxt_buf_chk_make_plain(mp, msg->buf, msg->size); + init = nxt_process_init(process); + + *init = nxt_app_process; + b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size); if (b == NULL) { - return; + goto failed; } - nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos, + nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos, b->mem.pos); - nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t)); + app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t)); + if (nxt_slow_path(app_conf == NULL)) { + goto failed; + } start = b->mem.pos; - app_conf.name.start = start; - app_conf.name.length = nxt_strlen(start); - app_conf.shm_limit = 100 * 1024 * 1024; + app_conf->name.start = start; + app_conf->name.length = nxt_strlen(start); + + init->name = (const char *) start; - start += app_conf.name.length + 1; + process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length + + sizeof("\"\" application") + 1); - conf = nxt_conf_json_parse(mp, start, b->mem.free, NULL); + if (nxt_slow_path(process->name == NULL)) { + goto failed; + } + + p = (u_char *) process->name; + *p++ = '"'; + p = nxt_cpymem(p, init->name, app_conf->name.length); + p = nxt_cpymem(p, "\" application", 13); + *p = '\0'; + app_conf->shm_limit = 100 * 1024 * 1024; + + start += app_conf->name.length + 1; + + conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL); if (conf == NULL) { nxt_alert(task, "router app configuration parsing error"); @@ -400,44 +342,45 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - app_conf.user.start = (u_char*)rt->user_cred.user; - app_conf.user.length = nxt_strlen(rt->user_cred.user); + app_conf->user.start = (u_char*)rt->user_cred.user; + app_conf->user.length = nxt_strlen(rt->user_cred.user); + + ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf, + nxt_nitems(nxt_common_app_conf), app_conf); - ret = nxt_conf_map_object(mp, conf, nxt_common_app_conf, - nxt_nitems(nxt_common_app_conf), &app_conf); if (ret != NXT_OK) { nxt_alert(task, "failed to map common app conf received from router"); goto failed; } - for (type_len = 0; type_len != app_conf.type.length; type_len++) { - ch = app_conf.type.start[type_len]; + for (type_len = 0; type_len != app_conf->type.length; type_len++) { + ch = app_conf->type.start[type_len]; if (ch == ' ' || nxt_isdigit(ch)) { break; } } - idx = nxt_app_parse_type(app_conf.type.start, type_len); + idx = nxt_app_parse_type(app_conf->type.start, type_len); if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) { nxt_alert(task, "invalid app type %d received from router", (int) idx); goto failed; } - ret = nxt_conf_map_object(mp, conf, nxt_app_maps[idx].map, - nxt_app_maps[idx].size, &app_conf); + ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map, + nxt_app_maps[idx].size, app_conf); if (nxt_slow_path(ret != NXT_OK)) { nxt_alert(task, "failed to map app conf received from router"); goto failed; } - if (app_conf.limits != NULL) { - ret = nxt_conf_map_object(mp, app_conf.limits, + if (app_conf->limits != NULL) { + ret = nxt_conf_map_object(process->mem_pool, app_conf->limits, nxt_common_app_limits_conf, nxt_nitems(nxt_common_app_limits_conf), - &app_conf); + app_conf); if (nxt_slow_path(ret != NXT_OK)) { nxt_alert(task, "failed to map app limits received from router"); @@ -445,40 +388,91 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } } - app_conf.self = conf; + app_conf->self = conf; - ret = nxt_main_start_worker_process(task, task->thread->runtime, - &app_conf, msg->port_msg.stream); + process->stream = msg->port_msg.stream; + process->data.app = app_conf; + + ret = nxt_main_start_process(task, process); + if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) { + return; + } failed: - if (ret == NXT_ERROR) { - port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, - msg->port_msg.reply_port); - if (nxt_fast_path(port != NULL)) { - nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, - -1, msg->port_msg.stream, 0, NULL); - } + nxt_process_use(task, process, -1); + + port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + + if (nxt_fast_path(port != NULL)) { + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); } +} - nxt_mp_destroy(mp); + +static void +nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_port_t *port; + nxt_process_t *process; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + + process = nxt_runtime_process_find(rt, msg->port_msg.pid); + if (nxt_slow_path(process == NULL)) { + return; + } + + nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); + + port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + + + if (nxt_slow_path(port == NULL)) { + return; + } + +#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) + if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { + if (nxt_slow_path(nxt_clone_credential_map(task, process->pid, + process->user_cred, + &process->isolation.clone) + != NXT_OK)) + { + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); + return; + } + } + +#endif + + process->state = NXT_PROCESS_STATE_CREATED; + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, + -1, msg->port_msg.stream, 0, NULL); } static nxt_port_handlers_t nxt_main_process_port_handlers = { - .data = nxt_port_main_data_handler, - .process_ready = nxt_port_process_ready_handler, - .start_worker = nxt_port_main_start_worker_handler, - .socket = nxt_main_port_socket_handler, - .modules = nxt_main_port_modules_handler, - .conf_store = nxt_main_port_conf_store_handler, + .data = nxt_port_main_data_handler, + .process_created = nxt_main_process_created_handler, + .process_ready = nxt_port_process_ready_handler, + .start_process = nxt_port_main_start_process_handler, + .socket = nxt_main_port_socket_handler, + .modules = nxt_main_port_modules_handler, + .conf_store = nxt_main_port_conf_store_handler, #if (NXT_TLS) - .cert_get = nxt_cert_store_get_handler, - .cert_delete = nxt_cert_store_delete_handler, + .cert_get = nxt_cert_store_get_handler, + .cert_delete = nxt_cert_store_delete_handler, #endif - .access_log = nxt_main_port_access_log_handler, - .rpc_ready = nxt_port_rpc_handler, - .rpc_error = nxt_port_rpc_handler, + .access_log = nxt_main_port_access_log_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, }; @@ -499,16 +493,17 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); return ret; } /* * A main process port. A write port is not closed - * since it should be inherited by worker processes. + * since it should be inherited by processes. */ nxt_port_enable(task, port, &nxt_main_process_port_handlers); - process->ready = 1; + process->state = NXT_PROCESS_STATE_READY; return NXT_OK; } @@ -541,234 +536,68 @@ nxt_main_process_title(nxt_task_t *task) static nxt_int_t -nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) +nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init) { - nxt_process_init_t *init; + nxt_int_t ret; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t *pinit; - static const nxt_str_t name = nxt_string("controller"); + rt = task->thread->runtime; - init = nxt_process_init_create(task, NXT_PROCESS_CONTROLLER, &name); - if (nxt_slow_path(init == NULL)) { + process = nxt_main_process_new(task, rt); + if (nxt_slow_path(process == NULL)) { return NXT_ERROR; } - return nxt_main_create_controller_process(task, rt, init);; -} + process->name = init.name; + process->user_cred = &rt->user_cred; + pinit = nxt_process_init(process); + *pinit = init; -static nxt_int_t -nxt_main_create_controller_process(nxt_task_t *task, nxt_runtime_t *rt, - nxt_process_init_t *init) -{ - ssize_t n; - nxt_int_t ret; - nxt_str_t *conf; - nxt_file_t file; - nxt_file_info_t fi; - nxt_controller_init_t ctrl_init; - - nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t)); - - conf = &ctrl_init.conf; - - nxt_memzero(&file, sizeof(nxt_file_t)); - - file.name = (nxt_file_name_t *) rt->conf; - - ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0); - - if (ret == NXT_OK) { - ret = nxt_file_info(&file, &fi); - - if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) { - conf->length = nxt_file_size(&fi); - conf->start = nxt_malloc(conf->length); - - if (nxt_slow_path(conf->start == NULL)) { - nxt_file_close(task, &file); - return NXT_ERROR; - } - - n = nxt_file_read(&file, conf->start, conf->length, 0); - - if (nxt_slow_path(n != (ssize_t) conf->length)) { - nxt_free(conf->start); - conf->start = NULL; - - nxt_alert(task, "failed to restore previous configuration: " - "cannot read the file"); - } - } - - nxt_file_close(task, &file); - } - -#if (NXT_TLS) - ctrl_init.certs = nxt_cert_store_load(task); -#endif - - init->data = &ctrl_init; - - ret = nxt_main_create_worker_process(task, rt, init); - - if (ret == NXT_OK) { - if (conf->start != NULL) { - nxt_free(conf->start); - } - -#if (NXT_TLS) - if (ctrl_init.certs != NULL) { - nxt_cert_store_release(ctrl_init.certs); - } -#endif + ret = nxt_main_start_process(task, process); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_process_use(task, process, -1); } return ret; } -static nxt_int_t -nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt) +static nxt_process_t * +nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt) { - nxt_process_init_t *init; - - static const nxt_str_t name = nxt_string("discovery"); + nxt_process_t *process; - init = nxt_process_init_create(task, NXT_PROCESS_DISCOVERY, &name); - if (nxt_slow_path(init == NULL)) { - return NXT_ERROR; + process = nxt_runtime_process_new(rt); + if (nxt_slow_path(process == NULL)) { + return NULL; } - return nxt_main_create_worker_process(task, rt, init); -} - - -static nxt_int_t -nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) -{ - nxt_process_init_t *init; - - static const nxt_str_t name = nxt_string("router"); - - init = nxt_process_init_create(task, NXT_PROCESS_ROUTER, &name); - if (nxt_slow_path(init == NULL)) { - return NXT_ERROR; + process->mem_pool = nxt_mp_create(1024, 128, 256, 32); + if (process->mem_pool == NULL) { + nxt_process_use(task, process, -1); + return NULL; } - return nxt_main_create_router_process(task, rt, init); + return process; } static nxt_int_t -nxt_main_create_router_process(nxt_task_t *task, nxt_runtime_t *rt, - nxt_process_init_t *init) +nxt_main_start_process(nxt_task_t *task, nxt_process_t *process) { - nxt_main_stop_worker_processes(task, rt); - - return nxt_main_create_worker_process(task, rt, init); -} - - -static nxt_int_t -nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, - nxt_common_app_conf_t *app_conf, uint32_t stream) -{ - nxt_int_t cap_setid; + nxt_mp_t *tmp_mp; nxt_int_t ret; + nxt_pid_t pid; + nxt_port_t *port; nxt_process_init_t *init; - init = nxt_process_init_create(task, NXT_PROCESS_WORKER, &app_conf->name); - if (nxt_slow_path(init == NULL)) { - return NXT_ERROR; - } - - cap_setid = rt->capabilities.setid; - - if (app_conf->isolation != NULL) { - ret = nxt_init_isolation(task, app_conf->isolation, init); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - } - -#if (NXT_HAVE_CLONE_NEWUSER) - if (NXT_CLONE_USER(init->isolation.clone.flags)) { - cap_setid = 1; - } -#endif - - if (cap_setid) { - ret = nxt_process_init_creds_set(task, init, &app_conf->user, - &app_conf->group); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - - } else { - if (!nxt_str_eq(&app_conf->user, (u_char *) rt->user_cred.user, - nxt_strlen(rt->user_cred.user))) - { - nxt_alert(task, "cannot set user \"%V\" for app \"%V\": " - "missing capabilities", &app_conf->user, &app_conf->name); - goto fail; - } - - if (app_conf->group.length > 0 - && !nxt_str_eq(&app_conf->group, (u_char *) rt->group, - nxt_strlen(rt->group))) - { - nxt_alert(task, "cannot set group \"%V\" for app \"%V\": " - "missing capabilities", &app_conf->group, - &app_conf->name); - goto fail; - } - } - - init->data = app_conf; - init->stream = stream; - -#if (NXT_HAVE_CLONE_NEWUSER) - ret = nxt_init_vldt_isolation_creds(task, init); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } -#endif - - return nxt_main_create_worker_process(task, rt, init); - -fail: - - nxt_mp_destroy(init->mem_pool); - - return NXT_ERROR; -} - - -nxt_int_t -nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, - nxt_process_init_t *init) -{ - nxt_int_t ret; - nxt_pid_t pid; - nxt_port_t *port; - nxt_process_t *process; - - /* - * TODO: remove process, init, ports from array on memory and fork failures. - */ - - process = nxt_runtime_process_new(rt); - if (nxt_slow_path(process == NULL)) { - nxt_mp_destroy(init->mem_pool); - - return NXT_ERROR; - } - - process->init = init; + init = nxt_process_init(process); port = nxt_port_new(task, 0, 0, init->type); if (nxt_slow_path(port == NULL)) { - nxt_process_use(task, process, -1); return NXT_ERROR; } @@ -776,10 +605,24 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_process_use(task, process, -1); + ret = NXT_ERROR; + tmp_mp = NULL; + ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { - nxt_port_use(task, port, -1); - return ret; + goto fail; + } + + tmp_mp = nxt_mp_create(1024, 128, 256, 32); + if (tmp_mp == NULL) { + goto fail; + } + + if (init->prefork) { + ret = init->prefork(task, process, tmp_mp); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } } pid = nxt_process_create(task, process); @@ -788,15 +631,13 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, case -1: nxt_port_close(task, port); - nxt_port_use(task, port, -1); - - return NXT_ERROR; + break; case 0: - /* A worker process, return to the event engine work queue loop. */ - nxt_port_use(task, port, -1); + /* The child process: return to the event engine work queue loop. */ - return NXT_AGAIN; + ret = NXT_AGAIN; + break; default: /* The main process created a new process. */ @@ -804,35 +645,22 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_read_close(port); nxt_port_write_enable(task, port); - nxt_port_use(task, port, -1); - - return NXT_OK; + ret = NXT_OK; + break; } -} - -void -nxt_main_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) -{ - nxt_port_t *port; - nxt_process_t *process; - - nxt_runtime_process_each(rt, process) { - - if (nxt_pid != process->pid) { - nxt_process_port_each(process, port) { +fail: - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, - -1, 0, 0, NULL); + nxt_port_use(task, port, -1); - } nxt_process_port_loop; - } + if (nxt_fast_path(tmp_mp != NULL)) { + nxt_mp_destroy(tmp_mp); + } - } nxt_runtime_process_loop; + return ret; } - static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) { @@ -1009,7 +837,7 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) pid, WEXITSTATUS(status)); } - nxt_main_cleanup_worker_process(task, pid); + nxt_main_cleanup_process(task, pid); } } @@ -1023,102 +851,81 @@ nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) static void -nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) +nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid) { - nxt_buf_t *buf; - nxt_port_t *port; - nxt_runtime_t *rt; - nxt_process_t *process; - nxt_process_type_t ptype; - nxt_process_init_t *init; - nxt_process_restart_t restart; + int stream; + nxt_int_t ret; + nxt_buf_t *buf; + nxt_port_t *port; + const char *name; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t init; rt = task->thread->runtime; process = nxt_runtime_process_find(rt, pid); + if (!process) { + return; + } - if (process) { - init = process->init; - process->init = NULL; + name = process->name; + stream = process->stream; + init = *((nxt_process_init_t *) nxt_process_init(process)); - ptype = nxt_process_type(process); - restart = nxt_process_restarts[ptype]; + if (process->state == NXT_PROCESS_STATE_READY) { + process->stream = 0; + } - if (process->ready) { - init->stream = 0; - } + nxt_process_close_ports(task, process); - nxt_process_close_ports(task, process); + if (nxt_exiting) { + if (rt->nprocesses <= 1) { + nxt_runtime_quit(task, 0); + } - if (nxt_exiting) { - nxt_mp_destroy(init->mem_pool); + return; + } - if (rt->nprocesses <= 2) { - nxt_runtime_quit(task, 0); - } + nxt_runtime_process_each(rt, process) { - return; + if (process->pid == nxt_pid + || process->pid == pid + || nxt_queue_is_empty(&process->ports)) + { + continue; } - nxt_runtime_process_each(rt, process) { + port = nxt_process_port_first(process); - if (process->pid == nxt_pid - || process->pid == pid - || nxt_queue_is_empty(&process->ports)) - { - continue; - } - - port = nxt_process_port_first(process); + if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) { + continue; + } - if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) { - continue; - } + buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(pid)); - buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(pid)); - if (nxt_slow_path(buf == NULL)) { - continue; - } + if (nxt_slow_path(buf == NULL)) { + continue; + } - buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); + buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); - nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, - -1, init->stream, 0, buf); - } nxt_runtime_process_loop; + nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, + stream, 0, buf); - if (restart != NULL) { - restart(task, rt, init); + } nxt_runtime_process_loop; - } else { - nxt_mp_destroy(init->mem_pool); + if (init.restart) { + ret = nxt_main_process_create(task, init); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_alert(task, "failed to restart %s", name); } } } static void -nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) -{ - nxt_port_t *port; - nxt_process_t *process; - - nxt_runtime_process_each(rt, process) { - - nxt_process_port_each(process, port) { - - if (port->type == NXT_PROCESS_WORKER) { - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, - -1, 0, 0, NULL); - } - - } nxt_process_port_loop; - - } nxt_runtime_process_loop; -} - - -static void nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { size_t size; @@ -1419,10 +1226,15 @@ fail: nxt_mp_destroy(mp); - ret = nxt_main_start_controller_process(task, rt); - + ret = nxt_main_process_create(task, nxt_controller_process); if (ret == NXT_OK) { - (void) nxt_main_start_router_process(task, rt); + ret = nxt_main_process_create(task, nxt_router_process); + } + + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_exiting = 1; + + nxt_runtime_quit(task, 1); } } @@ -1534,362 +1346,3 @@ nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) msg->port_msg.stream, 0, NULL); } } - - -static nxt_int_t -nxt_init_isolation(nxt_task_t *task, nxt_conf_value_t *isolation, - nxt_process_init_t *init) -{ -#if (NXT_HAVE_CLONE) - nxt_int_t ret; - nxt_conf_value_t *obj; - - static nxt_str_t nsname = nxt_string("namespaces"); - - obj = nxt_conf_get_object_member(isolation, &nsname, NULL); - if (obj != NULL) { - ret = nxt_init_clone_flags(task, obj, init); - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - } -#endif - -#if (NXT_HAVE_CLONE_NEWUSER) - ret = nxt_init_isolation_creds(task, isolation, init); - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } -#endif - - return NXT_OK; -} - - -#if (NXT_HAVE_CLONE_NEWUSER) - -static nxt_int_t -nxt_init_isolation_creds(nxt_task_t *task, nxt_conf_value_t *isolation, - nxt_process_init_t *init) -{ - nxt_int_t ret; - nxt_clone_t *clone; - nxt_conf_value_t *array; - - static nxt_str_t uidname = nxt_string("uidmap"); - static nxt_str_t gidname = nxt_string("gidmap"); - - clone = &init->isolation.clone; - - array = nxt_conf_get_object_member(isolation, &uidname, NULL); - if (array != NULL) { - ret = nxt_init_isolation_credential_map(task, init->mem_pool, array, - &clone->uidmap); - - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - } - - array = nxt_conf_get_object_member(isolation, &gidname, NULL); - if (array != NULL) { - ret = nxt_init_isolation_credential_map(task, init->mem_pool, array, - &clone->gidmap); - - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_init_vldt_isolation_creds(nxt_task_t *task, nxt_process_init_t *init) -{ - nxt_int_t ret; - nxt_clone_t *clone; - - clone = &init->isolation.clone; - - if (clone->uidmap.size == 0 && clone->gidmap.size == 0) { - return NXT_OK; - } - - if (!NXT_CLONE_USER(clone->flags)) { - if (nxt_slow_path(clone->uidmap.size > 0)) { - nxt_log(task, NXT_LOG_ERR, "\"uidmap\" is set but " - "\"isolation.namespaces.credential\" is false or unset"); - - return NXT_ERROR; - } - - if (nxt_slow_path(clone->gidmap.size > 0)) { - nxt_log(task, NXT_LOG_ERR, "\"gidmap\" is set but " - "\"isolation.namespaces.credential\" is false or unset"); - - return NXT_ERROR; - } - - return NXT_OK; - } - - ret = nxt_clone_vldt_credential_uidmap(task, &clone->uidmap, - init->user_cred); - - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; - } - - return nxt_clone_vldt_credential_gidmap(task, &clone->gidmap, - init->user_cred); -} - - -static nxt_int_t -nxt_init_isolation_credential_map(nxt_task_t *task, nxt_mp_t *mem_pool, - nxt_conf_value_t *map_array, nxt_clone_credential_map_t *map) -{ - nxt_int_t ret; - nxt_uint_t i; - nxt_conf_value_t *obj; - - static nxt_conf_map_t nxt_clone_map_entry_conf[] = { - { - nxt_string("container"), - NXT_CONF_MAP_INT, - offsetof(nxt_clone_map_entry_t, container), - }, - - { - nxt_string("host"), - NXT_CONF_MAP_INT, - offsetof(nxt_clone_map_entry_t, host), - }, - - { - nxt_string("size"), - NXT_CONF_MAP_INT, - offsetof(nxt_clone_map_entry_t, size), - }, - }; - - map->size = nxt_conf_array_elements_count(map_array); - - if (map->size == 0) { - return NXT_OK; - } - - map->map = nxt_mp_alloc(mem_pool, - map->size * sizeof(nxt_clone_map_entry_t)); - if (nxt_slow_path(map->map == NULL)) { - return NXT_ERROR; - } - - for (i = 0; i < map->size; i++) { - obj = nxt_conf_get_array_element(map_array, i); - - ret = nxt_conf_map_object(mem_pool, obj, nxt_clone_map_entry_conf, - nxt_nitems(nxt_clone_map_entry_conf), - map->map + i); - if (nxt_slow_path(ret != NXT_OK)) { - nxt_alert(task, "clone map entry map error"); - return NXT_ERROR; - } - } - - return NXT_OK; -} - -#endif - -#if (NXT_HAVE_CLONE) - -static nxt_int_t -nxt_init_clone_flags(nxt_task_t *task, nxt_conf_value_t *namespaces, - nxt_process_init_t *init) -{ - uint32_t index; - nxt_str_t name; - nxt_int_t flag; - nxt_conf_value_t *value; - - index = 0; - - for ( ;; ) { - value = nxt_conf_next_object_member(namespaces, &name, &index); - - if (value == NULL) { - break; - } - - flag = 0; - -#if (NXT_HAVE_CLONE_NEWUSER) - if (nxt_str_eq(&name, "credential", 10)) { - flag = CLONE_NEWUSER; - } -#endif - -#if (NXT_HAVE_CLONE_NEWPID) - if (nxt_str_eq(&name, "pid", 3)) { - flag = CLONE_NEWPID; - } -#endif - -#if (NXT_HAVE_CLONE_NEWNET) - if (nxt_str_eq(&name, "network", 7)) { - flag = CLONE_NEWNET; - } -#endif - -#if (NXT_HAVE_CLONE_NEWUTS) - if (nxt_str_eq(&name, "uname", 5)) { - flag = CLONE_NEWUTS; - } -#endif - -#if (NXT_HAVE_CLONE_NEWNS) - if (nxt_str_eq(&name, "mount", 5)) { - flag = CLONE_NEWNS; - } -#endif - -#if (NXT_HAVE_CLONE_NEWCGROUP) - if (nxt_str_eq(&name, "cgroup", 6)) { - flag = CLONE_NEWCGROUP; - } -#endif - - if (!flag) { - nxt_alert(task, "unknown namespace flag: \"%V\"", &name); - return NXT_ERROR; - } - - if (nxt_conf_get_boolean(value)) { - init->isolation.clone.flags |= flag; - } - } - - return NXT_OK; -} - -#endif - - -static nxt_process_init_t * -nxt_process_init_create(nxt_task_t *task, nxt_process_type_t type, - const nxt_str_t *name) -{ - nxt_mp_t *mp; - nxt_int_t ret; - nxt_runtime_t *rt; - nxt_process_init_t *init; - - mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(mp == NULL)) { - return NULL; - } - - init = nxt_mp_zalloc(mp, sizeof(nxt_process_init_t)); - if (nxt_slow_path(init == NULL)) { - goto fail; - } - - init->mem_pool = mp; - - ret = nxt_process_init_name_set(init, type, name); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - - rt = task->thread->runtime; - - init->type = type; - init->start = nxt_process_starts[type]; - init->port_handlers = nxt_process_port_handlers[type]; - init->signals = nxt_worker_process_signals; - init->user_cred = &rt->user_cred; - init->data = &rt; - - return init; - -fail: - - nxt_mp_destroy(mp); - - return NULL; -} - - -static nxt_int_t -nxt_process_init_name_set(nxt_process_init_t *init, nxt_process_type_t type, - const nxt_str_t *name) -{ - u_char *str, *end; - size_t size; - const char *fmt; - - size = name->length + 1; - - if (type == NXT_PROCESS_WORKER) { - size += nxt_length("\"\" application"); - fmt = "\"%V\" application%Z"; - - } else { - fmt = "%V%Z"; - } - - str = nxt_mp_alloc(init->mem_pool, size); - if (nxt_slow_path(str == NULL)) { - return NXT_ERROR; - } - - end = str + size; - - nxt_sprintf(str, end, fmt, name); - - init->name = (char *) str; - - return NXT_OK; -} - - -static nxt_int_t -nxt_process_init_creds_set(nxt_task_t *task, nxt_process_init_t *init, - nxt_str_t *user, nxt_str_t *group) -{ - char *str; - - init->user_cred = nxt_mp_zalloc(init->mem_pool, sizeof(nxt_credential_t)); - - if (nxt_slow_path(init->user_cred == NULL)) { - return NXT_ERROR; - } - - str = nxt_mp_zalloc(init->mem_pool, user->length + 1); - if (nxt_slow_path(str == NULL)) { - return NXT_ERROR; - } - - nxt_memcpy(str, user->start, user->length); - str[user->length] = '\0'; - - init->user_cred->user = str; - - if (group->start != NULL) { - str = nxt_mp_zalloc(init->mem_pool, group->length + 1); - if (nxt_slow_path(str == NULL)) { - return NXT_ERROR; - } - - nxt_memcpy(str, group->start, group->length); - str[group->length] = '\0'; - - } else { - str = NULL; - } - - return nxt_credential_get(task, init->mem_pool, init->user_cred, str); -} diff --git a/src/nxt_main_process.h b/src/nxt_main_process.h index b0570a84..f9c974d8 100644 --- a/src/nxt_main_process.h +++ b/src/nxt_main_process.h @@ -19,26 +19,17 @@ typedef enum { } nxt_socket_error_t; -typedef struct { - nxt_str_t conf; -#if (NXT_TLS) - nxt_array_t *certs; -#endif -} nxt_controller_init_t; - - nxt_int_t nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *runtime); -void nxt_main_stop_all_processes(nxt_task_t *task, nxt_runtime_t *runtime); -nxt_int_t nxt_controller_start(nxt_task_t *task, void *data); -nxt_int_t nxt_router_start(nxt_task_t *task, void *data); -nxt_int_t nxt_discovery_start(nxt_task_t *task, void *data); -nxt_int_t nxt_app_start(nxt_task_t *task, void *data); +NXT_EXPORT extern const nxt_process_init_t nxt_discovery_process; +NXT_EXPORT extern const nxt_process_init_t nxt_controller_process; +NXT_EXPORT extern const nxt_process_init_t nxt_router_process; +NXT_EXPORT extern const nxt_process_init_t nxt_app_process; extern const nxt_sig_event_t nxt_main_process_signals[]; -extern const nxt_sig_event_t nxt_worker_process_signals[]; +extern const nxt_sig_event_t nxt_process_signals[]; #endif /* _NXT_MAIN_PROCESS_H_INCLUDED_ */ diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index d3c23c31..ddad5761 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -72,7 +72,7 @@ typedef void (*zif_handler)(INTERNAL_FUNCTION_PARAMETERS); #endif -static nxt_int_t nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf); +static nxt_int_t nxt_php_start(nxt_task_t *task, nxt_process_data_t *data); static nxt_int_t nxt_php_set_target(nxt_task_t *task, nxt_php_target_t *target, nxt_conf_value_t *conf); static void nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options, @@ -242,7 +242,7 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = { nxt_string("php"), PHP_VERSION, NULL, - nxt_php_init, + nxt_php_start, }; @@ -256,19 +256,20 @@ static void ***tsrm_ls; static nxt_int_t -nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) +nxt_php_start(nxt_task_t *task, nxt_process_data_t *data) { - u_char *p; - uint32_t next; - nxt_str_t ini_path, name; - nxt_int_t ret; - nxt_uint_t n; - nxt_port_t *my_port, *main_port; - nxt_runtime_t *rt; - nxt_unit_ctx_t *unit_ctx; - nxt_unit_init_t php_init; - nxt_conf_value_t *value; - nxt_php_app_conf_t *c; + u_char *p; + uint32_t next; + nxt_str_t ini_path, name; + nxt_int_t ret; + nxt_uint_t n; + nxt_port_t *my_port, *main_port; + nxt_runtime_t *rt; + nxt_unit_ctx_t *unit_ctx; + nxt_unit_init_t php_init; + nxt_conf_value_t *value; + nxt_php_app_conf_t *c; + nxt_common_app_conf_t *conf; static nxt_str_t file_str = nxt_string("file"); static nxt_str_t user_str = nxt_string("user"); @@ -276,6 +277,7 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_php_task = task; + conf = data->app; c = &conf->u.php; n = (c->targets != NULL) ? nxt_conf_object_members_count(c->targets) : 1; @@ -385,7 +387,7 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_fd_blocking(task, main_port->pair[1]); - php_init.ready_stream = my_port->process->init->stream; + php_init.ready_stream = my_port->process->stream; php_init.read_port.id.pid = my_port->pid; php_init.read_port.id.id = my_port->id; diff --git a/src/nxt_port.c b/src/nxt_port.c index 70cf33e6..7232c465 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -296,7 +296,9 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - process->ready = 1; + nxt_assert(process->state != NXT_PROCESS_STATE_READY); + + process->state = NXT_PROCESS_STATE_READY; nxt_assert(!nxt_queue_is_empty(&process->ports)); diff --git a/src/nxt_port.h b/src/nxt_port.h index c6f15238..0e8707f3 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -14,7 +14,7 @@ struct nxt_port_handlers_s { nxt_port_handler_t rpc_error; /* Main process RPC requests. */ - nxt_port_handler_t start_worker; + nxt_port_handler_t start_process; nxt_port_handler_t socket; nxt_port_handler_t modules; nxt_port_handler_t conf_store; @@ -27,7 +27,8 @@ struct nxt_port_handlers_s { nxt_port_handler_t new_port; nxt_port_handler_t mmap; - /* New process ready. */ + /* New process */ + nxt_port_handler_t process_created; nxt_port_handler_t process_ready; /* Process exit/crash notification. */ @@ -53,76 +54,76 @@ struct nxt_port_handlers_s { #define nxt_port_handler_idx(name) \ ( offsetof(nxt_port_handlers_t, name) / sizeof(nxt_port_handler_t) ) +#define nxt_msg_last(handler) \ + (handler | NXT_PORT_MSG_LAST) typedef enum { - NXT_PORT_MSG_LAST = 0x100, - NXT_PORT_MSG_CLOSE_FD = 0x200, - NXT_PORT_MSG_SYNC = 0x400, - - NXT_PORT_MSG_MASK = 0xFF, - - _NXT_PORT_MSG_RPC_READY = nxt_port_handler_idx(rpc_ready), - _NXT_PORT_MSG_RPC_ERROR = nxt_port_handler_idx(rpc_error), - - _NXT_PORT_MSG_START_WORKER = nxt_port_handler_idx(start_worker), - _NXT_PORT_MSG_SOCKET = nxt_port_handler_idx(socket), - _NXT_PORT_MSG_MODULES = nxt_port_handler_idx(modules), - _NXT_PORT_MSG_CONF_STORE = nxt_port_handler_idx(conf_store), - _NXT_PORT_MSG_CERT_GET = nxt_port_handler_idx(cert_get), - _NXT_PORT_MSG_CERT_DELETE = nxt_port_handler_idx(cert_delete), - _NXT_PORT_MSG_ACCESS_LOG = nxt_port_handler_idx(access_log), - - _NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file), - _NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port), - _NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap), - - _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready), - _NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid), - _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit), - - _NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers), - _NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame), - - _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data), - - _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm), - _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack), - - NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t) - / sizeof(nxt_port_handler_t), - - NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY, - NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_RPC_ERROR = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST, - - NXT_PORT_MSG_START_WORKER = _NXT_PORT_MSG_START_WORKER - | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_SOCKET = _NXT_PORT_MSG_SOCKET | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_MODULES = _NXT_PORT_MSG_MODULES | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_CONF_STORE = _NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_CERT_GET = _NXT_PORT_MSG_CERT_GET | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_CERT_DELETE = _NXT_PORT_MSG_CERT_DELETE | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_ACCESS_LOG = _NXT_PORT_MSG_ACCESS_LOG | NXT_PORT_MSG_LAST, - - NXT_PORT_MSG_CHANGE_FILE = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_NEW_PORT = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_MMAP = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST - | NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC, - - NXT_PORT_MSG_PROCESS_READY = _NXT_PORT_MSG_PROCESS_READY - | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_QUIT = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST, - - NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS, - NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET, - NXT_PORT_MSG_WEBSOCKET_LAST = _NXT_PORT_MSG_WEBSOCKET | NXT_PORT_MSG_LAST, - - NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA, - NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST, - - NXT_PORT_MSG_OOSM = _NXT_PORT_MSG_OOSM | NXT_PORT_MSG_LAST, - NXT_PORT_MSG_SHM_ACK = _NXT_PORT_MSG_SHM_ACK | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_LAST = 0x100, + NXT_PORT_MSG_CLOSE_FD = 0x200, + NXT_PORT_MSG_SYNC = 0x400, + + NXT_PORT_MSG_MASK = 0xFF, + + _NXT_PORT_MSG_RPC_READY = nxt_port_handler_idx(rpc_ready), + _NXT_PORT_MSG_RPC_ERROR = nxt_port_handler_idx(rpc_error), + + _NXT_PORT_MSG_START_PROCESS = nxt_port_handler_idx(start_process), + _NXT_PORT_MSG_SOCKET = nxt_port_handler_idx(socket), + _NXT_PORT_MSG_MODULES = nxt_port_handler_idx(modules), + _NXT_PORT_MSG_CONF_STORE = nxt_port_handler_idx(conf_store), + _NXT_PORT_MSG_CERT_GET = nxt_port_handler_idx(cert_get), + _NXT_PORT_MSG_CERT_DELETE = nxt_port_handler_idx(cert_delete), + _NXT_PORT_MSG_ACCESS_LOG = nxt_port_handler_idx(access_log), + + _NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file), + _NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port), + _NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap), + + _NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created), + _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready), + _NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid), + _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit), + + _NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers), + _NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame), + + _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data), + + _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm), + _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack), + + NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t) + / sizeof(nxt_port_handler_t), + + NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY, + NXT_PORT_MSG_RPC_READY_LAST = nxt_msg_last(_NXT_PORT_MSG_RPC_READY), + NXT_PORT_MSG_RPC_ERROR = nxt_msg_last(_NXT_PORT_MSG_RPC_ERROR), + NXT_PORT_MSG_START_PROCESS = nxt_msg_last(_NXT_PORT_MSG_START_PROCESS), + NXT_PORT_MSG_SOCKET = nxt_msg_last(_NXT_PORT_MSG_SOCKET), + NXT_PORT_MSG_MODULES = nxt_msg_last(_NXT_PORT_MSG_MODULES), + NXT_PORT_MSG_CONF_STORE = nxt_msg_last(_NXT_PORT_MSG_CONF_STORE), + NXT_PORT_MSG_CERT_GET = nxt_msg_last(_NXT_PORT_MSG_CERT_GET), + NXT_PORT_MSG_CERT_DELETE = nxt_msg_last(_NXT_PORT_MSG_CERT_DELETE), + NXT_PORT_MSG_ACCESS_LOG = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG), + NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE), + NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT), + NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP) + | NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC, + + NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED), + NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY), + NXT_PORT_MSG_QUIT = nxt_msg_last(_NXT_PORT_MSG_QUIT), + NXT_PORT_MSG_REMOVE_PID = nxt_msg_last(_NXT_PORT_MSG_REMOVE_PID), + + NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS, + NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET, + NXT_PORT_MSG_WEBSOCKET_LAST = nxt_msg_last(_NXT_PORT_MSG_WEBSOCKET), + + NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA, + NXT_PORT_MSG_DATA_LAST = nxt_msg_last(_NXT_PORT_MSG_DATA), + + NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM), + NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK), } nxt_port_msg_type_t; diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index 33d3777e..f4d2125c 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -174,7 +174,7 @@ complete_buf: if (process != NULL && !nxt_queue_is_empty(&process->ports)) { port = nxt_process_port_first(process); - if (port->type == NXT_PROCESS_WORKER) { + if (port->type == NXT_PROCESS_APP) { (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK, -1, 0, 0, NULL); } diff --git a/src/nxt_process.c b/src/nxt_process.c index f5959edf..e84549b3 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -13,9 +13,18 @@ #include <signal.h> -static void nxt_process_start(nxt_task_t *task, nxt_process_t *process); -static nxt_int_t nxt_process_worker_setup(nxt_task_t *task, - nxt_process_t *process, int parentfd); +static nxt_int_t nxt_process_setup(nxt_task_t *task, nxt_process_t *process); +static nxt_int_t nxt_process_child_fixup(nxt_task_t *task, + nxt_process_t *process); +static nxt_int_t nxt_process_send_created(nxt_task_t *task, + nxt_process_t *process); +static nxt_int_t nxt_process_send_ready(nxt_task_t *task, + nxt_process_t *process); +static void nxt_process_created_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data); +static void nxt_process_created_error(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); + /* A cached process pid. */ nxt_pid_t nxt_pid; @@ -47,62 +56,58 @@ nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { static nxt_int_t -nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd) +nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process) { - pid_t rpid, pid; - ssize_t n; - nxt_int_t parent_status; nxt_process_t *p; nxt_runtime_t *rt; nxt_process_init_t *init; nxt_process_type_t ptype; - pid = getpid(); - rpid = 0; - rt = task->thread->runtime; - init = process->init; + init = nxt_process_init(process); - /* Setup the worker process. */ + nxt_pid = nxt_getpid(); - n = read(parentfd, &rpid, sizeof(rpid)); - if (nxt_slow_path(n == -1 || n != sizeof(rpid))) { - nxt_alert(task, "failed to read real pid"); - return NXT_ERROR; - } - - if (nxt_slow_path(rpid == 0)) { - nxt_alert(task, "failed to get real pid from parent"); - return NXT_ERROR; - } - - nxt_pid = rpid; + process->pid = nxt_pid; /* Clean inherited cached thread tid. */ task->thread->tid = 0; - process->pid = nxt_pid; +#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWPID) + if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWPID)) { + ssize_t pidsz; + char procpid[10]; - if (nxt_pid != pid) { - nxt_debug(task, "app \"%s\" real pid %d", init->name, nxt_pid); - nxt_debug(task, "app \"%s\" isolated pid: %d", init->name, pid); - } + nxt_debug(task, "%s isolated pid is %d", process->name, nxt_pid); - n = read(parentfd, &parent_status, sizeof(parent_status)); - if (nxt_slow_path(n == -1 || n != sizeof(parent_status))) { - nxt_alert(task, "failed to read parent status"); - return NXT_ERROR; - } + pidsz = readlink("/proc/self", procpid, sizeof(procpid)); + + if (nxt_slow_path(pidsz < 0 || pidsz >= (ssize_t) sizeof(procpid))) { + nxt_alert(task, "failed to read real pid from /proc/self"); + return NXT_ERROR; + } + + procpid[pidsz] = '\0'; + + nxt_pid = (nxt_pid_t) strtol(procpid, NULL, 10); + + nxt_assert(nxt_pid > 0 && nxt_errno != ERANGE); - if (nxt_slow_path(parent_status != NXT_OK)) { - return parent_status; + process->pid = nxt_pid; + task->thread->tid = nxt_pid; + + nxt_debug(task, "%s real pid is %d", process->name, nxt_pid); } +#endif + ptype = init->type; nxt_port_reset_next_id(); nxt_event_engine_thread_adopt(task->thread->engine); + rt = task->thread->runtime; + /* Remove not ready processes. */ nxt_runtime_process_each(rt, p) { @@ -114,7 +119,7 @@ nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd) continue; } - if (!p->ready) { + if (p->state != NXT_PROCESS_STATE_READY) { nxt_debug(task, "remove not ready process %PI", p->pid); nxt_process_close_ports(task, p); @@ -127,12 +132,6 @@ nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd) } nxt_runtime_process_loop; - nxt_runtime_process_add(task, process); - - nxt_process_start(task, process); - - process->ready = 1; - return NXT_OK; } @@ -140,48 +139,36 @@ nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd) nxt_pid_t nxt_process_create(nxt_task_t *task, nxt_process_t *process) { - int pipefd[2]; nxt_int_t ret; nxt_pid_t pid; - nxt_process_init_t *init; - - if (nxt_slow_path(pipe(pipefd) == -1)) { - nxt_alert(task, "failed to create process pipe for passing rpid"); - return -1; - } - - init = process->init; #if (NXT_HAVE_CLONE) - pid = nxt_clone(SIGCHLD | init->isolation.clone.flags); + pid = nxt_clone(SIGCHLD | process->isolation.clone.flags); if (nxt_slow_path(pid < 0)) { - nxt_alert(task, "clone() failed while creating \"%s\" %E", - init->name, nxt_errno); - goto cleanup; + nxt_alert(task, "clone() failed for %s %E", process->name, nxt_errno); + return pid; } #else pid = fork(); if (nxt_slow_path(pid < 0)) { - nxt_alert(task, "fork() failed while creating \"%s\" %E", - init->name, nxt_errno); - goto cleanup; + nxt_alert(task, "fork() failed for %s %E", process->name, nxt_errno); + return pid; } #endif if (pid == 0) { /* Child. */ - if (nxt_slow_path(close(pipefd[1]) == -1)) { - nxt_alert(task, "failed to close writer pipe fd"); - } - - ret = nxt_process_worker_setup(task, process, pipefd[0]); + ret = nxt_process_child_fixup(task, process); if (nxt_slow_path(ret != NXT_OK)) { - exit(1); + nxt_process_quit(task, 1); + return -1; } - if (nxt_slow_path(close(pipefd[0]) == -1)) { - nxt_alert(task, "failed to close writer pipe fd"); + nxt_runtime_process_add(task, process); + + if (nxt_slow_path(nxt_process_setup(task, process) != NXT_OK)) { + nxt_process_quit(task, 1); } /* @@ -193,78 +180,24 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) /* Parent. */ - /* - * At this point, the child process is blocked reading the - * pipe fd to get its real pid (rpid). - * - * If anything goes wrong now, we need to terminate the child - * process by sending a NXT_ERROR in the pipe. - */ - #if (NXT_HAVE_CLONE) - nxt_debug(task, "clone(\"%s\"): %PI", init->name, pid); + nxt_debug(task, "clone(%s): %PI", process->name, pid); #else - nxt_debug(task, "fork(\"%s\"): %PI", init->name, pid); + nxt_debug(task, "fork(%s): %PI", process->name, pid); #endif - if (nxt_slow_path(write(pipefd[1], &pid, sizeof(pid)) == -1)) { - nxt_alert(task, "failed to write real pid"); - goto fail; - } - -#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) - if (NXT_CLONE_USER(init->isolation.clone.flags)) { - ret = nxt_clone_credential_map(task, pid, init->user_cred, - &init->isolation.clone); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - } -#endif - - ret = NXT_OK; - - if (nxt_slow_path(write(pipefd[1], &ret, sizeof(ret)) == -1)) { - nxt_alert(task, "failed to write status"); - goto fail; - } - process->pid = pid; nxt_runtime_process_add(task, process); - goto cleanup; - -fail: - - ret = NXT_ERROR; - - if (nxt_slow_path(write(pipefd[1], &ret, sizeof(ret)) == -1)) { - nxt_alert(task, "failed to write status"); - } - - waitpid(pid, NULL, 0); - - pid = -1; - -cleanup: - - if (nxt_slow_path(close(pipefd[0]) != 0)) { - nxt_alert(task, "failed to close pipe: %E", nxt_errno); - } - - if (nxt_slow_path(close(pipefd[1]) != 0)) { - nxt_alert(task, "failed to close pipe: %E", nxt_errno); - } - return pid; } -static void -nxt_process_start(nxt_task_t *task, nxt_process_t *process) +static nxt_int_t +nxt_process_setup(nxt_task_t *task, nxt_process_t *process) { - nxt_int_t ret, cap_setid; + nxt_int_t ret; nxt_port_t *port, *main_port; nxt_thread_t *thread; nxt_runtime_t *rt; @@ -272,37 +205,17 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process) nxt_event_engine_t *engine; const nxt_event_interface_t *interface; - init = process->init; + init = nxt_process_init(process); - nxt_log(task, NXT_LOG_INFO, "%s started", init->name); + nxt_debug(task, "%s setup", process->name); - nxt_process_title(task, "unit: %s", init->name); + nxt_process_title(task, "unit: %s", process->name); thread = task->thread; rt = thread->runtime; nxt_random_init(&thread->random); - cap_setid = rt->capabilities.setid; - -#if (NXT_HAVE_CLONE_NEWUSER) - if (!cap_setid && NXT_CLONE_USER(init->isolation.clone.flags)) { - cap_setid = 1; - } -#endif - - if (cap_setid) { - ret = nxt_credential_setgids(task, init->user_cred); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - - ret = nxt_credential_setuid(task, init->user_cred); - if (nxt_slow_path(ret != NXT_OK)) { - goto fail; - } - } - rt->type = init->type; engine = thread->engine; @@ -312,17 +225,17 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process) interface = nxt_service_get(rt->services, "engine", rt->engine); if (nxt_slow_path(interface == NULL)) { - goto fail; + return NXT_ERROR; } if (nxt_event_engine_change(engine, interface, rt->batch) != NXT_OK) { - goto fail; + return NXT_ERROR; } ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads, 60000 * 1000000LL); if (nxt_slow_path(ret != NXT_OK)) { - goto fail; + return NXT_ERROR; } main_port = rt->port_by_type[NXT_PROCESS_MAIN]; @@ -334,28 +247,282 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process) nxt_port_write_close(port); - ret = init->start(task, init->data); + nxt_port_enable(task, port, init->port_handlers); + + ret = init->setup(task, process); if (nxt_slow_path(ret != NXT_OK)) { - goto fail; + return NXT_ERROR; } - nxt_port_enable(task, port, init->port_handlers); + switch (process->state) { - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY, - -1, init->stream, 0, NULL); + case NXT_PROCESS_STATE_CREATED: + ret = nxt_process_send_created(task, process); + break; + + case NXT_PROCESS_STATE_READY: + ret = nxt_process_send_ready(task, process); + + if (nxt_slow_path(ret != NXT_OK)) { + break; + } + + ret = init->start(task, &process->data); + break; + + default: + nxt_assert(0); + } if (nxt_slow_path(ret != NXT_OK)) { - nxt_log(task, NXT_LOG_ERR, "failed to send READY message to main"); + nxt_alert(task, "%s failed to start", process->name); + } + + return ret; +} + +static nxt_int_t +nxt_process_send_created(nxt_task_t *task, nxt_process_t *process) +{ + uint32_t stream; + nxt_int_t ret; + nxt_port_t *my_port, *main_port; + nxt_runtime_t *rt; + + nxt_assert(process->state == NXT_PROCESS_STATE_CREATED); + + rt = task->thread->runtime; + + my_port = nxt_process_port_first(process); + main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + + nxt_assert(my_port != NULL && main_port != NULL); + + stream = nxt_port_rpc_register_handler(task, my_port, + nxt_process_created_ok, + nxt_process_created_error, + main_port->pid, process); + + if (nxt_slow_path(stream == 0)) { + return NXT_ERROR; + } + + ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_CREATED, + -1, stream, my_port->id, NULL); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "%s failed to send CREATED message", process->name); + nxt_port_rpc_cancel(task, my_port, stream); + return NXT_ERROR; + } + + nxt_debug(task, "%s created", process->name); + + return NXT_OK; +} + + +static void +nxt_process_created_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +{ + nxt_int_t ret; + nxt_process_t *process; + nxt_process_init_t *init; + + process = data; + init = nxt_process_init(process); + + ret = nxt_process_apply_creds(task, process); + if (nxt_slow_path(ret != NXT_OK)) { goto fail; } - return; + nxt_log(task, NXT_LOG_INFO, "%s started", process->name); + + ret = init->start(task, &process->data); fail: - exit(1); + nxt_process_quit(task, ret == NXT_OK ? 0 : 1); +} + + +static void +nxt_process_created_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) +{ + nxt_process_t *process; + nxt_process_init_t *init; + + process = data; + init = nxt_process_init(process); + + nxt_alert(task, "%s failed to start", init->name); + + nxt_process_quit(task, 1); +} + + +nxt_int_t +nxt_process_core_setup(nxt_task_t *task, nxt_process_t *process) +{ + nxt_int_t ret; + + ret = nxt_process_apply_creds(task, process); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + process->state = NXT_PROCESS_STATE_READY; + + return NXT_OK; +} + + +#if (NXT_HAVE_CLONE_NEWUSER) + +nxt_int_t +nxt_process_vldt_isolation_creds(nxt_task_t *task, nxt_process_t *process) +{ + nxt_int_t ret; + nxt_clone_t *clone; + nxt_credential_t *creds; + + clone = &process->isolation.clone; + creds = process->user_cred; + + if (clone->uidmap.size == 0 && clone->gidmap.size == 0) { + return NXT_OK; + } + + if (!nxt_is_clone_flag_set(clone->flags, NEWUSER)) { + if (nxt_slow_path(clone->uidmap.size > 0)) { + nxt_log(task, NXT_LOG_ERR, "\"uidmap\" is set but " + "\"isolation.namespaces.credential\" is false or unset"); + + return NXT_ERROR; + } + + if (nxt_slow_path(clone->gidmap.size > 0)) { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" is set but " + "\"isolation.namespaces.credential\" is false or unset"); + + return NXT_ERROR; + } + + return NXT_OK; + } + + ret = nxt_clone_vldt_credential_uidmap(task, &clone->uidmap, creds); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + return nxt_clone_vldt_credential_gidmap(task, &clone->gidmap, creds); +} + +#endif + + +nxt_int_t +nxt_process_creds_set(nxt_task_t *task, nxt_process_t *process, nxt_str_t *user, + nxt_str_t *group) +{ + char *str; + + process->user_cred = nxt_mp_zalloc(process->mem_pool, + sizeof(nxt_credential_t)); + + if (nxt_slow_path(process->user_cred == NULL)) { + return NXT_ERROR; + } + + str = nxt_mp_zalloc(process->mem_pool, user->length + 1); + if (nxt_slow_path(str == NULL)) { + return NXT_ERROR; + } + + nxt_memcpy(str, user->start, user->length); + str[user->length] = '\0'; + + process->user_cred->user = str; + + if (group->start != NULL) { + str = nxt_mp_zalloc(process->mem_pool, group->length + 1); + if (nxt_slow_path(str == NULL)) { + return NXT_ERROR; + } + + nxt_memcpy(str, group->start, group->length); + str[group->length] = '\0'; + + } else { + str = NULL; + } + + return nxt_credential_get(task, process->mem_pool, process->user_cred, str); +} + + +nxt_int_t +nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process) +{ + nxt_int_t ret, cap_setid; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + + cap_setid = rt->capabilities.setid; + +#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) + if (!cap_setid + && nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { + cap_setid = 1; + } +#endif + + if (cap_setid) { + ret = nxt_credential_setgids(task, process->user_cred); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + ret = nxt_credential_setuid(task, process->user_cred); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_process_send_ready(nxt_task_t *task, nxt_process_t *process) +{ + nxt_int_t ret; + nxt_port_t *main_port; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + + main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + + nxt_assert(main_port != NULL); + + ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY, + -1, process->stream, 0, NULL); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "%s failed to send READY message", process->name); + return NXT_ERROR; + } + + nxt_debug(task, "%s sent ready", process->name); + + return NXT_OK; } @@ -625,3 +792,50 @@ nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *port) return res; } + + +void +nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status) +{ + nxt_uint_t n; + nxt_queue_t *listen; + nxt_runtime_t *rt; + nxt_queue_link_t *link, *next; + nxt_listen_event_t *lev; + nxt_listen_socket_t *ls; + + rt = task->thread->runtime; + + nxt_debug(task, "close listen connections"); + + listen = &task->thread->engine->listen_connections; + + for (link = nxt_queue_first(listen); + link != nxt_queue_tail(listen); + link = next) + { + next = nxt_queue_next(link); + lev = nxt_queue_link_data(link, nxt_listen_event_t, link); + nxt_queue_remove(link); + + nxt_fd_event_close(task->thread->engine, &lev->socket); + } + + if (rt->listen_sockets != NULL) { + + ls = rt->listen_sockets->elts; + n = rt->listen_sockets->nelts; + + while (n != 0) { + nxt_socket_close(task, ls->socket); + ls->socket = -1; + + ls++; + n--; + } + + rt->listen_sockets->nelts = 0; + } + + nxt_runtime_quit(task, exit_status); +} diff --git a/src/nxt_process.h b/src/nxt_process.h index 3f7155c8..45bab25e 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -8,68 +8,123 @@ #define _NXT_PROCESS_H_INCLUDED_ #if (NXT_HAVE_CLONE) +#include <unistd.h> #include <nxt_clone.h> #endif -typedef pid_t nxt_pid_t; +#if (NXT_HAVE_CLONE) +/* + * Old glibc wrapper for getpid(2) returns a cached pid invalidated only by + * fork(2) calls. As we use clone(2) for container, it returns the wrong pid. + */ +#define nxt_getpid() \ + syscall(__NR_getpid) +#else +#define nxt_getpid() \ + getpid() +#endif +typedef pid_t nxt_pid_t; -typedef struct nxt_process_init_s nxt_process_init_t; -typedef nxt_int_t (*nxt_process_start_t)(nxt_task_t *task, void *data); -typedef nxt_int_t (*nxt_process_restart_t)(nxt_task_t *task, nxt_runtime_t *rt, - nxt_process_init_t *init); -struct nxt_process_init_s { - nxt_mp_t *mem_pool; - nxt_process_start_t start; - const char *name; - nxt_credential_t *user_cred; +typedef struct nxt_common_app_conf_s nxt_common_app_conf_t; - const nxt_port_handlers_t *port_handlers; - const nxt_sig_event_t *signals; - nxt_process_type_t type; +typedef struct { + nxt_runtime_t *rt; +} nxt_discovery_init_t; - void *data; - uint32_t stream; - union { -#if (NXT_HAVE_CLONE) - nxt_clone_t clone; +typedef struct { + nxt_str_t conf; +#if (NXT_TLS) + nxt_array_t *certs; #endif - } isolation; -}; +} nxt_controller_init_t; + + +typedef union { + void *discovery; + nxt_controller_init_t controller; + void *router; + nxt_common_app_conf_t *app; +} nxt_process_data_t; + + +typedef enum { + NXT_PROCESS_STATE_CREATING = 0, + NXT_PROCESS_STATE_CREATED, + NXT_PROCESS_STATE_READY, +} nxt_process_state_t; typedef struct nxt_port_mmap_s nxt_port_mmap_t; -typedef struct nxt_port_mmaps_s nxt_port_mmaps_t; -struct nxt_port_mmaps_s { + +typedef struct { nxt_thread_mutex_t mutex; uint32_t size; uint32_t cap; nxt_port_mmap_t *elts; -}; +} nxt_port_mmaps_t; typedef struct { - nxt_pid_t pid; - nxt_queue_t ports; /* of nxt_port_t */ - nxt_bool_t ready; - nxt_bool_t registered; - nxt_int_t use_count; + nxt_pid_t pid; + const char *name; + nxt_queue_t ports; /* of nxt_port_t */ + nxt_process_state_t state; + nxt_bool_t registered; + nxt_int_t use_count; + + nxt_port_mmaps_t incoming; + nxt_port_mmaps_t outgoing; + + nxt_thread_mutex_t cp_mutex; + nxt_lvlhsh_t connected_ports; /* of nxt_port_t */ - nxt_process_init_t *init; + uint32_t stream; - nxt_port_mmaps_t incoming; - nxt_port_mmaps_t outgoing; + nxt_mp_t *mem_pool; + nxt_credential_t *user_cred; - nxt_thread_mutex_t cp_mutex; - nxt_lvlhsh_t connected_ports; /* of nxt_port_t */ + nxt_process_data_t data; + + union { +#if (NXT_HAVE_CLONE) + nxt_clone_t clone; +#endif + } isolation; } nxt_process_t; +typedef nxt_int_t (*nxt_process_prefork_t)(nxt_task_t *task, + nxt_process_t *process, nxt_mp_t *mp); +typedef nxt_int_t (*nxt_process_postfork_t)(nxt_task_t *task, + nxt_process_t *process, nxt_mp_t *mp); +typedef nxt_int_t (*nxt_process_setup_t)(nxt_task_t *task, + nxt_process_t *process); +typedef nxt_int_t (*nxt_process_start_t)(nxt_task_t *task, + nxt_process_data_t *data); + + +typedef struct { + const char *name; + nxt_process_type_t type; + + nxt_process_prefork_t prefork; + + nxt_process_setup_t setup; + nxt_process_start_t start; + + uint8_t restart; /* 1-bit */ + + const nxt_port_handlers_t *port_handlers; + const nxt_sig_event_t *signals; +} nxt_process_init_t; + + extern nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX]; extern nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX]; @@ -84,6 +139,9 @@ NXT_EXPORT void nxt_nanosleep(nxt_nsec_t ns); NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv, char ***orig_envp); +#define nxt_process_init(process) \ + (nxt_pointer_to(process, sizeof(nxt_process_t))) + #define nxt_process_port_remove(port) \ nxt_queue_remove(&port->link) @@ -113,11 +171,18 @@ void nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *port); -void nxt_worker_process_quit_handler(nxt_task_t *task, - nxt_port_recv_msg_t *msg); +void nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status); +void nxt_signal_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); -void nxt_init_destroy(nxt_runtime_t *rt, nxt_process_init_t *init); +nxt_int_t nxt_process_core_setup(nxt_task_t *task, nxt_process_t *process); +nxt_int_t nxt_process_creds_set(nxt_task_t *task, nxt_process_t *process, + nxt_str_t *user, nxt_str_t *group); +nxt_int_t nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process); +#if (NXT_HAVE_CLONE_NEWUSER) +nxt_int_t nxt_process_vldt_isolation_creds(nxt_task_t *task, + nxt_process_t *process); +#endif #if (NXT_HAVE_SETPROCTITLE) diff --git a/src/nxt_process_type.h b/src/nxt_process_type.h index 5ff06d63..14deda19 100644 --- a/src/nxt_process_type.h +++ b/src/nxt_process_type.h @@ -13,7 +13,7 @@ typedef enum { NXT_PROCESS_DISCOVERY, NXT_PROCESS_CONTROLLER, NXT_PROCESS_ROUTER, - NXT_PROCESS_WORKER, + NXT_PROCESS_APP, NXT_PROCESS_MAX, } nxt_process_type_t; diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index 14211f3f..089d15c0 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -65,7 +65,8 @@ typedef struct { PyObject_HEAD } nxt_py_error_t; -static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf); +static nxt_int_t nxt_python_start(nxt_task_t *task, + nxt_process_data_t *data); static nxt_int_t nxt_python_init_strings(void); static void nxt_python_request_handler(nxt_unit_request_info_t *req); static void nxt_python_atexit(void); @@ -116,7 +117,7 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = { nxt_string("python"), PY_VERSION, NULL, - nxt_python_init, + nxt_python_start, }; @@ -211,7 +212,7 @@ static nxt_python_string_t nxt_python_strings[] = { static nxt_int_t -nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) +nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) { int rc; char *nxt_py_module; @@ -219,6 +220,7 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) PyObject *obj, *pypath, *module; nxt_unit_ctx_t *unit_ctx; nxt_unit_init_t python_init; + nxt_common_app_conf_t *app_conf; nxt_python_app_conf_t *c; #if PY_MAJOR_VERSION == 3 char *path; @@ -229,7 +231,8 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) static const char bin_python[] = "/bin/python"; #endif - c = &conf->u.python; + app_conf = data->app; + c = &app_conf->u.python; if (c->module.length == 0) { nxt_alert(task, "python module is empty"); @@ -410,7 +413,7 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_unit_default_init(task, &python_init); python_init.callbacks.request_handler = nxt_python_request_handler; - python_init.shm_limit = conf->shm_limit; + python_init.shm_limit = data->app->shm_limit; unit_ctx = nxt_unit_init(&python_init); if (nxt_slow_path(unit_ctx == NULL)) { diff --git a/src/nxt_router.c b/src/nxt_router.c index b4cba08b..788199c7 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -75,6 +75,9 @@ struct nxt_port_select_state_s { typedef struct nxt_port_select_state_s nxt_port_select_state_t; +static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, + nxt_mp_t *mp); +static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); static void nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port); @@ -268,8 +271,8 @@ static const nxt_str_t *nxt_app_msg_prefix[] = { }; -nxt_port_handlers_t nxt_router_process_port_handlers = { - .quit = nxt_worker_process_quit_handler, +static const nxt_port_handlers_t nxt_router_process_port_handlers = { + .quit = nxt_signal_quit_handler, .new_port = nxt_router_new_port_handler, .change_file = nxt_port_change_log_file_handler, .mmap = nxt_port_mmap_handler, @@ -282,8 +285,29 @@ nxt_port_handlers_t nxt_router_process_port_handlers = { }; -nxt_int_t -nxt_router_start(nxt_task_t *task, void *data) +const nxt_process_init_t nxt_router_process = { + .name = "router", + .type = NXT_PROCESS_ROUTER, + .prefork = nxt_router_prefork, + .restart = 1, + .setup = nxt_process_core_setup, + .start = nxt_router_start, + .port_handlers = &nxt_router_process_port_handlers, + .signals = nxt_process_signals, +}; + + +static nxt_int_t +nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) +{ + nxt_runtime_stop_app_processes(task, task->thread->runtime); + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_start(nxt_task_t *task, nxt_process_data_t *data) { nxt_int_t ret; nxt_port_t *controller_port; @@ -292,6 +316,8 @@ nxt_router_start(nxt_task_t *task, void *data) rt = task->thread->runtime; + nxt_log(task, NXT_LOG_INFO, "router started"); + #if (NXT_TLS) rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); if (nxt_slow_path(rt->tls == NULL)) { @@ -382,8 +408,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, goto failed; } - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, - stream, port->id, b); + ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, + -1, stream, port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); @@ -862,7 +888,7 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } if (msg->u.new_port == NULL - || msg->u.new_port->type != NXT_PROCESS_WORKER) + || msg->u.new_port->type != NXT_PROCESS_APP) { msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; } @@ -2400,8 +2426,8 @@ nxt_router_app_rpc_create(nxt_task_t *task, goto fail; } - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, - stream, router_port->id, b); + ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, + -1, stream, router_port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, router_port, stream); diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index ea01f06f..d7e35dec 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -21,6 +21,7 @@ static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status); static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine); +static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data); static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt); @@ -438,7 +439,7 @@ nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status) } if (rt->type == NXT_PROCESS_MAIN) { - nxt_main_stop_all_processes(task, rt); + nxt_runtime_stop_all_processes(task, rt); done = 0; } } @@ -478,6 +479,50 @@ nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) } +void +nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_port_t *port; + nxt_process_t *process; + nxt_process_init_t *init; + + nxt_runtime_process_each(rt, process) { + + init = nxt_process_init(process); + + if (init->type == NXT_PROCESS_APP) { + + nxt_process_port_each(process, port) { + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, + 0, 0, NULL); + + } nxt_process_port_loop; + } + + } nxt_runtime_process_loop; +} + + +static void +nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) +{ + nxt_port_t *port; + nxt_process_t *process; + + nxt_runtime_process_each(rt, process) { + + nxt_process_port_each(process, port) { + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, + 0, NULL); + + } nxt_process_port_loop; + + } nxt_runtime_process_loop; +} + + static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) { @@ -525,6 +570,10 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) } nxt_runtime_process_loop; + if (rt->port_by_type[rt->type] != NULL) { + nxt_port_use(task, rt->port_by_type[rt->type], -1); + } + nxt_thread_mutex_destroy(&rt->processes_mutex); status = rt->status; @@ -1306,7 +1355,9 @@ nxt_runtime_process_new(nxt_runtime_t *rt) /* TODO: memory failures. */ - process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t)); + process = nxt_mp_zalloc(rt->mem_pool, + sizeof(nxt_process_t) + sizeof(nxt_process_init_t)); + if (nxt_slow_path(process == NULL)) { return NULL; } @@ -1347,8 +1398,9 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) nxt_thread_mutex_destroy(&process->outgoing.mutex); nxt_thread_mutex_destroy(&process->cp_mutex); - if (process->init != NULL) { - nxt_mp_destroy(process->init->mem_pool); + /* processes from nxt_runtime_process_get() have no memory pool */ + if (process->mem_pool != NULL) { + nxt_mp_destroy(process->mem_pool); } nxt_mp_free(rt->mem_pool, process); diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index a364c38c..d29b6b4d 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -110,6 +110,7 @@ nxt_port_t *nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type); void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port); +void nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt); NXT_EXPORT nxt_port_t *nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, nxt_port_id_t port_id); diff --git a/src/nxt_signal_handlers.c b/src/nxt_signal_handlers.c new file mode 100644 index 00000000..69ae2bc4 --- /dev/null +++ b/src/nxt_signal_handlers.c @@ -0,0 +1,67 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_runtime.h> +#include <nxt_port.h> +#include <nxt_main_process.h> +#include <nxt_router.h> + + +static void nxt_signal_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_signal_sigterm_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_signal_sigquit_handler(nxt_task_t *task, void *obj, void *data); + + +const nxt_sig_event_t nxt_process_signals[] = { + nxt_event_signal(SIGHUP, nxt_signal_handler), + nxt_event_signal(SIGINT, nxt_signal_sigterm_handler), + nxt_event_signal(SIGQUIT, nxt_signal_sigquit_handler), + nxt_event_signal(SIGTERM, nxt_signal_sigterm_handler), + nxt_event_signal(SIGCHLD, nxt_signal_handler), + nxt_event_signal(SIGUSR1, nxt_signal_handler), + nxt_event_signal(SIGUSR2, nxt_signal_handler), + nxt_event_signal_end, +}; + + +static void +nxt_signal_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_trace(task, "signal signo:%d (%s) recevied, ignored", + (int) (uintptr_t) obj, data); +} + + +void +nxt_signal_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_process_quit(task, 0); +} + + +static void +nxt_signal_sigterm_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_debug(task, "sigterm handler signo:%d (%s)", + (int) (uintptr_t) obj, data); + + /* A fast exit. */ + + nxt_runtime_quit(task, 0); +} + + +static void +nxt_signal_sigquit_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_debug(task, "sigquit handler signo:%d (%s)", + (int) (uintptr_t) obj, data); + + /* A graceful exit. */ + + nxt_process_quit(task, 0); +} diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 67244420..9f6eab95 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -4248,7 +4248,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst, m.new_port.id = new_port->id; m.new_port.pid = new_port->pid; - m.new_port.type = NXT_PROCESS_WORKER; + m.new_port.type = NXT_PROCESS_APP; m.new_port.max_size = 16 * 1024; m.new_port.max_share = 64 * 1024; diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c deleted file mode 100644 index 754e2ea8..00000000 --- a/src/nxt_worker_process.c +++ /dev/null @@ -1,118 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> -#include <nxt_runtime.h> -#include <nxt_port.h> -#include <nxt_main_process.h> -#include <nxt_router.h> - - -static void nxt_worker_process_quit(nxt_task_t *task); -static void nxt_worker_process_signal_handler(nxt_task_t *task, void *obj, - void *data); -static void nxt_worker_process_sigterm_handler(nxt_task_t *task, void *obj, - void *data); -static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj, - void *data); - - -const nxt_sig_event_t nxt_worker_process_signals[] = { - nxt_event_signal(SIGHUP, nxt_worker_process_signal_handler), - nxt_event_signal(SIGINT, nxt_worker_process_sigterm_handler), - nxt_event_signal(SIGQUIT, nxt_worker_process_sigquit_handler), - nxt_event_signal(SIGTERM, nxt_worker_process_sigterm_handler), - nxt_event_signal(SIGCHLD, nxt_worker_process_signal_handler), - nxt_event_signal(SIGUSR1, nxt_worker_process_signal_handler), - nxt_event_signal(SIGUSR2, nxt_worker_process_signal_handler), - nxt_event_signal_end, -}; - - -static void -nxt_worker_process_quit(nxt_task_t *task) -{ - nxt_uint_t n; - nxt_queue_t *listen; - nxt_runtime_t *rt; - nxt_queue_link_t *link, *next; - nxt_listen_event_t *lev; - nxt_listen_socket_t *ls; - - rt = task->thread->runtime; - - nxt_debug(task, "close listen connections"); - - listen = &task->thread->engine->listen_connections; - - for (link = nxt_queue_first(listen); - link != nxt_queue_tail(listen); - link = next) - { - next = nxt_queue_next(link); - lev = nxt_queue_link_data(link, nxt_listen_event_t, link); - nxt_queue_remove(link); - - nxt_fd_event_close(task->thread->engine, &lev->socket); - } - - if (rt->listen_sockets != NULL) { - - ls = rt->listen_sockets->elts; - n = rt->listen_sockets->nelts; - - while (n != 0) { - nxt_socket_close(task, ls->socket); - ls->socket = -1; - - ls++; - n--; - } - - rt->listen_sockets->nelts = 0; - } - - nxt_runtime_quit(task, 0); -} - - -static void -nxt_worker_process_signal_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_trace(task, "signal signo:%d (%s) recevied, ignored", - (int) (uintptr_t) obj, data); -} - - -void -nxt_worker_process_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) -{ - nxt_worker_process_quit(task); -} - - -static void -nxt_worker_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_debug(task, "sigterm handler signo:%d (%s)", - (int) (uintptr_t) obj, data); - - /* A fast exit. */ - - nxt_runtime_quit(task, 0); -} - - -static void -nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_debug(task, "sigquit handler signo:%d (%s)", - (int) (uintptr_t) obj, data); - - /* A graceful exit. */ - - nxt_worker_process_quit(task); -} diff --git a/src/perl/nxt_perl_psgi.c b/src/perl/nxt_perl_psgi.c index 548e6daa..5e9200dc 100644 --- a/src/perl/nxt_perl_psgi.c +++ b/src/perl/nxt_perl_psgi.c @@ -95,8 +95,8 @@ static int nxt_perl_psgi_result_array(PerlInterpreter *my_perl, static void nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result, nxt_unit_request_info_t *req); -static nxt_int_t nxt_perl_psgi_init(nxt_task_t *task, - nxt_common_app_conf_t *conf); +static nxt_int_t nxt_perl_psgi_start(nxt_task_t *task, + nxt_process_data_t *conf); static void nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req); static void nxt_perl_psgi_atexit(void); @@ -119,7 +119,7 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = { nxt_string("perl"), PERL_VERSION_STRING, NULL, - nxt_perl_psgi_init, + nxt_perl_psgi_start, }; @@ -1134,14 +1134,17 @@ nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result, static nxt_int_t -nxt_perl_psgi_init(nxt_task_t *task, nxt_common_app_conf_t *conf) +nxt_perl_psgi_start(nxt_task_t *task, nxt_process_data_t *data) { int rc; nxt_unit_ctx_t *unit_ctx; nxt_unit_init_t perl_init; PerlInterpreter *my_perl; + nxt_common_app_conf_t *conf; nxt_perl_psgi_module_t module; + conf = data->app; + my_perl = nxt_perl_psgi_interpreter_init(task, conf->u.perl.script, &module.app); diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c index 417e2d8d..40f72f51 100644 --- a/src/ruby/nxt_ruby.c +++ b/src/ruby/nxt_ruby.c @@ -28,7 +28,8 @@ typedef struct { } nxt_ruby_rack_init_t; -static nxt_int_t nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf); +static nxt_int_t nxt_ruby_start(nxt_task_t *task, + nxt_process_data_t *data); static VALUE nxt_ruby_init_basic(VALUE arg); static nxt_int_t nxt_ruby_init_io(nxt_task_t *task); static VALUE nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init); @@ -78,21 +79,24 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = { nxt_string("ruby"), ruby_version, NULL, - nxt_ruby_init, + nxt_ruby_start, }; static nxt_int_t -nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf) +nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data) { - int state, rc; - VALUE res; - nxt_unit_ctx_t *unit_ctx; - nxt_unit_init_t ruby_unit_init; - nxt_ruby_rack_init_t rack_init; + int state, rc; + VALUE res; + nxt_unit_ctx_t *unit_ctx; + nxt_unit_init_t ruby_unit_init; + nxt_ruby_rack_init_t rack_init; + nxt_common_app_conf_t *conf; static char *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" }; + conf = data->app; + RUBY_INIT_STACK ruby_init(); ruby_options(2, argv); diff --git a/test/test_java_application.py b/test/test_java_application.py index 3051ddea..0cb18c25 100644 --- a/test/test_java_application.py +++ b/test/test_java_application.py @@ -13,6 +13,7 @@ class TestJavaApplication(TestApplicationJava): [ r'realpath.*failed', r'failed to apply new conf', + r'application setup failed', ] ) self.assertIn( |