From 4eecf1cb6ad520458e595313dc65e3e75405a252 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Tue, 26 Nov 2019 17:14:53 +0300 Subject: Refactoring reference counting of req_app_link. The reason for the change is that the req_app_link reference count was incorrect if the application crashed at start; in this case, the nxt_request_app_link_update_peer() function was never called. This closes #332 issue on GitHub. --- src/nxt_router.c | 82 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 34 deletions(-) diff --git a/src/nxt_router.c b/src/nxt_router.c index b9f5d921..38396e86 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -95,16 +95,16 @@ nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) } nxt_inline void -nxt_request_app_link_dec_use(nxt_request_app_link_t *req_app_link) +nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i) { #if (NXT_DEBUG) int c; - c = nxt_atomic_fetch_add(&req_app_link->use_count, -1); + c = nxt_atomic_fetch_add(&req_app_link->use_count, i); - nxt_assert(c > 1); + nxt_assert((c + i) > 0); #else - (void) nxt_atomic_fetch_add(&req_app_link->use_count, -1); + (void) nxt_atomic_fetch_add(&req_app_link->use_count, i); #endif } @@ -600,8 +600,6 @@ nxt_request_app_link_update_peer(nxt_task_t *task, nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, req_app_link->app_port->pid); } - - nxt_request_app_link_use(task, req_app_link, -1); } @@ -3732,8 +3730,6 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); if (cancelled) { - nxt_request_app_link_inc_use(req_app_link); - res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); if (res == NXT_OK) { @@ -3751,6 +3747,8 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_router_app_prepare_request(task, req_app_link); } + nxt_request_app_link_use(task, req_app_link, -1); + msg->port_msg.last = 0; return; @@ -4015,6 +4013,8 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) #endif nxt_router_app_prepare_request(task, req_app_link); + + nxt_request_app_link_use(task, req_app_link, -1); } @@ -4148,8 +4148,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, re_ra->stream); if (cancelled) { - nxt_request_app_link_inc_use(re_ra); - state.req_app_link = re_ra; state.app = app; @@ -4217,19 +4215,38 @@ re_ra_cancelled: if (re_ra != NULL) { if (nxt_router_port_post_select(task, &state) == NXT_OK) { + /* + * There should be call nxt_request_app_link_inc_use(re_ra), + * but we need to decrement use then. So, let's skip both. + */ + nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, &task->thread->engine->task, app, re_ra); + + } else { + /* + * This call should be unconditional, but we want to spare + * couple of CPU ticks to postpone the head death of the universe. + */ + + nxt_request_app_link_use(task, re_ra, -1); } } if (req_app_link != NULL) { - nxt_request_app_link_use(task, req_app_link, -1); + /* + * Here we do the same trick as described above, + * but without conditions. + * Skip required nxt_request_app_link_inc_use(req_app_link). + */ nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, &task->thread->engine->task, app, req_app_link); + /* ... skip nxt_request_app_link_use(task, req_app_link, -1) too. */ + goto adjust_use; } @@ -4477,6 +4494,7 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) static void nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) { + int ra_use_delta; nxt_app_t *app; nxt_bool_t can_start_process; nxt_request_app_link_t *req_app_link; @@ -4485,11 +4503,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) app = state->app; state->failed_port_use_delta = 0; - - if (nxt_queue_chk_remove(&req_app_link->link_app_requests)) - { - nxt_request_app_link_dec_use(req_app_link); - } + ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests); if (nxt_queue_chk_remove(&req_app_link->link_port_pending)) { @@ -4498,7 +4512,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) nxt_queue_remove(&req_app_link->link_app_pending); req_app_link->link_app_pending.next = NULL; - nxt_request_app_link_dec_use(req_app_link); + ra_use_delta--; } state->failed_port = req_app_link->app_port; @@ -4538,7 +4552,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) &req_app_link->link_app_requests); } - nxt_request_app_link_inc_use(req_app_link); + ra_use_delta++; nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests", req_app_link->stream); @@ -4569,6 +4583,8 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) } } + nxt_request_app_link_chk_use(req_app_link, ra_use_delta); + fail: state->shared_ra = req_app_link; @@ -4596,7 +4612,6 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) nxt_request_app_link_error(state->req_app_link, 500, "Failed to allocate shared req<->app link"); - nxt_request_app_link_use(task, state->req_app_link, -1); return NXT_ERROR; } @@ -4625,7 +4640,6 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) if (nxt_slow_path(res != NXT_OK)) { nxt_request_app_link_error(req_app_link, 500, "Failed to start app process"); - nxt_request_app_link_use(task, req_app_link, -1); return NXT_ERROR; } @@ -4686,19 +4700,19 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, nxt_request_app_link_init(task, req_app_link, req_rpc_data); res = nxt_router_app_port(task, app, req_app_link); + req_app_link = req_rpc_data->req_app_link; - if (res != NXT_OK) { - return; - } + if (res == NXT_OK) { + port = req_app_link->app_port; - req_app_link = req_rpc_data->req_app_link; - port = req_app_link->app_port; + nxt_assert(port != NULL); - nxt_assert(port != NULL); + nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); - nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); + nxt_router_app_prepare_request(task, req_app_link); + } - nxt_router_app_prepare_request(task, req_app_link); + nxt_request_app_link_use(task, req_app_link, -1); } @@ -5172,8 +5186,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) pending_ra->stream); if (cancelled) { - nxt_request_app_link_inc_use(pending_ra); - state.req_app_link = pending_ra; state.app = app; @@ -5186,10 +5198,12 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) nxt_thread_mutex_unlock(&app->mutex); - if (pending_ra != NULL - && nxt_router_port_post_select(task, &state) == NXT_OK) - { - nxt_router_app_prepare_request(task, pending_ra); + if (pending_ra != NULL) { + if (nxt_router_port_post_select(task, &state) == NXT_OK) { + nxt_router_app_prepare_request(task, pending_ra); + } + + nxt_request_app_link_use(task, pending_ra, -1); } nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid); -- cgit