diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-10-04 14:58:47 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-10-04 14:58:47 +0300 |
commit | 6a64533fa3b96bb64bfde4615e40376d65a292cb (patch) | |
tree | a18ed8059158d833290519e1135209747e28af21 /src/nxt_router.c | |
parent | 414d508e04d26ebef0e3e1ba4ed518b11d3af1a0 (diff) | |
download | unit-6a64533fa3b96bb64bfde4615e40376d65a292cb.tar.gz unit-6a64533fa3b96bb64bfde4615e40376d65a292cb.tar.bz2 |
Introducing use counters for port and app. Thread safe port write.
Use counter helps to simplify logic around port and application free.
Port 'post' function introduced to simplify post execution of particular
function to original port engine's thread.
Write message queue is protected by mutex which makes port write operation
thread safe.
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 762 |
1 files changed, 432 insertions, 330 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index df3caf82..5eb95b59 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -24,19 +24,12 @@ typedef struct { typedef struct nxt_req_app_link_s nxt_req_app_link_t; -typedef struct nxt_start_worker_s nxt_start_worker_t; - -struct nxt_start_worker_s { - nxt_app_t *app; - nxt_req_app_link_t *ra; - - nxt_work_t work; -}; typedef struct { uint32_t stream; nxt_conn_t *conn; + nxt_app_t *app; nxt_port_t *app_port; nxt_req_app_link_t *ra; @@ -72,6 +65,8 @@ typedef struct { } nxt_remove_pid_msg_t; +static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); + static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data); static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, @@ -124,7 +119,7 @@ static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_temp_conf_t *tmcf); static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_event_engine_t *engine); -static void nxt_router_apps_sort(nxt_router_t *router, +static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, nxt_router_temp_conf_t *tmcf); static void nxt_router_engines_post(nxt_router_t *router, @@ -150,12 +145,14 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, static void nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint); -static void nxt_router_send_sw_request(nxt_task_t *task, void *obj, - void *data); -static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app); -static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t stream); -static void nxt_router_app_release_port(nxt_task_t *task, void *obj, - void *data); +static void nxt_router_app_port_ready(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); +static void nxt_router_app_port_error(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); + +static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); +static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, + uint32_t request_failed, uint32_t got_response); static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, @@ -165,7 +162,7 @@ static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap); static void nxt_router_process_http_request_mp(nxt_task_t *task, - nxt_req_app_link_t *ra, nxt_port_t *port); + nxt_req_app_link_t *ra); static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, @@ -222,68 +219,91 @@ nxt_router_start(nxt_task_t *task, void *data) } -static nxt_start_worker_t * -nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra) +static void +nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data) { - nxt_port_t *main_port; - nxt_runtime_t *rt; - nxt_start_worker_t *sw; + size_t size; + uint32_t stream; + nxt_app_t *app; + nxt_buf_t *b; + nxt_port_t *main_port; + nxt_runtime_t *rt; - sw = nxt_zalloc(sizeof(nxt_start_worker_t)); + app = data; - if (nxt_slow_path(sw == NULL)) { - return NULL; - } + rt = task->thread->runtime; + main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - sw->app = app; - sw->ra = ra; + nxt_debug(task, "app '%V' %p start worker", &app->name, app); - nxt_debug(task, "sw %p create, stream #%uD, app '%V' %p", sw, - ra->stream, &app->name, app); + size = app->name.length + 1 + app->conf.length; - rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); - sw->work.handler = nxt_router_send_sw_request; - sw->work.task = &main_port->engine->task; - sw->work.obj = sw; - sw->work.data = task->thread->engine; - sw->work.next = NULL; + if (nxt_slow_path(b == NULL)) { + goto failed; + } + + nxt_buf_cpystr(b, &app->name); + *b->mem.free++ = '\0'; + nxt_buf_cpystr(b, &app->conf); - if (task->thread->engine != main_port->engine) { - nxt_debug(task, "sw %p post send to main engine %p", sw, - main_port->engine); + stream = nxt_port_rpc_register_handler(task, port, + nxt_router_app_port_ready, + nxt_router_app_port_error, + -1, app); - nxt_event_engine_post(main_port->engine, &sw->work); + if (nxt_slow_path(stream == 0)) { + nxt_mp_release(b->data, b); - } else { - nxt_router_send_sw_request(task, sw, sw->work.data); + goto failed; } - return sw; -} + nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, + stream, port->id, b); + + return; +failed: -nxt_inline void -nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw) -{ - nxt_debug(task, "sw %p release", sw); + nxt_thread_mutex_lock(&app->mutex); + + app->pending_workers--; - nxt_free(sw); + nxt_thread_mutex_unlock(&app->mutex); + + nxt_router_app_use(task, app, -1); } -nxt_inline void -nxt_router_rc_unlink(nxt_req_conn_link_t *rc) +static nxt_int_t +nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app) { - nxt_queue_remove(&rc->link); + nxt_int_t res; + nxt_port_t *router_port; + nxt_runtime_t *rt; - if (rc->ra != NULL) { - rc->ra->rc = NULL; - rc->ra = NULL; + rt = task->thread->runtime; + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + + nxt_router_app_use(task, app, 1); + + res = nxt_port_post(task, router_port, nxt_router_start_worker_handler, + app); + + if (res == NXT_OK) { + return res; } - rc->conn = NULL; + nxt_thread_mutex_lock(&app->mutex); + + app->pending_workers--; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_router_app_use(task, app, -1); + + return NXT_ERROR; } @@ -327,36 +347,18 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) static void nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) { - nxt_port_t *app_port; - nxt_req_app_link_t *ra; - nxt_event_engine_t *engine; + nxt_req_app_link_t *ra; + nxt_event_engine_t *engine; + nxt_req_conn_link_t *rc; ra = obj; engine = data; - if (ra->app_port != NULL) { - - app_port = ra->app_port; - ra->app_port = NULL; - - if (task->thread->engine != engine) { - ra->app_pid = app_port->pid; - } - - nxt_router_app_release_port(task, app_port, app_port->app); - -#if 0 - /* Uncomment to hold app port until complete response received. */ - if (ra->rc != NULL) { - ra->rc->app_port = ra->app_port; - - } else { - nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); + if (task->thread->engine != engine) { + if (ra->app_port != NULL) { + ra->app_pid = ra->app_port->pid; } -#endif - } - if (task->thread->engine != engine) { ra->work.handler = nxt_router_ra_release; ra->work.task = &engine->task; ra->work.next = NULL; @@ -369,11 +371,27 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) return; } - if (ra->rc != NULL && ra->app_pid != -1) { - nxt_port_rpc_ex_set_peer(task, engine->port, ra->rc, ra->app_pid); + nxt_debug(task, "ra stream #%uD release", ra->stream); + + rc = ra->rc; + + if (rc != NULL) { + if (ra->app_pid != -1) { + nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid); + } + + rc->app_port = ra->app_port; + + ra->app_port = NULL; + rc->ra = NULL; + ra->rc = NULL; } - nxt_debug(task, "ra stream #%uD release", ra->stream); + if (ra->app_port != NULL) { + nxt_router_app_port_release(task, ra->app_port, 0, 1); + + ra->app_port = NULL; + } nxt_mp_release(ra->mem_pool, ra); } @@ -382,9 +400,10 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) static void nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) { - nxt_conn_t *c; - nxt_req_app_link_t *ra; - nxt_event_engine_t *engine; + nxt_conn_t *c; + nxt_req_app_link_t *ra; + nxt_req_conn_link_t *rc; + nxt_event_engine_t *engine; ra = obj; engine = data; @@ -403,17 +422,75 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "ra stream #%uD abort", ra->stream); - if (ra->rc != NULL) { - c = ra->rc->conn; + rc = ra->rc; + + if (rc != NULL) { + c = rc->conn; nxt_router_gen_error(task, c, 500, "Failed to start application worker"); + + rc->ra = NULL; + ra->rc = NULL; + } + + if (ra->app_port != NULL) { + nxt_router_app_port_release(task, ra->app_port, 0, 1); + + ra->app_port = NULL; } nxt_mp_release(ra->mem_pool, ra); } +nxt_inline void +nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) +{ + nxt_req_app_link_t *ra; + + if (rc->app_port != NULL) { + nxt_router_app_port_release(task, rc->app_port, 0, 1); + + rc->app_port = NULL; + } + + ra = rc->ra; + + if (ra != NULL) { + rc->ra = NULL; + ra->rc = NULL; + + nxt_thread_mutex_lock(&rc->app->mutex); + + if (ra->link.next != NULL) { + nxt_queue_remove(&ra->link); + + ra->link.next = NULL; + + } else { + ra = NULL; + } + + nxt_thread_mutex_unlock(&rc->app->mutex); + } + + if (ra != NULL) { + nxt_router_ra_release(task, ra, ra->work.data); + } + + if (rc->app != NULL) { + nxt_router_app_use(task, rc->app, -1); + + rc->app = NULL; + } + + nxt_queue_remove(&rc->link); + + rc->conn = NULL; +} + + void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { @@ -526,12 +603,20 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data) { + nxt_pid_t pid; + nxt_buf_t *buf; nxt_event_engine_t *engine; nxt_remove_pid_msg_t *rp; rp = obj; - nxt_port_remove_pid_handler(task, &rp->msg); + buf = rp->msg.buf; + + nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); + + nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); + + nxt_port_rpc_remove_peer(task, rp->msg.port, pid); engine = rp->work.data; @@ -658,7 +743,7 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) goto fail; } - nxt_router_apps_sort(router, tmcf); + nxt_router_apps_sort(task, router, tmcf); nxt_router_engines_post(router, tmcf); @@ -1005,9 +1090,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->max_workers = apcf.workers; app->timeout = apcf.timeout; app->live = 1; + app->max_pending_responses = 2; app->prepare_msg = nxt_app_prepare_msg[type]; nxt_queue_insert_tail(&tmcf->apps, &app->link); + + nxt_router_app_use(task, app, 1); } http = nxt_conf_get_path(conf, &http_path); @@ -1695,42 +1783,36 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, static void -nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) +nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, + nxt_router_temp_conf_t *tmcf) { - nxt_app_t *app; - nxt_port_t *port; + nxt_app_t *app; + nxt_port_t *port; nxt_queue_each(app, &router->apps, nxt_app_t, link) { nxt_queue_remove(&app->link); - nxt_thread_log_debug("about to remove app '%V' %p", &app->name, app); + nxt_debug(task, "about to free app '%V' %p", &app->name, app); app->live = 0; - if (nxt_router_app_free(NULL, app) != 0) { - continue; - } - - if (!nxt_queue_is_empty(&app->requests)) { - - nxt_thread_log_debug("app '%V' %p pending requests found", - &app->name, app); - continue; - } - do { - port = nxt_router_app_get_port(app, 0); + port = nxt_router_app_get_idle_port(app); if (port == NULL) { break; } - nxt_thread_log_debug("port %p send quit", port); + nxt_debug(task, "port %p send quit", port); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, + NULL); - nxt_port_socket_write(&port->engine->task, port, - NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + nxt_port_use(task, port, -1); } while (1); + nxt_router_app_use(task, app, -1); + } nxt_queue_loop; nxt_queue_add(&router->apps, &tmcf->previous); @@ -1833,7 +1915,7 @@ nxt_router_thread_start(void *data) ret = nxt_port_socket_init(task, port, 0); if (nxt_slow_path(ret != NXT_OK)) { - nxt_mp_release(port->mem_pool, port); + nxt_port_use(task, port, -1); return; } @@ -2124,8 +2206,9 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) // TODO notify all apps + port->engine = task->thread->engine; nxt_mp_thread_adopt(port->mem_pool); - nxt_port_release(port); + nxt_port_use(task, port, -1); nxt_mp_thread_adopt(engine->mem_pool); nxt_mp_destroy(engine->mem_pool); @@ -2240,13 +2323,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_buf_chain_add(&b, last); - if (rc->app_port != NULL) { - nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); - - rc->app_port = NULL; - } - - nxt_router_rc_unlink(rc); + nxt_router_rc_unlink(task, rc); } if (b == NULL) { @@ -2283,7 +2360,7 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_router_gen_error(task, rc->conn, 500, "Application terminated unexpectedly"); - nxt_router_rc_unlink(rc); + nxt_router_rc_unlink(task, rc); } @@ -2383,194 +2460,153 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, static void -nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) { - nxt_start_worker_t *sw; + nxt_app_t *app; + nxt_port_t *port; + + app = data; + port = msg->new_port; - sw = data; + nxt_assert(app != NULL); + nxt_assert(port != NULL); - nxt_assert(sw != NULL); - nxt_assert(sw->app->pending_workers != 0); + port->app = app; - msg->new_port->app = sw->app; + nxt_thread_mutex_lock(&app->mutex); - sw->app->pending_workers--; - sw->app->workers++; + nxt_assert(app->pending_workers != 0); - nxt_debug(task, "sw %p got port %p", sw, msg->new_port); + app->pending_workers--; + app->workers++; + + nxt_thread_mutex_unlock(&app->mutex); - nxt_router_app_release_port(task, msg->new_port, sw->app); + nxt_debug(task, "app '%V' %p new port ready", &app->name, app); - nxt_router_sw_release(task, sw); + nxt_router_app_port_release(task, port, 0, 0); } static void -nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) +nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) { nxt_app_t *app; nxt_queue_link_t *lnk; nxt_req_app_link_t *ra; - nxt_start_worker_t *sw; - sw = data; + app = data; - nxt_assert(sw != NULL); - nxt_assert(sw->app != NULL); - nxt_assert(sw->app->pending_workers != 0); + nxt_assert(app != NULL); - app = sw->app; + nxt_debug(task, "app '%V' %p start error", &app->name, app); + + nxt_thread_mutex_lock(&app->mutex); - sw->app->pending_workers--; + nxt_assert(app->pending_workers != 0); - nxt_debug(task, "sw %p error, failed to start app '%V'", - sw, &app->name); + app->pending_workers--; if (!nxt_queue_is_empty(&app->requests)) { lnk = nxt_queue_last(&app->requests); nxt_queue_remove(lnk); + lnk->next = NULL; ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + } else { + ra = NULL; + } + + nxt_thread_mutex_unlock(&app->mutex); + + if (ra != NULL) { nxt_debug(task, "app '%V' %p abort next stream #%uD", &app->name, app, ra->stream); nxt_router_ra_abort(task, ra, ra->work.data); } - nxt_router_sw_release(task, sw); + nxt_router_app_use(task, app, -1); } -static void -nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) +void +nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) { - size_t size; - uint32_t stream; - nxt_buf_t *b; - nxt_app_t *app; - nxt_port_t *main_port, *router_port, *app_port; - nxt_runtime_t *rt; - nxt_start_worker_t *sw; - nxt_req_app_link_t *ra; - - sw = obj; - app = sw->app; + int c; - if (nxt_queue_is_empty(&app->requests)) { - ra = sw->ra; - app_port = nxt_router_app_get_port(app, ra->stream); + c = nxt_atomic_fetch_add(&app->use_count, i); - if (app_port != NULL) { - nxt_debug(task, "app '%V' %p process stream #%uD", - &app->name, app, ra->stream); + if (i < 0 && c == -i) { - ra->app_port = app_port; + nxt_assert(app->live == 0); + nxt_assert(app->workers == 0); + nxt_assert(app->pending_workers == 0); + nxt_assert(nxt_queue_is_empty(&app->requests) != 0); + nxt_assert(nxt_queue_is_empty(&app->ports) != 0); - nxt_router_process_http_request_mp(task, ra, app_port); - - nxt_router_ra_release(task, ra, ra->work.data); - nxt_router_sw_release(task, sw); - - return; - } - } - - nxt_queue_insert_tail(&app->requests, &sw->ra->link); - - if (app->workers + app->pending_workers >= app->max_workers) { - nxt_debug(task, "app '%V' %p %uD/%uD running/pending workers, " - "max_workers (%uD) reached", &app->name, app, - app->workers, app->pending_workers, app->max_workers); - - nxt_router_sw_release(task, sw); - - return; + nxt_thread_mutex_destroy(&app->mutex); + nxt_free(app); } - - app->pending_workers++; - - nxt_debug(task, "sw %p send", sw); - - rt = task->thread->runtime; - main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; - - size = app->name.length + 1 + app->conf.length; - - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); - - nxt_buf_cpystr(b, &app->name); - *b->mem.free++ = '\0'; - nxt_buf_cpystr(b, &app->conf); - - stream = nxt_port_rpc_register_handler(task, router_port, - nxt_router_sw_ready, - nxt_router_sw_error, - main_port->pid, sw); - - nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, - stream, router_port->id, b); } -static nxt_bool_t -nxt_router_app_free(nxt_task_t *task, nxt_app_t *app) +nxt_inline nxt_port_t * +nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) { - nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra; + nxt_port_t *port; + nxt_queue_link_t *lnk; - nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app, - app->live, app->workers, app->pending_workers, - nxt_queue_is_empty(&app->requests)); + lnk = nxt_queue_first(&app->ports); + nxt_queue_remove(lnk); - if (app->live == 0 - && app->workers == 0 - && app->pending_workers == 0 - && nxt_queue_is_empty(&app->requests)) - { - nxt_thread_mutex_destroy(&app->mutex); - nxt_free(app); + port = nxt_queue_link_data(lnk, nxt_port_t, app_link); - return 1; - } + port->app_requests++; - if (app->live == 1 - && nxt_queue_is_empty(&app->requests) == 0 - && app->workers + app->pending_workers < app->max_workers) + if (app->live && + (app->max_pending_responses == 0 || + (port->app_requests - port->app_responses) < + app->max_pending_responses) ) { - lnk = nxt_queue_first(&app->requests); - nxt_queue_remove(lnk); + nxt_queue_insert_tail(&app->ports, lnk); - ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); + } else { + lnk->next = NULL; - nxt_router_sw_create(task, app, ra); + (*use_delta)--; } - return 0; + return port; } static nxt_port_t * -nxt_router_app_get_port(nxt_app_t *app, uint32_t stream) +nxt_router_app_get_idle_port(nxt_app_t *app) { - nxt_port_t *port; - nxt_queue_link_t *lnk; + nxt_port_t *port; port = NULL; nxt_thread_mutex_lock(&app->mutex); - if (!nxt_queue_is_empty(&app->ports)) { - lnk = nxt_queue_first(&app->ports); - nxt_queue_remove(lnk); + nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { - lnk->next = NULL; + if (port->app_requests > port->app_responses) { + port = NULL; - port = nxt_queue_link_data(lnk, nxt_port_t, app_link); + continue; + } - port->app_stream = stream; - } + nxt_queue_remove(&port->app_link); + port->app_link.next = NULL; + + break; + + } nxt_queue_loop; nxt_thread_mutex_unlock(&app->mutex); @@ -2579,151 +2615,175 @@ nxt_router_app_get_port(nxt_app_t *app, uint32_t stream) static void -nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) +nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) { - nxt_app_t *app; - nxt_port_t *port; - nxt_work_t *work; - nxt_queue_link_t *lnk; - nxt_req_app_link_t *ra; + nxt_app_t *app; + nxt_req_app_link_t *ra; - port = obj; - app = data; + app = obj; + ra = data; nxt_assert(app != NULL); - nxt_assert(app == port->app); - nxt_assert(port->app_link.next == NULL); + nxt_assert(ra != NULL); + nxt_assert(ra->app_port != NULL); + nxt_debug(task, "app '%V' %p process next stream #%uD", + &app->name, app, ra->stream); - if (task->thread->engine != port->engine) { - work = &port->work; + nxt_router_process_http_request_mp(task, ra); +} - nxt_debug(task, "post release port to engine %p", port->engine); - work->next = NULL; - work->handler = nxt_router_app_release_port; - work->task = &port->engine->task; - work->obj = port; - work->data = app; +static void +nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, + uint32_t request_failed, uint32_t got_response) +{ + int use_delta, ra_use_delta; + nxt_app_t *app; + nxt_queue_link_t *lnk; + nxt_req_app_link_t *ra; - nxt_event_engine_post(port->engine, work); + nxt_assert(port != NULL); + nxt_assert(port->app != NULL); - return; + app = port->app; + + use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1; + + nxt_thread_mutex_lock(&app->mutex); + + port->app_requests -= request_failed; + port->app_responses += got_response; + + if (app->live != 0 && + port->pair[1] != -1 && + port->app_link.next == NULL && + (app->max_pending_responses == 0 || + (port->app_requests - port->app_responses) < + app->max_pending_responses) ) + { + nxt_queue_insert_tail(&app->ports, &port->app_link); + use_delta++; } - if (!nxt_queue_is_empty(&app->requests)) { + if (app->live != 0 && + !nxt_queue_is_empty(&app->ports) && + !nxt_queue_is_empty(&app->requests)) + { lnk = nxt_queue_first(&app->requests); nxt_queue_remove(lnk); + lnk->next = NULL; ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); - nxt_debug(task, "app '%V' %p process next stream #%uD", - &app->name, app, ra->stream); + ra_use_delta = 1; + ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta); - ra->app_port = port; - port->app_stream = ra->stream; + } else { + ra = NULL; + ra_use_delta = 0; + } - nxt_router_process_http_request_mp(task, ra, port); + nxt_thread_mutex_unlock(&app->mutex); - nxt_router_ra_release(task, ra, ra->work.data); + if (ra != NULL) { + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_router_app_process_request, + &task->thread->engine->task, app, ra); - return; + goto adjust_use; } - port->app_stream = 0; - + /* ? */ if (port->pair[1] == -1) { - nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)", - &app->name, app, port->pid); - - app->workers--; - nxt_router_app_free(task, app); + nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)", + &app->name, app, port, port->pid); - port->app = NULL; - - nxt_port_release(port); - - return; + goto adjust_use; } - if (!app->live) { + if (app->live == 0) { nxt_debug(task, "app '%V' %p is not alive, send QUIT to port", &app->name, app); nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); - return; + goto adjust_use; } nxt_debug(task, "app '%V' %p requests queue is empty, keep the port", &app->name, app); - nxt_thread_mutex_lock(&app->mutex); +adjust_use: - nxt_queue_insert_head(&app->ports, &port->app_link); + if (use_delta != 0) { + nxt_port_use(task, port, use_delta); + } - nxt_thread_mutex_unlock(&app->mutex); + if (ra_use_delta != 0) { + nxt_port_use(task, ra->app_port, ra_use_delta); + } } -nxt_bool_t -nxt_router_app_remove_port(nxt_port_t *port) +void +nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) { nxt_app_t *app; - nxt_bool_t busy; + nxt_bool_t unchain, start_worker; app = port->app; - busy = port->app_stream != 0; - - if (app == NULL) { - nxt_thread_log_debug("port %p app remove, no app", port); - - nxt_assert(port->app_link.next == NULL); - return 1; - } + nxt_assert(app != NULL); nxt_thread_mutex_lock(&app->mutex); - if (port->app_link.next != NULL) { + unchain = port->app_link.next != NULL; + if (unchain) { nxt_queue_remove(&port->app_link); port->app_link.next = NULL; + } + app->workers--; + + start_worker = app->live != 0 && + nxt_queue_is_empty(&app->requests) == 0 && + app->workers + app->pending_workers < app->max_workers; + + if (start_worker) { + app->pending_workers++; } nxt_thread_mutex_unlock(&app->mutex); - if (busy == 0) { - nxt_thread_log_debug("port %p app remove, free, app '%V' %p", port, - &app->name, app); - - app->workers--; - nxt_router_app_free(&port->engine->task, app); + nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port); - return 1; + if (unchain) { + nxt_port_use(task, port, -1); } - nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, " - "app stream #%uD", port, &app->name, app, - port->app_stream); - - return 0; + if (start_worker) { + nxt_router_start_worker(task, app); + } } static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) { + int use_delta; + nxt_int_t res; nxt_app_t *app; + nxt_bool_t can_start_worker; nxt_conn_t *c; nxt_port_t *port; nxt_event_engine_t *engine; - nxt_start_worker_t *sw; nxt_socket_conf_joint_t *joint; port = NULL; + use_delta = 1; c = ra->rc->conn; joint = c->listen->socket.data; @@ -2735,6 +2795,10 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) return NXT_ERROR; } + ra->rc->app = app; + + nxt_router_app_use(task, app, 1); + engine = task->thread->engine; nxt_timer_disable(engine, &c->read_timer); @@ -2744,20 +2808,50 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_timer_add(engine, &c->read_timer, app->timeout); } - port = nxt_router_app_get_port(app, ra->stream); + nxt_thread_mutex_lock(&app->mutex); + + if (!nxt_queue_is_empty(&app->ports)) { + port = nxt_router_app_get_port_unsafe(app, &use_delta); + + can_start_worker = 0; + + } else { + nxt_queue_insert_tail(&app->requests, &ra->link); + + can_start_worker = (app->workers + app->pending_workers) < + app->max_workers; + if (can_start_worker) { + app->pending_workers++; + } + + port = NULL; + } + + nxt_thread_mutex_unlock(&app->mutex); if (port != NULL) { - nxt_debug(task, "already have port for app '%V'", &app->name); + nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); ra->app_port = port; + + if (use_delta != 0) { + nxt_port_use(task, port, use_delta); + } return NXT_OK; } - sw = nxt_router_sw_create(task, app, ra); + if (!can_start_worker) { + nxt_debug(task, "app '%V' %p too many running or pending workers", + &app->name, app); + + return NXT_AGAIN; + } + + res = nxt_router_start_worker(task, app); + + if (nxt_slow_path(res != NXT_OK)) { + nxt_router_gen_error(task, c, 500, "Failed to start worker"); - if (nxt_slow_path(sw == NULL)) { - nxt_router_gen_error(task, c, 500, - "Failed to allocate start worker struct"); return NXT_ERROR; } @@ -3011,18 +3105,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); - nxt_router_process_http_request_mp(task, ra, port); - - nxt_router_ra_release(task, ra, ra->work.data); + nxt_router_process_http_request_mp(task, ra); } static void -nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, - nxt_port_t *port) +nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) { + uint32_t request_failed; nxt_int_t res; - nxt_port_t *c_port, *reply_port; + nxt_port_t *port, *c_port, *reply_port; nxt_conn_t *c; nxt_app_wmsg_t wmsg; nxt_app_parse_ctx_t *ap; @@ -3030,11 +3122,15 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, /* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */ nxt_assert(ra->rc != NULL); + nxt_assert(ra->app_port != NULL); + port = ra->app_port; reply_port = ra->reply_port; ap = ra->ap; c = ra->rc->conn; + request_failed = 1; + c_port = nxt_process_connected_port_find(port->process, reply_port->pid, reply_port->id); if (nxt_slow_path(c_port != reply_port)) { @@ -3043,7 +3139,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, if (nxt_slow_path(res != NXT_OK)) { nxt_router_gen_error(task, c, 500, "Failed to send reply port to application"); - return; + goto release_port; } nxt_process_connected_port_add(port->process, reply_port); @@ -3059,21 +3155,33 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, if (nxt_slow_path(res != NXT_OK)) { nxt_router_gen_error(task, c, 500, "Failed to prepare message for application"); - return; + goto release_port; } nxt_debug(task, "about to send %d bytes buffer to worker port %d", nxt_buf_used_size(wmsg.write), wmsg.port->socket.fd); + request_failed = 0; + res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, -1, ra->stream, reply_port->id, wmsg.write); if (nxt_slow_path(res != NXT_OK)) { nxt_router_gen_error(task, c, 500, "Failed to send message to application"); - return; + goto release_port; } + +release_port: + + if (request_failed != 0) { + ra->app_port = 0; + } + + nxt_router_app_port_release(task, port, request_failed, 0); + + nxt_router_ra_release(task, ra, ra->work.data); } @@ -3452,13 +3560,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream); - if (rc->app_port != NULL) { - nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); - - rc->app_port = NULL; - } - - nxt_router_rc_unlink(rc); + nxt_router_rc_unlink(task, rc); nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream); |