summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c138
1 files changed, 104 insertions, 34 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index b9f5d921..6a1f3792 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -95,16 +95,16 @@ nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link)
}
nxt_inline void
-nxt_request_app_link_dec_use(nxt_request_app_link_t *req_app_link)
+nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i)
{
#if (NXT_DEBUG)
int c;
- c = nxt_atomic_fetch_add(&req_app_link->use_count, -1);
+ c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
- nxt_assert(c > 1);
+ nxt_assert((c + i) > 0);
#else
- (void) nxt_atomic_fetch_add(&req_app_link->use_count, -1);
+ (void) nxt_atomic_fetch_add(&req_app_link->use_count, i);
#endif
}
@@ -248,6 +248,7 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
void *data);
+static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
extern const nxt_http_request_state_t nxt_http_websocket;
@@ -276,6 +277,7 @@ nxt_port_handlers_t nxt_router_process_port_handlers = {
.access_log = nxt_router_access_log_reopen_handler,
.rpc_ready = nxt_port_rpc_handler,
.rpc_error = nxt_port_rpc_handler,
+ .oosm = nxt_router_oosm_handler,
};
@@ -600,8 +602,6 @@ nxt_request_app_link_update_peer(nxt_task_t *task,
nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data,
req_app_link->app_port->pid);
}
-
- nxt_request_app_link_use(task, req_app_link, -1);
}
@@ -2750,6 +2750,7 @@ static nxt_port_handlers_t nxt_router_app_port_handlers = {
.rpc_error = nxt_port_rpc_handler,
.mmap = nxt_port_mmap_handler,
.data = nxt_port_rpc_handler,
+ .oosm = nxt_router_oosm_handler,
};
@@ -3732,8 +3733,6 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info,
req_app_link->stream);
if (cancelled) {
- nxt_request_app_link_inc_use(req_app_link);
-
res = nxt_router_app_port(task, req_rpc_data->app, req_app_link);
if (res == NXT_OK) {
@@ -3751,6 +3750,8 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_router_app_prepare_request(task, req_app_link);
}
+ nxt_request_app_link_use(task, req_app_link, -1);
+
msg->port_msg.last = 0;
return;
@@ -4015,6 +4016,8 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
#endif
nxt_router_app_prepare_request(task, req_app_link);
+
+ nxt_request_app_link_use(task, req_app_link, -1);
}
@@ -4148,8 +4151,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
re_ra->stream);
if (cancelled) {
- nxt_request_app_link_inc_use(re_ra);
-
state.req_app_link = re_ra;
state.app = app;
@@ -4217,19 +4218,38 @@ re_ra_cancelled:
if (re_ra != NULL) {
if (nxt_router_port_post_select(task, &state) == NXT_OK) {
+ /*
+ * There should be call nxt_request_app_link_inc_use(re_ra),
+ * but we need to decrement use then. So, let's skip both.
+ */
+
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
&task->thread->engine->task, app, re_ra);
+
+ } else {
+ /*
+ * This call should be unconditional, but we want to spare
+ * couple of CPU ticks to postpone the head death of the universe.
+ */
+
+ nxt_request_app_link_use(task, re_ra, -1);
}
}
if (req_app_link != NULL) {
- nxt_request_app_link_use(task, req_app_link, -1);
+ /*
+ * Here we do the same trick as described above,
+ * but without conditions.
+ * Skip required nxt_request_app_link_inc_use(req_app_link).
+ */
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
&task->thread->engine->task, app, req_app_link);
+ /* ... skip nxt_request_app_link_use(task, req_app_link, -1) too. */
+
goto adjust_use;
}
@@ -4477,6 +4497,7 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
+ int ra_use_delta;
nxt_app_t *app;
nxt_bool_t can_start_process;
nxt_request_app_link_t *req_app_link;
@@ -4485,11 +4506,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
app = state->app;
state->failed_port_use_delta = 0;
-
- if (nxt_queue_chk_remove(&req_app_link->link_app_requests))
- {
- nxt_request_app_link_dec_use(req_app_link);
- }
+ ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests);
if (nxt_queue_chk_remove(&req_app_link->link_port_pending))
{
@@ -4498,7 +4515,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
nxt_queue_remove(&req_app_link->link_app_pending);
req_app_link->link_app_pending.next = NULL;
- nxt_request_app_link_dec_use(req_app_link);
+ ra_use_delta--;
}
state->failed_port = req_app_link->app_port;
@@ -4538,7 +4555,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
&req_app_link->link_app_requests);
}
- nxt_request_app_link_inc_use(req_app_link);
+ ra_use_delta++;
nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests",
req_app_link->stream);
@@ -4569,6 +4586,8 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
}
}
+ nxt_request_app_link_chk_use(req_app_link, ra_use_delta);
+
fail:
state->shared_ra = req_app_link;
@@ -4596,7 +4615,6 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
nxt_request_app_link_error(state->req_app_link, 500,
"Failed to allocate shared req<->app link");
- nxt_request_app_link_use(task, state->req_app_link, -1);
return NXT_ERROR;
}
@@ -4625,7 +4643,6 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
if (nxt_slow_path(res != NXT_OK)) {
nxt_request_app_link_error(req_app_link, 500,
"Failed to start app process");
- nxt_request_app_link_use(task, req_app_link, -1);
return NXT_ERROR;
}
@@ -4686,19 +4703,19 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_request_app_link_init(task, req_app_link, req_rpc_data);
res = nxt_router_app_port(task, app, req_app_link);
+ req_app_link = req_rpc_data->req_app_link;
- if (res != NXT_OK) {
- return;
- }
+ if (res == NXT_OK) {
+ port = req_app_link->app_port;
- req_app_link = req_rpc_data->req_app_link;
- port = req_app_link->app_port;
+ nxt_assert(port != NULL);
- nxt_assert(port != NULL);
+ nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid);
- nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid);
+ nxt_router_app_prepare_request(task, req_app_link);
+ }
- nxt_router_app_prepare_request(task, req_app_link);
+ nxt_request_app_link_use(task, req_app_link, -1);
}
@@ -5172,8 +5189,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
pending_ra->stream);
if (cancelled) {
- nxt_request_app_link_inc_use(pending_ra);
-
state.req_app_link = pending_ra;
state.app = app;
@@ -5186,10 +5201,12 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
nxt_thread_mutex_unlock(&app->mutex);
- if (pending_ra != NULL
- && nxt_router_port_post_select(task, &state) == NXT_OK)
- {
- nxt_router_app_prepare_request(task, pending_ra);
+ if (pending_ra != NULL) {
+ if (nxt_router_port_post_select(task, &state) == NXT_OK) {
+ nxt_router_app_prepare_request(task, pending_ra);
+ }
+
+ nxt_request_app_link_use(task, pending_ra, -1);
}
nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid);
@@ -5227,3 +5244,56 @@ nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
nxt_mp_release(r->mem_pool);
}
+
+
+static void
+nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ size_t mi;
+ uint32_t i;
+ nxt_bool_t ack;
+ nxt_process_t *process;
+ nxt_free_map_t *m;
+ nxt_port_mmap_header_t *hdr;
+
+ nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
+
+ process = nxt_runtime_process_find(task->thread->runtime,
+ msg->port_msg.pid);
+ if (nxt_slow_path(process == NULL)) {
+ return;
+ }
+
+ ack = 0;
+
+ /*
+ * To mitigate possible racing condition (when OOSM message received
+ * after some of the memory was already freed), need to try to find
+ * first free segment in shared memory and send ACK if found.
+ */
+
+ nxt_thread_mutex_lock(&process->incoming.mutex);
+
+ for (i = 0; i < process->incoming.size; i++) {
+ hdr = process->incoming.elts[i].mmap_handler->hdr;
+ m = hdr->free_map;
+
+ for (mi = 0; mi < MAX_FREE_IDX; mi++) {
+ if (m[mi] != 0) {
+ ack = 1;
+
+ nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
+ i, mi, m[mi]);
+
+ break;
+ }
+ }
+ }
+
+ nxt_thread_mutex_unlock(&process->incoming.mutex);
+
+ if (ack) {
+ (void) nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_SHM_ACK,
+ -1, 0, 0, NULL);
+ }
+}