summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/nxt_router.c82
1 files changed, 48 insertions, 34 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index b9f5d921..38396e86 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -95,16 +95,16 @@ nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link)
}
nxt_inline void
-nxt_request_app_link_dec_use(nxt_request_app_link_t *req_app_link)
+nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i)
{
#if (NXT_DEBUG)
int c;
- c = nxt_atomic_fetch_add(&req_app_link->use_count, -1);
+ c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
- nxt_assert(c > 1);
+ nxt_assert((c + i) > 0);
#else
- (void) nxt_atomic_fetch_add(&req_app_link->use_count, -1);
+ (void) nxt_atomic_fetch_add(&req_app_link->use_count, i);
#endif
}
@@ -600,8 +600,6 @@ nxt_request_app_link_update_peer(nxt_task_t *task,
nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data,
req_app_link->app_port->pid);
}
-
- nxt_request_app_link_use(task, req_app_link, -1);
}
@@ -3732,8 +3730,6 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info,
req_app_link->stream);
if (cancelled) {
- nxt_request_app_link_inc_use(req_app_link);
-
res = nxt_router_app_port(task, req_rpc_data->app, req_app_link);
if (res == NXT_OK) {
@@ -3751,6 +3747,8 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_router_app_prepare_request(task, req_app_link);
}
+ nxt_request_app_link_use(task, req_app_link, -1);
+
msg->port_msg.last = 0;
return;
@@ -4015,6 +4013,8 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
#endif
nxt_router_app_prepare_request(task, req_app_link);
+
+ nxt_request_app_link_use(task, req_app_link, -1);
}
@@ -4148,8 +4148,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
re_ra->stream);
if (cancelled) {
- nxt_request_app_link_inc_use(re_ra);
-
state.req_app_link = re_ra;
state.app = app;
@@ -4217,19 +4215,38 @@ re_ra_cancelled:
if (re_ra != NULL) {
if (nxt_router_port_post_select(task, &state) == NXT_OK) {
+ /*
+ * There should be call nxt_request_app_link_inc_use(re_ra),
+ * but we need to decrement use then. So, let's skip both.
+ */
+
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
&task->thread->engine->task, app, re_ra);
+
+ } else {
+ /*
+ * This call should be unconditional, but we want to spare
+ * couple of CPU ticks to postpone the head death of the universe.
+ */
+
+ nxt_request_app_link_use(task, re_ra, -1);
}
}
if (req_app_link != NULL) {
- nxt_request_app_link_use(task, req_app_link, -1);
+ /*
+ * Here we do the same trick as described above,
+ * but without conditions.
+ * Skip required nxt_request_app_link_inc_use(req_app_link).
+ */
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
&task->thread->engine->task, app, req_app_link);
+ /* ... skip nxt_request_app_link_use(task, req_app_link, -1) too. */
+
goto adjust_use;
}
@@ -4477,6 +4494,7 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
+ int ra_use_delta;
nxt_app_t *app;
nxt_bool_t can_start_process;
nxt_request_app_link_t *req_app_link;
@@ -4485,11 +4503,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
app = state->app;
state->failed_port_use_delta = 0;
-
- if (nxt_queue_chk_remove(&req_app_link->link_app_requests))
- {
- nxt_request_app_link_dec_use(req_app_link);
- }
+ ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests);
if (nxt_queue_chk_remove(&req_app_link->link_port_pending))
{
@@ -4498,7 +4512,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
nxt_queue_remove(&req_app_link->link_app_pending);
req_app_link->link_app_pending.next = NULL;
- nxt_request_app_link_dec_use(req_app_link);
+ ra_use_delta--;
}
state->failed_port = req_app_link->app_port;
@@ -4538,7 +4552,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
&req_app_link->link_app_requests);
}
- nxt_request_app_link_inc_use(req_app_link);
+ ra_use_delta++;
nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests",
req_app_link->stream);
@@ -4569,6 +4583,8 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
}
}
+ nxt_request_app_link_chk_use(req_app_link, ra_use_delta);
+
fail:
state->shared_ra = req_app_link;
@@ -4596,7 +4612,6 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
nxt_request_app_link_error(state->req_app_link, 500,
"Failed to allocate shared req<->app link");
- nxt_request_app_link_use(task, state->req_app_link, -1);
return NXT_ERROR;
}
@@ -4625,7 +4640,6 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
if (nxt_slow_path(res != NXT_OK)) {
nxt_request_app_link_error(req_app_link, 500,
"Failed to start app process");
- nxt_request_app_link_use(task, req_app_link, -1);
return NXT_ERROR;
}
@@ -4686,19 +4700,19 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_request_app_link_init(task, req_app_link, req_rpc_data);
res = nxt_router_app_port(task, app, req_app_link);
+ req_app_link = req_rpc_data->req_app_link;
- if (res != NXT_OK) {
- return;
- }
+ if (res == NXT_OK) {
+ port = req_app_link->app_port;
- req_app_link = req_rpc_data->req_app_link;
- port = req_app_link->app_port;
+ nxt_assert(port != NULL);
- nxt_assert(port != NULL);
+ nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid);
- nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid);
+ nxt_router_app_prepare_request(task, req_app_link);
+ }
- nxt_router_app_prepare_request(task, req_app_link);
+ nxt_request_app_link_use(task, req_app_link, -1);
}
@@ -5172,8 +5186,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
pending_ra->stream);
if (cancelled) {
- nxt_request_app_link_inc_use(pending_ra);
-
state.req_app_link = pending_ra;
state.app = app;
@@ -5186,10 +5198,12 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
nxt_thread_mutex_unlock(&app->mutex);
- if (pending_ra != NULL
- && nxt_router_port_post_select(task, &state) == NXT_OK)
- {
- nxt_router_app_prepare_request(task, pending_ra);
+ if (pending_ra != NULL) {
+ if (nxt_router_port_post_select(task, &state) == NXT_OK) {
+ nxt_router_app_prepare_request(task, pending_ra);
+ }
+
+ nxt_request_app_link_use(task, pending_ra, -1);
}
nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid);