diff options
Diffstat (limited to 'src')
57 files changed, 3327 insertions, 1637 deletions
diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 589eca3f..ee5dc46f 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -519,11 +519,11 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) void -Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) +Unit::remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { port_data_t *data; - if (port->data != NULL) { + if (port->data != NULL && ctx != NULL) { data = (port_data_t *) port->data; data->stop(); diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h index 4ef40d45..1aa93073 100644 --- a/src/nodejs/unit-http/unit.h +++ b/src/nodejs/unit-http/unit.h @@ -41,7 +41,8 @@ private: void shm_ack_handler(nxt_unit_ctx_t *ctx); static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); - static void remove_port(nxt_unit_t *unit, nxt_unit_port_t *port); + static void remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port); static void quit_cb(nxt_unit_ctx_t *ctx); void quit(nxt_unit_ctx_t *ctx); diff --git a/src/nxt_application.c b/src/nxt_application.c index 5d58e60c..589821fb 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -42,9 +42,23 @@ static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task, const char *name); +static nxt_int_t nxt_proto_setup(nxt_task_t *task, nxt_process_t *process); +static nxt_int_t nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data); static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment); +static void nxt_proto_start_process_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +static void nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_proto_process_created_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +static void nxt_proto_quit_children(nxt_task_t *task); +static nxt_process_t *nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid); +static void nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process); +static nxt_process_t *nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid); static u_char *nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src); +static void nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data); nxt_str_t nxt_server = nxt_string(NXT_SERVER); @@ -55,7 +69,12 @@ static uint32_t compat[] = { }; -static nxt_app_module_t *nxt_app; +static nxt_lvlhsh_t nxt_proto_processes; +static nxt_queue_t nxt_proto_children; +static nxt_bool_t nxt_proto_exiting; + +static nxt_app_module_t *nxt_app; +static nxt_common_app_conf_t *nxt_app_conf; static const nxt_port_handlers_t nxt_discovery_process_port_handlers = { @@ -70,6 +89,29 @@ static const nxt_port_handlers_t nxt_discovery_process_port_handlers = { }; +const nxt_sig_event_t nxt_prototype_signals[] = { + nxt_event_signal(SIGHUP, nxt_proto_signal_handler), + nxt_event_signal(SIGINT, nxt_proto_sigterm_handler), + nxt_event_signal(SIGQUIT, nxt_proto_sigterm_handler), + nxt_event_signal(SIGTERM, nxt_proto_sigterm_handler), + nxt_event_signal(SIGCHLD, nxt_proto_sigchld_handler), + nxt_event_signal_end, +}; + + +static const nxt_port_handlers_t nxt_proto_process_port_handlers = { + .quit = nxt_proto_quit_handler, + .change_file = nxt_port_change_log_file_handler, + .new_port = nxt_port_new_port_handler, + .process_created = nxt_proto_process_created_handler, + .process_ready = nxt_port_process_ready_handler, + .remove_pid = nxt_port_remove_pid_handler, + .start_process = nxt_proto_start_process_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, +}; + + static const nxt_port_handlers_t nxt_app_process_port_handlers = { .quit = nxt_signal_quit_handler, .rpc_ready = nxt_port_rpc_handler, @@ -89,12 +131,23 @@ const nxt_process_init_t nxt_discovery_process = { }; +const nxt_process_init_t nxt_proto_process = { + .type = NXT_PROCESS_PROTOTYPE, + .prefork = nxt_isolation_main_prefork, + .restart = 0, + .setup = nxt_proto_setup, + .start = nxt_proto_start, + .port_handlers = &nxt_proto_process_port_handlers, + .signals = nxt_prototype_signals, +}; + + const nxt_process_init_t nxt_app_process = { .type = NXT_PROCESS_APP, .setup = nxt_app_setup, - .prefork = nxt_isolation_main_prefork, + .start = NULL, + .prefork = NULL, .restart = 0, - .start = NULL, /* set to module->start */ .port_handlers = &nxt_app_process_port_handlers, .signals = nxt_process_signals, }; @@ -443,15 +496,18 @@ nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) static nxt_int_t -nxt_app_setup(nxt_task_t *task, nxt_process_t *process) +nxt_proto_setup(nxt_task_t *task, nxt_process_t *process) { nxt_int_t ret; - nxt_process_init_t *init; nxt_app_lang_module_t *lang; nxt_common_app_conf_t *app_conf; app_conf = process->data.app; + nxt_queue_init(&nxt_proto_children); + + nxt_app_conf = app_conf; + lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type); if (nxt_slow_path(lang == NULL)) { nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type); @@ -479,7 +535,6 @@ nxt_app_setup(nxt_task_t *task, nxt_process_t *process) if (nxt_app->setup != NULL) { ret = nxt_app->setup(task, process, app_conf); - if (nxt_slow_path(ret != NXT_OK)) { return ret; } @@ -514,30 +569,285 @@ nxt_app_setup(nxt_task_t *task, nxt_process_t *process) } } + process->state = NXT_PROCESS_STATE_CREATED; + + return NXT_OK; +} + + +static nxt_int_t +nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data) +{ + nxt_debug(task, "prototype waiting for clone messages"); + + return NXT_OK; +} + + +static void +nxt_proto_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + u_char *p; + nxt_int_t ret; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t *init; + + rt = task->thread->runtime; + + process = nxt_process_new(rt); + if (nxt_slow_path(process == NULL)) { + goto failed; + } + + process->mem_pool = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(process->mem_pool == NULL)) { + nxt_process_use(task, process, -1); + goto failed; + } + + process->parent_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE]; + init = nxt_process_init(process); + *init = nxt_app_process; + + process->name = nxt_mp_alloc(process->mem_pool, nxt_app_conf->name.length + + sizeof("\"\" application") + 1); + + if (nxt_slow_path(process->name == NULL)) { + nxt_process_use(task, process, -1); + + goto failed; + } init->start = nxt_app->start; + init->name = (const char *) nxt_app_conf->name.start; + + p = (u_char *) process->name; + *p++ = '"'; + p = nxt_cpymem(p, nxt_app_conf->name.start, nxt_app_conf->name.length); + p = nxt_cpymem(p, "\" application", 13); + *p = '\0'; + + process->user_cred = &rt->user_cred; + + process->data.app = nxt_app_conf; + process->stream = msg->port_msg.stream; + + ret = nxt_process_start(task, process); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_process_use(task, process, -1); + + goto failed; + } + + nxt_proto_process_add(task, process); + + return; + +failed: + + port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + + if (nxt_fast_path(port != NULL)) { + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); + } +} + + +static void +nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_debug(task, "prototype quit handler"); + + nxt_proto_quit_children(task); + + nxt_proto_exiting = 1; + + if (nxt_queue_is_empty(&nxt_proto_children)) { + nxt_process_quit(task, 0); + } +} + + +static void +nxt_proto_quit_children(nxt_task_t *task) +{ + nxt_port_t *port; + nxt_process_t *process; + + nxt_queue_each(process, &nxt_proto_children, nxt_process_t, link) { + port = nxt_process_port_first(process); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + } + nxt_queue_loop; +} + + +static void +nxt_proto_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_pid_t isolated_pid, pid; + nxt_process_t *process; + + isolated_pid = nxt_recv_msg_cmsg_pid(msg); + + process = nxt_proto_process_find(task, isolated_pid); + if (nxt_slow_path(process == NULL)) { + return; + } + process->state = NXT_PROCESS_STATE_CREATED; - return NXT_OK; + pid = msg->port_msg.pid; + + if (process->pid != pid) { + nxt_debug(task, "app process %PI (aka %PI) is created", isolated_pid, + pid); + + nxt_runtime_process_remove(task->thread->runtime, process); + + process->pid = pid; + + nxt_runtime_process_add(task, process); + + } else { + nxt_debug(task, "app process %PI is created", isolated_pid); + } +} + + +static void +nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_trace(task, "signal signo:%d (%s) received, ignored", + (int) (uintptr_t) obj, data); +} + + +static void +nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_trace(task, "signal signo:%d (%s) received", + (int) (uintptr_t) obj, data); + + nxt_proto_quit_children(task); + + nxt_proto_exiting = 1; + + if (nxt_queue_is_empty(&nxt_proto_children)) { + nxt_process_quit(task, 0); + } +} + + +static void +nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data) +{ + int status; + nxt_err_t err; + nxt_pid_t pid; + nxt_process_t *process; + + nxt_debug(task, "proto sigchld handler signo:%d (%s)", + (int) (uintptr_t) obj, data); + + for ( ;; ) { + pid = waitpid(-1, &status, WNOHANG); + + if (pid == -1) { + + switch (err = nxt_errno) { + + case NXT_ECHILD: + return; + + case NXT_EINTR: + continue; + + default: + nxt_alert(task, "waitpid() failed: %E", err); + return; + } + } + + nxt_debug(task, "waitpid(): %PI", pid); + + if (pid == 0) { + return; + } + + if (WTERMSIG(status)) { +#ifdef WCOREDUMP + nxt_alert(task, "app process (isolated %PI) exited on signal %d%s", + pid, WTERMSIG(status), + WCOREDUMP(status) ? " (core dumped)" : ""); +#else + nxt_alert(task, "app process (isolated %PI) exited on signal %d", + pid, WTERMSIG(status)); +#endif + + } else { + nxt_trace(task, "app process (isolated %PI) exited with code %d", + pid, WEXITSTATUS(status)); + } + + process = nxt_proto_process_remove(task, pid); + if (process == NULL) { + continue; + } + + if (process->state != NXT_PROCESS_STATE_CREATING) { + nxt_port_remove_notify_others(task, process); + } + + nxt_process_close_ports(task, process); + + if (nxt_proto_exiting && nxt_queue_is_empty(&nxt_proto_children)) { + nxt_process_quit(task, 0); + return; + } + } } static nxt_app_module_t * nxt_app_module_load(nxt_task_t *task, const char *name) { - void *dl; + char *err; + void *dl; + nxt_app_module_t *app; dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY); - if (dl != NULL) { - return dlsym(dl, "nxt_app_module"); + if (nxt_slow_path(dl == NULL)) { + err = dlerror(); + nxt_alert(task, "dlopen(\"%s\") failed: \"%s\"", + name, err != NULL ? err : "(null)"); + return NULL; } - nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror()); + app = dlsym(dl, "nxt_app_module"); - return NULL; + if (nxt_slow_path(app == NULL)) { + err = dlerror(); + nxt_alert(task, "dlsym(\"%s\", \"nxt_app_module\") failed: \"%s\"", + name, err != NULL ? err : "(null)"); + + if (dlclose(dl) != 0) { + err = dlerror(); + nxt_alert(task, "dlclose(\"%s\") failed: \"%s\"", + name, err != NULL ? err : "(null)"); + } + } + + return app; } @@ -602,6 +912,19 @@ nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src) } +static nxt_int_t +nxt_app_setup(nxt_task_t *task, nxt_process_t *process) +{ + nxt_process_init_t *init; + + process->state = NXT_PROCESS_STATE_CREATED; + + init = nxt_process_init(process); + + return init->start(task, &process->data); +} + + nxt_app_lang_module_t * nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name) { @@ -687,17 +1010,18 @@ nxt_app_parse_type(u_char *p, size_t length) nxt_int_t -nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init) +nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init, + nxt_common_app_conf_t *conf) { - nxt_port_t *my_port, *main_port, *router_port; + nxt_port_t *my_port, *proto_port, *router_port; nxt_runtime_t *rt; nxt_memzero(init, sizeof(nxt_unit_init_t)); rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - if (nxt_slow_path(main_port == NULL)) { + proto_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE]; + if (nxt_slow_path(proto_port == NULL)) { return NXT_ERROR; } @@ -711,10 +1035,10 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init) return NXT_ERROR; } - init->ready_port.id.pid = main_port->pid; - init->ready_port.id.id = main_port->id; + init->ready_port.id.pid = proto_port->pid; + init->ready_port.id.id = proto_port->id; init->ready_port.in_fd = -1; - init->ready_port.out_fd = main_port->pair[1]; + init->ready_port.out_fd = proto_port->pair[1]; init->ready_stream = my_port->process->stream; @@ -730,5 +1054,127 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init) init->log_fd = 2; + init->shm_limit = conf->shm_limit; + init->request_limit = conf->request_limit; + return NXT_OK; } + + +static nxt_int_t +nxt_proto_lvlhsh_isolated_pid_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + nxt_pid_t *qpid; + nxt_process_t *process; + + process = data; + qpid = (nxt_pid_t *) lhq->key.start; + + if (*qpid == process->isolated_pid) { + return NXT_OK; + } + + return NXT_DECLINED; +} + + +static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + nxt_proto_lvlhsh_isolated_pid_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +nxt_inline void +nxt_proto_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid) +{ + lhq->key_hash = nxt_murmur_hash2(pid, sizeof(nxt_pid_t)); + lhq->key.length = sizeof(nxt_pid_t); + lhq->key.start = (u_char *) pid; + lhq->proto = &lvlhsh_processes_proto; +} + + +static void +nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process) +{ + nxt_runtime_t *rt; + nxt_lvlhsh_query_t lhq; + + rt = task->thread->runtime; + + nxt_proto_process_lhq_pid(&lhq, &process->isolated_pid); + + lhq.replace = 0; + lhq.value = process; + lhq.pool = rt->mem_pool; + + switch (nxt_lvlhsh_insert(&nxt_proto_processes, &lhq)) { + + case NXT_OK: + nxt_debug(task, "process (isolated %PI) added", process->isolated_pid); + + nxt_queue_insert_tail(&nxt_proto_children, &process->link); + break; + + default: + nxt_debug(task, "process (isolated %PI) failed to add", + process->isolated_pid); + break; + } +} + + +static nxt_process_t * +nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid) +{ + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_lvlhsh_query_t lhq; + + nxt_proto_process_lhq_pid(&lhq, &pid); + + rt = task->thread->runtime; + + lhq.pool = rt->mem_pool; + + switch (nxt_lvlhsh_delete(&nxt_proto_processes, &lhq)) { + + case NXT_OK: + nxt_debug(task, "process (isolated %PI) removed", pid); + + process = lhq.value; + + nxt_queue_remove(&process->link); + break; + + default: + nxt_debug(task, "process (isolated %PI) remove failed", pid); + process = NULL; + break; + } + + return process; +} + + +static nxt_process_t * +nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid) +{ + nxt_process_t *process; + nxt_lvlhsh_query_t lhq; + + nxt_proto_process_lhq_pid(&lhq, &pid); + + if (nxt_lvlhsh_find(&nxt_proto_processes, &lhq) == NXT_OK) { + process = lhq.value; + + } else { + nxt_debug(task, "process (isolated %PI) not found", pid); + + process = NULL; + } + + return process; +} diff --git a/src/nxt_application.h b/src/nxt_application.h index 6fbdc4be..4612f072 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -101,6 +101,7 @@ struct nxt_common_app_conf_s { nxt_conf_value_t *limits; size_t shm_limit; + uint32_t request_limit; union { nxt_external_app_conf_t external; @@ -137,7 +138,7 @@ NXT_EXPORT extern nxt_str_t nxt_server; extern nxt_app_module_t nxt_external_module; NXT_EXPORT nxt_int_t nxt_unit_default_init(nxt_task_t *task, - nxt_unit_init_t *init); + nxt_unit_init_t *init, nxt_common_app_conf_t *conf); #endif /* _NXT_APPLICATION_H_INCLIDED_ */ diff --git a/src/nxt_cert.c b/src/nxt_cert.c index 1806bc19..01d413e0 100644 --- a/src/nxt_cert.c +++ b/src/nxt_cert.c @@ -1135,7 +1135,17 @@ nxt_cert_store_get_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, msg->port_msg.reply_port); - if (port == NULL) { + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "process port not found (pid %PI, reply_port %d)", + msg->port_msg.pid, msg->port_msg.reply_port); + return; + } + + if (nxt_slow_path(port->type != NXT_PROCESS_CONTROLLER + && port->type != NXT_PROCESS_ROUTER)) + { + nxt_alert(task, "process %PI cannot store certificates", + msg->port_msg.pid); return; } @@ -1206,10 +1216,23 @@ nxt_cert_store_delete_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { u_char *p; nxt_str_t name; + nxt_port_t *ctl_port; nxt_runtime_t *rt; nxt_file_name_t *path; rt = task->thread->runtime; + ctl_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; + + if (nxt_slow_path(ctl_port == NULL)) { + nxt_alert(task, "controller port not found"); + return; + } + + if (nxt_slow_path(nxt_recv_msg_cmsg_pid(msg) != ctl_port->pid)) { + nxt_alert(task, "process %PI cannot delete certificates", + nxt_recv_msg_cmsg_pid(msg)); + return; + } if (nxt_slow_path(rt->certs.start == NULL)) { nxt_alert(task, "no certificates storage directory"); diff --git a/src/nxt_conf.h b/src/nxt_conf.h index 149af39a..8b3565fd 100644 --- a/src/nxt_conf.h +++ b/src/nxt_conf.h @@ -72,6 +72,8 @@ typedef struct { nxt_mp_t *pool; nxt_str_t error; void *ctx; + nxt_mp_t *conf_pool; + nxt_uint_t ver; } nxt_conf_validation_t; diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index a53fff74..3f068bbb 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -33,7 +33,8 @@ typedef enum { typedef enum { - NXT_CONF_VLDT_REQUIRED = 1, + NXT_CONF_VLDT_REQUIRED = 1 << 0, + NXT_CONF_VLDT_VAR = 1 << 1, } nxt_conf_vldt_flags_t; @@ -73,8 +74,8 @@ static nxt_int_t nxt_conf_vldt_type(nxt_conf_validation_t *vldt, nxt_str_t *name, nxt_conf_value_t *value, nxt_conf_vldt_type_t type); static nxt_int_t nxt_conf_vldt_error(nxt_conf_validation_t *vldt, const char *fmt, ...); -static nxt_int_t nxt_conf_vldt_var(nxt_conf_validation_t *vldt, - const char *option, nxt_str_t *value); +static nxt_int_t nxt_conf_vldt_var(nxt_conf_validation_t *vldt, nxt_str_t *name, + nxt_str_t *value); nxt_inline nxt_int_t nxt_conf_vldt_unsupported(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); @@ -112,6 +113,10 @@ static nxt_int_t nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_return(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_share(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_share_element(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value); static nxt_int_t nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_python(nxt_conf_validation_t *vldt, @@ -354,6 +359,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_listener_members[] = { .name = nxt_string("pass"), .type = NXT_CONF_VLDT_STRING, .validator = nxt_conf_vldt_pass, + .flags = NXT_CONF_VLDT_VAR, }, { .name = nxt_string("application"), .type = NXT_CONF_VLDT_STRING, @@ -511,8 +517,8 @@ static nxt_int_t nxt_conf_vldt_ticket_key_element(nxt_conf_validation_t *vldt, nxt_conf_value_t *value) { + ssize_t ret; nxt_str_t key; - nxt_int_t ret; if (nxt_conf_type(value) != NXT_CONF_STRING) { return nxt_conf_vldt_error(vldt, "The \"key\" array must " @@ -521,12 +527,8 @@ nxt_conf_vldt_ticket_key_element(nxt_conf_validation_t *vldt, nxt_conf_get_string(value, &key); - ret = nxt_openssl_base64_decode(NULL, 0, key.start, key.length); - if (nxt_slow_path(ret == NXT_ERROR)) { - return NXT_ERROR; - } - - if (ret == NXT_DECLINED) { + ret = nxt_base64_decode(NULL, key.start, key.length); + if (ret == NXT_ERROR) { return nxt_conf_vldt_error(vldt, "Invalid Base64 format for the ticket " "key \"%V\".", &key); } @@ -564,6 +566,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_match_members[] = { .name = nxt_string("method"), .type = NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, .validator = nxt_conf_vldt_match_patterns, + .u.string = "method", }, { .name = nxt_string("scheme"), .type = NXT_CONF_VLDT_STRING, @@ -572,6 +575,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_match_members[] = { .name = nxt_string("host"), .type = NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, .validator = nxt_conf_vldt_match_patterns, + .u.string = "host", }, { .name = nxt_string("source"), .type = NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, @@ -584,6 +588,12 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_match_members[] = { .name = nxt_string("uri"), .type = NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, .validator = nxt_conf_vldt_match_encoded_patterns, + .u.string = "uri" + }, { + .name = nxt_string("query"), + .type = NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, + .validator = nxt_conf_vldt_match_encoded_patterns, + .u.string = "query" }, { .name = nxt_string("arguments"), .type = NXT_CONF_VLDT_OBJECT | NXT_CONF_VLDT_ARRAY, @@ -592,10 +602,12 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_match_members[] = { .name = nxt_string("headers"), .type = NXT_CONF_VLDT_OBJECT | NXT_CONF_VLDT_ARRAY, .validator = nxt_conf_vldt_match_patterns_sets, + .u.string = "headers" }, { .name = nxt_string("cookies"), .type = NXT_CONF_VLDT_OBJECT | NXT_CONF_VLDT_ARRAY, .validator = nxt_conf_vldt_match_patterns_sets, + .u.string = "cookies" }, NXT_CONF_VLDT_END @@ -607,6 +619,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_pass_action_members[] = { .name = nxt_string("pass"), .type = NXT_CONF_VLDT_STRING, .validator = nxt_conf_vldt_pass, + .flags = NXT_CONF_VLDT_VAR, }, NXT_CONF_VLDT_END @@ -630,7 +643,8 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_return_action_members[] = { static nxt_conf_vldt_object_t nxt_conf_vldt_share_action_members[] = { { .name = nxt_string("share"), - .type = NXT_CONF_VLDT_STRING, + .type = NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, + .validator = nxt_conf_vldt_share, }, { .name = nxt_string("types"), .type = NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, @@ -646,6 +660,7 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_share_action_members[] = { .validator = nxt_conf_vldt_unsupported, .u.string = "chroot", #endif + .flags = NXT_CONF_VLDT_VAR, }, { .name = nxt_string("follow_symlinks"), .type = NXT_CONF_VLDT_BOOLEAN, @@ -1163,7 +1178,6 @@ nxt_conf_validate(nxt_conf_validation_t *vldt) nxt_int_t ret; ret = nxt_conf_vldt_type(vldt, NULL, vldt->conf, NXT_CONF_VLDT_OBJECT); - if (ret != NXT_OK) { return ret; } @@ -1290,14 +1304,14 @@ nxt_conf_vldt_unsupported(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, static nxt_int_t -nxt_conf_vldt_var(nxt_conf_validation_t *vldt, const char *option, +nxt_conf_vldt_var(nxt_conf_validation_t *vldt, nxt_str_t *name, nxt_str_t *value) { u_char error[NXT_MAX_ERROR_STR]; if (nxt_var_test(value, error) != NXT_OK) { - return nxt_conf_vldt_error(vldt, "%s in the \"%s\" value.", - error, option); + return nxt_conf_vldt_error(vldt, "%s in the \"%V\" value.", + error, name); } return NXT_OK; @@ -1488,10 +1502,6 @@ nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, nxt_conf_get_string(value, &pass); - if (nxt_is_var(&pass)) { - return nxt_conf_vldt_var(vldt, "pass", &pass); - } - ret = nxt_http_pass_segments(vldt->pool, &pass, segments, 3); if (ret != NXT_OK) { @@ -1617,6 +1627,70 @@ nxt_conf_vldt_return(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, static nxt_int_t +nxt_conf_vldt_share(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, + void *data) +{ + u_char *p; + nxt_str_t name, temp; + + static nxt_str_t uri = nxt_string("$uri"); + + if (nxt_conf_type(value) == NXT_CONF_ARRAY) { + if (nxt_conf_array_elements_count(value) == 0) { + return nxt_conf_vldt_error(vldt, "The \"share\" array " + "must contain at least one element."); + } + + return nxt_conf_vldt_array_iterator(vldt, value, + &nxt_conf_vldt_share_element); + } + + /* NXT_CONF_STRING */ + + if (vldt->ver < 12600) { + nxt_conf_get_string(value, &name); + + temp.length = name.length + uri.length; + + temp.start = nxt_mp_get(vldt->conf_pool, temp.length); + if (nxt_slow_path(temp.start == NULL)) { + return NXT_ERROR; + } + + p = nxt_cpymem(temp.start, name.start, name.length); + nxt_memcpy(p, uri.start, uri.length); + + nxt_conf_set_string(value, &temp); + } + + return nxt_conf_vldt_share_element(vldt, value); +} + + +static nxt_int_t +nxt_conf_vldt_share_element(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value) +{ + nxt_str_t str; + + static nxt_str_t share = nxt_string("share"); + + if (nxt_conf_type(value) != NXT_CONF_STRING) { + return nxt_conf_vldt_error(vldt, "The \"share\" array must " + "contain only string values."); + } + + nxt_conf_get_string(value, &str); + + if (nxt_is_var(&str)) { + return nxt_conf_vldt_var(vldt, &share, &str); + } + + return NXT_OK; +} + + +static nxt_int_t nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { @@ -1733,14 +1807,15 @@ static nxt_int_t nxt_conf_vldt_thread_stack_size(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { - int64_t size; + int64_t size, min_size; size = nxt_conf_get_number(value); + min_size = sysconf(_SC_THREAD_STACK_MIN); - if (size < NXT_THREAD_STACK_MIN) { + if (size < min_size) { return nxt_conf_vldt_error(vldt, "The \"thread_stack_size\" number " "must be equal to or greater than %d.", - NXT_THREAD_STACK_MIN); + min_size); } if ((size % nxt_pagesize) != 0) { @@ -1801,14 +1876,22 @@ static nxt_int_t nxt_conf_vldt_match_patterns(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { + nxt_int_t ret; + + vldt->ctx = data; + if (nxt_conf_type(value) == NXT_CONF_ARRAY) { - return nxt_conf_vldt_array_iterator(vldt, value, - &nxt_conf_vldt_match_pattern); + ret = nxt_conf_vldt_array_iterator(vldt, value, + &nxt_conf_vldt_match_pattern); + + } else { + /* NXT_CONF_STRING */ + ret = nxt_conf_vldt_match_pattern(vldt, value); } - /* NXT_CONF_STRING */ + vldt->ctx = NULL; - return nxt_conf_vldt_match_pattern(vldt, value); + return ret; } @@ -1824,8 +1907,8 @@ nxt_conf_vldt_match_pattern(nxt_conf_validation_t *vldt, #endif if (nxt_conf_type(value) != NXT_CONF_STRING) { - return nxt_conf_vldt_error(vldt, "The \"match\" patterns for \"host\", " - "\"uri\", and \"method\" must be strings."); + return nxt_conf_vldt_error(vldt, "The \"match\" pattern for \"%s\" " + "must be strings.", vldt->ctx); } nxt_conf_get_string(value, &pattern); @@ -1882,7 +1965,7 @@ static nxt_int_t nxt_conf_vldt_match_encoded_patterns_sets( &nxt_conf_vldt_match_encoded_patterns_set); } - /* NXT_CONF_STRING */ + /* NXT_CONF_OBJECT */ return nxt_conf_vldt_match_encoded_patterns_set(vldt, value); } @@ -1923,7 +2006,8 @@ nxt_conf_vldt_match_encoded_patterns_set_member(nxt_conf_validation_t *vldt, "\"arguments\" is encoded but is invalid."); } - return nxt_conf_vldt_match_encoded_patterns(vldt, value, NULL); + return nxt_conf_vldt_match_encoded_patterns(vldt, value, + (void *) "arguments"); } @@ -1931,14 +2015,22 @@ static nxt_int_t nxt_conf_vldt_match_encoded_patterns(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { + nxt_int_t ret; + + vldt->ctx = data; + if (nxt_conf_type(value) == NXT_CONF_ARRAY) { - return nxt_conf_vldt_array_iterator(vldt, value, + ret = nxt_conf_vldt_array_iterator(vldt, value, &nxt_conf_vldt_match_encoded_pattern); + + } else { + /* NXT_CONF_STRING */ + ret = nxt_conf_vldt_match_encoded_pattern(vldt, value); } - /* NXT_CONF_STRING */ + vldt->ctx = NULL; - return nxt_conf_vldt_match_encoded_pattern(vldt, value); + return ret; } @@ -1951,8 +2043,8 @@ nxt_conf_vldt_match_encoded_pattern(nxt_conf_validation_t *vldt, nxt_str_t pattern; if (nxt_conf_type(value) != NXT_CONF_STRING) { - return nxt_conf_vldt_error(vldt, "The \"match\" pattern for \"uri\" " - "must be a string."); + return nxt_conf_vldt_error(vldt, "The \"match\" pattern for \"%s\" " + "must be a string.", vldt->ctx); } ret = nxt_conf_vldt_match_pattern(vldt, value); @@ -1969,8 +2061,8 @@ nxt_conf_vldt_match_encoded_pattern(nxt_conf_validation_t *vldt, end = nxt_decode_uri(p, pattern.start, pattern.length); if (nxt_slow_path(end == NULL)) { - return nxt_conf_vldt_error(vldt, "The \"match\" pattern for \"uri\" " - "is encoded but is invalid."); + return nxt_conf_vldt_error(vldt, "The \"match\" pattern for \"%s\" " + "is encoded but is invalid.", vldt->ctx); } return NXT_OK; @@ -2060,14 +2152,22 @@ static nxt_int_t nxt_conf_vldt_match_patterns_sets(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) { + nxt_int_t ret; + + vldt->ctx = data; + if (nxt_conf_type(value) == NXT_CONF_ARRAY) { - return nxt_conf_vldt_array_iterator(vldt, value, - &nxt_conf_vldt_match_patterns_set); + ret = nxt_conf_vldt_array_iterator(vldt, value, + &nxt_conf_vldt_match_patterns_set); + + } else { + /* NXT_CONF_OBJECT */ + ret = nxt_conf_vldt_match_patterns_set(vldt, value); } - /* NXT_CONF_OBJECT */ + vldt->ctx = NULL; - return nxt_conf_vldt_match_patterns_set(vldt, value); + return ret; } @@ -2077,8 +2177,7 @@ nxt_conf_vldt_match_patterns_set(nxt_conf_validation_t *vldt, { if (nxt_conf_type(value) != NXT_CONF_OBJECT) { return nxt_conf_vldt_error(vldt, "The \"match\" patterns for " - "\"arguments\", \"cookies\", and " - "\"headers\" must be objects."); + "\"%s\" must be objects.", vldt->ctx); } return nxt_conf_vldt_object_iterator(vldt, value, @@ -2095,7 +2194,7 @@ nxt_conf_vldt_match_patterns_set_member(nxt_conf_validation_t *vldt, "not contain empty member names."); } - return nxt_conf_vldt_match_patterns(vldt, value, NULL); + return nxt_conf_vldt_match_patterns(vldt, value, vldt->ctx); } @@ -2279,7 +2378,7 @@ nxt_conf_vldt_object(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, { uint32_t index; nxt_int_t ret; - nxt_str_t name; + nxt_str_t name, var; nxt_conf_value_t *member; nxt_conf_vldt_object_t *vals; @@ -2336,8 +2435,22 @@ nxt_conf_vldt_object(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, continue; } - ret = nxt_conf_vldt_type(vldt, &name, member, vals->type); + if (vals->flags & NXT_CONF_VLDT_VAR + && nxt_conf_type(member) == NXT_CONF_STRING) + { + nxt_conf_get_string(member, &var); + + if (nxt_is_var(&var)) { + ret = nxt_conf_vldt_var(vldt, &name, &var); + if (ret != NXT_OK) { + return ret; + } + break; + } + } + + ret = nxt_conf_vldt_type(vldt, &name, member, vals->type); if (ret != NXT_OK) { return ret; } diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 779a625d..7510d6f0 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -41,6 +41,8 @@ typedef struct { static nxt_int_t nxt_controller_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp); +static nxt_int_t nxt_controller_file_read(nxt_task_t *task, const char *name, + nxt_str_t *str, nxt_mp_t *mp); static nxt_int_t nxt_controller_start(nxt_task_t *task, nxt_process_data_t *data); static void nxt_controller_process_new_port_handler(nxt_task_t *task, @@ -154,12 +156,9 @@ const nxt_process_init_t nxt_controller_process = { static nxt_int_t nxt_controller_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) { - ssize_t n; - nxt_int_t ret; - nxt_str_t *conf; - nxt_file_t file; + nxt_str_t ver; + nxt_int_t ret, num; nxt_runtime_t *rt; - nxt_file_info_t fi; nxt_controller_init_t ctrl_init; nxt_log(task, NXT_LOG_INFO, "controller started"); @@ -168,48 +167,98 @@ nxt_controller_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t)); - conf = &ctrl_init.conf; + /* + * Since configuration version has only been introduced in 1.26, + * set the default version to 1.25. + */ + nxt_conf_ver = 12500; + + ret = nxt_controller_file_read(task, rt->conf, &ctrl_init.conf, mp); + if (nxt_slow_path(ret == NXT_ERROR)) { + return NXT_ERROR; + } + + if (ret == NXT_OK) { + ret = nxt_controller_file_read(task, rt->ver, &ver, mp); + if (nxt_slow_path(ret == NXT_ERROR)) { + return NXT_ERROR; + } + + if (ret == NXT_OK) { + num = nxt_int_parse(ver.start, ver.length); + + if (nxt_slow_path(num < 0)) { + nxt_alert(task, "failed to restore previous configuration: " + "invalid version string \"%V\"", &ver); + + nxt_str_null(&ctrl_init.conf); + + } else { + nxt_conf_ver = num; + } + } + } + +#if (NXT_TLS) + ctrl_init.certs = nxt_cert_store_load(task, mp); + + nxt_mp_cleanup(mp, nxt_controller_cert_cleanup, task, ctrl_init.certs, rt); +#endif + + process->data.controller = ctrl_init; + + return NXT_OK; +} + + +static nxt_int_t +nxt_controller_file_read(nxt_task_t *task, const char *name, nxt_str_t *str, + nxt_mp_t *mp) +{ + ssize_t n; + nxt_int_t ret; + nxt_file_t file; + nxt_file_info_t fi; nxt_memzero(&file, sizeof(nxt_file_t)); - file.name = (nxt_file_name_t *) rt->conf; + file.name = (nxt_file_name_t *) name; ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0); if (ret == NXT_OK) { ret = nxt_file_info(&file, &fi); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } - if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) { - conf->length = nxt_file_size(&fi); - conf->start = nxt_mp_alloc(mp, conf->length); - if (nxt_slow_path(conf->start == NULL)) { - nxt_file_close(task, &file); - return NXT_ERROR; + if (nxt_fast_path(nxt_is_file(&fi))) { + str->length = nxt_file_size(&fi); + str->start = nxt_mp_nget(mp, str->length); + if (nxt_slow_path(str->start == NULL)) { + goto fail; } - n = nxt_file_read(&file, conf->start, conf->length, 0); + n = nxt_file_read(&file, str->start, str->length, 0); + if (nxt_slow_path(n != (ssize_t) str->length)) { + goto fail; + } - if (nxt_slow_path(n != (ssize_t) conf->length)) { - conf->start = NULL; - conf->length = 0; + nxt_file_close(task, &file); - nxt_alert(task, "failed to restore previous configuration: " - "cannot read the file"); - } + return NXT_OK; } nxt_file_close(task, &file); } -#if (NXT_TLS) - ctrl_init.certs = nxt_cert_store_load(task, mp); + return NXT_DECLINED; - nxt_mp_cleanup(mp, nxt_controller_cert_cleanup, task, ctrl_init.certs, rt); -#endif +fail: - process->data.controller = ctrl_init; + nxt_file_close(task, &file); - return NXT_OK; + return NXT_ERROR; } @@ -293,6 +342,8 @@ nxt_controller_start(nxt_task_t *task, nxt_process_data_t *data) } vldt.conf = conf; + vldt.conf_pool = mp; + vldt.ver = nxt_conf_ver; ret = nxt_conf_validate(&vldt); @@ -1224,6 +1275,8 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, vldt.conf = value; vldt.pool = c->mem_pool; + vldt.conf_pool = mp; + vldt.ver = NXT_VERNUM; rc = nxt_conf_validate(&vldt); @@ -1305,6 +1358,8 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req, vldt.conf = value; vldt.pool = c->mem_pool; + vldt.conf_pool = mp; + vldt.ver = NXT_VERNUM; rc = nxt_conf_validate(&vldt); diff --git a/src/nxt_external.c b/src/nxt_external.c index 5703e294..b41ca51b 100644 --- a/src/nxt_external.c +++ b/src/nxt_external.c @@ -67,7 +67,7 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) nxt_str_t str; nxt_int_t rc; nxt_uint_t i, argc; - nxt_port_t *my_port, *main_port, *router_port; + nxt_port_t *my_port, *proto_port, *router_port; nxt_runtime_t *rt; nxt_conf_value_t *value; nxt_common_app_conf_t *conf; @@ -76,17 +76,17 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) rt = task->thread->runtime; conf = data->app; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + proto_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE]; router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; my_port = nxt_runtime_port_find(rt, nxt_pid, 0); - if (nxt_slow_path(main_port == NULL || my_port == NULL + if (nxt_slow_path(proto_port == NULL || my_port == NULL || router_port == NULL)) { return NXT_ERROR; } - rc = nxt_external_fd_no_cloexec(task, main_port->pair[1]); + rc = nxt_external_fd_no_cloexec(task, proto_port->pair[1]); if (nxt_slow_path(rc != NXT_OK)) { return NXT_ERROR; } @@ -113,13 +113,13 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data) "%PI,%ud,%d;" "%PI,%ud,%d;" "%PI,%ud,%d,%d;" - "%d,%z,%Z", + "%d,%z,%uD,%Z", NXT_VERSION, my_port->process->stream, - main_port->pid, main_port->id, main_port->pair[1], + proto_port->pid, proto_port->id, proto_port->pair[1], router_port->pid, router_port->id, router_port->pair[1], my_port->pid, my_port->id, my_port->pair[0], my_port->pair[1], - 2, conf->shm_limit); + 2, conf->shm_limit, conf->request_limit); if (nxt_slow_path(p == end)) { nxt_alert(task, "internal error: buffer too small for NXT_UNIT_INIT"); diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index b683cb22..b683cb22 100755..100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c diff --git a/src/nxt_h1proto_websocket.c b/src/nxt_h1proto_websocket.c index 42a50a34..7be190f6 100644 --- a/src/nxt_h1proto_websocket.c +++ b/src/nxt_h1proto_websocket.c @@ -73,10 +73,10 @@ void nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *ws_frame) { - nxt_conn_t *c; - nxt_timer_t *timer; - nxt_h1proto_t *h1p; - nxt_socket_conf_joint_t *joint; + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_h1proto_t *h1p; + nxt_websocket_conf_t *websocket_conf; nxt_debug(task, "h1p ws first frame start"); @@ -87,11 +87,9 @@ nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r, nxt_conn_tcp_nodelay_on(task, c); } - joint = c->listen->socket.data; + websocket_conf = &r->conf->socket_conf->websocket_conf; - if (nxt_slow_path(joint != NULL - && joint->socket_conf->websocket_conf.keepalive_interval != 0)) - { + if (nxt_slow_path(websocket_conf->keepalive_interval != 0)) { h1p->websocket_timer = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1p_websocket_timer_t)); if (nxt_slow_path(h1p->websocket_timer == NULL)) { @@ -100,7 +98,7 @@ nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r, } h1p->websocket_timer->keepalive_interval = - joint->socket_conf->websocket_conf.keepalive_interval; + websocket_conf->keepalive_interval; h1p->websocket_timer->h1p = h1p; timer = &h1p->websocket_timer->timer; @@ -218,14 +216,13 @@ static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data) { - size_t size, hsize, frame_size, max_frame_size; - uint64_t payload_len; - nxt_conn_t *c; - nxt_h1proto_t *h1p; - nxt_http_request_t *r; - nxt_event_engine_t *engine; - nxt_websocket_header_t *wsh; - nxt_socket_conf_joint_t *joint; + size_t size, hsize, frame_size, max_frame_size; + uint64_t payload_len; + nxt_conn_t *c; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + nxt_event_engine_t *engine; + nxt_websocket_header_t *wsh; c = obj; h1p = data; @@ -265,17 +262,6 @@ nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data) r->ws_frame = c->read; - joint = c->listen->socket.data; - - if (nxt_slow_path(joint == NULL)) { - /* - * Listening socket had been closed while - * connection was in keep-alive state. - */ - c->read_state = &nxt_h1p_idle_close_state; - return; - } - if (nxt_slow_path(wsh->mask == 0)) { hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked); return; @@ -330,7 +316,7 @@ nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data) h1p->websocket_cont_expected = !wsh->fin; } - max_frame_size = joint->socket_conf->websocket_conf.max_frame_size; + max_frame_size = r->conf->socket_conf->websocket_conf.max_frame_size; payload_len = nxt_websocket_frame_payload_len(wsh); diff --git a/src/nxt_http.h b/src/nxt_http.h index 3bc2fd61..02d66f58 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -148,6 +148,7 @@ struct nxt_http_request_s { nxt_str_t *path; nxt_str_t *args; + nxt_str_t args_decoded; nxt_array_t *arguments; /* of nxt_http_name_value_t */ nxt_array_t *cookies; /* of nxt_http_name_value_t */ nxt_list_t *fields; @@ -226,9 +227,9 @@ struct nxt_http_action_s { nxt_upstream_t *upstream; uint32_t upstream_number; nxt_var_t *var; + nxt_str_t *pass; } u; - nxt_str_t name; nxt_http_action_t *fallback; }; @@ -313,7 +314,7 @@ nxt_int_t nxt_http_request_content_length(void *ctx, nxt_http_field_t *field, nxt_http_routes_t *nxt_http_routes_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *routes_conf); nxt_http_action_t *nxt_http_action_create(nxt_task_t *task, - nxt_router_temp_conf_t *tmcf, nxt_str_t *name); + nxt_router_temp_conf_t *tmcf, nxt_str_t *pass); nxt_int_t nxt_http_routes_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf); nxt_int_t nxt_http_pass_segments(nxt_mp_t *mp, nxt_str_t *pass, diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c index b71b25d9..ac614df6 100644 --- a/src/nxt_http_request.c +++ b/src/nxt_http_request.c @@ -460,8 +460,6 @@ nxt_http_request_action(nxt_task_t *task, nxt_http_request_t *r, if (nxt_fast_path(action != NULL)) { do { - nxt_debug(task, "http request route: %V", &action->name); - action = action->handler(task, r, action); if (action == NULL) { diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c index cff69f96..606bf266 100644 --- a/src/nxt_http_route.c +++ b/src/nxt_http_route.c @@ -19,6 +19,7 @@ typedef enum { NXT_HTTP_ROUTE_ARGUMENT, NXT_HTTP_ROUTE_COOKIE, NXT_HTTP_ROUTE_SCHEME, + NXT_HTTP_ROUTE_QUERY, NXT_HTTP_ROUTE_SOURCE, NXT_HTTP_ROUTE_DESTINATION, } nxt_http_route_object_t; @@ -54,6 +55,7 @@ typedef struct { nxt_conf_value_t *arguments; nxt_conf_value_t *cookies; nxt_conf_value_t *scheme; + nxt_conf_value_t *query; nxt_conf_value_t *source; nxt_conf_value_t *destination; } nxt_http_route_match_conf_t; @@ -216,14 +218,12 @@ static nxt_int_t nxt_http_route_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_route_t *route); static nxt_int_t nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_action_t *action); -static nxt_http_action_t *nxt_http_action_pass_var(nxt_task_t *task, +static nxt_http_action_t *nxt_http_pass_var(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action); -static void nxt_http_action_pass_var_ready(nxt_task_t *task, void *obj, - void *data); -static void nxt_http_action_pass_var_error(nxt_task_t *task, void *obj, - void *data); -static nxt_int_t nxt_http_pass_find(nxt_task_t *task, nxt_mp_t *mp, - nxt_router_conf_t *rtcf, nxt_http_action_t *action); +static void nxt_http_pass_var_ready(nxt_task_t *task, void *obj, void *data); +static void nxt_http_pass_var_error(nxt_task_t *task, void *obj, void *data); +static nxt_int_t nxt_http_pass_find(nxt_mp_t *mp, nxt_router_conf_t *rtcf, + nxt_str_t *pass, nxt_http_action_t *action); static nxt_int_t nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name, nxt_http_action_t *action); @@ -249,6 +249,8 @@ static nxt_int_t nxt_http_route_test_argument(nxt_http_request_t *r, nxt_http_route_rule_t *rule, nxt_array_t *array); static nxt_int_t nxt_http_route_scheme(nxt_http_request_t *r, nxt_http_route_rule_t *rule); +static nxt_int_t nxt_http_route_query(nxt_http_request_t *r, + nxt_http_route_rule_t *rule); static nxt_int_t nxt_http_route_cookies(nxt_http_request_t *r, nxt_http_route_rule_t *rule); static nxt_array_t *nxt_http_route_cookies_parse(nxt_http_request_t *r); @@ -368,6 +370,12 @@ static nxt_conf_map_t nxt_http_route_match_conf[] = { }, { + nxt_string("query"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_match_conf_t, query), + }, + + { nxt_string("source"), NXT_CONF_MAP_PTR, offsetof(nxt_http_route_match_conf_t, source), @@ -566,6 +574,19 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, test++; } + if (mtcf.query != NULL) { + rule = nxt_http_route_rule_create(task, mp, mtcf.query, 1, + NXT_HTTP_ROUTE_PATTERN_NOCASE, + NXT_HTTP_ROUTE_ENCODING_URI_PLUS); + if (rule == NULL) { + return NULL; + } + + rule->object = NXT_HTTP_ROUTE_QUERY; + test->rule = rule; + test++; + } + if (mtcf.source != NULL) { addr_rule = nxt_http_route_addr_rule_create(task, mp, mtcf.source); if (addr_rule == NULL) { @@ -652,7 +673,7 @@ nxt_http_action_init(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, { nxt_mp_t *mp; nxt_int_t ret; - nxt_str_t name, *string; + nxt_str_t pass; nxt_http_action_conf_t acf; nxt_memzero(&acf, sizeof(acf)); @@ -679,10 +700,10 @@ nxt_http_action_init(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, return nxt_http_proxy_init(mp, action, &acf); } - nxt_conf_get_string(acf.pass, &name); + nxt_conf_get_string(acf.pass, &pass); - string = nxt_str_dup(mp, &action->name, &name); - if (nxt_slow_path(string == NULL)) { + action->u.var = nxt_var_compile(&pass, mp, 0); + if (nxt_slow_path(action->u.var == NULL)) { return NXT_ERROR; } @@ -1372,7 +1393,7 @@ nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_action_t *action) { nxt_int_t ret; - nxt_var_t *var; + nxt_str_t pass; if (action->handler != NULL) { if (action->fallback != NULL) { @@ -1382,20 +1403,17 @@ nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, return NXT_OK; } - if (nxt_is_var(&action->name)) { - var = nxt_var_compile(&action->name, tmcf->router_conf->mem_pool); - if (nxt_slow_path(var == NULL)) { + if (nxt_var_is_const(action->u.var)) { + nxt_var_raw(action->u.var, &pass); + + ret = nxt_http_pass_find(tmcf->mem_pool, tmcf->router_conf, &pass, + action); + if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } - action->u.var = var; - action->handler = nxt_http_action_pass_var; - return NXT_OK; - } - - ret = nxt_http_pass_find(task, tmcf->mem_pool, tmcf->router_conf, action); - if (nxt_slow_path(ret != NXT_OK)) { - return NXT_ERROR; + } else { + action->handler = nxt_http_pass_var; } return NXT_OK; @@ -1403,28 +1421,36 @@ nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, static nxt_http_action_t * -nxt_http_action_pass_var(nxt_task_t *task, nxt_http_request_t *r, +nxt_http_pass_var(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action) { - nxt_var_t *var; nxt_int_t ret; + nxt_str_t str; + nxt_var_t *var; + + var = action->u.var; + + nxt_var_raw(var, &str); + + nxt_debug(task, "http pass: \"%V\"", &str); ret = nxt_var_query_init(&r->var_query, r, r->mem_pool); if (nxt_slow_path(ret != NXT_OK)) { goto fail; } - var = action->u.var; - - action = nxt_mp_get(r->mem_pool, sizeof(nxt_http_action_t)); + action = nxt_mp_get(r->mem_pool, + sizeof(nxt_http_action_t) + sizeof(nxt_str_t)); if (nxt_slow_path(action == NULL)) { goto fail; } - nxt_var_query(task, r->var_query, var, &action->name); + action->u.pass = nxt_pointer_to(action, sizeof(nxt_http_action_t)); + + nxt_var_query(task, r->var_query, var, action->u.pass); nxt_var_query_resolve(task, r->var_query, action, - nxt_http_action_pass_var_ready, - nxt_http_action_pass_var_error); + nxt_http_pass_var_ready, + nxt_http_pass_var_error); return NULL; fail: @@ -1435,7 +1461,7 @@ fail: static void -nxt_http_action_pass_var_ready(nxt_task_t *task, void *obj, void *data) +nxt_http_pass_var_ready(nxt_task_t *task, void *obj, void *data) { nxt_int_t ret; nxt_router_conf_t *rtcf; @@ -1447,9 +1473,9 @@ nxt_http_action_pass_var_ready(nxt_task_t *task, void *obj, void *data) action = data; rtcf = r->conf->socket_conf->router_conf; - nxt_debug(task, "http pass lookup: %V", &action->name); + nxt_debug(task, "http pass lookup: %V", action->u.pass); - ret = nxt_http_pass_find(task, r->mem_pool, rtcf, action); + ret = nxt_http_pass_find(r->mem_pool, rtcf, action->u.pass, action); if (ret != NXT_OK) { status = (ret == NXT_DECLINED) ? NXT_HTTP_NOT_FOUND @@ -1464,7 +1490,7 @@ nxt_http_action_pass_var_ready(nxt_task_t *task, void *obj, void *data) static void -nxt_http_action_pass_var_error(nxt_task_t *task, void *obj, void *data) +nxt_http_pass_var_error(nxt_task_t *task, void *obj, void *data) { nxt_http_request_t *r; @@ -1475,13 +1501,13 @@ nxt_http_action_pass_var_error(nxt_task_t *task, void *obj, void *data) static nxt_int_t -nxt_http_pass_find(nxt_task_t *task, nxt_mp_t *mp, nxt_router_conf_t *rtcf, +nxt_http_pass_find(nxt_mp_t *mp, nxt_router_conf_t *rtcf, nxt_str_t *pass, nxt_http_action_t *action) { - nxt_int_t ret; - nxt_str_t segments[3]; + nxt_int_t ret; + nxt_str_t segments[3]; - ret = nxt_http_pass_segments(mp, &action->name, segments, 3); + ret = nxt_http_pass_segments(mp, pass, segments, 3); if (nxt_slow_path(ret != NXT_OK)) { return ret; } @@ -1587,18 +1613,24 @@ nxt_http_route_find(nxt_http_routes_t *routes, nxt_str_t *name, nxt_http_action_t * nxt_http_action_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, - nxt_str_t *name) + nxt_str_t *pass) { + nxt_mp_t *mp; nxt_int_t ret; nxt_http_action_t *action; - action = nxt_mp_alloc(tmcf->router_conf->mem_pool, - sizeof(nxt_http_action_t)); + mp = tmcf->router_conf->mem_pool; + + action = nxt_mp_alloc(mp, sizeof(nxt_http_action_t)); if (nxt_slow_path(action == NULL)) { return NULL; } - action->name = *name; + action->u.var = nxt_var_compile(pass, mp, 0); + if (nxt_slow_path(action->u.var == NULL)) { + return NULL; + } + action->handler = NULL; ret = nxt_http_action_resolve(task, tmcf, action); @@ -1623,8 +1655,6 @@ nxt_http_pass_application(nxt_task_t *task, nxt_router_conf_t *rtcf, return NULL; } - action->name = *name; - (void) nxt_router_application_init(rtcf, name, NULL, action); return action; @@ -1769,6 +1799,9 @@ nxt_http_route_rule(nxt_http_request_t *r, nxt_http_route_rule_t *rule) case NXT_HTTP_ROUTE_SCHEME: return nxt_http_route_scheme(r, rule); + case NXT_HTTP_ROUTE_QUERY: + return nxt_http_route_query(r, rule); + default: break; } @@ -2001,10 +2034,6 @@ nxt_http_route_arguments(nxt_http_request_t *r, nxt_http_route_rule_t *rule) { nxt_array_t *arguments; - if (r->args == NULL) { - return 0; - } - arguments = nxt_http_route_arguments_parse(r); if (nxt_slow_path(arguments == NULL)) { return -1; @@ -2018,10 +2047,9 @@ static nxt_array_t * nxt_http_route_arguments_parse(nxt_http_request_t *r) { size_t name_length; - u_char c, *p, *dst, *dst_start, *start, *end, *name; + u_char *p, *dst, *dst_start, *start, *end, *name; uint8_t d0, d1; uint32_t hash; - nxt_bool_t valid; nxt_array_t *args; nxt_http_name_value_t *nv; @@ -2035,7 +2063,6 @@ nxt_http_route_arguments_parse(nxt_http_request_t *r) } hash = NXT_HTTP_FIELD_HASH_INIT; - valid = 1; name = NULL; name_length = 0; @@ -2044,28 +2071,26 @@ nxt_http_route_arguments_parse(nxt_http_request_t *r) return NULL; } + r->args_decoded.start = dst_start; + start = r->args->start; end = start + r->args->length; for (p = start, dst = dst_start; p < end; p++, dst++) { - c = *p; - *dst = c; + *dst = *p; - switch (c) { + switch (*p) { case '=': - if (name != NULL) { - break; + if (name == NULL) { + name_length = dst - dst_start; + name = dst_start; + dst_start = dst + 1; } - name_length = dst - dst_start; - valid = (name_length != 0); - name = dst_start; - dst_start = dst + 1; - continue; case '&': - if (valid) { + if (name_length != 0 || dst != dst_start) { nv = nxt_http_route_argument(args, name, name_length, hash, dst_start, dst); if (nxt_slow_path(nv == NULL)) { @@ -2075,14 +2100,12 @@ nxt_http_route_arguments_parse(nxt_http_request_t *r) hash = NXT_HTTP_FIELD_HASH_INIT; name_length = 0; - valid = 1; name = NULL; dst_start = dst + 1; continue; case '+': - c = ' '; *dst = ' '; break; @@ -2100,18 +2123,19 @@ nxt_http_route_arguments_parse(nxt_http_request_t *r) } p += 2; - c = (d0 << 4) + d1; - *dst = c; + *dst = (d0 << 4) + d1; break; } if (name == NULL) { - hash = nxt_http_field_hash_char(hash, c); + hash = nxt_http_field_hash_char(hash, *dst); } } - if (valid) { + r->args_decoded.length = dst - r->args_decoded.start; + + if (name_length != 0 || dst != dst_start) { nv = nxt_http_route_argument(args, name, name_length, hash, dst_start, dst); if (nxt_slow_path(nv == NULL)) { @@ -2207,6 +2231,21 @@ nxt_http_route_scheme(nxt_http_request_t *r, nxt_http_route_rule_t *rule) static nxt_int_t +nxt_http_route_query(nxt_http_request_t *r, nxt_http_route_rule_t *rule) +{ + nxt_array_t *arguments; + + arguments = nxt_http_route_arguments_parse(r); + if (nxt_slow_path(arguments == NULL)) { + return -1; + } + + return nxt_http_route_test_rule(r, rule, r->args_decoded.start, + r->args_decoded.length); +} + + +static nxt_int_t nxt_http_route_cookies(nxt_http_request_t *r, nxt_http_route_rule_t *rule) { nxt_array_t *cookies; diff --git a/src/nxt_http_static.c b/src/nxt_http_static.c index 9b79a666..36c1ebc9 100644 --- a/src/nxt_http_static.c +++ b/src/nxt_http_static.c @@ -8,19 +8,51 @@ typedef struct { - nxt_str_t share; - nxt_str_t chroot; - nxt_uint_t resolve; - nxt_http_route_rule_t *types; + nxt_var_t *var; +#if (NXT_HAVE_OPENAT2) + u_char *fname; +#endif + uint8_t is_const; /* 1 bit */ +} nxt_http_static_share_t; + + +typedef struct { + nxt_uint_t nshares; + nxt_http_static_share_t *shares; +#if (NXT_HAVE_OPENAT2) + nxt_var_t *chroot; + nxt_uint_t resolve; +#endif + nxt_http_route_rule_t *types; } nxt_http_static_conf_t; +typedef struct { + nxt_http_action_t *action; + nxt_str_t share; +#if (NXT_HAVE_OPENAT2) + nxt_str_t chroot; +#endif + uint32_t index; + uint8_t need_body; /* 1 bit */ +} nxt_http_static_ctx_t; + + #define NXT_HTTP_STATIC_BUF_COUNT 2 #define NXT_HTTP_STATIC_BUF_SIZE (128 * 1024) static nxt_http_action_t *nxt_http_static(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action); +static void nxt_http_static_iterate(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_static_ctx_t *ctx); +static void nxt_http_static_send_ready(nxt_task_t *task, void *obj, void *data); +static void nxt_http_static_var_error(nxt_task_t *task, void *obj, void *data); +static void nxt_http_static_next(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_static_ctx_t *ctx, nxt_http_status_t status); +#if (NXT_HAVE_OPENAT2) +static u_char *nxt_http_static_chroot_match(u_char *chr, u_char *shr); +#endif static void nxt_http_static_extract_extension(nxt_str_t *path, nxt_str_t *exten); static void nxt_http_static_body_handler(nxt_task_t *task, void *obj, @@ -41,8 +73,12 @@ nxt_int_t nxt_http_static_init(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_action_t *action, nxt_http_action_conf_t *acf) { + uint32_t i; nxt_mp_t *mp; - nxt_str_t *str, value; + nxt_str_t str; + nxt_var_t *var; + nxt_bool_t array; + nxt_conf_value_t *cv; nxt_http_static_conf_t *conf; mp = tmcf->router_conf->mem_pool; @@ -55,39 +91,64 @@ nxt_http_static_init(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, action->handler = nxt_http_static; action->u.conf = conf; - nxt_conf_get_string(acf->share, &value); + array = (nxt_conf_type(acf->share) == NXT_CONF_ARRAY); + conf->nshares = array ? nxt_conf_array_elements_count(acf->share) : 1; - str = nxt_str_dup(mp, &conf->share, &value); - if (nxt_slow_path(str == NULL)) { + conf->shares = nxt_mp_zget(mp, sizeof(nxt_http_static_share_t) + * conf->nshares); + if (nxt_slow_path(conf->shares == NULL)) { return NXT_ERROR; } -#if (NXT_HAVE_OPENAT2) - if (acf->chroot.length > 0) { - u_char *p; - nxt_str_t slash; + if (array) { + for (i = 0; i < conf->nshares; i++) { + cv = nxt_conf_get_array_element(acf->share, i); + nxt_conf_get_string(cv, &str); - if (acf->chroot.start[acf->chroot.length - 1] != '/') { - nxt_str_set(&slash, "/"); + var = nxt_var_compile(&str, mp, 1); + if (nxt_slow_path(var == NULL)) { + return NXT_ERROR; + } - } else { - nxt_str_set(&slash, ""); + conf->shares[i].var = var; + conf->shares[i].is_const = nxt_var_is_const(var); } - value.length = acf->chroot.length + slash.length; + } else { + nxt_conf_get_string(acf->share, &str); - value.start = nxt_mp_alloc(mp, value.length + 1); - if (nxt_slow_path(value.start == NULL)) { + var = nxt_var_compile(&str, mp, 1); + if (nxt_slow_path(var == NULL)) { return NXT_ERROR; } - p = value.start; - p = nxt_cpymem(p, acf->chroot.start, acf->chroot.length); - p = nxt_cpymem(p, slash.start, slash.length); - *p = '\0'; + conf->shares[0].var = var; + conf->shares[0].is_const = nxt_var_is_const(var); + } + +#if (NXT_HAVE_OPENAT2) + if (acf->chroot.length > 0) { + nxt_str_t chr, shr; + nxt_bool_t is_const; + + conf->chroot = nxt_var_compile(&acf->chroot, mp, 1); + if (nxt_slow_path(conf->chroot == NULL)) { + return NXT_ERROR; + } + + is_const = nxt_var_is_const(conf->chroot); - conf->chroot = value; - conf->resolve |= RESOLVE_IN_ROOT; + for (i = 0; i < conf->nshares; i++) { + conf->shares[i].is_const &= is_const; + + if (conf->shares[i].is_const) { + nxt_var_raw(conf->chroot, &chr); + nxt_var_raw(conf->shares[i].var, &shr); + + conf->shares[i].fname = nxt_http_static_chroot_match(chr.start, + shr.start); + } + } } if (acf->follow_symlinks != NULL @@ -128,25 +189,8 @@ static nxt_http_action_t * nxt_http_static(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action) { - size_t length, encode; - u_char *p, *fname; - struct tm tm; - nxt_buf_t *fb; - nxt_int_t ret; - nxt_str_t index, exten, *mtype, *chroot; - nxt_uint_t level; - nxt_bool_t need_body; - nxt_file_t *f, file; - nxt_file_info_t fi; - nxt_http_field_t *field; - nxt_http_status_t status; - nxt_router_conf_t *rtcf; - nxt_work_handler_t body_handler; - nxt_http_static_conf_t *conf; - - conf = action->u.conf; - - nxt_debug(task, "http static: \"%V\"", &conf->share); + nxt_bool_t need_body; + nxt_http_static_ctx_t *ctx; if (nxt_slow_path(!nxt_str_eq(r->method, "GET", 3))) { @@ -165,71 +209,184 @@ nxt_http_static(nxt_task_t *task, nxt_http_request_t *r, need_body = 1; } - if (r->path->start[r->path->length - 1] == '/') { - /* TODO: dynamic index setting. */ - nxt_str_set(&index, "index.html"); - nxt_str_set(&exten, ".html"); + ctx = nxt_mp_zget(r->mem_pool, sizeof(nxt_http_static_ctx_t)); + if (nxt_slow_path(ctx == NULL)) { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + return NULL; + } + + ctx->action = action; + ctx->need_body = need_body; + + nxt_http_static_iterate(task, r, ctx); + + return NULL; +} + + +static void +nxt_http_static_iterate(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_static_ctx_t *ctx) +{ + nxt_int_t ret; + nxt_http_static_conf_t *conf; + nxt_http_static_share_t *share; + + conf = ctx->action->u.conf; + + share = &conf->shares[ctx->index]; + +#if (NXT_DEBUG) + nxt_str_t shr; + + nxt_var_raw(share->var, &shr); + +#if (NXT_HAVE_OPENAT2) + nxt_str_t chr; + + if (conf->chroot != NULL) { + nxt_var_raw(conf->chroot, &chr); } else { - nxt_str_set(&index, ""); - nxt_str_null(&exten); + nxt_str_set(&chr, ""); } - f = NULL; + nxt_debug(task, "http static: \"%V\" (chroot: \"%V\")", &shr, &chr); +#else + nxt_debug(task, "http static: \"%V\"", &shr); +#endif +#endif /* NXT_DEBUG */ + + if (share->is_const) { + nxt_var_raw(share->var, &ctx->share); + +#if (NXT_HAVE_OPENAT2) + if (conf->chroot != NULL && ctx->index == 0) { + nxt_var_raw(conf->chroot, &ctx->chroot); + } +#endif + + nxt_http_static_send_ready(task, r, ctx); + + } else { + ret = nxt_var_query_init(&r->var_query, r, r->mem_pool); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + return; + } + nxt_var_query(task, r->var_query, share->var, &ctx->share); + +#if (NXT_HAVE_OPENAT2) + if (conf->chroot != NULL && ctx->index == 0) { + nxt_var_query(task, r->var_query, conf->chroot, &ctx->chroot); + } +#endif + + nxt_var_query_resolve(task, r->var_query, ctx, + nxt_http_static_send_ready, + nxt_http_static_var_error); + } +} + + +static void +nxt_http_static_send_ready(nxt_task_t *task, void *obj, void *data) +{ + size_t length, encode; + u_char *p, *fname; + struct tm tm; + nxt_buf_t *fb; + nxt_int_t ret; + nxt_str_t *shr, exten, *mtype; + nxt_uint_t level; + nxt_file_t *f, file; + nxt_file_info_t fi; + nxt_http_field_t *field; + nxt_http_status_t status; + nxt_router_conf_t *rtcf; + nxt_http_action_t *action; + nxt_http_request_t *r; + nxt_work_handler_t body_handler; + nxt_http_static_ctx_t *ctx; + nxt_http_static_conf_t *conf; + + static const nxt_str_t index = nxt_string("index.html"); + + r = obj; + ctx = data; + action = ctx->action; + conf = action->u.conf; rtcf = r->conf->socket_conf->router_conf; + f = NULL; mtype = NULL; - if (conf->types != NULL && exten.start == NULL) { - nxt_http_static_extract_extension(r->path, &exten); - mtype = nxt_http_static_mtype_get(&rtcf->mtypes_hash, &exten); + shr = &ctx->share; + + if (shr->start[shr->length - 1] == '/') { + /* TODO: dynamic index setting. */ + nxt_str_set(&exten, ".html"); - ret = nxt_http_route_test_rule(r, conf->types, mtype->start, - mtype->length); - if (nxt_slow_path(ret == NXT_ERROR)) { + length = shr->length + index.length; + + fname = nxt_mp_nget(r->mem_pool, length + 1); + if (nxt_slow_path(fname == NULL)) { goto fail; } - if (ret == 0) { - if (action->fallback != NULL) { - return action->fallback; + p = fname; + p = nxt_cpymem(p, shr->start, shr->length); + p = nxt_cpymem(p, index.start, index.length); + *p = '\0'; + + } else { + if (conf->types == NULL) { + nxt_str_null(&exten); + + } else { + nxt_http_static_extract_extension(shr, &exten); + mtype = nxt_http_static_mtype_get(&rtcf->mtypes_hash, &exten); + + ret = nxt_http_route_test_rule(r, conf->types, mtype->start, + mtype->length); + if (nxt_slow_path(ret == NXT_ERROR)) { + goto fail; } - nxt_http_request_error(task, r, NXT_HTTP_FORBIDDEN); - return NULL; + if (ret == 0) { + nxt_http_static_next(task, r, ctx, NXT_HTTP_FORBIDDEN); + return; + } } - } - length = conf->share.length + r->path->length + index.length; - - fname = nxt_mp_nget(r->mem_pool, length + 1); - if (nxt_slow_path(fname == NULL)) { - goto fail; + fname = ctx->share.start; } - p = fname; - p = nxt_cpymem(p, conf->share.start, conf->share.length); - p = nxt_cpymem(p, r->path->start, r->path->length); - p = nxt_cpymem(p, index.start, index.length); - *p = '\0'; - nxt_memzero(&file, sizeof(nxt_file_t)); file.name = fname; - chroot = &conf->chroot; - #if (NXT_HAVE_OPENAT2) - if (conf->resolve != 0) { + if (conf->resolve != 0 || ctx->chroot.length > 0) { + nxt_str_t *chr; + nxt_uint_t resolve; + nxt_http_static_share_t *share; + + share = &conf->shares[ctx->index]; + + resolve = conf->resolve; + chr = &ctx->chroot; - if (chroot->length > 0) { - file.name = chroot->start; + if (chr->length > 0) { + resolve |= RESOLVE_IN_ROOT; - if (length > chroot->length - && nxt_memcmp(fname, chroot->start, chroot->length) == 0) - { - fname += chroot->length; + fname = share->is_const + ? share->fname + : nxt_http_static_chroot_match(chr->start, file.name); + + if (fname != NULL) { + file.name = chr->start; ret = nxt_file_open(task, &file, NXT_FILE_SEARCH, NXT_FILE_OPEN, 0); @@ -256,7 +413,7 @@ nxt_http_static(nxt_task_t *task, nxt_http_request_t *r, file.name = fname; ret = nxt_file_openat2(task, &file, NXT_FILE_RDONLY, - NXT_FILE_OPEN, 0, af.fd, conf->resolve); + NXT_FILE_OPEN, 0, af.fd, resolve); if (af.fd != AT_FDCWD) { nxt_file_close(task, &af); @@ -308,27 +465,35 @@ nxt_http_static(nxt_task_t *task, nxt_http_request_t *r, break; } - if (level == NXT_LOG_ERR && action->fallback != NULL) { - return action->fallback; - } - if (status != NXT_HTTP_NOT_FOUND) { - if (chroot->length > 0) { +#if (NXT_HAVE_OPENAT2) + nxt_str_t *chr = &ctx->chroot; + + if (chr->length > 0) { nxt_log(task, level, "opening \"%s\" at \"%V\" failed %E", - fname, chroot, file.error); + fname, chr, file.error); } else { nxt_log(task, level, "opening \"%s\" failed %E", fname, file.error); } + +#else + nxt_log(task, level, "opening \"%s\" failed %E", fname, file.error); +#endif } - nxt_http_request_error(task, r, status); - return NULL; + if (level == NXT_LOG_ERR) { + nxt_http_static_next(task, r, ctx, status); + return; + } + + goto fail; } f = nxt_mp_get(r->mem_pool, sizeof(nxt_file_t)); if (nxt_slow_path(f == NULL)) { + nxt_file_close(task, &file); goto fail; } @@ -381,7 +546,7 @@ nxt_http_static(nxt_task_t *task, nxt_http_request_t *r, - p; if (exten.start == NULL) { - nxt_http_static_extract_extension(r->path, &exten); + nxt_http_static_extract_extension(shr, &exten); } if (mtype == NULL) { @@ -400,7 +565,7 @@ nxt_http_static(nxt_task_t *task, nxt_http_request_t *r, field->value_length = mtype->length; } - if (need_body && nxt_file_size(&fi) > 0) { + if (ctx->need_body && nxt_file_size(&fi) > 0) { fb = nxt_mp_zget(r->mem_pool, NXT_BUF_FILE_SIZE); if (nxt_slow_path(fb == NULL)) { goto fail; @@ -420,19 +585,14 @@ nxt_http_static(nxt_task_t *task, nxt_http_request_t *r, } else { /* Not a file. */ - nxt_file_close(task, f); if (nxt_slow_path(!nxt_is_dir(&fi))) { - if (action->fallback != NULL) { - return action->fallback; - } - nxt_log(task, NXT_LOG_ERR, "\"%FN\" is not a regular file", f->name); - nxt_http_request_error(task, r, NXT_HTTP_NOT_FOUND); - return NULL; + nxt_http_static_next(task, r, ctx, NXT_HTTP_NOT_FOUND); + return; } f = NULL; @@ -482,19 +642,111 @@ nxt_http_static(nxt_task_t *task, nxt_http_request_t *r, nxt_http_request_header_send(task, r, body_handler, NULL); r->state = &nxt_http_static_send_state; - return NULL; + return; fail: - nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); - if (f != NULL) { nxt_file_close(task, f); } - return NULL; + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); +} + + +static void +nxt_http_static_var_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + + r = obj; + + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); +} + + +static void +nxt_http_static_next(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_static_ctx_t *ctx, nxt_http_status_t status) +{ + nxt_http_action_t *action; + nxt_http_static_conf_t *conf; + + action = ctx->action; + conf = action->u.conf; + + ctx->index++; + + if (ctx->index < conf->nshares) { + nxt_http_static_iterate(task, r, ctx); + return; + } + + if (action->fallback != NULL) { + nxt_http_request_action(task, r, action->fallback); + return; + } + + nxt_http_request_error(task, r, status); +} + + +#if (NXT_HAVE_OPENAT2) + +static u_char * +nxt_http_static_chroot_match(u_char *chr, u_char *shr) +{ + if (*chr != *shr) { + return NULL; + } + + chr++; + shr++; + + for ( ;; ) { + if (*shr == '\0') { + return NULL; + } + + if (*chr == *shr) { + chr++; + shr++; + continue; + } + + if (*chr == '\0') { + break; + } + + if (*chr == '/') { + if (chr[-1] == '/') { + chr++; + continue; + } + + } else if (*shr == '/') { + if (shr[-1] == '/') { + shr++; + continue; + } + } + + return NULL; + } + + if (shr[-1] != '/' && *shr != '/') { + return NULL; + } + + while (*shr == '/') { + shr++; + } + + return (*shr != '\0') ? shr : NULL; } +#endif + static void nxt_http_static_extract_extension(nxt_str_t *path, nxt_str_t *exten) diff --git a/src/nxt_java.c b/src/nxt_java.c index ac715c0b..75c8ee19 100644 --- a/src/nxt_java.c +++ b/src/nxt_java.c @@ -428,7 +428,7 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) return NXT_ERROR; } - nxt_unit_default_init(task, &java_init); + nxt_unit_default_init(task, &java_init, app_conf); java_init.callbacks.request_handler = nxt_java_request_handler; java_init.callbacks.websocket_handler = nxt_java_websocket_handler; @@ -437,7 +437,6 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data) java_init.request_data_size = sizeof(nxt_java_request_data_t); java_init.data = &java_data; java_init.ctx_data = env; - java_init.shm_limit = app_conf->shm_limit; ctx = nxt_unit_init(&java_init); if (nxt_slow_path(ctx == NULL)) { @@ -616,11 +615,6 @@ nxt_java_ready_handler(nxt_unit_ctx_t *ctx) nxt_java_data_t *java_data; nxt_java_app_conf_t *c; - /* Worker thread context. */ - if (!nxt_unit_is_main_ctx(ctx)) { - return NXT_UNIT_OK; - } - java_data = ctx->unit->data; c = java_data->conf; diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 16c6a297..a5a20d3d 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -10,6 +10,7 @@ #include <nxt_main_process.h> #include <nxt_conf.h> #include <nxt_router.h> +#include <nxt_port_queue.h> #if (NXT_TLS) #include <nxt_cert.h> #endif @@ -31,18 +32,9 @@ typedef struct { } nxt_conf_app_map_t; -extern nxt_port_handlers_t nxt_controller_process_port_handlers; -extern nxt_port_handlers_t nxt_router_process_port_handlers; - - static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_title(nxt_task_t *task); -static nxt_int_t nxt_main_process_create(nxt_task_t *task, - const nxt_process_init_t init); -static nxt_int_t nxt_main_start_process(nxt_task_t *task, - nxt_process_t *process); -static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data); static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, @@ -53,7 +45,7 @@ static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data); static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid); +static void nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process); static void nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, @@ -61,8 +53,12 @@ static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa, static void nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2); +static void nxt_main_process_whoami_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name, + const char *name, u_char *buf, size_t size); static void nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); @@ -77,6 +73,8 @@ const nxt_sig_event_t nxt_main_process_signals[] = { }; +nxt_uint_t nxt_conf_ver; + static nxt_bool_t nxt_exiting; @@ -97,7 +95,7 @@ nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, * nxt_main_port_modules_handler() which starts the controller * and router processes. */ - return nxt_main_process_create(task, nxt_discovery_process); + return nxt_process_init_start(task, nxt_discovery_process); } @@ -154,6 +152,12 @@ static nxt_conf_map_t nxt_common_app_limits_conf[] = { offsetof(nxt_common_app_conf_t, shm_limit), }, + { + nxt_string("requests"), + NXT_CONF_MAP_INT32, + offsetof(nxt_common_app_conf_t, request_limit), + }, + }; @@ -325,7 +329,7 @@ static nxt_conf_app_map_t nxt_app_maps[] = { static void -nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_debug(task, "main data: %*s", nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); @@ -333,7 +337,33 @@ nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void -nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + void *mem; + nxt_port_t *port; + + nxt_port_new_port_handler(task, msg); + + port = msg->u.new_port; + + if (port != NULL + && port->type == NXT_PROCESS_APP + && msg->fd[1] != -1) + { + mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0); + if (nxt_fast_path(mem != MAP_FAILED)) { + port->queue = mem; + } + + nxt_fd_close(msg->fd[1]); + msg->fd[1] = -1; + } +} + + +static void +nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { u_char *start, *p, ch; size_t type_len; @@ -349,21 +379,42 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - process = nxt_main_process_new(task, rt); + port = rt->port_by_type[NXT_PROCESS_ROUTER]; + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "router port not found"); + return; + } + + if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) { + nxt_alert(task, "process %PI cannot start processes", + nxt_recv_msg_cmsg_pid(msg)); + + return; + } + + process = nxt_process_new(rt); if (nxt_slow_path(process == NULL)) { return; } + process->mem_pool = nxt_mp_create(1024, 128, 256, 32); + if (process->mem_pool == NULL) { + nxt_process_use(task, process, -1); + return; + } + + process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN]; + init = nxt_process_init(process); - *init = nxt_app_process; + *init = nxt_proto_process; b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size); if (b == NULL) { goto failed; } - nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos, + nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos, b->mem.pos); app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t)); @@ -379,7 +430,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) init->name = (const char *) start; process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length - + sizeof("\"\" application") + 1); + + sizeof("\"\" prototype") + 1); if (nxt_slow_path(process->name == NULL)) { goto failed; @@ -388,10 +439,11 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) p = (u_char *) process->name; *p++ = '"'; p = nxt_cpymem(p, init->name, app_conf->name.length); - p = nxt_cpymem(p, "\" application", 13); + p = nxt_cpymem(p, "\" prototype", 11); *p = '\0'; app_conf->shm_limit = 100 * 1024 * 1024; + app_conf->request_limit = 0; start += app_conf->name.length + 1; @@ -455,7 +507,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) process->stream = msg->port_msg.stream; process->data.app = app_conf; - ret = nxt_main_start_process(task, process); + ret = nxt_process_start(task, process); if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) { return; } @@ -483,21 +535,17 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - process = nxt_runtime_process_find(rt, msg->port_msg.pid); - if (nxt_slow_path(process == NULL)) { - return; - } - - nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); - port = nxt_runtime_port_find(rt, msg->port_msg.pid, msg->port_msg.reply_port); - - if (nxt_slow_path(port == NULL)) { return; } + process = port->process; + + nxt_assert(process != NULL); + nxt_assert(process->state == NXT_PROCESS_STATE_CREATING); + #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { if (nxt_slow_path(nxt_clone_credential_map(task, process->pid, @@ -521,10 +569,13 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static nxt_port_handlers_t nxt_main_process_port_handlers = { - .data = nxt_port_main_data_handler, + .data = nxt_main_data_handler, + .new_port = nxt_main_new_port_handler, .process_created = nxt_main_process_created_handler, .process_ready = nxt_port_process_ready_handler, - .start_process = nxt_port_main_start_process_handler, + .whoami = nxt_main_process_whoami_handler, + .remove_pid = nxt_port_remove_pid_handler, + .start_process = nxt_main_start_process_handler, .socket = nxt_main_port_socket_handler, .modules = nxt_main_port_modules_handler, .conf_store = nxt_main_port_conf_store_handler, @@ -538,6 +589,88 @@ static nxt_port_handlers_t nxt_main_process_port_handlers = { }; +static void +nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_buf_t *buf; + nxt_pid_t pid, ppid; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_process_t *pprocess; + + nxt_assert(msg->port_msg.reply_port == 0); + + if (nxt_slow_path(msg->buf == NULL + || nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t))) + { + nxt_alert(task, "whoami: buffer is NULL or unexpected size"); + goto fail; + } + + nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t)); + + rt = task->thread->runtime; + + pprocess = nxt_runtime_process_find(rt, ppid); + if (nxt_slow_path(pprocess == NULL)) { + nxt_alert(task, "whoami: parent process %PI not found", ppid); + goto fail; + } + + pid = nxt_recv_msg_cmsg_pid(msg); + + nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid, + msg->fd[0]); + + if (msg->fd[0] != -1) { + port = nxt_runtime_process_port_create(task, rt, pid, 0, + NXT_PROCESS_APP); + if (nxt_slow_path(port == NULL)) { + goto fail; + } + + nxt_fd_nonblocking(task, msg->fd[0]); + + port->pair[0] = -1; + port->pair[1] = msg->fd[0]; + msg->fd[0] = -1; + + port->max_size = 16 * 1024; + port->max_share = 64 * 1024; + port->socket.task = task; + + nxt_port_write_enable(task, port); + + } else { + port = nxt_runtime_port_find(rt, pid, 0); + if (nxt_slow_path(port == NULL)) { + goto fail; + } + } + + if (ppid != nxt_pid) { + nxt_queue_insert_tail(&pprocess->children, &port->process->link); + } + + buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool, + sizeof(nxt_pid_t), 0); + if (nxt_slow_path(buf == NULL)) { + goto fail; + } + + buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t)); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1, + msg->port_msg.stream, 0, buf); + +fail: + + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); + } +} + + static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) { @@ -597,139 +730,6 @@ nxt_main_process_title(nxt_task_t *task) } -static nxt_int_t -nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init) -{ - nxt_int_t ret; - nxt_runtime_t *rt; - nxt_process_t *process; - nxt_process_init_t *pinit; - - rt = task->thread->runtime; - - process = nxt_main_process_new(task, rt); - if (nxt_slow_path(process == NULL)) { - return NXT_ERROR; - } - - process->name = init.name; - process->user_cred = &rt->user_cred; - - pinit = nxt_process_init(process); - *pinit = init; - - ret = nxt_main_start_process(task, process); - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_process_use(task, process, -1); - } - - return ret; -} - - -static nxt_process_t * -nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt) -{ - nxt_process_t *process; - - process = nxt_runtime_process_new(rt); - if (nxt_slow_path(process == NULL)) { - return NULL; - } - - process->mem_pool = nxt_mp_create(1024, 128, 256, 32); - if (process->mem_pool == NULL) { - nxt_process_use(task, process, -1); - return NULL; - } - - return process; -} - - -static nxt_int_t -nxt_main_start_process(nxt_task_t *task, nxt_process_t *process) -{ - nxt_mp_t *tmp_mp; - nxt_int_t ret; - nxt_pid_t pid; - nxt_port_t *port; - nxt_process_init_t *init; - - init = nxt_process_init(process); - - port = nxt_port_new(task, 0, 0, init->type); - if (nxt_slow_path(port == NULL)) { - return NXT_ERROR; - } - - nxt_process_port_add(task, process, port); - - ret = nxt_port_socket_init(task, port, 0); - if (nxt_slow_path(ret != NXT_OK)) { - goto free_port; - } - - tmp_mp = nxt_mp_create(1024, 128, 256, 32); - if (nxt_slow_path(tmp_mp == NULL)) { - ret = NXT_ERROR; - - goto close_port; - } - - if (init->prefork) { - ret = init->prefork(task, process, tmp_mp); - if (nxt_slow_path(ret != NXT_OK)) { - goto free_mempool; - } - } - - pid = nxt_process_create(task, process); - - switch (pid) { - - case -1: - ret = NXT_ERROR; - break; - - case 0: - /* The child process: return to the event engine work queue loop. */ - - nxt_process_use(task, process, -1); - - ret = NXT_AGAIN; - break; - - default: - /* The main process created a new process. */ - - nxt_process_use(task, process, -1); - - nxt_port_read_close(port); - nxt_port_write_enable(task, port); - - ret = NXT_OK; - break; - } - -free_mempool: - - nxt_mp_destroy(tmp_mp); - -close_port: - - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_port_close(task, port); - } - -free_port: - - nxt_port_use(task, port, -1); - - return ret; -} - - static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data) { @@ -859,13 +859,21 @@ fail: static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) { - int status; - nxt_err_t err; - nxt_pid_t pid; + int status; + nxt_int_t ret; + nxt_err_t err; + nxt_pid_t pid; + nxt_port_t *port; + nxt_queue_t children; + nxt_runtime_t *rt; + nxt_process_t *process, *child; + nxt_process_init_t init; nxt_debug(task, "sigchld handler signo:%d (%s)", (int) (uintptr_t) obj, data); + rt = task->thread->runtime; + for ( ;; ) { pid = waitpid(-1, &status, WNOHANG); @@ -906,94 +914,86 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) pid, WEXITSTATUS(status)); } - nxt_main_cleanup_process(task, pid); - } -} + process = nxt_runtime_process_find(rt, pid); + if (process != NULL) { + nxt_main_process_cleanup(task, process); -static void -nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_trace(task, "signal signo:%d (%s) recevied, ignored", - (int) (uintptr_t) obj, data); -} + if (process->state == NXT_PROCESS_STATE_READY) { + process->stream = 0; + } + nxt_queue_init(&children); -static void -nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid) -{ - int stream; - nxt_int_t ret; - nxt_buf_t *buf; - nxt_port_t *port; - const char *name; - nxt_runtime_t *rt; - nxt_process_t *process; - nxt_process_init_t init; + if (!nxt_queue_is_empty(&process->children)) { + nxt_queue_add(&children, &process->children); - rt = task->thread->runtime; + nxt_queue_init(&process->children); - process = nxt_runtime_process_find(rt, pid); - if (!process) { - return; - } + nxt_queue_each(child, &children, nxt_process_t, link) { + port = nxt_process_port_first(child); - if (process->isolation.cleanup != NULL) { - process->isolation.cleanup(task, process); - } + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + } nxt_queue_loop; + } - name = process->name; - stream = process->stream; - init = *((nxt_process_init_t *) nxt_process_init(process)); + if (nxt_exiting) { + nxt_process_close_ports(task, process); - if (process->state == NXT_PROCESS_STATE_READY) { - process->stream = 0; - } + nxt_queue_each(child, &children, nxt_process_t, link) { + nxt_queue_remove(&child->link); + child->link.next = NULL; - nxt_process_close_ports(task, process); + nxt_process_close_ports(task, child); + } nxt_queue_loop; - if (nxt_exiting) { - if (rt->nprocesses <= 1) { - nxt_runtime_quit(task, 0); - } + if (rt->nprocesses <= 1) { + nxt_runtime_quit(task, 0); + } - return; - } + return; + } - nxt_runtime_process_each(rt, process) { + nxt_port_remove_notify_others(task, process); - if (process->pid == nxt_pid - || process->pid == pid - || nxt_queue_is_empty(&process->ports)) - { - continue; - } + nxt_queue_each(child, &children, nxt_process_t, link) { + nxt_port_remove_notify_others(task, child); - port = nxt_process_port_first(process); + nxt_queue_remove(&child->link); + child->link.next = NULL; - if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) { - continue; - } + nxt_process_close_ports(task, child); + } nxt_queue_loop; + + init = *(nxt_process_init_t *) nxt_process_init(process); - buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(pid)); + nxt_process_close_ports(task, process); - if (nxt_slow_path(buf == NULL)) { - continue; + if (init.restart) { + ret = nxt_process_init_start(task, init); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_alert(task, "failed to restart %s", init.name); + } + } } + } +} - buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); - nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, - stream, 0, buf); +static void +nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_trace(task, "signal signo:%d (%s) recevied, ignored", + (int) (uintptr_t) obj, data); +} - } nxt_runtime_process_loop; - if (init.restart) { - ret = nxt_main_process_create(task, init); - if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_alert(task, "failed to restart %s", name); - } +static void +nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process) +{ + if (process->isolation.cleanup != NULL) { + process->isolation.cleanup(task, process); } } @@ -1016,6 +1016,13 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } + if (nxt_slow_path(port->type != NXT_PROCESS_ROUTER)) { + nxt_alert(task, "process %PI cannot create listener sockets", + msg->port_msg.pid); + + return; + } + b = msg->buf; sa = (nxt_sockaddr_t *) b->mem.pos; @@ -1259,6 +1266,7 @@ nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) { + nxt_alert(task, "process %PI cannot send modules", msg->port_msg.pid); return; } @@ -1379,9 +1387,9 @@ fail: nxt_mp_destroy(mp); - ret = nxt_main_process_create(task, nxt_controller_process); + ret = nxt_process_init_start(task, nxt_controller_process); if (ret == NXT_OK) { - ret = nxt_main_process_create(task, nxt_router_process); + ret = nxt_process_init_start(task, nxt_router_process); } if (nxt_slow_path(ret == NXT_ERROR)) { @@ -1419,11 +1427,20 @@ static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { void *p; - size_t size; - ssize_t n; + size_t n, size; nxt_int_t ret; - nxt_file_t file; + nxt_port_t *ctl_port; nxt_runtime_t *rt; + u_char ver[NXT_INT_T_LEN]; + + rt = task->thread->runtime; + + ctl_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; + + if (nxt_slow_path(msg->port_msg.pid != ctl_port->pid)) { + nxt_alert(task, "process %PI cannot store conf", msg->port_msg.pid); + return; + } p = MAP_FAILED; @@ -1457,29 +1474,18 @@ nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p); - nxt_memzero(&file, sizeof(nxt_file_t)); - - rt = task->thread->runtime; - - file.name = (nxt_file_name_t *) rt->conf_tmp; + if (nxt_conf_ver != NXT_VERNUM) { + n = nxt_sprintf(ver, ver + NXT_INT_T_LEN, "%d", NXT_VERNUM) - ver; - if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY, - NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS) - != NXT_OK)) - { - goto error; - } - - n = nxt_file_write(&file, p, size, 0); - - nxt_file_close(task, &file); + ret = nxt_main_file_store(task, rt->ver_tmp, rt->ver, ver, n); + if (nxt_slow_path(ret != NXT_OK)) { + goto error; + } - if (nxt_slow_path(n != (ssize_t) size)) { - (void) nxt_file_delete(file.name); - goto error; + nxt_conf_ver = NXT_VERNUM; } - ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf); + ret = nxt_main_file_store(task, rt->conf_tmp, rt->conf, p, size); if (nxt_fast_path(ret == NXT_OK)) { goto cleanup; @@ -1502,6 +1508,37 @@ cleanup: } +static nxt_int_t +nxt_main_file_store(nxt_task_t *task, const char *tmp_name, const char *name, + u_char *buf, size_t size) +{ + ssize_t n; + nxt_int_t ret; + nxt_file_t file; + + nxt_memzero(&file, sizeof(nxt_file_t)); + + file.name = (nxt_file_name_t *) name; + + ret = nxt_file_open(task, &file, NXT_FILE_WRONLY, NXT_FILE_TRUNCATE, + NXT_FILE_OWNER_ACCESS); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + n = nxt_file_write(&file, buf, size, 0); + + nxt_file_close(task, &file); + + if (nxt_slow_path(n != (ssize_t) size)) { + (void) nxt_file_delete(file.name); + return NXT_ERROR; + } + + return nxt_file_rename(file.name, (nxt_file_name_t *) name); +} + + static void nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { diff --git a/src/nxt_main_process.h b/src/nxt_main_process.h index f9c974d8..ef083d63 100644 --- a/src/nxt_main_process.h +++ b/src/nxt_main_process.h @@ -23,9 +23,11 @@ nxt_int_t nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *runtime); +NXT_EXPORT extern nxt_uint_t nxt_conf_ver; NXT_EXPORT extern const nxt_process_init_t nxt_discovery_process; NXT_EXPORT extern const nxt_process_init_t nxt_controller_process; NXT_EXPORT extern const nxt_process_init_t nxt_router_process; +NXT_EXPORT extern const nxt_process_init_t nxt_proto_process; NXT_EXPORT extern const nxt_process_init_t nxt_app_process; extern const nxt_sig_event_t nxt_main_process_signals[]; diff --git a/src/nxt_openssl.c b/src/nxt_openssl.c index 273ca7f4..1e08015e 100644 --- a/src/nxt_openssl.c +++ b/src/nxt_openssl.c @@ -16,18 +16,32 @@ typedef struct { - SSL *session; - nxt_conn_t *conn; + SSL *session; + nxt_conn_t *conn; - int ssl_error; - uint8_t times; /* 2 bits */ - uint8_t handshake; /* 1 bit */ + int ssl_error; + uint8_t times; /* 2 bits */ + uint8_t handshake; /* 1 bit */ - nxt_tls_conf_t *conf; - nxt_buf_mem_t buffer; + nxt_tls_conf_t *conf; + nxt_buf_mem_t buffer; } nxt_openssl_conn_t; +struct nxt_tls_ticket_s { + u_char name[16]; + u_char hmac_key[32]; + u_char aes_key[32]; + uint8_t size; +}; + + +struct nxt_tls_tickets_s { + nxt_uint_t count; + nxt_tls_ticket_t tickets[]; +}; + + typedef enum { NXT_OPENSSL_HANDSHAKE = 0, NXT_OPENSSL_READ, @@ -607,8 +621,8 @@ static nxt_int_t nxt_tls_ticket_keys(nxt_task_t *task, SSL_CTX *ctx, nxt_tls_init_t *tls_init, nxt_mp_t *mp) { + size_t len; uint32_t i; - nxt_int_t ret; nxt_str_t value; nxt_uint_t count; nxt_conf_value_t *member, *tickets_conf; @@ -672,23 +686,21 @@ nxt_tls_ticket_keys(nxt_task_t *task, SSL_CTX *ctx, nxt_tls_init_t *tls_init, nxt_conf_get_string(member, &value); - ret = nxt_openssl_base64_decode(buf, 80, value.start, value.length); - if (nxt_slow_path(ret == NXT_ERROR)) { - return NXT_ERROR; - } + len = nxt_base64_decode(buf, value.start, value.length); - if (ret == 48) { - ticket->aes128 = 1; + nxt_memcpy(ticket->name, buf, 16); + + if (len == 48) { nxt_memcpy(ticket->aes_key, buf + 16, 16); nxt_memcpy(ticket->hmac_key, buf + 32, 16); + ticket->size = 16; } else { - ticket->aes128 = 0; nxt_memcpy(ticket->hmac_key, buf + 16, 32); nxt_memcpy(ticket->aes_key, buf + 48, 32); + ticket->size = 32; } - nxt_memcpy(ticket->name, buf, 16); } while (i < count); if (SSL_CTX_set_tlsext_ticket_key_cb(ctx, nxt_tls_ticket_key_callback) @@ -727,7 +739,6 @@ static int nxt_tls_ticket_key_callback(SSL *s, unsigned char *name, unsigned char *iv, EVP_CIPHER_CTX *ectx, HMAC_CTX *hctx, int enc) { - size_t size; nxt_uint_t i; nxt_conn_t *c; const EVP_MD *digest; @@ -745,25 +756,14 @@ nxt_tls_ticket_key_callback(SSL *s, unsigned char *name, unsigned char *iv, tls = c->u.tls; ticket = tls->conf->tickets->tickets; -#ifdef OPENSSL_NO_SHA256 - digest = EVP_sha1(); -#else - digest = EVP_sha256(); -#endif + i = 0; if (enc == 1) { /* encrypt session ticket */ nxt_debug(c->socket.task, "TLS session ticket encrypt"); - if (ticket[0].aes128 == 1) { - cipher = EVP_aes_128_cbc(); - size = 16; - - } else { - cipher = EVP_aes_256_cbc(); - size = 32; - } + cipher = (ticket[0].size == 16) ? EVP_aes_128_cbc() : EVP_aes_256_cbc(); if (RAND_bytes(iv, EVP_CIPHER_iv_length(cipher)) != 1) { nxt_openssl_log_error(c->socket.task, NXT_LOG_ALERT, @@ -771,32 +771,24 @@ nxt_tls_ticket_key_callback(SSL *s, unsigned char *name, unsigned char *iv, return -1; } - if (EVP_EncryptInit_ex(ectx, cipher, NULL, ticket[0].aes_key, iv) - != 1) + nxt_memcpy(name, ticket[0].name, 16); + + if (EVP_EncryptInit_ex(ectx, cipher, NULL, ticket[0].aes_key, iv) != 1) { nxt_openssl_log_error(c->socket.task, NXT_LOG_ALERT, "EVP_EncryptInit_ex() failed"); return -1; } - if (HMAC_Init_ex(hctx, ticket[0].hmac_key, size, digest, NULL) != 1) { - nxt_openssl_log_error(c->socket.task, NXT_LOG_ALERT, - "HMAC_Init_ex() failed"); - return -1; - } - - nxt_memcpy(name, ticket[0].name, 16); - - return 1; - } else { /* decrypt session ticket */ - for (i = 0; i < tls->conf->tickets->count; i++) { + do { if (nxt_memcmp(name, ticket[i].name, 16) == 0) { goto found; } - } + + } while (++i < tls->conf->tickets->count); nxt_debug(c->socket.task, "TLS session ticket decrypt, key not found"); @@ -807,29 +799,33 @@ nxt_tls_ticket_key_callback(SSL *s, unsigned char *name, unsigned char *iv, nxt_debug(c->socket.task, "TLS session ticket decrypt, key number: \"%d\"", i); - if (ticket[i].aes128 == 1) { - cipher = EVP_aes_128_cbc(); - size = 16; + enc = (i == 0) ? 1 : 2 /* renew */; - } else { - cipher = EVP_aes_256_cbc(); - size = 32; - } + cipher = (ticket[i].size == 16) ? EVP_aes_128_cbc() : EVP_aes_256_cbc(); - if (EVP_DecryptInit_ex(ectx, cipher, NULL, ticket[i].aes_key, iv) != 1) { + if (EVP_DecryptInit_ex(ectx, cipher, NULL, ticket[i].aes_key, iv) != 1) + { nxt_openssl_log_error(c->socket.task, NXT_LOG_ALERT, "EVP_DecryptInit_ex() failed"); return -1; } + } - if (HMAC_Init_ex(hctx, ticket[i].hmac_key, size, digest, NULL) != 1) { - nxt_openssl_log_error(c->socket.task, NXT_LOG_ALERT, - "HMAC_Init_ex() failed"); - return -1; - } +#ifdef OPENSSL_NO_SHA256 + digest = EVP_sha1(); +#else + digest = EVP_sha256(); +#endif - return (i == 0) ? 1 : 2 /* renew */; + if (HMAC_Init_ex(hctx, ticket[i].hmac_key, ticket[i].size, digest, NULL) + != 1) + { + nxt_openssl_log_error(c->socket.task, NXT_LOG_ALERT, + "HMAC_Init_ex() failed"); + return -1; } + + return enc; } #endif /* SSL_CTRL_SET_TLSEXT_TICKET_KEY_CB */ @@ -1819,70 +1815,3 @@ nxt_openssl_copy_error(u_char *p, u_char *end) return p; } - - -nxt_int_t -nxt_openssl_base64_decode(u_char *d, size_t dlen, const u_char *s, size_t slen) -{ - BIO *bio, *b64; - nxt_int_t count, ret; - u_char buf[128]; - - b64 = BIO_new(BIO_f_base64()); - if (nxt_slow_path(b64 == NULL)) { - goto error; - } - - bio = BIO_new_mem_buf(s, slen); - if (nxt_slow_path(bio == NULL)) { - goto error; - } - - bio = BIO_push(b64, bio); - - BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL); - - count = 0; - - if (d == NULL) { - - for ( ;; ) { - ret = BIO_read(bio, buf, 128); - - if (ret < 0) { - goto invalid; - } - - count += ret; - - if (ret != 128) { - break; - } - } - - } else { - count = BIO_read(bio, d, dlen); - - if (count < 0) { - goto invalid; - } - } - - BIO_free_all(bio); - - return count; - -error: - - BIO_vfree(b64); - ERR_clear_error(); - - return NXT_ERROR; - -invalid: - - BIO_free_all(bio); - ERR_clear_error(); - - return NXT_DECLINED; -} diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index 3fb3b0db..ea5f5581 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -485,14 +485,13 @@ nxt_php_start(nxt_task_t *task, nxt_process_data_t *data) } } - ret = nxt_unit_default_init(task, &php_init); + ret = nxt_unit_default_init(task, &php_init, conf); if (nxt_slow_path(ret != NXT_OK)) { nxt_alert(task, "nxt_unit_default_init() failed"); return ret; } php_init.callbacks.request_handler = nxt_php_request_handler; - php_init.shm_limit = conf->shm_limit; unit_ctx = nxt_unit_init(&php_init); if (nxt_slow_path(unit_ctx == NULL)) { diff --git a/src/nxt_port.c b/src/nxt_port.c index d4e46564..1e8fa28a 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -12,6 +12,8 @@ #include <nxt_port_queue.h> +static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg, + nxt_pid_t pid); static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_atomic_uint_t nxt_port_last_id = 1; @@ -274,6 +276,8 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, new_port_msg->id); + msg->u.new_port = port; + nxt_fd_close(msg->fd[0]); msg->fd[0] = -1; return; @@ -384,14 +388,13 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, port = nxt_process_port_first(process); - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(nxt_port_data_t)); + b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, + sizeof(nxt_uint_t), 0); if (nxt_slow_path(b == NULL)) { continue; } - *(nxt_uint_t *) b->mem.pos = slot; - b->mem.free += sizeof(nxt_uint_t); + b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t)); (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, fd, 0, 0, b); @@ -448,18 +451,74 @@ nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void -nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process) { - nxt_buf_t *buf; nxt_pid_t pid; + nxt_buf_t *buf; + nxt_port_t *port; nxt_runtime_t *rt; - nxt_process_t *process; + nxt_process_t *p; + nxt_process_type_t ptype; + + pid = process->pid; + + ptype = nxt_process_type(process); + + rt = task->thread->runtime; + + nxt_runtime_process_each(rt, p) { + + if (p->pid == nxt_pid + || p->pid == pid + || nxt_queue_is_empty(&p->ports)) + { + continue; + } + + port = nxt_process_port_first(p); + + if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) { + continue; + } + + buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(pid)); + + if (nxt_slow_path(buf == NULL)) { + continue; + } + + buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, + process->stream, 0, buf); + + } nxt_runtime_process_loop; +} + + +void +nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_pid_t pid; + nxt_buf_t *buf; buf = msg->buf; nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); - nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); + nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t)); + + nxt_port_remove_pid(task, msg, pid); +} + + +static void +nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg, + nxt_pid_t pid) +{ + nxt_runtime_t *rt; + nxt_process_t *process; msg->u.removed_pid = pid; diff --git a/src/nxt_port.h b/src/nxt_port.h index a0bc2512..3b66edfd 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -33,6 +33,7 @@ struct nxt_port_handlers_s { /* New process */ nxt_port_handler_t process_created; nxt_port_handler_t process_ready; + nxt_port_handler_t whoami; /* Process exit/crash notification. */ nxt_port_handler_t remove_pid; @@ -92,6 +93,7 @@ typedef enum { _NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created), _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready), + _NXT_PORT_MSG_WHOAMI = nxt_port_handler_idx(whoami), _NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid), _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit), @@ -131,6 +133,7 @@ typedef enum { NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED), NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY), + NXT_PORT_MSG_WHOAMI = nxt_msg_last(_NXT_PORT_MSG_WHOAMI), NXT_PORT_MSG_QUIT = nxt_msg_last(_NXT_PORT_MSG_QUIT), NXT_PORT_MSG_REMOVE_PID = nxt_msg_last(_NXT_PORT_MSG_REMOVE_PID), @@ -153,7 +156,9 @@ typedef enum { /* Passed as a first iov chunk. */ typedef struct { uint32_t stream; - nxt_pid_t pid; + + nxt_pid_t pid; /* not used on Linux and FreeBSD */ + nxt_port_id_t reply_port; uint8_t type; @@ -186,6 +191,9 @@ typedef struct { uint8_t allocated; /* 1 bit */ } nxt_port_send_msg_t; +#if (NXT_HAVE_UCRED) || (NXT_HAVE_MSGHDR_CMSGCRED) +#define NXT_USE_CMSG_PID 1 +#endif struct nxt_port_recv_msg_s { nxt_fd_t fd[2]; @@ -193,6 +201,9 @@ struct nxt_port_recv_msg_s { nxt_port_t *port; nxt_port_msg_t port_msg; size_t size; +#if (NXT_USE_CMSG_PID) + nxt_pid_t cmsg_pid; +#endif nxt_bool_t cancelled; union { nxt_port_t *new_port; @@ -201,6 +212,15 @@ struct nxt_port_recv_msg_s { } u; }; + +#if (NXT_USE_CMSG_PID) +#define nxt_recv_msg_cmsg_pid(msg) ((msg)->cmsg_pid) +#define nxt_recv_msg_cmsg_pid_ref(msg) (&(msg)->cmsg_pid) +#else +#define nxt_recv_msg_cmsg_pid(msg) ((msg)->port_msg.pid) +#define nxt_recv_msg_cmsg_pid_ref(msg) (NULL) +#endif + typedef struct nxt_app_s nxt_app_t; struct nxt_port_s { @@ -224,8 +244,6 @@ struct nxt_port_s { /* Maximum interleave of message parts. */ uint32_t max_share; - uint32_t app_responses; - uint32_t active_websockets; uint32_t active_requests; @@ -324,6 +342,7 @@ nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, uint32_t stream); void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, nxt_fd_t fd); +void nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process); void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index bffae8a1..e799f860 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -232,6 +232,18 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, hdr = mem; + if (nxt_slow_path(hdr->src_pid != process->pid + || hdr->dst_pid != nxt_pid)) + { + nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: " + "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid, + hdr->dst_pid, nxt_pid); + + nxt_mem_munmap(mem, PORT_MMAP_SIZE); + + return NULL; + } + mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); if (nxt_slow_path(mmap_handler == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); @@ -244,16 +256,6 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, mmap_handler->hdr = hdr; mmap_handler->fd = -1; - if (nxt_slow_path(hdr->src_pid != process->pid - || hdr->dst_pid != nxt_pid)) - { - nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: " - "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid, - hdr->dst_pid, nxt_pid); - - return NULL; - } - nxt_thread_mutex_lock(&process->incoming.mutex); port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); @@ -261,7 +263,6 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); nxt_mem_munmap(mem, PORT_MMAP_SIZE); - hdr = NULL; nxt_free(mmap_handler); mmap_handler = NULL; @@ -318,6 +319,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps, MAP_SHARED, fd, 0); if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_fd_close(fd); goto remove_fail; } diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c index f4008a18..0cac5cbb 100644 --- a/src/nxt_port_rpc.c +++ b/src/nxt_port_rpc.c @@ -393,8 +393,8 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) msg.fd[1] = -1; msg.buf = &buf; msg.port = port; - - msg.port_msg.pid = peer; + msg.u.removed_pid = peer; + msg.port_msg.pid = nxt_pid; msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID; peer_link = lhq.value; diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index ba1b7081..2a51dfb6 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -5,6 +5,7 @@ */ #include <nxt_main.h> +#include <nxt_socket_msg.h> #include <nxt_port_queue.h> #include <nxt_port_memory_int.h> @@ -22,6 +23,7 @@ static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m); static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port); nxt_inline void nxt_port_msg_close_fd(nxt_port_send_msg_t *msg); +nxt_inline void nxt_port_close_fds(nxt_fd_t *fd); static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port, @@ -593,16 +595,21 @@ nxt_port_msg_close_fd(nxt_port_send_msg_t *msg) return; } - if (msg->fd[0] != -1) { - nxt_fd_close(msg->fd[0]); + nxt_port_close_fds(msg->fd); +} - msg->fd[0] = -1; - } - if (msg->fd[1] != -1) { - nxt_fd_close(msg->fd[1]); +nxt_inline void +nxt_port_close_fds(nxt_fd_t *fd) +{ + if (fd[0] != -1) { + nxt_fd_close(fd[0]); + fd[0] = -1; + } - msg->fd[1] = -1; + if (fd[1] != -1) { + nxt_fd_close(fd[1]); + fd[1] = -1; } } @@ -725,16 +732,17 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) { ssize_t n; nxt_buf_t *b; + nxt_int_t ret; nxt_port_t *port; - struct iovec iov[2]; + nxt_recv_oob_t oob; nxt_port_recv_msg_t msg; + struct iovec iov[2]; port = msg.port = nxt_container_of(obj, nxt_port_t, socket); nxt_assert(port->engine == task->thread->engine); for ( ;; ) { - b = nxt_port_buf_alloc(port); if (nxt_slow_path(b == NULL)) { @@ -747,9 +755,22 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) iov[1].iov_base = b->mem.pos; iov[1].iov_len = port->max_size; - n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2); + n = nxt_socketpair_recv(&port->socket, iov, 2, &oob); if (n > 0) { + msg.fd[0] = -1; + msg.fd[1] = -1; + + ret = nxt_socket_msg_oob_get(&oob, msg.fd, + nxt_recv_msg_cmsg_pid_ref(&msg)); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "failed to get oob data from %d", + port->socket.fd); + + nxt_port_close_fds(msg.fd); + + goto fail; + } msg.buf = b; msg.size = n; @@ -778,8 +799,8 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) return; } - /* n == 0 || n == NXT_ERROR */ - +fail: + /* n == 0 || error */ nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_port_error_handler, task, &port->socket, NULL); return; @@ -792,8 +813,10 @@ nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data) { ssize_t n; nxt_buf_t *b; + nxt_int_t ret; nxt_port_t *port; struct iovec iov[2]; + nxt_recv_oob_t oob; nxt_port_queue_t *queue; nxt_port_recv_msg_t msg, *smsg; uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE]; @@ -884,7 +907,23 @@ nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data) iov[1].iov_base = b->mem.pos; iov[1].iov_len = port->max_size; - n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2); + n = nxt_socketpair_recv(&port->socket, iov, 2, &oob); + + if (n > 0) { + msg.fd[0] = -1; + msg.fd[1] = -1; + + ret = nxt_socket_msg_oob_get(&oob, msg.fd, + nxt_recv_msg_cmsg_pid_ref(&msg)); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "failed to get oob data from %d", + port->socket.fd); + + nxt_port_close_fds(msg.fd); + + return; + } + } if (n == (ssize_t) sizeof(nxt_port_msg_t) && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE) @@ -1139,13 +1178,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_alert(task, "port %d: too small message:%uz", port->socket.fd, msg->size); - if (msg->fd[0] != -1) { - nxt_fd_close(msg->fd[0]); - } - - if (msg->fd[1] != -1) { - nxt_fd_close(msg->fd[1]); - } + nxt_port_close_fds(msg->fd); return; } @@ -1225,13 +1258,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, b = NULL; } else { - if (msg->fd[0] != -1) { - nxt_fd_close(msg->fd[0]); - } - - if (msg->fd[1] != -1) { - nxt_fd_close(msg->fd[1]); - } + nxt_port_close_fds(msg->fd); } } else { if (nxt_fast_path(msg->cancelled == 0)) { diff --git a/src/nxt_process.c b/src/nxt_process.c index 87419313..fca197eb 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -5,7 +5,6 @@ */ #include <nxt_main.h> -#include <nxt_main_process.h> #if (NXT_HAVE_CLONE) #include <nxt_clone.h> @@ -17,9 +16,26 @@ #include <sys/prctl.h> #endif + +#if (NXT_HAVE_CLONE) && (NXT_HAVE_CLONE_NEWPID) +#define nxt_is_pid_isolated(process) \ + nxt_is_clone_flag_set(process->isolation.clone.flags, NEWPID) +#else +#define nxt_is_pid_isolated(process) \ + (0) +#endif + + +static nxt_pid_t nxt_process_create(nxt_task_t *task, nxt_process_t *process); +static nxt_int_t nxt_process_do_start(nxt_task_t *task, nxt_process_t *process); +static nxt_int_t nxt_process_whoami(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_process_setup(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process); +static void nxt_process_whoami_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data); +static void nxt_process_whoami_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data); static nxt_int_t nxt_process_send_created(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_process_send_ready(nxt_task_t *task, @@ -43,66 +59,200 @@ nxt_uid_t nxt_euid; nxt_gid_t nxt_egid; nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { - { 1, 1, 1, 1, 1 }, - { 1, 0, 0, 0, 0 }, - { 1, 0, 0, 1, 0 }, - { 1, 0, 1, 0, 1 }, - { 1, 0, 0, 1, 0 }, + { 1, 1, 1, 1, 1, 1 }, + { 1, 0, 0, 0, 0, 0 }, + { 1, 0, 0, 1, 0, 0 }, + { 1, 0, 1, 1, 1, 1 }, + { 1, 0, 0, 1, 0, 0 }, + { 1, 0, 0, 1, 0, 0 }, }; nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { - { 0, 0, 0, 0, 0 }, - { 0, 0, 0, 0, 0 }, - { 0, 0, 0, 1, 0 }, - { 0, 0, 1, 0, 1 }, - { 0, 0, 0, 1, 0 }, + { 0, 0, 0, 0, 0, 0 }, + { 0, 0, 0, 0, 0, 0 }, + { 0, 0, 0, 1, 0, 0 }, + { 0, 0, 1, 0, 1, 1 }, + { 0, 0, 0, 1, 0, 0 }, + { 1, 0, 0, 1, 0, 0 }, }; -static nxt_int_t -nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process) +static const nxt_port_handlers_t nxt_process_whoami_port_handlers = { + .quit = nxt_signal_quit_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, +}; + + +nxt_process_t * +nxt_process_new(nxt_runtime_t *rt) { - nxt_process_t *p; + nxt_process_t *process; + + process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t) + + sizeof(nxt_process_init_t)); + + if (nxt_slow_path(process == NULL)) { + return NULL; + } + + nxt_queue_init(&process->ports); + + nxt_thread_mutex_create(&process->incoming.mutex); + + process->use_count = 1; + + nxt_queue_init(&process->children); + + return process; +} + + +void +nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) +{ + process->use_count += i; + + if (process->use_count == 0) { + nxt_runtime_process_release(task->thread->runtime, process); + } +} + + +nxt_int_t +nxt_process_init_start(nxt_task_t *task, nxt_process_init_t init) +{ + nxt_int_t ret; nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_init_t *pinit; + + rt = task->thread->runtime; + + process = nxt_process_new(rt); + if (nxt_slow_path(process == NULL)) { + return NXT_ERROR; + } + + process->parent_port = rt->port_by_type[rt->type]; + + process->name = init.name; + process->user_cred = &rt->user_cred; + + pinit = nxt_process_init(process); + *pinit = init; + + ret = nxt_process_start(task, process); + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_process_use(task, process, -1); + } + + return ret; +} + + +nxt_int_t +nxt_process_start(nxt_task_t *task, nxt_process_t *process) +{ + nxt_mp_t *tmp_mp; + nxt_int_t ret; + nxt_pid_t pid; + nxt_port_t *port; nxt_process_init_t *init; - nxt_process_type_t ptype; init = nxt_process_init(process); - nxt_pid = nxt_getpid(); - - process->pid = nxt_pid; + port = nxt_port_new(task, 0, 0, init->type); + if (nxt_slow_path(port == NULL)) { + return NXT_ERROR; + } - /* Clean inherited cached thread tid. */ - task->thread->tid = 0; + nxt_process_port_add(task, process, port); -#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWPID) - if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWPID)) { - ssize_t pidsz; - char procpid[10]; + ret = nxt_port_socket_init(task, port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + goto free_port; + } - nxt_debug(task, "%s isolated pid is %d", process->name, nxt_pid); + tmp_mp = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(tmp_mp == NULL)) { + ret = NXT_ERROR; - pidsz = readlink("/proc/self", procpid, sizeof(procpid)); + goto close_port; + } - if (nxt_slow_path(pidsz < 0 || pidsz >= (ssize_t) sizeof(procpid))) { - nxt_alert(task, "failed to read real pid from /proc/self"); - return NXT_ERROR; + if (init->prefork) { + ret = init->prefork(task, process, tmp_mp); + if (nxt_slow_path(ret != NXT_OK)) { + goto free_mempool; } + } + + pid = nxt_process_create(task, process); + + switch (pid) { - procpid[pidsz] = '\0'; + case -1: + ret = NXT_ERROR; + break; + + case 0: + /* The child process: return to the event engine work queue loop. */ - nxt_pid = (nxt_pid_t) strtol(procpid, NULL, 10); + nxt_process_use(task, process, -1); + + ret = NXT_AGAIN; + break; + + default: + /* The parent process created a new process. */ - nxt_assert(nxt_pid > 0 && nxt_errno != ERANGE); + nxt_process_use(task, process, -1); - process->pid = nxt_pid; - task->thread->tid = nxt_pid; + nxt_port_read_close(port); + nxt_port_write_enable(task, port); - nxt_debug(task, "%s real pid is %d", process->name, nxt_pid); + ret = NXT_OK; + break; } -#endif +free_mempool: + + nxt_mp_destroy(tmp_mp); + +close_port: + + if (nxt_slow_path(ret == NXT_ERROR)) { + nxt_port_close(task, port); + } + +free_port: + + nxt_port_use(task, port, -1); + + return ret; +} + + +static nxt_int_t +nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process) +{ + nxt_process_t *p; + nxt_runtime_t *rt; + nxt_process_init_t *init; + nxt_process_type_t ptype; + + init = nxt_process_init(process); + + nxt_ppid = nxt_pid; + + nxt_pid = nxt_getpid(); + + process->pid = nxt_pid; + process->isolated_pid = nxt_pid; + + /* Clean inherited cached thread tid. */ + task->thread->tid = 0; ptype = init->type; @@ -115,7 +265,9 @@ nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process) /* Remove not ready processes. */ nxt_runtime_process_each(rt, p) { - if (nxt_proc_conn_matrix[ptype][nxt_process_type(p)] == 0) { + if (nxt_proc_conn_matrix[ptype][nxt_process_type(p)] == 0 + && p->pid != nxt_ppid) /* Always keep parent's port. */ + { nxt_debug(task, "remove not required process %PI", p->pid); nxt_process_close_ports(task, p); @@ -139,7 +291,7 @@ nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process) } -nxt_pid_t +static nxt_pid_t nxt_process_create(nxt_task_t *task, nxt_process_t *process) { nxt_int_t ret; @@ -168,9 +320,8 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) return -1; } - nxt_runtime_process_add(task, process); - - if (nxt_slow_path(nxt_process_setup(task, process) != NXT_OK)) { + ret = nxt_process_setup(task, process); + if (nxt_slow_path(ret != NXT_OK)) { nxt_process_quit(task, 1); } @@ -190,6 +341,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) #endif process->pid = pid; + process->isolated_pid = pid; nxt_runtime_process_add(task, process); @@ -201,7 +353,6 @@ static nxt_int_t nxt_process_setup(nxt_task_t *task, nxt_process_t *process) { nxt_int_t ret; - nxt_port_t *port, *main_port; nxt_thread_t *thread; nxt_runtime_t *rt; nxt_process_init_t *init; @@ -241,17 +392,45 @@ nxt_process_setup(nxt_task_t *task, nxt_process_t *process) return NXT_ERROR; } - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + nxt_port_read_close(process->parent_port); + nxt_port_write_enable(task, process->parent_port); - nxt_port_read_close(main_port); - nxt_port_write_enable(task, main_port); + /* + * If the parent process is already isolated, rt->pid_isolation is already + * set to 1 at this point. + */ + if (nxt_is_pid_isolated(process)) { + rt->is_pid_isolated = 1; + } + + if (rt->is_pid_isolated + || process->parent_port != rt->port_by_type[NXT_PROCESS_MAIN]) + { + ret = nxt_process_whoami(task, process); + + } else { + ret = nxt_process_do_start(task, process); + } + + return ret; +} + + +static nxt_int_t +nxt_process_do_start(nxt_task_t *task, nxt_process_t *process) +{ + nxt_int_t ret; + nxt_port_t *port; + nxt_process_init_t *init; + + nxt_runtime_process_add(task, process); + init = nxt_process_init(process); port = nxt_process_port_first(process); nxt_port_enable(task, port, init->port_handlers); ret = init->setup(task, process); - if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } @@ -288,6 +467,113 @@ nxt_process_setup(nxt_task_t *task, nxt_process_t *process) static nxt_int_t +nxt_process_whoami(nxt_task_t *task, nxt_process_t *process) +{ + uint32_t stream; + nxt_fd_t fd; + nxt_buf_t *buf; + nxt_int_t ret; + nxt_port_t *my_port, *main_port; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + + my_port = nxt_process_port_first(process); + main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + + nxt_assert(my_port != NULL && main_port != NULL); + + nxt_port_enable(task, my_port, &nxt_process_whoami_port_handlers); + + buf = nxt_buf_mem_alloc(main_port->mem_pool, sizeof(nxt_pid_t), 0); + if (nxt_slow_path(buf == NULL)) { + return NXT_ERROR; + } + + buf->mem.free = nxt_cpymem(buf->mem.free, &nxt_ppid, sizeof(nxt_pid_t)); + + stream = nxt_port_rpc_register_handler(task, my_port, + nxt_process_whoami_ok, + nxt_process_whoami_error, + main_port->pid, process); + if (nxt_slow_path(stream == 0)) { + nxt_mp_free(main_port->mem_pool, buf); + + return NXT_ERROR; + } + + fd = (process->parent_port != main_port) ? my_port->pair[1] : -1; + + ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_WHOAMI, + fd, stream, my_port->id, buf); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "%s failed to send WHOAMI message", process->name); + nxt_port_rpc_cancel(task, my_port, stream); + nxt_mp_free(main_port->mem_pool, buf); + + return NXT_ERROR; + } + + return NXT_OK; +} + + +static void +nxt_process_whoami_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +{ + nxt_pid_t pid, isolated_pid; + nxt_buf_t *buf; + nxt_port_t *port; + nxt_process_t *process; + nxt_runtime_t *rt; + + process = data; + + buf = msg->buf; + + nxt_assert(nxt_buf_used_size(buf) == sizeof(nxt_pid_t)); + + nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t)); + + isolated_pid = nxt_pid; + + if (isolated_pid != pid) { + nxt_pid = pid; + process->pid = pid; + + nxt_process_port_each(process, port) { + port->pid = pid; + } nxt_process_port_loop; + } + + rt = task->thread->runtime; + + if (process->parent_port != rt->port_by_type[NXT_PROCESS_MAIN]) { + port = process->parent_port; + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_PROCESS_CREATED, + -1, 0, 0, NULL); + + nxt_log(task, NXT_LOG_INFO, "%s started", process->name); + } + + if (nxt_slow_path(nxt_process_do_start(task, process) != NXT_OK)) { + nxt_process_quit(task, 1); + } +} + + +static void +nxt_process_whoami_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +{ + nxt_alert(task, "WHOAMI error"); + + nxt_process_quit(task, 1); +} + + +static nxt_int_t nxt_process_send_created(nxt_task_t *task, nxt_process_t *process) { uint32_t stream; @@ -336,6 +622,9 @@ nxt_process_created_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) nxt_process_init_t *init; process = data; + + process->state = NXT_PROCESS_STATE_READY; + init = nxt_process_init(process); ret = nxt_process_apply_creds(task, process); @@ -345,11 +634,23 @@ nxt_process_created_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) nxt_log(task, NXT_LOG_INFO, "%s started", process->name); + ret = nxt_process_send_ready(task, process); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + ret = init->start(task, &process->data); -fail: + if (nxt_process_type(process) != NXT_PROCESS_PROTOTYPE) { + nxt_port_write_close(nxt_process_port_first(process)); + } + + if (nxt_fast_path(ret == NXT_OK)) { + return; + } - nxt_process_quit(task, ret == NXT_OK ? 0 : 1); +fail: + nxt_process_quit(task, 1); } @@ -437,7 +738,8 @@ nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process) #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) if (!cap_setid - && nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) { + && nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) + { cap_setid = 1; } #endif @@ -470,17 +772,10 @@ nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process) static nxt_int_t nxt_process_send_ready(nxt_task_t *task, nxt_process_t *process) { - nxt_int_t ret; - nxt_port_t *main_port; - nxt_runtime_t *rt; - - rt = task->thread->runtime; - - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - - nxt_assert(main_port != NULL); + nxt_int_t ret; - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY, + ret = nxt_port_socket_write(task, process->parent_port, + NXT_PORT_MSG_PROCESS_READY, -1, process->stream, 0, NULL); if (nxt_slow_path(ret != NXT_OK)) { @@ -680,17 +975,6 @@ nxt_nanosleep(nxt_nsec_t ns) void -nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) -{ - process->use_count += i; - - if (process->use_count == 0) { - nxt_runtime_process_release(task->thread->runtime, process); - } -} - - -void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) { nxt_assert(port->process == NULL); diff --git a/src/nxt_process.h b/src/nxt_process.h index 4f24b179..c92eebd8 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -99,19 +99,26 @@ typedef struct { struct nxt_process_s { nxt_pid_t pid; - const char *name; - nxt_queue_t ports; /* of nxt_port_t */ + nxt_queue_t ports; /* of nxt_port_t.link */ nxt_process_state_t state; nxt_bool_t registered; nxt_int_t use_count; nxt_port_mmaps_t incoming; + + nxt_pid_t isolated_pid; + const char *name; + nxt_port_t *parent_port; + uint32_t stream; nxt_mp_t *mem_pool; nxt_credential_t *user_cred; + nxt_queue_t children; /* of nxt_process_t.link */ + nxt_queue_link_t link; /* for nxt_process_t.children */ + nxt_process_data_t data; nxt_process_isolation_t isolation; @@ -148,8 +155,6 @@ extern nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX]; extern nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX]; -NXT_EXPORT nxt_pid_t nxt_process_create(nxt_task_t *task, - nxt_process_t *process); NXT_EXPORT nxt_pid_t nxt_process_execute(nxt_task_t *task, char *name, char **argv, char **envp); NXT_EXPORT nxt_int_t nxt_process_daemon(nxt_task_t *task); @@ -176,6 +181,10 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, #define nxt_process_port_loop \ nxt_queue_loop +nxt_process_t *nxt_process_new(nxt_runtime_t *rt); +void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); +nxt_int_t nxt_process_init_start(nxt_task_t *task, nxt_process_init_t init); +nxt_int_t nxt_process_start(nxt_task_t *task, nxt_process_t *process); nxt_process_type_t nxt_process_type(nxt_process_t *process); void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); diff --git a/src/nxt_process_type.h b/src/nxt_process_type.h index 14deda19..d0093431 100644 --- a/src/nxt_process_type.h +++ b/src/nxt_process_type.h @@ -13,6 +13,7 @@ typedef enum { NXT_PROCESS_DISCOVERY, NXT_PROCESS_CONTROLLER, NXT_PROCESS_ROUTER, + NXT_PROCESS_PROTOTYPE, NXT_PROCESS_APP, NXT_PROCESS_MAX, diff --git a/src/nxt_router.c b/src/nxt_router.c index 39d375f8..7623ccbb 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -27,7 +27,6 @@ typedef struct { uint32_t spare_processes; nxt_msec_t timeout; nxt_msec_t idle_timeout; - uint32_t requests; nxt_conf_value_t *limits_value; nxt_conf_value_t *processes_value; nxt_conf_value_t *targets_value; @@ -66,12 +65,14 @@ typedef struct { typedef struct { nxt_app_t *app; nxt_router_temp_conf_t *temp_conf; + uint8_t proto; /* 1 bit */ } nxt_app_rpc_t; typedef struct { nxt_app_joint_t *app_joint; uint32_t generation; + uint8_t proto; /* 1 bit */ } nxt_app_joint_rpc_t; @@ -228,8 +229,8 @@ static void nxt_router_app_port_error(nxt_task_t *task, static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); -static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, - nxt_apr_action_t action); +static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, + nxt_port_t *port, nxt_apr_action_t action); static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, nxt_request_rpc_data_t *req_rpc_data); static void nxt_router_http_request_error(nxt_task_t *task, void *obj, @@ -393,32 +394,52 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, { size_t size; uint32_t stream; - nxt_mp_t *mp; nxt_int_t ret; nxt_app_t *app; nxt_buf_t *b; - nxt_port_t *main_port; + nxt_port_t *dport; nxt_runtime_t *rt; nxt_app_joint_rpc_t *app_joint_rpc; app = data; - rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + nxt_thread_mutex_lock(&app->mutex); - nxt_debug(task, "app '%V' %p start process", &app->name, app); + dport = app->proto_port; - size = app->name.length + 1 + app->conf.length; + nxt_thread_mutex_unlock(&app->mutex); - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); + if (dport != NULL) { + nxt_debug(task, "app '%V' %p start process", &app->name, app); - if (nxt_slow_path(b == NULL)) { - goto failed; - } + b = NULL; - nxt_buf_cpystr(b, &app->name); - *b->mem.free++ = '\0'; - nxt_buf_cpystr(b, &app->conf); + } else { + if (app->proto_port_requests > 0) { + nxt_debug(task, "app '%V' %p wait for prototype process", + &app->name, app); + + app->proto_port_requests++; + + goto skip; + } + + nxt_debug(task, "app '%V' %p start prototype process", &app->name, app); + + rt = task->thread->runtime; + dport = rt->port_by_type[NXT_PROCESS_MAIN]; + + size = app->name.length + 1 + app->conf.length; + + b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + goto failed; + } + + nxt_buf_cpystr(b, &app->name); + *b->mem.free++ = '\0'; + nxt_buf_cpystr(b, &app->conf); + } app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, nxt_router_app_port_ready, @@ -430,7 +451,7 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, stream = nxt_port_rpc_ex_stream(app_joint_rpc); - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, + ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS, -1, stream, port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); @@ -440,26 +461,23 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, app_joint_rpc->app_joint = app->joint; app_joint_rpc->generation = app->generation; + app_joint_rpc->proto = (b != NULL); - nxt_router_app_joint_use(task, app->joint, 1); + if (b != NULL) { + app->proto_port_requests++; - nxt_router_app_use(task, app, -1); + b = NULL; + } - return; + nxt_router_app_joint_use(task, app->joint, 1); failed: if (b != NULL) { - mp = b->data; - nxt_mp_free(mp, b); - nxt_mp_release(mp); + nxt_mp_free(b->data, b); } - nxt_thread_mutex_lock(&app->mutex); - - app->pending_processes--; - - nxt_thread_mutex_unlock(&app->mutex); +skip: nxt_router_app_use(task, app, -1); } @@ -583,14 +601,15 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, nxt_router_msg_cancel(task, req_rpc_data); + app = req_rpc_data->app; + if (req_rpc_data->app_port != NULL) { - nxt_router_app_port_release(task, req_rpc_data->app_port, + nxt_router_app_port_release(task, app, req_rpc_data->app_port, req_rpc_data->apr_action); req_rpc_data->app_port = NULL; } - app = req_rpc_data->app; r = req_rpc_data->request; if (r != NULL) { @@ -658,6 +677,12 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_router_greet_controller(task, msg->u.new_port); } + if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE) { + nxt_port_rpc_handler(task, msg); + + return; + } + if (port == NULL || port->type != NXT_PROCESS_APP) { if (msg->port_msg.stream == 0) { @@ -683,6 +708,8 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } + nxt_debug(task, "new port id %d (%d)", port->id, port->type); + /* * Port with "id == 0" is application 'main' port and it always * should come with non-zero stream. @@ -819,7 +846,8 @@ nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_app_t *app; nxt_int_t ret; nxt_str_t app_name; - nxt_port_t *port, *reply_port, *shared_port, *old_shared_port; + nxt_port_t *reply_port, *shared_port, *old_shared_port; + nxt_port_t *proto_port; nxt_port_msg_type_t reply; reply_port = nxt_runtime_port_find(task->thread->runtime, @@ -862,12 +890,15 @@ nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_thread_mutex_lock(&app->mutex); - nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { + proto_port = app->proto_port; - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, - 0, 0, NULL); + if (proto_port != NULL) { + nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, + proto_port->pid); - } nxt_queue_loop; + app->proto_port = NULL; + proto_port->app = NULL; + } app->generation++; @@ -883,6 +914,15 @@ nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_port_close(task, old_shared_port); nxt_port_use(task, old_shared_port, -1); + if (proto_port != NULL) { + (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + + nxt_port_close(task, proto_port); + + nxt_port_use(task, proto_port, -1); + } + reply = NXT_PORT_MSG_RPC_READY_LAST; } else { @@ -1292,12 +1332,6 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = { NXT_CONF_MAP_MSEC, offsetof(nxt_router_app_conf_t, timeout), }, - - { - nxt_string("requests"), - NXT_CONF_MAP_INT32, - offsetof(nxt_router_app_conf_t, requests), - }, }; @@ -1566,7 +1600,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, apcf.spare_processes = 0; apcf.timeout = 0; apcf.idle_timeout = 15000; - apcf.requests = 0; apcf.limits_value = NULL; apcf.processes_value = NULL; apcf.targets_value = NULL; @@ -1646,7 +1679,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application processes: %D", apcf.processes); nxt_debug(task, "application request timeout: %M", apcf.timeout); - nxt_debug(task, "application requests: %D", apcf.requests); lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); @@ -1677,7 +1709,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, ? apcf.spare_processes : 1; app->timeout = apcf.timeout; app->idle_timeout = apcf.idle_timeout; - app->max_requests = apcf.requests; app->targets = targets; @@ -2744,54 +2775,67 @@ nxt_router_app_rpc_create(nxt_task_t *task, uint32_t stream; nxt_int_t ret; nxt_buf_t *b; - nxt_port_t *main_port, *router_port; + nxt_port_t *router_port, *dport; nxt_runtime_t *rt; nxt_app_rpc_t *rpc; - rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); - if (rpc == NULL) { - goto fail; - } + rt = task->thread->runtime; - rpc->app = app; - rpc->temp_conf = tmcf; + dport = app->proto_port; - nxt_debug(task, "app '%V' prefork", &app->name); + if (dport == NULL) { + nxt_debug(task, "app '%V' prototype prefork", &app->name); - size = app->name.length + 1 + app->conf.length; + size = app->name.length + 1 + app->conf.length; - b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); - if (nxt_slow_path(b == NULL)) { - goto fail; - } + b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + goto fail; + } - b->completion_handler = nxt_buf_dummy_completion; + b->completion_handler = nxt_buf_dummy_completion; - nxt_buf_cpystr(b, &app->name); - *b->mem.free++ = '\0'; - nxt_buf_cpystr(b, &app->conf); + nxt_buf_cpystr(b, &app->name); + *b->mem.free++ = '\0'; + nxt_buf_cpystr(b, &app->conf); + + dport = rt->port_by_type[NXT_PROCESS_MAIN]; + + } else { + nxt_debug(task, "app '%V' prefork", &app->name); + + b = NULL; + } - rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; - stream = nxt_port_rpc_register_handler(task, router_port, + rpc = nxt_port_rpc_register_handler_ex(task, router_port, nxt_router_app_prefork_ready, nxt_router_app_prefork_error, - -1, rpc); - if (nxt_slow_path(stream == 0)) { + sizeof(nxt_app_rpc_t)); + if (nxt_slow_path(rpc == NULL)) { goto fail; } - ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, - -1, stream, router_port->id, b); + rpc->app = app; + rpc->temp_conf = tmcf; + rpc->proto = (b != NULL); + + stream = nxt_port_rpc_ex_stream(rpc); + ret = nxt_port_socket_write(task, dport, + NXT_PORT_MSG_START_PROCESS, + -1, stream, router_port->id, b); if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, router_port, stream); goto fail; } - app->pending_processes++; + if (b == NULL) { + nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid); + + app->pending_processes++; + } return; @@ -2816,9 +2860,24 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, port = msg->u.new_port; nxt_assert(port != NULL); - nxt_assert(port->type == NXT_PROCESS_APP); nxt_assert(port->id == 0); + if (rpc->proto) { + nxt_assert(app->proto_port == NULL); + nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); + + nxt_port_inc_use(port); + + app->proto_port = port; + port->app = app; + + nxt_router_app_rpc_create(task, rpc->temp_conf, app); + + return; + } + + nxt_assert(port->type == NXT_PROCESS_APP); + port->app = app; port->main_app_port = port; @@ -2860,10 +2919,16 @@ nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, app = rpc->app; tmcf = rpc->temp_conf; - nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", - &app->name); + if (rpc->proto) { + nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"", + &app->name); - app->pending_processes--; + } else { + nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", + &app->name); + + app->pending_processes--; + } nxt_router_conf_error(task, tmcf); } @@ -4211,7 +4276,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_thread_mutex_unlock(&app->mutex); - nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE); + nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE); req_rpc_data->apr_action = NXT_APR_CLOSE; nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream); @@ -4422,8 +4487,9 @@ static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { + uint32_t n; nxt_app_t *app; - nxt_bool_t start_process; + nxt_bool_t start_process, restarted; nxt_port_t *port; nxt_app_joint_t *app_joint; nxt_app_joint_rpc_t *app_joint_rpc; @@ -4436,7 +4502,6 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_assert(app_joint != NULL); nxt_assert(port != NULL); - nxt_assert(port->type == NXT_PROCESS_APP); nxt_assert(port->id == 0); app = app_joint->app; @@ -4453,11 +4518,51 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_thread_mutex_lock(&app->mutex); + restarted = (app->generation != app_joint_rpc->generation); + + if (app_joint_rpc->proto) { + nxt_assert(app->proto_port == NULL); + nxt_assert(port->type == NXT_PROCESS_PROTOTYPE); + + n = app->proto_port_requests; + app->proto_port_requests = 0; + + if (nxt_slow_path(restarted)) { + nxt_thread_mutex_unlock(&app->mutex); + + nxt_debug(task, "proto port ready for restarted app, send QUIT"); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, + NULL); + + } else { + port->app = app; + app->proto_port = port; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_port_use(task, port, 1); + } + + port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER]; + + while (n > 0) { + nxt_router_app_use(task, app, 1); + + nxt_router_start_app_process_handler(task, port, app); + + n--; + } + + return; + } + + nxt_assert(port->type == NXT_PROCESS_APP); nxt_assert(app->pending_processes != 0); app->pending_processes--; - if (nxt_slow_path(app->generation != app_joint_rpc->generation)) { + if (nxt_slow_path(restarted)) { nxt_debug(task, "new port ready for restarted app, send QUIT"); start_process = !task->thread->engine->shutdown @@ -4493,7 +4598,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_router_app_shared_port_send(task, port); - nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); + nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT); } @@ -4600,7 +4705,6 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, } - nxt_inline nxt_port_t * nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app) { @@ -4670,19 +4774,15 @@ nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) static void -nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, +nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port, nxt_apr_action_t action) { int inc_use; uint32_t got_response, dec_requests; - nxt_app_t *app; - nxt_bool_t port_unchained, send_quit, adjust_idle_timer; + nxt_bool_t adjust_idle_timer; nxt_port_t *main_app_port; nxt_assert(port != NULL); - nxt_assert(port->app != NULL); - - app = port->app; inc_use = 0; got_response = 0; @@ -4725,40 +4825,18 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_thread_mutex_lock(&app->mutex); - main_app_port->app_responses += got_response; main_app_port->active_requests -= got_response + dec_requests; app->active_requests -= got_response + dec_requests; - if (main_app_port->pair[1] != -1 - && (app->max_requests == 0 - || main_app_port->app_responses < app->max_requests)) - { - if (main_app_port->app_link.next == NULL) { - nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); + if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) { + nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); - nxt_port_inc_use(main_app_port); - } - } - - send_quit = (app->max_requests > 0 - && main_app_port->app_responses >= app->max_requests); - - if (send_quit) { - port_unchained = nxt_queue_chk_remove(&main_app_port->app_link); - - nxt_port_hash_remove(&app->port_hash, main_app_port); - app->port_hash_count--; - - main_app_port->app = NULL; - app->processes--; - - } else { - port_unchained = 0; + nxt_port_inc_use(main_app_port); } adjust_idle_timer = 0; - if (main_app_port->pair[1] != -1 && !send_quit + if (main_app_port->pair[1] != -1 && main_app_port->active_requests == 0 && main_app_port->active_websockets == 0 && main_app_port->idle_link.next == NULL) @@ -4803,19 +4881,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, goto adjust_use; } - if (send_quit) { - nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app); - - nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0, - NULL); - - if (port_unchained) { - nxt_port_use(task, main_app_port, -1); - } - - goto adjust_use; - } - nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", &app->name, app); @@ -4839,6 +4904,20 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) nxt_thread_mutex_lock(&app->mutex); + if (port == app->proto_port) { + app->proto_port = NULL; + port->app = NULL; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name, + port->pid); + + nxt_port_use(task, port, -1); + + return; + } + nxt_port_hash_remove(&app->port_hash, port); app->port_hash_count--; @@ -5027,7 +5106,7 @@ static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data) { nxt_app_t *app; - nxt_port_t *port; + nxt_port_t *port, *proto_port; nxt_app_joint_t *app_joint; app_joint = obj; @@ -5039,10 +5118,6 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) break; } - nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); - - nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); - nxt_port_use(task, port, -1); } @@ -5063,8 +5138,28 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) nxt_port_use(task, port, -1); } + proto_port = app->proto_port; + + if (proto_port != NULL) { + nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name, + proto_port->pid); + + app->proto_port = NULL; + proto_port->app = NULL; + } + nxt_thread_mutex_unlock(&app->mutex); + if (proto_port != NULL) { + nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); + + nxt_port_close(task, proto_port); + + nxt_port_use(task, proto_port, -1); + } + + nxt_assert(app->proto_port == NULL); nxt_assert(app->processes == 0); nxt_assert(app->active_requests == 0); nxt_assert(app->port_hash_count == 0); @@ -5501,8 +5596,8 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, *p++ = '\0'; } - req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0; - if (r->args != NULL && r->args->start != NULL) { + req->query_length = (uint32_t) r->args->length; + if (r->args->start != NULL) { query_pos = nxt_pointer_to(target_pos, r->args->start - r->target.start); diff --git a/src/nxt_router.h b/src/nxt_router.h index fc068b53..7e337d27 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -124,9 +124,9 @@ struct nxt_app_s { uint32_t max_processes; uint32_t spare_processes; uint32_t max_pending_processes; - uint32_t max_requests; uint32_t generation; + uint32_t proto_port_requests; nxt_msec_t timeout; nxt_msec_t idle_timeout; @@ -145,6 +145,7 @@ struct nxt_app_s { nxt_app_joint_t *joint; nxt_port_t *shared_port; + nxt_port_t *proto_port; nxt_port_mmaps_t outgoing; }; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 8a86d38a..46955f1c 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -40,8 +40,6 @@ static void nxt_runtime_thread_pool_init(void); static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data); static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); -static void nxt_runtime_process_remove(nxt_runtime_t *rt, - nxt_process_t *process); static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port); @@ -504,7 +502,9 @@ nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt) init = nxt_process_init(process); - if (init->type == NXT_PROCESS_APP) { + if (init->type == NXT_PROCESS_APP + || init->type == NXT_PROCESS_PROTOTYPE) + { nxt_process_port_each(process, port) { @@ -528,6 +528,8 @@ nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt) nxt_process_port_each(process, port) { + nxt_debug(task, "%d sending quit to %PI", rt->type, port->pid); + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); @@ -580,6 +582,7 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) nxt_runtime_process_each(rt, process) { + nxt_runtime_process_remove(rt, process); nxt_process_close_ports(task, process); } nxt_runtime_process_loop; @@ -839,6 +842,21 @@ nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) slash = "/"; } + ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sversion%Z", + rt->state, slash); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + rt->ver = (char *) file_name.start; + + ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->ver); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + rt->ver_tmp = (char *) file_name.start; + ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z", rt->state, slash); if (nxt_slow_path(ret != NXT_OK)) { @@ -1370,37 +1388,24 @@ nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) } -nxt_process_t * -nxt_runtime_process_new(nxt_runtime_t *rt) -{ - nxt_process_t *process; - - /* TODO: memory failures. */ - - process = nxt_mp_zalloc(rt->mem_pool, - sizeof(nxt_process_t) + sizeof(nxt_process_init_t)); - - if (nxt_slow_path(process == NULL)) { - return NULL; - } - - nxt_queue_init(&process->ports); - - nxt_thread_mutex_create(&process->incoming.mutex); - - process->use_count = 1; - - return process; -} - - void nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) { + nxt_process_t *child; + if (process->registered == 1) { nxt_runtime_process_remove(rt, process); } + if (process->link.next != NULL) { + nxt_queue_remove(&process->link); + } + + nxt_queue_each(child, &process->children, nxt_process_t, link) { + nxt_queue_remove(&child->link); + child->link.next = NULL; + } nxt_queue_loop; + nxt_assert(process->use_count == 0); nxt_assert(process->registered == 0); @@ -1497,7 +1502,7 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) return process; } - process = nxt_runtime_process_new(rt); + process = nxt_process_new(rt); if (nxt_slow_path(process == NULL)) { nxt_thread_mutex_unlock(&rt->processes_mutex); @@ -1586,7 +1591,7 @@ nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process) } -static void +void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) { nxt_pid_t pid; diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 0fb8c9a1..d7fe2f38 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -54,6 +54,7 @@ struct nxt_runtime_s { uint8_t daemon; uint8_t batch; uint8_t status; + uint8_t is_pid_isolated; const char *engine; uint32_t engine_connections; @@ -65,6 +66,8 @@ struct nxt_runtime_s { const char *log; const char *modules; const char *state; + const char *ver; + const char *ver_tmp; const char *conf; const char *conf_tmp; const char *control; @@ -92,9 +95,8 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, nxt_uint_t max_threads, nxt_nsec_t timeout); -nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt); - void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process); +void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process); nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid); diff --git a/src/nxt_socket.h b/src/nxt_socket.h index 7403de3d..ec21d779 100644 --- a/src/nxt_socket.h +++ b/src/nxt_socket.h @@ -114,8 +114,8 @@ NXT_EXPORT nxt_int_t nxt_socketpair_create(nxt_task_t *task, NXT_EXPORT void nxt_socketpair_close(nxt_task_t *task, nxt_socket_t *pair); NXT_EXPORT ssize_t nxt_socketpair_send(nxt_fd_event_t *ev, nxt_fd_t *fd, nxt_iobuf_t *iob, nxt_uint_t niob); -NXT_EXPORT ssize_t nxt_socketpair_recv(nxt_fd_event_t *ev, nxt_fd_t *fd, - nxt_iobuf_t *iob, nxt_uint_t niob); +NXT_EXPORT ssize_t nxt_socketpair_recv(nxt_fd_event_t *ev, + nxt_iobuf_t *iob, nxt_uint_t niob, void *oob); #define \ diff --git a/src/nxt_socket_msg.c b/src/nxt_socket_msg.c new file mode 100644 index 00000000..3b35ab29 --- /dev/null +++ b/src/nxt_socket_msg.c @@ -0,0 +1,57 @@ +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_socket_msg.h> + + +ssize_t +nxt_sendmsg(nxt_socket_t s, nxt_iobuf_t *iob, nxt_uint_t niob, + const nxt_send_oob_t *oob) +{ + struct msghdr msg; + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iob; + msg.msg_iovlen = niob; + /* Flags are cleared just to suppress valgrind warning. */ + msg.msg_flags = 0; + + if (oob != NULL && oob->size != 0) { + msg.msg_control = (void *) oob->buf; + msg.msg_controllen = oob->size; + + } else { + msg.msg_control = NULL; + msg.msg_controllen = 0; + } + + return sendmsg(s, &msg, 0); +} + + +ssize_t +nxt_recvmsg(nxt_socket_t s, nxt_iobuf_t *iob, nxt_uint_t niob, + nxt_recv_oob_t *oob) +{ + ssize_t n; + struct msghdr msg; + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iob; + msg.msg_iovlen = niob; + msg.msg_control = oob->buf; + msg.msg_controllen = sizeof(oob->buf); + + n = recvmsg(s, &msg, 0); + + if (nxt_fast_path(n != -1)) { + oob->size = msg.msg_controllen; + } + + return n; +} diff --git a/src/nxt_socket_msg.h b/src/nxt_socket_msg.h new file mode 100644 index 00000000..04de1761 --- /dev/null +++ b/src/nxt_socket_msg.h @@ -0,0 +1,220 @@ +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_SOCKET_MSG_H_INCLUDED_ +#define _NXT_SOCKET_MSG_H_INCLUDED_ + +#if (NXT_HAVE_UCRED) +#include <sys/un.h> +#endif + + +#if (NXT_HAVE_UCRED) +#define NXT_CRED_USECMSG 1 +#define NXT_CRED_CMSGTYPE SCM_CREDENTIALS +#define NXT_CRED_GETPID(u) (u->pid) + +typedef struct ucred nxt_socket_cred_t; + +#elif (NXT_HAVE_MSGHDR_CMSGCRED) +#define NXT_CRED_USECMSG 1 +#define NXT_CRED_CMSGTYPE SCM_CREDS +#define NXT_CRED_GETPID(u) (u->cmcred_pid) + +typedef struct cmsgcred nxt_socket_cred_t; +#endif + +#if (NXT_CRED_USECMSG) +#define NXT_OOB_RECV_SIZE \ + (CMSG_SPACE(2 * sizeof(int)) + CMSG_SPACE(sizeof(nxt_socket_cred_t))) +#else +#define NXT_OOB_RECV_SIZE \ + CMSG_SPACE(2 * sizeof(int)) +#endif + +#if (NXT_HAVE_MSGHDR_CMSGCRED) +#define NXT_OOB_SEND_SIZE \ + (CMSG_SPACE(2 * sizeof(int)) + CMSG_SPACE(sizeof(nxt_socket_cred_t))) +#else +#define NXT_OOB_SEND_SIZE \ + CMSG_SPACE(2 * sizeof(int)) +#endif + + +typedef struct { + size_t size; + u_char buf[NXT_OOB_RECV_SIZE]; +} nxt_recv_oob_t; + + +typedef struct { + size_t size; + u_char buf[NXT_OOB_SEND_SIZE]; +} nxt_send_oob_t; + + +/** + * The nxt_sendmsg is a wrapper for sendmsg. + * The oob struct must be initialized using nxt_socket_msg_oob_init(). + */ +NXT_EXPORT ssize_t nxt_sendmsg(nxt_socket_t s, nxt_iobuf_t *iob, + nxt_uint_t niob, const nxt_send_oob_t *oob); + +/** + * The nxt_recvmsg is a wrapper for recvmsg. + * The oob buffer must be consumed by using nxt_socket_msg_oob_get(). + */ +NXT_EXPORT ssize_t nxt_recvmsg(nxt_socket_t s, + nxt_iobuf_t *iob, nxt_uint_t niob, nxt_recv_oob_t *oob); + + +nxt_inline void +nxt_socket_msg_oob_init(nxt_send_oob_t *oob, int *fds) +{ + int nfds; + struct cmsghdr *cmsg; + +#if (NXT_HAVE_MSGHDR_CMSGCRED) + cmsg = (struct cmsghdr *) (oob->buf); + /* + * Fill all padding fields with 0. + * Code in Go 1.11 validate cmsghdr using padding field as part of len. + * See Cmsghdr definition and socketControlMessageHeaderAndData function. + */ + nxt_memzero(cmsg, sizeof(struct cmsghdr)); + + cmsg->cmsg_len = CMSG_LEN(sizeof(nxt_socket_cred_t)); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = NXT_CRED_CMSGTYPE; + + oob->size = CMSG_SPACE(sizeof(nxt_socket_cred_t)); + +#else + oob->size = 0; +#endif + + nfds = (fds[0] != -1 ? 1 : 0) + (fds[1] != -1 ? 1 : 0); + + if (nfds == 0) { + return; + } + + cmsg = (struct cmsghdr *) (oob->buf + oob->size); + + nxt_memzero(cmsg, sizeof(struct cmsghdr)); + + cmsg->cmsg_len = CMSG_LEN(nfds * sizeof(int)); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + + /* + * nxt_memcpy() is used instead of simple + * *(int *) CMSG_DATA(&cmsg.cm) = fd; + * because GCC 4.4 with -O2/3/s optimization may issue a warning: + * dereferencing type-punned pointer will break strict-aliasing rules + * + * Fortunately, GCC with -O1 compiles this nxt_memcpy() + * in the same simple assignment as in the code above. + */ + nxt_memcpy(CMSG_DATA(cmsg), fds, nfds * sizeof(int)); + + oob->size += CMSG_SPACE(nfds * sizeof(int)); +} + + +nxt_inline nxt_int_t +nxt_socket_msg_oob_get_fds(nxt_recv_oob_t *oob, nxt_fd_t *fd) +{ + size_t size; + struct msghdr msg; + struct cmsghdr *cmsg; + + msg.msg_control = oob->buf; + msg.msg_controllen = oob->size; + + for (cmsg = CMSG_FIRSTHDR(&msg); + cmsg != NULL; + cmsg = CMSG_NXTHDR(&msg, cmsg)) + { + size = cmsg->cmsg_len - CMSG_LEN(0); + + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { + if (nxt_slow_path(size != sizeof(int) && size != 2 * sizeof(int))) { + return NXT_ERROR; + } + + nxt_memcpy(fd, CMSG_DATA(cmsg), size); + + return NXT_OK; + } + } + + return NXT_OK; +} + + +nxt_inline nxt_int_t +nxt_socket_msg_oob_get(nxt_recv_oob_t *oob, nxt_fd_t *fd, nxt_pid_t *pid) +{ + size_t size; + struct msghdr msg; + struct cmsghdr *cmsg; + + if (oob->size == 0) { + return NXT_OK; + } + +#if (NXT_CRED_USECMSG) + *pid = -1; +#endif + + msg.msg_control = oob->buf; + msg.msg_controllen = oob->size; + + for (cmsg = CMSG_FIRSTHDR(&msg); + cmsg != NULL; + cmsg = CMSG_NXTHDR(&msg, cmsg)) + { + size = cmsg->cmsg_len - CMSG_LEN(0); + + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { + if (nxt_slow_path(size != sizeof(int) && size != 2 * sizeof(int))) { + return NXT_ERROR; + } + + nxt_memcpy(fd, CMSG_DATA(cmsg), size); + +#if (!NXT_CRED_USECMSG) + break; +#endif + } + +#if (NXT_CRED_USECMSG) + else if (cmsg->cmsg_level == SOL_SOCKET + && cmsg->cmsg_type == NXT_CRED_CMSGTYPE) + { + nxt_socket_cred_t *creds; + + if (nxt_slow_path(size != sizeof(nxt_socket_cred_t))) { + return NXT_ERROR; + } + + creds = (nxt_socket_cred_t *) CMSG_DATA(cmsg); + *pid = NXT_CRED_GETPID(creds); + } +#endif + } + +#if (NXT_CRED_USECMSG) + /* For platforms supporting credential passing, it's enforced */ + if (nxt_slow_path(*pid == -1)) { + return NXT_ERROR; + } +#endif + + return NXT_OK; +} + + +#endif /* _NXT_SOCKET_MSG_H_INCLUDED_ */ diff --git a/src/nxt_socketpair.c b/src/nxt_socketpair.c index 8b9d12bf..45274b78 100644 --- a/src/nxt_socketpair.c +++ b/src/nxt_socketpair.c @@ -5,7 +5,7 @@ */ #include <nxt_main.h> - +#include <nxt_socket_msg.h> /* * SOCK_SEQPACKET protocol is supported for AF_UNIX in Solaris 8 X/Open @@ -20,12 +20,6 @@ #endif -static ssize_t nxt_sendmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob, - nxt_uint_t niob); -static ssize_t nxt_recvmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob, - nxt_uint_t niob); - - nxt_int_t nxt_socketpair_create(nxt_task_t *task, nxt_socket_t *pair) { @@ -52,6 +46,24 @@ nxt_socketpair_create(nxt_task_t *task, nxt_socket_t *pair) goto fail; } +#if NXT_HAVE_SOCKOPT_SO_PASSCRED + int enable_creds = 1; + + if (nxt_slow_path(setsockopt(pair[0], SOL_SOCKET, SO_PASSCRED, + &enable_creds, sizeof(enable_creds)) == -1)) + { + nxt_alert(task, "failed to set SO_PASSCRED %E", nxt_errno); + goto fail; + } + + if (nxt_slow_path(setsockopt(pair[1], SOL_SOCKET, SO_PASSCRED, + &enable_creds, sizeof(enable_creds)) == -1)) + { + nxt_alert(task, "failed to set SO_PASSCRED %E", nxt_errno); + goto fail; + } +#endif + return NXT_OK; fail: @@ -74,11 +86,14 @@ ssize_t nxt_socketpair_send(nxt_fd_event_t *ev, nxt_fd_t *fd, nxt_iobuf_t *iob, nxt_uint_t niob) { - ssize_t n; - nxt_err_t err; + ssize_t n; + nxt_err_t err; + nxt_send_oob_t oob; + + nxt_socket_msg_oob_init(&oob, fd); for ( ;; ) { - n = nxt_sendmsg(ev->fd, fd, iob, niob); + n = nxt_sendmsg(ev->fd, iob, niob, &oob); err = (n == -1) ? nxt_socket_errno : 0; @@ -123,19 +138,19 @@ nxt_socketpair_send(nxt_fd_event_t *ev, nxt_fd_t *fd, nxt_iobuf_t *iob, ssize_t -nxt_socketpair_recv(nxt_fd_event_t *ev, nxt_fd_t *fd, nxt_iobuf_t *iob, - nxt_uint_t niob) +nxt_socketpair_recv(nxt_fd_event_t *ev, nxt_iobuf_t *iob, nxt_uint_t niob, + void *oob) { ssize_t n; nxt_err_t err; for ( ;; ) { - n = nxt_recvmsg(ev->fd, fd, iob, niob); + n = nxt_recvmsg(ev->fd, iob, niob, oob); err = (n == -1) ? nxt_socket_errno : 0; - nxt_debug(ev->task, "recvmsg(%d, %FD, %FD, %ui): %z", ev->fd, fd[0], - fd[1], niob, n); + nxt_debug(ev->task, "recvmsg(%d, %ui, %uz): %z", + ev->fd, niob, ((nxt_recv_oob_t *) oob)->size, n); if (n > 0) { return n; @@ -163,162 +178,10 @@ nxt_socketpair_recv(nxt_fd_event_t *ev, nxt_fd_t *fd, nxt_iobuf_t *iob, continue; default: - nxt_alert(ev->task, "recvmsg(%d, %p, %ui) failed %E", - ev->fd, fd, niob, err); + nxt_alert(ev->task, "recvmsg(%d, %ui) failed %E", + ev->fd, niob, err); return NXT_ERROR; } } } - - -#if (NXT_HAVE_MSGHDR_MSG_CONTROL) - -/* - * Linux, FreeBSD, Solaris X/Open sockets, - * MacOSX, NetBSD, AIX, HP-UX X/Open sockets. - */ - -static ssize_t -nxt_sendmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob, nxt_uint_t niob) -{ - size_t csize; - struct msghdr msg; - union { - struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int) * 2)]; - } cmsg; - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iob; - msg.msg_iovlen = niob; - /* Flags are cleared just to suppress valgrind warning. */ - msg.msg_flags = 0; - - if (fd[0] != -1) { - csize = (fd[1] == -1) ? sizeof(int) : sizeof(int) * 2; - - msg.msg_control = (caddr_t) &cmsg; - msg.msg_controllen = CMSG_SPACE(csize); - -#if (NXT_VALGRIND) - nxt_memzero(&cmsg, sizeof(cmsg)); -#endif - - cmsg.cm.cmsg_len = CMSG_LEN(csize); - cmsg.cm.cmsg_level = SOL_SOCKET; - cmsg.cm.cmsg_type = SCM_RIGHTS; - - /* - * nxt_memcpy() is used instead of simple - * *(int *) CMSG_DATA(&cmsg.cm) = fd; - * because GCC 4.4 with -O2/3/s optimization may issue a warning: - * dereferencing type-punned pointer will break strict-aliasing rules - * - * Fortunately, GCC with -O1 compiles this nxt_memcpy() - * in the same simple assignment as in the code above. - */ - nxt_memcpy(CMSG_DATA(&cmsg.cm), fd, csize); - - } else { - msg.msg_control = NULL; - msg.msg_controllen = 0; - } - - return sendmsg(s, &msg, 0); -} - - -static ssize_t -nxt_recvmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob, nxt_uint_t niob) -{ - ssize_t n; - struct msghdr msg; - union { - struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int) * 2)]; - } cmsg; - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iob; - msg.msg_iovlen = niob; - msg.msg_control = (caddr_t) &cmsg; - msg.msg_controllen = sizeof(cmsg); - - fd[0] = -1; - fd[1] = -1; - -#if (NXT_VALGRIND) - nxt_memzero(&cmsg, sizeof(cmsg)); -#endif - - n = recvmsg(s, &msg, 0); - - if (n > 0 - && cmsg.cm.cmsg_level == SOL_SOCKET - && cmsg.cm.cmsg_type == SCM_RIGHTS) - { - if (cmsg.cm.cmsg_len == CMSG_LEN(sizeof(int))) { - nxt_memcpy(fd, CMSG_DATA(&cmsg.cm), sizeof(int)); - } - - if (cmsg.cm.cmsg_len == CMSG_LEN(sizeof(int) * 2)) { - nxt_memcpy(fd, CMSG_DATA(&cmsg.cm), sizeof(int) * 2); - } - } - - return n; -} - -#else - -/* Solaris 4.3BSD sockets. */ - -static ssize_t -nxt_sendmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob, nxt_uint_t niob) -{ - struct msghdr msg; - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iob; - msg.msg_iovlen = niob; - - if (fd[0] != -1) { - msg.msg_accrights = (caddr_t) fd; - msg.msg_accrightslen = sizeof(int); - - if (fd[1] != -1) { - msg.msg_accrightslen += sizeof(int); - } - - } else { - msg.msg_accrights = NULL; - msg.msg_accrightslen = 0; - } - - return sendmsg(s, &msg, 0); -} - - -static ssize_t -nxt_recvmsg(nxt_socket_t s, nxt_fd_t *fd, nxt_iobuf_t *iob, nxt_uint_t niob) -{ - struct msghdr msg; - - fd[0] = -1; - fd[1] = -1; - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iob; - msg.msg_iovlen = niob; - msg.msg_accrights = (caddr_t) fd; - msg.msg_accrightslen = sizeof(int) * 2; - - return recvmsg(s, &msg, 0); -} - -#endif diff --git a/src/nxt_string.c b/src/nxt_string.c index ab568990..b7aef79e 100644 --- a/src/nxt_string.c +++ b/src/nxt_string.c @@ -745,3 +745,100 @@ nxt_is_complex_uri_encoded(u_char *src, size_t length) return 1; } + + +ssize_t +nxt_base64_decode(u_char *dst, u_char *src, size_t length) +{ + u_char *end, *p; + size_t pad; + uint8_t v1, v2, v3, v4; + + static const uint8_t decode[] = { + 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, + 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, + 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 62, 77, 77, 77, 63, + 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 77, 77, 77, 77, 77, 77, + 77, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, + 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 77, 77, 77, 77, 77, + 77, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, + 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 77, 77, 77, 77, 77, + + 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, + 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, + 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, + 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, + 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, + 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, + 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, + 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77, 77 + }; + + end = src + length; + pad = (4 - (length % 4)) % 4; + + if (dst == NULL) { + if (pad > 2) { + return NXT_ERROR; + } + + while (src < end) { + if (decode[*src] != 77) { + src++; + continue; + } + + if (pad == 0) { + pad = end - src; + + if ((pad == 1 || (pad == 2 && src[1] == '=')) && src[0] == '=') + { + break; + } + } + + return NXT_ERROR; + } + + return (length + 3) / 4 * 3 - pad; + } + + nxt_assert(length != 0); + + if (pad == 0) { + pad = (end[-1] == '=') + (end[-2] == '='); + end -= (pad + 3) & 4; + + } else { + end -= 4 - pad; + } + + p = dst; + + while (src < end) { + v1 = decode[src[0]]; + v2 = decode[src[1]]; + v3 = decode[src[2]]; + v4 = decode[src[3]]; + + *p++ = (v1 << 2 | v2 >> 4); + *p++ = (v2 << 4 | v3 >> 2); + *p++ = (v3 << 6 | v4); + + src += 4; + } + + if (pad > 0) { + v1 = decode[src[0]]; + v2 = decode[src[1]]; + + *p++ = (v1 << 2 | v2 >> 4); + + if (pad == 1) { + v3 = decode[src[2]]; + *p++ = (v2 << 4 | v3 >> 2); + } + } + + return (p - dst); +} diff --git a/src/nxt_string.h b/src/nxt_string.h index 7e02f59a..4d565e87 100644 --- a/src/nxt_string.h +++ b/src/nxt_string.h @@ -190,6 +190,8 @@ NXT_EXPORT uintptr_t nxt_encode_complex_uri(u_char *dst, u_char *src, size_t length); NXT_EXPORT nxt_bool_t nxt_is_complex_uri_encoded(u_char *s, size_t length); +NXT_EXPORT ssize_t nxt_base64_decode(u_char *dst, u_char *src, size_t length); + extern const uint8_t nxt_hex2int[256]; diff --git a/src/nxt_thread.h b/src/nxt_thread.h index d7800cc6..2ebc331d 100644 --- a/src/nxt_thread.h +++ b/src/nxt_thread.h @@ -142,14 +142,6 @@ nxt_thread_yield() \ #endif -#if (PTHREAD_STACK_MIN) -#define NXT_THREAD_STACK_MIN PTHREAD_STACK_MIN - -#else -#define NXT_THREAD_STACK_MIN sysconf(_SC_THREAD_STACK_MIN) -#endif - - struct nxt_thread_s { nxt_log_t *log; nxt_log_t main_log; diff --git a/src/nxt_tls.h b/src/nxt_tls.h index eeb4e7ba..0667ade3 100644 --- a/src/nxt_tls.h +++ b/src/nxt_tls.h @@ -92,28 +92,12 @@ struct nxt_tls_init_s { }; -struct nxt_tls_ticket_s { - uint8_t aes128; - u_char name[16]; - u_char hmac_key[32]; - u_char aes_key[32]; -}; - - -struct nxt_tls_tickets_s { - nxt_uint_t count; - nxt_tls_ticket_t tickets[]; -}; - - #if (NXT_HAVE_OPENSSL) extern const nxt_tls_lib_t nxt_openssl_lib; void nxt_cdecl nxt_openssl_log_error(nxt_task_t *task, nxt_uint_t level, const char *fmt, ...); u_char *nxt_openssl_copy_error(u_char *p, u_char *end); -nxt_int_t nxt_openssl_base64_decode(u_char *d, size_t dlen, const u_char *s, - size_t slen); #endif #if (NXT_HAVE_GNUTLS) diff --git a/src/nxt_unit.c b/src/nxt_unit.c index ae4499d8..06ad1636 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -3,10 +3,9 @@ * Copyright (C) NGINX, Inc. */ -#include <stdlib.h> - #include "nxt_main.h" #include "nxt_port_memory_int.h" +#include "nxt_socket_msg.h" #include "nxt_port_queue.h" #include "nxt_app_queue.h" @@ -25,6 +24,11 @@ #define NXT_UNIT_LOCAL_BUF_SIZE \ (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) +enum { + NXT_QUIT_NORMAL = 0, + NXT_QUIT_GRACEFUL = 1, +}; + typedef struct nxt_unit_impl_s nxt_unit_impl_t; typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; @@ -51,7 +55,8 @@ nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, - int *log_fd, uint32_t *stream, uint32_t *shm_limit); + int *log_fd, uint32_t *stream, uint32_t *shm_limit, + uint32_t *request_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd); static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, @@ -130,6 +135,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx); static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); @@ -150,20 +156,20 @@ static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue); static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req); -static void nxt_unit_remove_port(nxt_unit_impl_t *lib, +static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id); static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); -static void nxt_unit_quit(nxt_unit_ctx_t *ctx); +static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param); static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, - const void *oob, size_t oob_size); + const nxt_send_oob_t *oob); static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, - const void *buf, size_t buf_size, const void *oob, size_t oob_size); + const void *buf, size_t buf_size, const nxt_send_oob_t *oob); static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, @@ -174,7 +180,7 @@ static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); -static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, +static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf); nxt_inline int nxt_unit_close(int fd); static int nxt_unit_fd_blocking(int fd); @@ -271,8 +277,8 @@ struct nxt_unit_read_buf_s { nxt_queue_link_t link; nxt_unit_ctx_impl_t *ctx_impl; ssize_t size; + nxt_recv_oob_t oob; char buf[16384]; - char oob[256]; }; @@ -311,8 +317,9 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_read_buf_t */ nxt_queue_t free_rbuf; - int online; - int ready; + uint8_t online; /* 1 bit */ + uint8_t ready; /* 1 bit */ + uint8_t quit_param; nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_read_buf_t ctx_read_buf; @@ -344,9 +351,11 @@ struct nxt_unit_impl_s { nxt_unit_callbacks_t callbacks; nxt_atomic_t use_count; + nxt_atomic_t request_count; uint32_t request_data_size; uint32_t shm_mmap_limit; + uint32_t request_limit; pthread_mutex_t mutex; @@ -409,16 +418,21 @@ typedef struct { } nxt_unit_port_hash_id_t; +static pid_t nxt_unit_pid; + + nxt_unit_ctx_t * nxt_unit_init(nxt_unit_init_t *init) { int rc, queue_fd; void *mem; - uint32_t ready_stream, shm_limit; + uint32_t ready_stream, shm_limit, request_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; nxt_unit_port_t ready_port, router_port, read_port; + nxt_unit_pid = getpid(); + lib = nxt_unit_create(init); if (nxt_slow_path(lib == NULL)) { return NULL; @@ -446,13 +460,15 @@ nxt_unit_init(nxt_unit_init_t *init) } else { rc = nxt_unit_read_env(&ready_port, &router_port, &read_port, - &lib->log_fd, &ready_stream, &shm_limit); + &lib->log_fd, &ready_stream, &shm_limit, + &request_limit); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) / PORT_MMAP_DATA_SIZE; + lib->request_limit = request_limit; } if (nxt_slow_path(lib->shm_mmap_limit < 1)) { @@ -460,6 +476,7 @@ nxt_unit_init(nxt_unit_init_t *init) } lib->pid = read_port.id.pid; + nxt_unit_pid = lib->pid; ctx = &lib->main_ctx.ctx; @@ -564,6 +581,7 @@ nxt_unit_create(nxt_unit_init_t *init) lib->request_data_size = init->request_data_size; lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) / PORT_MMAP_DATA_SIZE; + lib->request_limit = init->request_limit; lib->processes.slot = NULL; lib->ports.slot = NULL; @@ -573,6 +591,7 @@ nxt_unit_create(nxt_unit_init_t *init) nxt_queue_init(&lib->contexts); lib->use_count = 0; + lib->request_count = 0; lib->router_port = NULL; lib->shared_port = NULL; @@ -632,6 +651,7 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, ctx_impl->wait_items = 0; ctx_impl->online = 1; ctx_impl->ready = 0; + ctx_impl->quit_param = NXT_QUIT_GRACEFUL; nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); @@ -780,7 +800,7 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, - uint32_t *shm_limit) + uint32_t *shm_limit, uint32_t *request_limit) { int rc; int ready_fd, router_fd, read_in_fd, read_out_fd; @@ -825,12 +845,12 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d,%d;" - "%d,%"PRIu32, + "%d,%"PRIu32",%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, &router_pid, &router_id, &router_fd, &read_pid, &read_id, &read_in_fd, &read_out_fd, - log_fd, shm_limit); + log_fd, shm_limit, request_limit); if (nxt_slow_path(rc == EOF)) { nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env", @@ -839,9 +859,9 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, return NXT_UNIT_ERROR; } - if (nxt_slow_path(rc != 13)) { + if (nxt_slow_path(rc != 14)) { nxt_unit_alert(NULL, "invalid number of variables in %s env: " - "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars); + "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars); return NXT_UNIT_ERROR; } @@ -876,13 +896,10 @@ static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) { ssize_t res; + nxt_send_oob_t oob; nxt_port_msg_t msg; nxt_unit_impl_t *lib; - - union { - struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int))]; - } cmsg; + int fds[2] = {queue_fd, -1}; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -896,25 +913,9 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) msg.mf = 0; msg.tracking = 0; - memset(&cmsg, 0, sizeof(cmsg)); - - cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); - cmsg.cm.cmsg_level = SOL_SOCKET; - cmsg.cm.cmsg_type = SCM_RIGHTS; - - /* - * memcpy() is used instead of simple - * *(int *) CMSG_DATA(&cmsg.cm) = fd; - * because GCC 4.4 with -O2/3/s optimization may issue a warning: - * dereferencing type-punned pointer will break strict-aliasing rules - * - * Fortunately, GCC with -O1 compiles this nxt_memcpy() - * in the same simple assignment as in the code above. - */ - memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int)); + nxt_socket_msg_oob_init(&oob, fds); - res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), - &cmsg, sizeof(cmsg)); + res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob); if (res != sizeof(msg)) { return NXT_UNIT_ERROR; } @@ -929,7 +930,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, { int rc; pid_t pid; - struct cmsghdr *cm; + uint8_t quit_param; nxt_port_msg_t *port_msg; nxt_unit_impl_t *lib; nxt_unit_recv_msg_t recv_msg; @@ -939,18 +940,12 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, recv_msg.fd[0] = -1; recv_msg.fd[1] = -1; port_msg = (nxt_port_msg_t *) rbuf->buf; - cm = (struct cmsghdr *) rbuf->oob; - if (cm->cmsg_level == SOL_SOCKET - && cm->cmsg_type == SCM_RIGHTS) - { - if (cm->cmsg_len == CMSG_LEN(sizeof(int))) { - memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int)); - } - - if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) { - memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2); - } + rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg"); + rc = NXT_UNIT_ERROR; + goto done; } recv_msg.incoming_buf = NULL; @@ -959,7 +954,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, if (nxt_slow_path(rbuf->size == 0)) { nxt_unit_debug(ctx, "read port closed"); - nxt_unit_quit(ctx); + nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); rc = NXT_UNIT_OK; goto done; } @@ -1018,9 +1013,18 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, break; case _NXT_PORT_MSG_QUIT: - nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream); + if (recv_msg.size == sizeof(quit_param)) { + memcpy(&quit_param, recv_msg.start, sizeof(quit_param)); + + } else { + quit_param = NXT_QUIT_NORMAL; + } + + nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream, + (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : "")); + + nxt_unit_quit(ctx, quit_param); - nxt_unit_quit(ctx); rc = NXT_UNIT_OK; break; @@ -1220,15 +1224,36 @@ nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx) nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + if (nxt_slow_path(ctx_impl->ready)) { + return NXT_UNIT_OK; + } + ctx_impl->ready = 1; - if (lib->callbacks.ready_handler) { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + /* Call ready_handler() only for main context. */ + if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) { return lib->callbacks.ready_handler(ctx); } + if (&lib->main_ctx != ctx_impl) { + /* Check if the main context is already stopped or quit. */ + if (nxt_slow_path(!lib->main_ctx.ready)) { + ctx_impl->ready = 0; + + nxt_unit_quit(ctx, lib->main_ctx.quit_param); + + return NXT_UNIT_OK; + } + + if (lib->callbacks.add_port != NULL) { + lib->callbacks.add_port(ctx, lib->shared_port); + } + } + return NXT_UNIT_OK; } @@ -1561,7 +1586,7 @@ nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req) msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK; res = nxt_unit_port_send(req->ctx, req->response_port, - &msg, sizeof(msg), NULL, 0); + &msg, sizeof(msg), NULL); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -1741,10 +1766,12 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) static void nxt_unit_request_info_release(nxt_unit_request_info_t *req) { + nxt_unit_ctx_t *ctx; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_impl_t *req_impl; - ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); + ctx = req->ctx; + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); req->response = NULL; @@ -1783,6 +1810,10 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req) nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); pthread_mutex_unlock(&ctx_impl->mutex); + + if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) { + nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); + } } @@ -2621,7 +2652,7 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, (int) m.mmap_msg.size); res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m), - NULL, 0); + NULL); if (nxt_slow_path(res != sizeof(m))) { goto free_buf; } @@ -2673,8 +2704,8 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, res = nxt_unit_port_send(req->ctx, req->response_port, buf->start - sizeof(m.msg), - m.mmap_msg.size + sizeof(m.msg), - NULL, 0); + m.mmap_msg.size + sizeof(m.msg), NULL); + if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { goto free_buf; } @@ -2741,7 +2772,7 @@ nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) pthread_mutex_unlock(&ctx_impl->mutex); - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + rbuf->oob.size = 0; return rbuf; } @@ -3260,7 +3291,7 @@ skip_response_send: msg.tracking = 0; (void) nxt_unit_port_send(req->ctx, req->response_port, - &msg, sizeof(msg), NULL, 0); + &msg, sizeof(msg), NULL); nxt_unit_request_info_release(req); } @@ -3582,7 +3613,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) msg.mf = 0; msg.tracking = 0; - res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -3851,12 +3882,10 @@ static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) { ssize_t res; + nxt_send_oob_t oob; nxt_port_msg_t msg; nxt_unit_impl_t *lib; - union { - struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int))]; - } cmsg; + int fds[2] = {fd, -1}; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -3870,30 +3899,9 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) msg.mf = 0; msg.tracking = 0; - /* - * Fill all padding fields with 0. - * Code in Go 1.11 validate cmsghdr using padding field as part of len. - * See Cmsghdr definition and socketControlMessageHeaderAndData function. - */ - memset(&cmsg, 0, sizeof(cmsg)); - - cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); - cmsg.cm.cmsg_level = SOL_SOCKET; - cmsg.cm.cmsg_type = SCM_RIGHTS; - - /* - * memcpy() is used instead of simple - * *(int *) CMSG_DATA(&cmsg.cm) = fd; - * because GCC 4.4 with -O2/3/s optimization may issue a warning: - * dereferencing type-punned pointer will break strict-aliasing rules - * - * Fortunately, GCC with -O1 compiles this nxt_memcpy() - * in the same simple assignment as in the code above. - */ - memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); + nxt_socket_msg_oob_init(&oob, fds); - res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), - &cmsg, sizeof(cmsg)); + res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), &oob); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -4083,7 +4091,7 @@ nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl) msg.type = _NXT_PORT_MSG_RPC_READY; (void) nxt_unit_port_send(ctx, ctx_impl->read_port, - &msg, sizeof(msg), NULL, 0); + &msg, sizeof(msg), NULL); } @@ -4306,7 +4314,7 @@ nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id) nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id); - res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL); if (nxt_slow_path(res != sizeof(m))) { return NXT_UNIT_ERROR; } @@ -4376,7 +4384,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) msg.mf = 0; msg.tracking = 0; - res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL); if (nxt_slow_path(res != sizeof(msg))) { return NXT_UNIT_ERROR; } @@ -4522,7 +4530,7 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) rc = nxt_unit_run_once_impl(ctx); if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { - nxt_unit_quit(ctx); + nxt_unit_quit(ctx, NXT_QUIT_NORMAL); break; } } @@ -4586,6 +4594,7 @@ static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { int nevents, res, err; + nxt_uint_t nfds; nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_port_impl_t *port_impl; @@ -4593,7 +4602,7 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) { + if (ctx_impl->wait_items > 0 || !nxt_unit_chk_ready(ctx)) { return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf); } @@ -4626,20 +4635,28 @@ retry: } } - res = nxt_unit_app_queue_recv(lib->shared_port, rbuf); - if (res == NXT_UNIT_OK) { - return NXT_UNIT_OK; + if (nxt_fast_path(nxt_unit_chk_ready(ctx))) { + res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf); + if (res == NXT_UNIT_OK) { + return NXT_UNIT_OK; + } + + fds[1].fd = lib->shared_port->in_fd; + fds[1].events = POLLIN; + + nfds = 2; + + } else { + nfds = 1; } fds[0].fd = ctx_impl->read_port->in_fd; fds[0].events = POLLIN; fds[0].revents = 0; - fds[1].fd = lib->shared_port->in_fd; - fds[1].events = POLLIN; fds[1].revents = 0; - nevents = poll(fds, 2, -1); + nevents = poll(fds, nfds, -1); if (nxt_slow_path(nevents == -1)) { err = errno; @@ -4655,7 +4672,7 @@ retry: return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR; } - nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]", + nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04X, %04X]", fds[0].fd, fds[1].fd, nevents, fds[0].revents, fds[1].revents); @@ -4686,6 +4703,21 @@ retry: static int +nxt_unit_chk_ready(nxt_unit_ctx_t *ctx) +{ + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + return (ctx_impl->ready + && (lib->request_limit == 0 + || lib->request_count < lib->request_limit)); +} + + +static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) { int rc; @@ -4723,6 +4755,10 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) } nxt_queue_loop; + if (!ctx_impl->ready) { + nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); + } + return rc; } @@ -4903,16 +4939,14 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_ctx_use(ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); rc = NXT_UNIT_OK; - while (nxt_fast_path(ctx_impl->online)) { + while (nxt_fast_path(nxt_unit_chk_ready(ctx))) { rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { rc = NXT_UNIT_ERROR; @@ -4949,17 +4983,15 @@ nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx) int rc; nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; - nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_t *req; nxt_unit_ctx_use(ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); req = NULL; - if (nxt_slow_path(!ctx_impl->online)) { + if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) { goto done; } @@ -4968,7 +5000,7 @@ nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx) goto done; } - rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf); + rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf); if (rc != NXT_UNIT_OK) { nxt_unit_read_buf_release(ctx, rbuf); goto done; @@ -4985,17 +5017,6 @@ done: int -nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx) -{ - nxt_unit_impl_t *lib; - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - return (ctx == &lib->main_ctx.ctx); -} - - -int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { int rc; @@ -5017,13 +5038,17 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_impl_t *lib; nxt_unit_read_buf_t *rbuf; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) { + return NXT_UNIT_AGAIN; + } + rbuf = nxt_unit_read_buf_get(ctx); if (nxt_slow_path(rbuf == NULL)) { return NXT_UNIT_ERROR; } - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - if (port == lib->shared_port) { rc = nxt_unit_shared_port_recv(ctx, port, rbuf); @@ -5194,7 +5219,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl) pthread_mutex_unlock(&lib->mutex); if (nxt_fast_path(ctx_impl->read_port != NULL)) { - nxt_unit_remove_port(lib, &ctx_impl->read_port->id); + nxt_unit_remove_port(lib, NULL, &ctx_impl->read_port->id); nxt_unit_port_release(ctx_impl->read_port); } @@ -5246,6 +5271,24 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx) return NULL; } +#if (NXT_HAVE_SOCKOPT_SO_PASSCRED) + int enable_creds = 1; + + if (nxt_slow_path(setsockopt(port_sockets[0], SOL_SOCKET, SO_PASSCRED, + &enable_creds, sizeof(enable_creds)) == -1)) + { + nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno)); + return NULL; + } + + if (nxt_slow_path(setsockopt(port_sockets[1], SOL_SOCKET, SO_PASSCRED, + &enable_creds, sizeof(enable_creds)) == -1)) + { + nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno)); + return NULL; + } +#endif + nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d", port_sockets[0], port_sockets[1]); @@ -5286,6 +5329,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, nxt_unit_port_t *port, int queue_fd) { ssize_t res; + nxt_send_oob_t oob; nxt_unit_impl_t *lib; int fds[2] = { port->out_fd, queue_fd }; @@ -5294,11 +5338,6 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, nxt_port_msg_new_port_t new_port; } m; - union { - struct cmsghdr cm; - char space[CMSG_SPACE(sizeof(int) * 2)]; - } cmsg; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); m.msg.stream = 0; @@ -5317,24 +5356,9 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, m.new_port.max_size = 16 * 1024; m.new_port.max_share = 64 * 1024; - memset(&cmsg, 0, sizeof(cmsg)); - - cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2); - cmsg.cm.cmsg_level = SOL_SOCKET; - cmsg.cm.cmsg_type = SCM_RIGHTS; - - /* - * memcpy() is used instead of simple - * *(int *) CMSG_DATA(&cmsg.cm) = fd; - * because GCC 4.4 with -O2/3/s optimization may issue a warning: - * dereferencing type-punned pointer will break strict-aliasing rules - * - * Fortunately, GCC with -O1 compiles this nxt_memcpy() - * in the same simple assignment as in the code above. - */ - memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2); + nxt_socket_msg_oob_init(&oob, fds); - res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg)); + res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &oob); return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR; } @@ -5605,7 +5629,8 @@ nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req) static void -nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) +nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx, + nxt_unit_port_id_t *port_id) { nxt_unit_port_t *port; nxt_unit_port_impl_t *port_impl; @@ -5623,7 +5648,7 @@ nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id) pthread_mutex_unlock(&lib->mutex); if (lib->callbacks.remove_port != NULL && port != NULL) { - lib->callbacks.remove_port(&lib->unit, port); + lib->callbacks.remove_port(&lib->unit, ctx, port); } if (nxt_fast_path(port != NULL)) { @@ -5700,7 +5725,7 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) nxt_queue_remove(&port->link); if (lib->callbacks.remove_port != NULL) { - lib->callbacks.remove_port(&lib->unit, &port->port); + lib->callbacks.remove_port(&lib->unit, NULL, &port->port); } nxt_unit_port_release(&port->port); @@ -5712,56 +5737,96 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process) static void -nxt_unit_quit(nxt_unit_ctx_t *ctx) +nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param) { - nxt_port_msg_t msg; + nxt_bool_t skip_graceful_broadcast, quit; nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_callbacks_t *cb; nxt_unit_request_info_t *req; nxt_unit_request_info_impl_t *req_impl; + struct { + nxt_port_msg_t msg; + uint8_t quit_param; + } nxt_packed m; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - if (!ctx_impl->online) { + nxt_unit_debug(ctx, "quit: %d/%d/%d", (int) quit_param, ctx_impl->ready, + ctx_impl->online); + + if (nxt_slow_path(!ctx_impl->online)) { return; } - ctx_impl->online = 0; + skip_graceful_broadcast = quit_param == NXT_QUIT_GRACEFUL + && !ctx_impl->ready; cb = &lib->callbacks; - if (cb->quit != NULL) { - cb->quit(ctx); + if (nxt_fast_path(ctx_impl->ready)) { + ctx_impl->ready = 0; + + if (cb->remove_port != NULL) { + cb->remove_port(&lib->unit, ctx, lib->shared_port); + } } - nxt_queue_each(req_impl, &ctx_impl->active_req, - nxt_unit_request_info_impl_t, link) - { - req = &req_impl->req; + if (quit_param == NXT_QUIT_GRACEFUL) { + pthread_mutex_lock(&ctx_impl->mutex); - nxt_unit_req_warn(req, "active request on ctx quit"); + quit = nxt_queue_is_empty(&ctx_impl->active_req) + && nxt_queue_is_empty(&ctx_impl->pending_rbuf) + && ctx_impl->wait_items == 0; - if (cb->close_handler) { - nxt_unit_req_debug(req, "close_handler"); + pthread_mutex_unlock(&ctx_impl->mutex); - cb->close_handler(req); + } else { + quit = 1; + ctx_impl->quit_param = NXT_QUIT_GRACEFUL; + } - } else { - nxt_unit_request_done(req, NXT_UNIT_ERROR); + if (quit) { + ctx_impl->online = 0; + + if (cb->quit != NULL) { + cb->quit(ctx); } - } nxt_queue_loop; + nxt_queue_each(req_impl, &ctx_impl->active_req, + nxt_unit_request_info_impl_t, link) + { + req = &req_impl->req; - if (ctx != &lib->main_ctx.ctx) { + nxt_unit_req_warn(req, "active request on ctx quit"); + + if (cb->close_handler) { + nxt_unit_req_debug(req, "close_handler"); + + cb->close_handler(req); + + } else { + nxt_unit_request_done(req, NXT_UNIT_ERROR); + } + + } nxt_queue_loop; + + if (nxt_fast_path(ctx_impl->read_port != NULL)) { + nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id); + } + } + + if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) { return; } - memset(&msg, 0, sizeof(nxt_port_msg_t)); + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); - msg.pid = lib->pid; - msg.type = _NXT_PORT_MSG_QUIT; + m.msg.pid = lib->pid; + m.msg.type = _NXT_PORT_MSG_QUIT; + m.quit_param = quit_param; pthread_mutex_lock(&lib->mutex); @@ -5775,7 +5840,7 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) } (void) nxt_unit_port_send(ctx, ctx_impl->read_port, - &msg, sizeof(msg), NULL, 0); + &m, sizeof(m), NULL); } nxt_queue_loop; @@ -5810,7 +5875,7 @@ nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid, (int) port_id->id); - res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL); if (nxt_slow_path(res != sizeof(m))) { return NXT_UNIT_ERROR; } @@ -5821,7 +5886,7 @@ nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, - const void *buf, size_t buf_size, const void *oob, size_t oob_size) + const void *buf, size_t buf_size, const nxt_send_oob_t *oob) { int notify; ssize_t ret; @@ -5833,7 +5898,7 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); - if (port_impl->queue != NULL && oob_size == 0 + if (port_impl->queue != NULL && (oob == NULL || oob->size == 0) && buf_size <= NXT_PORT_QUEUE_MSG_SIZE) { rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, ¬ify); @@ -5855,7 +5920,7 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, if (lib->callbacks.port_send == NULL) { ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg, - sizeof(nxt_port_msg_t), NULL, 0); + sizeof(nxt_port_msg_t), NULL); nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue", (int) port->id.pid, (int) port->id.id, @@ -5892,15 +5957,15 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, if (lib->callbacks.port_send != NULL) { ret = lib->callbacks.port_send(ctx, port, buf, buf_size, - oob, oob_size); + oob != NULL ? oob->buf : NULL, + oob != NULL ? oob->size : 0); nxt_unit_debug(ctx, "port{%d,%d} sendcb %d", (int) port->id.pid, (int) port->id.id, (int) ret); } else { - ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, - oob, oob_size); + ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, oob); nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d", (int) port->id.pid, (int) port->id.id, @@ -5913,29 +5978,20 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, - const void *buf, size_t buf_size, const void *oob, size_t oob_size) + const void *buf, size_t buf_size, const nxt_send_oob_t *oob) { int err; - ssize_t res; + ssize_t n; struct iovec iov[1]; - struct msghdr msg; iov[0].iov_base = (void *) buf; iov[0].iov_len = buf_size; - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = 1; - msg.msg_flags = 0; - msg.msg_control = (void *) oob; - msg.msg_controllen = oob_size; - retry: - res = sendmsg(fd, &msg, 0); + n = nxt_sendmsg(fd, iov, 1, oob); - if (nxt_slow_path(res == -1)) { + if (nxt_slow_path(n == -1)) { err = errno; if (err == EINTR) { @@ -5950,11 +6006,11 @@ retry: fd, (int) buf_size, strerror(err), err); } else { - nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size, - (int) res); + nxt_unit_debug(ctx, "sendmsg(%d, %d, %d): %d", fd, (int) buf_size, + (oob != NULL ? (int) oob->size : 0), (int) n); } - return res; + return n; } @@ -6063,7 +6119,7 @@ retry: nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf); - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + rbuf->oob.size = 0; goto retry; } @@ -6074,7 +6130,8 @@ nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src) { memcpy(dst->buf, src->buf, src->size); dst->size = src->size; - memcpy(dst->oob, src->oob, sizeof(src->oob)); + dst->oob.size = src->oob.size; + memcpy(dst->oob.buf, src->oob.buf, src->oob.size); } @@ -6089,7 +6146,11 @@ nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, retry: - res = nxt_unit_app_queue_recv(port, rbuf); + res = nxt_unit_app_queue_recv(ctx, port, rbuf); + + if (res == NXT_UNIT_OK) { + return NXT_UNIT_OK; + } if (res == NXT_UNIT_AGAIN) { res = nxt_unit_port_recv(ctx, port, rbuf); @@ -6116,16 +6177,18 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) { int fd, err; + size_t oob_size; struct iovec iov[1]; - struct msghdr msg; nxt_unit_impl_t *lib; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); if (lib->callbacks.port_recv != NULL) { + oob_size = sizeof(rbuf->oob.buf); + rbuf->size = lib->callbacks.port_recv(ctx, port, rbuf->buf, sizeof(rbuf->buf), - rbuf->oob, sizeof(rbuf->oob)); + rbuf->oob.buf, &oob_size); nxt_unit_debug(ctx, "port{%d,%d} recvcb %d", (int) port->id.pid, (int) port->id.id, (int) rbuf->size); @@ -6134,25 +6197,18 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, return NXT_UNIT_ERROR; } + rbuf->oob.size = oob_size; return NXT_UNIT_OK; } iov[0].iov_base = rbuf->buf; iov[0].iov_len = sizeof(rbuf->buf); - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = 1; - msg.msg_flags = 0; - msg.msg_control = rbuf->oob; - msg.msg_controllen = sizeof(rbuf->oob); - fd = port->in_fd; retry: - rbuf->size = recvmsg(fd, &msg, 0); + rbuf->size = nxt_recvmsg(fd, iov, 1, &rbuf->oob); if (nxt_slow_path(rbuf->size == -1)) { err = errno; @@ -6194,13 +6250,20 @@ nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) static int -nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf) +nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf) { uint32_t cookie; nxt_port_msg_t *port_msg; nxt_app_queue_t *queue; + nxt_unit_impl_t *lib; nxt_unit_port_impl_t *port_impl; + struct { + nxt_port_msg_t msg; + uint8_t quit_param; + } nxt_packed m; + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); queue = port_impl->queue; @@ -6214,6 +6277,25 @@ retry: port_msg = (nxt_port_msg_t *) rbuf->buf; if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (lib->request_limit != 0) { + nxt_atomic_fetch_add(&lib->request_count, 1); + + if (nxt_slow_path(lib->request_count >= lib->request_limit)) { + nxt_unit_debug(ctx, "request limit reached"); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.type = _NXT_PORT_MSG_QUIT; + m.quit_param = NXT_QUIT_GRACEFUL; + + (void) nxt_unit_port_send(ctx, lib->main_ctx.read_port, + &m, sizeof(m), NULL); + } + } + return NXT_UNIT_OK; } @@ -6495,7 +6577,7 @@ nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...) log_fd = lib->log_fd; } else { - pid = getpid(); + pid = nxt_unit_pid; log_fd = STDERR_FILENO; } @@ -6539,7 +6621,7 @@ nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...) log_fd = lib->log_fd; } else { - pid = getpid(); + pid = nxt_unit_pid; log_fd = STDERR_FILENO; } diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 1e1a8dbe..1b5280af 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -136,7 +136,8 @@ struct nxt_unit_callbacks_s { int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port); /* Remove previously added port. Optional. */ - void (*remove_port)(nxt_unit_t *, nxt_unit_port_t *port); + void (*remove_port)(nxt_unit_t *, nxt_unit_ctx_t *, + nxt_unit_port_t *port); /* Remove all data associated with process pid including ports. Optional. */ void (*remove_pid)(nxt_unit_t *, pid_t pid); @@ -154,7 +155,7 @@ struct nxt_unit_callbacks_s { /* Receive data on port id. Optional. */ ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port, - void *buf, size_t buf_size, void *oob, size_t oob_size); + void *buf, size_t buf_size, void *oob, size_t *oob_size); int (*ready_handler)(nxt_unit_ctx_t *); }; @@ -167,6 +168,7 @@ struct nxt_unit_init_s { uint32_t request_data_size; uint32_t shm_limit; + uint32_t request_limit; nxt_unit_callbacks_t callbacks; @@ -215,8 +217,6 @@ int nxt_unit_run_shared(nxt_unit_ctx_t *ctx); nxt_unit_request_info_t *nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx); -int nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx); - /* * Receive and process one message, invoke configured callbacks. * diff --git a/src/nxt_upstream.c b/src/nxt_upstream.c index de9b1d49..17593173 100644 --- a/src/nxt_upstream.c +++ b/src/nxt_upstream.c @@ -141,6 +141,11 @@ static nxt_http_action_t * nxt_upstream_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action) { - return nxt_upstream_proxy_handler(task, r, - r->conf->upstreams[action->u.upstream_number]); + nxt_upstream_t *u; + + u = r->conf->upstreams[action->u.upstream_number]; + + nxt_debug(task, "upstream handler: \"%V\"", &u->name); + + return nxt_upstream_proxy_handler(task, r, u); } diff --git a/src/nxt_var.c b/src/nxt_var.c index 2731fd09..60650ef4 100644 --- a/src/nxt_var.c +++ b/src/nxt_var.c @@ -7,21 +7,28 @@ struct nxt_var_s { - size_t plain; - nxt_uint_t vars; - u_char data[]; + size_t length; + nxt_uint_t vars; + uint8_t strz; /* 1 bit */ + u_char data[]; /* - uint32_t indexes[vars]; - size_t positions[vars]; - u_char chars[plain]; + nxt_var_sub_t subs[vars]; + u_char raw[length]; */ }; typedef struct { - nxt_var_t *var; - nxt_str_t *value; + uint32_t index; + uint32_t length; + uint32_t position; +} nxt_var_sub_t; + + +typedef struct { + nxt_var_t *var; + nxt_str_t *value; } nxt_var_value_t; @@ -43,13 +50,10 @@ struct nxt_var_query_s { }; -#define nxt_var_indexes(var) ((uint32_t *) (var)->data) +#define nxt_var_subs(var) ((nxt_var_sub_t *) (var)->data) -#define nxt_var_positions(var) \ - ((size_t *) ((var)->data + (var)->vars * sizeof(uint32_t))) - -#define nxt_var_plain_start(var) \ - ((var)->data + (var)->vars * (sizeof(uint32_t) + sizeof(size_t))) +#define nxt_var_raw_start(var) \ + ((var)->data + (var)->vars * sizeof(nxt_var_sub_t)) static nxt_int_t nxt_var_hash_test(nxt_lvlhsh_query_t *lhq, void *data); @@ -87,6 +91,21 @@ static uint32_t nxt_var_count; static nxt_var_handler_t *nxt_var_index; +void +nxt_var_raw(nxt_var_t *var, nxt_str_t *str) +{ + str->length = var->length; + str->start = nxt_var_raw_start(var); +} + + +nxt_bool_t +nxt_var_is_const(nxt_var_t *var) +{ + return (var->vars == 0); +} + + static nxt_int_t nxt_var_hash_test(nxt_lvlhsh_query_t *lhq, void *data) { @@ -211,18 +230,17 @@ nxt_var_index_init(void) nxt_var_t * -nxt_var_compile(nxt_str_t *str, nxt_mp_t *mp) +nxt_var_compile(nxt_str_t *str, nxt_mp_t *mp, nxt_bool_t strz) { - u_char *p, *end, *plain_pos; - size_t plain, size, *positions; - uint32_t *indexes; - nxt_var_t *var; - nxt_str_t part; - nxt_uint_t n; - nxt_bool_t is_var; - nxt_var_decl_t *decl; + u_char *p, *end, *next, *src; + size_t size; + nxt_var_t *var; + nxt_str_t part; + nxt_uint_t n; + nxt_bool_t is_var; + nxt_var_sub_t *subs; + nxt_var_decl_t *decl; - plain = 0; n = 0; p = str->start; @@ -230,59 +248,55 @@ nxt_var_compile(nxt_str_t *str, nxt_mp_t *mp) while (p < end) { p = nxt_var_next_part(p, end - p, &part, &is_var); - if (nxt_slow_path(p == NULL)) { return NULL; } if (is_var) { n++; - - } else { - plain += part.length; } } - size = sizeof(nxt_var_t) - + n * (sizeof(nxt_var_handler_t) + sizeof (size_t)) - + plain; + size = sizeof(nxt_var_t) + n * sizeof(nxt_var_sub_t) + str->length; - var = nxt_mp_get(mp, size); + var = nxt_mp_get(mp, size + strz); if (nxt_slow_path(var == NULL)) { return NULL; } - var->plain = plain; + var->length = str->length; var->vars = n; + var->strz = strz; - indexes = nxt_var_indexes(var); - positions = nxt_var_positions(var); - plain_pos = nxt_var_plain_start(var); + subs = nxt_var_subs(var); + src = nxt_var_raw_start(var); - plain = 0; - n = 0; + nxt_memcpy(src, str->start, str->length); + if (strz) { + src[str->length] = '\0'; + } + + n = 0; p = str->start; while (p < end) { - p = nxt_var_next_part(p, end - p, &part, &is_var); + next = nxt_var_next_part(p, end - p, &part, &is_var); if (is_var) { decl = nxt_var_hash_find(&part); - if (nxt_slow_path(decl == NULL)) { return NULL; } - indexes[n] = decl->index; - positions[n] = plain; + subs[n].index = decl->index; + subs[n].length = next - p; + subs[n].position = p - str->start; n++; - - } else { - plain_pos = nxt_cpymem(plain_pos, part.start, part.length); - plain += part.length; } + + p = next; } return var; @@ -438,16 +452,16 @@ void nxt_var_query(nxt_task_t *task, nxt_var_query_t *query, nxt_var_t *var, nxt_str_t *str) { - uint32_t *indexes; - nxt_mp_t *mp; - nxt_str_t *value; - nxt_int_t ret; - nxt_uint_t i; - nxt_var_value_t *val; - - if (var->vars == 0) { - str->length = var->plain; - str->start = nxt_var_plain_start(var); + uint32_t index; + nxt_mp_t *mp; + nxt_str_t *value; + nxt_int_t ret; + nxt_uint_t i; + nxt_var_sub_t *subs; + nxt_var_value_t *val; + + if (nxt_var_is_const(var)) { + nxt_var_raw(var, str); return; } @@ -456,7 +470,7 @@ nxt_var_query(nxt_task_t *task, nxt_var_query_t *query, nxt_var_t *var, } mp = query->values.mem_pool; - indexes = nxt_var_indexes(var); + subs = nxt_var_subs(var); value = query->spare; for (i = 0; i < var->vars; i++) { @@ -468,7 +482,9 @@ nxt_var_query(nxt_task_t *task, nxt_var_query_t *query, nxt_var_t *var, } } - ret = nxt_var_cache_add(&query->cache, indexes[i], value, mp); + index = subs[i].index; + + ret = nxt_var_cache_add(&query->cache, index, value, mp); if (ret != NXT_OK) { if (nxt_slow_path(ret == NXT_ERROR)) { @@ -478,7 +494,7 @@ nxt_var_query(nxt_task_t *task, nxt_var_query_t *query, nxt_var_t *var, continue; /* NXT_DECLINED */ } - ret = nxt_var_index[indexes[i]](task, query, value, query->ctx); + ret = nxt_var_index[index](task, query, value, query->ctx); value = NULL; @@ -538,13 +554,13 @@ nxt_var_query_handle(nxt_task_t *task, nxt_var_query_t *query, static void nxt_var_query_finish(nxt_task_t *task, nxt_var_query_t *query) { - u_char *p, *src; - size_t length, plain, next, *positions; - uint32_t *indexes; - nxt_str_t *str, **part; - nxt_var_t *var; - nxt_uint_t i, j; - nxt_var_value_t *val; + u_char *p, *src; + size_t length, last, next; + nxt_str_t *str, **part; + nxt_var_t *var; + nxt_uint_t i, j; + nxt_var_sub_t *subs; + nxt_var_value_t *val; if (query->failed) { goto done; @@ -555,11 +571,11 @@ nxt_var_query_finish(nxt_task_t *task, nxt_var_query_t *query) for (i = 0; i < query->values.nelts; i++) { var = val[i].var; - length = var->plain; - indexes = nxt_var_indexes(var); + subs = nxt_var_subs(var); + length = var->length; for (j = 0; j < var->vars; j++) { - str = nxt_var_cache_find(&query->cache, indexes[j]); + str = nxt_var_cache_find(&query->cache, subs[j].index); nxt_assert(str != NULL); @@ -572,10 +588,10 @@ nxt_var_query_finish(nxt_task_t *task, nxt_var_query_t *query) *part = str; - length += str->length; + length += str->length - subs[j].length; } - p = nxt_mp_nget(query->values.mem_pool, length); + p = nxt_mp_nget(query->values.mem_pool, length + var->strz); if (nxt_slow_path(p == NULL)) { query->failed = 1; goto done; @@ -585,27 +601,33 @@ nxt_var_query_finish(nxt_task_t *task, nxt_var_query_t *query) val[i].value->start = p; part = query->parts.elts; - positions = nxt_var_positions(var); - src = nxt_var_plain_start(var); + src = nxt_var_raw_start(var); - plain = 0; + last = 0; for (j = 0; j < var->vars; j++) { - next = positions[j]; + next = subs[j].position; - if (next != plain) { - p = nxt_cpymem(p, &src[plain], next - plain); - plain = next; + if (next != last) { + p = nxt_cpymem(p, &src[last], next - last); } p = nxt_cpymem(p, part[j]->start, part[j]->length); + + last = next + subs[j].length; + } + + if (last != var->length) { + p = nxt_cpymem(p, &src[last], var->length - last); } - if (plain != var->plain) { - nxt_memcpy(p, &src[plain], var->plain - plain); + if (var->strz) { + *p = '\0'; } nxt_array_reset(&query->parts); + + nxt_debug(task, "var: \"%*s\" -> \"%V\"", length, src, val[i].value); } done: diff --git a/src/nxt_var.h b/src/nxt_var.h index 7e0a2a21..3b7d0c28 100644 --- a/src/nxt_var.h +++ b/src/nxt_var.h @@ -30,9 +30,12 @@ nxt_is_var(nxt_str_t *str) } +void nxt_var_raw(nxt_var_t *var, nxt_str_t *str); +nxt_bool_t nxt_var_is_const(nxt_var_t *var); + nxt_int_t nxt_var_register(nxt_var_decl_t *decl, size_t n); nxt_int_t nxt_var_index_init(void); -nxt_var_t *nxt_var_compile(nxt_str_t *str, nxt_mp_t *mp); +nxt_var_t *nxt_var_compile(nxt_str_t *str, nxt_mp_t *mp, nxt_bool_t strz); nxt_int_t nxt_var_test(nxt_str_t *str, u_char *error); nxt_int_t nxt_var_query_init(nxt_var_query_t **query_p, void *ctx, diff --git a/src/perl/nxt_perl_psgi.c b/src/perl/nxt_perl_psgi.c index 5df1465d..02555c96 100644 --- a/src/perl/nxt_perl_psgi.c +++ b/src/perl/nxt_perl_psgi.c @@ -1184,13 +1184,12 @@ nxt_perl_psgi_start(nxt_task_t *task, nxt_process_data_t *data) goto fail; } - nxt_unit_default_init(task, &perl_init); + nxt_unit_default_init(task, &perl_init, common_conf); perl_init.callbacks.request_handler = nxt_perl_psgi_request_handler; perl_init.callbacks.ready_handler = nxt_perl_psgi_ready_handler; perl_init.data = c; perl_init.ctx_data = &pctx; - perl_init.shm_limit = common_conf->shm_limit; unit_ctx = nxt_unit_init(&perl_init); if (nxt_slow_path(unit_ctx == NULL)) { @@ -1292,11 +1291,6 @@ nxt_perl_psgi_ready_handler(nxt_unit_ctx_t *ctx) nxt_perl_app_conf_t *c; nxt_perl_psgi_ctx_t *pctx; - /* Worker thread context. */ - if (!nxt_unit_is_main_ctx(ctx)) { - return NXT_UNIT_OK; - } - c = ctx->unit->data; if (c->threads <= 1) { diff --git a/src/python/nxt_python.c b/src/python/nxt_python.c index abb04194..8687c869 100644 --- a/src/python/nxt_python.c +++ b/src/python/nxt_python.c @@ -230,10 +230,9 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data) } } - nxt_unit_default_init(task, &python_init); + nxt_unit_default_init(task, &python_init, data->app); python_init.data = c; - python_init.shm_limit = data->app->shm_limit; python_init.callbacks.ready_handler = nxt_python_ready_handler; proto = c->protocol; @@ -522,18 +521,6 @@ nxt_python_ready_handler(nxt_unit_ctx_t *ctx) nxt_py_thread_info_t *ti; nxt_python_app_conf_t *c; - if (nxt_py_proto.ready != NULL) { - res = nxt_py_proto.ready(ctx); - if (nxt_slow_path(res != NXT_UNIT_OK)) { - return NXT_UNIT_ERROR; - } - } - - /* Worker thread context. */ - if (!nxt_unit_is_main_ctx(ctx)) { - return NXT_UNIT_OK; - } - c = ctx->unit->data; if (c->threads <= 1) { diff --git a/src/python/nxt_python.h b/src/python/nxt_python.h index e4eac9dc..eddb1cfc 100644 --- a/src/python/nxt_python.h +++ b/src/python/nxt_python.h @@ -64,7 +64,6 @@ typedef struct { void (*ctx_data_free)(void *data); int (*startup)(void *data); int (*run)(nxt_unit_ctx_t *ctx); - int (*ready)(nxt_unit_ctx_t *ctx); void (*done)(void); } nxt_python_proto_t; diff --git a/src/python/nxt_python_asgi.c b/src/python/nxt_python_asgi.c index 26003805..354e3a81 100644 --- a/src/python/nxt_python_asgi.c +++ b/src/python/nxt_python_asgi.c @@ -33,10 +33,10 @@ static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len, static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f); static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f); -static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx); - static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); -static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port); +static int nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); +static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port); static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx); static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx); @@ -44,7 +44,6 @@ static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args); static void nxt_python_asgi_done(void); static PyObject *nxt_py_port_read; -static nxt_unit_port_t *nxt_py_shared_port; static PyMethodDef nxt_py_port_read_method = {"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""}; @@ -54,7 +53,6 @@ static nxt_python_proto_t nxt_py_asgi_proto = { .ctx_data_free = nxt_python_asgi_ctx_data_free, .startup = nxt_python_asgi_startup, .run = nxt_python_asgi_run, - .ready = nxt_python_asgi_ready, .done = nxt_python_asgi_done, }; @@ -361,14 +359,6 @@ nxt_python_asgi_run(nxt_unit_ctx_t *ctx) Py_DECREF(res); - nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port); - nxt_py_asgi_remove_reader(ctx, ctx_data->port); - - if (ctx_data->port != NULL) { - ctx_data->port->data = NULL; - ctx_data->port = NULL; - } - nxt_py_asgi_lifespan_shutdown(ctx); return NXT_UNIT_OK; @@ -892,82 +882,9 @@ fail: static int -nxt_python_asgi_ready(nxt_unit_ctx_t *ctx) -{ - int rc; - PyObject *res, *fd, *py_ctx, *py_port; - nxt_unit_port_t *port; - nxt_py_asgi_ctx_data_t *ctx_data; - - if (nxt_slow_path(nxt_py_shared_port == NULL)) { - return NXT_UNIT_ERROR; - } - - port = nxt_py_shared_port; - - nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port); - - ctx_data = ctx->data; - - rc = NXT_UNIT_ERROR; - - fd = PyLong_FromLong(port->in_fd); - if (nxt_slow_path(fd == NULL)) { - nxt_unit_alert(ctx, "Python failed to create fd"); - nxt_python_print_exception(); - - return rc; - } - - py_ctx = PyLong_FromVoidPtr(ctx); - if (nxt_slow_path(py_ctx == NULL)) { - nxt_unit_alert(ctx, "Python failed to create py_ctx"); - nxt_python_print_exception(); - - goto clean_fd; - } - - py_port = PyLong_FromVoidPtr(port); - if (nxt_slow_path(py_port == NULL)) { - nxt_unit_alert(ctx, "Python failed to create py_port"); - nxt_python_print_exception(); - - goto clean_py_ctx; - } - - res = PyObject_CallFunctionObjArgs(ctx_data->loop_add_reader, - fd, nxt_py_port_read, - py_ctx, py_port, NULL); - if (nxt_slow_path(res == NULL)) { - nxt_unit_alert(ctx, "Python failed to add_reader"); - nxt_python_print_exception(); - - } else { - Py_DECREF(res); - - rc = NXT_UNIT_OK; - } - - Py_DECREF(py_port); - -clean_py_ctx: - - Py_DECREF(py_ctx); - -clean_fd: - - Py_DECREF(fd); - - return rc; -} - - -static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - int nb, rc; - PyObject *res, *fd, *py_ctx, *py_port; - nxt_py_asgi_ctx_data_t *ctx_data; + int nb; if (port->in_fd == -1) { return NXT_UNIT_OK; @@ -984,27 +901,31 @@ nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port); - if (port->id.id == NXT_UNIT_SHARED_PORT_ID) { - nxt_py_shared_port = port; + return nxt_py_asgi_add_reader(ctx, port); +} - return NXT_UNIT_OK; - } - ctx_data = ctx->data; +static int +nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int rc; + PyObject *res, *fd, *py_ctx, *py_port; + nxt_py_asgi_ctx_data_t *ctx_data; - ctx_data->port = port; - port->data = ctx_data; + nxt_unit_debug(ctx, "asgi_add_reader %d %p %p", port->in_fd, ctx, port); - rc = NXT_UNIT_ERROR; + ctx_data = ctx->data; fd = PyLong_FromLong(port->in_fd); if (nxt_slow_path(fd == NULL)) { nxt_unit_alert(ctx, "Python failed to create fd"); nxt_python_print_exception(); - return rc; + return NXT_UNIT_ERROR; } + rc = NXT_UNIT_ERROR; + py_ctx = PyLong_FromVoidPtr(ctx); if (nxt_slow_path(py_ctx == NULL)) { nxt_unit_alert(ctx, "Python failed to create py_ctx"); @@ -1049,17 +970,16 @@ clean_fd: static void -nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port) +nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port) { - if (port->in_fd == -1) { + if (port->in_fd == -1 || ctx == NULL) { return; } nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port); - if (nxt_py_shared_port == port) { - nxt_py_shared_port = NULL; - } + nxt_py_asgi_remove_reader(ctx, port); } @@ -1073,27 +993,6 @@ nxt_py_asgi_quit(nxt_unit_ctx_t *ctx) ctx_data = ctx->data; - if (nxt_py_shared_port != NULL) { - p = PyLong_FromLong(nxt_py_shared_port->in_fd); - if (nxt_slow_path(p == NULL)) { - nxt_unit_alert(NULL, "Python failed to create Long"); - nxt_python_print_exception(); - - } else { - res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader, - p, NULL); - if (nxt_slow_path(res == NULL)) { - nxt_unit_alert(NULL, "Python failed to remove_reader"); - nxt_python_print_exception(); - - } else { - Py_DECREF(res); - } - - Py_DECREF(p); - } - } - p = PyLong_FromLong(0); if (nxt_slow_path(p == NULL)) { nxt_unit_alert(NULL, "Python failed to create Long"); diff --git a/src/python/nxt_python_asgi.h b/src/python/nxt_python_asgi.h index 20702065..94478a36 100644 --- a/src/python/nxt_python_asgi.h +++ b/src/python/nxt_python_asgi.h @@ -34,7 +34,6 @@ typedef struct { PyObject *quit_future; PyObject *quit_future_set_result; PyObject **target_lifespans; - nxt_unit_port_t *port; } nxt_py_asgi_ctx_data_t; PyObject *nxt_py_asgi_enum_headers(PyObject *headers, diff --git a/src/python/nxt_python_asgi_http.c b/src/python/nxt_python_asgi_http.c index c4a77d53..05c0da4f 100644 --- a/src/python/nxt_python_asgi_http.c +++ b/src/python/nxt_python_asgi_http.c @@ -636,9 +636,11 @@ nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req) nxt_unit_req_debug(req, "asgi_http_close_handler"); - http->closed = 1; + if (nxt_fast_path(http != NULL)) { + http->closed = 1; - nxt_py_asgi_http_emit_disconnect(http); + nxt_py_asgi_http_emit_disconnect(http); + } } diff --git a/src/python/nxt_python_asgi_websocket.c b/src/python/nxt_python_asgi_websocket.c index fc7d9fa4..ab1d0324 100644 --- a/src/python/nxt_python_asgi_websocket.c +++ b/src/python/nxt_python_asgi_websocket.c @@ -980,6 +980,10 @@ nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req) nxt_unit_req_debug(req, "asgi_websocket_close_handler"); + if (nxt_slow_path(ws == NULL)) { + return; + } + if (ws->receive_future == NULL) { ws->state = NXT_WS_DISCONNECTED; diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c index 522869b5..62498127 100644 --- a/src/ruby/nxt_ruby.c +++ b/src/ruby/nxt_ruby.c @@ -357,11 +357,10 @@ nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data) goto fail; } - nxt_unit_default_init(task, &ruby_unit_init); + nxt_unit_default_init(task, &ruby_unit_init, conf); ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler; ruby_unit_init.callbacks.ready_handler = nxt_ruby_ready_handler; - ruby_unit_init.shm_limit = conf->shm_limit; ruby_unit_init.data = c; ruby_unit_init.ctx_data = &ruby_ctx; @@ -1258,11 +1257,6 @@ nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx) nxt_ruby_ctx_t *rctx; nxt_ruby_app_conf_t *c; - /* Worker thread context. */ - if (!nxt_unit_is_main_ctx(ctx)) { - return NXT_UNIT_OK; - } - c = ctx->unit->data; if (c->threads <= 1) { diff --git a/src/test/nxt_base64_test.c b/src/test/nxt_base64_test.c new file mode 100644 index 00000000..13a772b6 --- /dev/null +++ b/src/test/nxt_base64_test.c @@ -0,0 +1,98 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include "nxt_tests.h" + + +nxt_int_t +nxt_base64_test(nxt_thread_t *thr) +{ + ssize_t ret; + nxt_uint_t i; + + static struct { + nxt_str_t enc; + nxt_str_t dec; + + } tests[] = { + { nxt_string("ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz" + "0123456789+//+9876543210" + "zyxwvutsrqponmlkjihgfedcba" + "ZYXWVUTSRQPONMLKJIHGFEDCBA"), + nxt_string("\x00\x10\x83\x10\x51\x87\x20\x92\x8b\x30\xd3\x8f" + "\x41\x14\x93\x51\x55\x97\x61\x96\x9b\x71\xd7\x9f" + "\x82\x18\xa3\x92\x59\xa7\xa2\x9a\xab\xb2\xdb\xaf" + "\xc3\x1c\xb3\xd3\x5d\xb7\xe3\x9e\xbb\xf3\xdf\xbf" + "\xff\xef\x7c\xef\xae\x78\xdf\x6d\x74\xcf\x2c\x70" + "\xbe\xeb\x6c\xae\xaa\x68\x9e\x69\x64\x8e\x28\x60" + "\x7d\xe7\x5c\x6d\xa6\x58\x5d\x65\x54\x4d\x24\x50" + "\x3c\xe3\x4c\x2c\xa2\x48\x1c\x61\x44\x0c\x20\x40") }, + + { nxt_string("Aa=="), + nxt_string("\x01") }, + { nxt_string("0Z"), + nxt_string("\xd1") }, + { nxt_string("0aA="), + nxt_string("\xd1\xa0") }, + { nxt_string("z/+"), + nxt_string("\xcf\xff") }, + { nxt_string("z9+Npe=="), + nxt_string("\xcf\xdf\x8d\xa5") }, + { nxt_string("/+98765"), + nxt_string("\xff\xef\x7c\xef\xae") }, + + { nxt_string("aBc_"), + nxt_null_string }, + { nxt_string("5"), + nxt_null_string }, + { nxt_string("M==="), + nxt_null_string }, + { nxt_string("===="), + nxt_null_string }, + { nxt_string("Ab="), + nxt_null_string }, + { nxt_string("00=0"), + nxt_null_string }, + { nxt_string("\0"), + nxt_null_string }, + { nxt_string("\r\naaaa"), + nxt_null_string }, + { nxt_string("=0000"), + nxt_null_string }, + }; + + u_char buf[96]; + + nxt_thread_time_update(thr); + + for (i = 0; i < nxt_nitems(tests); i++) { + ret = nxt_base64_decode(NULL, tests[i].enc.start, tests[i].enc.length); + + if (ret == NXT_ERROR && tests[i].dec.start == NULL) { + continue; + } + + if ((size_t) ret != tests[i].dec.length) { + nxt_log_alert(thr->log, + "nxt_base64_decode() test \"%V\" failed: incorrect " + "length of decoded string %z, expected %uz", + &tests[i].enc, ret, tests[i].dec.length); + return NXT_ERROR; + } + + ret = nxt_base64_decode(buf, tests[i].enc.start, tests[i].enc.length); + + if (!nxt_str_eq(&tests[i].dec, buf, (size_t) ret)) { + nxt_log_alert(thr->log, "nxt_base64_decode() test \"%V\" failed"); + return NXT_ERROR; + } + } + + nxt_log_error(NXT_LOG_NOTICE, thr->log, "nxt_base64_decode() test passed"); + + return NXT_OK; +} diff --git a/src/test/nxt_tests.c b/src/test/nxt_tests.c index 901d76c3..f5a1cbd4 100644 --- a/src/test/nxt_tests.c +++ b/src/test/nxt_tests.c @@ -162,6 +162,10 @@ main(int argc, char **argv) return 1; } + if (nxt_base64_test(thr) != NXT_OK) { + return 1; + } + #if (NXT_HAVE_CLONE_NEWUSER) if (nxt_clone_creds_test(thr) != NXT_OK) { return 1; diff --git a/src/test/nxt_tests.h b/src/test/nxt_tests.h index d531cc7d..463dc851 100644 --- a/src/test/nxt_tests.h +++ b/src/test/nxt_tests.h @@ -64,6 +64,7 @@ nxt_int_t nxt_malloc_test(nxt_thread_t *thr); nxt_int_t nxt_utf8_test(nxt_thread_t *thr); nxt_int_t nxt_http_parse_test(nxt_thread_t *thr); nxt_int_t nxt_strverscmp_test(nxt_thread_t *thr); +nxt_int_t nxt_base64_test(nxt_thread_t *thr); nxt_int_t nxt_clone_creds_test(nxt_thread_t *thr); diff --git a/src/test/nxt_unit_app_test.c b/src/test/nxt_unit_app_test.c index a5f3728c..8fda9740 100644 --- a/src/test/nxt_unit_app_test.c +++ b/src/test/nxt_unit_app_test.c @@ -97,7 +97,7 @@ ready_handler(nxt_unit_ctx_t *ctx) nxt_unit_debug(ctx, "ready"); - if (!nxt_unit_is_main_ctx(ctx) || thread_count <= 1) { + if (thread_count <= 1) { return NXT_UNIT_OK; } |