diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 1504 |
1 files changed, 462 insertions, 1042 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 4df1489d..44b303e4 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -61,59 +61,13 @@ typedef struct { } nxt_app_rpc_t; -struct nxt_port_select_state_s { - nxt_app_t *app; - nxt_request_app_link_t *req_app_link; - - nxt_port_t *failed_port; - int failed_port_use_delta; - - uint8_t start_process; /* 1 bit */ - nxt_request_app_link_t *shared_ra; - nxt_port_t *port; -}; - -typedef struct nxt_port_select_state_s nxt_port_select_state_t; - static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp); static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); static void nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port); -static void nxt_router_port_select(nxt_task_t *task, - nxt_port_select_state_t *state); - -static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, - nxt_port_select_state_t *state); - static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); -static void nxt_request_app_link_update_peer(nxt_task_t *task, - nxt_request_app_link_t *req_app_link); - - -nxt_inline void -nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) -{ - nxt_atomic_fetch_add(&req_app_link->use_count, 1); -} - -nxt_inline void -nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i) -{ -#if (NXT_DEBUG) - int c; - - c = nxt_atomic_fetch_add(&req_app_link->use_count, i); - - nxt_assert((c + i) > 0); -#else - (void) nxt_atomic_fetch_add(&req_app_link->use_count, i); -#endif -} - -static void nxt_request_app_link_use(nxt_task_t *task, - nxt_request_app_link_t *req_app_link, int i); static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); @@ -196,6 +150,8 @@ static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_router_req_headers_ack_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); static void nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf); @@ -220,6 +176,8 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task, static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); +static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, + nxt_port_t *app_port); static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -227,13 +185,15 @@ static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_apr_action_t action); -static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link); +static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, + nxt_request_rpc_data_t *req_rpc_data); +static void nxt_router_http_request_done(nxt_task_t *task, void *obj, + void *data); static void nxt_router_app_prepare_request(nxt_task_t *task, - nxt_request_app_link_t *req_app_link); + nxt_request_rpc_data_t *req_rpc_data); static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, - nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix); + nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, @@ -250,7 +210,7 @@ static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); static void nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i); -static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, +static void nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r); static void nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data); @@ -501,83 +461,6 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) } -nxt_inline void -nxt_request_app_link_init(nxt_task_t *task, - nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data) -{ - nxt_buf_t *body; - nxt_event_engine_t *engine; - - engine = task->thread->engine; - - nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t)); - - req_app_link->stream = req_rpc_data->stream; - req_app_link->use_count = 1; - req_app_link->req_rpc_data = req_rpc_data; - req_rpc_data->req_app_link = req_app_link; - req_app_link->reply_port = engine->port; - req_app_link->request = req_rpc_data->request; - req_app_link->apr_action = NXT_APR_GOT_RESPONSE; - - req_app_link->work.handler = NULL; - req_app_link->work.task = &engine->task; - req_app_link->work.obj = req_app_link; - req_app_link->work.data = engine; - - body = req_rpc_data->request->body; - - if (body != NULL && nxt_buf_is_file(body)) { - req_app_link->body_fd = body->file->fd; - - body->file->fd = -1; - - } else { - req_app_link->body_fd = -1; - } -} - - -nxt_inline nxt_request_app_link_t * -nxt_request_app_link_alloc(nxt_task_t *task, - nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data) -{ - nxt_mp_t *mp; - nxt_request_app_link_t *req_app_link; - - if (ra_src != NULL && ra_src->mem_pool != NULL) { - return ra_src; - } - - mp = req_rpc_data->request->mem_pool; - - req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t)); - - if (nxt_slow_path(req_app_link == NULL)) { - - req_rpc_data->req_app_link = NULL; - - if (ra_src != NULL) { - ra_src->req_rpc_data = NULL; - } - - return NULL; - } - - nxt_mp_retain(mp); - - nxt_request_app_link_init(task, req_app_link, req_rpc_data); - - if (ra_src != NULL) { - req_app_link->body_fd = ra_src->body_fd; - } - - req_app_link->mem_pool = mp; - - return req_app_link; -} - - nxt_inline nxt_bool_t nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, uint32_t stream) @@ -614,198 +497,6 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, } -static void -nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj, - void *data) -{ - nxt_request_app_link_t *req_app_link; - - req_app_link = obj; - - nxt_request_app_link_update_peer(task, req_app_link); - - nxt_request_app_link_use(task, req_app_link, -1); -} - - -static void -nxt_request_app_link_update_peer(nxt_task_t *task, - nxt_request_app_link_t *req_app_link) -{ - nxt_event_engine_t *engine; - nxt_request_rpc_data_t *req_rpc_data; - - engine = req_app_link->work.data; - - if (task->thread->engine != engine) { - nxt_request_app_link_inc_use(req_app_link); - - req_app_link->work.handler = nxt_request_app_link_update_peer_handler; - req_app_link->work.task = &engine->task; - req_app_link->work.next = NULL; - - nxt_debug(task, "req_app_link stream #%uD post update peer to %p", - req_app_link->stream, engine); - - nxt_event_engine_post(engine, &req_app_link->work); - - return; - } - - nxt_debug(task, "req_app_link stream #%uD update peer", - req_app_link->stream); - - req_rpc_data = req_app_link->req_rpc_data; - - if (req_rpc_data != NULL && req_app_link->app_port != NULL) { - nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, - req_app_link->app_port->pid); - } -} - - -static void -nxt_request_app_link_release(nxt_task_t *task, - nxt_request_app_link_t *req_app_link) -{ - nxt_mp_t *mp; - nxt_http_request_t *r; - nxt_request_rpc_data_t *req_rpc_data; - - nxt_assert(task->thread->engine == req_app_link->work.data); - nxt_assert(req_app_link->use_count == 0); - - nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream); - - req_rpc_data = req_app_link->req_rpc_data; - - if (req_rpc_data != NULL) { - if (nxt_slow_path(req_app_link->err_code != 0)) { - nxt_http_request_error(task, req_rpc_data->request, - req_app_link->err_code); - - } else { - req_rpc_data->app_port = req_app_link->app_port; - req_rpc_data->apr_action = req_app_link->apr_action; - req_rpc_data->msg_info = req_app_link->msg_info; - - if (req_rpc_data->app->timeout != 0) { - r = req_rpc_data->request; - - r->timer.handler = nxt_router_app_timeout; - r->timer_data = req_rpc_data; - nxt_timer_add(task->thread->engine, &r->timer, - req_rpc_data->app->timeout); - } - - req_app_link->app_port = NULL; - req_app_link->msg_info.buf = NULL; - } - - req_rpc_data->req_app_link = NULL; - req_app_link->req_rpc_data = NULL; - } - - if (req_app_link->app_port != NULL) { - nxt_router_app_port_release(task, req_app_link->app_port, - req_app_link->apr_action); - - req_app_link->app_port = NULL; - } - - if (req_app_link->body_fd != -1) { - nxt_fd_close(req_app_link->body_fd); - - req_app_link->body_fd = -1; - } - - nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); - - mp = req_app_link->mem_pool; - - if (mp != NULL) { - nxt_mp_free(mp, req_app_link); - nxt_mp_release(mp); - } -} - - -static void -nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_request_app_link_t *req_app_link; - - req_app_link = obj; - - nxt_assert(req_app_link->work.data == data); - - nxt_request_app_link_use(task, req_app_link, -1); -} - - -static void -nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link, - int i) -{ - int c; - nxt_event_engine_t *engine; - - c = nxt_atomic_fetch_add(&req_app_link->use_count, i); - - if (i < 0 && c == -i) { - engine = req_app_link->work.data; - - if (task->thread->engine == engine) { - nxt_request_app_link_release(task, req_app_link); - - return; - } - - nxt_request_app_link_inc_use(req_app_link); - - req_app_link->work.handler = nxt_request_app_link_release_handler; - req_app_link->work.task = &engine->task; - req_app_link->work.next = NULL; - - nxt_debug(task, "req_app_link stream #%uD post release to %p", - req_app_link->stream, engine); - - nxt_event_engine_post(engine, &req_app_link->work); - } -} - - -nxt_inline void -nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link, const char *str) -{ - req_app_link->app_port = NULL; - req_app_link->err_code = 500; - req_app_link->err_str = str; - - nxt_alert(task, "app \"%V\" internal error: %s on #%uD", - &app->name, str, req_app_link->stream); -} - - -nxt_inline void -nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link) -{ - nxt_queue_insert_tail(&req_app_link->app_port->pending_requests, - &req_app_link->link_port_pending); - nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending); - - nxt_request_app_link_inc_use(req_app_link); - - req_app_link->res_time = nxt_thread_monotonic_time(task->thread) - + app->res_timeout; - - nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests", - req_app_link->stream); -} - - nxt_inline nxt_bool_t nxt_queue_chk_remove(nxt_queue_link_t *lnk) { @@ -825,8 +516,9 @@ nxt_inline void nxt_request_rpc_data_unlink(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) { - int ra_use_delta; - nxt_request_app_link_t *req_app_link; + nxt_http_request_t *r; + + nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); if (req_rpc_data->app_port != NULL) { nxt_router_app_port_release(task, req_rpc_data->app_port, @@ -835,53 +527,34 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, req_rpc_data->app_port = NULL; } - nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); - - req_app_link = req_rpc_data->req_app_link; - if (req_app_link != NULL) { - req_rpc_data->req_app_link = NULL; - req_app_link->req_rpc_data = NULL; - - ra_use_delta = 0; - - nxt_thread_mutex_lock(&req_rpc_data->app->mutex); + if (req_rpc_data->app != NULL) { + nxt_router_app_use(task, req_rpc_data->app, -1); - if (req_app_link->link_app_requests.next == NULL - && req_app_link->link_port_pending.next == NULL - && req_app_link->link_app_pending.next == NULL - && req_app_link->link_port_websockets.next == NULL) - { - req_app_link = NULL; + req_rpc_data->app = NULL; + } - } else { - ra_use_delta -= - nxt_queue_chk_remove(&req_app_link->link_app_requests) - + nxt_queue_chk_remove(&req_app_link->link_port_pending) - + nxt_queue_chk_remove(&req_app_link->link_port_websockets); + r = req_rpc_data->request; - nxt_queue_chk_remove(&req_app_link->link_app_pending); - } + if (r != NULL) { + r->timer_data = NULL; - nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); + nxt_router_http_request_release_post(task, r); - if (req_app_link != NULL) { - nxt_request_app_link_use(task, req_app_link, ra_use_delta); - } + r->req_rpc_data = NULL; + req_rpc_data->request = NULL; } - if (req_rpc_data->app != NULL) { - nxt_router_app_use(task, req_rpc_data->app, -1); + if (req_rpc_data->msg_info.body_fd != -1) { + nxt_fd_close(req_rpc_data->msg_info.body_fd); - req_rpc_data->app = NULL; + req_rpc_data->msg_info.body_fd = -1; } - if (req_rpc_data->request != NULL) { - req_rpc_data->request->timer_data = NULL; - - nxt_router_http_request_done(task, req_rpc_data->request); + if (req_rpc_data->rpc_cancel) { + req_rpc_data->rpc_cancel = 0; - req_rpc_data->request->req_rpc_data = NULL; - req_rpc_data->request = NULL; + nxt_port_rpc_cancel(task, task->thread->engine->port, + req_rpc_data->stream); } } @@ -889,25 +562,62 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { + nxt_app_t *app; + nxt_port_t *port, *main_app_port; + nxt_runtime_t *rt; + nxt_port_new_port_handler(task, msg); - if (msg->u.new_port != NULL - && msg->u.new_port->type == NXT_PROCESS_CONTROLLER) - { + port = msg->u.new_port; + + if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { nxt_router_greet_controller(task, msg->u.new_port); } - if (msg->port_msg.stream == 0) { - return; - } + if (port == NULL || port->type != NXT_PROCESS_APP) { + + if (msg->port_msg.stream == 0) { + return; + } - if (msg->u.new_port == NULL - || msg->u.new_port->type != NXT_PROCESS_APP) - { msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; } - nxt_port_rpc_handler(task, msg); + if (msg->port_msg.stream != 0) { + nxt_port_rpc_handler(task, msg); + return; + } + + /* + * Port with "id == 0" is application 'main' port and it always + * should come with non-zero stream. + */ + nxt_assert(port->id != 0); + + /* Find 'main' app port and get app reference. */ + rt = task->thread->runtime; + + /* + * It is safe to access 'runtime->ports' hash because 'NEW_PORT' + * sent to main port (with id == 0) and processed in main thread. + */ + main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); + nxt_assert(main_app_port != NULL); + + app = main_app_port->app; + nxt_assert(app != NULL); + + nxt_thread_mutex_lock(&app->mutex); + + /* TODO here should be find-and-add code because there can be + port waiters in port_hash */ + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; + + nxt_thread_mutex_unlock(&app->mutex); + + port->app = app; + port->main_app_port = main_app_port; } @@ -1100,8 +810,10 @@ nxt_router_app_can_start(nxt_app_t *app) nxt_inline nxt_bool_t nxt_router_app_need_start(nxt_app_t *app) { - return app->idle_processes + app->pending_processes - < app->spare_processes; + return (app->active_requests + > app->port_hash_count + app->pending_processes) + || (app->spare_processes + > app->idle_processes + app->pending_processes); } @@ -1530,6 +1242,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_app_t *app, *prev; nxt_str_t *t, *s, *targets; nxt_uint_t n, i; + nxt_port_t *port; nxt_router_t *router; nxt_app_joint_t *app_joint; nxt_conf_value_t *conf, *http, *value, *websocket; @@ -1744,8 +1457,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_queue_init(&app->ports); nxt_queue_init(&app->spare_ports); nxt_queue_init(&app->idle_ports); - nxt_queue_init(&app->requests); - nxt_queue_init(&app->pending); app->name.length = name.length; nxt_memcpy(app->name.start, name.start, name.length); @@ -1758,7 +1469,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->timeout = apcf.timeout; app->res_timeout = apcf.res_timeout * 1000000; app->idle_timeout = apcf.idle_timeout; - app->max_pending_responses = 2; app->max_requests = apcf.requests; app->targets = targets; @@ -1789,6 +1499,25 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app_joint->free_app_work.handler = nxt_router_free_app; app_joint->free_app_work.task = &engine->task; app_joint->free_app_work.obj = app_joint; + + port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid, + NXT_PROCESS_APP); + if (nxt_slow_path(port == NULL)) { + return NXT_ERROR; + } + + ret = nxt_port_socket_init(task, port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, port, -1); + return NXT_ERROR; + } + + nxt_port_write_enable(task, port); + port->app = app; + + app->shared_port = port; + + nxt_thread_mutex_create(&app->outgoing.mutex); } } @@ -2522,7 +2251,13 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, app = rpc->app; port = msg->u.new_port; + + nxt_assert(port != NULL); + nxt_assert(port->type == NXT_PROCESS_APP); + nxt_assert(port->id == 0); + port->app = app; + port->main_app_port = port; app->pending_processes--; app->processes++; @@ -2532,11 +2267,15 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_queue_insert_tail(&app->ports, &port->app_link); nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; port->idle_start = 0; nxt_port_inc_use(port); + nxt_router_app_shared_port_send(task, port); + nxt_work_queue_add(&engine->fast_work_queue, nxt_router_conf_apply, task, rpc->temp_conf, NULL); } @@ -2939,10 +2678,11 @@ nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs) static nxt_port_handlers_t nxt_router_app_port_handlers = { - .rpc_error = nxt_port_rpc_handler, - .mmap = nxt_port_mmap_handler, - .data = nxt_port_rpc_handler, - .oosm = nxt_router_oosm_handler, + .rpc_error = nxt_port_rpc_handler, + .mmap = nxt_port_mmap_handler, + .data = nxt_port_rpc_handler, + .oosm = nxt_router_oosm_handler, + .req_headers_ack = nxt_port_rpc_handler, }; @@ -3736,22 +3476,17 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { nxt_int_t ret; + nxt_app_t *app; nxt_buf_t *b, *next; nxt_port_t *app_port; nxt_unit_field_t *f; nxt_http_field_t *field; nxt_http_request_t *r; nxt_unit_response_t *resp; - nxt_request_app_link_t *req_app_link; nxt_request_rpc_data_t *req_rpc_data; - b = msg->buf; req_rpc_data = data; - if (msg->size == 0) { - b = NULL; - } - r = req_rpc_data->request; if (nxt_slow_path(r == NULL)) { return; @@ -3762,19 +3497,32 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, return; } + app = req_rpc_data->app; + nxt_assert(app != NULL); + + if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) { + nxt_router_req_headers_ack_handler(task, msg, req_rpc_data); + + return; + } + + b = (msg->size == 0) ? NULL : msg->buf; + if (msg->port_msg.last != 0) { nxt_debug(task, "router data create last buf"); nxt_buf_chain_add(&b, nxt_http_buf_last(r)); + req_rpc_data->rpc_cancel = 0; + req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; + nxt_request_rpc_data_unlink(task, req_rpc_data); } else { - if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) { + if (app->timeout != 0) { r->timer.handler = nxt_router_app_timeout; r->timer_data = req_rpc_data; - nxt_timer_add(task->thread->engine, &r->timer, - req_rpc_data->app->timeout); + nxt_timer_add(task->thread->engine, &r->timer, app->timeout); } } @@ -3870,39 +3618,21 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, if (r->websocket_handshake && r->status == NXT_HTTP_SWITCHING_PROTOCOLS) { - req_app_link = nxt_request_app_link_alloc(task, - req_rpc_data->req_app_link, - req_rpc_data); - if (nxt_slow_path(req_app_link == NULL)) { - goto fail; - } - - app_port = req_app_link->app_port; - - if (app_port == NULL && req_rpc_data->app_port != NULL) { - req_app_link->app_port = req_rpc_data->app_port; - app_port = req_app_link->app_port; - req_app_link->apr_action = req_rpc_data->apr_action; - - req_rpc_data->app_port = NULL; - } - + app_port = req_rpc_data->app_port; if (nxt_slow_path(app_port == NULL)) { goto fail; } - nxt_thread_mutex_lock(&req_rpc_data->app->mutex); + nxt_thread_mutex_lock(&app->mutex); - nxt_queue_insert_tail(&app_port->active_websockets, - &req_app_link->link_port_websockets); + app_port->main_app_port->active_websockets++; - nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); + nxt_thread_mutex_unlock(&app->mutex); nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE); - req_app_link->apr_action = NXT_APR_CLOSE; + req_rpc_data->apr_action = NXT_APR_CLOSE; - nxt_debug(task, "req_app_link stream #%uD upgrade", - req_app_link->stream); + nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream); r->state = &nxt_http_websocket; @@ -3921,6 +3651,94 @@ fail: } +static void +nxt_router_req_headers_ack_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) +{ + nxt_app_t *app; + nxt_bool_t start_process; + nxt_port_t *app_port, *main_app_port, *idle_port; + nxt_queue_link_t *idle_lnk; + nxt_http_request_t *r; + + nxt_debug(task, "stream #%uD: got ack from %PI:%d", + req_rpc_data->stream, + msg->port_msg.pid, msg->port_msg.reply_port); + + nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, + msg->port_msg.pid); + + app = req_rpc_data->app; + + start_process = 0; + + nxt_thread_mutex_lock(&app->mutex); + + app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(app_port == NULL)) { + nxt_thread_mutex_unlock(&app->mutex); + + r = req_rpc_data->request; + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + + return; + } + + main_app_port = app_port->main_app_port; + + if (nxt_queue_chk_remove(&main_app_port->idle_link)) { + app->idle_processes--; + + /* Check port was in 'spare_ports' using idle_start field. */ + if (main_app_port->idle_start == 0 + && app->idle_processes >= app->spare_processes) + { + /* + * If there is a vacant space in spare ports, + * move the last idle to spare_ports. + */ + nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); + + idle_lnk = nxt_queue_last(&app->idle_ports); + idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); + nxt_queue_remove(idle_lnk); + + nxt_queue_insert_tail(&app->spare_ports, idle_lnk); + + idle_port->idle_start = 0; + } + + if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { + app->pending_processes++; + start_process = 1; + } + } + + main_app_port->active_requests++; + + nxt_port_inc_use(app_port); + + nxt_thread_mutex_unlock(&app->mutex); + + if (start_process) { + nxt_router_start_app_process(task, app); + } + + nxt_port_use(task, req_rpc_data->app_port, -1); + + req_rpc_data->app_port = app_port; + + if (app->timeout != 0) { + r = req_rpc_data->request; + + r->timer.handler = nxt_router_app_timeout; + r->timer_data = req_rpc_data; + nxt_timer_add(task->thread->engine, &r->timer, app->timeout); + } +} + + static const nxt_http_request_state_t nxt_http_request_send_state nxt_aligned(64) = { @@ -3949,42 +3767,14 @@ static void nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_int_t res; - nxt_port_t *port; - nxt_bool_t cancelled; - nxt_request_app_link_t *req_app_link; nxt_request_rpc_data_t *req_rpc_data; req_rpc_data = data; - req_app_link = req_rpc_data->req_app_link; - - if (req_app_link != NULL) { - cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, - req_app_link->stream); - if (cancelled) { - res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); - - if (res == NXT_OK) { - port = req_app_link->app_port; - - if (nxt_slow_path(port == NULL)) { - nxt_log(task, NXT_LOG_ERR, - "port is NULL in cancelled req_app_link"); - return; - } - - nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, - req_rpc_data, port->pid); - - nxt_router_app_prepare_request(task, req_app_link); - } + req_rpc_data->rpc_cancel = 0; - msg->port_msg.last = 0; - - return; - } - } + /* TODO cancel message and return if cancelled. */ + // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); if (req_rpc_data->request != NULL) { nxt_http_request_error(task, req_rpc_data->request, @@ -4008,6 +3798,8 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_assert(app_joint != NULL); nxt_assert(port != NULL); + nxt_assert(port->type == NXT_PROCESS_APP); + nxt_assert(port->id == 0); app = app_joint->app; @@ -4022,6 +3814,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, } port->app = app; + port->main_app_port = port; nxt_thread_mutex_lock(&app->mutex); @@ -4029,24 +3822,60 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, app->pending_processes--; app->processes++; + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; nxt_thread_mutex_unlock(&app->mutex); nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", &app->name, port->pid, app->processes, app->pending_processes); + nxt_router_app_shared_port_send(task, port); + nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT); } +static nxt_int_t +nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) +{ + nxt_buf_t *b; + nxt_port_t *port; + nxt_port_msg_new_port_t *msg; + + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(nxt_port_data_t)); + if (nxt_slow_path(b == NULL)) { + return NXT_ERROR; + } + + port = app_port->app->shared_port; + + nxt_debug(task, "send port %FD to process %PI", + port->pair[0], app_port->pid); + + b->mem.free += sizeof(nxt_port_msg_new_port_t); + msg = (nxt_port_msg_new_port_t *) b->mem.pos; + + msg->id = port->id; + msg->pid = port->pid; + msg->max_size = port->max_size; + msg->max_share = port->max_share; + msg->type = port->type; + + return nxt_port_socket_twrite(task, app_port, + NXT_PORT_MSG_NEW_PORT, + port->pair[0], + 0, 0, b, NULL); +} + + static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_app_joint_t *app_joint; - nxt_queue_link_t *lnk; - nxt_request_app_link_t *req_app_link; + nxt_app_t *app; + nxt_app_joint_t *app_joint; app_joint = data; @@ -4070,32 +3899,11 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, app->pending_processes--; - if (!nxt_queue_is_empty(&app->requests)) { - lnk = nxt_queue_last(&app->requests); - nxt_queue_remove(lnk); - lnk->next = NULL; - - req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_app_requests); - - } else { - req_app_link = NULL; - } - nxt_thread_mutex_unlock(&app->mutex); - if (req_app_link != NULL) { - nxt_debug(task, "app '%V' %p abort next stream #%uD", - &app->name, app, req_app_link->stream); - - nxt_request_app_link_error(task, app, req_app_link, - "Failed to start application process"); - nxt_request_app_link_use(task, req_app_link, -1); - } + /* TODO req_app_link to cancel first pending message */ } -nxt_inline nxt_port_t * -nxt_router_app_get_port_for_quit(nxt_app_t *app); void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) @@ -4116,63 +3924,6 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) } -nxt_inline nxt_bool_t -nxt_router_app_first_port_busy(nxt_app_t *app) -{ - nxt_port_t *port; - nxt_queue_link_t *lnk; - - lnk = nxt_queue_first(&app->ports); - port = nxt_queue_link_data(lnk, nxt_port_t, app_link); - - return port->app_pending_responses > 0; -} - - -nxt_inline nxt_port_t * -nxt_router_pop_first_port(nxt_app_t *app) -{ - nxt_port_t *port; - nxt_queue_link_t *lnk; - - lnk = nxt_queue_first(&app->ports); - nxt_queue_remove(lnk); - - port = nxt_queue_link_data(lnk, nxt_port_t, app_link); - - port->app_pending_responses++; - - if (nxt_queue_chk_remove(&port->idle_link)) { - app->idle_processes--; - - if (port->idle_start == 0) { - nxt_assert(app->idle_processes < app->spare_processes); - - } else { - nxt_assert(app->idle_processes >= app->spare_processes); - - port->idle_start = 0; - } - } - - if ((app->max_pending_responses == 0 - || port->app_pending_responses < app->max_pending_responses) - && (app->max_requests == 0 - || port->app_responses + port->app_pending_responses - < app->max_requests)) - { - nxt_queue_insert_tail(&app->ports, lnk); - - nxt_port_inc_use(port); - - } else { - lnk->next = NULL; - } - - return port; -} - - nxt_inline nxt_port_t * nxt_router_app_get_port_for_quit(nxt_app_t *app) { @@ -4184,12 +3935,6 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app) nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { - if (port->app_pending_responses > 0) { - port = NULL; - - continue; - } - /* Caller is responsible to decrease port use count. */ nxt_queue_chk_remove(&port->app_link); @@ -4197,6 +3942,9 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app) app->idle_processes--; } + nxt_port_hash_remove(&app->port_hash, port); + app->port_hash_count--; + port->app = NULL; app->processes--; @@ -4222,71 +3970,36 @@ nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) static void -nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) -{ - nxt_request_app_link_t *req_app_link; - - req_app_link = data; - -#if (NXT_DEBUG) - { - nxt_app_t *app; - - app = obj; - - nxt_assert(app != NULL); - nxt_assert(req_app_link != NULL); - nxt_assert(req_app_link->app_port != NULL); - - nxt_debug(task, "app '%V' %p process next stream #%uD", - &app->name, app, req_app_link->stream); - } -#endif - - nxt_router_app_prepare_request(task, req_app_link); - - nxt_request_app_link_use(task, req_app_link, -1); -} - - -static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_apr_action_t action) { - int inc_use; - uint32_t dec_pending, got_response; - nxt_app_t *app; - nxt_bool_t port_unchained; - nxt_bool_t send_quit, cancelled, adjust_idle_timer; - nxt_queue_link_t *lnk; - nxt_request_app_link_t *req_app_link, *pending_ra, *re_ra; - nxt_port_select_state_t state; + int inc_use; + uint32_t got_response, dec_requests; + nxt_app_t *app; + nxt_bool_t port_unchained, send_quit, adjust_idle_timer; + nxt_port_t *main_app_port; nxt_assert(port != NULL); nxt_assert(port->app != NULL); - req_app_link = NULL; - app = port->app; inc_use = 0; - dec_pending = 0; got_response = 0; + dec_requests = 0; switch (action) { case NXT_APR_NEW_PORT: break; case NXT_APR_REQUEST_FAILED: - dec_pending = 1; + dec_requests = 1; inc_use = -1; break; case NXT_APR_GOT_RESPONSE: - dec_pending = 1; got_response = 1; inc_use = -1; break; case NXT_APR_UPGRADE: - dec_pending = 1; got_response = 1; break; case NXT_APR_CLOSE: @@ -4294,120 +4007,49 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, break; } - nxt_thread_mutex_lock(&app->mutex); - - port->app_pending_responses -= dec_pending; - port->app_responses += got_response; + nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name, + port->pid, port->id, + (int) inc_use, (int) got_response); - if (port->pair[1] != -1 - && (app->max_pending_responses == 0 - || port->app_pending_responses < app->max_pending_responses) - && (app->max_requests == 0 - || port->app_responses + port->app_pending_responses - < app->max_requests)) - { - if (port->app_link.next == NULL) { - if (port->app_pending_responses > 0) { - nxt_queue_insert_tail(&app->ports, &port->app_link); + if (port == app->shared_port) { + nxt_thread_mutex_lock(&app->mutex); - } else { - nxt_queue_insert_head(&app->ports, &port->app_link); - } + app->active_requests -= got_response + dec_requests; - nxt_port_inc_use(port); + nxt_thread_mutex_unlock(&app->mutex); - } else { - if (port->app_pending_responses == 0 - && nxt_queue_first(&app->ports) != &port->app_link) - { - nxt_queue_remove(&port->app_link); - nxt_queue_insert_head(&app->ports, &port->app_link); - } - } + goto adjust_use; } - if (!nxt_queue_is_empty(&app->ports) - && !nxt_queue_is_empty(&app->requests)) - { - lnk = nxt_queue_first(&app->requests); - nxt_queue_remove(lnk); - lnk->next = NULL; - - req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_app_requests); + main_app_port = port->main_app_port; - req_app_link->app_port = nxt_router_pop_first_port(app); + nxt_thread_mutex_lock(&app->mutex); - if (req_app_link->app_port->app_pending_responses > 1) { - nxt_request_app_link_pending(task, app, req_app_link); - } - } + main_app_port->app_responses += got_response; + main_app_port->active_requests -= got_response + dec_requests; + app->active_requests -= got_response + dec_requests; - /* Pop first pending request for this port. */ - if (dec_pending > 0 - && !nxt_queue_is_empty(&port->pending_requests)) + if (main_app_port->pair[1] != -1 + && (app->max_requests == 0 + || main_app_port->app_responses < app->max_requests)) { - lnk = nxt_queue_first(&port->pending_requests); - nxt_queue_remove(lnk); - lnk->next = NULL; - - pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_port_pending); - - nxt_assert(pending_ra->link_app_pending.next != NULL); - - nxt_queue_remove(&pending_ra->link_app_pending); - pending_ra->link_app_pending.next = NULL; - - } else { - pending_ra = NULL; - } - - /* Try to cancel and re-schedule first stalled request for this app. */ - if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) { - lnk = nxt_queue_first(&app->pending); - - re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_app_pending); - - if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) { + if (main_app_port->app_link.next == NULL) { + nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); - nxt_debug(task, "app '%V' stalled request #%uD detected", - &app->name, re_ra->stream); - - cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info, - re_ra->stream); - - if (cancelled) { - state.req_app_link = re_ra; - state.app = app; - - /* - * Need to increment use count "in advance" because - * nxt_router_port_select() will remove re_ra from lists - * and decrement use count. - */ - nxt_request_app_link_inc_use(re_ra); - - nxt_router_port_select(task, &state); - - goto re_ra_cancelled; - } + nxt_port_inc_use(main_app_port); } } - re_ra = NULL; - -re_ra_cancelled: - send_quit = (app->max_requests > 0 - && port->app_pending_responses == 0 - && port->app_responses >= app->max_requests); + && main_app_port->app_responses >= app->max_requests); if (send_quit) { - port_unchained = nxt_queue_chk_remove(&port->app_link); + port_unchained = nxt_queue_chk_remove(&main_app_port->app_link); - port->app = NULL; + nxt_port_hash_remove(&app->port_hash, main_app_port); + app->port_hash_count--; + + main_app_port->app = NULL; app->processes--; } else { @@ -4416,9 +4058,10 @@ re_ra_cancelled: adjust_idle_timer = 0; - if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0 - && nxt_queue_is_empty(&port->active_websockets) - && port->idle_link.next == NULL) + if (main_app_port->pair[1] != -1 && !send_quit + && main_app_port->active_requests == 0 + && main_app_port->active_websockets == 0 + && main_app_port->idle_link.next == NULL) { if (app->idle_processes == app->spare_processes && app->adjust_idle_work.data == NULL) @@ -4429,12 +4072,12 @@ re_ra_cancelled: } if (app->idle_processes < app->spare_processes) { - nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); + nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link); } else { - nxt_queue_insert_tail(&app->idle_ports, &port->idle_link); + nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link); - port->idle_start = task->thread->engine->timers.now; + main_app_port->idle_start = task->thread->engine->timers.now; } app->idle_processes++; @@ -4447,60 +4090,22 @@ re_ra_cancelled: nxt_event_engine_post(app->engine, &app->adjust_idle_work); } - if (pending_ra != NULL) { - nxt_request_app_link_use(task, pending_ra, -1); - } - - if (re_ra != NULL) { - if (nxt_router_port_post_select(task, &state) == NXT_OK) { - /* - * Reference counter already incremented above, this will - * keep re_ra while nxt_router_app_process_request() - * task is in queue. Reference counter decreased in - * nxt_router_app_process_request() after processing. - */ - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_router_app_process_request, - &task->thread->engine->task, app, re_ra); - - } else { - nxt_request_app_link_use(task, re_ra, -1); - } - } - - if (req_app_link != NULL) { - /* - * There should be call nxt_request_app_link_inc_use(req_app_link), - * because of one more link in the queue. But one link was - * recently removed from app->requests linked list. - * Corresponding decrement is in nxt_router_app_process_request(). - */ - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_router_app_process_request, - &task->thread->engine->task, app, req_app_link); - - goto adjust_use; - } - /* ? */ - if (port->pair[1] == -1) { + if (main_app_port->pair[1] == -1) { nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", - &app->name, app, port, port->pid); + &app->name, app, main_app_port, main_app_port->pid); goto adjust_use; } if (send_quit) { - nxt_debug(task, "app '%V' %p send QUIT to port", - &app->name, app); + nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app); - nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, - -1, 0, 0, NULL); + nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0, + NULL); if (port_unchained) { - nxt_port_use(task, port, -1); + nxt_port_use(task, main_app_port, -1); } goto adjust_use; @@ -4529,6 +4134,18 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) nxt_thread_mutex_lock(&app->mutex); + nxt_port_hash_remove(&app->port_hash, port); + app->port_hash_count--; + + if (port->id != 0) { + nxt_thread_mutex_unlock(&app->mutex); + + nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name, + port->pid, port->id); + + return; + } + unchain = nxt_queue_chk_remove(&port->app_link); if (nxt_queue_chk_remove(&port->idle_link)) { @@ -4553,8 +4170,7 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) start_process = !task->thread->engine->shutdown && nxt_router_app_can_start(app) - && (!nxt_queue_is_empty(&app->requests) - || nxt_router_app_need_start(app)); + && nxt_router_app_need_start(app); if (start_process) { app->pending_processes++; @@ -4603,6 +4219,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) app->adjust_idle_work.data = NULL; } + nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d", + &app->name, + (int) app->idle_processes, (int) app->spare_processes); + while (app->idle_processes > app->spare_processes) { nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); @@ -4612,6 +4232,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) timeout = port->idle_start + app->idle_timeout; + nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M", + &app->name, port->pid, + port->idle_start, timeout, threshold); + if (timeout > threshold) { break; } @@ -4621,6 +4245,9 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) nxt_queue_chk_remove(&port->app_link); + nxt_port_hash_remove(&app->port_hash, port); + app->port_hash_count--; + app->idle_processes--; app->processes--; port->app = NULL; @@ -4704,12 +4331,23 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) } nxt_assert(app->processes == 0); + nxt_assert(app->active_requests == 0); + nxt_assert(app->port_hash_count == 0); nxt_assert(app->idle_processes == 0); - nxt_assert(nxt_queue_is_empty(&app->requests)); nxt_assert(nxt_queue_is_empty(&app->ports)); nxt_assert(nxt_queue_is_empty(&app->spare_ports)); nxt_assert(nxt_queue_is_empty(&app->idle_ports)); + nxt_port_mmaps_destroy(&app->outgoing, 1); + + nxt_thread_mutex_destroy(&app->outgoing.mutex); + + if (app->shared_port != NULL) { + app->shared_port->app = NULL; + nxt_port_close(task, app->shared_port); + nxt_port_use(task, app->shared_port, -1); + } + nxt_thread_mutex_destroy(&app->mutex); nxt_mp_destroy(app->mem_pool); @@ -4726,178 +4364,34 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) static void -nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) -{ - int ra_use_delta; - nxt_app_t *app; - nxt_bool_t can_start_process; - nxt_request_app_link_t *req_app_link; - - req_app_link = state->req_app_link; - app = state->app; - - state->failed_port_use_delta = 0; - ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests); - - if (nxt_queue_chk_remove(&req_app_link->link_port_pending)) - { - nxt_assert(req_app_link->link_app_pending.next != NULL); - - nxt_queue_remove(&req_app_link->link_app_pending); - req_app_link->link_app_pending.next = NULL; - - ra_use_delta--; - } - - state->failed_port = req_app_link->app_port; - - if (req_app_link->app_port != NULL) { - state->failed_port_use_delta--; - - state->failed_port->app_pending_responses--; - - if (nxt_queue_chk_remove(&state->failed_port->app_link)) { - state->failed_port_use_delta--; - } - - req_app_link->app_port = NULL; - } - - can_start_process = nxt_router_app_can_start(app); - - state->port = NULL; - state->start_process = 0; - - if (nxt_queue_is_empty(&app->ports) - || (can_start_process && nxt_router_app_first_port_busy(app)) ) - { - req_app_link = nxt_request_app_link_alloc(task, req_app_link, - req_app_link->req_rpc_data); - if (nxt_slow_path(req_app_link == NULL)) { - goto fail; - } - - if (nxt_slow_path(state->failed_port != NULL)) { - nxt_queue_insert_head(&app->requests, - &req_app_link->link_app_requests); - - } else { - nxt_queue_insert_tail(&app->requests, - &req_app_link->link_app_requests); - } - - nxt_request_app_link_inc_use(req_app_link); - - nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests", - req_app_link->stream); - - if (can_start_process) { - app->pending_processes++; - state->start_process = 1; - } - - } else { - state->port = nxt_router_pop_first_port(app); - - if (state->port->app_pending_responses > 1) { - req_app_link = nxt_request_app_link_alloc(task, req_app_link, - req_app_link->req_rpc_data); - if (nxt_slow_path(req_app_link == NULL)) { - goto fail; - } - - req_app_link->app_port = state->port; - - nxt_request_app_link_pending(task, app, req_app_link); - } - - if (can_start_process && nxt_router_app_need_start(app)) { - app->pending_processes++; - state->start_process = 1; - } - } - - nxt_request_app_link_chk_use(req_app_link, ra_use_delta); - -fail: - - state->shared_ra = req_app_link; -} - - -static nxt_int_t -nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) +nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, + nxt_request_rpc_data_t *req_rpc_data) { - nxt_int_t res; - nxt_app_t *app; - nxt_request_app_link_t *req_app_link; - - req_app_link = state->shared_ra; - app = state->app; - - if (state->failed_port_use_delta != 0) { - nxt_port_use(task, state->failed_port, state->failed_port_use_delta); - } - - if (nxt_slow_path(req_app_link == NULL)) { - if (state->port != NULL) { - nxt_port_use(task, state->port, -1); - } - - nxt_request_app_link_error(task, app, state->req_app_link, - "Failed to allocate shared req<->app link"); - - return NXT_ERROR; - } - - if (state->port != NULL) { - nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); + nxt_bool_t start_process; + nxt_port_t *port; - req_app_link->app_port = state->port; + start_process = 0; - if (state->start_process) { - nxt_router_start_app_process(task, app); - } + nxt_thread_mutex_lock(&app->mutex); - return NXT_OK; - } + port = app->shared_port; + nxt_port_inc_use(port); - if (!state->start_process) { - nxt_debug(task, "app '%V' %p too many running or pending processes", - &app->name, app); + app->active_requests++; - return NXT_AGAIN; + if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { + app->pending_processes++; + start_process = 1; } - res = nxt_router_start_app_process(task, app); + nxt_thread_mutex_unlock(&app->mutex); - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, app, req_app_link, - "Failed to start app process"); + req_rpc_data->app_port = port; + req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; - return NXT_ERROR; + if (start_process) { + nxt_router_start_app_process(task, app); } - - return NXT_AGAIN; -} - - -static nxt_int_t -nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, - nxt_request_app_link_t *req_app_link) -{ - nxt_port_select_state_t state; - - state.req_app_link = req_app_link; - state.app = app; - - nxt_thread_mutex_lock(&app->mutex); - - nxt_router_port_select(task, &state); - - nxt_thread_mutex_unlock(&app->mutex); - - return nxt_router_port_post_select(task, &state); } @@ -4905,10 +4399,7 @@ void nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, nxt_app_t *app) { - nxt_int_t res; - nxt_port_t *port; nxt_event_engine_t *engine; - nxt_request_app_link_t ra_local, *req_app_link; nxt_request_rpc_data_t *req_rpc_data; engine = task->thread->engine; @@ -4927,7 +4418,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, * in port handlers. Need to fixup request memory pool. Counterpart * release will be called via following call chain: * nxt_request_rpc_data_unlink() -> - * nxt_router_http_request_done() -> + * nxt_router_http_request_release_post() -> * nxt_router_http_request_release() */ nxt_mp_retain(r->mem_pool); @@ -4939,29 +4430,37 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); req_rpc_data->app = app; + req_rpc_data->msg_info.body_fd = -1; + req_rpc_data->rpc_cancel = 1; nxt_router_app_use(task, app, 1); req_rpc_data->request = r; r->req_rpc_data = req_rpc_data; - req_app_link = &ra_local; - nxt_request_app_link_init(task, req_app_link, req_rpc_data); + if (r->last != NULL) { + r->last->completion_handler = nxt_router_http_request_done; + } - res = nxt_router_app_port(task, app, req_app_link); - req_app_link = req_rpc_data->req_app_link; + nxt_router_app_port_get(task, app, req_rpc_data); + nxt_router_app_prepare_request(task, req_rpc_data); +} - if (res == NXT_OK) { - port = req_app_link->app_port; - nxt_assert(port != NULL); +static void +nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + + r = data; - nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); + nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); - nxt_router_app_prepare_request(task, req_app_link); + if (r->req_rpc_data) { + nxt_request_rpc_data_unlink(task, r->req_rpc_data); } - nxt_request_app_link_use(task, req_app_link, -1); + nxt_http_request_close_handler(task, r, r->proto.any); } @@ -4973,76 +4472,80 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) static void nxt_router_app_prepare_request(nxt_task_t *task, - nxt_request_app_link_t *req_app_link) + nxt_request_rpc_data_t *req_rpc_data) { - nxt_buf_t *buf; + nxt_app_t *app; + nxt_buf_t *buf, *body; nxt_int_t res; nxt_port_t *port, *reply_port; - nxt_apr_action_t apr_action; - nxt_assert(req_app_link->app_port != NULL); + app = req_rpc_data->app; - port = req_app_link->app_port; - reply_port = req_app_link->reply_port; + nxt_assert(app != NULL); - apr_action = NXT_APR_REQUEST_FAILED; + port = req_rpc_data->app_port; - buf = nxt_router_prepare_msg(task, req_app_link->request, port, - nxt_app_msg_prefix[port->app->type]); + nxt_assert(port != NULL); + + reply_port = task->thread->engine->port; + buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, + nxt_app_msg_prefix[app->type]); if (nxt_slow_path(buf == NULL)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to prepare message for application"); - goto release_port; + nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", + req_rpc_data->stream, &app->name); + + nxt_http_request_error(task, req_rpc_data->request, + NXT_HTTP_INTERNAL_SERVER_ERROR); + + return; } nxt_debug(task, "about to send %O bytes buffer to app process port %d", nxt_buf_used_size(buf), port->socket.fd); - apr_action = NXT_APR_NEW_PORT; - - req_app_link->msg_info.buf = buf; - req_app_link->msg_info.completion_handler = buf->completion_handler; + req_rpc_data->msg_info.buf = buf; + req_rpc_data->msg_info.completion_handler = buf->completion_handler; - for (; buf; buf = buf->next) { + do { buf->completion_handler = nxt_router_dummy_buf_completion; - } + buf = buf->next; + } while (buf != NULL); - buf = req_app_link->msg_info.buf; + buf = req_rpc_data->msg_info.buf; - res = nxt_port_mmap_get_tracking(task, &port->process->outgoing, - &req_app_link->msg_info.tracking, - req_app_link->stream); - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to get tracking area"); - goto release_port; - } + body = req_rpc_data->request->body; - if (req_app_link->body_fd != -1) { - nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream, - req_app_link->body_fd); + if (body != NULL && nxt_buf_is_file(body)) { + req_rpc_data->msg_info.body_fd = body->file->fd; - lseek(req_app_link->body_fd, 0, SEEK_SET); + body->file->fd = -1; + + } else { + req_rpc_data->msg_info.body_fd = -1; } - res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS, - req_app_link->body_fd, - req_app_link->stream, reply_port->id, buf, - &req_app_link->msg_info.tracking); + if (req_rpc_data->msg_info.body_fd != -1) { + nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, + req_rpc_data->msg_info.body_fd); - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to send message to application"); - goto release_port; + lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); } -release_port: + res = nxt_port_socket_twrite(task, port, + NXT_PORT_MSG_REQ_HEADERS, + req_rpc_data->msg_info.body_fd, + req_rpc_data->stream, reply_port->id, buf, + NULL); - nxt_router_app_port_release(task, port, apr_action); + if (nxt_slow_path(res != NXT_OK)) { + nxt_alert(task, "stream #%uD, app '%V': failed to send app message", + req_rpc_data->stream, &app->name); - nxt_request_app_link_update_peer(task, req_app_link); + nxt_http_request_error(task, req_rpc_data->request, + NXT_HTTP_INTERNAL_SERVER_ERROR); + } } @@ -5100,7 +4603,7 @@ nxt_fields_next(nxt_fields_iter_t *i) static nxt_buf_t * nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, - nxt_port_t *port, const nxt_str_t *prefix) + nxt_app_t *app, const nxt_str_t *prefix) { void *target_pos, *query_pos; u_char *pos, *end, *p, c; @@ -5141,7 +4644,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, return NULL; } - out = nxt_port_mmap_get_buf(task, &port->process->outgoing, + out = nxt_port_mmap_get_buf(task, &app->outgoing, nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); if (nxt_slow_path(out == NULL)) { return NULL; @@ -5323,8 +4826,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, if (buf == NULL) { free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); - buf = nxt_port_mmap_get_buf(task, &port->process->outgoing, - free_size); + buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); if (nxt_slow_path(buf == NULL)) { while (out != NULL) { buf = out->next; @@ -5372,15 +4874,9 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_app_t *app; - nxt_bool_t cancelled, unlinked; - nxt_port_t *port; nxt_timer_t *timer; - nxt_queue_link_t *lnk; nxt_http_request_t *r; - nxt_request_app_link_t *pending_ra; nxt_request_rpc_data_t *req_rpc_data; - nxt_port_select_state_t state; timer = obj; @@ -5388,94 +4884,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) r = nxt_timer_data(timer, nxt_http_request_t, timer); req_rpc_data = r->timer_data; - app = req_rpc_data->app; - - if (app == NULL) { - goto generate_error; - } - - port = NULL; - pending_ra = NULL; - - if (req_rpc_data->app_port != NULL) { - port = req_rpc_data->app_port; - req_rpc_data->app_port = NULL; - } - - if (port == NULL && req_rpc_data->req_app_link != NULL - && req_rpc_data->req_app_link->app_port != NULL) - { - port = req_rpc_data->req_app_link->app_port; - req_rpc_data->req_app_link->app_port = NULL; - } - - if (port == NULL) { - goto generate_error; - } - - nxt_thread_mutex_lock(&app->mutex); - - unlinked = nxt_queue_chk_remove(&port->app_link); - - if (!nxt_queue_is_empty(&port->pending_requests)) { - lnk = nxt_queue_first(&port->pending_requests); - - pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, - link_port_pending); - - nxt_assert(pending_ra->link_app_pending.next != NULL); - - nxt_debug(task, "app '%V' pending request #%uD found", - &app->name, pending_ra->stream); - - cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info, - pending_ra->stream); - - if (cancelled) { - state.req_app_link = pending_ra; - state.app = app; - - /* - * Need to increment use count "in advance" because - * nxt_router_port_select() will remove pending_ra from lists - * and decrement use count. - */ - nxt_request_app_link_inc_use(pending_ra); - - nxt_router_port_select(task, &state); - - } else { - pending_ra = NULL; - } - } - - nxt_thread_mutex_unlock(&app->mutex); - - if (pending_ra != NULL) { - if (nxt_router_port_post_select(task, &state) == NXT_OK) { - /* - * Reference counter already incremented above, this will - * keep pending_ra while nxt_router_app_process_request() - * task is in queue. Reference counter decreased in - * nxt_router_app_process_request() after processing. - */ - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_router_app_process_request, - &task->thread->engine->task, app, pending_ra); - - } else { - nxt_request_app_link_use(task, pending_ra, -1); - } - } - - nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid); - - nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); - - nxt_port_use(task, port, unlinked ? -2 : -1); - -generate_error: nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); @@ -5483,13 +4891,11 @@ generate_error: } -static nxt_int_t -nxt_router_http_request_done(nxt_task_t *task, nxt_http_request_t *r) +static void +nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) { r->timer.handler = nxt_router_http_request_release; nxt_timer_add(task->thread->engine, &r->timer, 0); - - return NXT_OK; } @@ -5498,7 +4904,7 @@ nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data) { nxt_http_request_t *r; - nxt_debug(task, "http app release"); + nxt_debug(task, "http request pool release"); r = nxt_timer_data(obj, nxt_http_request_t, timer); @@ -5593,7 +4999,18 @@ nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_assert(port->type == NXT_PROCESS_APP); - mmaps = &port->process->outgoing; + if (nxt_slow_path(port->app == NULL)) { + nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", + port->pid, port->id); + + // FIXME + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); + + return; + } + + mmaps = &port->app->outgoing; nxt_thread_mutex_lock(&mmaps->mutex); if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { @@ -5602,6 +5019,9 @@ nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", (int) get_mmap_msg->id); + // FIXME + nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, + -1, msg->port_msg.stream, 0, NULL); return; } |