diff options
-rw-r--r-- | src/nxt_application.h | 1 | ||||
-rw-r--r-- | src/nxt_router.c | 97 |
2 files changed, 89 insertions, 9 deletions
diff --git a/src/nxt_application.h b/src/nxt_application.h index caf5b884..f4865d99 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -133,6 +133,7 @@ struct nxt_app_parse_ctx_s { nxt_app_request_t r; nxt_http_request_t *request; nxt_timer_t timer; + void *timer_data; nxt_http_request_parse_t parser; nxt_http_request_parse_t resp_parser; nxt_mp_t *mem_pool; diff --git a/src/nxt_router.c b/src/nxt_router.c index 994ba4af..5b638ed0 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -543,6 +543,7 @@ nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) if (rc->app->timeout != 0) { rc->ap->timer.handler = nxt_router_app_timeout; + rc->ap->timer_data = rc; nxt_timer_add(task->thread->engine, &rc->ap->timer, rc->app->timeout); } @@ -707,6 +708,8 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) } if (rc->ap != NULL) { + rc->ap->timer_data = NULL; + nxt_app_http_req_done(task, rc->ap); rc->ap = NULL; @@ -2664,7 +2667,6 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, } if (ar->request->error) { - nxt_app_http_req_done(task, ar); nxt_router_rc_unlink(task, rc); return; } @@ -2677,8 +2679,9 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_router_rc_unlink(task, rc); } else { - if (rc->app->timeout != 0) { + if (rc->app != NULL && rc->app->timeout != 0) { ar->timer.handler = nxt_router_app_timeout; + ar->timer_data = rc; nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout); } } @@ -2732,10 +2735,9 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, fail: - nxt_app_http_req_done(task, ar); - nxt_router_rc_unlink(task, rc); - nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); + + nxt_router_rc_unlink(task, rc); } @@ -4262,16 +4264,93 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_timer_t *timer; - nxt_app_parse_ctx_t *ar; + nxt_app_t *app; + nxt_bool_t cancelled, unlinked; + nxt_port_t *port; + nxt_timer_t *timer; + nxt_queue_link_t *lnk; + nxt_req_app_link_t *pending_ra; + nxt_app_parse_ctx_t *ar; + nxt_req_conn_link_t *rc; + nxt_port_select_state_t state; timer = obj; nxt_debug(task, "router app timeout"); ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer); + rc = ar->timer_data; + app = rc->app; - if (!ar->request->header_sent) { - nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE); + if (app == NULL) { + goto generate_error; } + + port = NULL; + pending_ra = NULL; + + if (rc->app_port != NULL) { + port = rc->app_port; + rc->app_port = NULL; + } + + if (port == NULL && rc->ra != NULL && rc->ra->app_port != NULL) { + port = rc->ra->app_port; + rc->ra->app_port = NULL; + } + + if (port == NULL) { + goto generate_error; + } + + nxt_thread_mutex_lock(&app->mutex); + + unlinked = nxt_queue_chk_remove(&port->app_link); + + if (!nxt_queue_is_empty(&port->pending_requests)) { + lnk = nxt_queue_first(&port->pending_requests); + + pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, + link_port_pending); + + nxt_assert(pending_ra->link_app_pending.next != NULL); + + nxt_debug(task, "app '%V' pending request #%uD found", + &app->name, pending_ra->stream); + + cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info, + pending_ra->stream); + + if (cancelled) { + nxt_router_ra_inc_use(pending_ra); + + state.ra = pending_ra; + state.app = app; + + nxt_router_port_select(task, &state); + + } else { + pending_ra = NULL; + } + } + + nxt_thread_mutex_unlock(&app->mutex); + + if (pending_ra != NULL + && nxt_router_port_post_select(task, &state) == NXT_OK) + { + nxt_router_app_prepare_request(task, pending_ra); + } + + nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + + nxt_port_use(task, port, unlinked ? -2 : -1); + +generate_error: + + nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE); + + nxt_router_rc_unlink(task, rc); } |