diff options
-rw-r--r-- | src/nxt_port.h | 2 | ||||
-rw-r--r-- | src/nxt_router.c | 69 |
2 files changed, 48 insertions, 23 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h index c54a1537..cf30bc92 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -166,7 +166,7 @@ struct nxt_port_s { /* Maximum interleave of message parts. */ uint32_t max_share; - uint32_t app_requests; + uint32_t app_pending_responses; uint32_t app_responses; nxt_port_handler_t handler; diff --git a/src/nxt_router.c b/src/nxt_router.c index f637c864..c71338fc 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -2519,6 +2519,19 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) } +nxt_inline nxt_bool_t +nxt_router_app_first_port_busy(nxt_app_t *app) +{ + nxt_port_t *port; + nxt_queue_link_t *lnk; + + lnk = nxt_queue_first(&app->ports); + port = nxt_queue_link_data(lnk, nxt_port_t, app_link); + + return port->app_pending_responses > 0; +} + + nxt_inline nxt_port_t * nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) { @@ -2530,12 +2543,10 @@ nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) port = nxt_queue_link_data(lnk, nxt_port_t, app_link); - port->app_requests++; + port->app_pending_responses++; - if (app->live && - (app->max_pending_responses == 0 || - (port->app_requests - port->app_responses) < - app->max_pending_responses) ) + if (app->max_pending_responses == 0 + || port->app_pending_responses < app->max_pending_responses) { nxt_queue_insert_tail(&app->ports, lnk); @@ -2560,7 +2571,7 @@ nxt_router_app_get_idle_port(nxt_app_t *app) nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { - if (port->app_requests > port->app_responses) { + if (port->app_pending_responses > 0) { port = NULL; continue; @@ -2618,18 +2629,32 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_thread_mutex_lock(&app->mutex); - port->app_requests -= request_failed; + port->app_pending_responses -= request_failed + got_response; port->app_responses += got_response; if (app->live != 0 && port->pair[1] != -1 && - port->app_link.next == NULL && - (app->max_pending_responses == 0 || - (port->app_requests - port->app_responses) < - app->max_pending_responses) ) + (app->max_pending_responses == 0 + || port->app_pending_responses < app->max_pending_responses)) { - nxt_queue_insert_tail(&app->ports, &port->app_link); - use_delta++; + if (port->app_link.next == NULL) { + if (port->app_pending_responses > 0) { + nxt_queue_insert_tail(&app->ports, &port->app_link); + + } else { + nxt_queue_insert_head(&app->ports, &port->app_link); + } + + use_delta++; + + } else { + if (port->app_pending_responses == 0 + && nxt_queue_first(&app->ports) != &port->app_link) + { + nxt_queue_remove(&port->app_link); + nxt_queue_insert_head(&app->ports, &port->app_link); + } + } } if (app->live != 0 && @@ -2650,7 +2675,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, ra_use_delta = 0; } - send_quit = app->live == 0 && port->app_requests == port->app_responses; + send_quit = app->live == 0 && port->app_pending_responses > 0; nxt_thread_mutex_unlock(&app->mutex); @@ -2750,7 +2775,6 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_event_engine_t *engine; nxt_socket_conf_joint_t *joint; - port = NULL; use_delta = 1; c = ra->rc->conn; @@ -2776,27 +2800,28 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_timer_add(engine, &c->read_timer, app->timeout); } - can_start_worker = 0; - nxt_thread_mutex_lock(&app->mutex); - if (!nxt_queue_is_empty(&app->ports)) { - port = nxt_router_app_get_port_unsafe(app, &use_delta); + can_start_worker = (app->workers + app->pending_workers) + < app->max_workers; - } else { + if (nxt_queue_is_empty(&app->ports) + || (can_start_worker && nxt_router_app_first_port_busy(app)) ) + { ra = nxt_router_ra_create(task, ra); if (nxt_fast_path(ra != NULL)) { nxt_queue_insert_tail(&app->requests, &ra->link); - can_start_worker = (app->workers + app->pending_workers) < - app->max_workers; if (can_start_worker) { app->pending_workers++; } } port = NULL; + + } else { + port = nxt_router_app_get_port_unsafe(app, &use_delta); } nxt_thread_mutex_unlock(&app->mutex); |