diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 2295 |
1 files changed, 1158 insertions, 1137 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 788199c7..0e1de6fa 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -15,6 +15,8 @@ #include <nxt_unit_request.h> #include <nxt_unit_response.h> #include <nxt_router_request.h> +#include <nxt_app_queue.h> +#include <nxt_port_queue.h> typedef struct { nxt_str_t type; @@ -61,59 +63,22 @@ typedef struct { } nxt_app_rpc_t; -struct nxt_port_select_state_s { - nxt_app_t *app; - nxt_request_app_link_t *req_app_link; - - nxt_port_t *failed_port; - int failed_port_use_delta; - - uint8_t start_process; /* 1 bit */ - nxt_request_app_link_t *shared_ra; - nxt_port_t *port; -}; - -typedef struct nxt_port_select_state_s nxt_port_select_state_t; - static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp); static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); static void nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port); -static void nxt_router_port_select(nxt_task_t *task, - nxt_port_select_state_t *state); - -static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, - nxt_port_select_state_t *state); - static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); -static void nxt_request_app_link_update_peer(nxt_task_t *task, - nxt_request_app_link_t *req_app_link); - -nxt_inline void -nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) -{ - nxt_atomic_fetch_add(&req_app_link->use_count, 1); -} - -nxt_inline void -nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i) -{ -#if (NXT_DEBUG) - int c; - - c = nxt_atomic_fetch_add(&req_app_link->use_count, i); - - nxt_assert((c + i) > 0); -#else - (void) nxt_atomic_fetch_add(&req_app_link->use_count, i); -#endif -} - -static void nxt_request_app_link_use(nxt_task_t *task, - nxt_request_app_link_t *req_app_link, int i); +static void nxt_router_new_port_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +static void nxt_router_conf_data_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +static void nxt_router_remove_pid_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +static void nxt_router_access_log_reopen_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); @@ -128,7 +93,22 @@ static nxt_int_t nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); + static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); +static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); +static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, + nxt_app_t *app); +static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, + nxt_str_t *name); +static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, + int i); + +static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task, + nxt_port_t *port); +static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task, + nxt_port_t *port); +static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task, + nxt_port_t *port, nxt_fd_t fd); static void nxt_router_listen_socket_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf); static void nxt_router_listen_socket_ready(nxt_task_t *task, @@ -182,6 +162,8 @@ static void nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs); static void nxt_router_thread_start(void *data); +static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, + void *data); static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data); static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, @@ -194,6 +176,8 @@ static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_router_req_headers_ack_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); static void nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf); @@ -218,20 +202,29 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task, static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); +static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, + nxt_port_t *app_port); static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); +static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i); static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_apr_action_t action); -static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link); +static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, + nxt_request_rpc_data_t *req_rpc_data); +static void nxt_router_http_request_error(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_http_request_done(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, + void *data); static void nxt_router_app_prepare_request(nxt_task_t *task, - nxt_request_app_link_t *req_app_link); + nxt_request_rpc_data_t *req_rpc_data); static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, - nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix); + nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, @@ -248,11 +241,15 @@ static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); static void nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i); -static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, +static void nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r); static void nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data); static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_router_get_port_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); +static void nxt_router_get_mmap_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); extern const nxt_http_request_state_t nxt_http_websocket; @@ -274,8 +271,10 @@ static const nxt_str_t *nxt_app_msg_prefix[] = { static const nxt_port_handlers_t nxt_router_process_port_handlers = { .quit = nxt_signal_quit_handler, .new_port = nxt_router_new_port_handler, + .get_port = nxt_router_get_port_handler, .change_file = nxt_port_change_log_file_handler, .mmap = nxt_port_mmap_handler, + .get_mmap = nxt_router_get_mmap_handler, .data = nxt_router_conf_data_handler, .remove_pid = nxt_router_remove_pid_handler, .access_log = nxt_router_access_log_reopen_handler, @@ -297,6 +296,14 @@ const nxt_process_init_t nxt_router_process = { }; +/* Queues of nxt_socket_conf_t */ +nxt_queue_t creating_sockets; +nxt_queue_t pending_sockets; +nxt_queue_t updating_sockets; +nxt_queue_t keeping_sockets; +nxt_queue_t deleting_sockets; + + static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp) { @@ -461,6 +468,8 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) nxt_port_t *router_port; nxt_runtime_t *rt; + nxt_debug(task, "app '%V' start process", &app->name); + rt = task->thread->runtime; router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; @@ -485,99 +494,26 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) } -nxt_inline void -nxt_request_app_link_init(nxt_task_t *task, - nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data) -{ - nxt_buf_t *body; - nxt_event_engine_t *engine; - - engine = task->thread->engine; - - nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t)); - - req_app_link->stream = req_rpc_data->stream; - req_app_link->use_count = 1; - req_app_link->req_rpc_data = req_rpc_data; - req_rpc_data->req_app_link = req_app_link; - req_app_link->reply_port = engine->port; - req_app_link->request = req_rpc_data->request; - req_app_link->apr_action = NXT_APR_GOT_RESPONSE; - - req_app_link->work.handler = NULL; - req_app_link->work.task = &engine->task; - req_app_link->work.obj = req_app_link; - req_app_link->work.data = engine; - - body = req_rpc_data->request->body; - - if (body != NULL && nxt_buf_is_file(body)) { - req_app_link->body_fd = body->file->fd; - - body->file->fd = -1; - - } else { - req_app_link->body_fd = -1; - } -} - - -nxt_inline nxt_request_app_link_t * -nxt_request_app_link_alloc(nxt_task_t *task, - nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data) -{ - nxt_mp_t *mp; - nxt_request_app_link_t *req_app_link; - - if (ra_src != NULL && ra_src->mem_pool != NULL) { - return ra_src; - } - - mp = req_rpc_data->request->mem_pool; - - req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t)); - - if (nxt_slow_path(req_app_link == NULL)) { - - req_rpc_data->req_app_link = NULL; - - if (ra_src != NULL) { - ra_src->req_rpc_data = NULL; - } - - return NULL; - } - - nxt_mp_retain(mp); - - nxt_request_app_link_init(task, req_app_link, req_rpc_data); - - if (ra_src != NULL) { - req_app_link->body_fd = ra_src->body_fd; - } - - req_app_link->mem_pool = mp; - - return req_app_link; -} - - nxt_inline nxt_bool_t -nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, - uint32_t stream) +nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) { - nxt_buf_t *b, *next; - nxt_bool_t cancelled; + nxt_buf_t *b, *next; + nxt_bool_t cancelled; + nxt_msg_info_t *msg_info; + + msg_info = &req_rpc_data->msg_info; if (msg_info->buf == NULL) { return 0; } - cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking, - stream); + cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue, + msg_info->tracking_cookie, + req_rpc_data->stream); if (cancelled) { - nxt_debug(task, "stream #%uD: cancelled by router", stream); + nxt_debug(task, "stream #%uD: cancelled by router", + req_rpc_data->stream); } for (b = msg_info->buf; b != NULL; b = next) { @@ -598,320 +534,204 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, } -static void -nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj, - void *data) +nxt_inline nxt_bool_t +nxt_queue_chk_remove(nxt_queue_link_t *lnk) { - nxt_request_app_link_t *req_app_link; + if (lnk->next != NULL) { + nxt_queue_remove(lnk); - req_app_link = obj; + lnk->next = NULL; - nxt_request_app_link_update_peer(task, req_app_link); + return 1; + } - nxt_request_app_link_use(task, req_app_link, -1); + return 0; } -static void -nxt_request_app_link_update_peer(nxt_task_t *task, - nxt_request_app_link_t *req_app_link) +nxt_inline void +nxt_request_rpc_data_unlink(nxt_task_t *task, + nxt_request_rpc_data_t *req_rpc_data) { - nxt_event_engine_t *engine; - nxt_request_rpc_data_t *req_rpc_data; - - engine = req_app_link->work.data; - - if (task->thread->engine != engine) { - nxt_request_app_link_inc_use(req_app_link); - - req_app_link->work.handler = nxt_request_app_link_update_peer_handler; - req_app_link->work.task = &engine->task; - req_app_link->work.next = NULL; + nxt_app_t *app; + nxt_bool_t unlinked; + nxt_http_request_t *r; - nxt_debug(task, "req_app_link stream #%uD post update peer to %p", - req_app_link->stream, engine); + nxt_router_msg_cancel(task, req_rpc_data); - nxt_event_engine_post(engine, &req_app_link->work); + if (req_rpc_data->app_port != NULL) { + nxt_router_app_port_release(task, req_rpc_data->app_port, + req_rpc_data->apr_action); - return; + req_rpc_data->app_port = NULL; } - nxt_debug(task, "req_app_link stream #%uD update peer", - req_app_link->stream); - - req_rpc_data = req_app_link->req_rpc_data; - - if (req_rpc_data != NULL && req_app_link->app_port != NULL) { - nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, - req_app_link->app_port->pid); - } -} + app = req_rpc_data->app; + r = req_rpc_data->request; + if (r != NULL) { + r->timer_data = NULL; -static void -nxt_request_app_link_release(nxt_task_t *task, - nxt_request_app_link_t *req_app_link) -{ - nxt_mp_t *mp; - nxt_http_request_t *r; - nxt_request_rpc_data_t *req_rpc_data; + nxt_router_http_request_release_post(task, r); - nxt_assert(task->thread->engine == req_app_link->work.data); - nxt_assert(req_app_link->use_count == 0); + r->req_rpc_data = NULL; + req_rpc_data->request = NULL; - nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream); + if (app != NULL) { + unlinked = 0; - req_rpc_data = req_app_link->req_rpc_data; + nxt_thread_mutex_lock(&app->mutex); - if (req_rpc_data != NULL) { - if (nxt_slow_path(req_app_link->err_code != 0)) { - nxt_http_request_error(task, req_rpc_data->request, - req_app_link->err_code); + if (r->app_link.next != NULL) { + nxt_queue_remove(&r->app_link); + r->app_link.next = NULL; - } else { - req_rpc_data->app_port = req_app_link->app_port; - req_rpc_data->apr_action = req_app_link->apr_action; - req_rpc_data->msg_info = req_app_link->msg_info; + unlinked = 1; + } - if (req_rpc_data->app->timeout != 0) { - r = req_rpc_data->request; + nxt_thread_mutex_unlock(&app->mutex); - r->timer.handler = nxt_router_app_timeout; - r->timer_data = req_rpc_data; - nxt_timer_add(task->thread->engine, &r->timer, - req_rpc_data->app->timeout); + if (unlinked) { + nxt_mp_release(r->mem_pool); } - - req_app_link->app_port = NULL; - req_app_link->msg_info.buf = NULL; } - - req_rpc_data->req_app_link = NULL; - req_app_link->req_rpc_data = NULL; } - if (req_app_link->app_port != NULL) { - nxt_router_app_port_release(task, req_app_link->app_port, - req_app_link->apr_action); + if (app != NULL) { + nxt_router_app_use(task, app, -1); - req_app_link->app_port = NULL; + req_rpc_data->app = NULL; } - if (req_app_link->body_fd != -1) { - nxt_fd_close(req_app_link->body_fd); + if (req_rpc_data->msg_info.body_fd != -1) { + nxt_fd_close(req_rpc_data->msg_info.body_fd); - req_app_link->body_fd = -1; + req_rpc_data->msg_info.body_fd = -1; } - nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); + if (req_rpc_data->rpc_cancel) { + req_rpc_data->rpc_cancel = 0; - mp = req_app_link->mem_pool; - - if (mp != NULL) { - nxt_mp_free(mp, req_app_link); - nxt_mp_release(mp); + nxt_port_rpc_cancel(task, task->thread->engine->port, + req_rpc_data->stream); } } static void -nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data) +nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_request_app_link_t *req_app_link; - - req_app_link = obj; - - nxt_assert(req_app_link->work.data == data); - - nxt_atomic_fetch_add(&req_app_link->use_count, -1); - - nxt_request_app_link_release(task, req_app_link); -} - + nxt_int_t res; + nxt_app_t *app; + nxt_port_t *port, *main_app_port; + nxt_runtime_t *rt; -static void -nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link, - int i) -{ - int c; - nxt_event_engine_t *engine; + nxt_port_new_port_handler(task, msg); - c = nxt_atomic_fetch_add(&req_app_link->use_count, i); + port = msg->u.new_port; - if (i < 0 && c == -i) { - engine = req_app_link->work.data; + if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { + nxt_router_greet_controller(task, msg->u.new_port); + } - if (task->thread->engine == engine) { - nxt_request_app_link_release(task, req_app_link); + if (port == NULL || port->type != NXT_PROCESS_APP) { + if (msg->port_msg.stream == 0) { return; } - nxt_request_app_link_inc_use(req_app_link); - - req_app_link->work.handler = nxt_request_app_link_release_handler; - req_app_link->work.task = &engine->task; - req_app_link->work.next = NULL; + msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; - nxt_debug(task, "req_app_link stream #%uD post release to %p", - req_app_link->stream, engine); + } else { + if (msg->fd[1] != -1) { + res = nxt_router_port_queue_map(task, port, msg->fd[1]); + if (nxt_slow_path(res != NXT_OK)) { + return; + } - nxt_event_engine_post(engine, &req_app_link->work); + nxt_fd_close(msg->fd[1]); + msg->fd[1] = -1; + } } -} - - -nxt_inline void -nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link, const char *str) -{ - req_app_link->app_port = NULL; - req_app_link->err_code = 500; - req_app_link->err_str = str; - - nxt_alert(task, "app \"%V\" internal error: %s on #%uD", - &app->name, str, req_app_link->stream); -} - -nxt_inline void -nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link) -{ - nxt_queue_insert_tail(&req_app_link->app_port->pending_requests, - &req_app_link->link_port_pending); - nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending); + if (msg->port_msg.stream != 0) { + nxt_port_rpc_handler(task, msg); + return; + } - nxt_request_app_link_inc_use(req_app_link); + /* + * Port with "id == 0" is application 'main' port and it always + * should come with non-zero stream. + */ + nxt_assert(port->id != 0); - req_app_link->res_time = nxt_thread_monotonic_time(task->thread) - + app->res_timeout; + /* Find 'main' app port and get app reference. */ + rt = task->thread->runtime; - nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests", - req_app_link->stream); -} + /* + * It is safe to access 'runtime->ports' hash because 'NEW_PORT' + * sent to main port (with id == 0) and processed in main thread. + */ + main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); + nxt_assert(main_app_port != NULL); + app = main_app_port->app; + nxt_assert(app != NULL); -nxt_inline nxt_bool_t -nxt_queue_chk_remove(nxt_queue_link_t *lnk) -{ - if (lnk->next != NULL) { - nxt_queue_remove(lnk); + nxt_thread_mutex_lock(&app->mutex); - lnk->next = NULL; + /* TODO here should be find-and-add code because there can be + port waiters in port_hash */ + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; - return 1; - } + nxt_thread_mutex_unlock(&app->mutex); - return 0; + port->app = app; + port->main_app_port = main_app_port; } -nxt_inline void -nxt_request_rpc_data_unlink(nxt_task_t *task, - nxt_request_rpc_data_t *req_rpc_data) +static void +nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - int ra_use_delta; - nxt_request_app_link_t *req_app_link; - - if (req_rpc_data->app_port != NULL) { - nxt_router_app_port_release(task, req_rpc_data->app_port, - req_rpc_data->apr_action); - - req_rpc_data->app_port = NULL; - } - - nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); - - req_app_link = req_rpc_data->req_app_link; - if (req_app_link != NULL) { - req_rpc_data->req_app_link = NULL; - req_app_link->req_rpc_data = NULL; - - ra_use_delta = 0; - - nxt_thread_mutex_lock(&req_rpc_data->app->mutex); - - if (req_app_link->link_app_requests.next == NULL - && req_app_link->link_port_pending.next == NULL - && req_app_link->link_app_pending.next == NULL - && req_app_link->link_port_websockets.next == NULL) - { - req_app_link = NULL; - - } else { - ra_use_delta -= - nxt_queue_chk_remove(&req_app_link->link_app_requests) - + nxt_queue_chk_remove(&req_app_link->link_port_pending) - + nxt_queue_chk_remove(&req_app_link->link_port_websockets); - - nxt_queue_chk_remove(&req_app_link->link_app_pending); - } - - nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); - - if (req_app_link != NULL) { - nxt_request_app_link_use(task, req_app_link, ra_use_delta); - } - } - - if (req_rpc_data->app != NULL) { - nxt_router_app_use(task, req_rpc_data->app, -1); + void *p; + size_t size; + nxt_int_t ret; + nxt_router_temp_conf_t *tmcf; - req_rpc_data->app = NULL; + tmcf = nxt_router_temp_conf(task); + if (nxt_slow_path(tmcf == NULL)) { + return; } - if (req_rpc_data->request != NULL) { - req_rpc_data->request->timer_data = NULL; - - nxt_router_http_request_done(task, req_rpc_data->request); - - req_rpc_data->request->req_rpc_data = NULL; - req_rpc_data->request = NULL; + if (nxt_slow_path(msg->fd[0] == -1)) { + nxt_alert(task, "conf_data_handler: invalid file shm fd"); + return; } -} + if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) { + nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", + (int) nxt_buf_mem_used_size(&msg->buf->mem)); -void -nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) -{ - nxt_port_new_port_handler(task, msg); - - if (msg->u.new_port != NULL - && msg->u.new_port->type == NXT_PROCESS_CONTROLLER) - { - nxt_router_greet_controller(task, msg->u.new_port); - } + nxt_fd_close(msg->fd[0]); + msg->fd[0] = -1; - if (msg->port_msg.stream == 0) { return; } - if (msg->u.new_port == NULL - || msg->u.new_port->type != NXT_PROCESS_APP) - { - msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; - } - - nxt_port_rpc_handler(task, msg); -} + nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); + p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0); -void -nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) -{ - nxt_int_t ret; - nxt_buf_t *b; - nxt_router_temp_conf_t *tmcf; + nxt_fd_close(msg->fd[0]); + msg->fd[0] = -1; - tmcf = nxt_router_temp_conf(task); - if (nxt_slow_path(tmcf == NULL)) { + if (nxt_slow_path(p == MAP_FAILED)) { return; } - nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s", - nxt_buf_used_size(msg->buf), - (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos); + nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p); tmcf->router_conf->router = nxt_router; tmcf->stream = msg->port_msg.stream; @@ -922,20 +742,12 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) if (nxt_slow_path(tmcf->port == NULL)) { nxt_alert(task, "reply port not found"); - return; + goto fail; } nxt_port_use(task, tmcf->port, 1); - b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool, - msg->buf, msg->size); - if (nxt_slow_path(b == NULL)) { - nxt_router_conf_error(task, tmcf); - - return; - } - - ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free); + ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size)); if (nxt_fast_path(ret == NXT_OK)) { nxt_router_conf_apply(task, tmcf, NULL); @@ -943,6 +755,10 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } else { nxt_router_conf_error(task, tmcf); } + +fail: + + nxt_mem_munmap(p, size); } @@ -961,7 +777,7 @@ nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, } -void +static void nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_event_engine_t *engine; @@ -1027,11 +843,11 @@ nxt_router_temp_conf(nxt_task_t *task) goto temp_fail; } - nxt_queue_init(&tmcf->deleting); - nxt_queue_init(&tmcf->keeping); - nxt_queue_init(&tmcf->updating); - nxt_queue_init(&tmcf->pending); - nxt_queue_init(&tmcf->creating); + nxt_queue_init(&creating_sockets); + nxt_queue_init(&pending_sockets); + nxt_queue_init(&updating_sockets); + nxt_queue_init(&keeping_sockets); + nxt_queue_init(&deleting_sockets); #if (NXT_TLS) nxt_queue_init(&tmcf->tls); @@ -1065,8 +881,10 @@ nxt_router_app_can_start(nxt_app_t *app) nxt_inline nxt_bool_t nxt_router_app_need_start(nxt_app_t *app) { - return app->idle_processes + app->pending_processes - < app->spare_processes; + return (app->active_requests + > app->port_hash_count + app->pending_processes) + || (app->spare_processes + > app->idle_processes + app->pending_processes); } @@ -1088,11 +906,11 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) tmcf = obj; - qlk = nxt_queue_first(&tmcf->pending); + qlk = nxt_queue_first(&pending_sockets); - if (qlk != nxt_queue_tail(&tmcf->pending)) { + if (qlk != nxt_queue_tail(&pending_sockets)) { nxt_queue_remove(qlk); - nxt_queue_insert_tail(&tmcf->creating, qlk); + nxt_queue_insert_tail(&creating_sockets, qlk); skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); @@ -1148,10 +966,12 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) nxt_router_apps_sort(task, router, tmcf); + nxt_router_apps_hash_use(task, rtcf, 1); + nxt_router_engines_post(router, tmcf); - nxt_queue_add(&router->sockets, &tmcf->updating); - nxt_queue_add(&router->sockets, &tmcf->creating); + nxt_queue_add(&router->sockets, &updating_sockets); + nxt_queue_add(&router->sockets, &creating_sockets); router->access_log = rtcf->access_log; @@ -1181,11 +1001,39 @@ nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data) static void nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) { - nxt_debug(task, "temp conf count:%D", tmcf->count); + uint32_t count; + nxt_router_conf_t *rtcf; + nxt_thread_spinlock_t *lock; + + nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count); - if (--tmcf->count == 0) { - nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); + if (--tmcf->count > 0) { + return; } + + nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); + + rtcf = tmcf->router_conf; + + lock = &rtcf->router->lock; + + nxt_thread_spin_lock(lock); + + count = rtcf->count; + + nxt_thread_spin_unlock(lock); + + nxt_debug(task, "rtcf %p: %D", rtcf, count); + + if (count == 0) { + nxt_router_apps_hash_use(task, rtcf, -1); + + nxt_router_access_log_release(task, lock, rtcf->access_log); + + nxt_mp_destroy(rtcf->mem_pool); + } + + nxt_mp_destroy(tmcf->mem_pool); } @@ -1202,8 +1050,8 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) nxt_alert(task, "failed to apply new conf"); - for (qlk = nxt_queue_first(&tmcf->creating); - qlk != nxt_queue_tail(&tmcf->creating); + for (qlk = nxt_queue_first(&creating_sockets); + qlk != nxt_queue_tail(&creating_sockets); qlk = nxt_queue_next(qlk)) { skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); @@ -1217,22 +1065,12 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) } nxt_queue_init(&new_socket_confs); - nxt_queue_add(&new_socket_confs, &tmcf->updating); - nxt_queue_add(&new_socket_confs, &tmcf->pending); - nxt_queue_add(&new_socket_confs, &tmcf->creating); + nxt_queue_add(&new_socket_confs, &updating_sockets); + nxt_queue_add(&new_socket_confs, &pending_sockets); + nxt_queue_add(&new_socket_confs, &creating_sockets); rtcf = tmcf->router_conf; - nxt_http_routes_cleanup(task, rtcf->routes); - - nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) { - - if (skcf->action != NULL) { - nxt_http_action_cleanup(task, skcf->action); - } - - } nxt_queue_loop; - nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { nxt_router_app_unlink(task, app); @@ -1241,8 +1079,8 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) router = rtcf->router; - nxt_queue_add(&router->sockets, &tmcf->keeping); - nxt_queue_add(&router->sockets, &tmcf->deleting); + nxt_queue_add(&router->sockets, &keeping_sockets); + nxt_queue_add(&router->sockets, &deleting_sockets); nxt_queue_add(&router->apps, &tmcf->previous); @@ -1253,6 +1091,8 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) nxt_mp_destroy(rtcf->mem_pool); nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); + + nxt_mp_destroy(tmcf->mem_pool); } @@ -1465,6 +1305,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_app_t *app, *prev; nxt_str_t *t, *s, *targets; nxt_uint_t n, i; + nxt_port_t *port; nxt_router_t *router; nxt_app_joint_t *app_joint; nxt_conf_value_t *conf, *http, *value, *websocket; @@ -1569,6 +1410,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_queue_remove(&prev->link); nxt_queue_insert_tail(&tmcf->previous, &prev->link); + + ret = nxt_router_apps_hash_add(tmcf->router_conf, prev); + if (nxt_slow_path(ret != NXT_OK)) { + goto fail; + } + continue; } @@ -1679,8 +1526,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_queue_init(&app->ports); nxt_queue_init(&app->spare_ports); nxt_queue_init(&app->idle_ports); - nxt_queue_init(&app->requests); - nxt_queue_init(&app->pending); + nxt_queue_init(&app->ack_waiting_req); app->name.length = name.length; nxt_memcpy(app->name.start, name.start, name.length); @@ -1693,7 +1539,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->timeout = apcf.timeout; app->res_timeout = apcf.res_timeout * 1000000; app->idle_timeout = apcf.idle_timeout; - app->max_pending_responses = 2; app->max_requests = apcf.requests; app->targets = targets; @@ -1708,6 +1553,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_queue_insert_tail(&tmcf->apps, &app->link); + ret = nxt_router_apps_hash_add(tmcf->router_conf, app); + if (nxt_slow_path(ret != NXT_OK)) { + goto app_fail; + } + nxt_router_app_use(task, app, 1); app->joint = app_joint; @@ -1724,6 +1574,31 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app_joint->free_app_work.handler = nxt_router_free_app; app_joint->free_app_work.task = &engine->task; app_joint->free_app_work.obj = app_joint; + + port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid, + NXT_PROCESS_APP); + if (nxt_slow_path(port == NULL)) { + return NXT_ERROR; + } + + ret = nxt_port_socket_init(task, port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); + return NXT_ERROR; + } + + ret = nxt_router_app_queue_init(task, port); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); + return NXT_ERROR; + } + + nxt_port_write_enable(task, port); + port->app = app; + + app->shared_port = port; + + nxt_thread_mutex_create(&app->outgoing.mutex); } } @@ -1857,7 +1732,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, /* COMPATIBILITY: listener application. */ } else if (lscf.application.length > 0) { - skcf->action = nxt_http_pass_application(task, tmcf, + skcf->action = nxt_http_pass_application(task, + tmcf->router_conf, &lscf.application); } } @@ -1902,7 +1778,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, tmcf->router_conf->access_log = access_log; } - nxt_queue_add(&tmcf->deleting, &router->sockets); + nxt_queue_add(&deleting_sockets, &router->sockets); nxt_queue_init(&router->sockets); return NXT_OK; @@ -2023,20 +1899,182 @@ nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) } -void -nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name, +static nxt_int_t +nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port) +{ + void *mem; + nxt_int_t fd; + + fd = nxt_shm_open(task, sizeof(nxt_app_queue_t)); + if (nxt_slow_path(fd == -1)) { + return NXT_ERROR; + } + + mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_fd_close(fd); + + return NXT_ERROR; + } + + nxt_app_queue_init(mem); + + port->queue_fd = fd; + port->queue = mem; + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port) +{ + void *mem; + nxt_int_t fd; + + fd = nxt_shm_open(task, sizeof(nxt_port_queue_t)); + if (nxt_slow_path(fd == -1)) { + return NXT_ERROR; + } + + mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_fd_close(fd); + + return NXT_ERROR; + } + + nxt_port_queue_init(mem); + + port->queue_fd = fd; + port->queue = mem; + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd) +{ + void *mem; + + nxt_assert(fd != -1); + + mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + if (nxt_slow_path(mem == MAP_FAILED)) { + + return NXT_ERROR; + } + + port->queue = mem; + + return NXT_OK; +} + + +static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + nxt_router_apps_hash_test, + nxt_mp_lvlhsh_alloc, + nxt_mp_lvlhsh_free, +}; + + +static nxt_int_t +nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + nxt_app_t *app; + + app = data; + + return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED; +} + + +static nxt_int_t +nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app) +{ + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length); + lhq.replace = 0; + lhq.key = app->name; + lhq.value = app; + lhq.proto = &nxt_router_apps_hash_proto; + lhq.pool = rtcf->mem_pool; + + switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) { + + case NXT_OK: + return NXT_OK; + + case NXT_DECLINED: + nxt_thread_log_alert("router app hash adding failed: " + "\"%V\" is already in hash", &lhq.key); + /* Fall through. */ + default: + return NXT_ERROR; + } +} + + +static nxt_app_t * +nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name) +{ + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_djb_hash(name->start, name->length); + lhq.key = *name; + lhq.proto = &nxt_router_apps_hash_proto; + + if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) { + return NULL; + } + + return lhq.value; +} + + +static void +nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) +{ + nxt_app_t *app; + nxt_lvlhsh_each_t lhe; + + nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto); + + for ( ;; ) { + app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe); + + if (app == NULL) { + break; + } + + nxt_router_app_use(task, app, i); + } +} + + + +nxt_int_t +nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name, nxt_http_action_t *action) { nxt_app_t *app; - app = nxt_router_app_find(&tmcf->apps, name); + app = nxt_router_apps_hash_get(rtcf, name); if (app == NULL) { - app = nxt_router_app_find(&tmcf->previous, name); + return NXT_DECLINED; } action->u.application = app; action->handler = nxt_http_application_handler; + + return NXT_OK; } @@ -2141,15 +2179,15 @@ nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, nskcf->listen = skcf->listen; nxt_queue_remove(qlk); - nxt_queue_insert_tail(&tmcf->keeping, qlk); + nxt_queue_insert_tail(&keeping_sockets, qlk); - nxt_queue_insert_tail(&tmcf->updating, &nskcf->link); + nxt_queue_insert_tail(&updating_sockets, &nskcf->link); return NXT_OK; } } - nxt_queue_insert_tail(&tmcf->pending, &nskcf->link); + nxt_queue_insert_tail(&pending_sockets, &nskcf->link); return NXT_DECLINED; } @@ -2182,6 +2220,8 @@ nxt_router_listen_socket_rpc_create(nxt_task_t *task, goto fail; } + b->completion_handler = nxt_router_dummy_buf_completion; + b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); rt = task->thread->runtime; @@ -2222,7 +2262,7 @@ nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, rpc = data; - s = msg->fd; + s = msg->fd[0]; ret = nxt_socket_nonblocking(task, s); if (nxt_slow_path(ret != NXT_OK)) { @@ -2360,7 +2400,7 @@ nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, goto fail; } - tlscf->chain_file = msg->fd; + tlscf->chain_file = msg->fd[0]; ret = task->thread->runtime->tls->server_init(task, tlscf); if (nxt_slow_path(ret != NXT_OK)) { @@ -2410,6 +2450,8 @@ nxt_router_app_rpc_create(nxt_task_t *task, goto fail; } + b->completion_handler = nxt_router_dummy_buf_completion; + nxt_buf_cpystr(b, &app->name); *b->mem.free++ = '\0'; nxt_buf_cpystr(b, &app->conf); @@ -2457,7 +2499,13 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, app = rpc->app; port = msg->u.new_port; + + nxt_assert(port != NULL); + nxt_assert(port->type == NXT_PROCESS_APP); + nxt_assert(port->id == 0); + port->app = app; + port->main_app_port = port; app->pending_processes--; app->processes++; @@ -2468,10 +2516,18 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_queue_insert_tail(&app->ports, &port->app_link); nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); + nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports", + &app->name, port->pid, port->id); + + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; + port->idle_start = 0; nxt_port_inc_use(port); + nxt_router_app_shared_port_send(task, port); + nxt_work_queue_add(&engine->fast_work_queue, nxt_router_conf_apply, task, rpc->temp_conf, NULL); } @@ -2577,13 +2633,13 @@ nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, { nxt_int_t ret; - ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, + ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, + ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; @@ -2599,19 +2655,19 @@ nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, { nxt_int_t ret; - ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating, + ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets, nxt_router_listen_socket_create); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating, + ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets, nxt_router_listen_socket_update); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); + ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); if (nxt_slow_path(ret != NXT_OK)) { return ret; } @@ -2631,12 +2687,12 @@ nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, return ret; } - ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); + ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets); if (nxt_slow_path(ret != NXT_OK)) { return ret; } - return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); + return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets); } @@ -2874,10 +2930,11 @@ nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) static nxt_port_handlers_t nxt_router_app_port_handlers = { - .rpc_error = nxt_port_rpc_handler, - .mmap = nxt_port_mmap_handler, - .data = nxt_port_rpc_handler, - .oosm = nxt_router_oosm_handler, + .rpc_error = nxt_port_rpc_handler, + .mmap = nxt_port_mmap_handler, + .data = nxt_port_rpc_handler, + .oosm = nxt_router_oosm_handler, + .req_headers_ack = nxt_port_rpc_handler, }; @@ -2887,6 +2944,7 @@ nxt_router_thread_start(void *data) nxt_int_t ret; nxt_port_t *port; nxt_task_t *task; + nxt_work_t *work; nxt_thread_t *thread; nxt_thread_link_t *link; nxt_event_engine_t *engine; @@ -2927,15 +2985,53 @@ nxt_router_thread_start(void *data) return; } + ret = nxt_router_port_queue_init(task, port); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); + return; + } + engine->port = port; nxt_port_enable(task, port, &nxt_router_app_port_handlers); + work = nxt_zalloc(sizeof(nxt_work_t)); + if (nxt_slow_path(work == NULL)) { + return; + } + + work->handler = nxt_router_rt_add_port; + work->task = link->work.task; + work->obj = work; + work->data = port; + + nxt_event_engine_post(link->work.task->thread->engine, work); + nxt_event_engine_start(engine); } static void +nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) +{ + nxt_int_t res; + nxt_port_t *port; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + port = data; + + nxt_free(obj); + + res = nxt_port_hash_add(&rt->ports, port); + + if (nxt_fast_path(res == NXT_OK)) { + nxt_port_use(task, port, 1); + } +} + + +static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) { nxt_joint_job_t *job; @@ -3146,7 +3242,15 @@ nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, nxt_debug(task, "listen event count: %D", lev->count); + engine = task->thread->engine; + if (--lev->count == 0) { + if (lev->next != NULL) { + nxt_sockaddr_cache_free(engine, lev->next); + + nxt_conn_free(task, lev->next); + } + nxt_free(lev); } @@ -3154,8 +3258,6 @@ nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev, nxt_router_conf_release(task, joint); } - engine = task->thread->engine; - if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) { nxt_thread_exit(task->thread); } @@ -3205,25 +3307,18 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) nxt_thread_spin_unlock(lock); - if (skcf != NULL) { - if (skcf->action != NULL) { - nxt_http_action_cleanup(task, skcf->action); - } - #if (NXT_TLS) - if (skcf->tls != NULL) { - task->thread->runtime->tls->server_free(task, skcf->tls); - } -#endif + if (skcf != NULL && skcf->tls != NULL) { + task->thread->runtime->tls->server_free(task, skcf->tls); } +#endif /* TODO remove engine->port */ - /* TODO excude from connected ports */ if (rtcf != NULL) { nxt_debug(task, "old router conf is destroyed"); - nxt_http_routes_cleanup(task, rtcf->routes); + nxt_router_apps_hash_use(task, rtcf, -1); nxt_router_access_log_release(task, lock, rtcf->access_log); @@ -3380,6 +3475,8 @@ nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) goto fail; } + b->completion_handler = nxt_router_dummy_buf_completion; + nxt_buf_cpystr(b, &access_log->path); *b->mem.free++ = '\0'; @@ -3422,7 +3519,7 @@ nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, access_log = tmcf->router_conf->access_log; - access_log->fd = msg->fd; + access_log->fd = msg->fd[0]; nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_conf_apply, task, tmcf, NULL); @@ -3474,7 +3571,7 @@ typedef struct { } nxt_router_access_log_reopen_t; -void +static void nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_mp_t *mp; @@ -3571,13 +3668,13 @@ nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, if (access_log == nxt_router->access_log) { - if (nxt_slow_path(dup2(msg->fd, access_log->fd) == -1)) { + if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) { nxt_alert(task, "dup2(%FD, %FD) failed %E", - msg->fd, access_log->fd, nxt_errno); + msg->fd[0], access_log->fd, nxt_errno); } } - nxt_fd_close(msg->fd); + nxt_fd_close(msg->fd[0]); nxt_mp_release(reopen->mem_pool); } @@ -3633,22 +3730,17 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { nxt_int_t ret; + nxt_app_t *app; nxt_buf_t *b, *next; nxt_port_t *app_port; nxt_unit_field_t *f; nxt_http_field_t *field; nxt_http_request_t *r; nxt_unit_response_t *resp; - nxt_request_app_link_t *req_app_link; nxt_request_rpc_data_t *req_rpc_data; - b = msg->buf; req_rpc_data = data; - if (msg->size == 0) { - b = NULL; - } - r = req_rpc_data->request; if (nxt_slow_path(r == NULL)) { return; @@ -3659,19 +3751,32 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, return; } + app = req_rpc_data->app; + nxt_assert(app != NULL); + + if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) { + nxt_router_req_headers_ack_handler(task, msg, req_rpc_data); + + return; + } + + b = (msg->size == 0) ? NULL : msg->buf; + if (msg->port_msg.last != 0) { nxt_debug(task, "router data create last buf"); nxt_buf_chain_add(&b, nxt_http_buf_last(r)); + req_rpc_data->rpc_cancel = 0; + req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; + nxt_request_rpc_data_unlink(task, req_rpc_data); } else { - if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) { + if (app->timeout != 0) { r->timer.handler = nxt_router_app_timeout; r->timer_data = req_rpc_data; - nxt_timer_add(task->thread->engine, &r->timer, - req_rpc_data->app->timeout); + nxt_timer_add(task->thread->engine, &r->timer, app->timeout); } } @@ -3767,39 +3872,21 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, if (r->websocket_handshake && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) { - req_app_link = nxt_request_app_link_alloc(task, - req_rpc_data->req_app_link, - req_rpc_data); - if (nxt_slow_path(req_app_link == NULL)) { - goto fail; - } - - app_port = req_app_link->app_port; - - if (app_port == NULL && req_rpc_data->app_port != NULL) { - req_app_link->app_port = req_rpc_data->app_port; - app_port = req_app_link->app_port; - req_app_link->apr_action = req_rpc_data->apr_action; - - req_rpc_data->app_port = NULL; - } - + app_port = req_rpc_data->app_port; if (nxt_slow_path(app_port == NULL)) { goto fail; } - nxt_thread_mutex_lock(&req_rpc_data->app->mutex); + nxt_thread_mutex_lock(&app->mutex); - nxt_queue_insert_tail(&app_port->active_websockets, - &req_app_link->link_port_websockets); + app_port->main_app_port->active_websockets++; - nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); + nxt_thread_mutex_unlock(&app->mutex); nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE); - req_app_link->apr_action = NXT_APR_CLOSE; + req_rpc_data->apr_action = NXT_APR_CLOSE; - nxt_debug(task, "req_app_link stream #%uD upgrade", - req_app_link->stream); + nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream); r->state = &nxt_http_websocket; @@ -3818,6 +3905,133 @@ fail: } +static void +nxt_router_req_headers_ack_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) +{ + int res; + nxt_app_t *app; + nxt_bool_t start_process, unlinked; + nxt_port_t *app_port, *main_app_port, *idle_port; + nxt_queue_link_t *idle_lnk; + nxt_http_request_t *r; + + nxt_debug(task, "stream #%uD: got ack from %PI:%d", + req_rpc_data->stream, + msg->port_msg.pid, msg->port_msg.reply_port); + + nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, + msg->port_msg.pid); + + app = req_rpc_data->app; + r = req_rpc_data->request; + + start_process = 0; + unlinked = 0; + + nxt_thread_mutex_lock(&app->mutex); + + if (r->app_link.next != NULL) { + nxt_queue_remove(&r->app_link); + r->app_link.next = NULL; + + unlinked = 1; + } + + app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(app_port == NULL)) { + nxt_thread_mutex_unlock(&app->mutex); + + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + + if (unlinked) { + nxt_mp_release(r->mem_pool); + } + + return; + } + + main_app_port = app_port->main_app_port; + + if (nxt_queue_chk_remove(&main_app_port->idle_link)) { + app->idle_processes--; + + nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)", + &app->name, main_app_port->pid, main_app_port->id, + (main_app_port->idle_start ? "idle_ports" : "spare_ports")); + + /* Check port was in 'spare_ports' using idle_start field. */ + if (main_app_port->idle_start == 0 + && app->idle_processes >= app->spare_processes) + { + /* + * If there is a vacant space in spare ports, + * move the last idle to spare_ports. + */ + nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); + + idle_lnk = nxt_queue_last(&app->idle_ports); + idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); + nxt_queue_remove(idle_lnk); + + nxt_queue_insert_tail(&app->spare_ports, idle_lnk); + + idle_port->idle_start = 0; + + nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " + "to spare_ports", + &app->name, idle_port->pid, idle_port->id); + } + + if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { + app->pending_processes++; + start_process = 1; + } + } + + main_app_port->active_requests++; + + nxt_port_inc_use(app_port); + + nxt_thread_mutex_unlock(&app->mutex); + + if (unlinked) { + nxt_mp_release(r->mem_pool); + } + + if (start_process) { + nxt_router_start_app_process(task, app); + } + + nxt_port_use(task, req_rpc_data->app_port, -1); + + req_rpc_data->app_port = app_port; + + if (req_rpc_data->msg_info.body_fd != -1) { + nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, + req_rpc_data->msg_info.body_fd); + + lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); + + res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, + req_rpc_data->msg_info.body_fd, + req_rpc_data->stream, + task->thread->engine->port->id, NULL); + + if (nxt_slow_path(res != NXT_OK)) { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + } + } + + if (app->timeout != 0) { + r->timer.handler = nxt_router_app_timeout; + r->timer_data = req_rpc_data; + nxt_timer_add(task->thread->engine, &r->timer, app->timeout); + } +} + + static const nxt_http_request_state_t nxt_http_request_send_state nxt_aligned(64) = { @@ -3846,42 +4060,14 @@ static void nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_int_t res; - nxt_port_t *port; - nxt_bool_t cancelled; - nxt_request_app_link_t *req_app_link; nxt_request_rpc_data_t *req_rpc_data; req_rpc_data = data; - req_app_link = req_rpc_data->req_app_link; - - if (req_app_link != NULL) { - cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, - req_app_link->stream); - if (cancelled) { - res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); + req_rpc_data->rpc_cancel = 0; - if (res == NXT_OK) { - port = req_app_link->app_port; - - if (nxt_slow_path(port == NULL)) { - nxt_log(task, NXT_LOG_ERR, - "port is NULL in cancelled req_app_link"); - return; - } - - nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, - req_rpc_data, port->pid); - - nxt_router_app_prepare_request(task, req_app_link); - } - - msg->port_msg.last = 0; - - return; - } - } + /* TODO cancel message and return if cancelled. */ + // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); if (req_rpc_data->request != NULL) { nxt_http_request_error(task, req_rpc_data->request, @@ -3905,6 +4091,8 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_assert(app_joint != NULL); nxt_assert(port != NULL); + nxt_assert(port->type == NXT_PROCESS_APP); + nxt_assert(port->id == 0); app = app_joint->app; @@ -3919,6 +4107,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, } port->app = app; + port->main_app_port = port; nxt_thread_mutex_lock(&app->mutex); @@ -3926,24 +4115,62 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, app->pending_processes--; app->processes++; + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; nxt_thread_mutex_unlock(&app->mutex); nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", &app->name, port->pid, app->processes, app->pending_processes); + nxt_router_app_shared_port_send(task, port); + nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); } +static nxt_int_t +nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) +{ + nxt_buf_t *b; + nxt_port_t *port; + nxt_port_msg_new_port_t *msg; + + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(nxt_port_data_t)); + if (nxt_slow_path(b == NULL)) { + return NXT_ERROR; + } + + port = app_port->app->shared_port; + + nxt_debug(task, "send port %FD to process %PI", + port->pair[0], app_port->pid); + + b->mem.free += sizeof(nxt_port_msg_new_port_t); + msg = (nxt_port_msg_new_port_t *) b->mem.pos; + + msg->id = port->id; + msg->pid = port->pid; + msg->max_size = port->max_size; + msg->max_share = port->max_share; + msg->type = port->type; + + return nxt_port_socket_write2(task, app_port, + NXT_PORT_MSG_NEW_PORT, + port->pair[0], port->queue_fd, + 0, 0, b); +} + + static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_app_joint_t *app_joint; - nxt_queue_link_t *lnk; - nxt_request_app_link_t *req_app_link; + nxt_app_t *app; + nxt_app_joint_t *app_joint; + nxt_queue_link_t *link; + nxt_http_request_t *r; app_joint = data; @@ -3961,117 +4188,49 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' %p start error", &app->name, app); + link = NULL; + nxt_thread_mutex_lock(&app->mutex); nxt_assert(app->pending_processes != 0); app->pending_processes--; - if (!nxt_queue_is_empty(&app->requests)) { - lnk = nxt_queue_last(&app->requests); - nxt_queue_remove(lnk); - lnk->next = NULL; + if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) { + link = nxt_queue_first(&app->ack_waiting_req); - req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_app_requests); - - } else { - req_app_link = NULL; + nxt_queue_remove(link); + link->next = NULL; } nxt_thread_mutex_unlock(&app->mutex); - if (req_app_link != NULL) { - nxt_debug(task, "app '%V' %p abort next stream #%uD", - &app->name, app, req_app_link->stream); - - nxt_request_app_link_error(task, app, req_app_link, - "Failed to start application process"); - nxt_request_app_link_use(task, req_app_link, -1); - } -} - -nxt_inline nxt_port_t * -nxt_router_app_get_port_for_quit(nxt_app_t *app); - -void -nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) -{ - int c; - - c = nxt_atomic_fetch_add(&app->use_count, i); - - if (i < 0 && c == -i) { - - if (task->thread->engine != app->engine) { - nxt_event_engine_post(app->engine, &app->joint->free_app_work); - - } else { - nxt_router_free_app(task, app->joint, NULL); - } - } -} - - -nxt_inline nxt_bool_t -nxt_router_app_first_port_busy(nxt_app_t *app) -{ - nxt_port_t *port; - nxt_queue_link_t *lnk; - - lnk = nxt_queue_first(&app->ports); - port = nxt_queue_link_data(lnk, nxt_port_t, app_link); - - return port->app_pending_responses > 0; -} - - -nxt_inline nxt_port_t * -nxt_router_pop_first_port(nxt_app_t *app) -{ - nxt_port_t *port; - nxt_queue_link_t *lnk; - - lnk = nxt_queue_first(&app->ports); - nxt_queue_remove(lnk); + while (link != NULL) { + r = nxt_container_of(link, nxt_http_request_t, app_link); - port = nxt_queue_link_data(lnk, nxt_port_t, app_link); + nxt_event_engine_post(r->engine, &r->err_work); - port->app_pending_responses++; + link = NULL; - if (nxt_queue_chk_remove(&port->idle_link)) { - app->idle_processes--; - - if (port->idle_start == 0) { - nxt_assert(app->idle_processes < app->spare_processes); + nxt_thread_mutex_lock(&app->mutex); - } else { - nxt_assert(app->idle_processes >= app->spare_processes); + if (app->processes == 0 && app->pending_processes == 0 + && !nxt_queue_is_empty(&app->ack_waiting_req)) + { + link = nxt_queue_first(&app->ack_waiting_req); - port->idle_start = 0; + nxt_queue_remove(link); + link->next = NULL; } - } - if ((app->max_pending_responses == 0 - || port->app_pending_responses < app->max_pending_responses) - && (app->max_requests == 0 - || port->app_responses + port->app_pending_responses - < app->max_requests)) - { - nxt_queue_insert_tail(&app->ports, lnk); - - nxt_port_inc_use(port); - - } else { - lnk->next = NULL; + nxt_thread_mutex_unlock(&app->mutex); } - - return port; } + nxt_inline nxt_port_t * -nxt_router_app_get_port_for_quit(nxt_app_t *app) +nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app) { nxt_port_t *port; @@ -4081,19 +4240,20 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app) nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { - if (port->app_pending_responses > 0) { - port = NULL; - - continue; - } - /* Caller is responsible to decrease port use count. */ nxt_queue_chk_remove(&port->app_link); if (nxt_queue_chk_remove(&port->idle_link)) { app->idle_processes--; + + nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit", + &app->name, port->pid, port->id, + (port->idle_start ? "idle_ports" : "spare_ports")); } + nxt_port_hash_remove(&app->port_hash, port); + app->port_hash_count--; + port->app = NULL; app->processes--; @@ -4108,41 +4268,32 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app) static void -nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) +nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) { - nxt_debug(task, "app '%V' %p unlink", &app->name, app); + int c; - nxt_queue_remove(&app->link); + c = nxt_atomic_fetch_add(&app->use_count, i); - nxt_router_app_use(task, app, -1); + if (i < 0 && c == -i) { + + if (task->thread->engine != app->engine) { + nxt_event_engine_post(app->engine, &app->joint->free_app_work); + + } else { + nxt_router_free_app(task, app->joint, NULL); + } + } } static void -nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) +nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) { - nxt_request_app_link_t *req_app_link; - - req_app_link = data; - -#if (NXT_DEBUG) - { - nxt_app_t *app; - - app = obj; - - nxt_assert(app != NULL); - nxt_assert(req_app_link != NULL); - nxt_assert(req_app_link->app_port != NULL); - - nxt_debug(task, "app '%V' %p process next stream #%uD", - &app->name, app, req_app_link->stream); - } -#endif + nxt_debug(task, "app '%V' %p unlink", &app->name, app); - nxt_router_app_prepare_request(task, req_app_link); + nxt_queue_remove(&app->link); - nxt_request_app_link_use(task, req_app_link, -1); + nxt_router_app_use(task, app, -1); } @@ -4150,40 +4301,33 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_apr_action_t action) { - int inc_use; - uint32_t dec_pending, got_response; - nxt_app_t *app; - nxt_bool_t port_unchained; - nxt_bool_t send_quit, cancelled, adjust_idle_timer; - nxt_queue_link_t *lnk; - nxt_request_app_link_t *req_app_link, *pending_ra, *re_ra; - nxt_port_select_state_t state; + int inc_use; + uint32_t got_response, dec_requests; + nxt_app_t *app; + nxt_bool_t port_unchained, send_quit, adjust_idle_timer; + nxt_port_t *main_app_port; nxt_assert(port != NULL); nxt_assert(port->app != NULL); - req_app_link = NULL; - app = port->app; inc_use = 0; - dec_pending = 0; got_response = 0; + dec_requests = 0; switch (action) { case NXT_APR_NEW_PORT: break; case NXT_APR_REQUEST_FAILED: - dec_pending = 1; + dec_requests = 1; inc_use = -1; break; case NXT_APR_GOT_RESPONSE: - dec_pending = 1; got_response = 1; inc_use = -1; break; case NXT_APR_UPGRADE: - dec_pending = 1; got_response = 1; break; case NXT_APR_CLOSE: @@ -4191,120 +4335,49 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, break; } - nxt_thread_mutex_lock(&app->mutex); - - port->app_pending_responses -= dec_pending; - port->app_responses += got_response; + nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name, + port->pid, port->id, + (int) inc_use, (int) got_response); - if (port->pair[1] != -1 - && (app->max_pending_responses == 0 - || port->app_pending_responses < app->max_pending_responses) - && (app->max_requests == 0 - || port->app_responses + port->app_pending_responses - < app->max_requests)) - { - if (port->app_link.next == NULL) { - if (port->app_pending_responses > 0) { - nxt_queue_insert_tail(&app->ports, &port->app_link); + if (port == app->shared_port) { + nxt_thread_mutex_lock(&app->mutex); - } else { - nxt_queue_insert_head(&app->ports, &port->app_link); - } + app->active_requests -= got_response + dec_requests; - nxt_port_inc_use(port); + nxt_thread_mutex_unlock(&app->mutex); - } else { - if (port->app_pending_responses == 0 - && nxt_queue_first(&app->ports) != &port->app_link) - { - nxt_queue_remove(&port->app_link); - nxt_queue_insert_head(&app->ports, &port->app_link); - } - } + goto adjust_use; } - if (!nxt_queue_is_empty(&app->ports) - && !nxt_queue_is_empty(&app->requests)) - { - lnk = nxt_queue_first(&app->requests); - nxt_queue_remove(lnk); - lnk->next = NULL; + main_app_port = port->main_app_port; - req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_app_requests); + nxt_thread_mutex_lock(&app->mutex); - req_app_link->app_port = nxt_router_pop_first_port(app); + main_app_port->app_responses += got_response; + main_app_port->active_requests -= got_response + dec_requests; + app->active_requests -= got_response + dec_requests; - if (req_app_link->app_port->app_pending_responses > 1) { - nxt_request_app_link_pending(task, app, req_app_link); - } - } - - /* Pop first pending request for this port. */ - if (dec_pending > 0 - && !nxt_queue_is_empty(&port->pending_requests)) + if (main_app_port->pair[1] != -1 + && (app->max_requests == 0 + || main_app_port->app_responses < app->max_requests)) { - lnk = nxt_queue_first(&port->pending_requests); - nxt_queue_remove(lnk); - lnk->next = NULL; - - pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_port_pending); - - nxt_assert(pending_ra->link_app_pending.next != NULL); - - nxt_queue_remove(&pending_ra->link_app_pending); - pending_ra->link_app_pending.next = NULL; - - } else { - pending_ra = NULL; - } - - /* Try to cancel and re-schedule first stalled request for this app. */ - if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) { - lnk = nxt_queue_first(&app->pending); + if (main_app_port->app_link.next == NULL) { + nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); - re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_app_pending); - - if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) { - - nxt_debug(task, "app '%V' stalled request #%uD detected", - &app->name, re_ra->stream); - - cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info, - re_ra->stream); - - if (cancelled) { - state.req_app_link = re_ra; - state.app = app; - - /* - * Need to increment use count "in advance" because - * nxt_router_port_select() will remove re_ra from lists - * and decrement use count. - */ - nxt_request_app_link_inc_use(re_ra); - - nxt_router_port_select(task, &state); - - goto re_ra_cancelled; - } + nxt_port_inc_use(main_app_port); } } - re_ra = NULL; - -re_ra_cancelled: - send_quit = (app->max_requests > 0 - && port->app_pending_responses == 0 - && port->app_responses >= app->max_requests); + && main_app_port->app_responses >= app->max_requests); if (send_quit) { - port_unchained = nxt_queue_chk_remove(&port->app_link); + port_unchained = nxt_queue_chk_remove(&main_app_port->app_link); - port->app = NULL; + nxt_port_hash_remove(&app->port_hash, main_app_port); + app->port_hash_count--; + + main_app_port->app = NULL; app->processes--; } else { @@ -4313,9 +4386,10 @@ re_ra_cancelled: adjust_idle_timer = 0; - if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0 - && nxt_queue_is_empty(&port->active_websockets) - && port->idle_link.next == NULL) + if (main_app_port->pair[1] != -1 && !send_quit + && main_app_port->active_requests == 0 + && main_app_port->active_websockets == 0 + && main_app_port->idle_link.next == NULL) { if (app->idle_processes == app->spare_processes && app->adjust_idle_work.data == NULL) @@ -4326,12 +4400,17 @@ re_ra_cancelled: } if (app->idle_processes < app->spare_processes) { - nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); + nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link); + nxt_debug(task, "app '%V' move port %PI:%d to spare_ports", + &app->name, main_app_port->pid, main_app_port->id); } else { - nxt_queue_insert_tail(&app->idle_ports, &port->idle_link); + nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link); - port->idle_start = task->thread->engine->timers.now; + main_app_port->idle_start = task->thread->engine->timers.now; + + nxt_debug(task, "app '%V' move port %PI:%d to idle_ports", + &app->name, main_app_port->pid, main_app_port->id); } app->idle_processes++; @@ -4344,60 +4423,22 @@ re_ra_cancelled: nxt_event_engine_post(app->engine, &app->adjust_idle_work); } - if (pending_ra != NULL) { - nxt_request_app_link_use(task, pending_ra, -1); - } - - if (re_ra != NULL) { - if (nxt_router_port_post_select(task, &state) == NXT_OK) { - /* - * Reference counter already incremented above, this will - * keep re_ra while nxt_router_app_process_request() - * task is in queue. Reference counter decreased in - * nxt_router_app_process_request() after processing. - */ - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_router_app_process_request, - &task->thread->engine->task, app, re_ra); - - } else { - nxt_request_app_link_use(task, re_ra, -1); - } - } - - if (req_app_link != NULL) { - /* - * There should be call nxt_request_app_link_inc_use(req_app_link), - * because of one more link in the queue. But one link was - * recently removed from app->requests linked list. - * Corresponding decrement is in nxt_router_app_process_request(). - */ - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_router_app_process_request, - &task->thread->engine->task, app, req_app_link); - - goto adjust_use; - } - /* ? */ - if (port->pair[1] == -1) { + if (main_app_port->pair[1] == -1) { nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", - &app->name, app, port, port->pid); + &app->name, app, main_app_port, main_app_port->pid); goto adjust_use; } if (send_quit) { - nxt_debug(task, "app '%V' %p send QUIT to port", - &app->name, app); + nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app); - nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, - -1, 0, 0, NULL); + nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0, + NULL); if (port_unchained) { - nxt_port_use(task, port, -1); + nxt_port_use(task, main_app_port, -1); } goto adjust_use; @@ -4426,11 +4467,27 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) nxt_thread_mutex_lock(&app->mutex); + nxt_port_hash_remove(&app->port_hash, port); + app->port_hash_count--; + + if (port->id != 0) { + nxt_thread_mutex_unlock(&app->mutex); + + nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name, + port->pid, port->id); + + return; + } + unchain = nxt_queue_chk_remove(&port->app_link); if (nxt_queue_chk_remove(&port->idle_link)) { app->idle_processes--; + nxt_debug(task, "app '%V' move port %PI:%d out of %s before close", + &app->name, port->pid, port->id, + (port->idle_start ? "idle_ports" : "spare_ports")); + if (port->idle_start == 0 && app->idle_processes >= app->spare_processes) { @@ -4443,6 +4500,10 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) nxt_queue_insert_tail(&app->spare_ports, idle_lnk); idle_port->idle_start = 0; + + nxt_debug(task, "app '%V' move port %PI:%d from idle_ports " + "to spare_ports", + &app->name, idle_port->pid, idle_port->id); } } @@ -4450,8 +4511,7 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) start_process = !task->thread->engine->shutdown && nxt_router_app_can_start(app) - && (!nxt_queue_is_empty(&app->requests) - || nxt_router_app_need_start(app)); + && nxt_router_app_need_start(app); if (start_process) { app->pending_processes++; @@ -4500,6 +4560,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) app->adjust_idle_work.data = NULL; } + nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d", + &app->name, + (int) app->idle_processes, (int) app->spare_processes); + while (app->idle_processes > app->spare_processes) { nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); @@ -4509,6 +4573,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) timeout = port->idle_start + app->idle_timeout; + nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M", + &app->name, port->pid, + port->idle_start, timeout, threshold); + if (timeout > threshold) { break; } @@ -4516,8 +4584,14 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) nxt_queue_remove(lnk); lnk->next = NULL; + nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)", + &app->name, port->pid, port->id); + nxt_queue_chk_remove(&port->app_link); + nxt_port_hash_remove(&app->port_hash, port); + app->port_hash_count--; + app->idle_processes--; app->processes--; port->app = NULL; @@ -4588,7 +4662,7 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) app = app_joint->app; for ( ;; ) { - port = nxt_router_app_get_port_for_quit(app); + port = nxt_router_app_get_port_for_quit(task, app); if (port == NULL) { break; } @@ -4601,12 +4675,23 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) } nxt_assert(app->processes == 0); + nxt_assert(app->active_requests == 0); + nxt_assert(app->port_hash_count == 0); nxt_assert(app->idle_processes == 0); - nxt_assert(nxt_queue_is_empty(&app->requests)); nxt_assert(nxt_queue_is_empty(&app->ports)); nxt_assert(nxt_queue_is_empty(&app->spare_ports)); nxt_assert(nxt_queue_is_empty(&app->idle_ports)); + nxt_port_mmaps_destroy(&app->outgoing, 1); + + nxt_thread_mutex_destroy(&app->outgoing.mutex); + + if (app->shared_port != NULL) { + app->shared_port->app = NULL; + nxt_port_close(task, app->shared_port); + nxt_port_use(task, app->shared_port, -1); + } + nxt_thread_mutex_destroy(&app->mutex); nxt_mp_destroy(app->mem_pool); @@ -4623,178 +4708,49 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) static void -nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) -{ - int ra_use_delta; - nxt_app_t *app; - nxt_bool_t can_start_process; - nxt_request_app_link_t *req_app_link; - - req_app_link = state->req_app_link; - app = state->app; - - state->failed_port_use_delta = 0; - ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests); - - if (nxt_queue_chk_remove(&req_app_link->link_port_pending)) - { - nxt_assert(req_app_link->link_app_pending.next != NULL); - - nxt_queue_remove(&req_app_link->link_app_pending); - req_app_link->link_app_pending.next = NULL; - - ra_use_delta--; - } - - state->failed_port = req_app_link->app_port; - - if (req_app_link->app_port != NULL) { - state->failed_port_use_delta--; - - state->failed_port->app_pending_responses--; - - if (nxt_queue_chk_remove(&state->failed_port->app_link)) { - state->failed_port_use_delta--; - } - - req_app_link->app_port = NULL; - } - - can_start_process = nxt_router_app_can_start(app); - - state->port = NULL; - state->start_process = 0; - - if (nxt_queue_is_empty(&app->ports) - || (can_start_process && nxt_router_app_first_port_busy(app)) ) - { - req_app_link = nxt_request_app_link_alloc(task, req_app_link, - req_app_link->req_rpc_data); - if (nxt_slow_path(req_app_link == NULL)) { - goto fail; - } - - if (nxt_slow_path(state->failed_port != NULL)) { - nxt_queue_insert_head(&app->requests, - &req_app_link->link_app_requests); - - } else { - nxt_queue_insert_tail(&app->requests, - &req_app_link->link_app_requests); - } - - ra_use_delta++; - - nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests", - req_app_link->stream); - - if (can_start_process) { - app->pending_processes++; - state->start_process = 1; - } - - } else { - state->port = nxt_router_pop_first_port(app); - - if (state->port->app_pending_responses > 1) { - req_app_link = nxt_request_app_link_alloc(task, req_app_link, - req_app_link->req_rpc_data); - if (nxt_slow_path(req_app_link == NULL)) { - goto fail; - } - - req_app_link->app_port = state->port; - - nxt_request_app_link_pending(task, app, req_app_link); - } - - if (can_start_process && nxt_router_app_need_start(app)) { - app->pending_processes++; - state->start_process = 1; - } - } - - nxt_request_app_link_chk_use(req_app_link, ra_use_delta); - -fail: - - state->shared_ra = req_app_link; -} - - -static nxt_int_t -nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) +nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, + nxt_request_rpc_data_t *req_rpc_data) { - nxt_int_t res; - nxt_app_t *app; - nxt_request_app_link_t *req_app_link; + nxt_bool_t start_process; + nxt_port_t *port; + nxt_http_request_t *r; - req_app_link = state->shared_ra; - app = state->app; + start_process = 0; - if (state->failed_port_use_delta != 0) { - nxt_port_use(task, state->failed_port, state->failed_port_use_delta); - } + nxt_thread_mutex_lock(&app->mutex); - if (nxt_slow_path(req_app_link == NULL)) { - if (state->port != NULL) { - nxt_port_use(task, state->port, -1); - } + port = app->shared_port; + nxt_port_inc_use(port); - nxt_request_app_link_error(task, app, state->req_app_link, - "Failed to allocate shared req<->app link"); + app->active_requests++; - return NXT_ERROR; + if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { + app->pending_processes++; + start_process = 1; } - if (state->port != NULL) { - nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); - - req_app_link->app_port = state->port; - - if (state->start_process) { - nxt_router_start_app_process(task, app); - } - - return NXT_OK; - } + r = req_rpc_data->request; - if (!state->start_process) { - nxt_debug(task, "app '%V' %p too many running or pending processes", - &app->name, app); + /* + * Put request into application-wide list to be able to cancel request + * if something goes wrong with application processes. + */ + nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); - return NXT_AGAIN; - } + nxt_thread_mutex_unlock(&app->mutex); - res = nxt_router_start_app_process(task, app); + /* + * Retain request memory pool while request is linked in ack_waiting_req + * to guarantee request structure memory is accessble. + */ + nxt_mp_retain(r->mem_pool); - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, app, req_app_link, - "Failed to start app process"); + req_rpc_data->app_port = port; + req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; - return NXT_ERROR; + if (start_process) { + nxt_router_start_app_process(task, app); } - - return NXT_AGAIN; -} - - -static nxt_int_t -nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link) -{ - nxt_port_select_state_t state; - - state.req_app_link = req_app_link; - state.app = app; - - nxt_thread_mutex_lock(&app->mutex); - - nxt_router_port_select(task, &state); - - nxt_thread_mutex_unlock(&app->mutex); - - return nxt_router_port_post_select(task, &state); } @@ -4802,10 +4758,7 @@ void nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, nxt_app_t *app) { - nxt_int_t res; - nxt_port_t *port; nxt_event_engine_t *engine; - nxt_request_app_link_t ra_local, *req_app_link; nxt_request_rpc_data_t *req_rpc_data; engine = task->thread->engine; @@ -4824,7 +4777,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, * in port handlers. Need to fixup request memory pool. Counterpart * release will be called via following call chain: * nxt_request_rpc_data_unlink() -> - * nxt_router_http_request_done() -> + * nxt_router_http_request_release_post() -> * nxt_router_http_request_release() */ nxt_mp_retain(r->mem_pool); @@ -4834,31 +4787,63 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, r->timer.log = engine->task.log; r->timer.bias = NXT_TIMER_DEFAULT_BIAS; + r->engine = engine; + r->err_work.handler = nxt_router_http_request_error; + r->err_work.task = task; + r->err_work.obj = r; + req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); req_rpc_data->app = app; + req_rpc_data->msg_info.body_fd = -1; + req_rpc_data->rpc_cancel = 1; nxt_router_app_use(task, app, 1); req_rpc_data->request = r; r->req_rpc_data = req_rpc_data; - req_app_link = &ra_local; - nxt_request_app_link_init(task, req_app_link, req_rpc_data); + if (r->last != NULL) { + r->last->completion_handler = nxt_router_http_request_done; + } - res = nxt_router_app_port(task, app, req_app_link); - req_app_link = req_rpc_data->req_app_link; + nxt_router_app_port_get(task, app, req_rpc_data); + nxt_router_app_prepare_request(task, req_rpc_data); +} - if (res == NXT_OK) { - port = req_app_link->app_port; - nxt_assert(port != NULL); +static void +nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + + r = obj; + + nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); - nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); + nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); + + if (r->req_rpc_data != NULL) { + nxt_request_rpc_data_unlink(task, r->req_rpc_data); + } + + nxt_mp_release(r->mem_pool); +} + + +static void +nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + + r = data; - nxt_router_app_prepare_request(task, req_app_link); + nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); + + if (r->req_rpc_data != NULL) { + nxt_request_rpc_data_unlink(task, r->req_rpc_data); } - nxt_request_app_link_use(task, req_app_link, -1); + nxt_http_request_close_handler(task, r, r->proto.any); } @@ -4870,91 +4855,106 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) static void nxt_router_app_prepare_request(nxt_task_t *task, - nxt_request_app_link_t *req_app_link) + nxt_request_rpc_data_t *req_rpc_data) { - nxt_buf_t *buf; + nxt_app_t *app; + nxt_buf_t *buf, *body; nxt_int_t res; - nxt_port_t *port, *c_port, *reply_port; - nxt_apr_action_t apr_action; + nxt_port_t *port, *reply_port; - nxt_assert(req_app_link->app_port != NULL); + int notify; + struct { + nxt_port_msg_t pm; + nxt_port_mmap_msg_t mm; + } msg; - port = req_app_link->app_port; - reply_port = req_app_link->reply_port; - apr_action = NXT_APR_REQUEST_FAILED; + app = req_rpc_data->app; - c_port = nxt_process_connected_port_find(port->process, reply_port); + nxt_assert(app != NULL); - if (nxt_slow_path(c_port != reply_port)) { - res = nxt_port_send_port(task, port, reply_port, 0); + port = req_rpc_data->app_port; - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to send reply port to application"); + nxt_assert(port != NULL); + nxt_assert(port->queue != NULL); - goto release_port; - } + reply_port = task->thread->engine->port; - nxt_process_connected_port_add(port->process, reply_port); - } + buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, + nxt_app_msg_prefix[app->type]); + if (nxt_slow_path(buf == NULL)) { + nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", + req_rpc_data->stream, &app->name); - buf = nxt_router_prepare_msg(task, req_app_link->request, port, - nxt_app_msg_prefix[port->app->type]); + nxt_http_request_error(task, req_rpc_data->request, + NXT_HTTP_INTERNAL_SERVER_ERROR); - if (nxt_slow_path(buf == NULL)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to prepare message for application"); - goto release_port; + return; } nxt_debug(task, "about to send %O bytes buffer to app process port %d", nxt_buf_used_size(buf), port->socket.fd); - apr_action = NXT_APR_NEW_PORT; - - req_app_link->msg_info.buf = buf; - req_app_link->msg_info.completion_handler = buf->completion_handler; + req_rpc_data->msg_info.buf = buf; + req_rpc_data->msg_info.completion_handler = buf->completion_handler; - for (; buf; buf = buf->next) { + do { buf->completion_handler = nxt_router_dummy_buf_completion; - } + buf = buf->next; + } while (buf != NULL); - buf = req_app_link->msg_info.buf; + buf = req_rpc_data->msg_info.buf; - res = nxt_port_mmap_get_tracking(task, port, - &req_app_link->msg_info.tracking, - req_app_link->stream); - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to get tracking area"); - goto release_port; - } - - if (req_app_link->body_fd != -1) { - nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream, - req_app_link->body_fd); + body = req_rpc_data->request->body; - lseek(req_app_link->body_fd, 0, SEEK_SET); - } + if (body != NULL && nxt_buf_is_file(body)) { + req_rpc_data->msg_info.body_fd = body->file->fd; - res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS, - req_app_link->body_fd, - req_app_link->stream, reply_port->id, buf, - &req_app_link->msg_info.tracking); + body->file->fd = -1; - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to send message to application"); - goto release_port; - } + } else { + req_rpc_data->msg_info.body_fd = -1; + } + + msg.pm.stream = req_rpc_data->stream; + msg.pm.pid = reply_port->pid; + msg.pm.reply_port = reply_port->id; + msg.pm.type = NXT_PORT_MSG_REQ_HEADERS; + msg.pm.last = 0; + msg.pm.mmap = 1; + msg.pm.nf = 0; + msg.pm.mf = 0; + msg.pm.tracking = 0; + + nxt_port_mmap_handler_t *mmap_handler = buf->parent; + nxt_port_mmap_header_t *hdr = mmap_handler->hdr; + + msg.mm.mmap_id = hdr->id; + msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos); + msg.mm.size = nxt_buf_used_size(buf); + + res = nxt_app_queue_send(port->queue, &msg, sizeof(msg), + req_rpc_data->stream, ¬ify, + &req_rpc_data->msg_info.tracking_cookie); + if (nxt_fast_path(res == NXT_OK)) { + if (notify != 0) { + (void) nxt_port_socket_write(task, port, + NXT_PORT_MSG_READ_QUEUE, + -1, req_rpc_data->stream, + reply_port->id, NULL); -release_port: + } else { + nxt_debug(task, "queue is not empty"); + } - nxt_router_app_port_release(task, port, apr_action); + } else { + nxt_alert(task, "stream #%uD, app '%V': failed to send app message", + req_rpc_data->stream, &app->name); - nxt_request_app_link_update_peer(task, req_app_link); + nxt_http_request_error(task, req_rpc_data->request, + NXT_HTTP_INTERNAL_SERVER_ERROR); + } } @@ -5012,7 +5012,7 @@ nxt_fields_next(nxt_fields_iter_t *i) static nxt_buf_t * nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, - nxt_port_t *port, const nxt_str_t *prefix) + nxt_app_t *app, const nxt_str_t *prefix) { void *target_pos, *query_pos; u_char *pos, *end, *p, c; @@ -5053,7 +5053,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, return NULL; } - out = nxt_port_mmap_get_buf(task, port, + out = nxt_port_mmap_get_buf(task, &app->outgoing, nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); if (nxt_slow_path(out == NULL)) { return NULL; @@ -5235,7 +5235,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, if (buf == NULL) { free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); - buf = nxt_port_mmap_get_buf(task, port, free_size); + buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); if (nxt_slow_path(buf == NULL)) { while (out != NULL) { buf = out->next; @@ -5283,15 +5283,9 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_app_t *app; - nxt_bool_t cancelled, unlinked; - nxt_port_t *port; nxt_timer_t *timer; - nxt_queue_link_t *lnk; nxt_http_request_t *r; - nxt_request_app_link_t *pending_ra; nxt_request_rpc_data_t *req_rpc_data; - nxt_port_select_state_t state; timer = obj; @@ -5299,94 +5293,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) r = nxt_timer_data(timer, nxt_http_request_t, timer); req_rpc_data = r->timer_data; - app = req_rpc_data->app; - - if (app == NULL) { - goto generate_error; - } - - port = NULL; - pending_ra = NULL; - - if (req_rpc_data->app_port != NULL) { - port = req_rpc_data->app_port; - req_rpc_data->app_port = NULL; - } - - if (port == NULL && req_rpc_data->req_app_link != NULL - && req_rpc_data->req_app_link->app_port != NULL) - { - port = req_rpc_data->req_app_link->app_port; - req_rpc_data->req_app_link->app_port = NULL; - } - - if (port == NULL) { - goto generate_error; - } - - nxt_thread_mutex_lock(&app->mutex); - - unlinked = nxt_queue_chk_remove(&port->app_link); - - if (!nxt_queue_is_empty(&port->pending_requests)) { - lnk = nxt_queue_first(&port->pending_requests); - - pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_port_pending); - - nxt_assert(pending_ra->link_app_pending.next != NULL); - - nxt_debug(task, "app '%V' pending request #%uD found", - &app->name, pending_ra->stream); - - cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info, - pending_ra->stream); - - if (cancelled) { - state.req_app_link = pending_ra; - state.app = app; - - /* - * Need to increment use count "in advance" because - * nxt_router_port_select() will remove pending_ra from lists - * and decrement use count. - */ - nxt_request_app_link_inc_use(pending_ra); - - nxt_router_port_select(task, &state); - - } else { - pending_ra = NULL; - } - } - - nxt_thread_mutex_unlock(&app->mutex); - - if (pending_ra != NULL) { - if (nxt_router_port_post_select(task, &state) == NXT_OK) { - /* - * Reference counter already incremented above, this will - * keep pending_ra while nxt_router_app_process_request() - * task is in queue. Reference counter decreased in - * nxt_router_app_process_request() after processing. - */ - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_router_app_process_request, - &task->thread->engine->task, app, pending_ra); - - } else { - nxt_request_app_link_use(task, pending_ra, -1); - } - } - - nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid); - - nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); - - nxt_port_use(task, port, unlinked ? -2 : -1); - -generate_error: nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); @@ -5394,13 +5300,11 @@ generate_error: } -static nxt_int_t -nxt_router_http_request_done(nxt_task_t *task, nxt_http_request_t *r) +static void +nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) { r->timer.handler = nxt_router_http_request_release; nxt_timer_add(task->thread->engine, &r->timer, 0); - - return NXT_OK; } @@ -5409,7 +5313,7 @@ nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) { nxt_http_request_t *r; - nxt_debug(task, "http app release"); + nxt_debug(task, "http request pool release"); r = nxt_timer_data(obj, nxt_http_request_t, timer); @@ -5468,3 +5372,120 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) -1, 0, 0, NULL); } } + + +static void +nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_fd_t fd; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_port_mmaps_t *mmaps; + nxt_port_msg_get_mmap_t *get_mmap_msg; + nxt_port_mmap_handler_t *mmap_handler; + + rt = task->thread->runtime; + + port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", + msg->port_msg.pid, msg->port_msg.reply_port); + + return; + } + + if (nxt_slow_path(nxt_buf_used_size(msg->buf) + < (int) sizeof(nxt_port_msg_get_mmap_t))) + { + nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", + (int) nxt_buf_used_size(msg->buf)); + + return; + } + + get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; + + nxt_assert(port->type == NXT_PROCESS_APP); + + if (nxt_slow_path(port->app == NULL)) { + nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", + port->pid, port->id); + + // FIXME + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); + + return; + } + + mmaps = &port->app->outgoing; + nxt_thread_mutex_lock(&mmaps->mutex); + + if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { + nxt_thread_mutex_unlock(&mmaps->mutex); + + nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", + (int) get_mmap_msg->id); + + // FIXME + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); + return; + } + + mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; + + fd = mmap_handler->fd; + + nxt_thread_mutex_unlock(&mmaps->mutex); + + nxt_debug(task, "get mmap %PI:%d found", + msg->port_msg.pid, (int) get_mmap_msg->id); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); +} + + +static void +nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_port_t *port, *reply_port; + nxt_runtime_t *rt; + nxt_port_msg_get_port_t *get_port_msg; + + rt = task->thread->runtime; + + reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(reply_port == NULL)) { + nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", + msg->port_msg.pid, msg->port_msg.reply_port); + + return; + } + + if (nxt_slow_path(nxt_buf_used_size(msg->buf) + < (int) sizeof(nxt_port_msg_get_port_t))) + { + nxt_alert(task, "get_port_handler: message buffer too small (%d)", + (int) nxt_buf_used_size(msg->buf)); + + return; + } + + get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; + + port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "get_port_handler: port %PI:%d not found", + get_port_msg->pid, get_port_msg->id); + + return; + } + + nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, + get_port_msg->id); + + (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); +} |