From 4eecf1cb6ad520458e595313dc65e3e75405a252 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 26 Nov 2019 17:14:53 +0300 Subject: Refactoring reference counting of req_app_link. The reason for the change is that the req_app_link reference count was incorrect if the application crashed at start; in this case, the nxt_request_app_link_update_peer() function was never called. This closes #332 issue on GitHub. --- src/nxt_router.c | 82 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 34 deletions(-) (limited to 'src') diff --git a/src/nxt_router.c b/src/nxt_router.c index b9f5d921..38396e86 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -95,16 +95,16 @@ nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) } nxt_inline void -nxt_request_app_link_dec_use(nxt_request_app_link_t *req_app_link) +nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i) { #if (NXT_DEBUG) int c; - c = nxt_atomic_fetch_add(&req_app_link->use_count, -1); + c = nxt_atomic_fetch_add(&req_app_link->use_count, i); - nxt_assert(c > 1); + nxt_assert((c + i) > 0); #else - (void) nxt_atomic_fetch_add(&req_app_link->use_count, -1); + (void) nxt_atomic_fetch_add(&req_app_link->use_count, i); #endif } @@ -600,8 +600,6 @@ nxt_request_app_link_update_peer(nxt_task_t *task, nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, req_app_link->app_port->pid); } - - nxt_request_app_link_use(task, req_app_link, -1); } @@ -3732,8 +3730,6 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); if (cancelled) { - nxt_request_app_link_inc_use(req_app_link); - res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); if (res == NXT_OK) { @@ -3751,6 +3747,8 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_router_app_prepare_request(task, req_app_link); } + nxt_request_app_link_use(task, req_app_link, -1); + msg->port_msg.last = 0; return; @@ -4015,6 +4013,8 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) #endif nxt_router_app_prepare_request(task, req_app_link); + + nxt_request_app_link_use(task, req_app_link, -1); } @@ -4148,8 +4148,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, re_ra->stream); if (cancelled) { - nxt_request_app_link_inc_use(re_ra); - state.req_app_link = re_ra; state.app = app; @@ -4217,19 +4215,38 @@ re_ra_cancelled: if (re_ra != NULL) { if (nxt_router_port_post_select(task, &state) == NXT_OK) { + /* + * There should be call nxt_request_app_link_inc_use(re_ra), + * but we need to decrement use then. So, let's skip both. + */ + nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, &task->thread->engine->task, app, re_ra); + + } else { + /* + * This call should be unconditional, but we want to spare + * couple of CPU ticks to postpone the head death of the universe. + */ + + nxt_request_app_link_use(task, re_ra, -1); } } if (req_app_link != NULL) { - nxt_request_app_link_use(task, req_app_link, -1); + /* + * Here we do the same trick as described above, + * but without conditions. + * Skip required nxt_request_app_link_inc_use(req_app_link). + */ nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, &task->thread->engine->task, app, req_app_link); + /* ... skip nxt_request_app_link_use(task, req_app_link, -1) too. */ + goto adjust_use; } @@ -4477,6 +4494,7 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) static void nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) { + int ra_use_delta; nxt_app_t *app; nxt_bool_t can_start_process; nxt_request_app_link_t *req_app_link; @@ -4485,11 +4503,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) app = state->app; state->failed_port_use_delta = 0; - - if (nxt_queue_chk_remove(&req_app_link->link_app_requests)) - { - nxt_request_app_link_dec_use(req_app_link); - } + ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests); if (nxt_queue_chk_remove(&req_app_link->link_port_pending)) { @@ -4498,7 +4512,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) nxt_queue_remove(&req_app_link->link_app_pending); req_app_link->link_app_pending.next = NULL; - nxt_request_app_link_dec_use(req_app_link); + ra_use_delta--; } state->failed_port = req_app_link->app_port; @@ -4538,7 +4552,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) &req_app_link->link_app_requests); } - nxt_request_app_link_inc_use(req_app_link); + ra_use_delta++; nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests", req_app_link->stream); @@ -4569,6 +4583,8 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) } } + nxt_request_app_link_chk_use(req_app_link, ra_use_delta); + fail: state->shared_ra = req_app_link; @@ -4596,7 +4612,6 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) nxt_request_app_link_error(state->req_app_link, 500, "Failed to allocate shared req<->app link"); - nxt_request_app_link_use(task, state->req_app_link, -1); return NXT_ERROR; } @@ -4625,7 +4640,6 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) if (nxt_slow_path(res != NXT_OK)) { nxt_request_app_link_error(req_app_link, 500, "Failed to start app process"); - nxt_request_app_link_use(task, req_app_link, -1); return NXT_ERROR; } @@ -4686,19 +4700,19 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, nxt_request_app_link_init(task, req_app_link, req_rpc_data); res = nxt_router_app_port(task, app, req_app_link); + req_app_link = req_rpc_data->req_app_link; - if (res != NXT_OK) { - return; - } + if (res == NXT_OK) { + port = req_app_link->app_port; - req_app_link = req_rpc_data->req_app_link; - port = req_app_link->app_port; + nxt_assert(port != NULL); - nxt_assert(port != NULL); + nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); - nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); + nxt_router_app_prepare_request(task, req_app_link); + } - nxt_router_app_prepare_request(task, req_app_link); + nxt_request_app_link_use(task, req_app_link, -1); } @@ -5172,8 +5186,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) pending_ra->stream); if (cancelled) { - nxt_request_app_link_inc_use(pending_ra); - state.req_app_link = pending_ra; state.app = app; @@ -5186,10 +5198,12 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) nxt_thread_mutex_unlock(&app->mutex); - if (pending_ra != NULL - && nxt_router_port_post_select(task, &state) == NXT_OK) - { - nxt_router_app_prepare_request(task, pending_ra); + if (pending_ra != NULL) { + if (nxt_router_port_post_select(task, &state) == NXT_OK) { + nxt_router_app_prepare_request(task, pending_ra); + } + + nxt_request_app_link_use(task, pending_ra, -1); } nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid); -- cgit From 2f23923e44d4528a547d2a29212ac93c3f0e25de Mon Sep 17 00:00:00 2001 From: Tiago Natel Date: Tue, 26 Nov 2019 16:15:23 +0000 Subject: Changed the group listing to run unprivileged when possible. Now the nxt_user_groups_get() function uses getgrouplist(3) when available (except MacOS, see below). For some platforms, getgrouplist() supports a method of probing how much groups the user has but the behavior is not consistent. The method used here consists of optimistically trying to get up to min(256, NGROUPS_MAX) groups; only if ngroups returned exceeds the original value, we do a second call. This method can block main's process if LDAP/NDIS+ is in use. MacOS has getgrouplist(3) but it's buggy. It doesn't update ngroups if the value passed is smaller than the number of groups the user has. Some projects (like Go stdlib) call getgrouplist() in a loop, increasing ngroups until it exceeds the number of groups user belongs to or fail when a limit is reached. For performance reasons, this is to be avoided and MacOS is handled in the fallback implementation. The fallback implementation is the old Unit approach. It saves main's user groups (getgroups(2)) and then calls initgroups(3) to load application's groups in main, then does a second getgroups(2) to store the gids and restore main's groups in the end. Because of initgroups(3)' call to setgroups(2), this method requires root capabilities. In the case of OSX, which has small NGROUPS_MAX by default (16), it's not possible to restore main's groups if it's large; if so, this method fallbacks again: user_cred gids aren't stored, and the worker process calls initgroups() itself and may block for some time if LDAP/NDIS+ is in use. --- src/nxt_process.c | 134 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 104 insertions(+), 30 deletions(-) (limited to 'src') diff --git a/src/nxt_process.c b/src/nxt_process.c index b246a58c..64356d64 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -571,19 +571,109 @@ nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, const char *group) uc->base_gid = grp->gr_gid; } - return nxt_user_groups_get(task, uc); + nxt_debug(task, "about to get \"%s\" groups (uid:%d, base gid:%d)", + uc->user, uc->uid, uc->base_gid); + + if (nxt_user_groups_get(task, uc) != NXT_OK) { + return NXT_ERROR; + } + +#if (NXT_DEBUG) + { + u_char *p, *end; + nxt_uint_t i; + u_char msg[NXT_MAX_ERROR_STR]; + + p = msg; + end = msg + NXT_MAX_ERROR_STR; + + for (i = 0; i < uc->ngroups; i++) { + p = nxt_sprintf(p, end, "%d%c", uc->gids[i], + i+1 < uc->ngroups ? ',' : '\0'); + } + + nxt_debug(task, "user \"%s\" has gids:%*s", uc->user, p - msg, msg); + } +#endif + + return NXT_OK; } +#if (NXT_HAVE_GETGROUPLIST && !NXT_MACOSX) + +#define NXT_NGROUPS nxt_min(256, NGROUPS_MAX) + + +static nxt_int_t +nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) +{ + int ngroups; + gid_t groups[NXT_NGROUPS]; + + ngroups = NXT_NGROUPS; + + if (getgrouplist(uc->user, uc->base_gid, groups, &ngroups) < 0) { + if (nxt_slow_path(ngroups <= NXT_NGROUPS)) { + nxt_alert(task, "getgrouplist(\"%s\", %d, ...) failed %E", uc->user, + uc->base_gid, nxt_errno); + + return NXT_ERROR; + } + } + + if (ngroups > NXT_NGROUPS) { + if (ngroups > NGROUPS_MAX) { + ngroups = NGROUPS_MAX; + } + + uc->ngroups = ngroups; + + uc->gids = nxt_malloc(ngroups * sizeof(gid_t)); + if (nxt_slow_path(uc->gids == NULL)) { + return NXT_ERROR; + } + + if (nxt_slow_path(getgrouplist(uc->user, uc->base_gid, uc->gids, + &ngroups) < 0)) { + + nxt_alert(task, "getgrouplist(\"%s\", %d) failed %E", uc->user, + uc->base_gid, nxt_errno); + + nxt_free(uc->gids); + + return NXT_ERROR; + } + + return NXT_OK; + } + + uc->ngroups = ngroups; + + uc->gids = nxt_malloc(ngroups * sizeof(gid_t)); + if (nxt_slow_path(uc->gids == NULL)) { + return NXT_ERROR; + } + + nxt_memcpy(uc->gids, groups, ngroups * sizeof(gid_t)); + + return NXT_OK; +} + + +#else + /* + * For operating systems that lack getgrouplist(3) or it's buggy (MacOS), * nxt_user_groups_get() stores an array of groups IDs which should be - * set by the initgroups() function for a given user. The initgroups() + * set by the setgroups() function for a given user. The initgroups() * may block a just forked worker process for some time if LDAP or NDIS+ * is used, so nxt_user_groups_get() allows to get worker user groups in * main process. In a nutshell the initgroups() calls getgrouplist() - * followed by setgroups(). However Solaris lacks the getgrouplist(). + * followed by setgroups(). However older Solaris lacks the getgrouplist(). * Besides getgrouplist() does not allow to query the exact number of - * groups while NGROUPS_MAX can be quite large (e.g. 65536 on Linux). + * groups in some platforms, while NGROUPS_MAX can be quite large (e.g. + * 65536 on Linux). * So nxt_user_groups_get() emulates getgrouplist(): at first the function * saves the super-user groups IDs, then calls initgroups() and saves the * specified user groups IDs, and then restores the super-user groups IDs. @@ -610,7 +700,7 @@ nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) nsaved = getgroups(0, NULL); - if (nsaved == -1) { + if (nxt_slow_path(nsaved == -1)) { nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno); return NXT_ERROR; } @@ -628,7 +718,7 @@ nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) saved = nxt_malloc(nsaved * sizeof(nxt_gid_t)); - if (saved == NULL) { + if (nxt_slow_path(saved == NULL)) { return NXT_ERROR; } @@ -636,7 +726,7 @@ nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) nsaved = getgroups(nsaved, saved); - if (nsaved == -1) { + if (nxt_slow_path(nsaved == -1)) { nxt_alert(task, "getgroups(%d) failed %E", nsaved, nxt_errno); goto free; } @@ -662,7 +752,7 @@ nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) ngroups = getgroups(0, NULL); - if (ngroups == -1) { + if (nxt_slow_path(ngroups == -1)) { nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno); goto restore; } @@ -671,43 +761,24 @@ nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) uc->gids = nxt_malloc(ngroups * sizeof(nxt_gid_t)); - if (uc->gids == NULL) { + if (nxt_slow_path(uc->gids == NULL)) { goto restore; } ngroups = getgroups(ngroups, uc->gids); - if (ngroups == -1) { + if (nxt_slow_path(ngroups == -1)) { nxt_alert(task, "getgroups(%d) failed %E", ngroups, nxt_errno); goto restore; } uc->ngroups = ngroups; -#if (NXT_DEBUG) - { - u_char *p, *end; - nxt_uint_t i; - u_char msg[NXT_MAX_ERROR_STR]; - - p = msg; - end = msg + NXT_MAX_ERROR_STR; - - for (i = 0; i < uc->ngroups; i++) { - p = nxt_sprintf(p, end, "%uL:", (uint64_t) uc->gids[i]); - } - - nxt_debug(task, "user \"%s\" cred: uid:%uL base gid:%uL, gids:%*s", - uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid, - p - msg, msg); - } -#endif - ret = NXT_OK; restore: - if (setgroups(nsaved, saved) != 0) { + if (nxt_slow_path(setgroups(nsaved, saved) != 0)) { nxt_alert(task, "setgroups(%d) failed %E", nsaved, nxt_errno); ret = NXT_ERROR; } @@ -720,6 +791,9 @@ free: } +#endif + + nxt_int_t nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) { -- cgit From 417cc7be7c0c6f6e62f0916f671bbf0a4460226b Mon Sep 17 00:00:00 2001 From: Tiago Natel Date: Tue, 26 Nov 2019 16:26:24 +0000 Subject: Refactor of process init. Introduces the functions nxt_process_init_create() and nxt_process_init_creds_set(). --- src/nxt_main_process.c | 313 ++++++++++++++++++++++++++++++++--------------- src/nxt_main_process.h | 5 +- src/nxt_port.c | 2 +- src/nxt_port.h | 2 +- src/nxt_process.c | 24 ++-- src/nxt_process.h | 26 ++-- src/nxt_runtime.c | 7 +- src/nxt_worker_process.c | 19 --- 8 files changed, 249 insertions(+), 149 deletions(-) (limited to 'src') diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index cfe0341f..05027443 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -29,6 +29,10 @@ typedef struct { } nxt_conf_app_map_t; +extern nxt_port_handlers_t nxt_controller_process_port_handlers; +extern nxt_port_handlers_t nxt_router_process_port_handlers; + + static nxt_int_t nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_main_process_title(nxt_task_t *task); @@ -36,6 +40,8 @@ static nxt_int_t nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt); static nxt_int_t nxt_main_create_controller_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_process_init_t *init); +static nxt_int_t nxt_main_create_router_process(nxt_task_t *task, nxt_runtime_t *rt, + nxt_process_init_t *init); static nxt_int_t nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt); static nxt_int_t nxt_main_start_discovery_process(nxt_task_t *task, @@ -67,6 +73,12 @@ static void nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static nxt_process_init_t *nxt_process_init_create(nxt_task_t *task, + nxt_process_type_t type, const nxt_str_t *name); +static nxt_int_t nxt_process_init_name_set(nxt_process_init_t *init, + nxt_process_type_t type, const nxt_str_t *name); +static nxt_int_t nxt_process_init_creds_set(nxt_task_t *task, + nxt_process_init_t *init, nxt_str_t *user, nxt_str_t *group); static nxt_int_t nxt_init_set_isolation(nxt_task_t *task, nxt_process_init_t *init, nxt_conf_value_t *isolation); @@ -84,6 +96,54 @@ const nxt_sig_event_t nxt_main_process_signals[] = { }; +static const nxt_port_handlers_t nxt_app_process_port_handlers = { + .new_port = nxt_port_new_port_handler, + .change_file = nxt_port_change_log_file_handler, + .mmap = nxt_port_mmap_handler, + .remove_pid = nxt_port_remove_pid_handler, +}; + + +static const nxt_port_handlers_t nxt_discovery_process_port_handlers = { + .quit = nxt_worker_process_quit_handler, + .new_port = nxt_port_new_port_handler, + .change_file = nxt_port_change_log_file_handler, + .mmap = nxt_port_mmap_handler, + .data = nxt_port_data_handler, + .remove_pid = nxt_port_remove_pid_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, +}; + + +static const nxt_port_handlers_t *nxt_process_port_handlers[NXT_PROCESS_MAX] = +{ + NULL, + &nxt_discovery_process_port_handlers, + &nxt_controller_process_port_handlers, + &nxt_router_process_port_handlers, + &nxt_app_process_port_handlers +}; + + +static const nxt_process_start_t nxt_process_starts[NXT_PROCESS_MAX] = { + NULL, + nxt_discovery_start, + nxt_controller_start, + nxt_router_start, + nxt_app_start +}; + + +static const nxt_process_restart_t nxt_process_restarts[NXT_PROCESS_MAX] = { + NULL, + NULL, + &nxt_main_create_controller_process, + &nxt_main_create_router_process, + NULL +}; + + static nxt_bool_t nxt_exiting; @@ -453,20 +513,13 @@ nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t)); + static const nxt_str_t name = nxt_string("controller"); + + init = nxt_process_init_create(task, NXT_PROCESS_CONTROLLER, &name); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - init->start = nxt_controller_start; - init->name = "controller"; - init->user_cred = &rt->user_cred; - init->port_handlers = &nxt_controller_process_port_handlers; - init->signals = nxt_worker_process_signals; - init->type = NXT_PROCESS_CONTROLLER; - init->stream = 0; - init->restart = &nxt_main_create_controller_process; - return nxt_main_create_controller_process(task, rt, init);; } @@ -547,21 +600,13 @@ nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t)); + static const nxt_str_t name = nxt_string("discovery"); + + init = nxt_process_init_create(task, NXT_PROCESS_DISCOVERY, &name); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - init->start = nxt_discovery_start; - init->name = "discovery"; - init->user_cred = &rt->user_cred; - init->port_handlers = &nxt_discovery_process_port_handlers; - init->signals = nxt_worker_process_signals; - init->type = NXT_PROCESS_DISCOVERY; - init->data = rt; - init->stream = 0; - init->restart = NULL; - return nxt_main_create_worker_process(task, rt, init); } @@ -571,72 +616,43 @@ nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt) { nxt_process_init_t *init; - init = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_init_t)); + static const nxt_str_t name = nxt_string("router"); + + init = nxt_process_init_create(task, NXT_PROCESS_ROUTER, &name); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } - init->start = nxt_router_start; - init->name = "router"; - init->user_cred = &rt->user_cred; - init->port_handlers = &nxt_router_process_port_handlers; - init->signals = nxt_worker_process_signals; - init->type = NXT_PROCESS_ROUTER; - init->data = rt; - init->stream = 0; - init->restart = &nxt_main_create_worker_process; + return nxt_main_create_router_process(task, rt, init); +} + + +static nxt_int_t +nxt_main_create_router_process(nxt_task_t *task, nxt_runtime_t *rt, + nxt_process_init_t *init) +{ + nxt_main_stop_worker_processes(task, rt); return nxt_main_create_worker_process(task, rt, init); } + static nxt_int_t nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream) { - char *user, *group; - u_char *title, *last, *end; - size_t size; nxt_int_t ret; nxt_process_init_t *init; - size = sizeof(nxt_process_init_t) - + app_conf->name.length - + sizeof("\"\" application"); - - if (rt->capabilities.setid) { - size += sizeof(nxt_user_cred_t) - + app_conf->user.length + 1 - + app_conf->group.length + 1; - } - - init = nxt_mp_zalloc(rt->mem_pool, size); + init = nxt_process_init_create(task, NXT_PROCESS_WORKER, &app_conf->name); if (nxt_slow_path(init == NULL)) { return NXT_ERROR; } if (rt->capabilities.setid) { - init->user_cred = nxt_pointer_to(init, sizeof(nxt_process_init_t)); - user = nxt_pointer_to(init->user_cred, sizeof(nxt_user_cred_t)); - - nxt_memcpy(user, app_conf->user.start, app_conf->user.length); - last = nxt_pointer_to(user, app_conf->user.length); - *last++ = '\0'; - - init->user_cred->user = user; - - if (app_conf->group.start != NULL) { - group = (char *) last; - - nxt_memcpy(group, app_conf->group.start, app_conf->group.length); - last = nxt_pointer_to(group, app_conf->group.length); - *last++ = '\0'; - - } else { - group = NULL; - } - - ret = nxt_user_cred_get(task, init->user_cred, group); - if (ret != NXT_OK) { + ret = nxt_process_init_creds_set(task, init, &app_conf->user, + &app_conf->group); + if (nxt_slow_path(ret != NXT_OK)) { goto fail; } @@ -658,23 +674,10 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, &app_conf->name); goto fail; } - - last = nxt_pointer_to(init, sizeof(nxt_process_init_t)); } - title = last; - end = title + app_conf->name.length + sizeof("\"\" application"); - - nxt_sprintf(title, end, "\"%V\" application%Z", &app_conf->name); - - init->start = nxt_app_start; - init->name = (char *) title; - init->port_handlers = &nxt_app_process_port_handlers; - init->signals = nxt_worker_process_signals; - init->type = NXT_PROCESS_WORKER; init->data = app_conf; init->stream = stream; - init->restart = NULL; ret = nxt_init_set_isolation(task, init, app_conf->isolation); if (nxt_slow_path(ret != NXT_OK)) { @@ -685,13 +688,13 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, fail: - nxt_mp_free(rt->mem_pool, init); + nxt_mp_destroy(init->mem_pool); return NXT_ERROR; } -static nxt_int_t +nxt_int_t nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_process_init_t *init) { @@ -706,7 +709,7 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, process = nxt_runtime_process_new(rt); if (nxt_slow_path(process == NULL)) { - nxt_mp_free(rt->mem_pool, init); + nxt_mp_destroy(init->mem_pool); return NXT_ERROR; } @@ -972,12 +975,13 @@ nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data) static void nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) { - nxt_buf_t *buf; - nxt_port_t *port; - nxt_runtime_t *rt; - nxt_process_t *process; - nxt_process_type_t ptype; - nxt_process_init_t *init; + nxt_buf_t *buf; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_process_t *process; + nxt_process_type_t ptype; + nxt_process_init_t *init; + nxt_process_restart_t restart; rt = task->thread->runtime; @@ -988,6 +992,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) process->init = NULL; ptype = nxt_process_type(process); + restart = nxt_process_restarts[ptype]; if (process->ready) { init->stream = 0; @@ -996,6 +1001,8 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) nxt_process_close_ports(task, process); if (nxt_exiting) { + nxt_mp_destroy(init->mem_pool); + if (rt->nprocesses <= 2) { nxt_runtime_quit(task, 0); } @@ -1030,15 +1037,11 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) -1, init->stream, 0, buf); } nxt_runtime_process_loop; - if (init->restart != NULL) { - if (init->type == NXT_PROCESS_ROUTER) { - nxt_main_stop_worker_processes(task, rt); - } - - init->restart(task, rt, init); + if (restart != NULL) { + restart(task, rt, init); } else { - nxt_mp_free(rt->mem_pool, init); + nxt_mp_destroy(init->mem_pool); } } } @@ -1588,3 +1591,121 @@ nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, return NXT_OK; } + + +static nxt_process_init_t * +nxt_process_init_create(nxt_task_t *task, nxt_process_type_t type, + const nxt_str_t *name) +{ + nxt_mp_t *mp; + nxt_int_t ret; + nxt_runtime_t *rt; + nxt_process_init_t *init; + + mp = nxt_mp_create(1024, 128, 256, 32); + if (nxt_slow_path(mp == NULL)) { + return NULL; + } + + init = nxt_mp_zalloc(mp, sizeof(nxt_process_init_t)); + if (nxt_slow_path(init == NULL)) { + goto fail; + } + + init->mem_pool = mp; + + ret = nxt_process_init_name_set(init, type, name); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + rt = task->thread->runtime; + + init->type = type; + init->start = nxt_process_starts[type]; + init->port_handlers = nxt_process_port_handlers[type]; + init->signals = nxt_worker_process_signals; + init->user_cred = &rt->user_cred; + init->data = &rt; + + return init; + +fail: + + nxt_mp_destroy(mp); + + return NULL; +} + + +static nxt_int_t +nxt_process_init_name_set(nxt_process_init_t *init, nxt_process_type_t type, + const nxt_str_t *name) +{ + u_char *str, *end; + size_t size; + const char *fmt; + + size = name->length + 1; + + if (type == NXT_PROCESS_WORKER) { + size += nxt_length("\"\" application"); + fmt = "\"%V\" application%Z"; + + } else { + fmt = "%V%Z"; + } + + str = nxt_mp_alloc(init->mem_pool, size); + if (nxt_slow_path(str == NULL)) { + return NXT_ERROR; + } + + end = str + size; + + nxt_sprintf(str, end, fmt, name); + + init->name = (char *) str; + + return NXT_OK; +} + + +static nxt_int_t +nxt_process_init_creds_set(nxt_task_t *task, nxt_process_init_t *init, + nxt_str_t *user, nxt_str_t *group) +{ + char *str; + + init->user_cred = nxt_mp_zalloc(init->mem_pool, + sizeof(nxt_user_cred_t)); + + if (nxt_slow_path(init->user_cred == NULL)) { + return NXT_ERROR; + } + + str = nxt_mp_zalloc(init->mem_pool, user->length + 1); + if (nxt_slow_path(str == NULL)) { + return NXT_ERROR; + } + + nxt_memcpy(str, user->start, user->length); + str[user->length] = '\0'; + + init->user_cred->user = str; + + if (group->start != NULL) { + str = nxt_mp_zalloc(init->mem_pool, group->length + 1); + if (nxt_slow_path(str == NULL)) { + return NXT_ERROR; + } + + nxt_memcpy(str, group->start, group->length); + str[group->length] = '\0'; + + } else { + str = NULL; + } + + return nxt_user_cred_get(task, init->mem_pool, init->user_cred, str); +} diff --git a/src/nxt_main_process.h b/src/nxt_main_process.h index d932e11f..b0570a84 100644 --- a/src/nxt_main_process.h +++ b/src/nxt_main_process.h @@ -36,10 +36,7 @@ nxt_int_t nxt_router_start(nxt_task_t *task, void *data); nxt_int_t nxt_discovery_start(nxt_task_t *task, void *data); nxt_int_t nxt_app_start(nxt_task_t *task, void *data); -extern nxt_port_handlers_t nxt_controller_process_port_handlers; -extern nxt_port_handlers_t nxt_discovery_process_port_handlers; -extern nxt_port_handlers_t nxt_app_process_port_handlers; -extern nxt_port_handlers_t nxt_router_process_port_handlers; + extern const nxt_sig_event_t nxt_main_process_signals[]; extern const nxt_sig_event_t nxt_worker_process_signals[]; diff --git a/src/nxt_port.c b/src/nxt_port.c index 8d14a5e7..70cf33e6 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -140,7 +140,7 @@ nxt_port_reset_next_id() void nxt_port_enable(nxt_task_t *task, nxt_port_t *port, - nxt_port_handlers_t *handlers) + const nxt_port_handlers_t *handlers) { port->pid = nxt_pid; port->handler = nxt_port_handler; diff --git a/src/nxt_port.h b/src/nxt_port.h index eeb6caa5..3f9302e8 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -269,7 +269,7 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, } void nxt_port_enable(nxt_task_t *task, nxt_port_t *port, - nxt_port_handlers_t *handlers); + const nxt_port_handlers_t *handlers); nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, uint32_t stream); void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, diff --git a/src/nxt_process.c b/src/nxt_process.c index 64356d64..0cb0fbdd 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -14,7 +14,8 @@ #include static void nxt_process_start(nxt_task_t *task, nxt_process_t *process); -static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc); +static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_mp_t *mp, + nxt_user_cred_t *uc); static nxt_int_t nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd); @@ -526,7 +527,8 @@ nxt_nanosleep(nxt_nsec_t ns) nxt_int_t -nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, const char *group) +nxt_user_cred_get(nxt_task_t *task, nxt_mp_t *mp, nxt_user_cred_t *uc, + const char *group) { struct group *grp; struct passwd *pwd; @@ -574,7 +576,7 @@ nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, const char *group) nxt_debug(task, "about to get \"%s\" groups (uid:%d, base gid:%d)", uc->user, uc->uid, uc->base_gid); - if (nxt_user_groups_get(task, uc) != NXT_OK) { + if (nxt_user_groups_get(task, mp, uc) != NXT_OK) { return NXT_ERROR; } @@ -606,7 +608,7 @@ nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, const char *group) static nxt_int_t -nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) +nxt_user_groups_get(nxt_task_t *task, nxt_mp_t *mp, nxt_user_cred_t *uc) { int ngroups; gid_t groups[NXT_NGROUPS]; @@ -629,7 +631,7 @@ nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) uc->ngroups = ngroups; - uc->gids = nxt_malloc(ngroups * sizeof(gid_t)); + uc->gids = nxt_mp_alloc(mp, ngroups * sizeof(gid_t)); if (nxt_slow_path(uc->gids == NULL)) { return NXT_ERROR; } @@ -640,8 +642,6 @@ nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) nxt_alert(task, "getgrouplist(\"%s\", %d) failed %E", uc->user, uc->base_gid, nxt_errno); - nxt_free(uc->gids); - return NXT_ERROR; } @@ -650,7 +650,7 @@ nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) uc->ngroups = ngroups; - uc->gids = nxt_malloc(ngroups * sizeof(gid_t)); + uc->gids = nxt_mp_alloc(mp, ngroups * sizeof(gid_t)); if (nxt_slow_path(uc->gids == NULL)) { return NXT_ERROR; } @@ -692,7 +692,7 @@ nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) */ static nxt_int_t -nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) +nxt_user_groups_get(nxt_task_t *task, nxt_mp_t *mp, nxt_user_cred_t *uc) { int nsaved, ngroups; nxt_int_t ret; @@ -716,7 +716,7 @@ nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) return NXT_OK; } - saved = nxt_malloc(nsaved * sizeof(nxt_gid_t)); + saved = nxt_mp_alloc(mp, nsaved * sizeof(nxt_gid_t)); if (nxt_slow_path(saved == NULL)) { return NXT_ERROR; @@ -759,7 +759,7 @@ nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc) nxt_debug(task, "getgroups(0, NULL): %d", ngroups); - uc->gids = nxt_malloc(ngroups * sizeof(nxt_gid_t)); + uc->gids = nxt_mp_alloc(mp, ngroups * sizeof(nxt_gid_t)); if (nxt_slow_path(uc->gids == NULL)) { goto restore; @@ -785,7 +785,7 @@ restore: free: - nxt_free(saved); + nxt_mp_free(mp, saved); return ret; } diff --git a/src/nxt_process.h b/src/nxt_process.h index d67573f1..3f247ae1 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -35,22 +35,21 @@ typedef nxt_int_t (*nxt_process_restart_t)(nxt_task_t *task, nxt_runtime_t *rt, nxt_process_init_t *init); struct nxt_process_init_s { - nxt_process_start_t start; - const char *name; - nxt_user_cred_t *user_cred; + nxt_mp_t *mem_pool; + nxt_process_start_t start; + const char *name; + nxt_user_cred_t *user_cred; - nxt_port_handlers_t *port_handlers; - const nxt_sig_event_t *signals; + const nxt_port_handlers_t *port_handlers; + const nxt_sig_event_t *signals; - nxt_process_type_t type; + nxt_process_type_t type; - void *data; - uint32_t stream; - - nxt_process_restart_t restart; + void *data; + uint32_t stream; union { - nxt_process_clone_t clone; + nxt_process_clone_t clone; } isolation; }; @@ -126,7 +125,6 @@ void nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid, nxt_port_id_t port_id); - void nxt_worker_process_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); @@ -155,8 +153,8 @@ NXT_EXPORT void nxt_process_title(nxt_task_t *task, const char *fmt, ...); #define nxt_abort() \ (void) raise(SIGABRT) -NXT_EXPORT nxt_int_t nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, - const char *group); +NXT_EXPORT nxt_int_t nxt_user_cred_get(nxt_task_t *task, nxt_mp_t *mp, + nxt_user_cred_t *uc, const char *group); NXT_EXPORT nxt_int_t nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc); NXT_EXPORT extern nxt_pid_t nxt_pid; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 096aabc4..afd2cb66 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -705,7 +705,10 @@ nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) } if (rt->capabilities.setid) { - if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) { + ret = nxt_user_cred_get(task, rt->mem_pool, &rt->user_cred, + rt->group); + + if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } @@ -1323,7 +1326,7 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) nxt_thread_mutex_destroy(&process->cp_mutex); if (process->init != NULL) { - nxt_mp_free(rt->mem_pool, process->init); + nxt_mp_destroy(process->init->mem_pool); } nxt_mp_free(rt->mem_pool, process); diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index c068cca4..754e2ea8 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -19,25 +19,6 @@ static void nxt_worker_process_sigterm_handler(nxt_task_t *task, void *obj, static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj, void *data); -nxt_port_handlers_t nxt_app_process_port_handlers = { - .new_port = nxt_port_new_port_handler, - .change_file = nxt_port_change_log_file_handler, - .mmap = nxt_port_mmap_handler, - .remove_pid = nxt_port_remove_pid_handler, -}; - - -nxt_port_handlers_t nxt_discovery_process_port_handlers = { - .quit = nxt_worker_process_quit_handler, - .new_port = nxt_port_new_port_handler, - .change_file = nxt_port_change_log_file_handler, - .mmap = nxt_port_mmap_handler, - .data = nxt_port_data_handler, - .remove_pid = nxt_port_remove_pid_handler, - .rpc_ready = nxt_port_rpc_handler, - .rpc_error = nxt_port_rpc_handler, -}; - const nxt_sig_event_t nxt_worker_process_signals[] = { nxt_event_signal(SIGHUP, nxt_worker_process_signal_handler), -- cgit From ed2492a66afdf578d1e4f99dc098ab685607b3ba Mon Sep 17 00:00:00 2001 From: Tiago Natel Date: Fri, 6 Dec 2019 13:28:05 +0000 Subject: Moved credential-related code to nxt_credential.c. This is required to avoid include cycles, as some nxt_clone_* functions depend on the credential structures, but nxt_process depends on clone structures. --- src/nxt_credential.c | 342 +++++++++++++++++++++++++++++++++++++++++++++++++ src/nxt_credential.h | 28 ++++ src/nxt_main.h | 1 + src/nxt_main_process.c | 5 +- src/nxt_process.c | 335 +----------------------------------------------- src/nxt_process.h | 15 +-- src/nxt_runtime.c | 2 +- src/nxt_runtime.h | 2 +- 8 files changed, 377 insertions(+), 353 deletions(-) create mode 100644 src/nxt_credential.c create mode 100644 src/nxt_credential.h (limited to 'src') diff --git a/src/nxt_credential.c b/src/nxt_credential.c new file mode 100644 index 00000000..9f275b7d --- /dev/null +++ b/src/nxt_credential.c @@ -0,0 +1,342 @@ +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include + + +static nxt_int_t nxt_credential_groups_get(nxt_task_t *task, nxt_mp_t *mp, + nxt_credential_t *uc); + + +nxt_int_t +nxt_credential_get(nxt_task_t *task, nxt_mp_t *mp, nxt_credential_t *uc, + const char *group) +{ + struct group *grp; + struct passwd *pwd; + + nxt_errno = 0; + + pwd = getpwnam(uc->user); + + if (nxt_slow_path(pwd == NULL)) { + + if (nxt_errno == 0) { + nxt_alert(task, "getpwnam(\"%s\") failed, user \"%s\" not found", + uc->user, uc->user); + } else { + nxt_alert(task, "getpwnam(\"%s\") failed %E", uc->user, nxt_errno); + } + + return NXT_ERROR; + } + + uc->uid = pwd->pw_uid; + uc->base_gid = pwd->pw_gid; + + if (group != NULL && group[0] != '\0') { + nxt_errno = 0; + + grp = getgrnam(group); + + if (nxt_slow_path(grp == NULL)) { + + if (nxt_errno == 0) { + nxt_alert(task, + "getgrnam(\"%s\") failed, group \"%s\" not found", + group, group); + } else { + nxt_alert(task, "getgrnam(\"%s\") failed %E", group, nxt_errno); + } + + return NXT_ERROR; + } + + uc->base_gid = grp->gr_gid; + } + + nxt_debug(task, "about to get \"%s\" groups (uid:%d, base gid:%d)", + uc->user, uc->uid, uc->base_gid); + + if (nxt_credential_groups_get(task, mp, uc) != NXT_OK) { + return NXT_ERROR; + } + +#if (NXT_DEBUG) + { + u_char *p, *end; + nxt_uint_t i; + u_char msg[NXT_MAX_ERROR_STR]; + + p = msg; + end = msg + NXT_MAX_ERROR_STR; + + for (i = 0; i < uc->ngroups; i++) { + p = nxt_sprintf(p, end, "%d%c", uc->gids[i], + i+1 < uc->ngroups ? ',' : '\0'); + } + + nxt_debug(task, "user \"%s\" has gids:%*s", uc->user, p - msg, msg); + } +#endif + + return NXT_OK; +} + + +#if (NXT_HAVE_GETGROUPLIST && !NXT_MACOSX) + +#define NXT_NGROUPS nxt_min(256, NGROUPS_MAX) + + +static nxt_int_t +nxt_credential_groups_get(nxt_task_t *task, nxt_mp_t *mp, + nxt_credential_t *uc) +{ + int ngroups; + gid_t groups[NXT_NGROUPS]; + + ngroups = NXT_NGROUPS; + + if (getgrouplist(uc->user, uc->base_gid, groups, &ngroups) < 0) { + if (nxt_slow_path(ngroups <= NXT_NGROUPS)) { + nxt_alert(task, "getgrouplist(\"%s\", %d, ...) failed %E", uc->user, + uc->base_gid, nxt_errno); + + return NXT_ERROR; + } + } + + if (ngroups > NXT_NGROUPS) { + if (ngroups > NGROUPS_MAX) { + ngroups = NGROUPS_MAX; + } + + uc->ngroups = ngroups; + + uc->gids = nxt_mp_alloc(mp, ngroups * sizeof(gid_t)); + if (nxt_slow_path(uc->gids == NULL)) { + return NXT_ERROR; + } + + if (nxt_slow_path(getgrouplist(uc->user, uc->base_gid, uc->gids, + &ngroups) < 0)) { + + nxt_alert(task, "getgrouplist(\"%s\", %d) failed %E", uc->user, + uc->base_gid, nxt_errno); + + return NXT_ERROR; + } + + return NXT_OK; + } + + uc->ngroups = ngroups; + + uc->gids = nxt_mp_alloc(mp, ngroups * sizeof(gid_t)); + if (nxt_slow_path(uc->gids == NULL)) { + return NXT_ERROR; + } + + nxt_memcpy(uc->gids, groups, ngroups * sizeof(gid_t)); + + return NXT_OK; +} + + +#else + +/* + * For operating systems that lack getgrouplist(3) or it's buggy (MacOS), + * nxt_credential_groups_get() stores an array of groups IDs which should be + * set by the setgroups() function for a given user. The initgroups() + * may block a just forked worker process for some time if LDAP or NDIS+ + * is used, so nxt_credential_groups_get() allows to get worker user groups in + * main process. In a nutshell the initgroups() calls getgrouplist() + * followed by setgroups(). However older Solaris lacks the getgrouplist(). + * Besides getgrouplist() does not allow to query the exact number of + * groups in some platforms, while NGROUPS_MAX can be quite large (e.g. + * 65536 on Linux). + * So nxt_credential_groups_get() emulates getgrouplist(): at first the + * function saves the super-user groups IDs, then calls initgroups() and saves + * the specified user groups IDs, and then restores the super-user groups IDs. + * This works at least on Linux, FreeBSD, and Solaris, but does not work + * on MacOSX, getgroups(2): + * + * To provide compatibility with applications that use getgroups() in + * environments where users may be in more than {NGROUPS_MAX} groups, + * a variant of getgroups(), obtained when compiling with either the + * macros _DARWIN_UNLIMITED_GETGROUPS or _DARWIN_C_SOURCE defined, can + * be used that is not limited to {NGROUPS_MAX} groups. However, this + * variant only returns the user's default group access list and not + * the group list modified by a call to setgroups(2). + * + * For such cases initgroups() is used in worker process as fallback. + */ + +static nxt_int_t +nxt_credential_groups_get(nxt_task_t *task, nxt_mp_t *mp, nxt_credential_t *uc) +{ + int nsaved, ngroups; + nxt_int_t ret; + nxt_gid_t *saved; + + nsaved = getgroups(0, NULL); + + if (nxt_slow_path(nsaved == -1)) { + nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno); + return NXT_ERROR; + } + + nxt_debug(task, "getgroups(0, NULL): %d", nsaved); + + if (nsaved > NGROUPS_MAX) { + /* MacOSX case. */ + + uc->gids = NULL; + uc->ngroups = 0; + + return NXT_OK; + } + + saved = nxt_mp_alloc(mp, nsaved * sizeof(nxt_gid_t)); + + if (nxt_slow_path(saved == NULL)) { + return NXT_ERROR; + } + + ret = NXT_ERROR; + + nsaved = getgroups(nsaved, saved); + + if (nxt_slow_path(nsaved == -1)) { + nxt_alert(task, "getgroups(%d) failed %E", nsaved, nxt_errno); + goto free; + } + + nxt_debug(task, "getgroups(): %d", nsaved); + + if (initgroups(uc->user, uc->base_gid) != 0) { + if (nxt_errno == NXT_EPERM) { + nxt_log(task, NXT_LOG_NOTICE, + "initgroups(%s, %d) failed %E, ignored", + uc->user, uc->base_gid, nxt_errno); + + ret = NXT_OK; + + goto free; + + } else { + nxt_alert(task, "initgroups(%s, %d) failed %E", + uc->user, uc->base_gid, nxt_errno); + goto restore; + } + } + + ngroups = getgroups(0, NULL); + + if (nxt_slow_path(ngroups == -1)) { + nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno); + goto restore; + } + + nxt_debug(task, "getgroups(0, NULL): %d", ngroups); + + uc->gids = nxt_mp_alloc(mp, ngroups * sizeof(nxt_gid_t)); + + if (nxt_slow_path(uc->gids == NULL)) { + goto restore; + } + + ngroups = getgroups(ngroups, uc->gids); + + if (nxt_slow_path(ngroups == -1)) { + nxt_alert(task, "getgroups(%d) failed %E", ngroups, nxt_errno); + goto restore; + } + + uc->ngroups = ngroups; + + ret = NXT_OK; + +restore: + + if (nxt_slow_path(setgroups(nsaved, saved) != 0)) { + nxt_alert(task, "setgroups(%d) failed %E", nsaved, nxt_errno); + ret = NXT_ERROR; + } + +free: + + nxt_mp_free(mp, saved); + + return ret; +} + + +#endif + + +nxt_int_t +nxt_credential_set(nxt_task_t *task, nxt_credential_t *uc) +{ + nxt_debug(task, "user cred set: \"%s\" uid:%d base gid:%d", + uc->user, uc->uid, uc->base_gid); + + if (setgid(uc->base_gid) != 0) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The gid %d isn't valid in the " + "application namespace.", uc->base_gid); + return NXT_ERROR; + } +#endif + + nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno); + return NXT_ERROR; + } + + if (uc->gids != NULL) { + if (setgroups(uc->ngroups, uc->gids) != 0) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The user \"%s\" (uid: %d) has " + "supplementary group ids not valid in the application " + "namespace.", uc->user, uc->uid); + return NXT_ERROR; + } +#endif + + nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno); + return NXT_ERROR; + } + + } else { + /* MacOSX fallback. */ + if (initgroups(uc->user, uc->base_gid) != 0) { + nxt_alert(task, "initgroups(%s, %d) failed %E", + uc->user, uc->base_gid, nxt_errno); + return NXT_ERROR; + } + } + + if (setuid(uc->uid) != 0) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The uid %d (user \"%s\") isn't " + "valid in the application namespace.", uc->uid, uc->user); + return NXT_ERROR; + } +#endif + + nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno); + return NXT_ERROR; + } + + return NXT_OK; +} diff --git a/src/nxt_credential.h b/src/nxt_credential.h new file mode 100644 index 00000000..e9f59327 --- /dev/null +++ b/src/nxt_credential.h @@ -0,0 +1,28 @@ +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_CREDENTIAL_H_INCLUDED_ +#define _NXT_CREDENTIAL_H_INCLUDED_ + + +typedef uid_t nxt_uid_t; +typedef gid_t nxt_gid_t; + +typedef struct { + const char *user; + nxt_uid_t uid; + nxt_gid_t base_gid; + nxt_uint_t ngroups; + nxt_gid_t *gids; +} nxt_credential_t; + + +NXT_EXPORT nxt_int_t nxt_credential_get(nxt_task_t *task, nxt_mp_t *mp, + nxt_credential_t *uc, const char *group); +NXT_EXPORT nxt_int_t nxt_credential_set(nxt_task_t *task, + nxt_credential_t *uc); + + +#endif /* _NXT_CREDENTIAL_H_INCLUDED_ */ diff --git a/src/nxt_main.h b/src/nxt_main.h index 0afebb96..d9e337d2 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -58,6 +58,7 @@ typedef uint16_t nxt_port_id_t; #include #include #include +#include #include #include #include diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 05027443..02b0c293 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -1677,8 +1677,7 @@ nxt_process_init_creds_set(nxt_task_t *task, nxt_process_init_t *init, { char *str; - init->user_cred = nxt_mp_zalloc(init->mem_pool, - sizeof(nxt_user_cred_t)); + init->user_cred = nxt_mp_zalloc(init->mem_pool, sizeof(nxt_credential_t)); if (nxt_slow_path(init->user_cred == NULL)) { return NXT_ERROR; @@ -1707,5 +1706,5 @@ nxt_process_init_creds_set(nxt_task_t *task, nxt_process_init_t *init, str = NULL; } - return nxt_user_cred_get(task, init->mem_pool, init->user_cred, str); + return nxt_credential_get(task, init->mem_pool, init->user_cred, str); } diff --git a/src/nxt_process.c b/src/nxt_process.c index 0cb0fbdd..75c6ef25 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -14,8 +14,6 @@ #include static void nxt_process_start(nxt_task_t *task, nxt_process_t *process); -static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_mp_t *mp, - nxt_user_cred_t *uc); static nxt_int_t nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd); @@ -279,7 +277,7 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process) nxt_random_init(&thread->random); if (rt->capabilities.setid && init->user_cred != NULL) { - ret = nxt_user_cred_set(task, init->user_cred); + ret = nxt_credential_set(task, init->user_cred); if (ret != NXT_OK) { goto fail; } @@ -526,337 +524,6 @@ nxt_nanosleep(nxt_nsec_t ns) } -nxt_int_t -nxt_user_cred_get(nxt_task_t *task, nxt_mp_t *mp, nxt_user_cred_t *uc, - const char *group) -{ - struct group *grp; - struct passwd *pwd; - - nxt_errno = 0; - - pwd = getpwnam(uc->user); - - if (nxt_slow_path(pwd == NULL)) { - - if (nxt_errno == 0) { - nxt_alert(task, "getpwnam(\"%s\") failed, user \"%s\" not found", - uc->user, uc->user); - } else { - nxt_alert(task, "getpwnam(\"%s\") failed %E", uc->user, nxt_errno); - } - - return NXT_ERROR; - } - - uc->uid = pwd->pw_uid; - uc->base_gid = pwd->pw_gid; - - if (group != NULL && group[0] != '\0') { - nxt_errno = 0; - - grp = getgrnam(group); - - if (nxt_slow_path(grp == NULL)) { - - if (nxt_errno == 0) { - nxt_alert(task, - "getgrnam(\"%s\") failed, group \"%s\" not found", - group, group); - } else { - nxt_alert(task, "getgrnam(\"%s\") failed %E", group, nxt_errno); - } - - return NXT_ERROR; - } - - uc->base_gid = grp->gr_gid; - } - - nxt_debug(task, "about to get \"%s\" groups (uid:%d, base gid:%d)", - uc->user, uc->uid, uc->base_gid); - - if (nxt_user_groups_get(task, mp, uc) != NXT_OK) { - return NXT_ERROR; - } - -#if (NXT_DEBUG) - { - u_char *p, *end; - nxt_uint_t i; - u_char msg[NXT_MAX_ERROR_STR]; - - p = msg; - end = msg + NXT_MAX_ERROR_STR; - - for (i = 0; i < uc->ngroups; i++) { - p = nxt_sprintf(p, end, "%d%c", uc->gids[i], - i+1 < uc->ngroups ? ',' : '\0'); - } - - nxt_debug(task, "user \"%s\" has gids:%*s", uc->user, p - msg, msg); - } -#endif - - return NXT_OK; -} - - -#if (NXT_HAVE_GETGROUPLIST && !NXT_MACOSX) - -#define NXT_NGROUPS nxt_min(256, NGROUPS_MAX) - - -static nxt_int_t -nxt_user_groups_get(nxt_task_t *task, nxt_mp_t *mp, nxt_user_cred_t *uc) -{ - int ngroups; - gid_t groups[NXT_NGROUPS]; - - ngroups = NXT_NGROUPS; - - if (getgrouplist(uc->user, uc->base_gid, groups, &ngroups) < 0) { - if (nxt_slow_path(ngroups <= NXT_NGROUPS)) { - nxt_alert(task, "getgrouplist(\"%s\", %d, ...) failed %E", uc->user, - uc->base_gid, nxt_errno); - - return NXT_ERROR; - } - } - - if (ngroups > NXT_NGROUPS) { - if (ngroups > NGROUPS_MAX) { - ngroups = NGROUPS_MAX; - } - - uc->ngroups = ngroups; - - uc->gids = nxt_mp_alloc(mp, ngroups * sizeof(gid_t)); - if (nxt_slow_path(uc->gids == NULL)) { - return NXT_ERROR; - } - - if (nxt_slow_path(getgrouplist(uc->user, uc->base_gid, uc->gids, - &ngroups) < 0)) { - - nxt_alert(task, "getgrouplist(\"%s\", %d) failed %E", uc->user, - uc->base_gid, nxt_errno); - - return NXT_ERROR; - } - - return NXT_OK; - } - - uc->ngroups = ngroups; - - uc->gids = nxt_mp_alloc(mp, ngroups * sizeof(gid_t)); - if (nxt_slow_path(uc->gids == NULL)) { - return NXT_ERROR; - } - - nxt_memcpy(uc->gids, groups, ngroups * sizeof(gid_t)); - - return NXT_OK; -} - - -#else - -/* - * For operating systems that lack getgrouplist(3) or it's buggy (MacOS), - * nxt_user_groups_get() stores an array of groups IDs which should be - * set by the setgroups() function for a given user. The initgroups() - * may block a just forked worker process for some time if LDAP or NDIS+ - * is used, so nxt_user_groups_get() allows to get worker user groups in - * main process. In a nutshell the initgroups() calls getgrouplist() - * followed by setgroups(). However older Solaris lacks the getgrouplist(). - * Besides getgrouplist() does not allow to query the exact number of - * groups in some platforms, while NGROUPS_MAX can be quite large (e.g. - * 65536 on Linux). - * So nxt_user_groups_get() emulates getgrouplist(): at first the function - * saves the super-user groups IDs, then calls initgroups() and saves the - * specified user groups IDs, and then restores the super-user groups IDs. - * This works at least on Linux, FreeBSD, and Solaris, but does not work - * on MacOSX, getgroups(2): - * - * To provide compatibility with applications that use getgroups() in - * environments where users may be in more than {NGROUPS_MAX} groups, - * a variant of getgroups(), obtained when compiling with either the - * macros _DARWIN_UNLIMITED_GETGROUPS or _DARWIN_C_SOURCE defined, can - * be used that is not limited to {NGROUPS_MAX} groups. However, this - * variant only returns the user's default group access list and not - * the group list modified by a call to setgroups(2). - * - * For such cases initgroups() is used in worker process as fallback. - */ - -static nxt_int_t -nxt_user_groups_get(nxt_task_t *task, nxt_mp_t *mp, nxt_user_cred_t *uc) -{ - int nsaved, ngroups; - nxt_int_t ret; - nxt_gid_t *saved; - - nsaved = getgroups(0, NULL); - - if (nxt_slow_path(nsaved == -1)) { - nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno); - return NXT_ERROR; - } - - nxt_debug(task, "getgroups(0, NULL): %d", nsaved); - - if (nsaved > NGROUPS_MAX) { - /* MacOSX case. */ - - uc->gids = NULL; - uc->ngroups = 0; - - return NXT_OK; - } - - saved = nxt_mp_alloc(mp, nsaved * sizeof(nxt_gid_t)); - - if (nxt_slow_path(saved == NULL)) { - return NXT_ERROR; - } - - ret = NXT_ERROR; - - nsaved = getgroups(nsaved, saved); - - if (nxt_slow_path(nsaved == -1)) { - nxt_alert(task, "getgroups(%d) failed %E", nsaved, nxt_errno); - goto free; - } - - nxt_debug(task, "getgroups(): %d", nsaved); - - if (initgroups(uc->user, uc->base_gid) != 0) { - if (nxt_errno == NXT_EPERM) { - nxt_log(task, NXT_LOG_NOTICE, - "initgroups(%s, %d) failed %E, ignored", - uc->user, uc->base_gid, nxt_errno); - - ret = NXT_OK; - - goto free; - - } else { - nxt_alert(task, "initgroups(%s, %d) failed %E", - uc->user, uc->base_gid, nxt_errno); - goto restore; - } - } - - ngroups = getgroups(0, NULL); - - if (nxt_slow_path(ngroups == -1)) { - nxt_alert(task, "getgroups(0, NULL) failed %E", nxt_errno); - goto restore; - } - - nxt_debug(task, "getgroups(0, NULL): %d", ngroups); - - uc->gids = nxt_mp_alloc(mp, ngroups * sizeof(nxt_gid_t)); - - if (nxt_slow_path(uc->gids == NULL)) { - goto restore; - } - - ngroups = getgroups(ngroups, uc->gids); - - if (nxt_slow_path(ngroups == -1)) { - nxt_alert(task, "getgroups(%d) failed %E", ngroups, nxt_errno); - goto restore; - } - - uc->ngroups = ngroups; - - ret = NXT_OK; - -restore: - - if (nxt_slow_path(setgroups(nsaved, saved) != 0)) { - nxt_alert(task, "setgroups(%d) failed %E", nsaved, nxt_errno); - ret = NXT_ERROR; - } - -free: - - nxt_mp_free(mp, saved); - - return ret; -} - - -#endif - - -nxt_int_t -nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) -{ - nxt_debug(task, "user cred set: \"%s\" uid:%d base gid:%d", - uc->user, uc->uid, uc->base_gid); - - if (setgid(uc->base_gid) != 0) { - -#if (NXT_HAVE_CLONE) - if (nxt_errno == EINVAL) { - nxt_log(task, NXT_LOG_ERR, "The gid %d isn't valid in the " - "application namespace.", uc->base_gid); - return NXT_ERROR; - } -#endif - - nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno); - return NXT_ERROR; - } - - if (uc->gids != NULL) { - if (setgroups(uc->ngroups, uc->gids) != 0) { - -#if (NXT_HAVE_CLONE) - if (nxt_errno == EINVAL) { - nxt_log(task, NXT_LOG_ERR, "The user \"%s\" (uid: %d) has " - "supplementary group ids not valid in the application " - "namespace.", uc->user, uc->uid); - return NXT_ERROR; - } -#endif - - nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno); - return NXT_ERROR; - } - - } else { - /* MacOSX fallback. */ - if (initgroups(uc->user, uc->base_gid) != 0) { - nxt_alert(task, "initgroups(%s, %d) failed %E", - uc->user, uc->base_gid, nxt_errno); - return NXT_ERROR; - } - } - - if (setuid(uc->uid) != 0) { - -#if (NXT_HAVE_CLONE) - if (nxt_errno == EINVAL) { - nxt_log(task, NXT_LOG_ERR, "The uid %d (user \"%s\") isn't " - "valid in the application namespace.", uc->uid, uc->user); - return NXT_ERROR; - } -#endif - - nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno); - return NXT_ERROR; - } - - return NXT_OK; -} - - void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i) { diff --git a/src/nxt_process.h b/src/nxt_process.h index 3f247ae1..457d2299 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -11,18 +11,8 @@ typedef pid_t nxt_pid_t; -typedef uid_t nxt_uid_t; -typedef gid_t nxt_gid_t; -typedef struct { - const char *user; - nxt_uid_t uid; - nxt_gid_t base_gid; - nxt_uint_t ngroups; - nxt_gid_t *gids; -} nxt_user_cred_t; - typedef struct { nxt_int_t flags; nxt_conf_value_t *uidmap; @@ -38,7 +28,7 @@ struct nxt_process_init_s { nxt_mp_t *mem_pool; nxt_process_start_t start; const char *name; - nxt_user_cred_t *user_cred; + nxt_credential_t *user_cred; const nxt_port_handlers_t *port_handlers; const nxt_sig_event_t *signals; @@ -153,9 +143,6 @@ NXT_EXPORT void nxt_process_title(nxt_task_t *task, const char *fmt, ...); #define nxt_abort() \ (void) raise(SIGABRT) -NXT_EXPORT nxt_int_t nxt_user_cred_get(nxt_task_t *task, nxt_mp_t *mp, - nxt_user_cred_t *uc, const char *group); -NXT_EXPORT nxt_int_t nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc); NXT_EXPORT extern nxt_pid_t nxt_pid; NXT_EXPORT extern nxt_pid_t nxt_ppid; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index afd2cb66..80b25c1b 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -705,7 +705,7 @@ nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt) } if (rt->capabilities.setid) { - ret = nxt_user_cred_get(task, rt->mem_pool, &rt->user_cred, + ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred, rt->group); if (nxt_slow_path(ret != NXT_OK)) { diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index d5b340b6..f8d19ec6 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -58,7 +58,7 @@ struct nxt_runtime_s { const char *engine; uint32_t engine_connections; uint32_t auxiliary_threads; - nxt_user_cred_t user_cred; + nxt_credential_t user_cred; nxt_capabilities_t capabilities; const char *group; const char *pid; -- cgit From 411daeaa532c47328ab901a7ba9ea5dcd876be06 Mon Sep 17 00:00:00 2001 From: Tiago Natel Date: Fri, 6 Dec 2019 16:52:50 +0000 Subject: Isolation: allowed the use of credentials with unpriv userns. The setuid/setgid syscalls requires root capabilities but if the kernel supports unprivileged user namespace then the child process has the full set of capabilities in the new namespace, then we can allow setting "user" and "group" in such cases (this is a common security use case). Tests were added to ensure user gets meaningful error messages for uid/gid mapping misconfigurations. --- src/nxt_clone.c | 311 +++++++++++++++++++----- src/nxt_clone.h | 40 ++- src/nxt_credential.c | 66 ++--- src/nxt_credential.h | 4 +- src/nxt_lib.c | 2 + src/nxt_main_process.c | 213 ++++++++++++++-- src/nxt_process.c | 32 ++- src/nxt_process.h | 18 +- src/test/nxt_clone_test.c | 601 ++++++++++++++++++++++++++++++++++++++++++++++ src/test/nxt_tests.c | 6 + src/test/nxt_tests.h | 1 + 11 files changed, 1159 insertions(+), 135 deletions(-) create mode 100644 src/test/nxt_clone_test.c (limited to 'src') diff --git a/src/nxt_clone.c b/src/nxt_clone.c index a2c376a3..9ee3c012 100644 --- a/src/nxt_clone.c +++ b/src/nxt_clone.c @@ -25,26 +25,18 @@ nxt_clone(nxt_int_t flags) #if (NXT_HAVE_CLONE_NEWUSER) -/* map uid 65534 to unit pid */ -#define NXT_DEFAULT_UNPRIV_MAP "65534 %d 1" - -nxt_int_t nxt_clone_proc_setgroups(nxt_task_t *task, pid_t child_pid, +nxt_int_t nxt_clone_credential_setgroups(nxt_task_t *task, pid_t child_pid, const char *str); -nxt_int_t nxt_clone_proc_map_set(nxt_task_t *task, const char* mapfile, - pid_t pid, nxt_int_t defval, nxt_conf_value_t *mapobj); -nxt_int_t nxt_clone_proc_map_write(nxt_task_t *task, const char *mapfile, +nxt_int_t nxt_clone_credential_map_set(nxt_task_t *task, const char* mapfile, + pid_t pid, nxt_int_t default_container, nxt_int_t default_host, + nxt_clone_credential_map_t *map); +nxt_int_t nxt_clone_credential_map_write(nxt_task_t *task, const char *mapfile, pid_t pid, u_char *mapinfo); -typedef struct { - nxt_int_t container; - nxt_int_t host; - nxt_int_t size; -} nxt_clone_procmap_t; - - nxt_int_t -nxt_clone_proc_setgroups(nxt_task_t *task, pid_t child_pid, const char *str) +nxt_clone_credential_setgroups(nxt_task_t *task, pid_t child_pid, + const char *str) { int fd, n; u_char *p, *end; @@ -89,8 +81,8 @@ nxt_clone_proc_setgroups(nxt_task_t *task, pid_t child_pid, const char *str) nxt_int_t -nxt_clone_proc_map_write(nxt_task_t *task, const char *mapfile, pid_t pid, - u_char *mapinfo) +nxt_clone_credential_map_write(nxt_task_t *task, const char *mapfile, + pid_t pid, u_char *mapinfo) { int len, mapfd; u_char *p, *end; @@ -139,17 +131,13 @@ nxt_clone_proc_map_write(nxt_task_t *task, const char *mapfile, pid_t pid, nxt_int_t -nxt_clone_proc_map_set(nxt_task_t *task, const char* mapfile, pid_t pid, - nxt_int_t defval, nxt_conf_value_t *mapobj) +nxt_clone_credential_map_set(nxt_task_t *task, const char* mapfile, pid_t pid, + nxt_int_t default_container, nxt_int_t default_host, + nxt_clone_credential_map_t *map) { - u_char *p, *end, *mapinfo; - nxt_int_t container, host, size; - nxt_int_t ret, len, count, i; - nxt_conf_value_t *obj, *value; - - static nxt_str_t str_cont = nxt_string("container"); - static nxt_str_t str_host = nxt_string("host"); - static nxt_str_t str_size = nxt_string("size"); + u_char *p, *end, *mapinfo; + nxt_int_t ret, len; + nxt_uint_t i; /* * uid_map one-entry size: @@ -157,44 +145,28 @@ nxt_clone_proc_map_set(nxt_task_t *task, const char* mapfile, pid_t pid, */ len = sizeof(u_char) * (10 + 10 + 10 + 2 + 1); - if (mapobj != NULL) { - count = nxt_conf_array_elements_count(mapobj); - - if (count == 0) { - goto default_map; - } - - len = len * count + 1; + if (map->size > 0) { + len = len * map->size + 1; mapinfo = nxt_malloc(len); if (nxt_slow_path(mapinfo == NULL)) { - nxt_alert(task, "failed to allocate uid_map buffer"); return NXT_ERROR; } p = mapinfo; end = mapinfo + len; - for (i = 0; i < count; i++) { - obj = nxt_conf_get_array_element(mapobj, i); + for (i = 0; i < map->size; i++) { + p = nxt_sprintf(p, end, "%d %d %d", map->map[i].container, + map->map[i].host, map->map[i].size); - value = nxt_conf_get_object_member(obj, &str_cont, NULL); - container = nxt_conf_get_integer(value); - - value = nxt_conf_get_object_member(obj, &str_host, NULL); - host = nxt_conf_get_integer(value); - - value = nxt_conf_get_object_member(obj, &str_size, NULL); - size = nxt_conf_get_integer(value); - - p = nxt_sprintf(p, end, "%d %d %d", container, host, size); if (nxt_slow_path(p == end)) { - nxt_alert(task, "write past the uid_map buffer"); + nxt_alert(task, "write past the mapinfo buffer"); nxt_free(mapinfo); return NXT_ERROR; } - if (i+1 < count) { + if (i+1 < map->size) { *p++ = '\n'; } else { @@ -203,27 +175,24 @@ nxt_clone_proc_map_set(nxt_task_t *task, const char* mapfile, pid_t pid, } } else { - -default_map: - mapinfo = nxt_malloc(len); if (nxt_slow_path(mapinfo == NULL)) { - nxt_alert(task, "failed to allocate uid_map buffer"); return NXT_ERROR; } end = mapinfo + len; - p = nxt_sprintf(mapinfo, end, NXT_DEFAULT_UNPRIV_MAP, defval); + p = nxt_sprintf(mapinfo, end, "%d %d 1", + default_container, default_host); *p = '\0'; if (nxt_slow_path(p == end)) { - nxt_alert(task, "write past the %s buffer", mapfile); + nxt_alert(task, "write past mapinfo buffer"); nxt_free(mapinfo); return NXT_ERROR; } } - ret = nxt_clone_proc_map_write(task, mapfile, pid, mapinfo); + ret = nxt_clone_credential_map_write(task, mapfile, pid, mapinfo); nxt_free(mapinfo); @@ -232,31 +201,50 @@ default_map: nxt_int_t -nxt_clone_proc_map(nxt_task_t *task, pid_t pid, nxt_process_clone_t *clone) +nxt_clone_credential_map(nxt_task_t *task, pid_t pid, + nxt_credential_t *app_creds, nxt_clone_t *clone) { nxt_int_t ret; - nxt_int_t uid, gid; + nxt_int_t default_host_uid; + nxt_int_t default_host_gid; const char *rule; nxt_runtime_t *rt; rt = task->thread->runtime; - uid = geteuid(); - gid = getegid(); - rule = rt->capabilities.setid ? "allow" : "deny"; + if (rt->capabilities.setid) { + rule = "allow"; + + /* + * By default we don't map a privileged user + */ + default_host_uid = app_creds->uid; + default_host_gid = app_creds->base_gid; + } else { + rule = "deny"; + + default_host_uid = nxt_euid; + default_host_gid = nxt_egid; + } + + ret = nxt_clone_credential_map_set(task, "uid_map", pid, app_creds->uid, + default_host_uid, + &clone->uidmap); - ret = nxt_clone_proc_map_set(task, "uid_map", pid, uid, clone->uidmap); if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } - ret = nxt_clone_proc_setgroups(task, pid, rule); + ret = nxt_clone_credential_setgroups(task, pid, rule); if (nxt_slow_path(ret != NXT_OK)) { nxt_alert(task, "failed to write /proc/%d/setgroups", pid); return NXT_ERROR; } - ret = nxt_clone_proc_map_set(task, "gid_map", pid, gid, clone->gidmap); + ret = nxt_clone_credential_map_set(task, "gid_map", pid, app_creds->base_gid, + default_host_gid, + &clone->gidmap); + if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } @@ -264,4 +252,197 @@ nxt_clone_proc_map(nxt_task_t *task, pid_t pid, nxt_process_clone_t *clone) return NXT_OK; } + +nxt_int_t +nxt_clone_vldt_credential_uidmap(nxt_task_t *task, + nxt_clone_credential_map_t *map, nxt_credential_t *creds) +{ + nxt_int_t id; + nxt_uint_t i; + nxt_runtime_t *rt; + nxt_clone_map_entry_t m; + + if (map->size == 0) { + return NXT_OK; + } + + rt = task->thread->runtime; + + if (!rt->capabilities.setid) { + if (nxt_slow_path(map->size > 1)) { + nxt_log(task, NXT_LOG_NOTICE, "\"uidmap\" field has %d entries " + "but unprivileged unit has a maximum of 1 map.", + map->size); + + return NXT_ERROR; + } + + id = map->map[0].host; + + if (nxt_slow_path((nxt_uid_t) id != nxt_euid)) { + nxt_log(task, NXT_LOG_NOTICE, "\"uidmap\" field has an entry for " + "host uid %d but unprivileged unit can only map itself " + "(uid %d) into child namespaces.", id, nxt_euid); + + return NXT_ERROR; + } + + return NXT_OK; + } + + for (i = 0; i < map->size; i++) { + m = map->map[i]; + + if (creds->uid >= (nxt_uid_t) m.container + && creds->uid < (nxt_uid_t) (m.container + m.size)) + { + return NXT_OK; + } + } + + nxt_log(task, NXT_LOG_NOTICE, "\"uidmap\" field has no \"container\" " + "entry for user \"%s\" (uid %d)", creds->user, creds->uid); + + return NXT_ERROR; +} + + +nxt_int_t +nxt_clone_vldt_credential_gidmap(nxt_task_t *task, + nxt_clone_credential_map_t *map, nxt_credential_t *creds) +{ + nxt_uint_t base_ok, gid_ok, gids_ok; + nxt_uint_t i, j; + nxt_runtime_t *rt; + nxt_clone_map_entry_t m; + + rt = task->thread->runtime; + + if (!rt->capabilities.setid) { + if (creds->ngroups > 0 + && !(creds->ngroups == 1 && creds->gids[0] == creds->base_gid)) { + nxt_log(task, NXT_LOG_NOTICE, + "unprivileged unit disallow supplementary groups for " + "new namespace (user \"%s\" has %d group%s).", + creds->user, creds->ngroups, + creds->ngroups > 1 ? "s" : ""); + + return NXT_ERROR; + } + + if (map->size == 0) { + return NXT_OK; + } + + if (nxt_slow_path(map->size > 1)) { + nxt_log(task, NXT_LOG_NOTICE, "\"gidmap\" field has %d entries " + "but unprivileged unit has a maximum of 1 map.", + map->size); + + return NXT_ERROR; + } + + m = map->map[0]; + + if (nxt_slow_path((nxt_gid_t) m.host != nxt_egid)) { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" field has an entry for " + "host gid %d but unprivileged unit can only map itself " + "(gid %d) into child namespaces.", m.host, nxt_egid); + + return NXT_ERROR; + } + + if (nxt_slow_path(m.size > 1)) { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" field has an entry with " + "\"size\": %d, but for unprivileged unit it must be 1.", + m.size); + + return NXT_ERROR; + } + + if (nxt_slow_path((nxt_gid_t) m.container != creds->base_gid)) { + nxt_log(task, NXT_LOG_ERR, + "\"gidmap\" field has no \"container\" entry for gid %d.", + creds->base_gid); + + return NXT_ERROR; + } + + return NXT_OK; + } + + if (map->size == 0) { + if (creds->ngroups > 0 + && !(creds->ngroups == 1 && creds->gids[0] == creds->base_gid)) + { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" field has no entries " + "but user \"%s\" has %d suplementary group%s.", + creds->user, creds->ngroups, + creds->ngroups > 1 ? "s" : ""); + + return NXT_ERROR; + } + + return NXT_OK; + } + + base_ok = 0; + gids_ok = 0; + + for (i = 0; i < creds->ngroups; i++) { + gid_ok = 0; + + for (j = 0; j < map->size; j++) { + m = map->map[j]; + + if (!base_ok && creds->base_gid >= (nxt_gid_t) m.container + && creds->base_gid < (nxt_gid_t) (m.container+m.size)) + { + base_ok = 1; + } + + if (creds->gids[i] >= (nxt_gid_t) m.container + && creds->gids[i] < (nxt_gid_t) (m.container+m.size)) + { + gid_ok = 1; + break; + } + } + + if (nxt_fast_path(gid_ok)) { + gids_ok++; + } + } + + if (!base_ok) { + for (i = 0; i < map->size; i++) { + m = map->map[i]; + + if (creds->base_gid >= (nxt_gid_t) m.container + && creds->base_gid < (nxt_gid_t) (m.container+m.size)) + { + base_ok = 1; + break; + } + } + } + + if (nxt_slow_path(!base_ok)) { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" field has no \"container\" " + "entry for gid %d.", creds->base_gid); + + return NXT_ERROR; + } + + if (nxt_slow_path(gids_ok < creds->ngroups)) { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" field has missing " + "suplementary gid mappings (found %d out of %d).", gids_ok, + creds->ngroups); + + return NXT_ERROR; + } + + return NXT_OK; +} + #endif diff --git a/src/nxt_clone.h b/src/nxt_clone.h index 50dec0b4..dcccf1db 100644 --- a/src/nxt_clone.h +++ b/src/nxt_clone.h @@ -7,11 +7,47 @@ #define _NXT_CLONE_INCLUDED_ +#if (NXT_HAVE_CLONE_NEWUSER) + +typedef struct { + nxt_int_t container; + nxt_int_t host; + nxt_int_t size; +} nxt_clone_map_entry_t; + +typedef struct { + nxt_uint_t size; + nxt_clone_map_entry_t *map; +} nxt_clone_credential_map_t; + +#endif + +typedef struct { + nxt_int_t flags; + +#if (NXT_HAVE_CLONE_NEWUSER) + nxt_clone_credential_map_t uidmap; + nxt_clone_credential_map_t gidmap; +#endif + +} nxt_clone_t; + + pid_t nxt_clone(nxt_int_t flags); + #if (NXT_HAVE_CLONE_NEWUSER) -nxt_int_t nxt_clone_proc_map(nxt_task_t *task, pid_t pid, - nxt_process_clone_t *clone); + +#define NXT_CLONE_USER(flags) \ + ((flags & CLONE_NEWUSER) == CLONE_NEWUSER) + +NXT_EXPORT nxt_int_t nxt_clone_credential_map(nxt_task_t *task, pid_t pid, + nxt_credential_t *creds, nxt_clone_t *clone); +NXT_EXPORT nxt_int_t nxt_clone_vldt_credential_uidmap(nxt_task_t *task, + nxt_clone_credential_map_t *map, nxt_credential_t *creds); +NXT_EXPORT nxt_int_t nxt_clone_vldt_credential_gidmap(nxt_task_t *task, + nxt_clone_credential_map_t *map, nxt_credential_t *creds); + #endif #endif /* _NXT_CLONE_INCLUDED_ */ diff --git a/src/nxt_credential.c b/src/nxt_credential.c index 9f275b7d..168db9cf 100644 --- a/src/nxt_credential.c +++ b/src/nxt_credential.c @@ -280,61 +280,69 @@ free: nxt_int_t -nxt_credential_set(nxt_task_t *task, nxt_credential_t *uc) +nxt_credential_setuid(nxt_task_t *task, nxt_credential_t *uc) { - nxt_debug(task, "user cred set: \"%s\" uid:%d base gid:%d", - uc->user, uc->uid, uc->base_gid); + nxt_debug(task, "user cred set: \"%s\" uid:%d", uc->user, uc->uid); - if (setgid(uc->base_gid) != 0) { + if (setuid(uc->uid) != 0) { #if (NXT_HAVE_CLONE) if (nxt_errno == EINVAL) { - nxt_log(task, NXT_LOG_ERR, "The gid %d isn't valid in the " - "application namespace.", uc->base_gid); + nxt_log(task, NXT_LOG_ERR, "The uid %d (user \"%s\") isn't " + "valid in the application namespace.", uc->uid, uc->user); return NXT_ERROR; } #endif - nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno); + nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno); return NXT_ERROR; } - if (uc->gids != NULL) { - if (setgroups(uc->ngroups, uc->gids) != 0) { + return NXT_OK; +} -#if (NXT_HAVE_CLONE) - if (nxt_errno == EINVAL) { - nxt_log(task, NXT_LOG_ERR, "The user \"%s\" (uid: %d) has " - "supplementary group ids not valid in the application " - "namespace.", uc->user, uc->uid); - return NXT_ERROR; - } -#endif - nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno); - return NXT_ERROR; - } +nxt_int_t +nxt_credential_setgids(nxt_task_t *task, nxt_credential_t *uc) +{ + nxt_runtime_t *rt; - } else { - /* MacOSX fallback. */ - if (initgroups(uc->user, uc->base_gid) != 0) { - nxt_alert(task, "initgroups(%s, %d) failed %E", - uc->user, uc->base_gid, nxt_errno); + nxt_debug(task, "user cred set gids: base gid:%d, ngroups: %d", + uc->base_gid, uc->ngroups); + + rt = task->thread->runtime; + + if (setgid(uc->base_gid) != 0) { + +#if (NXT_HAVE_CLONE) + if (nxt_errno == EINVAL) { + nxt_log(task, NXT_LOG_ERR, "The gid %d isn't valid in the " + "application namespace.", uc->base_gid); return NXT_ERROR; } +#endif + + nxt_alert(task, "setgid(%d) failed %E", uc->base_gid, nxt_errno); + return NXT_ERROR; } - if (setuid(uc->uid) != 0) { + if (!rt->capabilities.setid) { + return NXT_OK; + } + + if (nxt_slow_path(uc->ngroups > 0 + && setgroups(uc->ngroups, uc->gids) != 0)) { #if (NXT_HAVE_CLONE) if (nxt_errno == EINVAL) { - nxt_log(task, NXT_LOG_ERR, "The uid %d (user \"%s\") isn't " - "valid in the application namespace.", uc->uid, uc->user); + nxt_log(task, NXT_LOG_ERR, "The user \"%s\" (uid: %d) has " + "supplementary group ids not valid in the application " + "namespace.", uc->user, uc->uid); return NXT_ERROR; } #endif - nxt_alert(task, "setuid(%d) failed %E", uc->uid, nxt_errno); + nxt_alert(task, "setgroups(%i) failed %E", uc->ngroups, nxt_errno); return NXT_ERROR; } diff --git a/src/nxt_credential.h b/src/nxt_credential.h index e9f59327..243eba83 100644 --- a/src/nxt_credential.h +++ b/src/nxt_credential.h @@ -21,7 +21,9 @@ typedef struct { NXT_EXPORT nxt_int_t nxt_credential_get(nxt_task_t *task, nxt_mp_t *mp, nxt_credential_t *uc, const char *group); -NXT_EXPORT nxt_int_t nxt_credential_set(nxt_task_t *task, +NXT_EXPORT nxt_int_t nxt_credential_setuid(nxt_task_t *task, + nxt_credential_t *uc); +NXT_EXPORT nxt_int_t nxt_credential_setgids(nxt_task_t *task, nxt_credential_t *uc); diff --git a/src/nxt_lib.c b/src/nxt_lib.c index db3d29c1..1634a2b8 100644 --- a/src/nxt_lib.c +++ b/src/nxt_lib.c @@ -43,6 +43,8 @@ nxt_lib_start(const char *app, char **argv, char ***envp) nxt_pid = getpid(); nxt_ppid = getppid(); + nxt_euid = geteuid(); + nxt_egid = getegid(); #if (NXT_DEBUG) diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 02b0c293..79b9ee1f 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -80,10 +80,22 @@ static nxt_int_t nxt_process_init_name_set(nxt_process_init_t *init, static nxt_int_t nxt_process_init_creds_set(nxt_task_t *task, nxt_process_init_t *init, nxt_str_t *user, nxt_str_t *group); -static nxt_int_t nxt_init_set_isolation(nxt_task_t *task, - nxt_process_init_t *init, nxt_conf_value_t *isolation); -static nxt_int_t nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, - nxt_conf_value_t *ns); +static nxt_int_t nxt_init_isolation(nxt_task_t *task, + nxt_conf_value_t *isolation, nxt_process_init_t *init); +#if (NXT_HAVE_CLONE) +static nxt_int_t nxt_init_clone_flags(nxt_task_t *task, + nxt_conf_value_t *namespaces, nxt_process_init_t *init); +#endif + +#if (NXT_HAVE_CLONE_NEWUSER) +static nxt_int_t nxt_init_isolation_creds(nxt_task_t *task, + nxt_conf_value_t *isolation, nxt_process_init_t *init); +static nxt_int_t nxt_init_vldt_isolation_creds(nxt_task_t *task, + nxt_process_init_t *init); +static nxt_int_t nxt_init_isolation_credential_map(nxt_task_t *task, + nxt_mp_t *mem_pool, nxt_conf_value_t *map_array, + nxt_clone_credential_map_t *map); +#endif const nxt_sig_event_t nxt_main_process_signals[] = { nxt_event_signal(SIGHUP, nxt_main_process_signal_handler), @@ -641,6 +653,7 @@ static nxt_int_t nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream) { + nxt_int_t cap_setid; nxt_int_t ret; nxt_process_init_t *init; @@ -649,7 +662,22 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, return NXT_ERROR; } - if (rt->capabilities.setid) { + cap_setid = rt->capabilities.setid; + + if (app_conf->isolation != NULL) { + ret = nxt_init_isolation(task, app_conf->isolation, init); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + } + +#if (NXT_HAVE_CLONE_NEWUSER) + if (NXT_CLONE_USER(init->isolation.clone.flags)) { + cap_setid = 1; + } +#endif + + if (cap_setid) { ret = nxt_process_init_creds_set(task, init, &app_conf->user, &app_conf->group); if (nxt_slow_path(ret != NXT_OK)) { @@ -679,10 +707,12 @@ nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt, init->data = app_conf; init->stream = stream; - ret = nxt_init_set_isolation(task, init, app_conf->isolation); +#if (NXT_HAVE_CLONE_NEWUSER) + ret = nxt_init_vldt_isolation_creds(task, init); if (nxt_slow_path(ret != NXT_OK)) { goto fail; } +#endif return nxt_main_create_worker_process(task, rt, init); @@ -1487,45 +1517,178 @@ nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static nxt_int_t -nxt_init_set_isolation(nxt_task_t *task, nxt_process_init_t *init, - nxt_conf_value_t *isolation) +nxt_init_isolation(nxt_task_t *task, nxt_conf_value_t *isolation, + nxt_process_init_t *init) +{ +#if (NXT_HAVE_CLONE) + nxt_int_t ret; + nxt_conf_value_t *obj; + + static nxt_str_t nsname = nxt_string("namespaces"); + + obj = nxt_conf_get_object_member(isolation, &nsname, NULL); + if (obj != NULL) { + ret = nxt_init_clone_flags(task, obj, init); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } +#endif + +#if (NXT_HAVE_CLONE_NEWUSER) + ret = nxt_init_isolation_creds(task, isolation, init); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } +#endif + + return NXT_OK; +} + + +#if (NXT_HAVE_CLONE_NEWUSER) + +static nxt_int_t +nxt_init_isolation_creds(nxt_task_t *task, nxt_conf_value_t *isolation, + nxt_process_init_t *init) { nxt_int_t ret; - nxt_conf_value_t *object; + nxt_clone_t *clone; + nxt_conf_value_t *array; - static nxt_str_t nsname = nxt_string("namespaces"); static nxt_str_t uidname = nxt_string("uidmap"); static nxt_str_t gidname = nxt_string("gidmap"); - if (isolation == NULL) { + clone = &init->isolation.clone; + + array = nxt_conf_get_object_member(isolation, &uidname, NULL); + if (array != NULL) { + ret = nxt_init_isolation_credential_map(task, init->mem_pool, array, + &clone->uidmap); + + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + array = nxt_conf_get_object_member(isolation, &gidname, NULL); + if (array != NULL) { + ret = nxt_init_isolation_credential_map(task, init->mem_pool, array, + &clone->gidmap); + + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_init_vldt_isolation_creds(nxt_task_t *task, nxt_process_init_t *init) +{ + nxt_int_t ret; + nxt_clone_t *clone; + + clone = &init->isolation.clone; + + if (clone->uidmap.size == 0 && clone->gidmap.size == 0) { return NXT_OK; } - object = nxt_conf_get_object_member(isolation, &nsname, NULL); - if (object != NULL) { - ret = nxt_init_set_ns(task, init, object); - if (ret != NXT_OK) { - return ret; + if (!NXT_CLONE_USER(clone->flags)) { + if (nxt_slow_path(clone->uidmap.size > 0)) { + nxt_log(task, NXT_LOG_ERR, "\"uidmap\" is set but " + "\"isolation.namespaces.credential\" is false or unset"); + + return NXT_ERROR; } + + if (nxt_slow_path(clone->gidmap.size > 0)) { + nxt_log(task, NXT_LOG_ERR, "\"gidmap\" is set but " + "\"isolation.namespaces.credential\" is false or unset"); + + return NXT_ERROR; + } + + return NXT_OK; + } + + ret = nxt_clone_vldt_credential_uidmap(task, &clone->uidmap, + init->user_cred); + + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + + return nxt_clone_vldt_credential_gidmap(task, &clone->gidmap, + init->user_cred); +} + + +static nxt_int_t +nxt_init_isolation_credential_map(nxt_task_t *task, nxt_mp_t *mem_pool, + nxt_conf_value_t *map_array, nxt_clone_credential_map_t *map) +{ + nxt_int_t ret; + nxt_uint_t i; + nxt_conf_value_t *obj; + + static nxt_conf_map_t nxt_clone_map_entry_conf[] = { + { + nxt_string("container"), + NXT_CONF_MAP_INT, + offsetof(nxt_clone_map_entry_t, container), + }, + + { + nxt_string("host"), + NXT_CONF_MAP_INT, + offsetof(nxt_clone_map_entry_t, host), + }, + + { + nxt_string("size"), + NXT_CONF_MAP_INT, + offsetof(nxt_clone_map_entry_t, size), + }, + }; + + map->size = nxt_conf_array_elements_count(map_array); + + if (map->size == 0) { + return NXT_OK; } - object = nxt_conf_get_object_member(isolation, &uidname, NULL); - if (object != NULL) { - init->isolation.clone.uidmap = object; + map->map = nxt_mp_alloc(mem_pool, + map->size * sizeof(nxt_clone_map_entry_t)); + if (nxt_slow_path(map->map == NULL)) { + return NXT_ERROR; } - object = nxt_conf_get_object_member(isolation, &gidname, NULL); - if (object != NULL) { - init->isolation.clone.gidmap = object; + for (i = 0; i < map->size; i++) { + obj = nxt_conf_get_array_element(map_array, i); + + ret = nxt_conf_map_object(mem_pool, obj, nxt_clone_map_entry_conf, + nxt_nitems(nxt_clone_map_entry_conf), + map->map + i); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "clone map entry map error"); + return NXT_ERROR; + } } return NXT_OK; } +#endif + +#if (NXT_HAVE_CLONE) static nxt_int_t -nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, - nxt_conf_value_t *namespaces) +nxt_init_clone_flags(nxt_task_t *task, nxt_conf_value_t *namespaces, + nxt_process_init_t *init) { uint32_t index; nxt_str_t name; @@ -1592,6 +1755,8 @@ nxt_init_set_ns(nxt_task_t *task, nxt_process_init_t *init, return NXT_OK; } +#endif + static nxt_process_init_t * nxt_process_init_create(nxt_task_t *task, nxt_process_type_t type, diff --git a/src/nxt_process.c b/src/nxt_process.c index 75c6ef25..035f747f 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -23,6 +23,12 @@ nxt_pid_t nxt_pid; /* An original parent process pid. */ nxt_pid_t nxt_ppid; +/* A cached process effective uid */ +nxt_uid_t nxt_euid; + +/* A cached process effective gid */ +nxt_gid_t nxt_egid; + nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { { 1, 1, 1, 1, 1 }, { 1, 0, 0, 0, 0 }, @@ -207,8 +213,9 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) } #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER) - if ((init->isolation.clone.flags & CLONE_NEWUSER) == CLONE_NEWUSER) { - ret = nxt_clone_proc_map(task, pid, &init->isolation.clone); + if (NXT_CLONE_USER(init->isolation.clone.flags)) { + ret = nxt_clone_credential_map(task, pid, init->user_cred, + &init->isolation.clone); if (nxt_slow_path(ret != NXT_OK)) { goto fail; } @@ -257,7 +264,7 @@ cleanup: static void nxt_process_start(nxt_task_t *task, nxt_process_t *process) { - nxt_int_t ret; + nxt_int_t ret, cap_setid; nxt_port_t *port, *main_port; nxt_thread_t *thread; nxt_runtime_t *rt; @@ -276,9 +283,22 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process) nxt_random_init(&thread->random); - if (rt->capabilities.setid && init->user_cred != NULL) { - ret = nxt_credential_set(task, init->user_cred); - if (ret != NXT_OK) { + cap_setid = rt->capabilities.setid; + +#if (NXT_HAVE_CLONE_NEWUSER) + if (!cap_setid && NXT_CLONE_USER(init->isolation.clone.flags)) { + cap_setid = 1; + } +#endif + + if (cap_setid) { + ret = nxt_credential_setgids(task, init->user_cred); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + + ret = nxt_credential_setuid(task, init->user_cred); + if (nxt_slow_path(ret != NXT_OK)) { goto fail; } } diff --git a/src/nxt_process.h b/src/nxt_process.h index 457d2299..343fffb8 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -7,18 +7,14 @@ #ifndef _NXT_PROCESS_H_INCLUDED_ #define _NXT_PROCESS_H_INCLUDED_ -#include +#if (NXT_HAVE_CLONE) +#include +#endif typedef pid_t nxt_pid_t; -typedef struct { - nxt_int_t flags; - nxt_conf_value_t *uidmap; - nxt_conf_value_t *gidmap; -} nxt_process_clone_t; - typedef struct nxt_process_init_s nxt_process_init_t; typedef nxt_int_t (*nxt_process_start_t)(nxt_task_t *task, void *data); typedef nxt_int_t (*nxt_process_restart_t)(nxt_task_t *task, nxt_runtime_t *rt, @@ -39,7 +35,9 @@ struct nxt_process_init_s { uint32_t stream; union { - nxt_process_clone_t clone; +#if (NXT_HAVE_CLONE) + nxt_clone_t clone; +#endif } isolation; }; @@ -118,6 +116,8 @@ nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process, void nxt_worker_process_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_init_destroy(nxt_runtime_t *rt, nxt_process_init_t *init); + #if (NXT_HAVE_SETPROCTITLE) @@ -146,6 +146,8 @@ NXT_EXPORT void nxt_process_title(nxt_task_t *task, const char *fmt, ...); NXT_EXPORT extern nxt_pid_t nxt_pid; NXT_EXPORT extern nxt_pid_t nxt_ppid; +NXT_EXPORT extern nxt_uid_t nxt_euid; +NXT_EXPORT extern nxt_gid_t nxt_egid; NXT_EXPORT extern char **nxt_process_argv; NXT_EXPORT extern char ***nxt_process_environ; diff --git a/src/test/nxt_clone_test.c b/src/test/nxt_clone_test.c new file mode 100644 index 00000000..15d36557 --- /dev/null +++ b/src/test/nxt_clone_test.c @@ -0,0 +1,601 @@ +/* + * Copyright (C) NGINX, Inc. + * Copyright (C) Valentin V. Bartenev + */ + +#include +#include +#include "nxt_tests.h" + + +#define UIDMAP 1 +#define GIDMAP 2 + + +typedef struct { + nxt_int_t map_type; + nxt_str_t map_data; + nxt_int_t setid; + nxt_credential_t creds; + nxt_uid_t unit_euid; + nxt_gid_t unit_egid; + nxt_int_t result; + nxt_str_t errmsg; +} nxt_clone_creds_testcase_t; + +typedef struct { + nxt_clone_creds_testcase_t *tc; +} nxt_clone_creds_ctx_t; + + +nxt_int_t nxt_clone_test_mappings(nxt_task_t *task, nxt_mp_t *mp, + nxt_clone_creds_ctx_t *ctx, nxt_clone_creds_testcase_t *tc); +void nxt_cdecl nxt_clone_test_log_handler(nxt_uint_t level, nxt_log_t *log, + const char *fmt, ...); +nxt_int_t nxt_clone_test_map_assert(nxt_task_t *task, + nxt_clone_creds_testcase_t *tc, nxt_clone_credential_map_t *map); +static nxt_int_t nxt_clone_test_parse_map(nxt_task_t *task, + nxt_str_t *map_str, nxt_clone_credential_map_t *map); + + +nxt_log_t *test_log; + +static nxt_gid_t gids[] = {1000, 10000, 60000}; + +static nxt_clone_creds_testcase_t testcases[] = { + { + /* + * Unprivileged unit + * + * if no uid mapping and app creds and unit creds are the same, + * then we automatically add a map for the creds->uid. + * Then, child process can safely setuid(creds->uid) in + * the new namespace. + */ + UIDMAP, + nxt_string(""), + 0, + {"nobody", 65534, 65534, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string(""), + 0, + {"johndoe", 10000, 10000, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 1000, \"size\": 1}]"), + 0, + {"johndoe", 1000, 1000, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1}]"), + 0, + {"root", 0, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 65534, \"host\": 1000, \"size\": 1}]"), + 0, + {"nobody", 65534, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1}]"), + 0, + {"root", 0, 0, 0, NULL}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"uidmap\" field has 2 entries but unprivileged unit has " + "a maximum of 1 map.") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1}]"), + 1, /* privileged */ + {"root", 0, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1000}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1000}]"), + 1, /* privileged */ + {"johndoe", 500, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1000}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1000}]"), + 1, /* privileged */ + {"johndoe", 1000, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1000}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1000}]"), + 1, /* privileged */ + {"johndoe", 1500, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1000}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1000}]"), + 1, /* privileged */ + {"johndoe", 1999, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + UIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1000}," + " {\"container\": 1000, \"host\": 2000, \"size\": 1000}]"), + 1, /* privileged */ + {"johndoe", 2000, 0, 0, NULL}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"uidmap\" field has no \"container\" entry for user " + "\"johndoe\" (uid 2000)") + }, + { + /* + * Unprivileged unit + * + * if no gid mapping and app creds and unit creds are the same, + * then we automatically add a map for the creds->base_gid. + * Then, child process can safely setgid(creds->base_gid) in + * the new namespace. + */ + GIDMAP, + nxt_string("[]"), + 0, + {"nobody", 65534, 65534, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + /* + * Unprivileged unit + * + * Inside the new namespace, we can have any gid but it + * should map to parent gid (in this case 1000) in parent + * namespace. + */ + GIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1}]"), + 0, + {"root", 0, 0, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + GIDMAP, + nxt_string("[{\"container\": 65534, \"host\": 1000, \"size\": 1}]"), + 0, + {"nobody", 65534, 65534, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + /* + * Unprivileged unit + * + * There's no mapping for "johndoe" (gid 1000) inside the namespace. + */ + GIDMAP, + nxt_string("[{\"container\": 65535, \"host\": 1000, \"size\": 1}]"), + 0, + {"johndoe", 1000, 1000, 0, NULL}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has no \"container\" entry for " + "gid 1000.") + }, + { + GIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 1000, \"size\": 2}]"), + 0, + {"johndoe", 1000, 1000, 0, NULL}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has an entry with \"size\": 2, but " + "for unprivileged unit it must be 1.") + }, + { + GIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 1001, \"size\": 1}]"), + 0, + {"johndoe", 1000, 1000, 0, NULL}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has an entry for host gid 1001 but " + "unprivileged unit can only map itself (gid 1000) " + "into child namespaces.") + }, + { + GIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 1000, \"size\": 1}]"), + 0, + {"johndoe", 1000, 1000, 3, gids}, + 1000, 1000, + NXT_ERROR, + nxt_string("unprivileged unit disallow supplementary groups for " + "new namespace (user \"johndoe\" has 3 groups).") + }, + + /* privileged unit */ + + /* not root with capabilities */ + { + GIDMAP, + nxt_string("[]"), + 1, + {"johndoe", 1000, 1000, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + GIDMAP, + nxt_string(""), + 1, + {"johndoe", 1000, 1000, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + /* missing gid of {"user": "nobody"} */ + GIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1}]"), + 1, + {"nobody", 65534, 65534, 0, NULL}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has no \"container\" entry for " + "gid 65534.") + }, + { + /* solves the previous by mapping 65534 gids */ + GIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 65535}]"), + 1, + {"nobody", 65534, 65534, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + /* solves by adding a separate mapping */ + GIDMAP, + nxt_string("[{\"container\": 0, \"host\": 1000, \"size\": 1}," + " {\"container\": 65534, \"host\": 1000, \"size\": 1}]"), + 1, + {"nobody", 65534, 65534, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + /* + * Map a big range + */ + GIDMAP, + nxt_string("[{\"container\": 0, \"host\": 0, \"size\": 200000}]"), + 1, + {"johndoe", 100000, 100000, 0, NULL}, + 1000, 1000, + NXT_OK, + nxt_string("") + }, + { + /* + * Validate if supplementary groups are mapped + */ + GIDMAP, + nxt_string("[]"), + 1, + {"johndoe", 1000, 1000, 3, gids}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has no entries but user \"johndoe\" " + "has 3 suplementary groups."), + }, + { + GIDMAP, + nxt_string("[{\"container\": 0, \"host\": 0, \"size\": 1}]"), + 1, + {"johndoe", 1000, 1000, 3, gids}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has no \"container\" entry for " + "gid 1000."), + }, + { + GIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 0, \"size\": 1}]"), + 1, + {"johndoe", 1000, 1000, 3, gids}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has missing suplementary gid mappings " + "(found 1 out of 3)."), + }, + { + GIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 0, \"size\": 1}," + " {\"container\": 10000, \"host\": 10000, \"size\": 1}]"), + 1, + {"johndoe", 1000, 1000, 3, gids}, + 1000, 1000, + NXT_ERROR, + nxt_string("\"gidmap\" field has missing suplementary gid mappings " + "(found 2 out of 3)."), + }, + { + /* + * Fix all mappings + */ + GIDMAP, + nxt_string("[{\"container\": 1000, \"host\": 0, \"size\": 1}," + "{\"container\": 10000, \"host\": 10000, \"size\": 1}," + " {\"container\": 60000, \"host\": 60000, \"size\": 1}]"), + 1, + {"johndoe", 1000, 1000, 3, gids}, + 1000, 1000, + NXT_OK, + nxt_string(""), + }, +}; + + +void nxt_cdecl +nxt_clone_test_log_handler(nxt_uint_t level, nxt_log_t *log, + const char *fmt, ...) +{ + u_char *p, *end; + va_list args; + nxt_clone_creds_ctx_t *ctx; + nxt_clone_creds_testcase_t *tc; + u_char msg[NXT_MAX_ERROR_STR]; + + p = msg; + end = msg + NXT_MAX_ERROR_STR; + + ctx = log->ctx; + tc = ctx->tc; + + va_start(args, fmt); + p = nxt_vsprintf(p, end, fmt, args); + va_end(args); + + *p++ = '\0'; + + if (tc->result == NXT_OK && level == NXT_LOG_DEBUG) { + return; + } + + if (tc->errmsg.length == 0) { + nxt_log_error(NXT_LOG_ERR, &nxt_main_log, "unexpected log: %s", msg); + return; + } + + if (!nxt_str_eq(&tc->errmsg, msg, (nxt_uint_t) (p - msg - 1))) { + nxt_log_error(NXT_LOG_ERR, &nxt_main_log, + "error log mismatch: got [%s] but wants [%V]", + msg, &tc->errmsg); + return; + } +} + + +nxt_int_t +nxt_clone_creds_test(nxt_thread_t *thr) +{ + nxt_mp_t *mp; + nxt_int_t ret; + nxt_uint_t count, i; + nxt_task_t *task; + nxt_runtime_t rt; + nxt_clone_creds_ctx_t ctx; + + nxt_log_t nxt_clone_creds_log = { + NXT_LOG_INFO, + 0, + nxt_clone_test_log_handler, + NULL, + &ctx + }; + + nxt_thread_time_update(thr); + + thr->runtime = &rt; + + task = thr->task; + + mp = nxt_mp_create(1024, 128, 256, 32); + if (mp == NULL) { + return NXT_ERROR; + } + + rt.mem_pool = mp; + + test_log = task->log; + task->log = &nxt_clone_creds_log; + task->thread = thr; + + count = sizeof(testcases)/sizeof(nxt_clone_creds_testcase_t); + + for (i = 0; i < count; i++) { + ret = nxt_clone_test_mappings(task, mp, &ctx, &testcases[i]); + + if (ret != NXT_OK) { + goto fail; + } + } + + ret = NXT_OK; + + nxt_log_error(NXT_LOG_NOTICE, test_log, "clone creds test passed"); + +fail: + task->log = test_log; + nxt_mp_destroy(mp); + + return ret; +} + + +nxt_int_t +nxt_clone_test_mappings(nxt_task_t *task, nxt_mp_t *mp, + nxt_clone_creds_ctx_t *ctx, nxt_clone_creds_testcase_t *tc) +{ + nxt_int_t ret; + nxt_runtime_t *rt; + nxt_clone_credential_map_t map; + + rt = task->thread->runtime; + + map.size = 0; + + if (tc->map_data.length > 0) { + ret = nxt_clone_test_parse_map(task, &tc->map_data, &map); + if (ret != NXT_OK) { + return NXT_ERROR; + } + } + + rt->capabilities.setid = tc->setid; + + nxt_euid = tc->unit_euid; + nxt_egid = tc->unit_egid; + + ctx->tc = tc; + + if (nxt_clone_test_map_assert(task, tc, &map) != NXT_OK) { + return NXT_ERROR; + } + + if (tc->setid && nxt_euid != 0) { + /* + * Running as root should have the same behavior as + * passing Linux capabilities. + */ + + nxt_euid = 0; + nxt_egid = 0; + + if (nxt_clone_test_map_assert(task, tc, &map) != NXT_OK) { + return NXT_ERROR; + } + } + + return NXT_OK; +} + + +nxt_int_t +nxt_clone_test_map_assert(nxt_task_t *task, nxt_clone_creds_testcase_t *tc, + nxt_clone_credential_map_t *map) +{ + nxt_int_t ret; + + if (tc->map_type == UIDMAP) { + ret = nxt_clone_vldt_credential_uidmap(task, map, &tc->creds); + } else { + ret = nxt_clone_vldt_credential_gidmap(task, map, &tc->creds); + } + + if (ret != tc->result) { + nxt_log_error(NXT_LOG_ERR, &nxt_main_log, + "return %d instead of %d (map: %V)", ret, tc->result, + &tc->map_data); + + return NXT_ERROR; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_clone_test_parse_map(nxt_task_t *task, nxt_str_t *map_str, + nxt_clone_credential_map_t *map) +{ + nxt_uint_t i; + nxt_runtime_t *rt; + nxt_conf_value_t *array, *obj, *value; + + static nxt_str_t host_name = nxt_string("host"); + static nxt_str_t cont_name = nxt_string("container"); + static nxt_str_t size_name = nxt_string("size"); + + rt = task->thread->runtime; + + array = nxt_conf_json_parse_str(rt->mem_pool, map_str); + if (array == NULL) { + return NXT_ERROR; + } + + map->size = nxt_conf_array_elements_count(array); + + if (map->size == 0) { + return NXT_OK; + } + + map->map = nxt_mp_alloc(rt->mem_pool, + map->size * sizeof(nxt_clone_map_entry_t)); + + if (map->map == NULL) { + return NXT_ERROR; + } + + for (i = 0; i < map->size; i++) { + obj = nxt_conf_get_array_element(array, i); + + value = nxt_conf_get_object_member(obj, &host_name, NULL); + map->map[i].host = nxt_conf_get_integer(value); + + value = nxt_conf_get_object_member(obj, &cont_name, NULL); + map->map[i].container = nxt_conf_get_integer(value); + + value = nxt_conf_get_object_member(obj, &size_name, NULL); + map->map[i].size = nxt_conf_get_integer(value); + } + + return NXT_OK; +} diff --git a/src/test/nxt_tests.c b/src/test/nxt_tests.c index 7cba0f69..901d76c3 100644 --- a/src/test/nxt_tests.c +++ b/src/test/nxt_tests.c @@ -162,5 +162,11 @@ main(int argc, char **argv) return 1; } +#if (NXT_HAVE_CLONE_NEWUSER) + if (nxt_clone_creds_test(thr) != NXT_OK) { + return 1; + } +#endif + return 0; } diff --git a/src/test/nxt_tests.h b/src/test/nxt_tests.h index be4168cf..d531cc7d 100644 --- a/src/test/nxt_tests.h +++ b/src/test/nxt_tests.h @@ -64,6 +64,7 @@ nxt_int_t nxt_malloc_test(nxt_thread_t *thr); nxt_int_t nxt_utf8_test(nxt_thread_t *thr); nxt_int_t nxt_http_parse_test(nxt_thread_t *thr); nxt_int_t nxt_strverscmp_test(nxt_thread_t *thr); +nxt_int_t nxt_clone_creds_test(nxt_thread_t *thr); #endif /* _NXT_TESTS_H_INCLUDED_ */ -- cgit From 51af6ac0a1a73cd2c539188379f487557b80b341 Mon Sep 17 00:00:00 2001 From: Valentin Bartenev Date: Mon, 23 Dec 2019 21:14:14 +0300 Subject: Python: pre-creation of objects for string constants. This is an optimization to avoid creating them at runtime on each request. --- src/nxt_python_wsgi.c | 194 ++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 156 insertions(+), 38 deletions(-) (limited to 'src') diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index 5bb2fb2c..3b13bea1 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -50,6 +50,8 @@ #define PyBytes_Check PyString_Check #define PyBytes_GET_SIZE PyString_GET_SIZE #define PyBytes_AS_STRING PyString_AS_STRING +#define PyUnicode_InternInPlace PyString_InternInPlace +#define PyUnicode_AsUTF8 PyString_AS_STRING #endif typedef struct nxt_python_run_ctx_s nxt_python_run_ctx_t; @@ -64,15 +66,18 @@ typedef struct { } nxt_py_error_t; static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf); +static nxt_int_t nxt_python_init_strings(void); static void nxt_python_request_handler(nxt_unit_request_info_t *req); static void nxt_python_atexit(void); static PyObject *nxt_python_create_environ(nxt_task_t *task); static PyObject *nxt_python_get_environ(nxt_python_run_ctx_t *ctx); -static int nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, const char *name, +static int nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, nxt_unit_sptr_t *sptr, uint32_t size); -static int nxt_python_add_str(nxt_python_run_ctx_t *ctx, const char *name, - const char *str, uint32_t size); +static int nxt_python_add_field(nxt_python_run_ctx_t *ctx, + nxt_unit_field_t *field); +static int nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name, + PyObject *value); static PyObject *nxt_py_start_resp(PyObject *self, PyObject *args); static int nxt_python_response_add_field(nxt_python_run_ctx_t *ctx, @@ -157,6 +162,48 @@ static PyThreadState *nxt_python_thread_state; static nxt_python_run_ctx_t *nxt_python_run_ctx; +static PyObject *nxt_py_80_str; +static PyObject *nxt_py_close_str; +static PyObject *nxt_py_content_length_str; +static PyObject *nxt_py_content_type_str; +static PyObject *nxt_py_http_str; +static PyObject *nxt_py_https_str; +static PyObject *nxt_py_path_info_str; +static PyObject *nxt_py_query_string_str; +static PyObject *nxt_py_remote_addr_str; +static PyObject *nxt_py_request_method_str; +static PyObject *nxt_py_request_uri_str; +static PyObject *nxt_py_server_addr_str; +static PyObject *nxt_py_server_name_str; +static PyObject *nxt_py_server_port_str; +static PyObject *nxt_py_server_protocol_str; +static PyObject *nxt_py_wsgi_uri_scheme_str; + +typedef struct { + nxt_str_t string; + PyObject **object_p; +} nxt_python_string_t; + +static nxt_python_string_t nxt_python_strings[] = { + { nxt_string("80"), &nxt_py_80_str }, + { nxt_string("close"), &nxt_py_close_str }, + { nxt_string("CONTENT_LENGTH"), &nxt_py_content_length_str }, + { nxt_string("CONTENT_TYPE"), &nxt_py_content_type_str }, + { nxt_string("http"), &nxt_py_http_str }, + { nxt_string("https"), &nxt_py_https_str }, + { nxt_string("PATH_INFO"), &nxt_py_path_info_str }, + { nxt_string("QUERY_STRING"), &nxt_py_query_string_str }, + { nxt_string("REMOTE_ADDR"), &nxt_py_remote_addr_str }, + { nxt_string("REQUEST_METHOD"), &nxt_py_request_method_str }, + { nxt_string("REQUEST_URI"), &nxt_py_request_uri_str }, + { nxt_string("SERVER_ADDR"), &nxt_py_server_addr_str }, + { nxt_string("SERVER_NAME"), &nxt_py_server_name_str }, + { nxt_string("SERVER_PORT"), &nxt_py_server_port_str }, + { nxt_string("SERVER_PROTOCOL"), &nxt_py_server_protocol_str }, + { nxt_string("wsgi.url_scheme"), &nxt_py_wsgi_uri_scheme_str }, +}; + + static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) { @@ -239,6 +286,12 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) Py_InitializeEx(0); module = NULL; + obj = NULL; + + if (nxt_slow_path(nxt_python_init_strings() != NXT_OK)) { + nxt_alert(task, "Python failed to init string objects"); + goto fail; + } obj = PySys_GetObject((char *) "stderr"); if (nxt_slow_path(obj == NULL)) { @@ -382,6 +435,31 @@ fail: } +static nxt_int_t +nxt_python_init_strings(void) +{ + PyObject *obj; + nxt_uint_t i; + nxt_python_string_t *pstr; + + for (i = 0; i < nxt_nitems(nxt_python_strings); i++) { + pstr = &nxt_python_strings[i]; + + obj = PyString_FromStringAndSize((char *) pstr->string.start, + pstr->string.length); + if (nxt_slow_path(obj == NULL)) { + return NXT_ERROR; + } + + PyUnicode_InternInPlace(&obj); + + *pstr->object_p = obj; + } + + return NXT_OK; +} + + static void nxt_python_request_handler(nxt_unit_request_info_t *req) { @@ -478,7 +556,7 @@ nxt_python_request_handler(nxt_unit_request_info_t *req) rc = NXT_UNIT_ERROR; } - close = PyObject_GetAttrString(response, "close"); + close = PyObject_GetAttr(response, nxt_py_close_str); if (close != NULL) { result = PyObject_CallFunction(close, NULL); @@ -512,6 +590,12 @@ done: static void nxt_python_atexit(void) { + nxt_uint_t i; + + for (i = 0; i < nxt_nitems(nxt_python_strings); i++) { + Py_XDECREF(*nxt_python_strings[i].object_p); + } + Py_XDECREF(nxt_py_stderr_flush); Py_XDECREF(nxt_py_application); Py_XDECREF(nxt_py_start_resp_obj); @@ -655,7 +739,6 @@ static PyObject * nxt_python_get_environ(nxt_python_run_ctx_t *ctx) { int rc; - char *name; uint32_t i; PyObject *environ; nxt_unit_field_t *f; @@ -681,47 +764,52 @@ nxt_python_get_environ(nxt_python_run_ctx_t *ctx) } \ } while(0) - RC(nxt_python_add_sptr(ctx, "REQUEST_METHOD", &r->method, + RC(nxt_python_add_sptr(ctx, nxt_py_request_method_str, &r->method, r->method_length)); - RC(nxt_python_add_sptr(ctx, "REQUEST_URI", &r->target, r->target_length)); - RC(nxt_python_add_sptr(ctx, "QUERY_STRING", &r->query, r->query_length)); - RC(nxt_python_add_sptr(ctx, "PATH_INFO", &r->path, r->path_length)); - - RC(nxt_python_add_sptr(ctx, "REMOTE_ADDR", &r->remote, r->remote_length)); - RC(nxt_python_add_sptr(ctx, "SERVER_ADDR", &r->local, r->local_length)); + RC(nxt_python_add_sptr(ctx, nxt_py_request_uri_str, &r->target, + r->target_length)); + RC(nxt_python_add_sptr(ctx, nxt_py_query_string_str, &r->query, + r->query_length)); + RC(nxt_python_add_sptr(ctx, nxt_py_path_info_str, &r->path, + r->path_length)); + + RC(nxt_python_add_sptr(ctx, nxt_py_remote_addr_str, &r->remote, + r->remote_length)); + RC(nxt_python_add_sptr(ctx, nxt_py_server_addr_str, &r->local, + r->local_length)); if (r->tls) { - RC(nxt_python_add_str(ctx, "wsgi.url_scheme", "https", 5)); - + RC(nxt_python_add_obj(ctx, nxt_py_wsgi_uri_scheme_str, + nxt_py_https_str)); } else { - RC(nxt_python_add_str(ctx, "wsgi.url_scheme", "http", 4)); + RC(nxt_python_add_obj(ctx, nxt_py_wsgi_uri_scheme_str, + nxt_py_http_str)); } - RC(nxt_python_add_sptr(ctx, "SERVER_PROTOCOL", &r->version, + RC(nxt_python_add_sptr(ctx, nxt_py_server_protocol_str, &r->version, r->version_length)); - RC(nxt_python_add_sptr(ctx, "SERVER_NAME", &r->server_name, + RC(nxt_python_add_sptr(ctx, nxt_py_server_name_str, &r->server_name, r->server_name_length)); - RC(nxt_python_add_str(ctx, "SERVER_PORT", "80", 2)); + RC(nxt_python_add_obj(ctx, nxt_py_server_port_str, nxt_py_80_str)); for (i = 0; i < r->fields_count; i++) { f = r->fields + i; - name = nxt_unit_sptr_get(&f->name); - RC(nxt_python_add_sptr(ctx, name, &f->value, f->value_length)); + RC(nxt_python_add_field(ctx, f)); } if (r->content_length_field != NXT_UNIT_NONE_FIELD) { f = r->fields + r->content_length_field; - RC(nxt_python_add_sptr(ctx, "CONTENT_LENGTH", &f->value, + RC(nxt_python_add_sptr(ctx, nxt_py_content_length_str, &f->value, f->value_length)); } if (r->content_type_field != NXT_UNIT_NONE_FIELD) { f = r->fields + r->content_type_field; - RC(nxt_python_add_sptr(ctx, "CONTENT_TYPE", &f->value, + RC(nxt_python_add_sptr(ctx, nxt_py_content_type_str, &f->value, f->value_length)); } @@ -738,7 +826,7 @@ fail: static int -nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, const char *name, +nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, PyObject *name, nxt_unit_sptr_t *sptr, uint32_t size) { char *src; @@ -756,10 +844,10 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, const char *name, return NXT_UNIT_ERROR; } - if (nxt_slow_path(PyDict_SetItemString(ctx->environ, name, value) != 0)) { + if (nxt_slow_path(PyDict_SetItem(ctx->environ, name, value) != 0)) { nxt_unit_req_error(ctx->req, "Python failed to set the \"%s\" environ value", - name); + PyUnicode_AsUTF8(name)); Py_DECREF(value); return NXT_UNIT_ERROR; @@ -772,37 +860,67 @@ nxt_python_add_sptr(nxt_python_run_ctx_t *ctx, const char *name, static int -nxt_python_add_str(nxt_python_run_ctx_t *ctx, const char *name, - const char *str, uint32_t size) +nxt_python_add_field(nxt_python_run_ctx_t *ctx, nxt_unit_field_t *field) { - PyObject *value; + char *src; + PyObject *name, *value; - if (nxt_slow_path(str == NULL)) { - return NXT_UNIT_OK; + src = nxt_unit_sptr_get(&field->name); + + name = PyString_FromStringAndSize(src, field->name_length); + if (nxt_slow_path(name == NULL)) { + nxt_unit_req_error(ctx->req, + "Python failed to create name string \"%.*s\"", + (int) field->name_length, src); + nxt_python_print_exception(); + + return NXT_UNIT_ERROR; } - value = PyString_FromStringAndSize(str, size); + src = nxt_unit_sptr_get(&field->value); + + value = PyString_FromStringAndSize(src, field->value_length); if (nxt_slow_path(value == NULL)) { nxt_unit_req_error(ctx->req, "Python failed to create value string \"%.*s\"", - (int) size, str); + (int) field->value_length, src); nxt_python_print_exception(); - return NXT_UNIT_ERROR; + goto fail; } - if (nxt_slow_path(PyDict_SetItemString(ctx->environ, name, value) != 0)) { + if (nxt_slow_path(PyDict_SetItem(ctx->environ, name, value) != 0)) { nxt_unit_req_error(ctx->req, "Python failed to set the \"%s\" environ value", - name); + PyUnicode_AsUTF8(name)); + goto fail; + } - Py_DECREF(value); + Py_DECREF(name); + Py_DECREF(value); + + return NXT_UNIT_OK; + +fail: + + Py_DECREF(name); + Py_XDECREF(value); + + return NXT_UNIT_ERROR; +} + + +static int +nxt_python_add_obj(nxt_python_run_ctx_t *ctx, PyObject *name, PyObject *value) +{ + if (nxt_slow_path(PyDict_SetItem(ctx->environ, name, value) != 0)) { + nxt_unit_req_error(ctx->req, + "Python failed to set the \"%s\" environ value", + PyUnicode_AsUTF8(name)); return NXT_UNIT_ERROR; } - Py_DECREF(value); - return NXT_UNIT_OK; } -- cgit From 823f658c771af8b6ff07a3581a2c63efc6a15ad2 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 24 Dec 2019 17:59:37 +0300 Subject: Go: linking against libunit. --- src/go/unit/ldflags.go | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 src/go/unit/ldflags.go (limited to 'src') diff --git a/src/go/unit/ldflags.go b/src/go/unit/ldflags.go new file mode 100644 index 00000000..68f2ab78 --- /dev/null +++ b/src/go/unit/ldflags.go @@ -0,0 +1,10 @@ +/* + * Copyright (C) NGINX, Inc. + */ + +package unit + +/* +#cgo LDFLAGS: -lunit +*/ +import "C" -- cgit From f5a2984acf9a562b6b1eb45a21d9af2be2415659 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 24 Dec 2019 17:59:52 +0300 Subject: Go: moving source files to the root of the project. This patch includes packaging changes related to files move. --- src/go/unit/ldflags-lrt.go | 13 --- src/go/unit/ldflags.go | 10 --- src/go/unit/nxt_cgo_lib.c | 207 --------------------------------------------- src/go/unit/nxt_cgo_lib.h | 40 --------- src/go/unit/port.go | 170 ------------------------------------- src/go/unit/request.go | 144 ------------------------------- src/go/unit/response.go | 87 ------------------- src/go/unit/unit.go | 149 -------------------------------- 8 files changed, 820 deletions(-) delete mode 100644 src/go/unit/ldflags-lrt.go delete mode 100644 src/go/unit/ldflags.go delete mode 100644 src/go/unit/nxt_cgo_lib.c delete mode 100644 src/go/unit/nxt_cgo_lib.h delete mode 100644 src/go/unit/port.go delete mode 100644 src/go/unit/request.go delete mode 100644 src/go/unit/response.go delete mode 100644 src/go/unit/unit.go (limited to 'src') diff --git a/src/go/unit/ldflags-lrt.go b/src/go/unit/ldflags-lrt.go deleted file mode 100644 index f5a63508..00000000 --- a/src/go/unit/ldflags-lrt.go +++ /dev/null @@ -1,13 +0,0 @@ -// +build linux netbsd - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package unit - -/* -#cgo LDFLAGS: -lrt -*/ -import "C" diff --git a/src/go/unit/ldflags.go b/src/go/unit/ldflags.go deleted file mode 100644 index 68f2ab78..00000000 --- a/src/go/unit/ldflags.go +++ /dev/null @@ -1,10 +0,0 @@ -/* - * Copyright (C) NGINX, Inc. - */ - -package unit - -/* -#cgo LDFLAGS: -lunit -*/ -import "C" diff --git a/src/go/unit/nxt_cgo_lib.c b/src/go/unit/nxt_cgo_lib.c deleted file mode 100644 index 5cb31b5a..00000000 --- a/src/go/unit/nxt_cgo_lib.c +++ /dev/null @@ -1,207 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#include "_cgo_export.h" - -#include -#include - - -static void nxt_cgo_request_handler(nxt_unit_request_info_t *req); -static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst, - nxt_unit_sptr_t *sptr, uint32_t length); -static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); -static void nxt_cgo_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); -static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, - const void *buf, size_t buf_size, const void *oob, size_t oob_size); -static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, - void *buf, size_t buf_size, void *oob, size_t oob_size); - -int -nxt_cgo_run(uintptr_t handler) -{ - int rc; - nxt_unit_ctx_t *ctx; - nxt_unit_init_t init; - - memset(&init, 0, sizeof(init)); - - init.callbacks.request_handler = nxt_cgo_request_handler; - init.callbacks.add_port = nxt_cgo_add_port; - init.callbacks.remove_port = nxt_cgo_remove_port; - init.callbacks.port_send = nxt_cgo_port_send; - init.callbacks.port_recv = nxt_cgo_port_recv; - - init.data = (void *) handler; - - ctx = nxt_unit_init(&init); - if (ctx == NULL) { - return NXT_UNIT_ERROR; - } - - rc = nxt_unit_run(ctx); - - nxt_unit_done(ctx); - - return rc; -} - - -static void -nxt_cgo_request_handler(nxt_unit_request_info_t *req) -{ - uint32_t i; - uintptr_t go_req; - nxt_cgo_str_t method, uri, name, value, proto, host, remote_addr; - nxt_unit_field_t *f; - nxt_unit_request_t *r; - - r = req->request; - - go_req = nxt_go_request_create((uintptr_t) req, - nxt_cgo_str_init(&method, &r->method, r->method_length), - nxt_cgo_str_init(&uri, &r->target, r->target_length)); - - nxt_go_request_set_proto(go_req, - nxt_cgo_str_init(&proto, &r->version, r->version_length), 1, 1); - - for (i = 0; i < r->fields_count; i++) { - f = &r->fields[i]; - - nxt_go_request_add_header(go_req, - nxt_cgo_str_init(&name, &f->name, f->name_length), - nxt_cgo_str_init(&value, &f->value, f->value_length)); - } - - nxt_go_request_set_content_length(go_req, r->content_length); - nxt_go_request_set_host(go_req, - nxt_cgo_str_init(&host, &r->server_name, r->server_name_length)); - nxt_go_request_set_remote_addr(go_req, - nxt_cgo_str_init(&remote_addr, &r->remote, r->remote_length)); - - if (r->tls) { - nxt_go_request_set_tls(go_req); - } - - nxt_go_request_handler(go_req, (uintptr_t) req->unit->data); -} - - -static nxt_cgo_str_t * -nxt_cgo_str_init(nxt_cgo_str_t *dst, nxt_unit_sptr_t *sptr, uint32_t length) -{ - dst->length = length; - dst->start = nxt_unit_sptr_get(sptr); - - return dst; -} - - -static int -nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) -{ - nxt_go_add_port(port->id.pid, port->id.id, - port->in_fd, port->out_fd); - - return nxt_unit_add_port(ctx, port); -} - - -static void -nxt_cgo_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) -{ - nxt_go_remove_port(port_id->pid, port_id->id); - - nxt_unit_remove_port(ctx, port_id); -} - - -static ssize_t -nxt_cgo_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - const void *buf, size_t buf_size, const void *oob, size_t oob_size) -{ - return nxt_go_port_send(port_id->pid, port_id->id, - (void *) buf, buf_size, (void *) oob, oob_size); -} - - -static ssize_t -nxt_cgo_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, - void *buf, size_t buf_size, void *oob, size_t oob_size) -{ - return nxt_go_port_recv(port_id->pid, port_id->id, - buf, buf_size, oob, oob_size); -} - - -int -nxt_cgo_response_create(uintptr_t req, int status, int fields, - uint32_t fields_size) -{ - return nxt_unit_response_init((nxt_unit_request_info_t *) req, - status, fields, fields_size); -} - - -int -nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len, - uintptr_t value, uint32_t value_len) -{ - return nxt_unit_response_add_field((nxt_unit_request_info_t *) req, - (char *) name, name_len, - (char *) value, value_len); -} - - -int -nxt_cgo_response_send(uintptr_t req) -{ - return nxt_unit_response_send((nxt_unit_request_info_t *) req); -} - - -ssize_t -nxt_cgo_response_write(uintptr_t req, uintptr_t start, uint32_t len) -{ - int rc; - - rc = nxt_unit_response_write((nxt_unit_request_info_t *) req, - (void *) start, len); - if (rc != NXT_UNIT_OK) { - return -1; - } - - return len; -} - - -ssize_t -nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len) -{ - return nxt_unit_request_read((nxt_unit_request_info_t *) req, - (void *) dst, dst_len); -} - - -int -nxt_cgo_request_close(uintptr_t req) -{ - return 0; -} - - -void -nxt_cgo_request_done(uintptr_t req, int res) -{ - nxt_unit_request_done((nxt_unit_request_info_t *) req, res); -} - - -void -nxt_cgo_warn(uintptr_t msg, uint32_t msg_len) -{ - nxt_unit_warn(NULL, "%.*s", (int) msg_len, (char *) msg); -} diff --git a/src/go/unit/nxt_cgo_lib.h b/src/go/unit/nxt_cgo_lib.h deleted file mode 100644 index 5317380b..00000000 --- a/src/go/unit/nxt_cgo_lib.h +++ /dev/null @@ -1,40 +0,0 @@ - -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_CGO_LIB_H_INCLUDED_ -#define _NXT_CGO_LIB_H_INCLUDED_ - - -#include -#include -#include - -typedef struct { - int length; - char *start; -} nxt_cgo_str_t; - -int nxt_cgo_run(uintptr_t handler); - -int nxt_cgo_response_create(uintptr_t req, int code, int fields, - uint32_t fields_size); - -int nxt_cgo_response_add_field(uintptr_t req, uintptr_t name, uint8_t name_len, - uintptr_t value, uint32_t value_len); - -int nxt_cgo_response_send(uintptr_t req); - -ssize_t nxt_cgo_response_write(uintptr_t req, uintptr_t src, uint32_t len); - -ssize_t nxt_cgo_request_read(uintptr_t req, uintptr_t dst, uint32_t dst_len); - -int nxt_cgo_request_close(uintptr_t req); - -void nxt_cgo_request_done(uintptr_t req, int res); - -void nxt_cgo_warn(uintptr_t msg, uint32_t msg_len); - -#endif /* _NXT_CGO_LIB_H_INCLUDED_ */ diff --git a/src/go/unit/port.go b/src/go/unit/port.go deleted file mode 100644 index a68cae74..00000000 --- a/src/go/unit/port.go +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package unit - -/* -#include "nxt_cgo_lib.h" -*/ -import "C" - -import ( - "net" - "os" - "sync" - "unsafe" -) - -type port_key struct { - pid int - id int -} - -type port struct { - key port_key - rcv *net.UnixConn - snd *net.UnixConn -} - -type port_registry struct { - sync.RWMutex - m map[port_key]*port -} - -var port_registry_ port_registry - -func find_port(key port_key) *port { - port_registry_.RLock() - res := port_registry_.m[key] - port_registry_.RUnlock() - - return res -} - -func add_port(p *port) { - - port_registry_.Lock() - if port_registry_.m == nil { - port_registry_.m = make(map[port_key]*port) - } - - port_registry_.m[p.key] = p - - port_registry_.Unlock() -} - -func (p *port) Close() { - if p.rcv != nil { - p.rcv.Close() - } - - if p.snd != nil { - p.snd.Close() - } -} - -func getUnixConn(fd int) *net.UnixConn { - if fd < 0 { - return nil - } - - f := os.NewFile(uintptr(fd), "sock") - defer f.Close() - - c, err := net.FileConn(f) - if err != nil { - nxt_go_warn("FileConn error %s", err) - return nil - } - - uc, ok := c.(*net.UnixConn) - if !ok { - nxt_go_warn("Not a Unix-domain socket %d", fd) - return nil - } - - return uc -} - -//export nxt_go_add_port -func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) { - p := &port{ - key: port_key{ - pid: int(pid), - id: int(id), - }, - rcv: getUnixConn(int(rcv)), - snd: getUnixConn(int(snd)), - } - - add_port(p) -} - -//export nxt_go_remove_port -func nxt_go_remove_port(pid C.int, id C.int) { - key := port_key{ - pid: int(pid), - id: int(id), - } - - port_registry_.Lock() - if port_registry_.m != nil { - delete(port_registry_.m, key) - } - - port_registry_.Unlock() -} - -//export nxt_go_port_send -func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, - oob unsafe.Pointer, oob_size C.int) C.ssize_t { - - key := port_key{ - pid: int(pid), - id: int(id), - } - - p := find_port(key) - - if p == nil { - nxt_go_warn("port %d:%d not found", pid, id) - return 0 - } - - n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size), - GoBytes(oob, oob_size), nil) - - if err != nil { - nxt_go_warn("write result %d (%d), %s", n, oobn, err) - } - - return C.ssize_t(n) -} - -//export nxt_go_port_recv -func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, - oob unsafe.Pointer, oob_size C.int) C.ssize_t { - - key := port_key{ - pid: int(pid), - id: int(id), - } - - p := find_port(key) - - if p == nil { - nxt_go_warn("port %d:%d not found", pid, id) - return 0 - } - - n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size), - GoBytes(oob, oob_size)) - - if err != nil { - nxt_go_warn("read result %d (%d), %s", n, oobn, err) - } - - return C.ssize_t(n) -} diff --git a/src/go/unit/request.go b/src/go/unit/request.go deleted file mode 100644 index 1d8c6702..00000000 --- a/src/go/unit/request.go +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package unit - -/* -#include "nxt_cgo_lib.h" -*/ -import "C" - -import ( - "io" - "net/http" - "net/url" - "crypto/tls" - "unsafe" -) - -type request struct { - req http.Request - resp *response - c_req C.uintptr_t -} - -func (r *request) Read(p []byte) (n int, err error) { - res := C.nxt_cgo_request_read(r.c_req, buf_ref(p), C.uint32_t(len(p))) - - if res == 0 && len(p) > 0 { - return 0, io.EOF - } - - return int(res), nil -} - -func (r *request) Close() error { - C.nxt_cgo_request_close(r.c_req) - return nil -} - -func (r *request) response() *response { - if r.resp == nil { - r.resp = new_response(r.c_req, &r.req) - } - - return r.resp -} - -func (r *request) done() { - resp := r.response() - if !resp.headerSent { - resp.WriteHeader(http.StatusOK) - } - C.nxt_cgo_request_done(r.c_req, 0) -} - -func get_request(go_req uintptr) *request { - return (*request)(unsafe.Pointer(go_req)) -} - -//export nxt_go_request_create -func nxt_go_request_create(c_req C.uintptr_t, - c_method *C.nxt_cgo_str_t, c_uri *C.nxt_cgo_str_t) uintptr { - - uri := C.GoStringN(c_uri.start, c_uri.length) - - var URL *url.URL - var err error - if URL, err = url.ParseRequestURI(uri); err != nil { - return 0 - } - - r := &request{ - req: http.Request{ - Method: C.GoStringN(c_method.start, c_method.length), - URL: URL, - Header: http.Header{}, - Body: nil, - RequestURI: uri, - }, - c_req: c_req, - } - r.req.Body = r - - return uintptr(unsafe.Pointer(r)) -} - -//export nxt_go_request_set_proto -func nxt_go_request_set_proto(go_req uintptr, proto *C.nxt_cgo_str_t, - maj C.int, min C.int) { - - r := get_request(go_req) - r.req.Proto = C.GoStringN(proto.start, proto.length) - r.req.ProtoMajor = int(maj) - r.req.ProtoMinor = int(min) -} - -//export nxt_go_request_add_header -func nxt_go_request_add_header(go_req uintptr, name *C.nxt_cgo_str_t, - value *C.nxt_cgo_str_t) { - - r := get_request(go_req) - r.req.Header.Add(C.GoStringN(name.start, name.length), - C.GoStringN(value.start, value.length)) -} - -//export nxt_go_request_set_content_length -func nxt_go_request_set_content_length(go_req uintptr, l C.int64_t) { - get_request(go_req).req.ContentLength = int64(l) -} - -//export nxt_go_request_set_host -func nxt_go_request_set_host(go_req uintptr, host *C.nxt_cgo_str_t) { - get_request(go_req).req.Host = C.GoStringN(host.start, host.length) -} - -//export nxt_go_request_set_url -func nxt_go_request_set_url(go_req uintptr, scheme *C.char) { - get_request(go_req).req.URL.Scheme = C.GoString(scheme) -} - -//export nxt_go_request_set_remote_addr -func nxt_go_request_set_remote_addr(go_req uintptr, addr *C.nxt_cgo_str_t) { - - get_request(go_req).req.RemoteAddr = C.GoStringN(addr.start, addr.length) -} - -//export nxt_go_request_set_tls -func nxt_go_request_set_tls(go_req uintptr) { - - get_request(go_req).req.TLS = &tls.ConnectionState{ } -} - -//export nxt_go_request_handler -func nxt_go_request_handler(go_req uintptr, h uintptr) { - r := get_request(go_req) - handler := get_handler(h) - - go func(r *request) { - handler.ServeHTTP(r.response(), &r.req) - r.done() - }(r) -} diff --git a/src/go/unit/response.go b/src/go/unit/response.go deleted file mode 100644 index 767d66b7..00000000 --- a/src/go/unit/response.go +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package unit - -/* -#include "nxt_cgo_lib.h" -*/ -import "C" - -import ( - "net/http" -) - -type response struct { - header http.Header - headerSent bool - req *http.Request - c_req C.uintptr_t -} - -func new_response(c_req C.uintptr_t, req *http.Request) *response { - resp := &response{ - header: http.Header{}, - req: req, - c_req: c_req, - } - - return resp -} - -func (r *response) Header() http.Header { - return r.header -} - -func (r *response) Write(p []byte) (n int, err error) { - if !r.headerSent { - r.WriteHeader(http.StatusOK) - } - - res := C.nxt_cgo_response_write(r.c_req, buf_ref(p), C.uint32_t(len(p))) - return int(res), nil -} - -func (r *response) WriteHeader(code int) { - if r.headerSent { - // Note: explicitly using Stderr, as Stdout is our HTTP output. - nxt_go_warn("multiple response.WriteHeader calls") - return - } - r.headerSent = true - - // Set a default Content-Type - if _, hasType := r.header["Content-Type"]; !hasType { - r.header.Add("Content-Type", "text/html; charset=utf-8") - } - - fields := 0 - fields_size := 0 - - for k, vv := range r.header { - for _, v := range vv { - fields++ - fields_size += len(k) + len(v) - } - } - - C.nxt_cgo_response_create(r.c_req, C.int(code), C.int(fields), - C.uint32_t(fields_size)) - - for k, vv := range r.header { - for _, v := range vv { - C.nxt_cgo_response_add_field(r.c_req, str_ref(k), C.uint8_t(len(k)), - str_ref(v), C.uint32_t(len(v))) - } - } - - C.nxt_cgo_response_send(r.c_req) -} - -func (r *response) Flush() { - if !r.headerSent { - r.WriteHeader(http.StatusOK) - } -} diff --git a/src/go/unit/unit.go b/src/go/unit/unit.go deleted file mode 100644 index 1534479e..00000000 --- a/src/go/unit/unit.go +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright (C) Max Romanov - * Copyright (C) NGINX, Inc. - */ - -package unit - -/* -#include "nxt_cgo_lib.h" -*/ -import "C" - -import ( - "fmt" - "net/http" - "sync" - "unsafe" -) - -type cbuf struct { - b C.uintptr_t - s C.size_t -} - -func buf_ref(buf []byte) C.uintptr_t { - if len(buf) == 0 { - return 0 - } - - return C.uintptr_t(uintptr(unsafe.Pointer(&buf[0]))) -} - -type StringHeader struct { - Data unsafe.Pointer - Len int -} - -func str_ref(s string) C.uintptr_t { - header := (*StringHeader)(unsafe.Pointer(&s)) - - return C.uintptr_t(uintptr(unsafe.Pointer(header.Data))) -} - -func (buf *cbuf) init_bytes(b []byte) { - buf.b = buf_ref(b) - buf.s = C.size_t(len(b)) -} - -func (buf *cbuf) init_string(s string) { - buf.b = str_ref(s) - buf.s = C.size_t(len(s)) -} - -type SliceHeader struct { - Data unsafe.Pointer - Len int - Cap int -} - -func (buf *cbuf) GoBytes() []byte { - if buf == nil { - var b [0]byte - return b[:0] - } - - bytesHeader := &SliceHeader{ - Data: unsafe.Pointer(uintptr(buf.b)), - Len: int(buf.s), - Cap: int(buf.s), - } - - return *(*[]byte)(unsafe.Pointer(bytesHeader)) -} - -func GoBytes(buf unsafe.Pointer, size C.int) []byte { - bytesHeader := &SliceHeader{ - Data: buf, - Len: int(size), - Cap: int(size), - } - - return *(*[]byte)(unsafe.Pointer(bytesHeader)) -} - -func nxt_go_warn(format string, args ...interface{}) { - str := fmt.Sprintf("[go] " + format, args...) - - C.nxt_cgo_warn(str_ref(str), C.uint32_t(len(str))) -} - -type handler_registry struct { - sync.RWMutex - next uintptr - m map[uintptr]*http.Handler -} - -var handler_registry_ handler_registry - -func set_handler(handler *http.Handler) uintptr { - - handler_registry_.Lock() - if handler_registry_.m == nil { - handler_registry_.m = make(map[uintptr]*http.Handler) - handler_registry_.next = 1 - } - - h := handler_registry_.next - handler_registry_.next += 1 - handler_registry_.m[h] = handler - - handler_registry_.Unlock() - - return h -} - -func get_handler(h uintptr) http.Handler { - handler_registry_.RLock() - defer handler_registry_.RUnlock() - - return *handler_registry_.m[h] -} - -func reset_handler(h uintptr) { - - handler_registry_.Lock() - if handler_registry_.m != nil { - delete(handler_registry_.m, h) - } - - handler_registry_.Unlock() -} - -func ListenAndServe(addr string, handler http.Handler) error { - if handler == nil { - handler = http.DefaultServeMux - } - - h := set_handler(&handler) - - rc := C.nxt_cgo_run(C.uintptr_t(h)) - - reset_handler(h) - - if rc != 0 { - return http.ListenAndServe(addr, handler) - } - - return nil -} -- cgit From faeb73a65e9ab07c1636cccb33f23df96fddd330 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 24 Dec 2019 18:03:56 +0300 Subject: Using non-shared memory buffers for small messages. Current shared memory buffer implementation uses fixed-size memory blocks, allocating at least 16384 bytes. When application sends data in a large number of small chunks, it makes sense to buffer them or use plain memory buffers to improve performance and reduce memory footprint. This patch introduces minimum size limit (1024 bytes) for shared memory buffers. --- src/nxt_unit.c | 304 ++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 203 insertions(+), 101 deletions(-) (limited to 'src') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 0cf32916..d97d19f2 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -19,6 +19,10 @@ #include #endif +#define NXT_UNIT_MAX_PLAIN_SIZE 1024 +#define NXT_UNIT_LOCAL_BUF_SIZE \ + (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) + typedef struct nxt_unit_impl_s nxt_unit_impl_t; typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; @@ -63,6 +67,7 @@ static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_unit_mmap_buf_t *mmap_buf, int last); static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); +static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size); static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, @@ -75,7 +80,7 @@ static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd); static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, - nxt_unit_mmap_buf_t *mmap_buf); + nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); @@ -135,10 +140,11 @@ struct nxt_unit_mmap_buf_s { nxt_unit_mmap_buf_t **prev; nxt_port_mmap_header_t *hdr; -// nxt_queue_link_t link; nxt_unit_port_id_t port_id; nxt_unit_request_info_t *req; nxt_unit_ctx_impl_t *ctx_impl; + char *free_ptr; + char *plain_ptr; }; @@ -196,8 +202,6 @@ struct nxt_unit_websocket_frame_impl_s { nxt_queue_link_t link; nxt_unit_ctx_impl_t *ctx_impl; - - void *retain_buf; }; @@ -961,7 +965,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) ws_impl->ws.req = req; ws_impl->buf = NULL; - ws_impl->retain_buf = NULL; if (recv_msg->mmap) { for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { @@ -986,7 +989,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - b->hdr = NULL; b->req = req; b->buf.start = recv_msg->start; b->buf.free = b->buf.start; @@ -1193,12 +1195,6 @@ nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) ws->req = NULL; - if (ws_impl->retain_buf != NULL) { - free(ws_impl->retain_buf); - - ws_impl->retain_buf = NULL; - } - pthread_mutex_lock(&ws_impl->ctx_impl->mutex); nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link); @@ -1649,7 +1645,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req) req->response_buf = NULL; req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; - nxt_unit_mmap_buf_release(mmap_buf); + nxt_unit_mmap_buf_free(mmap_buf); } return rc; @@ -1697,7 +1693,7 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, size, mmap_buf); + &req->response_port, size, mmap_buf, NULL); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); @@ -1762,6 +1758,9 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) mmap_buf->ctx_impl = ctx_impl; + mmap_buf->hdr = NULL; + mmap_buf->free_ptr = NULL; + return mmap_buf; } @@ -1896,7 +1895,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf) } } - nxt_unit_mmap_buf_release(mmap_buf); + nxt_unit_mmap_buf_free(mmap_buf); return NXT_UNIT_OK; } @@ -1917,7 +1916,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf) rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1); if (nxt_slow_path(rc == NXT_UNIT_OK)) { - nxt_unit_mmap_buf_release(mmap_buf); + nxt_unit_mmap_buf_free(mmap_buf); nxt_unit_request_info_release(req); @@ -1936,7 +1935,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_port_mmap_msg_t mmap_msg; } m; - u_char *end, *last_used, *first_free; + u_char *last_used, *first_free; ssize_t res; nxt_chunk_id_t first_free_chunk; nxt_unit_buf_t *buf; @@ -1960,35 +1959,67 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, m.msg.mf = 0; m.msg.tracking = 0; - if (hdr != NULL) { + if (m.msg.mmap) { m.mmap_msg.mmap_id = hdr->id; m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start); - } - nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", - stream, - (int) m.mmap_msg.mmap_id, - (int) m.mmap_msg.chunk_id, - (int) m.mmap_msg.size); + nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", + stream, + (int) m.mmap_msg.mmap_id, + (int) m.mmap_msg.chunk_id, + (int) m.mmap_msg.size); - res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, - m.msg.mmap ? sizeof(m) : sizeof(m.msg), - NULL, 0); - if (nxt_slow_path(res != sizeof(m))) { - return NXT_UNIT_ERROR; - } + res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), + NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + return NXT_UNIT_ERROR; + } + + if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { + last_used = (u_char *) buf->free - 1; + + first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; + first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); - if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) { - last_used = (u_char *) buf->free - 1; + buf->start = (char *) first_free; + buf->free = buf->start; - first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; - first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); - end = (u_char *) buf->end; + if (buf->end < buf->start) { + buf->end = buf->start; + } - nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free)); + } else { + buf->start = NULL; + buf->free = NULL; + buf->end = NULL; - buf->end = (char *) first_free; + mmap_buf->hdr = NULL; + } + + } else { + if (nxt_slow_path(mmap_buf->plain_ptr == NULL + || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) + { + nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer" + ": no space reserved for message header", stream); + + return NXT_UNIT_ERROR; + } + + memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg)); + + nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d", + stream, + (int) (sizeof(m.msg) + m.mmap_msg.size)); + + res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, + buf->start - sizeof(m.msg), + m.mmap_msg.size + sizeof(m.msg), + NULL, 0); + if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { + return NXT_UNIT_ERROR; + } } return NXT_UNIT_OK; @@ -2005,12 +2036,22 @@ nxt_unit_buf_free(nxt_unit_buf_t *buf) static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) { - if (nxt_fast_path(mmap_buf->hdr != NULL)) { + nxt_unit_free_outgoing_buf(mmap_buf); + + nxt_unit_mmap_buf_release(mmap_buf); +} + + +static void +nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) +{ + if (mmap_buf->hdr != NULL) { nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start, mmap_buf->buf.end - mmap_buf->buf.start); - } - nxt_unit_mmap_buf_release(mmap_buf); + } else if (mmap_buf->free_ptr != NULL) { + free(mmap_buf->free_ptr); + } } @@ -2052,11 +2093,18 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, const char *part_start; nxt_unit_mmap_buf_t mmap_buf; nxt_unit_request_info_impl_t *req_impl; + char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); part_start = start; + if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { + nxt_unit_req_warn(req, "write: response not initialized yet"); + + return NXT_UNIT_ERROR; + } + /* Check if response is not send yet. */ if (nxt_slow_path(req->response_buf)) { part_size = req->response_buf->end - req->response_buf->free; @@ -2081,7 +2129,7 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, &req->response_port, part_size, - &mmap_buf); + &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } @@ -2090,10 +2138,10 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, part_start, part_size); rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start, - mmap_buf.buf.end - mmap_buf.buf.start); + nxt_unit_free_outgoing_buf(&mmap_buf); + + if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } @@ -2109,9 +2157,14 @@ int nxt_unit_response_write_cb(nxt_unit_request_info_t *req, nxt_unit_read_info_t *read_info) { - int rc; - ssize_t n; - nxt_unit_buf_t *buf; + int rc; + ssize_t n; + nxt_unit_buf_t *buf; + nxt_unit_mmap_buf_t mmap_buf; + nxt_unit_request_info_impl_t *req_impl; + char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); /* Check if response is not send yet. */ if (nxt_slow_path(req->response_buf)) { @@ -2159,20 +2212,23 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"", read_info->buf_size); - buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size, - PORT_MMAP_DATA_SIZE)); - if (nxt_slow_path(buf == NULL)) { - nxt_unit_req_error(req, "Failed to allocate buf for content"); - - return NXT_UNIT_ERROR; + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, + nxt_min(read_info->buf_size, + PORT_MMAP_DATA_SIZE), + &mmap_buf, local_buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; } + buf = &mmap_buf.buf; + while (!read_info->eof && buf->end > buf->free) { n = read_info->read(read_info, buf->free, buf->end - buf->free); if (nxt_slow_path(n < 0)) { nxt_unit_req_error(req, "Read error"); - nxt_unit_buf_free(buf); + nxt_unit_free_outgoing_buf(&mmap_buf); return NXT_UNIT_ERROR; } @@ -2180,7 +2236,10 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, buf->free += n; } - rc = nxt_unit_buf_send(buf); + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); + + nxt_unit_free_outgoing_buf(&mmap_buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_error(req, "Failed to send content"); @@ -2325,12 +2384,17 @@ int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, uint8_t last, const struct iovec *iov, int iovcnt) { - int i, rc; - size_t l, copy; - uint32_t payload_len, buf_size; - const uint8_t *b; - nxt_unit_buf_t *buf; - nxt_websocket_header_t *wh; + int i, rc; + size_t l, copy; + uint32_t payload_len, buf_size; + const uint8_t *b; + nxt_unit_buf_t *buf; + nxt_unit_mmap_buf_t mmap_buf; + nxt_websocket_header_t *wh; + nxt_unit_request_info_impl_t *req_impl; + char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); payload_len = 0; @@ -2340,17 +2404,21 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, buf_size = 10 + payload_len; - buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, - PORT_MMAP_DATA_SIZE)); - if (nxt_slow_path(buf == NULL)) { - nxt_unit_req_error(req, "Failed to allocate buf for content"); - - return NXT_UNIT_ERROR; + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, + nxt_min(buf_size, PORT_MMAP_DATA_SIZE), + &mmap_buf, local_buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; } + buf = &mmap_buf.buf; + buf->start[0] = 0; buf->start[1] = 0; + buf_size -= buf->end - buf->start; + wh = (void *) buf->free; buf->free = nxt_websocket_frame_init(wh, payload_len); @@ -2370,32 +2438,36 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, l -= copy; if (l > 0) { - buf_size -= buf->end - buf->start; + if (nxt_fast_path(buf->free > buf->start)) { + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, + &mmap_buf, 0); - rc = nxt_unit_buf_send(buf); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_req_error(req, "Failed to send content"); + nxt_unit_free_outgoing_buf(&mmap_buf); - return NXT_UNIT_ERROR; + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; + } } - buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size, - PORT_MMAP_DATA_SIZE)); - if (nxt_slow_path(buf == NULL)) { - nxt_unit_req_error(req, - "Failed to allocate buf for content"); - - return NXT_UNIT_ERROR; + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, + &req->response_port, + nxt_min(buf_size, + PORT_MMAP_DATA_SIZE), + &mmap_buf, local_buf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + return rc; } + + buf_size -= buf->end - buf->start; } } } if (buf->free > buf->start) { - rc = nxt_unit_buf_send(buf); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - nxt_unit_req_error(req, "Failed to send content"); - } + rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, + &mmap_buf, 0); + + nxt_unit_free_outgoing_buf(&mmap_buf); } return rc; @@ -2437,7 +2509,7 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); - if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) { + if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) { return NXT_UNIT_OK; } @@ -2454,7 +2526,7 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws) ws_impl->buf->buf.free = b; ws_impl->buf->buf.end = b + size; - ws_impl->retain_buf = b; + ws_impl->buf->free_ptr = b; return NXT_UNIT_OK; } @@ -2760,12 +2832,38 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, - nxt_unit_mmap_buf_t *mmap_buf) + nxt_unit_mmap_buf_t *mmap_buf, char *local_buf) { uint32_t nchunks; nxt_chunk_id_t c; nxt_port_mmap_header_t *hdr; + if (size <= NXT_UNIT_MAX_PLAIN_SIZE) { + if (local_buf != NULL) { + mmap_buf->free_ptr = NULL; + mmap_buf->plain_ptr = local_buf; + + } else { + mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t)); + if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { + return NXT_UNIT_ERROR; + } + + mmap_buf->plain_ptr = mmap_buf->free_ptr; + } + + mmap_buf->hdr = NULL; + mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); + mmap_buf->buf.free = mmap_buf->buf.start; + mmap_buf->buf.end = mmap_buf->buf.start + size; + mmap_buf->port_id = *port_id; + + nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", + mmap_buf->buf.start, (int) size); + + return NXT_UNIT_OK; + } + nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks); @@ -2778,6 +2876,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; mmap_buf->port_id = *port_id; + mmap_buf->free_ptr = NULL; nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", (int) hdr->id, (int) c, @@ -3020,6 +3119,22 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) incoming_tail = &recv_msg->incoming_buf; + for (; mmap_msg < end; mmap_msg++) { + b = nxt_unit_mmap_buf_get(ctx); + if (nxt_slow_path(b == NULL)) { + nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", + recv_msg->stream); + + return NXT_UNIT_ERROR; + } + + nxt_unit_mmap_buf_insert(incoming_tail, b); + incoming_tail = &b->next; + } + + b = recv_msg->incoming_buf; + mmap_msg = recv_msg->start; + pthread_mutex_lock(&process->incoming.mutex); for (; mmap_msg < end; mmap_msg++) { @@ -3043,26 +3158,13 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->size = size; } - b = nxt_unit_mmap_buf_get(ctx); - if (nxt_slow_path(b == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); - - nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", - recv_msg->stream); - - nxt_unit_mmap_release(hdr, start, size); - - return NXT_UNIT_ERROR; - } - - nxt_unit_mmap_buf_insert(incoming_tail, b); - incoming_tail = &b->next; - b->buf.start = start; b->buf.free = start; b->buf.end = b->buf.start + size; b->hdr = hdr; + b = b->next; + nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)", recv_msg->stream, start, (int) size, -- cgit From 806b3945fed6f7d2a8e836487dcea9c63f8d7129 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 24 Dec 2019 18:04:00 +0300 Subject: Introducing write tail reference to avoid buffer chain iteration. --- src/nxt_h1proto.c | 19 +++++++++++++++---- src/nxt_h1proto.h | 2 ++ 2 files changed, 17 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index b07eaf84..8ce57893 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -1230,6 +1230,7 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r, c = h1p->conn; c->write = header; + h1p->conn_write_tail = &header->next; c->write_state = &nxt_h1p_request_send_state; if (body_handler != NULL) { @@ -1342,8 +1343,14 @@ nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out) nxt_conn_write(task->thread->engine, c); } else { - nxt_buf_chain_add(&c->write, out); + *h1p->conn_write_tail = out; } + + while (out->next != NULL) { + out = out->next; + } + + h1p->conn_write_tail = &out->next; } @@ -1730,9 +1737,10 @@ nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data) static void nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c) { - u_char *p; - size_t size; - nxt_buf_t *out, *last; + u_char *p; + size_t size; + nxt_buf_t *out, *last; + nxt_h1proto_t *h1p; size = nxt_length(NXT_H1P_IDLE_TIMEOUT) + nxt_http_date_cache.size @@ -1762,6 +1770,9 @@ nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c) last->completion_handler = nxt_h1p_idle_response_sent; last->parent = c; + h1p = c->socket.data; + h1p->conn_write_tail = &last->next; + c->write = out; c->write_state = &nxt_h1p_timeout_response_state; diff --git a/src/nxt_h1proto.h b/src/nxt_h1proto.h index 61da6770..3294713f 100644 --- a/src/nxt_h1proto.h +++ b/src/nxt_h1proto.h @@ -40,6 +40,8 @@ struct nxt_h1proto_s { nxt_http_request_t *request; nxt_buf_t *buffers; + + nxt_buf_t **conn_write_tail; /* * All fields before the conn field will * be zeroed in a keep-alive connection. -- cgit From 429c5a1c54fbc457c6caf6503525fd22225a4e22 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 24 Dec 2019 18:04:05 +0300 Subject: Renaming nxt_unit_mmap_buf_remove to nxt_unit_mmap_buf_unlink. The function unchains the buffer from the buffer's linked list. --- src/nxt_unit.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/nxt_unit.c b/src/nxt_unit.c index d97d19f2..118e3e5d 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -41,7 +41,7 @@ nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, nxt_unit_mmap_buf_t *mmap_buf); nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_unit_mmap_buf_t *mmap_buf); -nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf); +nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, @@ -521,7 +521,7 @@ nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_inline void -nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf) +nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) { nxt_unit_mmap_buf_t **prev; @@ -1751,7 +1751,7 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) } else { mmap_buf = ctx_impl->free_buf; - nxt_unit_mmap_buf_remove(mmap_buf); + nxt_unit_mmap_buf_unlink(mmap_buf); pthread_mutex_unlock(&ctx_impl->mutex); } @@ -1768,7 +1768,7 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) { - nxt_unit_mmap_buf_remove(mmap_buf); + nxt_unit_mmap_buf_unlink(mmap_buf); pthread_mutex_lock(&mmap_buf->ctx_impl->mutex); @@ -3501,12 +3501,12 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx) } nxt_queue_loop; - nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]); - nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]); + nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]); + nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]); while (ctx_impl->free_buf != NULL) { mmap_buf = ctx_impl->free_buf; - nxt_unit_mmap_buf_remove(mmap_buf); + nxt_unit_mmap_buf_unlink(mmap_buf); free(mmap_buf); } -- cgit From 64f649f9903e226421869376bc72a8513581d7d0 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 24 Dec 2019 18:04:09 +0300 Subject: Adding "limits/shm" configuration validation and parsing. --- src/nxt_application.h | 3 +++ src/nxt_conf_validation.c | 5 +++++ src/nxt_external.c | 4 ++-- src/nxt_java.c | 1 + src/nxt_main_process.c | 32 +++++++++++++++++++++++++++++++- src/nxt_php_sapi.c | 1 + src/nxt_python_wsgi.c | 1 + src/nxt_unit.c | 28 ++++++++++++++++++++-------- src/nxt_unit.h | 1 + src/perl/nxt_perl_psgi.c | 1 + src/ruby/nxt_ruby.c | 1 + 11 files changed, 67 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/nxt_application.h b/src/nxt_application.h index 2a1fa39e..e7177887 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -89,6 +89,9 @@ struct nxt_common_app_conf_s { nxt_conf_value_t *environment; nxt_conf_value_t *isolation; + nxt_conf_value_t *limits; + + size_t shm_limit; union { nxt_external_app_conf_t external; diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 105af675..923e6c8c 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -358,6 +358,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_limits_members[] = { NULL, NULL }, + { nxt_string("shm"), + NXT_CONF_VLDT_INTEGER, + NULL, + NULL }, + NXT_CONF_VLDT_END }; diff --git a/src/nxt_external.c b/src/nxt_external.c index 89fe08c8..e498a938 100644 --- a/src/nxt_external.c +++ b/src/nxt_external.c @@ -98,11 +98,11 @@ nxt_external_init(nxt_task_t *task, nxt_common_app_conf_t *conf) "%s;%uD;" "%PI,%ud,%d;" "%PI,%ud,%d;" - "%d,%Z", + "%d,%z,%Z", NXT_VERSION, my_port->process->init->stream, main_port->pid, main_port->id, main_port->pair[1], my_port->pid, my_port->id, my_port->pair[0], - 2); + 2, conf->shm_limit); if (nxt_slow_path(p == end)) { nxt_alert(task, "internal error: buffer too small for NXT_UNIT_INIT"); diff --git a/src/nxt_java.c b/src/nxt_java.c index 08e24595..004907d6 100644 --- a/src/nxt_java.c +++ b/src/nxt_java.c @@ -354,6 +354,7 @@ nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf) java_init.callbacks.close_handler = nxt_java_close_handler; java_init.request_data_size = sizeof(nxt_java_request_data_t); java_init.data = &data; + java_init.shm_limit = conf->shm_limit; ctx = nxt_unit_init(&java_init); if (nxt_slow_path(ctx == NULL)) { diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 79b9ee1f..eed37752 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -215,7 +215,24 @@ static nxt_conf_map_t nxt_common_app_conf[] = { nxt_string("isolation"), NXT_CONF_MAP_PTR, offsetof(nxt_common_app_conf_t, isolation), - } + }, + + { + nxt_string("limits"), + NXT_CONF_MAP_PTR, + offsetof(nxt_common_app_conf_t, limits), + }, + +}; + + +static nxt_conf_map_t nxt_common_app_limits_conf[] = { + { + nxt_string("shm"), + NXT_CONF_MAP_SIZE, + offsetof(nxt_common_app_conf_t, shm_limit), + }, + }; @@ -381,6 +398,7 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) app_conf.name.start = start; app_conf.name.length = nxt_strlen(start); + app_conf.shm_limit = 100 * 1024 * 1024; start += app_conf.name.length + 1; @@ -427,6 +445,18 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) goto failed; } + if (app_conf.limits != NULL) { + ret = nxt_conf_map_object(mp, app_conf.limits, + nxt_common_app_limits_conf, + nxt_nitems(nxt_common_app_limits_conf), + &app_conf); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_alert(task, "failed to map app limits received from router"); + goto failed; + } + } + ret = nxt_main_start_worker_process(task, task->thread->runtime, &app_conf, msg->port_msg.stream); diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index 7a5e0a3b..0f6ce686 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -352,6 +352,7 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_fd_blocking(task, my_port->pair[0]); php_init.log_fd = 2; + php_init.shm_limit = conf->shm_limit; unit_ctx = nxt_unit_init(&php_init); if (nxt_slow_path(unit_ctx == NULL)) { diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index 3b13bea1..ea8b6903 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -404,6 +404,7 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_unit_default_init(task, &python_init); python_init.callbacks.request_handler = nxt_python_request_handler; + python_init.shm_limit = conf->shm_limit; unit_ctx = nxt_unit_init(&python_init); if (nxt_slow_path(unit_ctx == NULL)) { diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 118e3e5d..8bc7b679 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -43,7 +43,8 @@ nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, nxt_unit_mmap_buf_t *mmap_buf); nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); static int nxt_unit_read_env(nxt_unit_port_t *ready_port, - nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream); + nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, + uint32_t *shm_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, uint32_t stream); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, @@ -240,6 +241,7 @@ struct nxt_unit_impl_s { nxt_unit_callbacks_t callbacks; uint32_t request_data_size; + uint32_t shm_mmap_limit; pthread_mutex_t mutex; @@ -306,7 +308,7 @@ nxt_unit_ctx_t * nxt_unit_init(nxt_unit_init_t *init) { int rc; - uint32_t ready_stream; + uint32_t ready_stream, shm_limit; nxt_unit_ctx_t *ctx; nxt_unit_impl_t *lib; nxt_unit_port_t ready_port, read_port; @@ -329,12 +331,20 @@ nxt_unit_init(nxt_unit_init_t *init) ready_port.id.id); nxt_unit_port_id_init(&read_port.id, read_port.id.pid, read_port.id.id); + } else { rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd, - &ready_stream); + &ready_stream, &shm_limit); if (nxt_slow_path(rc != NXT_UNIT_OK)) { goto fail; } + + lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) + / PORT_MMAP_DATA_SIZE; + } + + if (nxt_slow_path(lib->shm_mmap_limit < 1)) { + lib->shm_mmap_limit = 1; } lib->pid = read_port.id.pid; @@ -399,6 +409,8 @@ nxt_unit_create(nxt_unit_init_t *init) lib->callbacks = init->callbacks; lib->request_data_size = init->request_data_size; + lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) + / PORT_MMAP_DATA_SIZE; lib->processes.slot = NULL; lib->ports.slot = NULL; @@ -539,7 +551,7 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, - int *log_fd, uint32_t *stream) + int *log_fd, uint32_t *stream, uint32_t *shm_limit) { int rc; int ready_fd, read_fd; @@ -574,14 +586,14 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port, "%"PRIu32";" "%"PRId64",%"PRIu32",%d;" "%"PRId64",%"PRIu32",%d;" - "%d", + "%d,%"PRIu32, &ready_stream, &ready_pid, &ready_id, &ready_fd, &read_pid, &read_id, &read_fd, - log_fd); + log_fd, shm_limit); - if (nxt_slow_path(rc != 8)) { - nxt_unit_alert(NULL, "failed to scan variables"); + if (nxt_slow_path(rc != 9)) { + nxt_unit_alert(NULL, "failed to scan variables: %d", rc); return NXT_UNIT_ERROR; } diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 3471a758..6948b253 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -155,6 +155,7 @@ struct nxt_unit_init_s { int max_pending_requests; uint32_t request_data_size; + uint32_t shm_limit; nxt_unit_callbacks_t callbacks; diff --git a/src/perl/nxt_perl_psgi.c b/src/perl/nxt_perl_psgi.c index b99d3269..16159b5b 100644 --- a/src/perl/nxt_perl_psgi.c +++ b/src/perl/nxt_perl_psgi.c @@ -1156,6 +1156,7 @@ nxt_perl_psgi_init(nxt_task_t *task, nxt_common_app_conf_t *conf) perl_init.callbacks.request_handler = nxt_perl_psgi_request_handler; perl_init.data = &module; + perl_init.shm_limit = conf->shm_limit; unit_ctx = nxt_unit_init(&perl_init); if (nxt_slow_path(unit_ctx == NULL)) { diff --git a/src/ruby/nxt_ruby.c b/src/ruby/nxt_ruby.c index 45d7a7aa..e4b30319 100644 --- a/src/ruby/nxt_ruby.c +++ b/src/ruby/nxt_ruby.c @@ -134,6 +134,7 @@ nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_unit_default_init(task, &ruby_unit_init); ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler; + ruby_unit_init.shm_limit = conf->shm_limit; unit_ctx = nxt_unit_init(&ruby_unit_init); if (nxt_slow_path(unit_ctx == NULL)) { -- cgit From df7caf465072e171f88358b9e69c65b76d8efd25 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 24 Dec 2019 18:04:13 +0300 Subject: Introducing port messages to notify about out of shared memory. - OOSM (out of shared memory). Sent by application process to router when application reaches the limit of allocated shared memory and needs more. - SHM_ACK. Sent by router to application when the application's shared memory is released and the OOSM flag is enabled for the segment. This implements blocking mode (the library waits for SHM_ACK in case of out of shared memory condition and retries allocating the required memory amount) and non-blocking mode (the library notifies the application that it's out of shared memory and returns control to the application module that sets up the output queue and puts SHM_ACK in the main message loop). --- src/nxt_port.h | 9 + src/nxt_port_memory.c | 19 ++ src/nxt_port_memory_int.h | 1 + src/nxt_router.c | 56 +++++ src/nxt_unit.c | 544 ++++++++++++++++++++++++++++++++++++++++------ src/nxt_unit.h | 6 + 6 files changed, 567 insertions(+), 68 deletions(-) (limited to 'src') diff --git a/src/nxt_port.h b/src/nxt_port.h index 3f9302e8..c6f15238 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -44,6 +44,9 @@ struct nxt_port_handlers_s { /* Various data. */ nxt_port_handler_t data; + + nxt_port_handler_t oosm; + nxt_port_handler_t shm_ack; }; @@ -82,6 +85,9 @@ typedef enum { _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data), + _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm), + _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack), + NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t) / sizeof(nxt_port_handler_t), @@ -114,6 +120,9 @@ typedef enum { NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA, NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST, + + NXT_PORT_MSG_OOSM = _NXT_PORT_MSG_OOSM | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_SHM_ACK = _NXT_PORT_MSG_SHM_ACK | NXT_PORT_MSG_LAST, } nxt_port_msg_type_t; diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index b7068c88..24a40406 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -112,6 +112,8 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) u_char *p; nxt_mp_t *mp; nxt_buf_t *b; + nxt_port_t *port; + nxt_process_t *process; nxt_chunk_id_t c; nxt_port_mmap_header_t *hdr; nxt_port_mmap_handler_t *mmap_handler; @@ -163,6 +165,21 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) c++; } + if (hdr->dst_pid == nxt_pid + && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) + { + process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid); + + if (process != NULL && !nxt_queue_is_empty(&process->ports)) { + port = nxt_process_port_first(process); + + if (port->type == NXT_PROCESS_WORKER) { + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK, + -1, 0, 0, NULL); + } + } + } + release_buf: nxt_port_mmap_handler_use(mmap_handler, -1); @@ -454,6 +471,8 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, goto unlock_return; } } + + hdr->oosm = 1; } /* TODO introduce port_mmap limit and release wait. */ diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h index 53dfaebf..87c3d833 100644 --- a/src/nxt_port_memory_int.h +++ b/src/nxt_port_memory_int.h @@ -51,6 +51,7 @@ struct nxt_port_mmap_header_s { nxt_pid_t src_pid; /* For sanity check. */ nxt_pid_t dst_pid; /* For sanity check. */ nxt_port_id_t sent_over; + nxt_atomic_t oosm; nxt_free_map_t free_map[MAX_FREE_IDX]; nxt_free_map_t free_map_padding; nxt_free_map_t free_tracking_map[MAX_FREE_IDX]; diff --git a/src/nxt_router.c b/src/nxt_router.c index 38396e86..6a1f3792 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -248,6 +248,7 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, nxt_http_request_t *r); static void nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data); +static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); extern const nxt_http_request_state_t nxt_http_websocket; @@ -276,6 +277,7 @@ nxt_port_handlers_t nxt_router_process_port_handlers = { .access_log = nxt_router_access_log_reopen_handler, .rpc_ready = nxt_port_rpc_handler, .rpc_error = nxt_port_rpc_handler, + .oosm = nxt_router_oosm_handler, }; @@ -2748,6 +2750,7 @@ static nxt_port_handlers_t nxt_router_app_port_handlers = { .rpc_error = nxt_port_rpc_handler, .mmap = nxt_port_mmap_handler, .data = nxt_port_rpc_handler, + .oosm = nxt_router_oosm_handler, }; @@ -5241,3 +5244,56 @@ nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) nxt_mp_release(r->mem_pool); } + + +static void +nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + size_t mi; + uint32_t i; + nxt_bool_t ack; + nxt_process_t *process; + nxt_free_map_t *m; + nxt_port_mmap_header_t *hdr; + + nxt_debug(task, "oosm in %PI", msg->port_msg.pid); + + process = nxt_runtime_process_find(task->thread->runtime, + msg->port_msg.pid); + if (nxt_slow_path(process == NULL)) { + return; + } + + ack = 0; + + /* + * To mitigate possible racing condition (when OOSM message received + * after some of the memory was already freed), need to try to find + * first free segment in shared memory and send ACK if found. + */ + + nxt_thread_mutex_lock(&process->incoming.mutex); + + for (i = 0; i < process->incoming.size; i++) { + hdr = process->incoming.elts[i].mmap_handler->hdr; + m = hdr->free_map; + + for (mi = 0; mi < MAX_FREE_IDX; mi++) { + if (m[mi] != 0) { + ack = 1; + + nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA", + i, mi, m[mi]); + + break; + } + } + } + + nxt_thread_mutex_unlock(&process->incoming.mutex); + + if (ack) { + (void) nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_SHM_ACK, + -1, 0, 0, NULL); + } +} diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 8bc7b679..95874db3 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -19,6 +19,10 @@ #include #endif +#define NXT_UNIT_MAX_PLAIN_SIZE 1024 +#define NXT_UNIT_LOCAL_BUF_SIZE \ + (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) + #define NXT_UNIT_MAX_PLAIN_SIZE 1024 #define NXT_UNIT_LOCAL_BUF_SIZE \ (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) @@ -29,6 +33,7 @@ typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; typedef struct nxt_unit_process_s nxt_unit_process_t; typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; +typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; @@ -53,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( nxt_unit_ctx_t *ctx); static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); @@ -69,11 +75,18 @@ static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_unit_mmap_buf_t *mmap_buf, int last); static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); +static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx); +static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( + nxt_unit_ctx_impl_t *ctx_impl); +static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, + nxt_unit_read_buf_t *rbuf); static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size); static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, - nxt_chunk_id_t *c, int n); + nxt_chunk_id_t *c, int *n, int min_n); +static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); +static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n); @@ -81,7 +94,7 @@ static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd); static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size, - nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); + uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); @@ -94,14 +107,18 @@ static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); -static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, - uint32_t size); +static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, + nxt_unit_process_t *process, + nxt_port_mmap_header_t *hdr, void *start, uint32_t size); +static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid); static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, pid_t pid, int remove); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); +static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, + nxt_unit_read_buf_t *rbuf); static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int *fd); @@ -144,6 +161,7 @@ struct nxt_unit_mmap_buf_s { nxt_unit_port_id_t port_id; nxt_unit_request_info_t *req; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_process_t *process; char *free_ptr; char *plain_ptr; }; @@ -206,6 +224,14 @@ struct nxt_unit_websocket_frame_impl_s { }; +struct nxt_unit_read_buf_s { + nxt_unit_read_buf_t *next; + ssize_t size; + char buf[16384]; + char oob[256]; +}; + + struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; @@ -230,7 +256,12 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_request_info_impl_t */ nxt_lvlhsh_t requests; + nxt_unit_read_buf_t *pending_read_head; + nxt_unit_read_buf_t **pending_read_tail; + nxt_unit_read_buf_t *free_read_buf; + nxt_unit_mmap_buf_t ctx_buf[2]; + nxt_unit_read_buf_t ctx_read_buf; nxt_unit_request_info_impl_t req; }; @@ -277,6 +308,7 @@ struct nxt_unit_mmaps_s { pthread_mutex_t mutex; uint32_t size; uint32_t cap; + nxt_atomic_t allocated_chunks; nxt_unit_mmap_t *elts; }; @@ -495,6 +527,11 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); + ctx_impl->pending_read_head = NULL; + ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; + ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf; + ctx_impl->ctx_read_buf.next = NULL; + ctx_impl->req.req.ctx = &ctx_impl->ctx; ctx_impl->req.req.unit = &lib->unit; @@ -772,6 +809,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, rc = NXT_UNIT_OK; break; + case _NXT_PORT_MSG_SHM_ACK: + rc = nxt_unit_process_shm_ack(ctx); + break; + default: nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d", port_msg->stream, (int) port_msg->type); @@ -1052,6 +1093,23 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } +static int +nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx) +{ + nxt_unit_impl_t *lib; + nxt_unit_callbacks_t *cb; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + cb = &lib->callbacks; + + if (cb->shm_ack_handler != NULL) { + cb->shm_ack_handler(ctx); + } + + return NXT_UNIT_OK; +} + + static nxt_unit_request_info_impl_t * nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) { @@ -1705,7 +1763,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, - &req->response_port, size, mmap_buf, NULL); + &req->response_port, size, size, mmap_buf, + NULL); if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_mmap_buf_release(mmap_buf); @@ -1947,6 +2006,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_port_mmap_msg_t mmap_msg; } m; + int rc; u_char *last_used, *first_free; ssize_t res; nxt_chunk_id_t first_free_chunk; @@ -1971,6 +2031,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, m.msg.mf = 0; m.msg.tracking = 0; + rc = NXT_UNIT_ERROR; + if (m.msg.mmap) { m.mmap_msg.mmap_id = hdr->id; m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, @@ -1985,13 +2047,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m), NULL, 0); if (nxt_slow_path(res != sizeof(m))) { - return NXT_UNIT_ERROR; + goto free_buf; } - if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { - last_used = (u_char *) buf->free - 1; + last_used = (u_char *) buf->free - 1; + first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; - first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; + if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); buf->start = (char *) first_free; @@ -2009,6 +2071,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, mmap_buf->hdr = NULL; } + nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks, + (int) m.mmap_msg.chunk_id - (int) first_free_chunk); + + nxt_unit_debug(ctx, "process %d allocated_chunks %d", + mmap_buf->process->pid, + mmap_buf->process->outgoing.allocated_chunks); + } else { if (nxt_slow_path(mmap_buf->plain_ptr == NULL || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) @@ -2016,7 +2085,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer" ": no space reserved for message header", stream); - return NXT_UNIT_ERROR; + goto free_buf; } memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg)); @@ -2030,11 +2099,17 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream, m.mmap_msg.size + sizeof(m.msg), NULL, 0); if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { - return NXT_UNIT_ERROR; + goto free_buf; } } - return NXT_UNIT_OK; + rc = NXT_UNIT_OK; + +free_buf: + + nxt_unit_free_outgoing_buf(mmap_buf); + + return rc; } @@ -2058,15 +2133,76 @@ static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) { if (mmap_buf->hdr != NULL) { - nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start, + nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, + mmap_buf->process, + mmap_buf->hdr, mmap_buf->buf.start, mmap_buf->buf.end - mmap_buf->buf.start); - } else if (mmap_buf->free_ptr != NULL) { + mmap_buf->hdr = NULL; + + return; + } + + if (mmap_buf->free_ptr != NULL) { free(mmap_buf->free_ptr); + + mmap_buf->free_ptr = NULL; } } +static nxt_unit_read_buf_t * +nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) +{ + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + return nxt_unit_read_buf_get_impl(ctx_impl); +} + + +static nxt_unit_read_buf_t * +nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_unit_read_buf_t *rbuf; + + if (ctx_impl->free_read_buf != NULL) { + rbuf = ctx_impl->free_read_buf; + ctx_impl->free_read_buf = rbuf->next; + + pthread_mutex_unlock(&ctx_impl->mutex); + + return rbuf; + } + + pthread_mutex_unlock(&ctx_impl->mutex); + + rbuf = malloc(sizeof(nxt_unit_read_buf_t)); + + return rbuf; +} + + +static void +nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, + nxt_unit_read_buf_t *rbuf) +{ + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + rbuf->next = ctx_impl->free_read_buf; + ctx_impl->free_read_buf = rbuf; + + pthread_mutex_unlock(&ctx_impl->mutex); +} + + nxt_unit_buf_t * nxt_unit_buf_next(nxt_unit_buf_t *buf) { @@ -2099,9 +2235,22 @@ nxt_unit_buf_min(void) int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, size_t size) +{ + ssize_t res; + + res = nxt_unit_response_write_nb(req, start, size, size); + + return res < 0 ? -res : NXT_UNIT_OK; +} + + +ssize_t +nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, + size_t size, size_t min_size) { int rc; - uint32_t part_size; + ssize_t sent; + uint32_t part_size, min_part_size, buf_size; const char *part_start; nxt_unit_mmap_buf_t mmap_buf; nxt_unit_request_info_impl_t *req_impl; @@ -2110,58 +2259,70 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); part_start = start; + sent = 0; if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { nxt_unit_req_warn(req, "write: response not initialized yet"); - return NXT_UNIT_ERROR; + return -NXT_UNIT_ERROR; } /* Check if response is not send yet. */ - if (nxt_slow_path(req->response_buf)) { + if (nxt_slow_path(req->response_buf != NULL)) { part_size = req->response_buf->end - req->response_buf->free; part_size = nxt_min(size, part_size); rc = nxt_unit_response_add_content(req, part_start, part_size); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; + return -rc; } rc = nxt_unit_response_send(req); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; + return -rc; } size -= part_size; part_start += part_size; + sent += part_size; + + min_size -= nxt_min(min_size, part_size); } while (size > 0) { part_size = nxt_min(size, PORT_MMAP_DATA_SIZE); + min_part_size = nxt_min(min_size, part_size); + min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE); rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, &req->response_port, part_size, - &mmap_buf, local_buf); + min_part_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; + return -rc; + } + + buf_size = mmap_buf.buf.end - mmap_buf.buf.free; + if (nxt_slow_path(buf_size == 0)) { + return sent; } + part_size = nxt_min(buf_size, part_size); mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, part_start, part_size); rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); - - nxt_unit_free_outgoing_buf(&mmap_buf); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { - return rc; + return -rc; } size -= part_size; part_start += part_size; + sent += part_size; + + min_size -= nxt_min(min_size, part_size); } - return NXT_UNIT_OK; + return sent; } @@ -2171,6 +2332,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, { int rc; ssize_t n; + uint32_t buf_size; nxt_unit_buf_t *buf; nxt_unit_mmap_buf_t mmap_buf; nxt_unit_request_info_impl_t *req_impl; @@ -2224,10 +2386,11 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"", read_info->buf_size); + buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, &req->response_port, - nxt_min(read_info->buf_size, - PORT_MMAP_DATA_SIZE), + buf_size, buf_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; @@ -2249,9 +2412,6 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req, } rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); - - nxt_unit_free_outgoing_buf(&mmap_buf); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { nxt_unit_req_error(req, "Failed to send content"); @@ -2398,7 +2558,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, { int i, rc; size_t l, copy; - uint32_t payload_len, buf_size; + uint32_t payload_len, buf_size, alloc_size; const uint8_t *b; nxt_unit_buf_t *buf; nxt_unit_mmap_buf_t mmap_buf; @@ -2415,10 +2575,11 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, } buf_size = 10 + payload_len; + alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, &req->response_port, - nxt_min(buf_size, PORT_MMAP_DATA_SIZE), + alloc_size, alloc_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; @@ -2454,17 +2615,16 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); - nxt_unit_free_outgoing_buf(&mmap_buf); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; } } + alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE); + rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process, &req->response_port, - nxt_min(buf_size, - PORT_MMAP_DATA_SIZE), + alloc_size, alloc_size, &mmap_buf, local_buf); if (nxt_slow_path(rc != NXT_UNIT_OK)) { return rc; @@ -2478,8 +2638,6 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, if (buf->free > buf->start) { rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0); - - nxt_unit_free_outgoing_buf(&mmap_buf); } return rc; @@ -2553,15 +2711,23 @@ nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws) static nxt_port_mmap_header_t * nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n) + nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n) { int res, nchunks, i; + uint32_t outgoing_size; nxt_unit_mmap_t *mm, *mm_end; + nxt_unit_impl_t *lib; nxt_port_mmap_header_t *hdr; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + pthread_mutex_lock(&process->outgoing.mutex); - mm_end = process->outgoing.elts + process->outgoing.size; +retry: + + outgoing_size = process->outgoing.size; + + mm_end = process->outgoing.elts + outgoing_size; for (mm = process->outgoing.elts; mm < mm_end; mm++) { hdr = mm->hdr; @@ -2575,11 +2741,17 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) { nchunks = 1; - while (nchunks < n) { + while (nchunks < *n) { res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, *c + nchunks); if (res == 0) { + if (nchunks >= min_n) { + *n = nchunks; + + goto unlock; + } + for (i = 0; i < nchunks; i++) { nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i); } @@ -2592,23 +2764,155 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nchunks++; } - if (nchunks == n) { + if (nchunks >= min_n) { + *n = nchunks; + goto unlock; } } + + hdr->oosm = 1; + } + + if (outgoing_size >= lib->shm_mmap_limit) { + /* Cannot allocate more shared memory. */ + pthread_mutex_unlock(&process->outgoing.mutex); + + if (min_n == 0) { + *n = 0; + } + + if (nxt_slow_path(process->outgoing.allocated_chunks + min_n + >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT)) + { + /* Memory allocated by application, but not send to router. */ + return NULL; + } + + /* Notify router about OOSM condition. */ + + res = nxt_unit_send_oosm(ctx, port_id); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return NULL; + } + + /* Return if caller can handle OOSM condition. Non-blocking mode. */ + + if (min_n == 0) { + return NULL; + } + + nxt_unit_debug(ctx, "oosm: waiting for ACK"); + + res = nxt_unit_wait_shm_ack(ctx); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return NULL; + } + + nxt_unit_debug(ctx, "oosm: retry"); + + pthread_mutex_lock(&process->outgoing.mutex); + + goto retry; } *c = 0; - hdr = nxt_unit_new_mmap(ctx, process, port_id, n); + hdr = nxt_unit_new_mmap(ctx, process, port_id, *n); unlock: + nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n); + + nxt_unit_debug(ctx, "process %d allocated_chunks %d", + process->pid, + process->outgoing.allocated_chunks); + pthread_mutex_unlock(&process->outgoing.mutex); return hdr; } +static int +nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +{ + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + msg.stream = 0; + msg.pid = lib->pid; + msg.reply_port = 0; + msg.type = _NXT_PORT_MSG_OOSM; + msg.last = 0; + msg.mmap = 0; + msg.nf = 0; + msg.mf = 0; + msg.tracking = 0; + + res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0); + if (nxt_slow_path(res != sizeof(msg))) { + nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)", + (int) port_id->pid, strerror(errno), errno); + + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) +{ + nxt_port_msg_t *port_msg; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + while (1) { + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_read_buf(ctx, rbuf); + if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + nxt_unit_read_buf_release(ctx, rbuf); + + return NXT_UNIT_ERROR; + } + + port_msg = (nxt_port_msg_t *) rbuf->buf; + + if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) { + nxt_unit_read_buf_release(ctx, rbuf); + + break; + } + + pthread_mutex_lock(&ctx_impl->mutex); + + *ctx_impl->pending_read_tail = rbuf; + ctx_impl->pending_read_tail = &rbuf->next; + rbuf->next = NULL; + + pthread_mutex_unlock(&ctx_impl->mutex); + + if (port_msg->type == _NXT_PORT_MSG_QUIT) { + nxt_unit_debug(ctx, "oosm: quit received"); + + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_OK; +} + + static nxt_unit_mmap_t * nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) { @@ -2843,10 +3147,10 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd) static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - nxt_unit_port_id_t *port_id, uint32_t size, + nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf) { - uint32_t nchunks; + int nchunks, min_nchunks; nxt_chunk_id_t c; nxt_port_mmap_header_t *hdr; @@ -2869,6 +3173,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + size; mmap_buf->port_id = *port_id; + mmap_buf->process = process; nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", mmap_buf->buf.start, (int) size); @@ -2877,9 +3182,20 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, } nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; + min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; - hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks); + hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks); if (nxt_slow_path(hdr == NULL)) { + if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { + mmap_buf->hdr = NULL; + mmap_buf->buf.start = NULL; + mmap_buf->buf.free = NULL; + mmap_buf->buf.end = NULL; + mmap_buf->free_ptr = NULL; + + return NXT_UNIT_OK; + } + return NXT_UNIT_ERROR; } @@ -2888,6 +3204,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, mmap_buf->buf.free = mmap_buf->buf.start; mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; mmap_buf->port_id = *port_id; + mmap_buf->process = process; mmap_buf->free_ptr = NULL; nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", @@ -2991,6 +3308,7 @@ nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps) mmaps->size = 0; mmaps->cap = 0; mmaps->elts = NULL; + mmaps->allocated_chunks = 0; } @@ -3174,6 +3492,7 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) b->buf.free = start; b->buf.end = b->buf.start + size; b->hdr = hdr; + b->process = process; b = b->next; @@ -3191,23 +3510,79 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } -static int -nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size) +static void +nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, + nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, + void *start, uint32_t size) { - u_char *p, *end; - nxt_chunk_id_t c; + int freed_chunks; + u_char *p, *end; + nxt_chunk_id_t c; + nxt_unit_impl_t *lib; memset(start, 0xA5, size); p = start; end = p + size; c = nxt_port_mmap_chunk_id(hdr, p); + freed_chunks = 0; while (p < end) { nxt_port_mmap_set_chunk_free(hdr->free_map, c); p += PORT_MMAP_CHUNK_SIZE; c++; + freed_chunks++; + } + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (hdr->src_pid == lib->pid && freed_chunks != 0) { + nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, + -freed_chunks); + + nxt_unit_debug(ctx, "process %d allocated_chunks %d", + process->pid, + process->outgoing.allocated_chunks); + } + + if (hdr->dst_pid == lib->pid + && freed_chunks != 0 + && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) + { + nxt_unit_send_shm_ack(ctx, hdr->src_pid); + } +} + + +static int +nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) +{ + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_port_id_t port_id; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + nxt_unit_port_id_init(&port_id, pid, 0); + + msg.stream = 0; + msg.pid = lib->pid; + msg.reply_port = 0; + msg.type = _NXT_PORT_MSG_SHM_ACK; + msg.last = 0; + msg.mmap = 0; + msg.nf = 0; + msg.mf = 0; + msg.tracking = 0; + + res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0); + if (nxt_slow_path(res != sizeof(msg))) { + nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)", + (int) port_id.pid, strerror(errno), errno); + + return NXT_UNIT_ERROR; } return NXT_UNIT_OK; @@ -3369,43 +3744,76 @@ int nxt_unit_run_once(nxt_unit_ctx_t *ctx) { int rc; - char buf[4096]; - char oob[256]; - ssize_t rsize; - nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - memset(oob, 0, sizeof(struct cmsghdr)); + pthread_mutex_lock(&ctx_impl->mutex); + + if (ctx_impl->pending_read_head != NULL) { + rbuf = ctx_impl->pending_read_head; + ctx_impl->pending_read_head = rbuf->next; + + if (ctx_impl->pending_read_tail == &rbuf->next) { + ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; + } + + pthread_mutex_unlock(&ctx_impl->mutex); - if (ctx_impl->read_port_fd != -1) { - rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, - buf, sizeof(buf), - oob, sizeof(oob)); } else { - rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, - buf, sizeof(buf), - oob, sizeof(oob)); + rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_read_buf(ctx, rbuf); } - if (nxt_fast_path(rsize > 0)) { - rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize, - oob, sizeof(oob)); + if (nxt_fast_path(rbuf->size > 0)) { + rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, + rbuf->buf, rbuf->size, + rbuf->oob, sizeof(rbuf->oob)); #if (NXT_DEBUG) - memset(buf, 0xAC, rsize); + memset(rbuf->buf, 0xAC, rbuf->size); #endif } else { rc = NXT_UNIT_ERROR; } + nxt_unit_read_buf_release(ctx, rbuf); + return rc; } +static void +nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +{ + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + if (ctx_impl->read_port_fd != -1) { + rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd, + rbuf->buf, sizeof(rbuf->buf), + rbuf->oob, sizeof(rbuf->oob)); + + } else { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id, + rbuf->buf, sizeof(rbuf->buf), + rbuf->oob, sizeof(rbuf->oob)); + } +} + + void nxt_unit_done(nxt_unit_ctx_t *ctx) { diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 6948b253..c8aaa124 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -137,6 +137,9 @@ struct nxt_unit_callbacks_s { /* Gracefully quit the application. Optional. */ void (*quit)(nxt_unit_ctx_t *); + /* Shared memory release acknowledgement. */ + void (*shm_ack_handler)(nxt_unit_ctx_t *); + /* Send data and control to process pid using port id. Optional. */ ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, const void *buf, size_t buf_size, @@ -323,6 +326,9 @@ uint32_t nxt_unit_buf_min(void); int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, size_t size); +ssize_t nxt_unit_response_write_nb(nxt_unit_request_info_t *req, + const void *start, size_t size, size_t min_size); + int nxt_unit_response_write_cb(nxt_unit_request_info_t *req, nxt_unit_read_info_t *read_info); -- cgit From 763bdff4018ec35de8383273d366160adebb6021 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 24 Dec 2019 18:04:17 +0300 Subject: Node.js: implementing output message drain using SHM_ACK feature. ServerResponse.write() method tries to write data buffer using libunit and stores buffers to write in a Server-wide output queue, which is processed in response to SHM_ACK message from router. As a side effect 'drain' event implemented and socket.writable flag reflect current state. --- src/nodejs/unit-http/http_server.js | 121 +++++++++++++++++++++++++++++++++--- src/nodejs/unit-http/unit.cpp | 90 ++++++++++++++++++++++----- src/nodejs/unit-http/unit.h | 3 + 3 files changed, 188 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/nodejs/unit-http/http_server.js b/src/nodejs/unit-http/http_server.js index c42149a5..2f324329 100644 --- a/src/nodejs/unit-http/http_server.js +++ b/src/nodejs/unit-http/http_server.js @@ -227,6 +227,7 @@ ServerResponse.prototype._write = unit_lib.response_write; ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { var contentLength = 0; + var res, o; this._sendHeaders(); @@ -247,11 +248,32 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { if (typeof chunk === 'string') { contentLength = Buffer.byteLength(chunk, encoding); + if (contentLength > unit_lib.buf_min) { + chunk = Buffer.from(chunk, encoding); + + contentLength = chunk.length; + } + } else { contentLength = chunk.length; } - this._write(chunk, contentLength); + if (this.server._output.length > 0 || !this.socket.writable) { + o = new BufferedOutput(this, 0, chunk, encoding, callback); + this.server._output.push(o); + + return false; + } + + res = this._write(chunk, 0, contentLength); + if (res < contentLength) { + this.socket.writable = false; + + o = new BufferedOutput(this, res, chunk, encoding, callback); + this.server._output.push(o); + + return false; + } } if (typeof callback === 'function') { @@ -265,29 +287,48 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { * the event loop. All callbacks passed to process.nextTick() * will be resolved before the event loop continues. */ - process.nextTick(function () { - callback(this); - }.bind(this)); + process.nextTick(callback); } + + return true; }; ServerResponse.prototype.write = function write(chunk, encoding, callback) { if (this.finished) { - throw new Error("Write after end"); - } + if (typeof encoding === 'function') { + callback = encoding; + encoding = null; + } - this._writeBody(chunk, encoding, callback); + var err = new Error("Write after end"); + process.nextTick(() => { + this.emit('error', err); - return true; + if (typeof callback === 'function') { + callback(err); + } + }) + } + + return this._writeBody(chunk, encoding, callback); }; ServerResponse.prototype._end = unit_lib.response_end; ServerResponse.prototype.end = function end(chunk, encoding, callback) { if (!this.finished) { - this._writeBody(chunk, encoding, callback); + if (typeof encoding === 'function') { + callback = encoding; + encoding = null; + } - this._end(); + this._writeBody(chunk, encoding, () => { + this._end(); + + if (typeof callback === 'function') { + callback(); + } + }); this.finished = true; } @@ -393,6 +434,9 @@ function Server(requestListener) { this._upgradeListenerCount--; } }); + + this._output = []; + this._drain_resp = new Set(); } util.inherits(Server, EventEmitter); @@ -429,6 +473,63 @@ Server.prototype.emit_close = function () { this.emit('close'); }; +Server.prototype.emit_drain = function () { + var res, o, l; + + if (this._output.length <= 0) { + return; + } + + while (this._output.length > 0) { + o = this._output[0]; + + if (typeof o.chunk === 'string') { + l = Buffer.byteLength(o.chunk, o.encoding); + + } else { + l = o.chunk.length; + } + + res = o.resp._write(o.chunk, o.offset, l); + + o.offset += res; + if (o.offset < l) { + return; + } + + this._drain_resp.add(o.resp); + + if (typeof o.callback === 'function') { + process.nextTick(o.callback); + } + + this._output.shift(); + } + + for (var resp of this._drain_resp) { + + if (resp.socket.writable) { + continue; + } + + resp.socket.writable = true; + + process.nextTick(() => { + resp.emit("drain"); + }); + } + + this._drain_resp.clear(); +}; + +function BufferedOutput(resp, offset, chunk, encoding, callback) { + this.resp = resp; + this.offset = offset; + this.chunk = chunk; + this.encoding = encoding; + this.callback = callback; +} + function connectionListener(socket) { } diff --git a/src/nodejs/unit-http/unit.cpp b/src/nodejs/unit-http/unit.cpp index 10875703..1fa73689 100644 --- a/src/nodejs/unit-http/unit.cpp +++ b/src/nodejs/unit-http/unit.cpp @@ -70,6 +70,8 @@ Unit::init(napi_env env, napi_value exports) websocket_send_frame); napi.set_named_property(exports, "websocket_set_sock", websocket_set_sock); + napi.set_named_property(exports, "buf_min", nxt_unit_buf_min()); + napi.set_named_property(exports, "buf_max", nxt_unit_buf_max()); } catch (exception &e) { napi.throw_error(e); @@ -148,6 +150,7 @@ Unit::create_server(napi_env env, napi_callback_info info) unit_init.callbacks.request_handler = request_handler_cb; unit_init.callbacks.websocket_handler = websocket_handler_cb; unit_init.callbacks.close_handler = close_handler_cb; + unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb; unit_init.callbacks.add_port = add_port; unit_init.callbacks.remove_port = remove_port; unit_init.callbacks.quit = quit_cb; @@ -308,6 +311,40 @@ Unit::close_handler(nxt_unit_request_info_t *req) } +void +Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx) +{ + Unit *obj; + + obj = reinterpret_cast(ctx->unit->data); + + obj->shm_ack_handler(ctx); +} + + +void +Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) +{ + napi_value server_obj, emit_drain; + + try { + nxt_handle_scope scope(env()); + + server_obj = get_server_object(); + + emit_drain = get_named_property(server_obj, "emit_drain"); + + nxt_async_context async_context(env(), "shm_ack_handler"); + nxt_callback_scope async_scope(async_context); + + make_callback(async_context, server_obj, emit_drain); + + } catch (exception &e) { + nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); + } +} + + static void nxt_uv_read_callback(uv_poll_t *handle, int status, int events) { @@ -748,47 +785,68 @@ Unit::response_write(napi_env env, napi_callback_info info) int ret; void *ptr; size_t argc, have_buf_len; - uint32_t buf_len; + ssize_t res_len; + uint32_t buf_start, buf_len; nxt_napi napi(env); napi_value this_arg; nxt_unit_buf_t *buf; napi_valuetype buf_type; nxt_unit_request_info_t *req; - napi_value argv[2]; + napi_value argv[3]; - argc = 2; + argc = 3; try { this_arg = napi.get_cb_info(info, argc, argv); - if (argc != 2) { + if (argc != 3) { throw exception("Wrong args count. Expected: " - "chunk, chunk length"); + "chunk, start, length"); } req = napi.get_request_info(this_arg); buf_type = napi.type_of(argv[0]); - buf_len = napi.get_value_uint32(argv[1]) + 1; - - buf = nxt_unit_response_buf_alloc(req, buf_len); - if (buf == NULL) { - throw exception("Failed to allocate response buffer"); - } + buf_start = napi.get_value_uint32(argv[1]); + buf_len = napi.get_value_uint32(argv[2]) + 1; if (buf_type == napi_string) { /* TODO: will work only for utf8 content-type */ + if (req->response_buf != NULL + && (req->response_buf->end - req->response_buf->free) + >= buf_len) + { + buf = req->response_buf; + + } else { + buf = nxt_unit_response_buf_alloc(req, buf_len); + if (buf == NULL) { + throw exception("Failed to allocate response buffer"); + } + } + have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, buf_len); + buf->free += have_buf_len; + + ret = nxt_unit_buf_send(buf); + if (ret == NXT_UNIT_OK) { + res_len = have_buf_len; + } + } else { ptr = napi.get_buffer_info(argv[0], have_buf_len); - memcpy(buf->free, ptr, have_buf_len); - } + if (buf_start > 0) { + ptr = ((uint8_t *) ptr) + buf_start; + have_buf_len -= buf_start; + } - buf->free += have_buf_len; + res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0); + + ret = res_len < 0 ? -res_len : NXT_UNIT_OK; + } - ret = nxt_unit_buf_send(buf); if (ret != NXT_UNIT_OK) { throw exception("Failed to send body buf"); } @@ -797,7 +855,7 @@ Unit::response_write(napi_env env, napi_callback_info info) return nullptr; } - return this_arg; + return napi.create((int64_t) res_len); } diff --git a/src/nodejs/unit-http/unit.h b/src/nodejs/unit-http/unit.h index f5eaf9fd..18359118 100644 --- a/src/nodejs/unit-http/unit.h +++ b/src/nodejs/unit-http/unit.h @@ -36,6 +36,9 @@ private: static void close_handler_cb(nxt_unit_request_info_t *req); void close_handler(nxt_unit_request_info_t *req); + static void shm_ack_handler_cb(nxt_unit_ctx_t *ctx); + void shm_ack_handler(nxt_unit_ctx_t *ctx); + static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); -- cgit From 8677bf8d41e002310971fddbc5e45c64034afc2f Mon Sep 17 00:00:00 2001 From: Axel Duch Date: Tue, 24 Dec 2019 13:58:10 +0000 Subject: Router: introducing routing on client address. --- src/nxt_conf_validation.c | 68 +++++++++ src/nxt_http_route.c | 243 +++++++++++++++++++++++++++++++- src/nxt_http_route_addr.c | 349 ++++++++++++++++++++++++++++++++++++++++++++++ src/nxt_http_route_addr.h | 73 ++++++++++ src/nxt_sockaddr.c | 4 + 5 files changed, 733 insertions(+), 4 deletions(-) create mode 100644 src/nxt_http_route_addr.c create mode 100644 src/nxt_http_route_addr.h (limited to 'src') diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 923e6c8c..cbccbade 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -9,6 +9,8 @@ #include #include #include +#include +#include typedef enum { @@ -82,6 +84,10 @@ static nxt_int_t nxt_conf_vldt_match_patterns_set_member( nxt_conf_validation_t *vldt, nxt_str_t *name, nxt_conf_value_t *value); static nxt_int_t nxt_conf_vldt_match_scheme_pattern(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_match_addrs(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_match_addr(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value); static nxt_int_t nxt_conf_vldt_app_name(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_app(nxt_conf_validation_t *vldt, @@ -283,6 +289,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_match_members[] = { &nxt_conf_vldt_match_patterns, NULL }, + { nxt_string("source"), + NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, + &nxt_conf_vldt_match_addrs, + NULL }, + { nxt_string("uri"), NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, &nxt_conf_vldt_match_patterns, @@ -1154,6 +1165,63 @@ nxt_conf_vldt_match_pattern(nxt_conf_validation_t *vldt, } +static nxt_int_t +nxt_conf_vldt_match_addrs(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data) +{ + if (nxt_conf_type(value) == NXT_CONF_ARRAY) { + return nxt_conf_vldt_array_iterator(vldt, value, + &nxt_conf_vldt_match_addr); + } + + return nxt_conf_vldt_match_addr(vldt, value); +} + + +static nxt_int_t +nxt_conf_vldt_match_addr(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value) +{ + nxt_http_route_addr_pattern_t pattern; + + switch (nxt_http_route_addr_pattern_parse(vldt->pool, &pattern, value)) { + + case NXT_OK: + return NXT_OK; + + case NXT_ADDR_PATTERN_PORT_ERROR: + return nxt_conf_vldt_error(vldt, "The \"address\" port an invalid " + "port."); + + case NXT_ADDR_PATTERN_CV_TYPE_ERROR: + return nxt_conf_vldt_error(vldt, "The \"match\" pattern for " + "\"address\" must be a string."); + + case NXT_ADDR_PATTERN_LENGTH_ERROR: + return nxt_conf_vldt_error(vldt, "The \"address\" is too short."); + + case NXT_ADDR_PATTERN_FORMAT_ERROR: + return nxt_conf_vldt_error(vldt, "The \"address\" format is invalid."); + + case NXT_ADDR_PATTERN_RANGE_OVERLAP_ERROR: + return nxt_conf_vldt_error(vldt, "The \"address\" range is " + "overlapping."); + + case NXT_ADDR_PATTERN_CIDR_ERROR: + return nxt_conf_vldt_error(vldt, "The \"address\" has an invalid CIDR " + "prefix."); + + case NXT_ADDR_PATTERN_NO_IPv6_ERROR: + return nxt_conf_vldt_error(vldt, "The \"address\" does not support " + "IPv6 with your configuration."); + + default: + return nxt_conf_vldt_error(vldt, "The \"address\" has an unknown " + "format."); + } +} + + static nxt_int_t nxt_conf_vldt_match_scheme_pattern(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c index 18b352ea..d14dcc07 100644 --- a/src/nxt_http_route.c +++ b/src/nxt_http_route.c @@ -6,6 +6,8 @@ #include #include +#include +#include typedef enum { @@ -16,6 +18,7 @@ typedef enum { NXT_HTTP_ROUTE_ARGUMENT, NXT_HTTP_ROUTE_COOKIE, NXT_HTTP_ROUTE_SCHEME, + NXT_HTTP_ROUTE_SOURCE, } nxt_http_route_object_t; @@ -50,6 +53,7 @@ typedef struct { nxt_conf_value_t *arguments; nxt_conf_value_t *cookies; nxt_conf_value_t *scheme; + nxt_conf_value_t *source; } nxt_http_route_match_conf_t; @@ -118,9 +122,18 @@ typedef struct { } nxt_http_route_table_t; +typedef struct { + /* The object must be the first field. */ + nxt_http_route_object_t object:8; + uint32_t items; + nxt_http_route_addr_pattern_t addr_pattern[0]; +} nxt_http_route_addr_rule_t; + + typedef union { nxt_http_route_rule_t *rule; nxt_http_route_table_t *table; + nxt_http_route_addr_rule_t *addr_rule; } nxt_http_route_test_t; @@ -170,6 +183,8 @@ static nxt_http_route_ruleset_t *nxt_http_route_ruleset_create(nxt_task_t *task, static nxt_http_route_rule_t *nxt_http_route_rule_name_create(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *rule_cv, nxt_str_t *name, nxt_bool_t case_sensitive); +static nxt_http_route_addr_rule_t *nxt_http_route_addr_rule_create( + nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *cv); static nxt_http_route_rule_t *nxt_http_route_rule_create(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *cv, nxt_bool_t case_sensitive, nxt_http_route_pattern_case_t pattern_case); @@ -196,6 +211,8 @@ static nxt_int_t nxt_http_route_table(nxt_http_request_t *r, nxt_http_route_table_t *table); static nxt_int_t nxt_http_route_ruleset(nxt_http_request_t *r, nxt_http_route_ruleset_t *ruleset); +static nxt_int_t nxt_http_route_addr_rule(nxt_http_request_t *r, + nxt_http_route_addr_rule_t *addr_rule, nxt_sockaddr_t *sockaddr); static nxt_int_t nxt_http_route_rule(nxt_http_request_t *r, nxt_http_route_rule_t *rule); static nxt_int_t nxt_http_route_header(nxt_http_request_t *r, @@ -329,6 +346,12 @@ static nxt_conf_map_t nxt_http_route_match_conf[] = { NXT_CONF_MAP_PTR, offsetof(nxt_http_route_match_conf_t, cookies), }, + + { + nxt_string("source"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_match_conf_t, source), + }, }; @@ -381,6 +404,7 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_http_route_rule_t *rule; nxt_http_route_table_t *table; nxt_http_route_match_t *match; + nxt_http_route_addr_rule_t *addr_rule; nxt_http_route_match_conf_t mtcf; static nxt_str_t match_path = nxt_string("/match"); @@ -505,6 +529,17 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, test++; } + if (mtcf.source != NULL) { + addr_rule = nxt_http_route_addr_rule_create(task, mp, mtcf.source); + if (addr_rule == NULL) { + return NULL; + } + + addr_rule->object = NXT_HTTP_ROUTE_SOURCE; + test->addr_rule = addr_rule; + test++; + } + return match; } @@ -770,6 +805,53 @@ nxt_http_route_rule_create(nxt_task_t *task, nxt_mp_t *mp, } +static nxt_http_route_addr_rule_t * +nxt_http_route_addr_rule_create(nxt_task_t *task, nxt_mp_t *mp, + nxt_conf_value_t *cv) +{ + size_t size; + uint32_t i, n; + nxt_bool_t array; + nxt_conf_value_t *value; + nxt_http_route_addr_rule_t *addr_rule; + nxt_http_route_addr_pattern_t *pattern; + + array = (nxt_conf_type(cv) == NXT_CONF_ARRAY); + n = array ? nxt_conf_array_elements_count(cv) : 1; + + size = sizeof(nxt_http_route_addr_rule_t) + + n * sizeof(nxt_http_route_addr_pattern_t); + + addr_rule = nxt_mp_alloc(mp, size); + if (nxt_slow_path(addr_rule == NULL)) { + return NULL; + } + + addr_rule->items = n; + + if (!array) { + pattern = &addr_rule->addr_pattern[0]; + + if (nxt_http_route_addr_pattern_parse(mp, pattern, cv) != NXT_OK) { + return NULL; + } + + return addr_rule; + } + + for (i = 0; i < n; i++) { + pattern = &addr_rule->addr_pattern[i]; + value = nxt_conf_get_array_element(cv, i); + + if (nxt_http_route_addr_pattern_parse(mp, pattern, value) != NXT_OK) { + return NULL; + } + } + + return addr_rule; +} + + static int nxt_http_pattern_compare(const void *one, const void *two) { @@ -1141,11 +1223,16 @@ nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match) end = test + match->items; while (test < end) { - if (test->rule->object != NXT_HTTP_ROUTE_TABLE) { - ret = nxt_http_route_rule(r, test->rule); - - } else { + switch (test->rule->object) { + case NXT_HTTP_ROUTE_TABLE: ret = nxt_http_route_table(r, test->table); + break; + case NXT_HTTP_ROUTE_SOURCE: + ret = nxt_http_route_addr_rule(r, test->addr_rule, r->remote); + break; + default: + ret = nxt_http_route_rule(r, test->rule); + break; } if (ret <= 0) { @@ -1255,6 +1342,154 @@ nxt_http_route_rule(nxt_http_request_t *r, nxt_http_route_rule_t *rule) } +static nxt_int_t +nxt_http_route_addr_pattern_match(nxt_http_route_addr_pattern_t *p, + nxt_sockaddr_t *sa) +{ +#if (NXT_INET6) + uint32_t i; +#endif + in_port_t in_port; + nxt_int_t match; + struct sockaddr_in *sin; +#if (NXT_INET6) + struct sockaddr_in6 *sin6; +#endif + nxt_http_route_addr_base_t *base; + + base = &p->base; + + switch (sa->u.sockaddr.sa_family) { + + case AF_INET: + + match = (base->addr_family == AF_INET + || base->addr_family == AF_UNSPEC); + if (!match) { + break; + } + + sin = &sa->u.sockaddr_in; + in_port = ntohs(sin->sin_port); + + match = (in_port >= base->port.start && in_port <= base->port.end); + if (!match) { + break; + } + + switch (base->match_type) { + + case NXT_HTTP_ROUTE_ADDR_ANY: + break; + + case NXT_HTTP_ROUTE_ADDR_EXACT: + match = (nxt_memcmp(&sin->sin_addr, &p->addr.v4.start, + sizeof(struct in_addr)) + == 0); + break; + + case NXT_HTTP_ROUTE_ADDR_RANGE: + match = (nxt_memcmp(&sin->sin_addr, &p->addr.v4.start, + sizeof(struct in_addr)) >= 0 + && nxt_memcmp(&sin->sin_addr, &p->addr.v4.end, + sizeof(struct in_addr)) <= 0); + break; + + case NXT_HTTP_ROUTE_ADDR_CIDR: + match = ((sin->sin_addr.s_addr & p->addr.v4.end) + == p->addr.v4.start); + break; + + default: + nxt_unreachable(); + } + + break; + +#if (NXT_INET6) + case AF_INET6: + + match = (base->addr_family == AF_INET6 + || base->addr_family == AF_UNSPEC); + if (!match) { + break; + } + + sin6 = &sa->u.sockaddr_in6; + in_port = ntohs(sin6->sin6_port); + + match = (in_port >= base->port.start && in_port <= base->port.end); + if (!match) { + break; + } + + switch (base->match_type) { + + case NXT_HTTP_ROUTE_ADDR_ANY: + break; + + case NXT_HTTP_ROUTE_ADDR_EXACT: + match = (nxt_memcmp(&sin6->sin6_addr, &p->addr.v6.start, + sizeof(struct in6_addr)) + == 0); + break; + + case NXT_HTTP_ROUTE_ADDR_RANGE: + match = (nxt_memcmp(&sin6->sin6_addr, &p->addr.v6.start, + sizeof(struct in6_addr)) >= 0 + && nxt_memcmp(&sin6->sin6_addr, &p->addr.v6.end, + sizeof(struct in6_addr)) <= 0); + break; + + case NXT_HTTP_ROUTE_ADDR_CIDR: + for (i = 0; i < 16; i++) { + match = ((sin6->sin6_addr.s6_addr[i] + & p->addr.v6.end.s6_addr[i]) + == p->addr.v6.start.s6_addr[i]); + + if (!match) { + break; + } + } + + break; + + default: + nxt_unreachable(); + } + + break; +#endif + + default: + match = 0; + break; + } + + return match ^ base->negative; +} + + +static nxt_int_t +nxt_http_route_addr_rule(nxt_http_request_t *r, + nxt_http_route_addr_rule_t *addr_rule, nxt_sockaddr_t *sa) +{ + uint32_t i, n; + nxt_http_route_addr_pattern_t *p; + + n = addr_rule->items; + + for (i = 0; i < n; i++) { + p = &addr_rule->addr_pattern[i]; + if (nxt_http_route_addr_pattern_match(p, sa)) { + return 1; + } + } + + return 0; +} + + static nxt_int_t nxt_http_route_header(nxt_http_request_t *r, nxt_http_route_rule_t *rule) { diff --git a/src/nxt_http_route_addr.c b/src/nxt_http_route_addr.c new file mode 100644 index 00000000..3c7e9c84 --- /dev/null +++ b/src/nxt_http_route_addr.c @@ -0,0 +1,349 @@ + +/* + * Copyright (C) Axel Duch + * Copyright (C) NGINX, Inc. + */ + +#include +#include + + +static nxt_bool_t nxt_str_looks_like_ipv6(const nxt_str_t *str); +#if (NXT_INET6) +static nxt_bool_t nxt_valid_ipv6_blocks(u_char *c, size_t len); +#endif + + +nxt_int_t +nxt_http_route_addr_pattern_parse(nxt_mp_t *mp, + nxt_http_route_addr_pattern_t *pattern, nxt_conf_value_t *cv) +{ + u_char *delim, *end; + nxt_int_t ret, cidr_prefix; + nxt_str_t addr, port; + nxt_http_route_addr_base_t *base; + nxt_http_route_addr_range_t *inet; + + if (nxt_conf_type(cv) != NXT_CONF_STRING) { + return NXT_ADDR_PATTERN_CV_TYPE_ERROR; + } + + nxt_conf_get_string(cv, &addr); + + base = &pattern->base; + + if (addr.length > 0 && addr.start[0] == '!') { + addr.start++; + addr.length--; + + base->negative = 1; + + } else { + base->negative = 0; + } + + if (nxt_slow_path(addr.length < 2)) { + return NXT_ADDR_PATTERN_LENGTH_ERROR; + } + + nxt_str_null(&port); + + if (addr.start[0] == '*' && addr.start[1] == ':') { + port.start = addr.start + 2; + port.length = addr.length - 2; + base->addr_family = AF_UNSPEC; + base->match_type = NXT_HTTP_ROUTE_ADDR_ANY; + + goto parse_port; + } + + if (nxt_str_looks_like_ipv6(&addr)) { +#if (NXT_INET6) + uint8_t i; + nxt_int_t len; + nxt_http_route_in6_addr_range_t *inet6; + + base->addr_family = AF_INET6; + + if (addr.start[0] == '[') { + addr.start++; + addr.length--; + + end = addr.start + addr.length; + + port.start = nxt_rmemstrn(addr.start, end, "]:", 2); + if (nxt_slow_path(port.start == NULL)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + addr.length = port.start - addr.start; + port.start += nxt_length("]:"); + port.length = end - port.start; + } + + inet6 = &pattern->addr.v6; + + delim = nxt_memchr(addr.start, '-', addr.length); + if (delim != NULL) { + len = delim - addr.start; + if (nxt_slow_path(!nxt_valid_ipv6_blocks(addr.start, len))) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + ret = nxt_inet6_addr(&inet6->start, addr.start, len); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + len = addr.start + addr.length - delim - 1; + if (nxt_slow_path(!nxt_valid_ipv6_blocks(delim + 1, len))) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + ret = nxt_inet6_addr(&inet6->end, delim + 1, len); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + if (nxt_slow_path(nxt_memcmp(&inet6->start, &inet6->end, + sizeof(struct in6_addr)) > 0)) + { + return NXT_ADDR_PATTERN_RANGE_OVERLAP_ERROR; + } + + base->match_type = NXT_HTTP_ROUTE_ADDR_RANGE; + + goto parse_port; + } + + delim = nxt_memchr(addr.start, '/', addr.length); + if (delim != NULL) { + cidr_prefix = nxt_int_parse(delim + 1, + addr.start + addr.length - (delim + 1)); + if (nxt_slow_path(cidr_prefix < 0 || cidr_prefix > 128)) { + return NXT_ADDR_PATTERN_CIDR_ERROR; + } + + addr.length = delim - addr.start; + if (nxt_slow_path(!nxt_valid_ipv6_blocks(addr.start, + addr.length))) + { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + ret = nxt_inet6_addr(&inet6->start, addr.start, addr.length); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + if (nxt_slow_path(cidr_prefix == 0)) { + base->match_type = NXT_HTTP_ROUTE_ADDR_ANY; + + goto parse_port; + } + + if (nxt_slow_path(cidr_prefix == 128)) { + base->match_type = NXT_HTTP_ROUTE_ADDR_EXACT; + + goto parse_port; + } + + base->match_type = NXT_HTTP_ROUTE_ADDR_CIDR; + + for (i = 0; i < sizeof(struct in6_addr); i++) { + if (cidr_prefix >= 8) { + inet6->end.s6_addr[i] = 0xFF; + cidr_prefix -= 8; + + continue; + } + + if (cidr_prefix > 0) { + inet6->end.s6_addr[i] = 0xFF & (0xFF << (8 - cidr_prefix)); + inet6->start.s6_addr[i] &= inet6->end.s6_addr[i]; + cidr_prefix = 0; + + continue; + } + + inet6->start.s6_addr[i] = 0; + inet6->end.s6_addr[i] = 0; + } + + goto parse_port; + } + + base->match_type = NXT_HTTP_ROUTE_ADDR_EXACT; + + if (nxt_slow_path(!nxt_valid_ipv6_blocks(addr.start, addr.length))) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + nxt_inet6_addr(&inet6->start, addr.start, addr.length); + + goto parse_port; +#endif + return NXT_ADDR_PATTERN_NO_IPv6_ERROR; + } + + base->addr_family = AF_INET; + + delim = nxt_memchr(addr.start, ':', addr.length); + if (delim != NULL) { + port.start = delim + 1; + port.length = addr.start + addr.length - port.start; + addr.length = delim - addr.start; + } + + inet = &pattern->addr.v4; + + delim = nxt_memchr(addr.start, '-', addr.length); + if (delim != NULL) { + inet->start = nxt_inet_addr(addr.start, delim - addr.start); + if (nxt_slow_path(inet->start == INADDR_NONE)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + inet->end = nxt_inet_addr(delim + 1, + addr.start + addr.length - (delim + 1)); + if (nxt_slow_path(inet->end == INADDR_NONE)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + if (nxt_slow_path(nxt_memcmp(&inet->start, &inet->end, + sizeof(struct in_addr)) > 0)) + { + return NXT_ADDR_PATTERN_RANGE_OVERLAP_ERROR; + } + + base->match_type = NXT_HTTP_ROUTE_ADDR_RANGE; + + goto parse_port; + } + + delim = nxt_memchr(addr.start, '/', addr.length); + if (delim != NULL) { + cidr_prefix = nxt_int_parse(delim + 1, + addr.start + addr.length - (delim + 1)); + if (nxt_slow_path(cidr_prefix < 0 || cidr_prefix > 32)) { + return NXT_ADDR_PATTERN_CIDR_ERROR; + } + + addr.length = delim - addr.start; + inet->end = htonl(0xFFFFFFFF & (0xFFFFFFFF << (32 - cidr_prefix))); + + inet->start = nxt_inet_addr(addr.start, addr.length) & inet->end; + if (nxt_slow_path(inet->start == INADDR_NONE)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + if (cidr_prefix == 0) { + base->match_type = NXT_HTTP_ROUTE_ADDR_ANY; + + goto parse_port; + } + + if (cidr_prefix < 32) { + base->match_type = NXT_HTTP_ROUTE_ADDR_CIDR; + + goto parse_port; + } + } + + inet->start = nxt_inet_addr(addr.start, addr.length); + if (nxt_slow_path(inet->start == INADDR_NONE)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + base->match_type = NXT_HTTP_ROUTE_ADDR_EXACT; + +parse_port: + + if (port.length == 0) { + if (nxt_slow_path(port.start != NULL)) { + return NXT_ADDR_PATTERN_FORMAT_ERROR; + } + + base->port.start = 0; + base->port.end = 65535; + + return NXT_OK; + } + + delim = nxt_memchr(port.start, '-', port.length - 1); + if (delim != NULL) { + ret = nxt_int_parse(port.start, delim - port.start); + if (nxt_slow_path(ret < 0 || ret > 65535)) { + return NXT_ADDR_PATTERN_PORT_ERROR; + } + + base->port.start = ret; + + ret = nxt_int_parse(delim + 1, port.start + port.length - (delim + 1)); + if (nxt_slow_path(ret < base->port.start || ret > 65535)) { + return NXT_ADDR_PATTERN_PORT_ERROR; + } + + base->port.end = ret; + + } else { + ret = nxt_int_parse(port.start, port.length); + if (nxt_slow_path(ret < 0 || ret > 65535)) { + return NXT_ADDR_PATTERN_PORT_ERROR; + } + + base->port.start = ret; + base->port.end = ret; + } + + return NXT_OK; +} + + +static nxt_bool_t +nxt_str_looks_like_ipv6(const nxt_str_t *str) +{ + u_char *colon, *end; + + colon = nxt_memchr(str->start, ':', str->length); + + if (colon != NULL) { + end = str->start + str->length; + colon = nxt_memchr(colon + 1, ':', end - (colon + 1)); + } + + return (colon != NULL); +} + + +#if (NXT_INET6) + +static nxt_bool_t +nxt_valid_ipv6_blocks(u_char *c, size_t len) +{ + u_char *end; + nxt_uint_t colon_gap; + + end = c + len; + colon_gap = 0; + + while (c != end) { + if (*c == ':') { + colon_gap = 0; + c++; + + continue; + } + + colon_gap++; + c++; + + if (nxt_slow_path(colon_gap > 4)) { + return 0; + } + } + + return 1; +} + +#endif diff --git a/src/nxt_http_route_addr.h b/src/nxt_http_route_addr.h new file mode 100644 index 00000000..3b1e1da3 --- /dev/null +++ b/src/nxt_http_route_addr.h @@ -0,0 +1,73 @@ + +/* + * Copyright (C) Axel Duch + * Copyright (C) NGINX, Inc. + */ + +#include + +#ifndef _NXT_HTTP_ROUTE_ADDR_H_INCLUDED_ +#define _NXT_HTTP_ROUTE_ADDR_H_INCLUDED_ + + +enum { + NXT_HTTP_ROUTE_ADDR_ANY = 0, + NXT_HTTP_ROUTE_ADDR_RANGE, + NXT_HTTP_ROUTE_ADDR_EXACT, + NXT_HTTP_ROUTE_ADDR_CIDR, +}; + + +enum { + NXT_ADDR_PATTERN_PORT_ERROR = NXT_OK + 1, + NXT_ADDR_PATTERN_CV_TYPE_ERROR, + NXT_ADDR_PATTERN_LENGTH_ERROR, + NXT_ADDR_PATTERN_FORMAT_ERROR, + NXT_ADDR_PATTERN_RANGE_OVERLAP_ERROR, + NXT_ADDR_PATTERN_CIDR_ERROR, + NXT_ADDR_PATTERN_NO_IPv6_ERROR, +}; + + +typedef struct { + in_addr_t start; + in_addr_t end; +} nxt_http_route_addr_range_t; + + +#if (NXT_INET6) +typedef struct { + struct in6_addr start; + struct in6_addr end; +} nxt_http_route_in6_addr_range_t; +#endif + + +typedef struct { + uint8_t match_type:2; + uint8_t negative:1; + uint8_t addr_family; + + struct { + uint16_t start; + uint16_t end; + } port; +} nxt_http_route_addr_base_t; + + +typedef struct { + nxt_http_route_addr_base_t base; + + union { + nxt_http_route_addr_range_t v4; +#if (NXT_INET6) + nxt_http_route_in6_addr_range_t v6; +#endif + } addr; +} nxt_http_route_addr_pattern_t; + + +NXT_EXPORT nxt_int_t nxt_http_route_addr_pattern_parse(nxt_mp_t *mp, + nxt_http_route_addr_pattern_t *pattern, nxt_conf_value_t *cv); + +#endif /* _NXT_HTTP_ROUTE_ADDR_H_INCLUDED_ */ diff --git a/src/nxt_sockaddr.c b/src/nxt_sockaddr.c index 57dfbfa6..af696a6b 100644 --- a/src/nxt_sockaddr.c +++ b/src/nxt_sockaddr.c @@ -1140,6 +1140,10 @@ nxt_inet_addr(u_char *buf, size_t length) in_addr_t addr; nxt_uint_t digit, octet, dots; + if (nxt_slow_path(*(buf + length - 1) == '.')) { + return INADDR_NONE; + } + addr = 0; octet = 0; dots = 0; -- cgit From 1a76371499d60856e000bd560a30389ad442b13a Mon Sep 17 00:00:00 2001 From: Axel Duch Date: Tue, 24 Dec 2019 13:59:58 +0000 Subject: Router: introducing routing on listener address. --- src/nxt_conf_validation.c | 5 +++++ src/nxt_http_route.c | 35 +++++++++++++++++++++++++++++++---- 2 files changed, 36 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index cbccbade..5a1f7839 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -294,6 +294,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_match_members[] = { &nxt_conf_vldt_match_addrs, NULL }, + { nxt_string("destination"), + NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, + &nxt_conf_vldt_match_addrs, + NULL }, + { nxt_string("uri"), NXT_CONF_VLDT_STRING | NXT_CONF_VLDT_ARRAY, &nxt_conf_vldt_match_patterns, diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c index d14dcc07..ef9593b7 100644 --- a/src/nxt_http_route.c +++ b/src/nxt_http_route.c @@ -19,6 +19,7 @@ typedef enum { NXT_HTTP_ROUTE_COOKIE, NXT_HTTP_ROUTE_SCHEME, NXT_HTTP_ROUTE_SOURCE, + NXT_HTTP_ROUTE_DESTINATION, } nxt_http_route_object_t; @@ -54,6 +55,7 @@ typedef struct { nxt_conf_value_t *cookies; nxt_conf_value_t *scheme; nxt_conf_value_t *source; + nxt_conf_value_t *destination; } nxt_http_route_match_conf_t; @@ -205,8 +207,8 @@ static void nxt_http_route_cleanup(nxt_task_t *task, nxt_http_route_t *routes); static nxt_http_action_t *nxt_http_route_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *start); -static nxt_http_action_t *nxt_http_route_match(nxt_http_request_t *r, - nxt_http_route_match_t *match); +static nxt_http_action_t *nxt_http_route_match(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_route_match_t *match); static nxt_int_t nxt_http_route_table(nxt_http_request_t *r, nxt_http_route_table_t *table); static nxt_int_t nxt_http_route_ruleset(nxt_http_request_t *r, @@ -352,6 +354,12 @@ static nxt_conf_map_t nxt_http_route_match_conf[] = { NXT_CONF_MAP_PTR, offsetof(nxt_http_route_match_conf_t, source), }, + + { + nxt_string("destination"), + NXT_CONF_MAP_PTR, + offsetof(nxt_http_route_match_conf_t, destination), + }, }; @@ -540,6 +548,17 @@ nxt_http_route_match_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, test++; } + if (mtcf.destination != NULL) { + addr_rule = nxt_http_route_addr_rule_create(task, mp, mtcf.destination); + if (addr_rule == NULL) { + return NULL; + } + + addr_rule->object = NXT_HTTP_ROUTE_DESTINATION; + test->addr_rule = addr_rule; + test++; + } + return match; } @@ -1199,7 +1218,7 @@ nxt_http_route_handler(nxt_task_t *task, nxt_http_request_t *r, end = match + route->items; while (match < end) { - action = nxt_http_route_match(r, *match); + action = nxt_http_route_match(task, r, *match); if (action != NULL) { return action; } @@ -1214,7 +1233,8 @@ nxt_http_route_handler(nxt_task_t *task, nxt_http_request_t *r, static nxt_http_action_t * -nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match) +nxt_http_route_match(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_route_match_t *match) { nxt_int_t ret; nxt_http_route_test_t *test, *end; @@ -1230,6 +1250,13 @@ nxt_http_route_match(nxt_http_request_t *r, nxt_http_route_match_t *match) case NXT_HTTP_ROUTE_SOURCE: ret = nxt_http_route_addr_rule(r, test->addr_rule, r->remote); break; + case NXT_HTTP_ROUTE_DESTINATION: + if (r->local == NULL && nxt_fast_path(r->proto.any != NULL)) { + nxt_http_proto[r->protocol].local_addr(task, r); + } + + ret = nxt_http_route_addr_rule(r, test->addr_rule, r->local); + break; default: ret = nxt_http_route_rule(r, test->rule); break; -- cgit From eced72ba25a1aced3553ac4e8499c4c2befd2a91 Mon Sep 17 00:00:00 2001 From: Tiago Natel de Moura Date: Mon, 16 Dec 2019 17:12:09 +0000 Subject: Using the 64-bit Linux capability mode when available. For backward compatibility, the Linux capabilities macros exposes v1 semantics (32-bit) by default. We probe the version at runtime (because of pre-compiled binaries) but the kernel syscall API is conservative and it doesn't return a 64-bit capability version if the input version is v1. This patch suppress the kernel > 5.0 dmesg log below: capability: warning: 'unitd' uses 32-bit capabilities (legacy support in use) --- src/nxt_capability.c | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/nxt_capability.c b/src/nxt_capability.c index 805faff6..dfa7a834 100644 --- a/src/nxt_capability.c +++ b/src/nxt_capability.c @@ -10,6 +10,16 @@ #include #include + +#if (_LINUX_CAPABILITY_VERSION_3) +#define NXT_CAPABILITY_VERSION _LINUX_CAPABILITY_VERSION_3 +#elif (_LINUX_CAPABILITY_VERSION_2) +#define NXT_CAPABILITY_VERSION _LINUX_CAPABILITY_VERSION_2 +#else +#define NXT_CAPABILITY_VERSION _LINUX_CAPABILITY_VERSION +#endif + + #define nxt_capget(hdrp, datap) \ syscall(SYS_capget, hdrp, datap) #define nxt_capset(hdrp, datap) \ @@ -43,7 +53,7 @@ nxt_capability_linux_get_version() { struct __user_cap_header_struct hdr; - hdr.version = _LINUX_CAPABILITY_VERSION; + hdr.version = NXT_CAPABILITY_VERSION; hdr.pid = nxt_pid; nxt_capget(&hdr, NULL); -- cgit