summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_application.h1
-rw-r--r--src/nxt_router.c97
2 files changed, 89 insertions, 9 deletions
diff --git a/src/nxt_application.h b/src/nxt_application.h
index caf5b884..f4865d99 100644
--- a/src/nxt_application.h
+++ b/src/nxt_application.h
@@ -133,6 +133,7 @@ struct nxt_app_parse_ctx_s {
nxt_app_request_t r;
nxt_http_request_t *request;
nxt_timer_t timer;
+ void *timer_data;
nxt_http_request_parse_t parser;
nxt_http_request_parse_t resp_parser;
nxt_mp_t *mem_pool;
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 994ba4af..5b638ed0 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -543,6 +543,7 @@ nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
if (rc->app->timeout != 0) {
rc->ap->timer.handler = nxt_router_app_timeout;
+ rc->ap->timer_data = rc;
nxt_timer_add(task->thread->engine, &rc->ap->timer,
rc->app->timeout);
}
@@ -707,6 +708,8 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
}
if (rc->ap != NULL) {
+ rc->ap->timer_data = NULL;
+
nxt_app_http_req_done(task, rc->ap);
rc->ap = NULL;
@@ -2664,7 +2667,6 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
if (ar->request->error) {
- nxt_app_http_req_done(task, ar);
nxt_router_rc_unlink(task, rc);
return;
}
@@ -2677,8 +2679,9 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_router_rc_unlink(task, rc);
} else {
- if (rc->app->timeout != 0) {
+ if (rc->app != NULL && rc->app->timeout != 0) {
ar->timer.handler = nxt_router_app_timeout;
+ ar->timer_data = rc;
nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout);
}
}
@@ -2732,10 +2735,9 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
fail:
- nxt_app_http_req_done(task, ar);
- nxt_router_rc_unlink(task, rc);
-
nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
+
+ nxt_router_rc_unlink(task, rc);
}
@@ -4262,16 +4264,93 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
{
- nxt_timer_t *timer;
- nxt_app_parse_ctx_t *ar;
+ nxt_app_t *app;
+ nxt_bool_t cancelled, unlinked;
+ nxt_port_t *port;
+ nxt_timer_t *timer;
+ nxt_queue_link_t *lnk;
+ nxt_req_app_link_t *pending_ra;
+ nxt_app_parse_ctx_t *ar;
+ nxt_req_conn_link_t *rc;
+ nxt_port_select_state_t state;
timer = obj;
nxt_debug(task, "router app timeout");
ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
+ rc = ar->timer_data;
+ app = rc->app;
- if (!ar->request->header_sent) {
- nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE);
+ if (app == NULL) {
+ goto generate_error;
}
+
+ port = NULL;
+ pending_ra = NULL;
+
+ if (rc->app_port != NULL) {
+ port = rc->app_port;
+ rc->app_port = NULL;
+ }
+
+ if (port == NULL && rc->ra != NULL && rc->ra->app_port != NULL) {
+ port = rc->ra->app_port;
+ rc->ra->app_port = NULL;
+ }
+
+ if (port == NULL) {
+ goto generate_error;
+ }
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ unlinked = nxt_queue_chk_remove(&port->app_link);
+
+ if (!nxt_queue_is_empty(&port->pending_requests)) {
+ lnk = nxt_queue_first(&port->pending_requests);
+
+ pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
+ link_port_pending);
+
+ nxt_assert(pending_ra->link_app_pending.next != NULL);
+
+ nxt_debug(task, "app '%V' pending request #%uD found",
+ &app->name, pending_ra->stream);
+
+ cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info,
+ pending_ra->stream);
+
+ if (cancelled) {
+ nxt_router_ra_inc_use(pending_ra);
+
+ state.ra = pending_ra;
+ state.app = app;
+
+ nxt_router_port_select(task, &state);
+
+ } else {
+ pending_ra = NULL;
+ }
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ if (pending_ra != NULL
+ && nxt_router_port_post_select(task, &state) == NXT_OK)
+ {
+ nxt_router_app_prepare_request(task, pending_ra);
+ }
+
+ nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+
+ nxt_port_use(task, port, unlinked ? -2 : -1);
+
+generate_error:
+
+ nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE);
+
+ nxt_router_rc_unlink(task, rc);
}