diff options
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 762 |
1 files changed, 432 insertions, 330 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index df3caf82..5eb95b59 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -24,19 +24,12 @@ typedef struct { typedef struct nxt_req_app_link_s nxt_req_app_link_t; -typedef struct nxt_start_worker_s nxt_start_worker_t; - -struct nxt_start_worker_s { - nxt_app_t *app; - nxt_req_app_link_t *ra; - - nxt_work_t work; -}; typedef struct { uint32_t stream; nxt_conn_t *conn; + nxt_app_t *app; nxt_port_t *app_port; nxt_req_app_link_t *ra; @@ -72,6 +65,8 @@ typedef struct { } nxt_remove_pid_msg_t; +static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); + static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data); static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, @@ -124,7 +119,7 @@ static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_temp_conf_t *tmcf); static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_event_engine_t *engine); -static void nxt_router_apps_sort(nxt_router_t *router, +static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, nxt_router_temp_conf_t *tmcf); static void nxt_router_engines_post(nxt_router_t *router, @@ -150,12 +145,14 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, static void nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint); -static void nxt_router_send_sw_request(nxt_task_t *task, void *obj, - void *data); -static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app); -static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t stream); -static void nxt_router_app_release_port(nxt_task_t *task, void *obj, - void *data); +static void nxt_router_app_port_ready(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); +static void nxt_router_app_port_error(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); + +static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); +static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, + uint32_t request_failed, uint32_t got_response); static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, @@ -165,7 +162,7 @@ static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap); static void nxt_router_process_http_request_mp(nxt_task_t *task, - nxt_req_app_link_t *ra, nxt_port_t *port); + nxt_req_app_link_t *ra); static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, @@ -222,68 +219,91 @@ nxt_router_start(nxt_task_t *task, void *data) } -static nxt_start_worker_t * -nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) +static void +nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data) { - nxt_port_t *main_port; - nxt_runtime_t *rt; - nxt_start_worker_t *sw; + size_t size; + uint32_t stream; + nxt_app_t *app; + nxt_buf_t *b; + nxt_port_t *main_port; + nxt_runtime_t *rt; - sw = nxt_zalloc(sizeof(nxt_start_worker_t)); + app = data; - if (nxt_slow_path(sw == NULL)) { - return NULL; - } + rt = task->thread->runtime; + main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - sw->app = app; - sw->ra = ra; + nxt_debug(task, "app '%V' %p start worker", &app->name, app); - nxt_debug(task, "sw %p create, stream #%uD, app '%V' %p", sw, - ra->stream, &app->name, app); + size = app->name.length + 1 + app->conf.length; - rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); - sw->work.handler = nxt_router_send_sw_request; - sw->work.task = &main_port->engine->task; - sw->work.obj = sw; - sw->work.data = task->thread->engine; - sw->work.next = NULL; + if (nxt_slow_path(b == NULL)) { + goto failed; + } + + nxt_buf_cpystr(b, &app->name); + *b->mem.free++ = '\0'; + nxt_buf_cpystr(b, &app->conf); - if (task->thread->engine != main_port->engine) { - nxt_debug(task, "sw %p post send to main engine %p", sw, - main_port->engine); + stream = nxt_port_rpc_register_handler(task, port, + nxt_router_app_port_ready, + nxt_router_app_port_error, + -1, app); - nxt_event_engine_post(main_port->engine, &sw->work); + if (nxt_slow_path(stream == 0)) { + nxt_mp_release(b->data, b); - } else { - nxt_router_send_sw_request(task, sw, sw->work.data); + goto failed; } - return sw; -} + nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, + stream, port->id, b); + + return; +failed: -nxt_inline void -nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw) -{ - nxt_debug(task, "sw %p release", sw); + nxt_thread_mutex_lock(&app->mutex); + + app->pending_workers--; - nxt_free(sw); + nxt_thread_mutex_unlock(&app->mutex); + + nxt_router_app_use(task, app, -1); } -nxt_inline void -nxt_router_rc_unlink(nxt_req_conn_link_t *rc) +static nxt_int_t +nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app) { - nxt_queue_remove(&rc->link); + nxt_int_t res; + nxt_port_t *router_port; + nxt_runtime_t *rt; - if (rc->ra != NULL) { - rc->ra->rc = NULL; - rc->ra = NULL; + rt = task->thread->runtime; + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + + nxt_router_app_use(task, app, 1); + + res = nxt_port_post(task, router_port, nxt_router_start_worker_handler, + app); + + if (res == NXT_OK) { + return res; } - rc->conn = NULL; + nxt_thread_mutex_lock(&app->mutex); + + app->pending_workers--; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_router_app_use(task, app, -1); + + return NXT_ERROR; } @@ -327,36 +347,18 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) static void nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) { - nxt_port_t *app_port; - nxt_req_app_link_t *ra; - nxt_event_engine_t *engine; + nxt_req_app_link_t *ra; + nxt_event_engine_t *engine; + nxt_req_conn_link_t *rc; ra = obj; engine = data; - if (ra->app_port != NULL) { - - app_port = ra->app_port; - ra->app_port = NULL; - - if (task->thread->engine != engine) { - ra->app_pid = app_port->pid; - } - - nxt_router_app_release_port(task, app_port, app_port->app); - -#if 0 - /* Uncomment to hold app port until complete response received. */ - if (ra->rc != NULL) { - ra->rc->app_port = ra->app_port; - - } else { - nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); + if (task->thread->engine != engine) { + if (ra->app_port != NULL) { + ra->app_pid = ra->app_port->pid; } -#endif - } - if (task->thread->engine != engine) { ra->work.handler = nxt_router_ra_release; ra->work.task = &engine->task; ra->work.next = NULL; @@ -369,11 +371,27 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) return; } - if (ra->rc != NULL && ra->app_pid != -1) { - nxt_port_rpc_ex_set_peer(task, engine->port, ra->rc, ra->app_pid); + nxt_debug(task, "ra stream #%uD release", ra->stream); + + rc = ra->rc; + + if (rc != NULL) { + if (ra->app_pid != -1) { + nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid); + } + + rc->app_port = ra->app_port; + + ra->app_port = NULL; + rc->ra = NULL; + ra->rc = NULL; } - nxt_debug(task, "ra stream #%uD release", ra->stream); + if (ra->app_port != NULL) { + nxt_router_app_port_release(task, ra->app_port, 0, 1); + + ra->app_port = NULL; + } nxt_mp_release(ra->mem_pool, ra); } @@ -382,9 +400,10 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) static void nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) { - nxt_conn_t *c; - nxt_req_app_link_t *ra; - nxt_event_engine_t *engine; + nxt_conn_t *c; + nxt_req_app_link_t *ra; + nxt_req_conn_link_t *rc; + nxt_event_engine_t *engine; ra = obj; engine = data; @@ -403,17 +422,75 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "ra stream #%uD abort", ra->stream); - if (ra->rc != NULL) { - c = ra->rc->conn; + rc = ra->rc; + + if (rc != NULL) { + c = rc->conn; nxt_router_gen_error(task, c, 500, "Failed to start application worker"); + + rc->ra = NULL; + ra->rc = NULL; + } + + if (ra->app_port != NULL) { + nxt_router_app_port_release(task, ra->app_port, 0, 1); + + ra->app_port = NULL; } nxt_mp_release(ra->mem_pool, ra); } +nxt_inline void +nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) +{ + nxt_req_app_link_t *ra; + + if (rc->app_port != NULL) { + nxt_router_app_port_release(task, rc->app_port, 0, 1); + + rc->app_port = NULL; + } + + ra = rc->ra; + + if (ra != NULL) { + rc->ra = NULL; + ra->rc = NULL; + + nxt_thread_mutex_lock(&rc->app->mutex); + + if (ra->link.next != NULL) { + nxt_queue_remove(&ra->link); + + ra->link.next = NULL; + + } else { + ra = NULL; + } + + nxt_thread_mutex_unlock(&rc->app->mutex); + } + + if (ra != NULL) { + nxt_router_ra_release(task, ra, ra->work.data); + } + + if (rc->app != NULL) { + nxt_router_app_use(task, rc->app, -1); + + rc->app = NULL; + } + + nxt_queue_remove(&rc->link); + + rc->conn = NULL; +} + + void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { @@ -526,12 +603,20 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data) { + nxt_pid_t pid; + nxt_buf_t *buf; nxt_event_engine_t *engine; nxt_remove_pid_msg_t *rp; rp = obj; - nxt_port_remove_pid_handler(task, &rp->msg); + buf = rp->msg.buf; + + nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); + + nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); + + nxt_port_rpc_remove_peer(task, rp->msg.port, pid); engine = rp->work.data; @@ -658,7 +743,7 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) goto fail; } - nxt_router_apps_sort(router, tmcf); + nxt_router_apps_sort(task, router, tmcf); nxt_router_engines_post(router, tmcf); @@ -1005,9 +1090,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->max_workers = apcf.workers; app->timeout = apcf.timeout; app->live = 1; + app->max_pending_responses = 2; app->prepare_msg = nxt_app_prepare_msg[type]; nxt_queue_insert_tail(&tmcf->apps, &app->link); + + nxt_router_app_use(task, app, 1); } http = nxt_conf_get_path(conf, &http_path); @@ -1695,42 +1783,36 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, static void -nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) +nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, + nxt_router_temp_conf_t *tmcf) { - nxt_app_t *app; - nxt_port_t *port; + nxt_app_t *app; + nxt_port_t *port; nxt_queue_each(app, &router->apps, nxt_app_t, link) { nxt_queue_remove(&app->link); - nxt_thread_log_debug("about to remove app '%V' %p", &app->name, app); + nxt_debug(task, "about to free app '%V' %p", &app->name, app); app->live = 0; - if (nxt_router_app_free(NULL, app) != 0) { - continue; - } - - if (!nxt_queue_is_empty(&app->requests)) { - - nxt_thread_log_debug("app '%V' %p pending requests found", - &app->name, app); - continue; - } - do { - port = nxt_router_app_get_port(app, 0); + port = nxt_router_app_get_idle_port(app); if (port == NULL) { break; } - nxt_thread_log_debug("port %p send quit", port); + nxt_debug(task, "port %p send quit", port); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, + NULL); - nxt_port_socket_write(&port->engine->task, port, - NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + nxt_port_use(task, port, -1); } while (1); + nxt_router_app_use(task, app, -1); + } nxt_queue_loop; nxt_queue_add(&router->apps, &tmcf->previous); @@ -1833,7 +1915,7 @@ nxt_router_thread_start(void *data) ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { - nxt_mp_release(port->mem_pool, port); + nxt_port_use(task, port, -1); return; } @@ -2124,8 +2206,9 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) // TODO notify all apps + port->engine = task->thread->engine; nxt_mp_thread_adopt(port->mem_pool); - nxt_port_release(port); + nxt_port_use(task, port, -1); nxt_mp_thread_adopt(engine->mem_pool); nxt_mp_destroy(engine->mem_pool); @@ -2240,13 +2323,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_buf_chain_add(&b, last); - if (rc->app_port != NULL) { - nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); - - rc->app_port = NULL; - } - - nxt_router_rc_unlink(rc); + nxt_router_rc_unlink(task, rc); } if (b == NULL) { @@ -2283,7 +2360,7 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_router_gen_error(task, rc->conn, 500, "Application terminated unexpectedly"); - nxt_router_rc_unlink(rc); + nxt_router_rc_unlink(task, rc); } @@ -2383,194 +2460,153 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, static void -nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) { - nxt_start_worker_t *sw; + nxt_app_t *app; + nxt_port_t *port; + + app = data; + port = msg->new_port; - sw = data; + nxt_assert(app != NULL); + nxt_assert(port != NULL); - nxt_assert(sw != NULL); - nxt_assert(sw->app->pending_workers != 0); + port->app = app; - msg->new_port->app = sw->app; + nxt_thread_mutex_lock(&app->mutex); - sw->app->pending_workers--; - sw->app->workers++; + nxt_assert(app->pending_workers != 0); - nxt_debug(task, "sw %p got port %p", sw, msg->new_port); + app->pending_workers--; + app->workers++; + + nxt_thread_mutex_unlock(&app->mutex); - nxt_router_app_release_port(task, msg->new_port, sw->app); + nxt_debug(task, "app '%V' %p new port ready", &app->name, app); - nxt_router_sw_release(task, sw); + nxt_router_app_port_release(task, port, 0, 0); } static void -nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) { nxt_app_t *app; nxt_queue_link_t *lnk; nxt_req_app_link_t *ra; - nxt_start_worker_t *sw; - sw = data; + app = data; - nxt_assert(sw != NULL); - nxt_assert(sw->app != NULL); - nxt_assert(sw->app->pending_workers != 0); + nxt_assert(app != NULL); - app = sw->app; + nxt_debug(task, "app '%V' %p start error", &app->name, app); + + nxt_thread_mutex_lock(&app->mutex); - sw->app->pending_workers--; + nxt_assert(app->pending_workers != 0); - nxt_debug(task, "sw %p error, failed to start app '%V'", - sw, &app->name); + app->pending_workers--; if (!nxt_queue_is_empty(&app->requests)) { lnk = nxt_queue_last(&app->requests); nxt_queue_remove(lnk); + lnk->next = NULL; ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + } else { + ra = NULL; + } + + nxt_thread_mutex_unlock(&app->mutex); + + if (ra != NULL) { nxt_debug(task, "app '%V' %p abort next stream #%uD", &app->name, app, ra->stream); nxt_router_ra_abort(task, ra, ra->work.data); } - nxt_router_sw_release(task, sw); + nxt_router_app_use(task, app, -1); } -static void -nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) +void +nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) { - size_t size; - uint32_t stream; - nxt_buf_t *b; - nxt_app_t *app; - nxt_port_t *main_port, *router_port, *app_port; - nxt_runtime_t *rt; - nxt_start_worker_t *sw; - nxt_req_app_link_t *ra; - - sw = obj; - app = sw->app; + int c; - if (nxt_queue_is_empty(&app->requests)) { - ra = sw->ra; - app_port = nxt_router_app_get_port(app, ra->stream); + c = nxt_atomic_fetch_add(&app->use_count, i); - if (app_port != NULL) { - nxt_debug(task, "app '%V' %p process stream #%uD", - &app->name, app, ra->stream); + if (i < 0 && c == -i) { - ra->app_port = app_port; + nxt_assert(app->live == 0); + nxt_assert(app->workers == 0); + nxt_assert(app->pending_workers == 0); + nxt_assert(nxt_queue_is_empty(&app->requests) != 0); + nxt_assert(nxt_queue_is_empty(&app->ports) != 0); - nxt_router_process_http_request_mp(task, ra, app_port); - - nxt_router_ra_release(task, ra, ra->work.data); - nxt_router_sw_release(task, sw); - - return; - } - } - - nxt_queue_insert_tail(&app->requests, &sw->ra->link); - - if (app->workers + app->pending_workers >= app->max_workers) { - nxt_debug(task, "app '%V' %p %uD/%uD running/pending workers, " - "max_workers (%uD) reached", &app->name, app, - app->workers, app->pending_workers, app->max_workers); - - nxt_router_sw_release(task, sw); - - return; + nxt_thread_mutex_destroy(&app->mutex); + nxt_free(app); } - - app->pending_workers++; - - nxt_debug(task, "sw %p send", sw); - - rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; - - size = app->name.length + 1 + app->conf.length; - - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); - - nxt_buf_cpystr(b, &app->name); - *b->mem.free++ = '\0'; - nxt_buf_cpystr(b, &app->conf); - - stream = nxt_port_rpc_register_handler(task, router_port, - nxt_router_sw_ready, - nxt_router_sw_error, - main_port->pid, sw); - - nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, - stream, router_port->id, b); } -static nxt_bool_t -nxt_router_app_free(nxt_task_t *task, nxt_app_t *app) +nxt_inline nxt_port_t * +nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) { - nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra; + nxt_port_t *port; + nxt_queue_link_t *lnk; - nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app, - app->live, app->workers, app->pending_workers, - nxt_queue_is_empty(&app->requests)); + lnk = nxt_queue_first(&app->ports); + nxt_queue_remove(lnk); - if (app->live == 0 - && app->workers == 0 - && app->pending_workers == 0 - && nxt_queue_is_empty(&app->requests)) - { - nxt_thread_mutex_destroy(&app->mutex); - nxt_free(app); + port = nxt_queue_link_data(lnk, nxt_port_t, app_link); - return 1; - } + port->app_requests++; - if (app->live == 1 - && nxt_queue_is_empty(&app->requests) == 0 - && app->workers + app->pending_workers < app->max_workers) + if (app->live && + (app->max_pending_responses == 0 || + (port->app_requests - port->app_responses) < + app->max_pending_responses) ) { - lnk = nxt_queue_first(&app->requests); - nxt_queue_remove(lnk); + nxt_queue_insert_tail(&app->ports, lnk); - ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + } else { + lnk->next = NULL; - nxt_router_sw_create(task, app, ra); + (*use_delta)--; } - return 0; + return port; } static nxt_port_t * -nxt_router_app_get_port(nxt_app_t *app, uint32_t stream) +nxt_router_app_get_idle_port(nxt_app_t *app) { - nxt_port_t *port; - nxt_queue_link_t *lnk; + nxt_port_t *port; port = NULL; nxt_thread_mutex_lock(&app->mutex); - if (!nxt_queue_is_empty(&app->ports)) { - lnk = nxt_queue_first(&app->ports); - nxt_queue_remove(lnk); + nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { - lnk->next = NULL; + if (port->app_requests > port->app_responses) { + port = NULL; - port = nxt_queue_link_data(lnk, nxt_port_t, app_link); + continue; + } - port->app_stream = stream; - } + nxt_queue_remove(&port->app_link); + port->app_link.next = NULL; + + break; + + } nxt_queue_loop; nxt_thread_mutex_unlock(&app->mutex); @@ -2579,151 +2615,175 @@ nxt_router_app_get_port(nxt_app_t *app, uint32_t stream) static void -nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) +nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) { - nxt_app_t *app; - nxt_port_t *port; - nxt_work_t *work; - nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra; + nxt_app_t *app; + nxt_req_app_link_t *ra; - port = obj; - app = data; + app = obj; + ra = data; nxt_assert(app != NULL); - nxt_assert(app == port->app); - nxt_assert(port->app_link.next == NULL); + nxt_assert(ra != NULL); + nxt_assert(ra->app_port != NULL); + nxt_debug(task, "app '%V' %p process next stream #%uD", + &app->name, app, ra->stream); - if (task->thread->engine != port->engine) { - work = &port->work; + nxt_router_process_http_request_mp(task, ra); +} - nxt_debug(task, "post release port to engine %p", port->engine); - work->next = NULL; - work->handler = nxt_router_app_release_port; - work->task = &port->engine->task; - work->obj = port; - work->data = app; +static void +nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, + uint32_t request_failed, uint32_t got_response) +{ + int use_delta, ra_use_delta; + nxt_app_t *app; + nxt_queue_link_t *lnk; + nxt_req_app_link_t *ra; - nxt_event_engine_post(port->engine, work); + nxt_assert(port != NULL); + nxt_assert(port->app != NULL); - return; + app = port->app; + + use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1; + + nxt_thread_mutex_lock(&app->mutex); + + port->app_requests -= request_failed; + port->app_responses += got_response; + + if (app->live != 0 && + port->pair[1] != -1 && + port->app_link.next == NULL && + (app->max_pending_responses == 0 || + (port->app_requests - port->app_responses) < + app->max_pending_responses) ) + { + nxt_queue_insert_tail(&app->ports, &port->app_link); + use_delta++; } - if (!nxt_queue_is_empty(&app->requests)) { + if (app->live != 0 && + !nxt_queue_is_empty(&app->ports) && + !nxt_queue_is_empty(&app->requests)) + { lnk = nxt_queue_first(&app->requests); nxt_queue_remove(lnk); + lnk->next = NULL; ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); - nxt_debug(task, "app '%V' %p process next stream #%uD", - &app->name, app, ra->stream); + ra_use_delta = 1; + ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta); - ra->app_port = port; - port->app_stream = ra->stream; + } else { + ra = NULL; + ra_use_delta = 0; + } - nxt_router_process_http_request_mp(task, ra, port); + nxt_thread_mutex_unlock(&app->mutex); - nxt_router_ra_release(task, ra, ra->work.data); + if (ra != NULL) { + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_router_app_process_request, + &task->thread->engine->task, app, ra); - return; + goto adjust_use; } - port->app_stream = 0; - + /* ? */ if (port->pair[1] == -1) { - nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)", - &app->name, app, port->pid); - - app->workers--; - nxt_router_app_free(task, app); + nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", + &app->name, app, port, port->pid); - port->app = NULL; - - nxt_port_release(port); - - return; + goto adjust_use; } - if (!app->live) { + if (app->live == 0) { nxt_debug(task, "app '%V' %p is not alive, send QUIT to port", &app->name, app); nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); - return; + goto adjust_use; } nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", &app->name, app); - nxt_thread_mutex_lock(&app->mutex); +adjust_use: - nxt_queue_insert_head(&app->ports, &port->app_link); + if (use_delta != 0) { + nxt_port_use(task, port, use_delta); + } - nxt_thread_mutex_unlock(&app->mutex); + if (ra_use_delta != 0) { + nxt_port_use(task, ra->app_port, ra_use_delta); + } } -nxt_bool_t -nxt_router_app_remove_port(nxt_port_t *port) +void +nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) { nxt_app_t *app; - nxt_bool_t busy; + nxt_bool_t unchain, start_worker; app = port->app; - busy = port->app_stream != 0; - - if (app == NULL) { - nxt_thread_log_debug("port %p app remove, no app", port); - - nxt_assert(port->app_link.next == NULL); - return 1; - } + nxt_assert(app != NULL); nxt_thread_mutex_lock(&app->mutex); - if (port->app_link.next != NULL) { + unchain = port->app_link.next != NULL; + if (unchain) { nxt_queue_remove(&port->app_link); port->app_link.next = NULL; + } + app->workers--; + + start_worker = app->live != 0 && + nxt_queue_is_empty(&app->requests) == 0 && + app->workers + app->pending_workers < app->max_workers; + + if (start_worker) { + app->pending_workers++; } nxt_thread_mutex_unlock(&app->mutex); - if (busy == 0) { - nxt_thread_log_debug("port %p app remove, free, app '%V' %p", port, - &app->name, app); - - app->workers--; - nxt_router_app_free(&port->engine->task, app); + nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port); - return 1; + if (unchain) { + nxt_port_use(task, port, -1); } - nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, " - "app stream #%uD", port, &app->name, app, - port->app_stream); - - return 0; + if (start_worker) { + nxt_router_start_worker(task, app); + } } static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) { + int use_delta; + nxt_int_t res; nxt_app_t *app; + nxt_bool_t can_start_worker; nxt_conn_t *c; nxt_port_t *port; nxt_event_engine_t *engine; - nxt_start_worker_t *sw; nxt_socket_conf_joint_t *joint; port = NULL; + use_delta = 1; c = ra->rc->conn; joint = c->listen->socket.data; @@ -2735,6 +2795,10 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) return NXT_ERROR; } + ra->rc->app = app; + + nxt_router_app_use(task, app, 1); + engine = task->thread->engine; nxt_timer_disable(engine, &c->read_timer); @@ -2744,20 +2808,50 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_timer_add(engine, &c->read_timer, app->timeout); } - port = nxt_router_app_get_port(app, ra->stream); + nxt_thread_mutex_lock(&app->mutex); + + if (!nxt_queue_is_empty(&app->ports)) { + port = nxt_router_app_get_port_unsafe(app, &use_delta); + + can_start_worker = 0; + + } else { + nxt_queue_insert_tail(&app->requests, &ra->link); + + can_start_worker = (app->workers + app->pending_workers) < + app->max_workers; + if (can_start_worker) { + app->pending_workers++; + } + + port = NULL; + } + + nxt_thread_mutex_unlock(&app->mutex); if (port != NULL) { - nxt_debug(task, "already have port for app '%V'", &app->name); + nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); ra->app_port = port; + + if (use_delta != 0) { + nxt_port_use(task, port, use_delta); + } return NXT_OK; } - sw = nxt_router_sw_create(task, app, ra); + if (!can_start_worker) { + nxt_debug(task, "app '%V' %p too many running or pending workers", + &app->name, app); + + return NXT_AGAIN; + } + + res = nxt_router_start_worker(task, app); + + if (nxt_slow_path(res != NXT_OK)) { + nxt_router_gen_error(task, c, 500, "Failed to start worker"); - if (nxt_slow_path(sw == NULL)) { - nxt_router_gen_error(task, c, 500, - "Failed to allocate start worker struct"); return NXT_ERROR; } @@ -3011,18 +3105,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); - nxt_router_process_http_request_mp(task, ra, port); - - nxt_router_ra_release(task, ra, ra->work.data); + nxt_router_process_http_request_mp(task, ra); } static void -nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, - nxt_port_t *port) +nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) { + uint32_t request_failed; nxt_int_t res; - nxt_port_t *c_port, *reply_port; + nxt_port_t *port, *c_port, *reply_port; nxt_conn_t *c; nxt_app_wmsg_t wmsg; nxt_app_parse_ctx_t *ap; @@ -3030,11 +3122,15 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, /* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */ nxt_assert(ra->rc != NULL); + nxt_assert(ra->app_port != NULL); + port = ra->app_port; reply_port = ra->reply_port; ap = ra->ap; c = ra->rc->conn; + request_failed = 1; + c_port = nxt_process_connected_port_find(port->process, reply_port->pid, reply_port->id); if (nxt_slow_path(c_port != reply_port)) { @@ -3043,7 +3139,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, if (nxt_slow_path(res != NXT_OK)) { nxt_router_gen_error(task, c, 500, "Failed to send reply port to application"); - return; + goto release_port; } nxt_process_connected_port_add(port->process, reply_port); @@ -3059,21 +3155,33 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, if (nxt_slow_path(res != NXT_OK)) { nxt_router_gen_error(task, c, 500, "Failed to prepare message for application"); - return; + goto release_port; } nxt_debug(task, "about to send %d bytes buffer to worker port %d", nxt_buf_used_size(wmsg.write), wmsg.port->socket.fd); + request_failed = 0; + res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, -1, ra->stream, reply_port->id, wmsg.write); if (nxt_slow_path(res != NXT_OK)) { nxt_router_gen_error(task, c, 500, "Failed to send message to application"); - return; + goto release_port; } + +release_port: + + if (request_failed != 0) { + ra->app_port = 0; + } + + nxt_router_app_port_release(task, port, request_failed, 0); + + nxt_router_ra_release(task, ra, ra->work.data); } @@ -3452,13 +3560,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream); - if (rc->app_port != NULL) { - nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); - - rc->app_port = NULL; - } - - nxt_router_rc_unlink(rc); + nxt_router_rc_unlink(task, rc); nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream); |