diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:47:18 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:47:18 +0300 |
commit | bef2ec483eaa525da770c024069b31320b63977e (patch) | |
tree | a0e89da5619533e84a045ae357b9fb44cb250c30 /src | |
parent | ab138c91661aa9b3ba36fb9a1a2154461a1e2372 (diff) | |
download | unit-bef2ec483eaa525da770c024069b31320b63977e.tar.gz unit-bef2ec483eaa525da770c024069b31320b63977e.tar.bz2 |
Fixing application timeout.
Application timeout limits maximum time of worker response in processing
particular request. Not including the time required to start worker,
time in request queue etc.
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_port.c | 1 | ||||
-rw-r--r-- | src/nxt_port.h | 6 | ||||
-rw-r--r-- | src/nxt_port_rpc.c | 55 | ||||
-rw-r--r-- | src/nxt_router.c | 407 |
4 files changed, 362 insertions, 107 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c index 4a07657d..5a1d4e6c 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -66,6 +66,7 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, nxt_queue_init(&port->messages); nxt_thread_mutex_create(&port->write_mutex); + nxt_queue_init(&port->pending_requests); } else { nxt_mp_destroy(mp); diff --git a/src/nxt_port.h b/src/nxt_port.h index cf30bc92..74d37dcb 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -168,6 +168,7 @@ struct nxt_port_s { uint32_t app_pending_responses; uint32_t app_responses; + nxt_queue_t pending_requests; nxt_port_handler_t handler; nxt_port_handler_t *data; @@ -265,4 +266,9 @@ nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port, nxt_port_post_handler_t handler, void *data); void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i); +nxt_inline void nxt_port_inc_use(nxt_port_t *port) +{ + nxt_atomic_fetch_add(&port->use_count, 1); +} + #endif /* _NXT_PORT_H_INCLUDED_ */ diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c index 04de7775..15c550f4 100644 --- a/src/nxt_port_rpc.c +++ b/src/nxt_port_rpc.c @@ -25,6 +25,11 @@ struct nxt_port_rpc_reg_s { }; +static void +nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, + nxt_port_rpc_reg_t *reg); + + static nxt_int_t nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data) { @@ -168,10 +173,17 @@ nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port, nxt_assert(reg->data == ex); - if (peer == -1 || reg->peer != -1) { - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to " - "change peer %PI->%PI", reg->stream, reg->peer, peer); + if (nxt_slow_path(peer == reg->peer)) { + return; + } + + if (reg->peer != -1) { + nxt_port_rpc_remove_from_peers(task, port, reg); + + reg->peer = -1; + } + if (peer == -1) { return; } @@ -359,7 +371,6 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) msg.port_msg.pid = peer; msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID; - msg.port_msg.last = 1; peer_link = lhq.value; last = 0; @@ -375,20 +386,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) nxt_debug(task, "rpc: stream #%uD trigger error", stream); msg.port_msg.stream = stream; - - reg->error_handler(task, &msg, reg->data); - - nxt_port_rpc_lhq_stream(&lhq, &stream); - lhq.pool = port->mem_pool; - - ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); - - if (nxt_slow_path(ret != NXT_OK)) { - nxt_log_error(NXT_LOG_ERR, task->log, - "rpc: stream #%uD failed to delete handler", stream); - - return; - } + msg.port_msg.last = 1; if (peer_link == peer_link->next) { nxt_assert(peer_link->prev == peer_link); @@ -405,6 +403,27 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) peer_link = next_link; } + reg->peer = -1; + + reg->error_handler(task, &msg, reg->data); + + /* Reset 'last' flag to preserve rpc handler. */ + if (msg.port_msg.last == 0) { + continue; + } + + nxt_port_rpc_lhq_stream(&lhq, &stream); + lhq.pool = port->mem_pool; + + ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log_error(NXT_LOG_ERR, task->log, + "rpc: stream #%uD failed to delete handler", stream); + + return; + } + nxt_mp_free(port->mem_pool, reg); } } diff --git a/src/nxt_router.c b/src/nxt_router.c index c71338fc..61daf746 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -48,14 +48,15 @@ typedef struct { struct nxt_req_app_link_s { uint32_t stream; + nxt_atomic_t use_count; nxt_port_t *app_port; - nxt_pid_t app_pid; nxt_port_t *reply_port; nxt_app_parse_ctx_t *ap; nxt_msg_info_t msg_info; nxt_req_conn_link_t *rc; - nxt_queue_link_t link; /* for nxt_app_t.requests */ + nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ + nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */ nxt_mp_t *mem_pool; nxt_work_t work; @@ -73,6 +74,24 @@ typedef struct { static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); +nxt_inline void +nxt_router_ra_inc_use(nxt_req_app_link_t *ra) +{ + nxt_atomic_fetch_add(&ra->use_count, 1); +} + +nxt_inline void +nxt_router_ra_dec_use(nxt_req_app_link_t *ra) +{ + int c; + + c = nxt_atomic_fetch_add(&ra->use_count, -1); + + nxt_assert(c > 1); +} + +static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i); + static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); static void nxt_router_conf_ready(nxt_task_t *task, @@ -153,6 +172,7 @@ static void nxt_router_app_port_error(nxt_task_t *task, static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response); +static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra); static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, @@ -162,7 +182,7 @@ static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data); static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap); -static void nxt_router_process_http_request_mp(nxt_task_t *task, +static void nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra); static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); @@ -319,7 +339,7 @@ nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra, nxt_memzero(ra, sizeof(nxt_req_app_link_t)); ra->stream = rc->stream; - ra->app_pid = -1; + ra->use_count = 1; ra->rc = rc; rc->ra = ra; ra->reply_port = engine->port; @@ -338,6 +358,10 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) nxt_mp_t *mp; nxt_req_app_link_t *ra; + if (ra_src->mem_pool != NULL) { + return ra_src; + } + mp = ra_src->ap->mem_pool; ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); @@ -394,25 +418,38 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, static void -nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) +nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra); + + +static void +nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_req_app_link_t *ra; + + ra = obj; + + nxt_router_ra_update_peer(task, ra); + + nxt_router_ra_use(task, ra, -1); +} + + +static void +nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra) { - nxt_req_app_link_t *ra; nxt_event_engine_t *engine; nxt_req_conn_link_t *rc; - ra = obj; - engine = data; + engine = ra->work.data; if (task->thread->engine != engine) { - if (ra->app_port != NULL) { - ra->app_pid = ra->app_port->pid; - } + nxt_router_ra_inc_use(ra); - ra->work.handler = nxt_router_ra_release; + ra->work.handler = nxt_router_ra_update_peer_handler; ra->work.task = &engine->task; ra->work.next = NULL; - nxt_debug(task, "ra stream #%uD post release to %p", + nxt_debug(task, "ra stream #%uD post update peer to %p", ra->stream, engine); nxt_event_engine_post(engine, &ra->work); @@ -420,22 +457,47 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) return; } + nxt_debug(task, "ra stream #%uD update peer", ra->stream); + + rc = ra->rc; + + if (rc != NULL && ra->app_port != NULL) { + nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid); + } + + nxt_router_ra_use(task, ra, -1); +} + + +static void +nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) +{ + nxt_conn_t *c; + nxt_req_conn_link_t *rc; + + nxt_assert(task->thread->engine == ra->work.data); + nxt_assert(ra->use_count == 0); + nxt_debug(task, "ra stream #%uD release", ra->stream); rc = ra->rc; if (rc != NULL) { - if (ra->app_pid != -1) { - nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid); - } + c = rc->conn; if (nxt_slow_path(ra->err_code != 0)) { - nxt_router_gen_error(task, rc->conn, ra->err_code, ra->err_str); + nxt_router_gen_error(task, c, ra->err_code, ra->err_str); } else { rc->app_port = ra->app_port; rc->msg_info = ra->msg_info; + if (rc->app->timeout != 0) { + c->read_timer.handler = nxt_router_app_timeout; + nxt_timer_add(task->thread->engine, &c->read_timer, + rc->app->timeout); + } + ra->app_port = NULL; ra->msg_info.buf = NULL; } @@ -458,6 +520,52 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) } +static void +nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_req_app_link_t *ra; + + ra = obj; + + nxt_assert(ra->work.data == data); + + nxt_atomic_fetch_add(&ra->use_count, -1); + + nxt_router_ra_release(task, ra); +} + + +static void +nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i) +{ + int c; + nxt_event_engine_t *engine; + + c = nxt_atomic_fetch_add(&ra->use_count, i); + + if (i < 0 && c == -i) { + engine = ra->work.data; + + if (task->thread->engine == engine) { + nxt_router_ra_release(task, ra); + + return; + } + + nxt_router_ra_inc_use(ra); + + ra->work.handler = nxt_router_ra_release_handler; + ra->work.task = &engine->task; + ra->work.next = NULL; + + nxt_debug(task, "ra stream #%uD post release to %p", + ra->stream, engine); + + nxt_event_engine_post(engine, &ra->work); + } +} + + nxt_inline void nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str) { @@ -467,9 +575,25 @@ nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str) } +nxt_inline nxt_bool_t +nxt_queue_chk_remove(nxt_queue_link_t *lnk) +{ + if (lnk->next != NULL) { + nxt_queue_remove(lnk); + + lnk->next = NULL; + + return 1; + } + + return 0; +} + + nxt_inline void nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) { + int ra_use_delta; nxt_req_app_link_t *ra; if (rc->app_port != NULL) { @@ -486,22 +610,25 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) rc->ra = NULL; ra->rc = NULL; - nxt_thread_mutex_lock(&rc->app->mutex); + ra_use_delta = 0; - if (ra->link.next != NULL) { - nxt_queue_remove(&ra->link); + nxt_thread_mutex_lock(&rc->app->mutex); - ra->link.next = NULL; + if (ra->link_app_requests.next == NULL + && ra->link_port_pending.next == NULL) + { + ra = NULL; } else { - ra = NULL; + ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests); + ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending); } nxt_thread_mutex_unlock(&rc->app->mutex); - } - if (ra != NULL) { - nxt_router_ra_release(task, ra, ra->work.data); + if (ra != NULL) { + nxt_router_ra_use(task, ra, ra_use_delta); + } } if (rc->app != NULL) { @@ -2266,6 +2393,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, size_t dump_size; nxt_buf_t *b, *last; nxt_conn_t *c; + nxt_event_engine_t *engine; nxt_req_conn_link_t *rc; b = msg->buf; @@ -2287,6 +2415,10 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, b = NULL; } + engine = task->thread->engine; + + nxt_timer_disable(engine, &c->read_timer); + if (msg->port_msg.last != 0) { nxt_debug(task, "router data create last buf"); @@ -2298,6 +2430,12 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_buf_chain_add(&b, last); nxt_router_rc_unlink(task, rc); + + } else { + if (rc->app->timeout != 0) { + c->read_timer.handler = nxt_router_app_timeout; + nxt_timer_add(engine, &c->read_timer, rc->app->timeout); + } } if (b == NULL) { @@ -2327,10 +2465,41 @@ static void nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { + nxt_int_t res; + nxt_port_t *port; + nxt_bool_t cancelled; + nxt_req_app_link_t *ra; nxt_req_conn_link_t *rc; rc = data; + ra = rc->ra; + + if (ra != NULL) { + cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); + + if (cancelled) { + nxt_router_ra_inc_use(ra); + + res = nxt_router_app_port(task, ra); + + if (res == NXT_OK) { + port = ra->app_port; + + nxt_assert(port != NULL); + + nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc, + port->pid); + + nxt_router_app_prepare_request(task, ra); + } + + msg->port_msg.last = 0; + + return; + } + } + nxt_router_gen_error(task, rc->conn, 500, "Application terminated unexpectedly"); @@ -2478,7 +2647,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_queue_remove(lnk); lnk->next = NULL; - ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); } else { ra = NULL; @@ -2491,7 +2660,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, &app->name, app, ra->stream); nxt_router_ra_error(ra, 500, "Failed to start application worker"); - nxt_router_ra_release(task, ra, ra->work.data); + nxt_router_ra_use(task, ra, -1); } nxt_router_app_use(task, app, -1); @@ -2533,7 +2702,7 @@ nxt_router_app_first_port_busy(nxt_app_t *app) nxt_inline nxt_port_t * -nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) +nxt_router_app_get_port_unsafe(nxt_app_t *app) { nxt_port_t *port; nxt_queue_link_t *lnk; @@ -2550,10 +2719,10 @@ nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) { nxt_queue_insert_tail(&app->ports, lnk); + nxt_port_inc_use(port); + } else { lnk->next = NULL; - - (*use_delta)--; } return port; @@ -2606,7 +2775,7 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "app '%V' %p process next stream #%uD", &app->name, app, ra->stream); - nxt_router_process_http_request_mp(task, ra); + nxt_router_app_prepare_request(task, ra); } @@ -2614,19 +2783,16 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response) { - int use_delta, ra_use_delta; nxt_app_t *app; nxt_bool_t send_quit; nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra; + nxt_req_app_link_t *ra, *next_ra; nxt_assert(port != NULL); nxt_assert(port->app != NULL); app = port->app; - use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1; - nxt_thread_mutex_lock(&app->mutex); port->app_pending_responses -= request_failed + got_response; @@ -2645,7 +2811,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_queue_insert_head(&app->ports, &port->app_link); } - use_delta++; + nxt_port_inc_use(port); } else { if (port->app_pending_responses == 0 @@ -2665,21 +2831,46 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_queue_remove(lnk); lnk->next = NULL; - ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); - ra_use_delta = 1; - ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta); + ra->app_port = nxt_router_app_get_port_unsafe(app); + + if (ra->app_port->app_pending_responses > 1) { + nxt_queue_insert_tail(&ra->app_port->pending_requests, + &ra->link_port_pending); + + nxt_router_ra_inc_use(ra); + } } else { ra = NULL; - ra_use_delta = 0; + } + + if ((request_failed > 0 || got_response > 0) + && !nxt_queue_is_empty(&port->pending_requests)) + { + lnk = nxt_queue_first(&port->pending_requests); + nxt_queue_remove(lnk); + lnk->next = NULL; + + next_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, + link_port_pending); + + } else { + next_ra = NULL; } send_quit = app->live == 0 && port->app_pending_responses > 0; nxt_thread_mutex_unlock(&app->mutex); + if (next_ra != NULL) { + nxt_router_ra_use(task, next_ra, -1); + } + if (ra != NULL) { + nxt_router_ra_use(task, ra, -1); + nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, &task->thread->engine->task, app, ra); @@ -2710,12 +2901,8 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, adjust_use: - if (use_delta != 0) { - nxt_port_use(task, port, use_delta); - } - - if (ra_use_delta != 0) { - nxt_port_use(task, ra->app_port, ra_use_delta); + if (request_failed > 0 || got_response > 0) { + nxt_port_use(task, port, -1); } } @@ -2766,41 +2953,46 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) { - int use_delta; - nxt_int_t res; - nxt_app_t *app; - nxt_bool_t can_start_worker; - nxt_conn_t *c; - nxt_port_t *port; - nxt_event_engine_t *engine; - nxt_socket_conf_joint_t *joint; + int failed_port_use_delta; + nxt_int_t res; + nxt_app_t *app; + nxt_bool_t can_start_worker; + nxt_conn_t *c; + nxt_port_t *port, *failed_port; - use_delta = 1; c = ra->rc->conn; + app = ra->rc->app; - joint = c->joint; - app = joint->socket_conf->application; + failed_port_use_delta = 0; - if (app == NULL) { - nxt_router_gen_error(task, c, 500, - "Application is NULL in socket_conf"); - return NXT_ERROR; + nxt_thread_mutex_lock(&app->mutex); + + if (nxt_queue_chk_remove(&ra->link_app_requests)) + { + nxt_router_ra_dec_use(ra); } - ra->rc->app = app; + if (nxt_queue_chk_remove(&ra->link_port_pending)) + { + nxt_router_ra_dec_use(ra); + } - nxt_router_app_use(task, app, 1); + if (ra->app_port != NULL) { + failed_port = ra->app_port; + failed_port_use_delta--; - engine = task->thread->engine; + failed_port->app_pending_responses--; - nxt_timer_disable(engine, &c->read_timer); + if (failed_port->app_link.next != NULL) { + nxt_queue_remove(&failed_port->app_link); + failed_port->app_link.next = NULL; - if (app->timeout != 0) { - c->read_timer.handler = nxt_router_app_timeout; - nxt_timer_add(engine, &c->read_timer, app->timeout); - } + failed_port_use_delta--; + } - nxt_thread_mutex_lock(&app->mutex); + } else { + failed_port = NULL; + } can_start_worker = (app->workers + app->pending_workers) < app->max_workers; @@ -2811,7 +3003,12 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) ra = nxt_router_ra_create(task, ra); if (nxt_fast_path(ra != NULL)) { - nxt_queue_insert_tail(&app->requests, &ra->link); + nxt_queue_insert_tail(&app->requests, &ra->link_app_requests); + + nxt_router_ra_inc_use(ra); + + nxt_debug(task, "ra stream #%uD enqueue to app->requests", + ra->stream); if (can_start_worker) { app->pending_workers++; @@ -2821,14 +3018,37 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) port = NULL; } else { - port = nxt_router_app_get_port_unsafe(app, &use_delta); + port = nxt_router_app_get_port_unsafe(app); + + if (port->app_pending_responses > 1) { + ra = nxt_router_ra_create(task, ra); + + if (nxt_fast_path(ra != NULL)) { + nxt_queue_insert_tail(&port->pending_requests, + &ra->link_port_pending); + + nxt_router_ra_inc_use(ra); + + nxt_debug(task, "ra stream #%uD enqueue to " + "port->pending_requests", ra->stream); + } + } } nxt_thread_mutex_unlock(&app->mutex); + if (failed_port_use_delta != 0) { + nxt_port_use(task, failed_port, failed_port_use_delta); + } + if (nxt_slow_path(ra == NULL)) { nxt_router_gen_error(task, c, 500, "Failed to allocate " "req<->app link"); + + if (port != NULL) { + nxt_port_use(task, port, -1); + } + return NXT_ERROR; } @@ -2837,14 +3057,9 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) ra->app_port = port; - if (use_delta != 0) { - nxt_port_use(task, port, use_delta); - } return NXT_OK; } - nxt_debug(task, "ra stream #%uD allocated", ra->stream); - if (!can_start_worker) { nxt_debug(task, "app '%V' %p too many running or pending workers", &app->name, app); @@ -3127,11 +3342,22 @@ static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap) { - nxt_int_t res; - nxt_port_t *port; - nxt_event_engine_t *engine; - nxt_req_app_link_t ra_local, *ra; - nxt_req_conn_link_t *rc; + nxt_int_t res; + nxt_app_t *app; + nxt_port_t *port; + nxt_event_engine_t *engine; + nxt_req_app_link_t ra_local, *ra; + nxt_req_conn_link_t *rc; + nxt_socket_conf_joint_t *joint; + + joint = c->joint; + app = joint->socket_conf->application; + + if (app == NULL) { + nxt_router_gen_error(task, c, 500, + "Application is NULL in socket_conf"); + return; + } engine = task->thread->engine; @@ -3149,6 +3375,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, rc->stream = nxt_port_rpc_ex_stream(rc); rc->conn = c; + rc->app = app; + + nxt_router_app_use(task, app, 1); + + nxt_timer_disable(engine, &c->read_timer); nxt_queue_insert_tail(&c->requests, &rc->link); @@ -3167,16 +3398,14 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, return; } + ra = rc->ra; port = ra->app_port; - if (nxt_slow_path(port == NULL)) { - nxt_router_gen_error(task, c, 500, "Application port not found"); - return; - } + nxt_assert(port != NULL); nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); - nxt_router_process_http_request_mp(task, ra); + nxt_router_app_prepare_request(task, ra); } @@ -3187,7 +3416,7 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) static void -nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) +nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) { uint32_t request_failed; nxt_buf_t *b; @@ -3266,7 +3495,7 @@ release_port: nxt_router_app_port_release(task, port, request_failed, 0); - nxt_router_ra_release(task, ra, ra->work.data); + nxt_router_ra_update_peer(task, ra); } |