summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_conf_validation.c5
-rw-r--r--src/nxt_port_rpc.c4
-rw-r--r--src/nxt_router.c297
-rw-r--r--src/nxt_router.h4
4 files changed, 222 insertions, 88 deletions
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c
index cc2ce5ef..05d0a021 100644
--- a/src/nxt_conf_validation.c
+++ b/src/nxt_conf_validation.c
@@ -78,6 +78,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_limits_members[] = {
NULL,
NULL },
+ { nxt_string("reschedule_timeout"),
+ NXT_CONF_INTEGER,
+ NULL,
+ NULL },
+
{ nxt_string("requests"),
NXT_CONF_INTEGER,
NULL,
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c
index 15c550f4..b041256a 100644
--- a/src/nxt_port_rpc.c
+++ b/src/nxt_port_rpc.c
@@ -314,10 +314,6 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
reg = lhq.value;
- if (reg->peer != -1) {
- nxt_assert(reg->peer == msg->port_msg.pid);
- }
-
if (type == _NXT_PORT_MSG_RPC_ERROR) {
reg->error_handler(task, msg, reg->data);
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 1f0f0b0f..e586f63a 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -13,6 +13,7 @@ typedef struct {
nxt_str_t type;
uint32_t workers;
nxt_msec_t timeout;
+ nxt_msec_t res_timeout;
uint32_t requests;
nxt_conf_value_t *limits_value;
} nxt_router_app_conf_t;
@@ -55,8 +56,11 @@ struct nxt_req_app_link_s {
nxt_msg_info_t msg_info;
nxt_req_conn_link_t *rc;
+ nxt_nsec_t res_time;
+
nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */
+ nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
nxt_mp_t *mem_pool;
nxt_work_t work;
@@ -72,6 +76,26 @@ typedef struct {
} nxt_socket_rpc_t;
+struct nxt_port_select_state_s {
+ nxt_app_t *app;
+ nxt_req_app_link_t *ra;
+
+ nxt_port_t *failed_port;
+ int failed_port_use_delta;
+
+ nxt_bool_t can_start_worker;
+ nxt_req_app_link_t *shared_ra;
+ nxt_port_t *port;
+};
+
+typedef struct nxt_port_select_state_s nxt_port_select_state_t;
+
+static void nxt_router_port_select(nxt_task_t *task,
+ nxt_port_select_state_t *state);
+
+static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
+ nxt_port_select_state_t *state);
+
static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
nxt_inline void
@@ -172,7 +196,8 @@ static void nxt_router_app_port_error(nxt_task_t *task,
static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response);
-static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra);
+static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
+ nxt_req_app_link_t *ra);
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
@@ -575,6 +600,21 @@ nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
}
+nxt_inline void
+nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
+{
+ nxt_queue_insert_tail(&ra->app_port->pending_requests,
+ &ra->link_port_pending);
+ nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
+
+ nxt_router_ra_inc_use(ra);
+
+ ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
+
+ nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
+}
+
+
nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t *lnk)
{
@@ -615,13 +655,15 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
nxt_thread_mutex_lock(&rc->app->mutex);
if (ra->link_app_requests.next == NULL
- && ra->link_port_pending.next == NULL)
+ && ra->link_port_pending.next == NULL
+ && ra->link_app_pending.next == NULL)
{
ra = NULL;
} else {
ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
+ nxt_queue_chk_remove(&ra->link_app_pending);
}
nxt_thread_mutex_unlock(&rc->app->mutex);
@@ -974,6 +1016,12 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = {
},
{
+ nxt_string("reschedule_timeout"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_router_app_conf_t, res_timeout),
+ },
+
+ {
nxt_string("requests"),
NXT_CONF_MAP_INT32,
offsetof(nxt_router_app_conf_t, requests),
@@ -1127,6 +1175,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
apcf.workers = 1;
apcf.timeout = 0;
+ apcf.res_timeout = 1000;
apcf.requests = 0;
apcf.limits_value = NULL;
@@ -1156,7 +1205,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_debug(task, "application type: %V", &apcf.type);
nxt_debug(task, "application workers: %D", apcf.workers);
- nxt_debug(task, "application timeout: %D", apcf.timeout);
+ nxt_debug(task, "application request timeout: %D", apcf.timeout);
+ nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout);
nxt_debug(task, "application requests: %D", apcf.requests);
lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
@@ -1176,6 +1226,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_queue_init(&app->ports);
nxt_queue_init(&app->requests);
+ nxt_queue_init(&app->pending);
app->name.length = name.length;
nxt_memcpy(app->name.start, name.start, name.length);
@@ -1183,6 +1234,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->type = lang->type;
app->max_workers = apcf.workers;
app->timeout = apcf.timeout;
+ app->res_timeout = apcf.res_timeout * 1000000;
app->live = 1;
app->max_pending_responses = 2;
app->prepare_msg = nxt_app_prepare_msg[lang->type];
@@ -2481,7 +2533,7 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
if (cancelled) {
nxt_router_ra_inc_use(ra);
- res = nxt_router_app_port(task, ra);
+ res = nxt_router_app_port(task, rc->app, ra);
if (res == NXT_OK) {
port = ra->app_port;
@@ -2702,7 +2754,7 @@ nxt_router_app_first_port_busy(nxt_app_t *app)
nxt_inline nxt_port_t *
-nxt_router_app_get_port_unsafe(nxt_app_t *app)
+nxt_router_pop_first_port(nxt_app_t *app)
{
nxt_port_t *port;
nxt_queue_link_t *lnk;
@@ -2783,14 +2835,17 @@ static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response)
{
- nxt_app_t *app;
- nxt_bool_t send_quit;
- nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra, *next_ra;
+ nxt_app_t *app;
+ nxt_bool_t send_quit, cancelled;
+ nxt_queue_link_t *lnk;
+ nxt_req_app_link_t *ra, *pending_ra, *re_ra;
+ nxt_port_select_state_t state;
nxt_assert(port != NULL);
nxt_assert(port->app != NULL);
+ ra = NULL;
+
app = port->app;
nxt_thread_mutex_lock(&app->mutex);
@@ -2798,8 +2853,11 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
port->app_pending_responses -= request_failed + got_response;
port->app_responses += got_response;
- if (app->live != 0
- && port->pair[1] != -1
+ if (nxt_slow_path(app->live == 0)) {
+ goto app_dead;
+ }
+
+ if (port->pair[1] != -1
&& (app->max_pending_responses == 0
|| port->app_pending_responses < app->max_pending_responses))
{
@@ -2823,8 +2881,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
}
}
- if (app->live != 0
- && !nxt_queue_is_empty(&app->ports)
+ if (!nxt_queue_is_empty(&app->ports)
&& !nxt_queue_is_empty(&app->requests))
{
lnk = nxt_queue_first(&app->requests);
@@ -2833,19 +2890,16 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
- ra->app_port = nxt_router_app_get_port_unsafe(app);
+ ra->app_port = nxt_router_pop_first_port(app);
if (ra->app_port->app_pending_responses > 1) {
- nxt_queue_insert_tail(&ra->app_port->pending_requests,
- &ra->link_port_pending);
-
- nxt_router_ra_inc_use(ra);
+ nxt_router_ra_pending(task, app, ra);
}
-
- } else {
- ra = NULL;
}
+app_dead:
+
+ /* Pop first pending request for this port. */
if ((request_failed > 0 || got_response > 0)
&& !nxt_queue_is_empty(&port->pending_requests))
{
@@ -2853,19 +2907,63 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_queue_remove(lnk);
lnk->next = NULL;
- next_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
- link_port_pending);
+ pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
+ link_port_pending);
+
+ nxt_assert(pending_ra->link_app_pending.next != NULL);
+
+ nxt_queue_remove(&pending_ra->link_app_pending);
+ pending_ra->link_app_pending.next = NULL;
} else {
- next_ra = NULL;
+ pending_ra = NULL;
}
+ /* Try to cancel and re-schedule first stalled request for this app. */
+ if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
+ lnk = nxt_queue_first(&app->pending);
+
+ re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending);
+
+ if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
+
+ nxt_debug(task, "app '%V' stalled request #%uD detected",
+ &app->name, re_ra->stream);
+
+ cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
+ re_ra->stream);
+
+ if (cancelled) {
+ nxt_router_ra_inc_use(re_ra);
+
+ state.ra = re_ra;
+ state.app = app;
+
+ nxt_router_port_select(task, &state);
+
+ goto re_ra_cancelled;
+ }
+ }
+ }
+
+ re_ra = NULL;
+
+re_ra_cancelled:
+
send_quit = app->live == 0 && port->app_pending_responses > 0;
nxt_thread_mutex_unlock(&app->mutex);
- if (next_ra != NULL) {
- nxt_router_ra_use(task, next_ra, -1);
+ if (pending_ra != NULL) {
+ nxt_router_ra_use(task, pending_ra, -1);
+ }
+
+ if (re_ra != NULL) {
+ if (nxt_router_port_post_select(task, &state) == NXT_OK) {
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_router_app_process_request,
+ &task->thread->engine->task, app, re_ra);
+ }
}
if (ra != NULL) {
@@ -2950,22 +3048,16 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
}
-static nxt_int_t
-nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
+static void
+nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
- int failed_port_use_delta;
- nxt_int_t res;
- nxt_app_t *app;
- nxt_bool_t can_start_worker;
- nxt_conn_t *c;
- nxt_port_t *port, *failed_port;
-
- c = ra->rc->conn;
- app = ra->rc->app;
+ nxt_app_t *app;
+ nxt_req_app_link_t *ra;
- failed_port_use_delta = 0;
+ ra = state->ra;
+ app = state->app;
- nxt_thread_mutex_lock(&app->mutex);
+ state->failed_port_use_delta = 0;
if (nxt_queue_chk_remove(&ra->link_app_requests))
{
@@ -2974,93 +3066,113 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
if (nxt_queue_chk_remove(&ra->link_port_pending))
{
+ nxt_assert(ra->link_app_pending.next != NULL);
+
+ nxt_queue_remove(&ra->link_app_pending);
+ ra->link_app_pending.next = NULL;
+
nxt_router_ra_dec_use(ra);
}
- if (ra->app_port != NULL) {
- failed_port = ra->app_port;
- failed_port_use_delta--;
+ state->failed_port = ra->app_port;
- failed_port->app_pending_responses--;
+ if (ra->app_port != NULL) {
+ state->failed_port_use_delta--;
- if (failed_port->app_link.next != NULL) {
- nxt_queue_remove(&failed_port->app_link);
- failed_port->app_link.next = NULL;
+ state->failed_port->app_pending_responses--;
- failed_port_use_delta--;
+ if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
+ state->failed_port_use_delta--;
}
- } else {
- failed_port = NULL;
+ ra->app_port = NULL;
}
- can_start_worker = (app->workers + app->pending_workers)
- < app->max_workers;
+ state->can_start_worker = (app->workers + app->pending_workers)
+ < app->max_workers;
+ state->port = NULL;
if (nxt_queue_is_empty(&app->ports)
- || (can_start_worker && nxt_router_app_first_port_busy(app)) )
+ || (state->can_start_worker && nxt_router_app_first_port_busy(app)) )
{
ra = nxt_router_ra_create(task, ra);
- if (nxt_fast_path(ra != NULL)) {
+ if (nxt_slow_path(ra == NULL)) {
+ goto fail;
+ }
+
+ if (nxt_slow_path(state->failed_port != NULL)) {
+ nxt_queue_insert_head(&app->requests, &ra->link_app_requests);
+
+ } else {
nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
+ }
- nxt_router_ra_inc_use(ra);
+ nxt_router_ra_inc_use(ra);
- nxt_debug(task, "ra stream #%uD enqueue to app->requests",
- ra->stream);
+ nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
- if (can_start_worker) {
- app->pending_workers++;
- }
+ if (state->can_start_worker) {
+ app->pending_workers++;
}
- port = NULL;
-
} else {
- port = nxt_router_app_get_port_unsafe(app);
+ state->port = nxt_router_pop_first_port(app);
- if (port->app_pending_responses > 1) {
+ if (state->port->app_pending_responses > 1) {
ra = nxt_router_ra_create(task, ra);
- if (nxt_fast_path(ra != NULL)) {
- nxt_queue_insert_tail(&port->pending_requests,
- &ra->link_port_pending);
+ if (nxt_slow_path(ra == NULL)) {
+ goto fail;
+ }
- nxt_router_ra_inc_use(ra);
+ ra->app_port = state->port;
- nxt_debug(task, "ra stream #%uD enqueue to "
- "port->pending_requests", ra->stream);
- }
+ nxt_router_ra_pending(task, app, ra);
}
}
- nxt_thread_mutex_unlock(&app->mutex);
+fail:
+
+ state->shared_ra = ra;
+}
+
+
+static nxt_int_t
+nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
+{
+ nxt_int_t res;
+ nxt_app_t *app;
+ nxt_req_app_link_t *ra;
- if (failed_port_use_delta != 0) {
- nxt_port_use(task, failed_port, failed_port_use_delta);
+ ra = state->shared_ra;
+ app = state->app;
+
+ if (state->failed_port_use_delta != 0) {
+ nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
}
if (nxt_slow_path(ra == NULL)) {
- nxt_router_gen_error(task, c, 500, "Failed to allocate "
- "req<->app link");
-
- if (port != NULL) {
- nxt_port_use(task, port, -1);
+ if (state->port != NULL) {
+ nxt_port_use(task, state->port, -1);
}
+ nxt_router_ra_error(state->ra, 500,
+ "Failed to allocate shared req<->app link");
+ nxt_router_ra_use(task, state->ra, -1);
+
return NXT_ERROR;
}
- if (port != NULL) {
+ if (state->port != NULL) {
nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
- ra->app_port = port;
+ ra->app_port = state->port;
return NXT_OK;
}
- if (!can_start_worker) {
+ if (!state->can_start_worker) {
nxt_debug(task, "app '%V' %p too many running or pending workers",
&app->name, app);
@@ -3070,7 +3182,8 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
res = nxt_router_start_worker(task, app);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_gen_error(task, c, 500, "Failed to start worker");
+ nxt_router_ra_error(ra, 500, "Failed to start worker");
+ nxt_router_ra_use(task, ra, -1);
return NXT_ERROR;
}
@@ -3079,6 +3192,24 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
}
+static nxt_int_t
+nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
+{
+ nxt_port_select_state_t state;
+
+ state.ra = ra;
+ state.app = app;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ nxt_router_port_select(task, &state);
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ return nxt_router_port_post_select(task, &state);
+}
+
+
static void
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
{
@@ -3392,7 +3523,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
ra = &ra_local;
nxt_router_ra_init(task, ra, rc);
- res = nxt_router_app_port(task, ra);
+ res = nxt_router_app_port(task, app, ra);
if (res != NXT_OK) {
return;
diff --git a/src/nxt_router.h b/src/nxt_router.h
index 521c0397..89543a1b 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -81,7 +81,8 @@ struct nxt_app_s {
nxt_thread_mutex_t mutex; /* Protects ports queue. */
nxt_queue_t ports; /* of nxt_port_t.app_link */
- nxt_queue_t requests; /* of nxt_req_conn_link_t */
+ nxt_queue_t requests; /* of nxt_req_app_link_t */
+ nxt_queue_t pending; /* of nxt_req_app_link_t */
nxt_str_t name;
uint32_t pending_workers;
@@ -90,6 +91,7 @@ struct nxt_app_s {
uint32_t max_pending_responses;
nxt_msec_t timeout;
+ nxt_nsec_t res_timeout;
nxt_app_type_t type:8;
uint8_t live; /* 1 bit */