diff options
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 95 |
1 files changed, 59 insertions, 36 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index b3da0eaa..fddc3582 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -302,24 +302,14 @@ nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app) } -static nxt_req_app_link_t * -nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) +nxt_inline void +nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra, + nxt_req_conn_link_t *rc) { - nxt_mp_t *mp; nxt_event_engine_t *engine; - nxt_req_app_link_t *ra; - mp = rc->ap->mem_pool; engine = task->thread->engine; - ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); - - if (nxt_slow_path(ra == NULL)) { - return NULL; - } - - nxt_debug(task, "ra stream #%uD create", rc->stream); - nxt_memzero(ra, sizeof(nxt_req_app_link_t)); ra->stream = rc->stream; @@ -327,13 +317,36 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) ra->rc = rc; rc->ra = ra; ra->reply_port = engine->port; - - ra->mem_pool = mp; + ra->ap = rc->ap; ra->work.handler = NULL; ra->work.task = &engine->task; ra->work.obj = ra; ra->work.data = engine; +} + + +nxt_inline nxt_req_app_link_t * +nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) +{ + nxt_mp_t *mp; + nxt_req_app_link_t *ra; + + mp = ra_src->ap->mem_pool; + + ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); + + if (nxt_slow_path(ra == NULL)) { + + ra_src->rc->ra = NULL; + ra_src->rc = NULL; + + return NULL; + } + + nxt_router_ra_init(task, ra, ra_src->rc); + + ra->mem_pool = mp; return ra; } @@ -388,7 +401,9 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) ra->app_port = NULL; } - nxt_mp_release(ra->mem_pool, ra); + if (ra->mem_pool != NULL) { + nxt_mp_release(ra->mem_pool, ra); + } } @@ -435,7 +450,9 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) ra->app_port = NULL; } - nxt_mp_release(ra->mem_pool, ra); + if (ra->mem_pool != NULL) { + nxt_mp_release(ra->mem_pool, ra); + } } @@ -494,7 +511,9 @@ nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code, ra->app_port = NULL; } - nxt_mp_release(ra->mem_pool, ra); + if (ra->mem_pool != NULL) { + nxt_mp_release(ra->mem_pool, ra); + } } @@ -2807,20 +2826,24 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_timer_add(engine, &c->read_timer, app->timeout); } + can_start_worker = 0; + nxt_thread_mutex_lock(&app->mutex); if (!nxt_queue_is_empty(&app->ports)) { port = nxt_router_app_get_port_unsafe(app, &use_delta); - can_start_worker = 0; - } else { - nxt_queue_insert_tail(&app->requests, &ra->link); + ra = nxt_router_ra_create(task, ra); + + if (nxt_fast_path(ra != NULL)) { + nxt_queue_insert_tail(&app->requests, &ra->link); - can_start_worker = (app->workers + app->pending_workers) < - app->max_workers; - if (can_start_worker) { - app->pending_workers++; + can_start_worker = (app->workers + app->pending_workers) < + app->max_workers; + if (can_start_worker) { + app->pending_workers++; + } } port = NULL; @@ -2828,6 +2851,12 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_thread_mutex_unlock(&app->mutex); + if (nxt_slow_path(ra == NULL)) { + nxt_router_gen_error(task, c, 500, "Failed to allocate " + "req<->app link"); + return NXT_ERROR; + } + if (port != NULL) { nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); @@ -2839,6 +2868,8 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) return NXT_OK; } + nxt_debug(task, "ra stream #%uD allocated", ra->stream); + if (!can_start_worker) { nxt_debug(task, "app '%V' %p too many running or pending workers", &app->name, app); @@ -3060,7 +3091,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_int_t res; nxt_port_t *port; nxt_event_engine_t *engine; - nxt_req_app_link_t *ra; + nxt_req_app_link_t ra_local, *ra; nxt_req_conn_link_t *rc; engine = task->thread->engine; @@ -3088,16 +3119,8 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, rc->ap = ap; c->socket.data = NULL; - ra = nxt_router_ra_create(task, rc); - - if (nxt_slow_path(ra == NULL)) { - nxt_router_gen_error(task, c, 500, "Failed to allocate " - "req<->app link"); - - return; - } - - ra->ap = ap; + ra = &ra_local; + nxt_router_ra_init(task, ra, rc); res = nxt_router_app_port(task, ra); |