summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c95
1 files changed, 59 insertions, 36 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index b3da0eaa..fddc3582 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -302,24 +302,14 @@ nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
}
-static nxt_req_app_link_t *
-nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
+nxt_inline void
+nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
+ nxt_req_conn_link_t *rc)
{
- nxt_mp_t *mp;
nxt_event_engine_t *engine;
- nxt_req_app_link_t *ra;
- mp = rc->ap->mem_pool;
engine = task->thread->engine;
- ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
-
- if (nxt_slow_path(ra == NULL)) {
- return NULL;
- }
-
- nxt_debug(task, "ra stream #%uD create", rc->stream);
-
nxt_memzero(ra, sizeof(nxt_req_app_link_t));
ra->stream = rc->stream;
@@ -327,13 +317,36 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
ra->rc = rc;
rc->ra = ra;
ra->reply_port = engine->port;
-
- ra->mem_pool = mp;
+ ra->ap = rc->ap;
ra->work.handler = NULL;
ra->work.task = &engine->task;
ra->work.obj = ra;
ra->work.data = engine;
+}
+
+
+nxt_inline nxt_req_app_link_t *
+nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
+{
+ nxt_mp_t *mp;
+ nxt_req_app_link_t *ra;
+
+ mp = ra_src->ap->mem_pool;
+
+ ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
+
+ if (nxt_slow_path(ra == NULL)) {
+
+ ra_src->rc->ra = NULL;
+ ra_src->rc = NULL;
+
+ return NULL;
+ }
+
+ nxt_router_ra_init(task, ra, ra_src->rc);
+
+ ra->mem_pool = mp;
return ra;
}
@@ -388,7 +401,9 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
ra->app_port = NULL;
}
- nxt_mp_release(ra->mem_pool, ra);
+ if (ra->mem_pool != NULL) {
+ nxt_mp_release(ra->mem_pool, ra);
+ }
}
@@ -435,7 +450,9 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
ra->app_port = NULL;
}
- nxt_mp_release(ra->mem_pool, ra);
+ if (ra->mem_pool != NULL) {
+ nxt_mp_release(ra->mem_pool, ra);
+ }
}
@@ -494,7 +511,9 @@ nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code,
ra->app_port = NULL;
}
- nxt_mp_release(ra->mem_pool, ra);
+ if (ra->mem_pool != NULL) {
+ nxt_mp_release(ra->mem_pool, ra);
+ }
}
@@ -2807,20 +2826,24 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
nxt_timer_add(engine, &c->read_timer, app->timeout);
}
+ can_start_worker = 0;
+
nxt_thread_mutex_lock(&app->mutex);
if (!nxt_queue_is_empty(&app->ports)) {
port = nxt_router_app_get_port_unsafe(app, &use_delta);
- can_start_worker = 0;
-
} else {
- nxt_queue_insert_tail(&app->requests, &ra->link);
+ ra = nxt_router_ra_create(task, ra);
+
+ if (nxt_fast_path(ra != NULL)) {
+ nxt_queue_insert_tail(&app->requests, &ra->link);
- can_start_worker = (app->workers + app->pending_workers) <
- app->max_workers;
- if (can_start_worker) {
- app->pending_workers++;
+ can_start_worker = (app->workers + app->pending_workers) <
+ app->max_workers;
+ if (can_start_worker) {
+ app->pending_workers++;
+ }
}
port = NULL;
@@ -2828,6 +2851,12 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
nxt_thread_mutex_unlock(&app->mutex);
+ if (nxt_slow_path(ra == NULL)) {
+ nxt_router_gen_error(task, c, 500, "Failed to allocate "
+ "req<->app link");
+ return NXT_ERROR;
+ }
+
if (port != NULL) {
nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
@@ -2839,6 +2868,8 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
return NXT_OK;
}
+ nxt_debug(task, "ra stream #%uD allocated", ra->stream);
+
if (!can_start_worker) {
nxt_debug(task, "app '%V' %p too many running or pending workers",
&app->name, app);
@@ -3060,7 +3091,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_int_t res;
nxt_port_t *port;
nxt_event_engine_t *engine;
- nxt_req_app_link_t *ra;
+ nxt_req_app_link_t ra_local, *ra;
nxt_req_conn_link_t *rc;
engine = task->thread->engine;
@@ -3088,16 +3119,8 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
rc->ap = ap;
c->socket.data = NULL;
- ra = nxt_router_ra_create(task, rc);
-
- if (nxt_slow_path(ra == NULL)) {
- nxt_router_gen_error(task, c, 500, "Failed to allocate "
- "req<->app link");
-
- return;
- }
-
- ra->ap = ap;
+ ra = &ra_local;
+ nxt_router_ra_init(task, ra, rc);
res = nxt_router_app_port(task, ra);