summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-12-27 17:48:04 +0300
committerMax Romanov <max.romanov@nginx.com>2017-12-27 17:48:04 +0300
commit5196cf4d5003dade99321f71bcc1af80f0e5ad50 (patch)
tree7e087e5135793b55922082f18a84b387832cac63
parentbaa8c9387b00dc3cd72cfca14c61c25a2badada9 (diff)
downloadunit-5196cf4d5003dade99321f71bcc1af80f0e5ad50.tar.gz
unit-5196cf4d5003dade99321f71bcc1af80f0e5ad50.tar.bz2
Rescheduling of pending request after configured timeout.
New optional configuration parameter introduced: limits.reschedule_timeout. Default value 1 second. In the case when request is written to the port socket 'in advance', it is called 'pending'. On every completed request, the head of pending request is checked against reschedule timeout. If this request waiting for execution longer than timeout, it is cancelled, new port selected for this request.
Diffstat (limited to '')
-rw-r--r--src/nxt_conf_validation.c5
-rw-r--r--src/nxt_port_rpc.c4
-rw-r--r--src/nxt_router.c297
-rw-r--r--src/nxt_router.h4
4 files changed, 222 insertions, 88 deletions
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c
index cc2ce5ef..05d0a021 100644
--- a/src/nxt_conf_validation.c
+++ b/src/nxt_conf_validation.c
@@ -78,6 +78,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_limits_members[] = {
NULL,
NULL },
+ { nxt_string("reschedule_timeout"),
+ NXT_CONF_INTEGER,
+ NULL,
+ NULL },
+
{ nxt_string("requests"),
NXT_CONF_INTEGER,
NULL,
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c
index 15c550f4..b041256a 100644
--- a/src/nxt_port_rpc.c
+++ b/src/nxt_port_rpc.c
@@ -314,10 +314,6 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
reg = lhq.value;
- if (reg->peer != -1) {
- nxt_assert(reg->peer == msg->port_msg.pid);
- }
-
if (type == _NXT_PORT_MSG_RPC_ERROR) {
reg->error_handler(task, msg, reg->data);
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 1f0f0b0f..e586f63a 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -13,6 +13,7 @@ typedef struct {
nxt_str_t type;
uint32_t workers;
nxt_msec_t timeout;
+ nxt_msec_t res_timeout;
uint32_t requests;
nxt_conf_value_t *limits_value;
} nxt_router_app_conf_t;
@@ -55,8 +56,11 @@ struct nxt_req_app_link_s {
nxt_msg_info_t msg_info;
nxt_req_conn_link_t *rc;
+ nxt_nsec_t res_time;
+
nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */
+ nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
nxt_mp_t *mem_pool;
nxt_work_t work;
@@ -72,6 +76,26 @@ typedef struct {
} nxt_socket_rpc_t;
+struct nxt_port_select_state_s {
+ nxt_app_t *app;
+ nxt_req_app_link_t *ra;
+
+ nxt_port_t *failed_port;
+ int failed_port_use_delta;
+
+ nxt_bool_t can_start_worker;
+ nxt_req_app_link_t *shared_ra;
+ nxt_port_t *port;
+};
+
+typedef struct nxt_port_select_state_s nxt_port_select_state_t;
+
+static void nxt_router_port_select(nxt_task_t *task,
+ nxt_port_select_state_t *state);
+
+static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
+ nxt_port_select_state_t *state);
+
static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
nxt_inline void
@@ -172,7 +196,8 @@ static void nxt_router_app_port_error(nxt_task_t *task,
static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response);
-static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra);
+static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
+ nxt_req_app_link_t *ra);
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
@@ -575,6 +600,21 @@ nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
}
+nxt_inline void
+nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
+{
+ nxt_queue_insert_tail(&ra->app_port->pending_requests,
+ &ra->link_port_pending);
+ nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
+
+ nxt_router_ra_inc_use(ra);
+
+ ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
+
+ nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
+}
+
+
nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t *lnk)
{
@@ -615,13 +655,15 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
nxt_thread_mutex_lock(&rc->app->mutex);
if (ra->link_app_requests.next == NULL
- && ra->link_port_pending.next == NULL)
+ && ra->link_port_pending.next == NULL
+ && ra->link_app_pending.next == NULL)
{
ra = NULL;
} else {
ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
+ nxt_queue_chk_remove(&ra->link_app_pending);
}
nxt_thread_mutex_unlock(&rc->app->mutex);
@@ -974,6 +1016,12 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = {
},
{
+ nxt_string("reschedule_timeout"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_router_app_conf_t, res_timeout),
+ },
+
+ {
nxt_string("requests"),
NXT_CONF_MAP_INT32,
offsetof(nxt_router_app_conf_t, requests),
@@ -1127,6 +1175,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
apcf.workers = 1;
apcf.timeout = 0;
+ apcf.res_timeout = 1000;
apcf.requests = 0;
apcf.limits_value = NULL;
@@ -1156,7 +1205,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_debug(task, "application type: %V", &apcf.type);
nxt_debug(task, "application workers: %D", apcf.workers);
- nxt_debug(task, "application timeout: %D", apcf.timeout);
+ nxt_debug(task, "application request timeout: %D", apcf.timeout);
+ nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout);
nxt_debug(task, "application requests: %D", apcf.requests);
lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
@@ -1176,6 +1226,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_queue_init(&app->ports);
nxt_queue_init(&app->requests);
+ nxt_queue_init(&app->pending);
app->name.length = name.length;
nxt_memcpy(app->name.start, name.start, name.length);
@@ -1183,6 +1234,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->type = lang->type;
app->max_workers = apcf.workers;
app->timeout = apcf.timeout;
+ app->res_timeout = apcf.res_timeout * 1000000;
app->live = 1;
app->max_pending_responses = 2;
app->prepare_msg = nxt_app_prepare_msg[lang->type];
@@ -2481,7 +2533,7 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
if (cancelled) {
nxt_router_ra_inc_use(ra);
- res = nxt_router_app_port(task, ra);
+ res = nxt_router_app_port(task, rc->app, ra);
if (res == NXT_OK) {
port = ra->app_port;
@@ -2702,7 +2754,7 @@ nxt_router_app_first_port_busy(nxt_app_t *app)
nxt_inline nxt_port_t *
-nxt_router_app_get_port_unsafe(nxt_app_t *app)
+nxt_router_pop_first_port(nxt_app_t *app)
{
nxt_port_t *port;
nxt_queue_link_t *lnk;
@@ -2783,14 +2835,17 @@ static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response)
{
- nxt_app_t *app;
- nxt_bool_t send_quit;
- nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra, *next_ra;
+ nxt_app_t *app;
+ nxt_bool_t send_quit, cancelled;
+ nxt_queue_link_t *lnk;
+ nxt_req_app_link_t *ra, *pending_ra, *re_ra;
+ nxt_port_select_state_t state;
nxt_assert(port != NULL);
nxt_assert(port->app != NULL);
+ ra = NULL;
+
app = port->app;
nxt_thread_mutex_lock(&app->mutex);
@@ -2798,8 +2853,11 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
port->app_pending_responses -= request_failed + got_response;
port->app_responses += got_response;
- if (app->live != 0
- && port->pair[1] != -1
+ if (nxt_slow_path(app->live == 0)) {
+ goto app_dead;
+ }
+
+ if (port->pair[1] != -1
&& (app->max_pending_responses == 0
|| port->app_pending_responses < app->max_pending_responses))
{
@@ -2823,8 +2881,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
}
}
- if (app->live != 0
- && !nxt_queue_is_empty(&app->ports)
+ if (!nxt_queue_is_empty(&app->ports)
&& !nxt_queue_is_empty(&app->requests))
{
lnk = nxt_queue_first(&app->requests);
@@ -2833,19 +2890,16 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
- ra->app_port = nxt_router_app_get_port_unsafe(app);
+ ra->app_port = nxt_router_pop_first_port(app);
if (ra->app_port->app_pending_responses > 1) {
- nxt_queue_insert_tail(&ra->app_port->pending_requests,
- &ra->link_port_pending);
-
- nxt_router_ra_inc_use(ra);
+ nxt_router_ra_pending(task, app, ra);
}
-
- } else {
- ra = NULL;
}
+app_dead:
+
+ /* Pop first pending request for this port. */
if ((request_failed > 0 || got_response > 0)
&& !nxt_queue_is_empty(&port->pending_requests))
{
@@ -2853,19 +2907,63 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_queue_remove(lnk);
lnk->next = NULL;
- next_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
- link_port_pending);
+ pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
+ link_port_pending);
+
+ nxt_assert(pending_ra->link_app_pending.next != NULL);
+
+ nxt_queue_remove(&pending_ra->link_app_pending);
+ pending_ra->link_app_pending.next = NULL;
} else {
- next_ra = NULL;
+ pending_ra = NULL;
}
+ /* Try to cancel and re-schedule first stalled request for this app. */
+ if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
+ lnk = nxt_queue_first(&app->pending);
+
+ re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending);
+
+ if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
+
+ nxt_debug(task, "app '%V' stalled request #%uD detected",
+ &app->name, re_ra->stream);
+
+ cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
+ re_ra->stream);
+
+ if (cancelled) {
+ nxt_router_ra_inc_use(re_ra);
+
+ state.ra = re_ra;
+ state.app = app;
+
+ nxt_router_port_select(task, &state);
+
+ goto re_ra_cancelled;
+ }
+ }
+ }
+
+ re_ra = NULL;
+
+re_ra_cancelled:
+
send_quit = app->live == 0 && port->app_pending_responses > 0;
nxt_thread_mutex_unlock(&app->mutex);
- if (next_ra != NULL) {
- nxt_router_ra_use(task, next_ra, -1);
+ if (pending_ra != NULL) {
+ nxt_router_ra_use(task, pending_ra, -1);
+ }
+
+ if (re_ra != NULL) {
+ if (nxt_router_port_post_select(task, &state) == NXT_OK) {
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_router_app_process_request,
+ &task->thread->engine->task, app, re_ra);
+ }
}
if (ra != NULL) {
@@ -2950,22 +3048,16 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
}
-static nxt_int_t
-nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
+static void
+nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
- int failed_port_use_delta;
- nxt_int_t res;
- nxt_app_t *app;
- nxt_bool_t can_start_worker;
- nxt_conn_t *c;
- nxt_port_t *port, *failed_port;
-
- c = ra->rc->conn;
- app = ra->rc->app;
+ nxt_app_t *app;
+ nxt_req_app_link_t *ra;
- failed_port_use_delta = 0;
+ ra = state->ra;
+ app = state->app;
- nxt_thread_mutex_lock(&app->mutex);
+ state->failed_port_use_delta = 0;
if (nxt_queue_chk_remove(&ra->link_app_requests))
{
@@ -2974,93 +3066,113 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
if (nxt_queue_chk_remove(&ra->link_port_pending))
{
+ nxt_assert(ra->link_app_pending.next != NULL);
+
+ nxt_queue_remove(&ra->link_app_pending);
+ ra->link_app_pending.next = NULL;
+
nxt_router_ra_dec_use(ra);
}
- if (ra->app_port != NULL) {
- failed_port = ra->app_port;
- failed_port_use_delta--;
+ state->failed_port = ra->app_port;
- failed_port->app_pending_responses--;
+ if (ra->app_port != NULL) {
+ state->failed_port_use_delta--;
- if (failed_port->app_link.next != NULL) {
- nxt_queue_remove(&failed_port->app_link);
- failed_port->app_link.next = NULL;
+ state->failed_port->app_pending_responses--;
- failed_port_use_delta--;
+ if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
+ state->failed_port_use_delta--;
}
- } else {
- failed_port = NULL;
+ ra->app_port = NULL;
}
- can_start_worker = (app->workers + app->pending_workers)
- < app->max_workers;
+ state->can_start_worker = (app->workers + app->pending_workers)
+ < app->max_workers;
+ state->port = NULL;
if (nxt_queue_is_empty(&app->ports)
- || (can_start_worker && nxt_router_app_first_port_busy(app)) )
+ || (state->can_start_worker && nxt_router_app_first_port_busy(app)) )
{
ra = nxt_router_ra_create(task, ra);
- if (nxt_fast_path(ra != NULL)) {
+ if (nxt_slow_path(ra == NULL)) {
+ goto fail;
+ }
+
+ if (nxt_slow_path(state->failed_port != NULL)) {
+ nxt_queue_insert_head(&app->requests, &ra->link_app_requests);
+
+ } else {
nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
+ }
- nxt_router_ra_inc_use(ra);
+ nxt_router_ra_inc_use(ra);
- nxt_debug(task, "ra stream #%uD enqueue to app->requests",
- ra->stream);
+ nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
- if (can_start_worker) {
- app->pending_workers++;
- }
+ if (state->can_start_worker) {
+ app->pending_workers++;
}
- port = NULL;
-
} else {
- port = nxt_router_app_get_port_unsafe(app);
+ state->port = nxt_router_pop_first_port(app);
- if (port->app_pending_responses > 1) {
+ if (state->port->app_pending_responses > 1) {
ra = nxt_router_ra_create(task, ra);
- if (nxt_fast_path(ra != NULL)) {
- nxt_queue_insert_tail(&port->pending_requests,
- &ra->link_port_pending);
+ if (nxt_slow_path(ra == NULL)) {
+ goto fail;
+ }
- nxt_router_ra_inc_use(ra);
+ ra->app_port = state->port;
- nxt_debug(task, "ra stream #%uD enqueue to "
- "port->pending_requests", ra->stream);
- }
+ nxt_router_ra_pending(task, app, ra);
}
}
- nxt_thread_mutex_unlock(&app->mutex);
+fail:
+
+ state->shared_ra = ra;
+}
+
+
+static nxt_int_t
+nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
+{
+ nxt_int_t res;
+ nxt_app_t *app;
+ nxt_req_app_link_t *ra;
- if (failed_port_use_delta != 0) {
- nxt_port_use(task, failed_port, failed_port_use_delta);
+ ra = state->shared_ra;
+ app = state->app;
+
+ if (state->failed_port_use_delta != 0) {
+ nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
}
if (nxt_slow_path(ra == NULL)) {
- nxt_router_gen_error(task, c, 500, "Failed to allocate "
- "req<->app link");
-
- if (port != NULL) {
- nxt_port_use(task, port, -1);
+ if (state->port != NULL) {
+ nxt_port_use(task, state->port, -1);
}
+ nxt_router_ra_error(state->ra, 500,
+ "Failed to allocate shared req<->app link");
+ nxt_router_ra_use(task, state->ra, -1);
+
return NXT_ERROR;
}
- if (port != NULL) {
+ if (state->port != NULL) {
nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
- ra->app_port = port;
+ ra->app_port = state->port;
return NXT_OK;
}
- if (!can_start_worker) {
+ if (!state->can_start_worker) {
nxt_debug(task, "app '%V' %p too many running or pending workers",
&app->name, app);
@@ -3070,7 +3182,8 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
res = nxt_router_start_worker(task, app);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_gen_error(task, c, 500, "Failed to start worker");
+ nxt_router_ra_error(ra, 500, "Failed to start worker");
+ nxt_router_ra_use(task, ra, -1);
return NXT_ERROR;
}
@@ -3079,6 +3192,24 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
}
+static nxt_int_t
+nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
+{
+ nxt_port_select_state_t state;
+
+ state.ra = ra;
+ state.app = app;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ nxt_router_port_select(task, &state);
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ return nxt_router_port_post_select(task, &state);
+}
+
+
static void
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
{
@@ -3392,7 +3523,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
ra = &ra_local;
nxt_router_ra_init(task, ra, rc);
- res = nxt_router_app_port(task, ra);
+ res = nxt_router_app_port(task, app, ra);
if (res != NXT_OK) {
return;
diff --git a/src/nxt_router.h b/src/nxt_router.h
index 521c0397..89543a1b 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -81,7 +81,8 @@ struct nxt_app_s {
nxt_thread_mutex_t mutex; /* Protects ports queue. */
nxt_queue_t ports; /* of nxt_port_t.app_link */
- nxt_queue_t requests; /* of nxt_req_conn_link_t */
+ nxt_queue_t requests; /* of nxt_req_app_link_t */
+ nxt_queue_t pending; /* of nxt_req_app_link_t */
nxt_str_t name;
uint32_t pending_workers;
@@ -90,6 +91,7 @@ struct nxt_app_s {
uint32_t max_pending_responses;
nxt_msec_t timeout;
+ nxt_nsec_t res_timeout;
nxt_app_type_t type:8;
uint8_t live; /* 1 bit */