diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:47:18 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:47:18 +0300 |
commit | bef2ec483eaa525da770c024069b31320b63977e (patch) | |
tree | a0e89da5619533e84a045ae357b9fb44cb250c30 | |
parent | ab138c91661aa9b3ba36fb9a1a2154461a1e2372 (diff) | |
download | unit-bef2ec483eaa525da770c024069b31320b63977e.tar.gz unit-bef2ec483eaa525da770c024069b31320b63977e.tar.bz2 |
Fixing application timeout.
Application timeout limits maximum time of worker response in processing
particular request. Not including the time required to start worker,
time in request queue etc.
-rw-r--r-- | src/nxt_port.c | 1 | ||||
-rw-r--r-- | src/nxt_port.h | 6 | ||||
-rw-r--r-- | src/nxt_port_rpc.c | 55 | ||||
-rw-r--r-- | src/nxt_router.c | 407 |
4 files changed, 362 insertions, 107 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c index 4a07657d..5a1d4e6c 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -66,6 +66,7 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, nxt_queue_init(&port->messages); nxt_thread_mutex_create(&port->write_mutex); + nxt_queue_init(&port->pending_requests); } else { nxt_mp_destroy(mp); diff --git a/src/nxt_port.h b/src/nxt_port.h index cf30bc92..74d37dcb 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -168,6 +168,7 @@ struct nxt_port_s { uint32_t app_pending_responses; uint32_t app_responses; + nxt_queue_t pending_requests; nxt_port_handler_t handler; nxt_port_handler_t *data; @@ -265,4 +266,9 @@ nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port, nxt_port_post_handler_t handler, void *data); void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i); +nxt_inline void nxt_port_inc_use(nxt_port_t *port) +{ + nxt_atomic_fetch_add(&port->use_count, 1); +} + #endif /* _NXT_PORT_H_INCLUDED_ */ diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c index 04de7775..15c550f4 100644 --- a/src/nxt_port_rpc.c +++ b/src/nxt_port_rpc.c @@ -25,6 +25,11 @@ struct nxt_port_rpc_reg_s { }; +static void +nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, + nxt_port_rpc_reg_t *reg); + + static nxt_int_t nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data) { @@ -168,10 +173,17 @@ nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port, nxt_assert(reg->data == ex); - if (peer == -1 || reg->peer != -1) { - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to " - "change peer %PI->%PI", reg->stream, reg->peer, peer); + if (nxt_slow_path(peer == reg->peer)) { + return; + } + + if (reg->peer != -1) { + nxt_port_rpc_remove_from_peers(task, port, reg); + + reg->peer = -1; + } + if (peer == -1) { return; } @@ -359,7 +371,6 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) msg.port_msg.pid = peer; msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID; - msg.port_msg.last = 1; peer_link = lhq.value; last = 0; @@ -375,20 +386,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) nxt_debug(task, "rpc: stream #%uD trigger error", stream); msg.port_msg.stream = stream; - - reg->error_handler(task, &msg, reg->data); - - nxt_port_rpc_lhq_stream(&lhq, &stream); - lhq.pool = port->mem_pool; - - ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); - - if (nxt_slow_path(ret != NXT_OK)) { - nxt_log_error(NXT_LOG_ERR, task->log, - "rpc: stream #%uD failed to delete handler", stream); - - return; - } + msg.port_msg.last = 1; if (peer_link == peer_link->next) { nxt_assert(peer_link->prev == peer_link); @@ -405,6 +403,27 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) peer_link = next_link; } + reg->peer = -1; + + reg->error_handler(task, &msg, reg->data); + + /* Reset 'last' flag to preserve rpc handler. */ + if (msg.port_msg.last == 0) { + continue; + } + + nxt_port_rpc_lhq_stream(&lhq, &stream); + lhq.pool = port->mem_pool; + + ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log_error(NXT_LOG_ERR, task->log, + "rpc: stream #%uD failed to delete handler", stream); + + return; + } + nxt_mp_free(port->mem_pool, reg); } } diff --git a/src/nxt_router.c b/src/nxt_router.c index c71338fc..61daf746 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -48,14 +48,15 @@ typedef struct { struct nxt_req_app_link_s { uint32_t stream; + nxt_atomic_t use_count; nxt_port_t *app_port; - nxt_pid_t app_pid; nxt_port_t *reply_port; nxt_app_parse_ctx_t *ap; nxt_msg_info_t msg_info; nxt_req_conn_link_t *rc; - nxt_queue_link_t link; /* for nxt_app_t.requests */ + nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */ + nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */ nxt_mp_t *mem_pool; nxt_work_t work; @@ -73,6 +74,24 @@ typedef struct { static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); +nxt_inline void +nxt_router_ra_inc_use(nxt_req_app_link_t *ra) +{ + nxt_atomic_fetch_add(&ra->use_count, 1); +} + +nxt_inline void +nxt_router_ra_dec_use(nxt_req_app_link_t *ra) +{ + int c; + + c = nxt_atomic_fetch_add(&ra->use_count, -1); + + nxt_assert(c > 1); +} + +static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i); + static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); static void nxt_router_conf_ready(nxt_task_t *task, @@ -153,6 +172,7 @@ static void nxt_router_app_port_error(nxt_task_t *task, static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response); +static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra); static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, @@ -162,7 +182,7 @@ static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data); static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap); -static void nxt_router_process_http_request_mp(nxt_task_t *task, +static void nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra); static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); @@ -319,7 +339,7 @@ nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra, nxt_memzero(ra, sizeof(nxt_req_app_link_t)); ra->stream = rc->stream; - ra->app_pid = -1; + ra->use_count = 1; ra->rc = rc; rc->ra = ra; ra->reply_port = engine->port; @@ -338,6 +358,10 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) nxt_mp_t *mp; nxt_req_app_link_t *ra; + if (ra_src->mem_pool != NULL) { + return ra_src; + } + mp = ra_src->ap->mem_pool; ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); @@ -394,25 +418,38 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, static void -nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) +nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra); + + +static void +nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_req_app_link_t *ra; + + ra = obj; + + nxt_router_ra_update_peer(task, ra); + + nxt_router_ra_use(task, ra, -1); +} + + +static void +nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra) { - nxt_req_app_link_t *ra; nxt_event_engine_t *engine; nxt_req_conn_link_t *rc; - ra = obj; - engine = data; + engine = ra->work.data; if (task->thread->engine != engine) { - if (ra->app_port != NULL) { - ra->app_pid = ra->app_port->pid; - } + nxt_router_ra_inc_use(ra); - ra->work.handler = nxt_router_ra_release; + ra->work.handler = nxt_router_ra_update_peer_handler; ra->work.task = &engine->task; ra->work.next = NULL; - nxt_debug(task, "ra stream #%uD post release to %p", + nxt_debug(task, "ra stream #%uD post update peer to %p", ra->stream, engine); nxt_event_engine_post(engine, &ra->work); @@ -420,22 +457,47 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) return; } + nxt_debug(task, "ra stream #%uD update peer", ra->stream); + + rc = ra->rc; + + if (rc != NULL && ra->app_port != NULL) { + nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid); + } + + nxt_router_ra_use(task, ra, -1); +} + + +static void +nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra) +{ + nxt_conn_t *c; + nxt_req_conn_link_t *rc; + + nxt_assert(task->thread->engine == ra->work.data); + nxt_assert(ra->use_count == 0); + nxt_debug(task, "ra stream #%uD release", ra->stream); rc = ra->rc; if (rc != NULL) { - if (ra->app_pid != -1) { - nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid); - } + c = rc->conn; if (nxt_slow_path(ra->err_code != 0)) { - nxt_router_gen_error(task, rc->conn, ra->err_code, ra->err_str); + nxt_router_gen_error(task, c, ra->err_code, ra->err_str); } else { rc->app_port = ra->app_port; rc->msg_info = ra->msg_info; + if (rc->app->timeout != 0) { + c->read_timer.handler = nxt_router_app_timeout; + nxt_timer_add(task->thread->engine, &c->read_timer, + rc->app->timeout); + } + ra->app_port = NULL; ra->msg_info.buf = NULL; } @@ -458,6 +520,52 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) } +static void +nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_req_app_link_t *ra; + + ra = obj; + + nxt_assert(ra->work.data == data); + + nxt_atomic_fetch_add(&ra->use_count, -1); + + nxt_router_ra_release(task, ra); +} + + +static void +nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i) +{ + int c; + nxt_event_engine_t *engine; + + c = nxt_atomic_fetch_add(&ra->use_count, i); + + if (i < 0 && c == -i) { + engine = ra->work.data; + + if (task->thread->engine == engine) { + nxt_router_ra_release(task, ra); + + return; + } + + nxt_router_ra_inc_use(ra); + + ra->work.handler = nxt_router_ra_release_handler; + ra->work.task = &engine->task; + ra->work.next = NULL; + + nxt_debug(task, "ra stream #%uD post release to %p", + ra->stream, engine); + + nxt_event_engine_post(engine, &ra->work); + } +} + + nxt_inline void nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str) { @@ -467,9 +575,25 @@ nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str) } +nxt_inline nxt_bool_t +nxt_queue_chk_remove(nxt_queue_link_t *lnk) +{ + if (lnk->next != NULL) { + nxt_queue_remove(lnk); + + lnk->next = NULL; + + return 1; + } + + return 0; +} + + nxt_inline void nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) { + int ra_use_delta; nxt_req_app_link_t *ra; if (rc->app_port != NULL) { @@ -486,22 +610,25 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) rc->ra = NULL; ra->rc = NULL; - nxt_thread_mutex_lock(&rc->app->mutex); + ra_use_delta = 0; - if (ra->link.next != NULL) { - nxt_queue_remove(&ra->link); + nxt_thread_mutex_lock(&rc->app->mutex); - ra->link.next = NULL; + if (ra->link_app_requests.next == NULL + && ra->link_port_pending.next == NULL) + { + ra = NULL; } else { - ra = NULL; + ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests); + ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending); } nxt_thread_mutex_unlock(&rc->app->mutex); - } - if (ra != NULL) { - nxt_router_ra_release(task, ra, ra->work.data); + if (ra != NULL) { + nxt_router_ra_use(task, ra, ra_use_delta); + } } if (rc->app != NULL) { @@ -2266,6 +2393,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, size_t dump_size; nxt_buf_t *b, *last; nxt_conn_t *c; + nxt_event_engine_t *engine; nxt_req_conn_link_t *rc; b = msg->buf; @@ -2287,6 +2415,10 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, b = NULL; } + engine = task->thread->engine; + + nxt_timer_disable(engine, &c->read_timer); + if (msg->port_msg.last != 0) { nxt_debug(task, "router data create last buf"); @@ -2298,6 +2430,12 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_buf_chain_add(&b, last); nxt_router_rc_unlink(task, rc); + + } else { + if (rc->app->timeout != 0) { + c->read_timer.handler = nxt_router_app_timeout; + nxt_timer_add(engine, &c->read_timer, rc->app->timeout); + } } if (b == NULL) { @@ -2327,10 +2465,41 @@ static void nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { + nxt_int_t res; + nxt_port_t *port; + nxt_bool_t cancelled; + nxt_req_app_link_t *ra; nxt_req_conn_link_t *rc; rc = data; + ra = rc->ra; + + if (ra != NULL) { + cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); + + if (cancelled) { + nxt_router_ra_inc_use(ra); + + res = nxt_router_app_port(task, ra); + + if (res == NXT_OK) { + port = ra->app_port; + + nxt_assert(port != NULL); + + nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc, + port->pid); + + nxt_router_app_prepare_request(task, ra); + } + + msg->port_msg.last = 0; + + return; + } + } + nxt_router_gen_error(task, rc->conn, 500, "Application terminated unexpectedly"); @@ -2478,7 +2647,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_queue_remove(lnk); lnk->next = NULL; - ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); } else { ra = NULL; @@ -2491,7 +2660,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, &app->name, app, ra->stream); nxt_router_ra_error(ra, 500, "Failed to start application worker"); - nxt_router_ra_release(task, ra, ra->work.data); + nxt_router_ra_use(task, ra, -1); } nxt_router_app_use(task, app, -1); @@ -2533,7 +2702,7 @@ nxt_router_app_first_port_busy(nxt_app_t *app) nxt_inline nxt_port_t * -nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) +nxt_router_app_get_port_unsafe(nxt_app_t *app) { nxt_port_t *port; nxt_queue_link_t *lnk; @@ -2550,10 +2719,10 @@ nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) { nxt_queue_insert_tail(&app->ports, lnk); + nxt_port_inc_use(port); + } else { lnk->next = NULL; - - (*use_delta)--; } return port; @@ -2606,7 +2775,7 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "app '%V' %p process next stream #%uD", &app->name, app, ra->stream); - nxt_router_process_http_request_mp(task, ra); + nxt_router_app_prepare_request(task, ra); } @@ -2614,19 +2783,16 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response) { - int use_delta, ra_use_delta; nxt_app_t *app; nxt_bool_t send_quit; nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra; + nxt_req_app_link_t *ra, *next_ra; nxt_assert(port != NULL); nxt_assert(port->app != NULL); app = port->app; - use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1; - nxt_thread_mutex_lock(&app->mutex); port->app_pending_responses -= request_failed + got_response; @@ -2645,7 +2811,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_queue_insert_head(&app->ports, &port->app_link); } - use_delta++; + nxt_port_inc_use(port); } else { if (port->app_pending_responses == 0 @@ -2665,21 +2831,46 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_queue_remove(lnk); lnk->next = NULL; - ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests); - ra_use_delta = 1; - ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta); + ra->app_port = nxt_router_app_get_port_unsafe(app); + + if (ra->app_port->app_pending_responses > 1) { + nxt_queue_insert_tail(&ra->app_port->pending_requests, + &ra->link_port_pending); + + nxt_router_ra_inc_use(ra); + } } else { ra = NULL; - ra_use_delta = 0; + } + + if ((request_failed > 0 || got_response > 0) + && !nxt_queue_is_empty(&port->pending_requests)) + { + lnk = nxt_queue_first(&port->pending_requests); + nxt_queue_remove(lnk); + lnk->next = NULL; + + next_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, + link_port_pending); + + } else { + next_ra = NULL; } send_quit = app->live == 0 && port->app_pending_responses > 0; nxt_thread_mutex_unlock(&app->mutex); + if (next_ra != NULL) { + nxt_router_ra_use(task, next_ra, -1); + } + if (ra != NULL) { + nxt_router_ra_use(task, ra, -1); + nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_router_app_process_request, &task->thread->engine->task, app, ra); @@ -2710,12 +2901,8 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, adjust_use: - if (use_delta != 0) { - nxt_port_use(task, port, use_delta); - } - - if (ra_use_delta != 0) { - nxt_port_use(task, ra->app_port, ra_use_delta); + if (request_failed > 0 || got_response > 0) { + nxt_port_use(task, port, -1); } } @@ -2766,41 +2953,46 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) { - int use_delta; - nxt_int_t res; - nxt_app_t *app; - nxt_bool_t can_start_worker; - nxt_conn_t *c; - nxt_port_t *port; - nxt_event_engine_t *engine; - nxt_socket_conf_joint_t *joint; + int failed_port_use_delta; + nxt_int_t res; + nxt_app_t *app; + nxt_bool_t can_start_worker; + nxt_conn_t *c; + nxt_port_t *port, *failed_port; - use_delta = 1; c = ra->rc->conn; + app = ra->rc->app; - joint = c->joint; - app = joint->socket_conf->application; + failed_port_use_delta = 0; - if (app == NULL) { - nxt_router_gen_error(task, c, 500, - "Application is NULL in socket_conf"); - return NXT_ERROR; + nxt_thread_mutex_lock(&app->mutex); + + if (nxt_queue_chk_remove(&ra->link_app_requests)) + { + nxt_router_ra_dec_use(ra); } - ra->rc->app = app; + if (nxt_queue_chk_remove(&ra->link_port_pending)) + { + nxt_router_ra_dec_use(ra); + } - nxt_router_app_use(task, app, 1); + if (ra->app_port != NULL) { + failed_port = ra->app_port; + failed_port_use_delta--; - engine = task->thread->engine; + failed_port->app_pending_responses--; - nxt_timer_disable(engine, &c->read_timer); + if (failed_port->app_link.next != NULL) { + nxt_queue_remove(&failed_port->app_link); + failed_port->app_link.next = NULL; - if (app->timeout != 0) { - c->read_timer.handler = nxt_router_app_timeout; - nxt_timer_add(engine, &c->read_timer, app->timeout); - } + failed_port_use_delta--; + } - nxt_thread_mutex_lock(&app->mutex); + } else { + failed_port = NULL; + } can_start_worker = (app->workers + app->pending_workers) < app->max_workers; @@ -2811,7 +3003,12 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) ra = nxt_router_ra_create(task, ra); if (nxt_fast_path(ra != NULL)) { - nxt_queue_insert_tail(&app->requests, &ra->link); + nxt_queue_insert_tail(&app->requests, &ra->link_app_requests); + + nxt_router_ra_inc_use(ra); + + nxt_debug(task, "ra stream #%uD enqueue to app->requests", + ra->stream); if (can_start_worker) { app->pending_workers++; @@ -2821,14 +3018,37 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) port = NULL; } else { - port = nxt_router_app_get_port_unsafe(app, &use_delta); + port = nxt_router_app_get_port_unsafe(app); + + if (port->app_pending_responses > 1) { + ra = nxt_router_ra_create(task, ra); + + if (nxt_fast_path(ra != NULL)) { + nxt_queue_insert_tail(&port->pending_requests, + &ra->link_port_pending); + + nxt_router_ra_inc_use(ra); + + nxt_debug(task, "ra stream #%uD enqueue to " + "port->pending_requests", ra->stream); + } + } } nxt_thread_mutex_unlock(&app->mutex); + if (failed_port_use_delta != 0) { + nxt_port_use(task, failed_port, failed_port_use_delta); + } + if (nxt_slow_path(ra == NULL)) { nxt_router_gen_error(task, c, 500, "Failed to allocate " "req<->app link"); + + if (port != NULL) { + nxt_port_use(task, port, -1); + } + return NXT_ERROR; } @@ -2837,14 +3057,9 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) ra->app_port = port; - if (use_delta != 0) { - nxt_port_use(task, port, use_delta); - } return NXT_OK; } - nxt_debug(task, "ra stream #%uD allocated", ra->stream); - if (!can_start_worker) { nxt_debug(task, "app '%V' %p too many running or pending workers", &app->name, app); @@ -3127,11 +3342,22 @@ static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap) { - nxt_int_t res; - nxt_port_t *port; - nxt_event_engine_t *engine; - nxt_req_app_link_t ra_local, *ra; - nxt_req_conn_link_t *rc; + nxt_int_t res; + nxt_app_t *app; + nxt_port_t *port; + nxt_event_engine_t *engine; + nxt_req_app_link_t ra_local, *ra; + nxt_req_conn_link_t *rc; + nxt_socket_conf_joint_t *joint; + + joint = c->joint; + app = joint->socket_conf->application; + + if (app == NULL) { + nxt_router_gen_error(task, c, 500, + "Application is NULL in socket_conf"); + return; + } engine = task->thread->engine; @@ -3149,6 +3375,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, rc->stream = nxt_port_rpc_ex_stream(rc); rc->conn = c; + rc->app = app; + + nxt_router_app_use(task, app, 1); + + nxt_timer_disable(engine, &c->read_timer); nxt_queue_insert_tail(&c->requests, &rc->link); @@ -3167,16 +3398,14 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, return; } + ra = rc->ra; port = ra->app_port; - if (nxt_slow_path(port == NULL)) { - nxt_router_gen_error(task, c, 500, "Application port not found"); - return; - } + nxt_assert(port != NULL); nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); - nxt_router_process_http_request_mp(task, ra); + nxt_router_app_prepare_request(task, ra); } @@ -3187,7 +3416,7 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) static void -nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) +nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) { uint32_t request_failed; nxt_buf_t *b; @@ -3266,7 +3495,7 @@ release_port: nxt_router_app_port_release(task, port, request_failed, 0); - nxt_router_ra_release(task, ra, ra->work.data); + nxt_router_ra_update_peer(task, ra); } |