diff options
-rw-r--r-- | src/nxt_conf_validation.c | 5 | ||||
-rw-r--r-- | src/nxt_port_rpc.c | 4 | ||||
-rw-r--r-- | src/nxt_router.c | 297 | ||||
-rw-r--r-- | src/nxt_router.h | 4 |
4 files changed, 222 insertions, 88 deletions
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index cc2ce5ef..05d0a021 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -78,6 +78,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_limits_members[] = { NULL, NULL }, + { nxt_string("reschedule_timeout"), + NXT_CONF_INTEGER, + NULL, + NULL }, + { nxt_string("requests"), NXT_CONF_INTEGER, NULL, diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c index 15c550f4..b041256a 100644 --- a/src/nxt_port_rpc.c +++ b/src/nxt_port_rpc.c @@ -314,10 +314,6 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) reg = lhq.value; - if (reg->peer != -1) { - nxt_assert(reg->peer == msg->port_msg.pid); - } - if (type == _NXT_PORT_MSG_RPC_ERROR) { reg->error_handler(task, msg, reg->data); diff --git a/src/nxt_router.c b/src/nxt_router.c index 1f0f0b0f..e586f63a 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -13,6 +13,7 @@ typedef struct { nxt_str_t type; uint32_t workers; nxt_msec_t timeout; + nxt_msec_t res_timeout; uint32_t requests; nxt_conf_value_t *limits_value; } nxt_router_app_conf_t; @@ -55,8 +56,11 @@ struct nxt_req_app_link_s { nxt_msg_info_t msg_info; nxt_req_conn_link_t *rc; + nxt_nsec_t res_time; + nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */ + nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */ nxt_mp_t *mem_pool; nxt_work_t work; @@ -72,6 +76,26 @@ typedef struct { } nxt_socket_rpc_t; +struct nxt_port_select_state_s { + nxt_app_t *app; + nxt_req_app_link_t *ra; + + nxt_port_t *failed_port; + int failed_port_use_delta; + + nxt_bool_t can_start_worker; + nxt_req_app_link_t *shared_ra; + nxt_port_t *port; +}; + +typedef struct nxt_port_select_state_s nxt_port_select_state_t; + +static void nxt_router_port_select(nxt_task_t *task, + nxt_port_select_state_t *state); + +static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, + nxt_port_select_state_t *state); + static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); nxt_inline void @@ -172,7 +196,8 @@ static void nxt_router_app_port_error(nxt_task_t *task, static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response); -static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra); +static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, + nxt_req_app_link_t *ra); static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, @@ -575,6 +600,21 @@ nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str) } +nxt_inline void +nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) +{ + nxt_queue_insert_tail(&ra->app_port->pending_requests, + &ra->link_port_pending); + nxt_queue_insert_tail(&app->pending, &ra->link_app_pending); + + nxt_router_ra_inc_use(ra); + + ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout; + + nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream); +} + + nxt_inline nxt_bool_t nxt_queue_chk_remove(nxt_queue_link_t *lnk) { @@ -615,13 +655,15 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) nxt_thread_mutex_lock(&rc->app->mutex); if (ra->link_app_requests.next == NULL - && ra->link_port_pending.next == NULL) + && ra->link_port_pending.next == NULL + && ra->link_app_pending.next == NULL) { ra = NULL; } else { ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests); ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending); + nxt_queue_chk_remove(&ra->link_app_pending); } nxt_thread_mutex_unlock(&rc->app->mutex); @@ -974,6 +1016,12 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = { }, { + nxt_string("reschedule_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_router_app_conf_t, res_timeout), + }, + + { nxt_string("requests"), NXT_CONF_MAP_INT32, offsetof(nxt_router_app_conf_t, requests), @@ -1127,6 +1175,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, apcf.workers = 1; apcf.timeout = 0; + apcf.res_timeout = 1000; apcf.requests = 0; apcf.limits_value = NULL; @@ -1156,7 +1205,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application workers: %D", apcf.workers); - nxt_debug(task, "application timeout: %D", apcf.timeout); + nxt_debug(task, "application request timeout: %D", apcf.timeout); + nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout); nxt_debug(task, "application requests: %D", apcf.requests); lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); @@ -1176,6 +1226,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_queue_init(&app->ports); nxt_queue_init(&app->requests); + nxt_queue_init(&app->pending); app->name.length = name.length; nxt_memcpy(app->name.start, name.start, name.length); @@ -1183,6 +1234,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->type = lang->type; app->max_workers = apcf.workers; app->timeout = apcf.timeout; + app->res_timeout = apcf.res_timeout * 1000000; app->live = 1; app->max_pending_responses = 2; app->prepare_msg = nxt_app_prepare_msg[lang->type]; @@ -2481,7 +2533,7 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, if (cancelled) { nxt_router_ra_inc_use(ra); - res = nxt_router_app_port(task, ra); + res = nxt_router_app_port(task, rc->app, ra); if (res == NXT_OK) { port = ra->app_port; @@ -2702,7 +2754,7 @@ nxt_router_app_first_port_busy(nxt_app_t *app) nxt_inline nxt_port_t * -nxt_router_app_get_port_unsafe(nxt_app_t *app) +nxt_router_pop_first_port(nxt_app_t *app) { nxt_port_t *port; nxt_queue_link_t *lnk; @@ -2783,14 +2835,17 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response) { - nxt_app_t *app; - nxt_bool_t send_quit; - nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra, *next_ra; + nxt_app_t *app; + nxt_bool_t send_quit, cancelled; + nxt_queue_link_t *lnk; + nxt_req_app_link_t *ra, *pending_ra, *re_ra; + nxt_port_select_state_t state; nxt_assert(port != NULL); nxt_assert(port->app != NULL); + ra = NULL; + app = port->app; nxt_thread_mutex_lock(&app->mutex); @@ -2798,8 +2853,11 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, port->app_pending_responses -= request_failed + got_response; port->app_responses += got_response; - if (app->live != 0 - && port->pair[1] != -1 + if (nxt_slow_path(app->live == 0)) { + goto app_dead; + } + + if (port->pair[1] != -1 && (app->max_pending_responses == 0 || port->app_pending_responses < app->max_pending_responses)) { @@ -2823,8 +2881,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, } } - if (app->live != 0 - && !nxt_queue_is_empty(&app->ports) + if (!nxt_queue_is_empty(&app->ports) && !nxt_queue_is_empty(&app->requests)) { lnk = nxt_queue_first(&app->requests); @@ -2833,19 +2890,16 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); - ra->app_port = nxt_router_app_get_port_unsafe(app); + ra->app_port = nxt_router_pop_first_port(app); if (ra->app_port->app_pending_responses > 1) { - nxt_queue_insert_tail(&ra->app_port->pending_requests, - &ra->link_port_pending); - - nxt_router_ra_inc_use(ra); + nxt_router_ra_pending(task, app, ra); } - - } else { - ra = NULL; } +app_dead: + + /* Pop first pending request for this port. */ if ((request_failed > 0 || got_response > 0) && !nxt_queue_is_empty(&port->pending_requests)) { @@ -2853,19 +2907,63 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_queue_remove(lnk); lnk->next = NULL; - next_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, - link_port_pending); + pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, + link_port_pending); + + nxt_assert(pending_ra->link_app_pending.next != NULL); + + nxt_queue_remove(&pending_ra->link_app_pending); + pending_ra->link_app_pending.next = NULL; } else { - next_ra = NULL; + pending_ra = NULL; } + /* Try to cancel and re-schedule first stalled request for this app. */ + if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) { + lnk = nxt_queue_first(&app->pending); + + re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending); + + if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) { + + nxt_debug(task, "app '%V' stalled request #%uD detected", + &app->name, re_ra->stream); + + cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info, + re_ra->stream); + + if (cancelled) { + nxt_router_ra_inc_use(re_ra); + + state.ra = re_ra; + state.app = app; + + nxt_router_port_select(task, &state); + + goto re_ra_cancelled; + } + } + } + + re_ra = NULL; + +re_ra_cancelled: + send_quit = app->live == 0 && port->app_pending_responses > 0; nxt_thread_mutex_unlock(&app->mutex); - if (next_ra != NULL) { - nxt_router_ra_use(task, next_ra, -1); + if (pending_ra != NULL) { + nxt_router_ra_use(task, pending_ra, -1); + } + + if (re_ra != NULL) { + if (nxt_router_port_post_select(task, &state) == NXT_OK) { + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_router_app_process_request, + &task->thread->engine->task, app, re_ra); + } } if (ra != NULL) { @@ -2950,22 +3048,16 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) } -static nxt_int_t -nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) +static void +nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) { - int failed_port_use_delta; - nxt_int_t res; - nxt_app_t *app; - nxt_bool_t can_start_worker; - nxt_conn_t *c; - nxt_port_t *port, *failed_port; - - c = ra->rc->conn; - app = ra->rc->app; + nxt_app_t *app; + nxt_req_app_link_t *ra; - failed_port_use_delta = 0; + ra = state->ra; + app = state->app; - nxt_thread_mutex_lock(&app->mutex); + state->failed_port_use_delta = 0; if (nxt_queue_chk_remove(&ra->link_app_requests)) { @@ -2974,93 +3066,113 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) if (nxt_queue_chk_remove(&ra->link_port_pending)) { + nxt_assert(ra->link_app_pending.next != NULL); + + nxt_queue_remove(&ra->link_app_pending); + ra->link_app_pending.next = NULL; + nxt_router_ra_dec_use(ra); } - if (ra->app_port != NULL) { - failed_port = ra->app_port; - failed_port_use_delta--; + state->failed_port = ra->app_port; - failed_port->app_pending_responses--; + if (ra->app_port != NULL) { + state->failed_port_use_delta--; - if (failed_port->app_link.next != NULL) { - nxt_queue_remove(&failed_port->app_link); - failed_port->app_link.next = NULL; + state->failed_port->app_pending_responses--; - failed_port_use_delta--; + if (nxt_queue_chk_remove(&state->failed_port->app_link)) { + state->failed_port_use_delta--; } - } else { - failed_port = NULL; + ra->app_port = NULL; } - can_start_worker = (app->workers + app->pending_workers) - < app->max_workers; + state->can_start_worker = (app->workers + app->pending_workers) + < app->max_workers; + state->port = NULL; if (nxt_queue_is_empty(&app->ports) - || (can_start_worker && nxt_router_app_first_port_busy(app)) ) + || (state->can_start_worker && nxt_router_app_first_port_busy(app)) ) { ra = nxt_router_ra_create(task, ra); - if (nxt_fast_path(ra != NULL)) { + if (nxt_slow_path(ra == NULL)) { + goto fail; + } + + if (nxt_slow_path(state->failed_port != NULL)) { + nxt_queue_insert_head(&app->requests, &ra->link_app_requests); + + } else { nxt_queue_insert_tail(&app->requests, &ra->link_app_requests); + } - nxt_router_ra_inc_use(ra); + nxt_router_ra_inc_use(ra); - nxt_debug(task, "ra stream #%uD enqueue to app->requests", - ra->stream); + nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream); - if (can_start_worker) { - app->pending_workers++; - } + if (state->can_start_worker) { + app->pending_workers++; } - port = NULL; - } else { - port = nxt_router_app_get_port_unsafe(app); + state->port = nxt_router_pop_first_port(app); - if (port->app_pending_responses > 1) { + if (state->port->app_pending_responses > 1) { ra = nxt_router_ra_create(task, ra); - if (nxt_fast_path(ra != NULL)) { - nxt_queue_insert_tail(&port->pending_requests, - &ra->link_port_pending); + if (nxt_slow_path(ra == NULL)) { + goto fail; + } - nxt_router_ra_inc_use(ra); + ra->app_port = state->port; - nxt_debug(task, "ra stream #%uD enqueue to " - "port->pending_requests", ra->stream); - } + nxt_router_ra_pending(task, app, ra); } } - nxt_thread_mutex_unlock(&app->mutex); +fail: + + state->shared_ra = ra; +} + + +static nxt_int_t +nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) +{ + nxt_int_t res; + nxt_app_t *app; + nxt_req_app_link_t *ra; - if (failed_port_use_delta != 0) { - nxt_port_use(task, failed_port, failed_port_use_delta); + ra = state->shared_ra; + app = state->app; + + if (state->failed_port_use_delta != 0) { + nxt_port_use(task, state->failed_port, state->failed_port_use_delta); } if (nxt_slow_path(ra == NULL)) { - nxt_router_gen_error(task, c, 500, "Failed to allocate " - "req<->app link"); - - if (port != NULL) { - nxt_port_use(task, port, -1); + if (state->port != NULL) { + nxt_port_use(task, state->port, -1); } + nxt_router_ra_error(state->ra, 500, + "Failed to allocate shared req<->app link"); + nxt_router_ra_use(task, state->ra, -1); + return NXT_ERROR; } - if (port != NULL) { + if (state->port != NULL) { nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); - ra->app_port = port; + ra->app_port = state->port; return NXT_OK; } - if (!can_start_worker) { + if (!state->can_start_worker) { nxt_debug(task, "app '%V' %p too many running or pending workers", &app->name, app); @@ -3070,7 +3182,8 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) res = nxt_router_start_worker(task, app); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_gen_error(task, c, 500, "Failed to start worker"); + nxt_router_ra_error(ra, 500, "Failed to start worker"); + nxt_router_ra_use(task, ra, -1); return NXT_ERROR; } @@ -3079,6 +3192,24 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) } +static nxt_int_t +nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) +{ + nxt_port_select_state_t state; + + state.ra = ra; + state.app = app; + + nxt_thread_mutex_lock(&app->mutex); + + nxt_router_port_select(task, &state); + + nxt_thread_mutex_unlock(&app->mutex); + + return nxt_router_port_post_select(task, &state); +} + + static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) { @@ -3392,7 +3523,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, ra = &ra_local; nxt_router_ra_init(task, ra, rc); - res = nxt_router_app_port(task, ra); + res = nxt_router_app_port(task, app, ra); if (res != NXT_OK) { return; diff --git a/src/nxt_router.h b/src/nxt_router.h index 521c0397..89543a1b 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -81,7 +81,8 @@ struct nxt_app_s { nxt_thread_mutex_t mutex; /* Protects ports queue. */ nxt_queue_t ports; /* of nxt_port_t.app_link */ - nxt_queue_t requests; /* of nxt_req_conn_link_t */ + nxt_queue_t requests; /* of nxt_req_app_link_t */ + nxt_queue_t pending; /* of nxt_req_app_link_t */ nxt_str_t name; uint32_t pending_workers; @@ -90,6 +91,7 @@ struct nxt_app_s { uint32_t max_pending_responses; nxt_msec_t timeout; + nxt_nsec_t res_timeout; nxt_app_type_t type:8; uint8_t live; /* 1 bit */ |