summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r--src/nxt_router.c762
1 files changed, 432 insertions, 330 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index df3caf82..5eb95b59 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -24,19 +24,12 @@ typedef struct {
typedef struct nxt_req_app_link_s nxt_req_app_link_t;
-typedef struct nxt_start_worker_s nxt_start_worker_t;
-
-struct nxt_start_worker_s {
- nxt_app_t *app;
- nxt_req_app_link_t *ra;
-
- nxt_work_t work;
-};
typedef struct {
uint32_t stream;
nxt_conn_t *conn;
+ nxt_app_t *app;
nxt_port_t *app_port;
nxt_req_app_link_t *ra;
@@ -72,6 +65,8 @@ typedef struct {
} nxt_remove_pid_msg_t;
+static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
+
static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj,
@@ -124,7 +119,7 @@ static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine);
-static void nxt_router_apps_sort(nxt_router_t *router,
+static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
nxt_router_temp_conf_t *tmcf);
static void nxt_router_engines_post(nxt_router_t *router,
@@ -150,12 +145,14 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
static void nxt_router_conf_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint);
-static void nxt_router_send_sw_request(nxt_task_t *task, void *obj,
- void *data);
-static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app);
-static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t stream);
-static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
- void *data);
+static void nxt_router_app_port_ready(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
+static void nxt_router_app_port_error(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
+
+static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
+static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
+ uint32_t request_failed, uint32_t got_response);
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
@@ -165,7 +162,7 @@ static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
static void nxt_router_process_http_request(nxt_task_t *task,
nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
static void nxt_router_process_http_request_mp(nxt_task_t *task,
- nxt_req_app_link_t *ra, nxt_port_t *port);
+ nxt_req_app_link_t *ra);
static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg);
static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
@@ -222,68 +219,91 @@ nxt_router_start(nxt_task_t *task, void *data)
}
-static nxt_start_worker_t *
-nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
+static void
+nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data)
{
- nxt_port_t *main_port;
- nxt_runtime_t *rt;
- nxt_start_worker_t *sw;
+ size_t size;
+ uint32_t stream;
+ nxt_app_t *app;
+ nxt_buf_t *b;
+ nxt_port_t *main_port;
+ nxt_runtime_t *rt;
- sw = nxt_zalloc(sizeof(nxt_start_worker_t));
+ app = data;
- if (nxt_slow_path(sw == NULL)) {
- return NULL;
- }
+ rt = task->thread->runtime;
+ main_port = rt->port_by_type[NXT_PROCESS_MAIN];
- sw->app = app;
- sw->ra = ra;
+ nxt_debug(task, "app '%V' %p start worker", &app->name, app);
- nxt_debug(task, "sw %p create, stream #%uD, app '%V' %p", sw,
- ra->stream, &app->name, app);
+ size = app->name.length + 1 + app->conf.length;
- rt = task->thread->runtime;
- main_port = rt->port_by_type[NXT_PROCESS_MAIN];
+ b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
- sw->work.handler = nxt_router_send_sw_request;
- sw->work.task = &main_port->engine->task;
- sw->work.obj = sw;
- sw->work.data = task->thread->engine;
- sw->work.next = NULL;
+ if (nxt_slow_path(b == NULL)) {
+ goto failed;
+ }
+
+ nxt_buf_cpystr(b, &app->name);
+ *b->mem.free++ = '\0';
+ nxt_buf_cpystr(b, &app->conf);
- if (task->thread->engine != main_port->engine) {
- nxt_debug(task, "sw %p post send to main engine %p", sw,
- main_port->engine);
+ stream = nxt_port_rpc_register_handler(task, port,
+ nxt_router_app_port_ready,
+ nxt_router_app_port_error,
+ -1, app);
- nxt_event_engine_post(main_port->engine, &sw->work);
+ if (nxt_slow_path(stream == 0)) {
+ nxt_mp_release(b->data, b);
- } else {
- nxt_router_send_sw_request(task, sw, sw->work.data);
+ goto failed;
}
- return sw;
-}
+ nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
+ stream, port->id, b);
+
+ return;
+failed:
-nxt_inline void
-nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw)
-{
- nxt_debug(task, "sw %p release", sw);
+ nxt_thread_mutex_lock(&app->mutex);
+
+ app->pending_workers--;
- nxt_free(sw);
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_router_app_use(task, app, -1);
}
-nxt_inline void
-nxt_router_rc_unlink(nxt_req_conn_link_t *rc)
+static nxt_int_t
+nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
{
- nxt_queue_remove(&rc->link);
+ nxt_int_t res;
+ nxt_port_t *router_port;
+ nxt_runtime_t *rt;
- if (rc->ra != NULL) {
- rc->ra->rc = NULL;
- rc->ra = NULL;
+ rt = task->thread->runtime;
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+
+ nxt_router_app_use(task, app, 1);
+
+ res = nxt_port_post(task, router_port, nxt_router_start_worker_handler,
+ app);
+
+ if (res == NXT_OK) {
+ return res;
}
- rc->conn = NULL;
+ nxt_thread_mutex_lock(&app->mutex);
+
+ app->pending_workers--;
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_router_app_use(task, app, -1);
+
+ return NXT_ERROR;
}
@@ -327,36 +347,18 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
static void
nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
{
- nxt_port_t *app_port;
- nxt_req_app_link_t *ra;
- nxt_event_engine_t *engine;
+ nxt_req_app_link_t *ra;
+ nxt_event_engine_t *engine;
+ nxt_req_conn_link_t *rc;
ra = obj;
engine = data;
- if (ra->app_port != NULL) {
-
- app_port = ra->app_port;
- ra->app_port = NULL;
-
- if (task->thread->engine != engine) {
- ra->app_pid = app_port->pid;
- }
-
- nxt_router_app_release_port(task, app_port, app_port->app);
-
-#if 0
- /* Uncomment to hold app port until complete response received. */
- if (ra->rc != NULL) {
- ra->rc->app_port = ra->app_port;
-
- } else {
- nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
+ if (task->thread->engine != engine) {
+ if (ra->app_port != NULL) {
+ ra->app_pid = ra->app_port->pid;
}
-#endif
- }
- if (task->thread->engine != engine) {
ra->work.handler = nxt_router_ra_release;
ra->work.task = &engine->task;
ra->work.next = NULL;
@@ -369,11 +371,27 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
return;
}
- if (ra->rc != NULL && ra->app_pid != -1) {
- nxt_port_rpc_ex_set_peer(task, engine->port, ra->rc, ra->app_pid);
+ nxt_debug(task, "ra stream #%uD release", ra->stream);
+
+ rc = ra->rc;
+
+ if (rc != NULL) {
+ if (ra->app_pid != -1) {
+ nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid);
+ }
+
+ rc->app_port = ra->app_port;
+
+ ra->app_port = NULL;
+ rc->ra = NULL;
+ ra->rc = NULL;
}
- nxt_debug(task, "ra stream #%uD release", ra->stream);
+ if (ra->app_port != NULL) {
+ nxt_router_app_port_release(task, ra->app_port, 0, 1);
+
+ ra->app_port = NULL;
+ }
nxt_mp_release(ra->mem_pool, ra);
}
@@ -382,9 +400,10 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
{
- nxt_conn_t *c;
- nxt_req_app_link_t *ra;
- nxt_event_engine_t *engine;
+ nxt_conn_t *c;
+ nxt_req_app_link_t *ra;
+ nxt_req_conn_link_t *rc;
+ nxt_event_engine_t *engine;
ra = obj;
engine = data;
@@ -403,17 +422,75 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "ra stream #%uD abort", ra->stream);
- if (ra->rc != NULL) {
- c = ra->rc->conn;
+ rc = ra->rc;
+
+ if (rc != NULL) {
+ c = rc->conn;
nxt_router_gen_error(task, c, 500,
"Failed to start application worker");
+
+ rc->ra = NULL;
+ ra->rc = NULL;
+ }
+
+ if (ra->app_port != NULL) {
+ nxt_router_app_port_release(task, ra->app_port, 0, 1);
+
+ ra->app_port = NULL;
}
nxt_mp_release(ra->mem_pool, ra);
}
+nxt_inline void
+nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
+{
+ nxt_req_app_link_t *ra;
+
+ if (rc->app_port != NULL) {
+ nxt_router_app_port_release(task, rc->app_port, 0, 1);
+
+ rc->app_port = NULL;
+ }
+
+ ra = rc->ra;
+
+ if (ra != NULL) {
+ rc->ra = NULL;
+ ra->rc = NULL;
+
+ nxt_thread_mutex_lock(&rc->app->mutex);
+
+ if (ra->link.next != NULL) {
+ nxt_queue_remove(&ra->link);
+
+ ra->link.next = NULL;
+
+ } else {
+ ra = NULL;
+ }
+
+ nxt_thread_mutex_unlock(&rc->app->mutex);
+ }
+
+ if (ra != NULL) {
+ nxt_router_ra_release(task, ra, ra->work.data);
+ }
+
+ if (rc->app != NULL) {
+ nxt_router_app_use(task, rc->app, -1);
+
+ rc->app = NULL;
+ }
+
+ nxt_queue_remove(&rc->link);
+
+ rc->conn = NULL;
+}
+
+
void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
@@ -526,12 +603,20 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
static void
nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data)
{
+ nxt_pid_t pid;
+ nxt_buf_t *buf;
nxt_event_engine_t *engine;
nxt_remove_pid_msg_t *rp;
rp = obj;
- nxt_port_remove_pid_handler(task, &rp->msg);
+ buf = rp->msg.buf;
+
+ nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
+
+ nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
+
+ nxt_port_rpc_remove_peer(task, rp->msg.port, pid);
engine = rp->work.data;
@@ -658,7 +743,7 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
goto fail;
}
- nxt_router_apps_sort(router, tmcf);
+ nxt_router_apps_sort(task, router, tmcf);
nxt_router_engines_post(router, tmcf);
@@ -1005,9 +1090,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->max_workers = apcf.workers;
app->timeout = apcf.timeout;
app->live = 1;
+ app->max_pending_responses = 2;
app->prepare_msg = nxt_app_prepare_msg[type];
nxt_queue_insert_tail(&tmcf->apps, &app->link);
+
+ nxt_router_app_use(task, app, 1);
}
http = nxt_conf_get_path(conf, &http_path);
@@ -1695,42 +1783,36 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
static void
-nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
+nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
+ nxt_router_temp_conf_t *tmcf)
{
- nxt_app_t *app;
- nxt_port_t *port;
+ nxt_app_t *app;
+ nxt_port_t *port;
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
nxt_queue_remove(&app->link);
- nxt_thread_log_debug("about to remove app '%V' %p", &app->name, app);
+ nxt_debug(task, "about to free app '%V' %p", &app->name, app);
app->live = 0;
- if (nxt_router_app_free(NULL, app) != 0) {
- continue;
- }
-
- if (!nxt_queue_is_empty(&app->requests)) {
-
- nxt_thread_log_debug("app '%V' %p pending requests found",
- &app->name, app);
- continue;
- }
-
do {
- port = nxt_router_app_get_port(app, 0);
+ port = nxt_router_app_get_idle_port(app);
if (port == NULL) {
break;
}
- nxt_thread_log_debug("port %p send quit", port);
+ nxt_debug(task, "port %p send quit", port);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
+ NULL);
- nxt_port_socket_write(&port->engine->task, port,
- NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+ nxt_port_use(task, port, -1);
} while (1);
+ nxt_router_app_use(task, app, -1);
+
} nxt_queue_loop;
nxt_queue_add(&router->apps, &tmcf->previous);
@@ -1833,7 +1915,7 @@ nxt_router_thread_start(void *data)
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
- nxt_mp_release(port->mem_pool, port);
+ nxt_port_use(task, port, -1);
return;
}
@@ -2124,8 +2206,9 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
// TODO notify all apps
+ port->engine = task->thread->engine;
nxt_mp_thread_adopt(port->mem_pool);
- nxt_port_release(port);
+ nxt_port_use(task, port, -1);
nxt_mp_thread_adopt(engine->mem_pool);
nxt_mp_destroy(engine->mem_pool);
@@ -2240,13 +2323,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_buf_chain_add(&b, last);
- if (rc->app_port != NULL) {
- nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
-
- rc->app_port = NULL;
- }
-
- nxt_router_rc_unlink(rc);
+ nxt_router_rc_unlink(task, rc);
}
if (b == NULL) {
@@ -2283,7 +2360,7 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_router_gen_error(task, rc->conn, 500,
"Application terminated unexpectedly");
- nxt_router_rc_unlink(rc);
+ nxt_router_rc_unlink(task, rc);
}
@@ -2383,194 +2460,153 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
static void
-nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
+nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
{
- nxt_start_worker_t *sw;
+ nxt_app_t *app;
+ nxt_port_t *port;
+
+ app = data;
+ port = msg->new_port;
- sw = data;
+ nxt_assert(app != NULL);
+ nxt_assert(port != NULL);
- nxt_assert(sw != NULL);
- nxt_assert(sw->app->pending_workers != 0);
+ port->app = app;
- msg->new_port->app = sw->app;
+ nxt_thread_mutex_lock(&app->mutex);
- sw->app->pending_workers--;
- sw->app->workers++;
+ nxt_assert(app->pending_workers != 0);
- nxt_debug(task, "sw %p got port %p", sw, msg->new_port);
+ app->pending_workers--;
+ app->workers++;
+
+ nxt_thread_mutex_unlock(&app->mutex);
- nxt_router_app_release_port(task, msg->new_port, sw->app);
+ nxt_debug(task, "app '%V' %p new port ready", &app->name, app);
- nxt_router_sw_release(task, sw);
+ nxt_router_app_port_release(task, port, 0, 0);
}
static void
-nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
+nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
{
nxt_app_t *app;
nxt_queue_link_t *lnk;
nxt_req_app_link_t *ra;
- nxt_start_worker_t *sw;
- sw = data;
+ app = data;
- nxt_assert(sw != NULL);
- nxt_assert(sw->app != NULL);
- nxt_assert(sw->app->pending_workers != 0);
+ nxt_assert(app != NULL);
- app = sw->app;
+ nxt_debug(task, "app '%V' %p start error", &app->name, app);
+
+ nxt_thread_mutex_lock(&app->mutex);
- sw->app->pending_workers--;
+ nxt_assert(app->pending_workers != 0);
- nxt_debug(task, "sw %p error, failed to start app '%V'",
- sw, &app->name);
+ app->pending_workers--;
if (!nxt_queue_is_empty(&app->requests)) {
lnk = nxt_queue_last(&app->requests);
nxt_queue_remove(lnk);
+ lnk->next = NULL;
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
+ } else {
+ ra = NULL;
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ if (ra != NULL) {
nxt_debug(task, "app '%V' %p abort next stream #%uD",
&app->name, app, ra->stream);
nxt_router_ra_abort(task, ra, ra->work.data);
}
- nxt_router_sw_release(task, sw);
+ nxt_router_app_use(task, app, -1);
}
-static void
-nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
+void
+nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
{
- size_t size;
- uint32_t stream;
- nxt_buf_t *b;
- nxt_app_t *app;
- nxt_port_t *main_port, *router_port, *app_port;
- nxt_runtime_t *rt;
- nxt_start_worker_t *sw;
- nxt_req_app_link_t *ra;
-
- sw = obj;
- app = sw->app;
+ int c;
- if (nxt_queue_is_empty(&app->requests)) {
- ra = sw->ra;
- app_port = nxt_router_app_get_port(app, ra->stream);
+ c = nxt_atomic_fetch_add(&app->use_count, i);
- if (app_port != NULL) {
- nxt_debug(task, "app '%V' %p process stream #%uD",
- &app->name, app, ra->stream);
+ if (i < 0 && c == -i) {
- ra->app_port = app_port;
+ nxt_assert(app->live == 0);
+ nxt_assert(app->workers == 0);
+ nxt_assert(app->pending_workers == 0);
+ nxt_assert(nxt_queue_is_empty(&app->requests) != 0);
+ nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
- nxt_router_process_http_request_mp(task, ra, app_port);
-
- nxt_router_ra_release(task, ra, ra->work.data);
- nxt_router_sw_release(task, sw);
-
- return;
- }
- }
-
- nxt_queue_insert_tail(&app->requests, &sw->ra->link);
-
- if (app->workers + app->pending_workers >= app->max_workers) {
- nxt_debug(task, "app '%V' %p %uD/%uD running/pending workers, "
- "max_workers (%uD) reached", &app->name, app,
- app->workers, app->pending_workers, app->max_workers);
-
- nxt_router_sw_release(task, sw);
-
- return;
+ nxt_thread_mutex_destroy(&app->mutex);
+ nxt_free(app);
}
-
- app->pending_workers++;
-
- nxt_debug(task, "sw %p send", sw);
-
- rt = task->thread->runtime;
- main_port = rt->port_by_type[NXT_PROCESS_MAIN];
- router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
-
- size = app->name.length + 1 + app->conf.length;
-
- b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
-
- nxt_buf_cpystr(b, &app->name);
- *b->mem.free++ = '\0';
- nxt_buf_cpystr(b, &app->conf);
-
- stream = nxt_port_rpc_register_handler(task, router_port,
- nxt_router_sw_ready,
- nxt_router_sw_error,
- main_port->pid, sw);
-
- nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
- stream, router_port->id, b);
}
-static nxt_bool_t
-nxt_router_app_free(nxt_task_t *task, nxt_app_t *app)
+nxt_inline nxt_port_t *
+nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
{
- nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra;
+ nxt_port_t *port;
+ nxt_queue_link_t *lnk;
- nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app,
- app->live, app->workers, app->pending_workers,
- nxt_queue_is_empty(&app->requests));
+ lnk = nxt_queue_first(&app->ports);
+ nxt_queue_remove(lnk);
- if (app->live == 0
- && app->workers == 0
- && app->pending_workers == 0
- && nxt_queue_is_empty(&app->requests))
- {
- nxt_thread_mutex_destroy(&app->mutex);
- nxt_free(app);
+ port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
- return 1;
- }
+ port->app_requests++;
- if (app->live == 1
- && nxt_queue_is_empty(&app->requests) == 0
- && app->workers + app->pending_workers < app->max_workers)
+ if (app->live &&
+ (app->max_pending_responses == 0 ||
+ (port->app_requests - port->app_responses) <
+ app->max_pending_responses) )
{
- lnk = nxt_queue_first(&app->requests);
- nxt_queue_remove(lnk);
+ nxt_queue_insert_tail(&app->ports, lnk);
- ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
+ } else {
+ lnk->next = NULL;
- nxt_router_sw_create(task, app, ra);
+ (*use_delta)--;
}
- return 0;
+ return port;
}
static nxt_port_t *
-nxt_router_app_get_port(nxt_app_t *app, uint32_t stream)
+nxt_router_app_get_idle_port(nxt_app_t *app)
{
- nxt_port_t *port;
- nxt_queue_link_t *lnk;
+ nxt_port_t *port;
port = NULL;
nxt_thread_mutex_lock(&app->mutex);
- if (!nxt_queue_is_empty(&app->ports)) {
- lnk = nxt_queue_first(&app->ports);
- nxt_queue_remove(lnk);
+ nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
- lnk->next = NULL;
+ if (port->app_requests > port->app_responses) {
+ port = NULL;
- port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
+ continue;
+ }
- port->app_stream = stream;
- }
+ nxt_queue_remove(&port->app_link);
+ port->app_link.next = NULL;
+
+ break;
+
+ } nxt_queue_loop;
nxt_thread_mutex_unlock(&app->mutex);
@@ -2579,151 +2615,175 @@ nxt_router_app_get_port(nxt_app_t *app, uint32_t stream)
static void
-nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
+nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
{
- nxt_app_t *app;
- nxt_port_t *port;
- nxt_work_t *work;
- nxt_queue_link_t *lnk;
- nxt_req_app_link_t *ra;
+ nxt_app_t *app;
+ nxt_req_app_link_t *ra;
- port = obj;
- app = data;
+ app = obj;
+ ra = data;
nxt_assert(app != NULL);
- nxt_assert(app == port->app);
- nxt_assert(port->app_link.next == NULL);
+ nxt_assert(ra != NULL);
+ nxt_assert(ra->app_port != NULL);
+ nxt_debug(task, "app '%V' %p process next stream #%uD",
+ &app->name, app, ra->stream);
- if (task->thread->engine != port->engine) {
- work = &port->work;
+ nxt_router_process_http_request_mp(task, ra);
+}
- nxt_debug(task, "post release port to engine %p", port->engine);
- work->next = NULL;
- work->handler = nxt_router_app_release_port;
- work->task = &port->engine->task;
- work->obj = port;
- work->data = app;
+static void
+nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
+ uint32_t request_failed, uint32_t got_response)
+{
+ int use_delta, ra_use_delta;
+ nxt_app_t *app;
+ nxt_queue_link_t *lnk;
+ nxt_req_app_link_t *ra;
- nxt_event_engine_post(port->engine, work);
+ nxt_assert(port != NULL);
+ nxt_assert(port->app != NULL);
- return;
+ app = port->app;
+
+ use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ port->app_requests -= request_failed;
+ port->app_responses += got_response;
+
+ if (app->live != 0 &&
+ port->pair[1] != -1 &&
+ port->app_link.next == NULL &&
+ (app->max_pending_responses == 0 ||
+ (port->app_requests - port->app_responses) <
+ app->max_pending_responses) )
+ {
+ nxt_queue_insert_tail(&app->ports, &port->app_link);
+ use_delta++;
}
- if (!nxt_queue_is_empty(&app->requests)) {
+ if (app->live != 0 &&
+ !nxt_queue_is_empty(&app->ports) &&
+ !nxt_queue_is_empty(&app->requests))
+ {
lnk = nxt_queue_first(&app->requests);
nxt_queue_remove(lnk);
+ lnk->next = NULL;
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
- nxt_debug(task, "app '%V' %p process next stream #%uD",
- &app->name, app, ra->stream);
+ ra_use_delta = 1;
+ ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta);
- ra->app_port = port;
- port->app_stream = ra->stream;
+ } else {
+ ra = NULL;
+ ra_use_delta = 0;
+ }
- nxt_router_process_http_request_mp(task, ra, port);
+ nxt_thread_mutex_unlock(&app->mutex);
- nxt_router_ra_release(task, ra, ra->work.data);
+ if (ra != NULL) {
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_router_app_process_request,
+ &task->thread->engine->task, app, ra);
- return;
+ goto adjust_use;
}
- port->app_stream = 0;
-
+ /* ? */
if (port->pair[1] == -1) {
- nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)",
- &app->name, app, port->pid);
-
- app->workers--;
- nxt_router_app_free(task, app);
+ nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
+ &app->name, app, port, port->pid);
- port->app = NULL;
-
- nxt_port_release(port);
-
- return;
+ goto adjust_use;
}
- if (!app->live) {
+ if (app->live == 0) {
nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
&app->name, app);
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
-1, 0, 0, NULL);
- return;
+ goto adjust_use;
}
nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
&app->name, app);
- nxt_thread_mutex_lock(&app->mutex);
+adjust_use:
- nxt_queue_insert_head(&app->ports, &port->app_link);
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
- nxt_thread_mutex_unlock(&app->mutex);
+ if (ra_use_delta != 0) {
+ nxt_port_use(task, ra->app_port, ra_use_delta);
+ }
}
-nxt_bool_t
-nxt_router_app_remove_port(nxt_port_t *port)
+void
+nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
{
nxt_app_t *app;
- nxt_bool_t busy;
+ nxt_bool_t unchain, start_worker;
app = port->app;
- busy = port->app_stream != 0;
-
- if (app == NULL) {
- nxt_thread_log_debug("port %p app remove, no app", port);
-
- nxt_assert(port->app_link.next == NULL);
- return 1;
- }
+ nxt_assert(app != NULL);
nxt_thread_mutex_lock(&app->mutex);
- if (port->app_link.next != NULL) {
+ unchain = port->app_link.next != NULL;
+ if (unchain) {
nxt_queue_remove(&port->app_link);
port->app_link.next = NULL;
+ }
+ app->workers--;
+
+ start_worker = app->live != 0 &&
+ nxt_queue_is_empty(&app->requests) == 0 &&
+ app->workers + app->pending_workers < app->max_workers;
+
+ if (start_worker) {
+ app->pending_workers++;
}
nxt_thread_mutex_unlock(&app->mutex);
- if (busy == 0) {
- nxt_thread_log_debug("port %p app remove, free, app '%V' %p", port,
- &app->name, app);
-
- app->workers--;
- nxt_router_app_free(&port->engine->task, app);
+ nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port);
- return 1;
+ if (unchain) {
+ nxt_port_use(task, port, -1);
}
- nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, "
- "app stream #%uD", port, &app->name, app,
- port->app_stream);
-
- return 0;
+ if (start_worker) {
+ nxt_router_start_worker(task, app);
+ }
}
static nxt_int_t
nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
{
+ int use_delta;
+ nxt_int_t res;
nxt_app_t *app;
+ nxt_bool_t can_start_worker;
nxt_conn_t *c;
nxt_port_t *port;
nxt_event_engine_t *engine;
- nxt_start_worker_t *sw;
nxt_socket_conf_joint_t *joint;
port = NULL;
+ use_delta = 1;
c = ra->rc->conn;
joint = c->listen->socket.data;
@@ -2735,6 +2795,10 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
return NXT_ERROR;
}
+ ra->rc->app = app;
+
+ nxt_router_app_use(task, app, 1);
+
engine = task->thread->engine;
nxt_timer_disable(engine, &c->read_timer);
@@ -2744,20 +2808,50 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
nxt_timer_add(engine, &c->read_timer, app->timeout);
}
- port = nxt_router_app_get_port(app, ra->stream);
+ nxt_thread_mutex_lock(&app->mutex);
+
+ if (!nxt_queue_is_empty(&app->ports)) {
+ port = nxt_router_app_get_port_unsafe(app, &use_delta);
+
+ can_start_worker = 0;
+
+ } else {
+ nxt_queue_insert_tail(&app->requests, &ra->link);
+
+ can_start_worker = (app->workers + app->pending_workers) <
+ app->max_workers;
+ if (can_start_worker) {
+ app->pending_workers++;
+ }
+
+ port = NULL;
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
if (port != NULL) {
- nxt_debug(task, "already have port for app '%V'", &app->name);
+ nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
ra->app_port = port;
+
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
return NXT_OK;
}
- sw = nxt_router_sw_create(task, app, ra);
+ if (!can_start_worker) {
+ nxt_debug(task, "app '%V' %p too many running or pending workers",
+ &app->name, app);
+
+ return NXT_AGAIN;
+ }
+
+ res = nxt_router_start_worker(task, app);
+
+ if (nxt_slow_path(res != NXT_OK)) {
+ nxt_router_gen_error(task, c, 500, "Failed to start worker");
- if (nxt_slow_path(sw == NULL)) {
- nxt_router_gen_error(task, c, 500,
- "Failed to allocate start worker struct");
return NXT_ERROR;
}
@@ -3011,18 +3105,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
- nxt_router_process_http_request_mp(task, ra, port);
-
- nxt_router_ra_release(task, ra, ra->work.data);
+ nxt_router_process_http_request_mp(task, ra);
}
static void
-nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
- nxt_port_t *port)
+nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
{
+ uint32_t request_failed;
nxt_int_t res;
- nxt_port_t *c_port, *reply_port;
+ nxt_port_t *port, *c_port, *reply_port;
nxt_conn_t *c;
nxt_app_wmsg_t wmsg;
nxt_app_parse_ctx_t *ap;
@@ -3030,11 +3122,15 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
/* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */
nxt_assert(ra->rc != NULL);
+ nxt_assert(ra->app_port != NULL);
+ port = ra->app_port;
reply_port = ra->reply_port;
ap = ra->ap;
c = ra->rc->conn;
+ request_failed = 1;
+
c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
reply_port->id);
if (nxt_slow_path(c_port != reply_port)) {
@@ -3043,7 +3139,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_gen_error(task, c, 500,
"Failed to send reply port to application");
- return;
+ goto release_port;
}
nxt_process_connected_port_add(port->process, reply_port);
@@ -3059,21 +3155,33 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_gen_error(task, c, 500,
"Failed to prepare message for application");
- return;
+ goto release_port;
}
nxt_debug(task, "about to send %d bytes buffer to worker port %d",
nxt_buf_used_size(wmsg.write),
wmsg.port->socket.fd);
+ request_failed = 0;
+
res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
-1, ra->stream, reply_port->id, wmsg.write);
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_gen_error(task, c, 500,
"Failed to send message to application");
- return;
+ goto release_port;
}
+
+release_port:
+
+ if (request_failed != 0) {
+ ra->app_port = 0;
+ }
+
+ nxt_router_app_port_release(task, port, request_failed, 0);
+
+ nxt_router_ra_release(task, ra, ra->work.data);
}
@@ -3452,13 +3560,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream);
- if (rc->app_port != NULL) {
- nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
-
- rc->app_port = NULL;
- }
-
- nxt_router_rc_unlink(rc);
+ nxt_router_rc_unlink(task, rc);
nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream);