diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:48:04 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:48:04 +0300 |
commit | 5196cf4d5003dade99321f71bcc1af80f0e5ad50 (patch) | |
tree | 7e087e5135793b55922082f18a84b387832cac63 /src/nxt_router.c | |
parent | baa8c9387b00dc3cd72cfca14c61c25a2badada9 (diff) | |
download | unit-5196cf4d5003dade99321f71bcc1af80f0e5ad50.tar.gz unit-5196cf4d5003dade99321f71bcc1af80f0e5ad50.tar.bz2 |
Rescheduling of pending request after configured timeout.
New optional configuration parameter introduced: limits.reschedule_timeout.
Default value 1 second. In the case when request is written to the port
socket 'in advance', it is called 'pending'.
On every completed request, the head of pending request is checked against
reschedule timeout. If this request waiting for execution longer than
timeout, it is cancelled, new port selected for this request.
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 297 |
1 files changed, 214 insertions, 83 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 1f0f0b0f..e586f63a 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -13,6 +13,7 @@ typedef struct { nxt_str_t type; uint32_t workers; nxt_msec_t timeout; + nxt_msec_t res_timeout; uint32_t requests; nxt_conf_value_t *limits_value; } nxt_router_app_conf_t; @@ -55,8 +56,11 @@ struct nxt_req_app_link_s { nxt_msg_info_t msg_info; nxt_req_conn_link_t *rc; + nxt_nsec_t res_time; + nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */ + nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ nxt_mp_t *mem_pool; nxt_work_t work; @@ -72,6 +76,26 @@ typedef struct { } nxt_socket_rpc_t; +struct nxt_port_select_state_s { + nxt_app_t *app; + nxt_req_app_link_t *ra; + + nxt_port_t *failed_port; + int failed_port_use_delta; + + nxt_bool_t can_start_worker; + nxt_req_app_link_t *shared_ra; + nxt_port_t *port; +}; + +typedef struct nxt_port_select_state_s nxt_port_select_state_t; + +static void nxt_router_port_select(nxt_task_t *task, + nxt_port_select_state_t *state); + +static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, + nxt_port_select_state_t *state); + static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); nxt_inline void @@ -172,7 +196,8 @@ static void nxt_router_app_port_error(nxt_task_t *task, static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response); -static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra); +static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, + nxt_req_app_link_t *ra); static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, @@ -575,6 +600,21 @@ nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str) } +nxt_inline void +nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) +{ + nxt_queue_insert_tail(&ra->app_port->pending_requests, + &ra->link_port_pending); + nxt_queue_insert_tail(&app->pending, &ra->link_app_pending); + + nxt_router_ra_inc_use(ra); + + ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout; + + nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream); +} + + nxt_inline nxt_bool_t nxt_queue_chk_remove(nxt_queue_link_t *lnk) { @@ -615,13 +655,15 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) nxt_thread_mutex_lock(&rc->app->mutex); if (ra->link_app_requests.next == NULL - && ra->link_port_pending.next == NULL) + && ra->link_port_pending.next == NULL + && ra->link_app_pending.next == NULL) { ra = NULL; } else { ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests); ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending); + nxt_queue_chk_remove(&ra->link_app_pending); } nxt_thread_mutex_unlock(&rc->app->mutex); @@ -974,6 +1016,12 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = { }, { + nxt_string("reschedule_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_router_app_conf_t, res_timeout), + }, + + { nxt_string("requests"), NXT_CONF_MAP_INT32, offsetof(nxt_router_app_conf_t, requests), @@ -1127,6 +1175,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, apcf.workers = 1; apcf.timeout = 0; + apcf.res_timeout = 1000; apcf.requests = 0; apcf.limits_value = NULL; @@ -1156,7 +1205,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application workers: %D", apcf.workers); - nxt_debug(task, "application timeout: %D", apcf.timeout); + nxt_debug(task, "application request timeout: %D", apcf.timeout); + nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout); nxt_debug(task, "application requests: %D", apcf.requests); lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); @@ -1176,6 +1226,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_queue_init(&app->ports); nxt_queue_init(&app->requests); + nxt_queue_init(&app->pending); app->name.length = name.length; nxt_memcpy(app->name.start, name.start, name.length); @@ -1183,6 +1234,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->type = lang->type; app->max_workers = apcf.workers; app->timeout = apcf.timeout; + app->res_timeout = apcf.res_timeout * 1000000; app->live = 1; app->max_pending_responses = 2; app->prepare_msg = nxt_app_prepare_msg[lang->type]; @@ -2481,7 +2533,7 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, if (cancelled) { nxt_router_ra_inc_use(ra); - res = nxt_router_app_port(task, ra); + res = nxt_router_app_port(task, rc->app, ra); if (res == NXT_OK) { port = ra->app_port; @@ -2702,7 +2754,7 @@ nxt_router_app_first_port_busy(nxt_app_t *app) nxt_inline nxt_port_t * -nxt_router_app_get_port_unsafe(nxt_app_t *app) +nxt_router_pop_first_port(nxt_app_t *app) { nxt_port_t *port; nxt_queue_link_t *lnk; @@ -2783,14 +2835,17 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response) { - nxt_app_t *app; - nxt_bool_t send_quit; - nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra, *next_ra; + nxt_app_t *app; + nxt_bool_t send_quit, cancelled; + nxt_queue_link_t *lnk; + nxt_req_app_link_t *ra, *pending_ra, *re_ra; + nxt_port_select_state_t state; nxt_assert(port != NULL); nxt_assert(port->app != NULL); + ra = NULL; + app = port->app; nxt_thread_mutex_lock(&app->mutex); @@ -2798,8 +2853,11 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, port->app_pending_responses -= request_failed + got_response; port->app_responses += got_response; - if (app->live != 0 - && port->pair[1] != -1 + if (nxt_slow_path(app->live == 0)) { + goto app_dead; + } + + if (port->pair[1] != -1 && (app->max_pending_responses == 0 || port->app_pending_responses < app->max_pending_responses)) { @@ -2823,8 +2881,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, } } - if (app->live != 0 - && !nxt_queue_is_empty(&app->ports) + if (!nxt_queue_is_empty(&app->ports) && !nxt_queue_is_empty(&app->requests)) { lnk = nxt_queue_first(&app->requests); @@ -2833,19 +2890,16 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); - ra->app_port = nxt_router_app_get_port_unsafe(app); + ra->app_port = nxt_router_pop_first_port(app); if (ra->app_port->app_pending_responses > 1) { - nxt_queue_insert_tail(&ra->app_port->pending_requests, - &ra->link_port_pending); - - nxt_router_ra_inc_use(ra); + nxt_router_ra_pending(task, app, ra); } - - } else { - ra = NULL; } +app_dead: + + /* Pop first pending request for this port. */ if ((request_failed > 0 || got_response > 0) && !nxt_queue_is_empty(&port->pending_requests)) { @@ -2853,19 +2907,63 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_queue_remove(lnk); lnk->next = NULL; - next_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, - link_port_pending); + pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, + link_port_pending); + + nxt_assert(pending_ra->link_app_pending.next != NULL); + + nxt_queue_remove(&pending_ra->link_app_pending); + pending_ra->link_app_pending.next = NULL; } else { - next_ra = NULL; + pending_ra = NULL; } + /* Try to cancel and re-schedule first stalled request for this app. */ + if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) { + lnk = nxt_queue_first(&app->pending); + + re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending); + + if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) { + + nxt_debug(task, "app '%V' stalled request #%uD detected", + &app->name, re_ra->stream); + + cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info, + re_ra->stream); + + if (cancelled) { + nxt_router_ra_inc_use(re_ra); + + state.ra = re_ra; + state.app = app; + + nxt_router_port_select(task, &state); + + goto re_ra_cancelled; + } + } + } + + re_ra = NULL; + +re_ra_cancelled: + send_quit = app->live == 0 && port->app_pending_responses > 0; nxt_thread_mutex_unlock(&app->mutex); - if (next_ra != NULL) { - nxt_router_ra_use(task, next_ra, -1); + if (pending_ra != NULL) { + nxt_router_ra_use(task, pending_ra, -1); + } + + if (re_ra != NULL) { + if (nxt_router_port_post_select(task, &state) == NXT_OK) { + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_router_app_process_request, + &task->thread->engine->task, app, re_ra); + } } if (ra != NULL) { @@ -2950,22 +3048,16 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) } -static nxt_int_t -nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) +static void +nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) { - int failed_port_use_delta; - nxt_int_t res; - nxt_app_t *app; - nxt_bool_t can_start_worker; - nxt_conn_t *c; - nxt_port_t *port, *failed_port; - - c = ra->rc->conn; - app = ra->rc->app; + nxt_app_t *app; + nxt_req_app_link_t *ra; - failed_port_use_delta = 0; + ra = state->ra; + app = state->app; - nxt_thread_mutex_lock(&app->mutex); + state->failed_port_use_delta = 0; if (nxt_queue_chk_remove(&ra->link_app_requests)) { @@ -2974,93 +3066,113 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) if (nxt_queue_chk_remove(&ra->link_port_pending)) { + nxt_assert(ra->link_app_pending.next != NULL); + + nxt_queue_remove(&ra->link_app_pending); + ra->link_app_pending.next = NULL; + nxt_router_ra_dec_use(ra); } - if (ra->app_port != NULL) { - failed_port = ra->app_port; - failed_port_use_delta--; + state->failed_port = ra->app_port; - failed_port->app_pending_responses--; + if (ra->app_port != NULL) { + state->failed_port_use_delta--; - if (failed_port->app_link.next != NULL) { - nxt_queue_remove(&failed_port->app_link); - failed_port->app_link.next = NULL; + state->failed_port->app_pending_responses--; - failed_port_use_delta--; + if (nxt_queue_chk_remove(&state->failed_port->app_link)) { + state->failed_port_use_delta--; } - } else { - failed_port = NULL; + ra->app_port = NULL; } - can_start_worker = (app->workers + app->pending_workers) - < app->max_workers; + state->can_start_worker = (app->workers + app->pending_workers) + < app->max_workers; + state->port = NULL; if (nxt_queue_is_empty(&app->ports) - || (can_start_worker && nxt_router_app_first_port_busy(app)) ) + || (state->can_start_worker && nxt_router_app_first_port_busy(app)) ) { ra = nxt_router_ra_create(task, ra); - if (nxt_fast_path(ra != NULL)) { + if (nxt_slow_path(ra == NULL)) { + goto fail; + } + + if (nxt_slow_path(state->failed_port != NULL)) { + nxt_queue_insert_head(&app->requests, &ra->link_app_requests); + + } else { nxt_queue_insert_tail(&app->requests, &ra->link_app_requests); + } - nxt_router_ra_inc_use(ra); + nxt_router_ra_inc_use(ra); - nxt_debug(task, "ra stream #%uD enqueue to app->requests", - ra->stream); + nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream); - if (can_start_worker) { - app->pending_workers++; - } + if (state->can_start_worker) { + app->pending_workers++; } - port = NULL; - } else { - port = nxt_router_app_get_port_unsafe(app); + state->port = nxt_router_pop_first_port(app); - if (port->app_pending_responses > 1) { + if (state->port->app_pending_responses > 1) { ra = nxt_router_ra_create(task, ra); - if (nxt_fast_path(ra != NULL)) { - nxt_queue_insert_tail(&port->pending_requests, - &ra->link_port_pending); + if (nxt_slow_path(ra == NULL)) { + goto fail; + } - nxt_router_ra_inc_use(ra); + ra->app_port = state->port; - nxt_debug(task, "ra stream #%uD enqueue to " - "port->pending_requests", ra->stream); - } + nxt_router_ra_pending(task, app, ra); } } - nxt_thread_mutex_unlock(&app->mutex); +fail: + + state->shared_ra = ra; +} + + +static nxt_int_t +nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) +{ + nxt_int_t res; + nxt_app_t *app; + nxt_req_app_link_t *ra; - if (failed_port_use_delta != 0) { - nxt_port_use(task, failed_port, failed_port_use_delta); + ra = state->shared_ra; + app = state->app; + + if (state->failed_port_use_delta != 0) { + nxt_port_use(task, state->failed_port, state->failed_port_use_delta); } if (nxt_slow_path(ra == NULL)) { - nxt_router_gen_error(task, c, 500, "Failed to allocate " - "req<->app link"); - - if (port != NULL) { - nxt_port_use(task, port, -1); + if (state->port != NULL) { + nxt_port_use(task, state->port, -1); } + nxt_router_ra_error(state->ra, 500, + "Failed to allocate shared req<->app link"); + nxt_router_ra_use(task, state->ra, -1); + return NXT_ERROR; } - if (port != NULL) { + if (state->port != NULL) { nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); - ra->app_port = port; + ra->app_port = state->port; return NXT_OK; } - if (!can_start_worker) { + if (!state->can_start_worker) { nxt_debug(task, "app '%V' %p too many running or pending workers", &app->name, app); @@ -3070,7 +3182,8 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) res = nxt_router_start_worker(task, app); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_gen_error(task, c, 500, "Failed to start worker"); + nxt_router_ra_error(ra, 500, "Failed to start worker"); + nxt_router_ra_use(task, ra, -1); return NXT_ERROR; } @@ -3079,6 +3192,24 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) } +static nxt_int_t +nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) +{ + nxt_port_select_state_t state; + + state.ra = ra; + state.app = app; + + nxt_thread_mutex_lock(&app->mutex); + + nxt_router_port_select(task, &state); + + nxt_thread_mutex_unlock(&app->mutex); + + return nxt_router_port_post_select(task, &state); +} + + static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) { @@ -3392,7 +3523,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, ra = &ra_local; nxt_router_ra_init(task, ra, rc); - res = nxt_router_app_port(task, ra); + res = nxt_router_app_port(task, app, ra); if (res != NXT_OK) { return; |