summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c1504
1 files changed, 462 insertions, 1042 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 4df1489d..44b303e4 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -61,59 +61,13 @@ typedef struct {
} nxt_app_rpc_t;
-struct nxt_port_select_state_s {
- nxt_app_t *app;
- nxt_request_app_link_t *req_app_link;
-
- nxt_port_t *failed_port;
- int failed_port_use_delta;
-
- uint8_t start_process; /* 1 bit */
- nxt_request_app_link_t *shared_ra;
- nxt_port_t *port;
-};
-
-typedef struct nxt_port_select_state_s nxt_port_select_state_t;
-
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
static void nxt_router_greet_controller(nxt_task_t *task,
nxt_port_t *controller_port);
-static void nxt_router_port_select(nxt_task_t *task,
- nxt_port_select_state_t *state);
-
-static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
- nxt_port_select_state_t *state);
-
static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
-static void nxt_request_app_link_update_peer(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link);
-
-
-nxt_inline void
-nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link)
-{
- nxt_atomic_fetch_add(&req_app_link->use_count, 1);
-}
-
-nxt_inline void
-nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i)
-{
-#if (NXT_DEBUG)
- int c;
-
- c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
-
- nxt_assert((c + i) > 0);
-#else
- (void) nxt_atomic_fetch_add(&req_app_link->use_count, i);
-#endif
-}
-
-static void nxt_request_app_link_use(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link, int i);
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
@@ -196,6 +150,8 @@ static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
void *data);
+static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
nxt_socket_conf_t *skcf);
@@ -220,6 +176,8 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task,
static void nxt_router_app_port_ready(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
+static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
+ nxt_port_t *app_port);
static void nxt_router_app_port_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
@@ -227,13 +185,15 @@ static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_apr_action_t action);
-static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
- nxt_request_app_link_t *req_app_link);
+static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
+ nxt_request_rpc_data_t *req_rpc_data);
+static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_router_app_prepare_request(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link);
+ nxt_request_rpc_data_t *req_rpc_data);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
- nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix);
+ nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
@@ -250,7 +210,7 @@ static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
static void nxt_router_app_joint_use(nxt_task_t *task,
nxt_app_joint_t *app_joint, int i);
-static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
+static void nxt_router_http_request_release_post(nxt_task_t *task,
nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
void *data);
@@ -501,83 +461,6 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
}
-nxt_inline void
-nxt_request_app_link_init(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data)
-{
- nxt_buf_t *body;
- nxt_event_engine_t *engine;
-
- engine = task->thread->engine;
-
- nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t));
-
- req_app_link->stream = req_rpc_data->stream;
- req_app_link->use_count = 1;
- req_app_link->req_rpc_data = req_rpc_data;
- req_rpc_data->req_app_link = req_app_link;
- req_app_link->reply_port = engine->port;
- req_app_link->request = req_rpc_data->request;
- req_app_link->apr_action = NXT_APR_GOT_RESPONSE;
-
- req_app_link->work.handler = NULL;
- req_app_link->work.task = &engine->task;
- req_app_link->work.obj = req_app_link;
- req_app_link->work.data = engine;
-
- body = req_rpc_data->request->body;
-
- if (body != NULL && nxt_buf_is_file(body)) {
- req_app_link->body_fd = body->file->fd;
-
- body->file->fd = -1;
-
- } else {
- req_app_link->body_fd = -1;
- }
-}
-
-
-nxt_inline nxt_request_app_link_t *
-nxt_request_app_link_alloc(nxt_task_t *task,
- nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data)
-{
- nxt_mp_t *mp;
- nxt_request_app_link_t *req_app_link;
-
- if (ra_src != NULL && ra_src->mem_pool != NULL) {
- return ra_src;
- }
-
- mp = req_rpc_data->request->mem_pool;
-
- req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t));
-
- if (nxt_slow_path(req_app_link == NULL)) {
-
- req_rpc_data->req_app_link = NULL;
-
- if (ra_src != NULL) {
- ra_src->req_rpc_data = NULL;
- }
-
- return NULL;
- }
-
- nxt_mp_retain(mp);
-
- nxt_request_app_link_init(task, req_app_link, req_rpc_data);
-
- if (ra_src != NULL) {
- req_app_link->body_fd = ra_src->body_fd;
- }
-
- req_app_link->mem_pool = mp;
-
- return req_app_link;
-}
-
-
nxt_inline nxt_bool_t
nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
uint32_t stream)
@@ -614,198 +497,6 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
}
-static void
-nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj,
- void *data)
-{
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = obj;
-
- nxt_request_app_link_update_peer(task, req_app_link);
-
- nxt_request_app_link_use(task, req_app_link, -1);
-}
-
-
-static void
-nxt_request_app_link_update_peer(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link)
-{
- nxt_event_engine_t *engine;
- nxt_request_rpc_data_t *req_rpc_data;
-
- engine = req_app_link->work.data;
-
- if (task->thread->engine != engine) {
- nxt_request_app_link_inc_use(req_app_link);
-
- req_app_link->work.handler = nxt_request_app_link_update_peer_handler;
- req_app_link->work.task = &engine->task;
- req_app_link->work.next = NULL;
-
- nxt_debug(task, "req_app_link stream #%uD post update peer to %p",
- req_app_link->stream, engine);
-
- nxt_event_engine_post(engine, &req_app_link->work);
-
- return;
- }
-
- nxt_debug(task, "req_app_link stream #%uD update peer",
- req_app_link->stream);
-
- req_rpc_data = req_app_link->req_rpc_data;
-
- if (req_rpc_data != NULL && req_app_link->app_port != NULL) {
- nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data,
- req_app_link->app_port->pid);
- }
-}
-
-
-static void
-nxt_request_app_link_release(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link)
-{
- nxt_mp_t *mp;
- nxt_http_request_t *r;
- nxt_request_rpc_data_t *req_rpc_data;
-
- nxt_assert(task->thread->engine == req_app_link->work.data);
- nxt_assert(req_app_link->use_count == 0);
-
- nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream);
-
- req_rpc_data = req_app_link->req_rpc_data;
-
- if (req_rpc_data != NULL) {
- if (nxt_slow_path(req_app_link->err_code != 0)) {
- nxt_http_request_error(task, req_rpc_data->request,
- req_app_link->err_code);
-
- } else {
- req_rpc_data->app_port = req_app_link->app_port;
- req_rpc_data->apr_action = req_app_link->apr_action;
- req_rpc_data->msg_info = req_app_link->msg_info;
-
- if (req_rpc_data->app->timeout != 0) {
- r = req_rpc_data->request;
-
- r->timer.handler = nxt_router_app_timeout;
- r->timer_data = req_rpc_data;
- nxt_timer_add(task->thread->engine, &r->timer,
- req_rpc_data->app->timeout);
- }
-
- req_app_link->app_port = NULL;
- req_app_link->msg_info.buf = NULL;
- }
-
- req_rpc_data->req_app_link = NULL;
- req_app_link->req_rpc_data = NULL;
- }
-
- if (req_app_link->app_port != NULL) {
- nxt_router_app_port_release(task, req_app_link->app_port,
- req_app_link->apr_action);
-
- req_app_link->app_port = NULL;
- }
-
- if (req_app_link->body_fd != -1) {
- nxt_fd_close(req_app_link->body_fd);
-
- req_app_link->body_fd = -1;
- }
-
- nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream);
-
- mp = req_app_link->mem_pool;
-
- if (mp != NULL) {
- nxt_mp_free(mp, req_app_link);
- nxt_mp_release(mp);
- }
-}
-
-
-static void
-nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data)
-{
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = obj;
-
- nxt_assert(req_app_link->work.data == data);
-
- nxt_request_app_link_use(task, req_app_link, -1);
-}
-
-
-static void
-nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link,
- int i)
-{
- int c;
- nxt_event_engine_t *engine;
-
- c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
-
- if (i < 0 && c == -i) {
- engine = req_app_link->work.data;
-
- if (task->thread->engine == engine) {
- nxt_request_app_link_release(task, req_app_link);
-
- return;
- }
-
- nxt_request_app_link_inc_use(req_app_link);
-
- req_app_link->work.handler = nxt_request_app_link_release_handler;
- req_app_link->work.task = &engine->task;
- req_app_link->work.next = NULL;
-
- nxt_debug(task, "req_app_link stream #%uD post release to %p",
- req_app_link->stream, engine);
-
- nxt_event_engine_post(engine, &req_app_link->work);
- }
-}
-
-
-nxt_inline void
-nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app,
- nxt_request_app_link_t *req_app_link, const char *str)
-{
- req_app_link->app_port = NULL;
- req_app_link->err_code = 500;
- req_app_link->err_str = str;
-
- nxt_alert(task, "app \"%V\" internal error: %s on #%uD",
- &app->name, str, req_app_link->stream);
-}
-
-
-nxt_inline void
-nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app,
- nxt_request_app_link_t *req_app_link)
-{
- nxt_queue_insert_tail(&req_app_link->app_port->pending_requests,
- &req_app_link->link_port_pending);
- nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending);
-
- nxt_request_app_link_inc_use(req_app_link);
-
- req_app_link->res_time = nxt_thread_monotonic_time(task->thread)
- + app->res_timeout;
-
- nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests",
- req_app_link->stream);
-}
-
-
nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t *lnk)
{
@@ -825,8 +516,9 @@ nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t *task,
nxt_request_rpc_data_t *req_rpc_data)
{
- int ra_use_delta;
- nxt_request_app_link_t *req_app_link;
+ nxt_http_request_t *r;
+
+ nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
if (req_rpc_data->app_port != NULL) {
nxt_router_app_port_release(task, req_rpc_data->app_port,
@@ -835,53 +527,34 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
req_rpc_data->app_port = NULL;
}
- nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
-
- req_app_link = req_rpc_data->req_app_link;
- if (req_app_link != NULL) {
- req_rpc_data->req_app_link = NULL;
- req_app_link->req_rpc_data = NULL;
-
- ra_use_delta = 0;
-
- nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
+ if (req_rpc_data->app != NULL) {
+ nxt_router_app_use(task, req_rpc_data->app, -1);
- if (req_app_link->link_app_requests.next == NULL
- && req_app_link->link_port_pending.next == NULL
- && req_app_link->link_app_pending.next == NULL
- && req_app_link->link_port_websockets.next == NULL)
- {
- req_app_link = NULL;
+ req_rpc_data->app = NULL;
+ }
- } else {
- ra_use_delta -=
- nxt_queue_chk_remove(&req_app_link->link_app_requests)
- + nxt_queue_chk_remove(&req_app_link->link_port_pending)
- + nxt_queue_chk_remove(&req_app_link->link_port_websockets);
+ r = req_rpc_data->request;
- nxt_queue_chk_remove(&req_app_link->link_app_pending);
- }
+ if (r != NULL) {
+ r->timer_data = NULL;
- nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
+ nxt_router_http_request_release_post(task, r);
- if (req_app_link != NULL) {
- nxt_request_app_link_use(task, req_app_link, ra_use_delta);
- }
+ r->req_rpc_data = NULL;
+ req_rpc_data->request = NULL;
}
- if (req_rpc_data->app != NULL) {
- nxt_router_app_use(task, req_rpc_data->app, -1);
+ if (req_rpc_data->msg_info.body_fd != -1) {
+ nxt_fd_close(req_rpc_data->msg_info.body_fd);
- req_rpc_data->app = NULL;
+ req_rpc_data->msg_info.body_fd = -1;
}
- if (req_rpc_data->request != NULL) {
- req_rpc_data->request->timer_data = NULL;
-
- nxt_router_http_request_done(task, req_rpc_data->request);
+ if (req_rpc_data->rpc_cancel) {
+ req_rpc_data->rpc_cancel = 0;
- req_rpc_data->request->req_rpc_data = NULL;
- req_rpc_data->request = NULL;
+ nxt_port_rpc_cancel(task, task->thread->engine->port,
+ req_rpc_data->stream);
}
}
@@ -889,25 +562,62 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
+ nxt_app_t *app;
+ nxt_port_t *port, *main_app_port;
+ nxt_runtime_t *rt;
+
nxt_port_new_port_handler(task, msg);
- if (msg->u.new_port != NULL
- && msg->u.new_port->type == NXT_PROCESS_CONTROLLER)
- {
+ port = msg->u.new_port;
+
+ if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
nxt_router_greet_controller(task, msg->u.new_port);
}
- if (msg->port_msg.stream == 0) {
- return;
- }
+ if (port == NULL || port->type != NXT_PROCESS_APP) {
+
+ if (msg->port_msg.stream == 0) {
+ return;
+ }
- if (msg->u.new_port == NULL
- || msg->u.new_port->type != NXT_PROCESS_APP)
- {
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
}
- nxt_port_rpc_handler(task, msg);
+ if (msg->port_msg.stream != 0) {
+ nxt_port_rpc_handler(task, msg);
+ return;
+ }
+
+ /*
+ * Port with "id == 0" is application 'main' port and it always
+ * should come with non-zero stream.
+ */
+ nxt_assert(port->id != 0);
+
+ /* Find 'main' app port and get app reference. */
+ rt = task->thread->runtime;
+
+ /*
+ * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
+ * sent to main port (with id == 0) and processed in main thread.
+ */
+ main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
+ nxt_assert(main_app_port != NULL);
+
+ app = main_app_port->app;
+ nxt_assert(app != NULL);
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ /* TODO here should be find-and-add code because there can be
+ port waiters in port_hash */
+ nxt_port_hash_add(&app->port_hash, port);
+ app->port_hash_count++;
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ port->app = app;
+ port->main_app_port = main_app_port;
}
@@ -1100,8 +810,10 @@ nxt_router_app_can_start(nxt_app_t *app)
nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t *app)
{
- return app->idle_processes + app->pending_processes
- < app->spare_processes;
+ return (app->active_requests
+ > app->port_hash_count + app->pending_processes)
+ || (app->spare_processes
+ > app->idle_processes + app->pending_processes);
}
@@ -1530,6 +1242,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_app_t *app, *prev;
nxt_str_t *t, *s, *targets;
nxt_uint_t n, i;
+ nxt_port_t *port;
nxt_router_t *router;
nxt_app_joint_t *app_joint;
nxt_conf_value_t *conf, *http, *value, *websocket;
@@ -1744,8 +1457,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_queue_init(&app->ports);
nxt_queue_init(&app->spare_ports);
nxt_queue_init(&app->idle_ports);
- nxt_queue_init(&app->requests);
- nxt_queue_init(&app->pending);
app->name.length = name.length;
nxt_memcpy(app->name.start, name.start, name.length);
@@ -1758,7 +1469,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->timeout = apcf.timeout;
app->res_timeout = apcf.res_timeout * 1000000;
app->idle_timeout = apcf.idle_timeout;
- app->max_pending_responses = 2;
app->max_requests = apcf.requests;
app->targets = targets;
@@ -1789,6 +1499,25 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app_joint->free_app_work.handler = nxt_router_free_app;
app_joint->free_app_work.task = &engine->task;
app_joint->free_app_work.obj = app_joint;
+
+ port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
+ NXT_PROCESS_APP);
+ if (nxt_slow_path(port == NULL)) {
+ return NXT_ERROR;
+ }
+
+ ret = nxt_port_socket_init(task, port, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, port, -1);
+ return NXT_ERROR;
+ }
+
+ nxt_port_write_enable(task, port);
+ port->app = app;
+
+ app->shared_port = port;
+
+ nxt_thread_mutex_create(&app->outgoing.mutex);
}
}
@@ -2522,7 +2251,13 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
app = rpc->app;
port = msg->u.new_port;
+
+ nxt_assert(port != NULL);
+ nxt_assert(port->type == NXT_PROCESS_APP);
+ nxt_assert(port->id == 0);
+
port->app = app;
+ port->main_app_port = port;
app->pending_processes--;
app->processes++;
@@ -2532,11 +2267,15 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_queue_insert_tail(&app->ports, &port->app_link);
nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
+ nxt_port_hash_add(&app->port_hash, port);
+ app->port_hash_count++;
port->idle_start = 0;
nxt_port_inc_use(port);
+ nxt_router_app_shared_port_send(task, port);
+
nxt_work_queue_add(&engine->fast_work_queue,
nxt_router_conf_apply, task, rpc->temp_conf, NULL);
}
@@ -2939,10 +2678,11 @@ nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
static nxt_port_handlers_t nxt_router_app_port_handlers = {
- .rpc_error = nxt_port_rpc_handler,
- .mmap = nxt_port_mmap_handler,
- .data = nxt_port_rpc_handler,
- .oosm = nxt_router_oosm_handler,
+ .rpc_error = nxt_port_rpc_handler,
+ .mmap = nxt_port_mmap_handler,
+ .data = nxt_port_rpc_handler,
+ .oosm = nxt_router_oosm_handler,
+ .req_headers_ack = nxt_port_rpc_handler,
};
@@ -3736,22 +3476,17 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_int_t ret;
+ nxt_app_t *app;
nxt_buf_t *b, *next;
nxt_port_t *app_port;
nxt_unit_field_t *f;
nxt_http_field_t *field;
nxt_http_request_t *r;
nxt_unit_response_t *resp;
- nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
- b = msg->buf;
req_rpc_data = data;
- if (msg->size == 0) {
- b = NULL;
- }
-
r = req_rpc_data->request;
if (nxt_slow_path(r == NULL)) {
return;
@@ -3762,19 +3497,32 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
return;
}
+ app = req_rpc_data->app;
+ nxt_assert(app != NULL);
+
+ if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
+ nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
+
+ return;
+ }
+
+ b = (msg->size == 0) ? NULL : msg->buf;
+
if (msg->port_msg.last != 0) {
nxt_debug(task, "router data create last buf");
nxt_buf_chain_add(&b, nxt_http_buf_last(r));
+ req_rpc_data->rpc_cancel = 0;
+ req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
+
nxt_request_rpc_data_unlink(task, req_rpc_data);
} else {
- if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) {
+ if (app->timeout != 0) {
r->timer.handler = nxt_router_app_timeout;
r->timer_data = req_rpc_data;
- nxt_timer_add(task->thread->engine, &r->timer,
- req_rpc_data->app->timeout);
+ nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
}
}
@@ -3870,39 +3618,21 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
if (r->websocket_handshake
&& r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
{
- req_app_link = nxt_request_app_link_alloc(task,
- req_rpc_data->req_app_link,
- req_rpc_data);
- if (nxt_slow_path(req_app_link == NULL)) {
- goto fail;
- }
-
- app_port = req_app_link->app_port;
-
- if (app_port == NULL && req_rpc_data->app_port != NULL) {
- req_app_link->app_port = req_rpc_data->app_port;
- app_port = req_app_link->app_port;
- req_app_link->apr_action = req_rpc_data->apr_action;
-
- req_rpc_data->app_port = NULL;
- }
-
+ app_port = req_rpc_data->app_port;
if (nxt_slow_path(app_port == NULL)) {
goto fail;
}
- nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
+ nxt_thread_mutex_lock(&app->mutex);
- nxt_queue_insert_tail(&app_port->active_websockets,
- &req_app_link->link_port_websockets);
+ app_port->main_app_port->active_websockets++;
- nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
+ nxt_thread_mutex_unlock(&app->mutex);
nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
- req_app_link->apr_action = NXT_APR_CLOSE;
+ req_rpc_data->apr_action = NXT_APR_CLOSE;
- nxt_debug(task, "req_app_link stream #%uD upgrade",
- req_app_link->stream);
+ nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
r->state = &nxt_http_websocket;
@@ -3921,6 +3651,94 @@ fail:
}
+static void
+nxt_router_req_headers_ack_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
+{
+ nxt_app_t *app;
+ nxt_bool_t start_process;
+ nxt_port_t *app_port, *main_app_port, *idle_port;
+ nxt_queue_link_t *idle_lnk;
+ nxt_http_request_t *r;
+
+ nxt_debug(task, "stream #%uD: got ack from %PI:%d",
+ req_rpc_data->stream,
+ msg->port_msg.pid, msg->port_msg.reply_port);
+
+ nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
+ msg->port_msg.pid);
+
+ app = req_rpc_data->app;
+
+ start_process = 0;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(app_port == NULL)) {
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ r = req_rpc_data->request;
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+
+ return;
+ }
+
+ main_app_port = app_port->main_app_port;
+
+ if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
+ app->idle_processes--;
+
+ /* Check port was in 'spare_ports' using idle_start field. */
+ if (main_app_port->idle_start == 0
+ && app->idle_processes >= app->spare_processes)
+ {
+ /*
+ * If there is a vacant space in spare ports,
+ * move the last idle to spare_ports.
+ */
+ nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
+
+ idle_lnk = nxt_queue_last(&app->idle_ports);
+ idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
+ nxt_queue_remove(idle_lnk);
+
+ nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
+
+ idle_port->idle_start = 0;
+ }
+
+ if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
+ app->pending_processes++;
+ start_process = 1;
+ }
+ }
+
+ main_app_port->active_requests++;
+
+ nxt_port_inc_use(app_port);
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ if (start_process) {
+ nxt_router_start_app_process(task, app);
+ }
+
+ nxt_port_use(task, req_rpc_data->app_port, -1);
+
+ req_rpc_data->app_port = app_port;
+
+ if (app->timeout != 0) {
+ r = req_rpc_data->request;
+
+ r->timer.handler = nxt_router_app_timeout;
+ r->timer_data = req_rpc_data;
+ nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
+ }
+}
+
+
static const nxt_http_request_state_t nxt_http_request_send_state
nxt_aligned(64) =
{
@@ -3949,42 +3767,14 @@ static void
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_int_t res;
- nxt_port_t *port;
- nxt_bool_t cancelled;
- nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
req_rpc_data = data;
- req_app_link = req_rpc_data->req_app_link;
-
- if (req_app_link != NULL) {
- cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info,
- req_app_link->stream);
- if (cancelled) {
- res = nxt_router_app_port(task, req_rpc_data->app, req_app_link);
-
- if (res == NXT_OK) {
- port = req_app_link->app_port;
-
- if (nxt_slow_path(port == NULL)) {
- nxt_log(task, NXT_LOG_ERR,
- "port is NULL in cancelled req_app_link");
- return;
- }
-
- nxt_port_rpc_ex_set_peer(task, task->thread->engine->port,
- req_rpc_data, port->pid);
-
- nxt_router_app_prepare_request(task, req_app_link);
- }
+ req_rpc_data->rpc_cancel = 0;
- msg->port_msg.last = 0;
-
- return;
- }
- }
+ /* TODO cancel message and return if cancelled. */
+ // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
if (req_rpc_data->request != NULL) {
nxt_http_request_error(task, req_rpc_data->request,
@@ -4008,6 +3798,8 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_assert(app_joint != NULL);
nxt_assert(port != NULL);
+ nxt_assert(port->type == NXT_PROCESS_APP);
+ nxt_assert(port->id == 0);
app = app_joint->app;
@@ -4022,6 +3814,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
port->app = app;
+ port->main_app_port = port;
nxt_thread_mutex_lock(&app->mutex);
@@ -4029,24 +3822,60 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
app->pending_processes--;
app->processes++;
+ nxt_port_hash_add(&app->port_hash, port);
+ app->port_hash_count++;
nxt_thread_mutex_unlock(&app->mutex);
nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
&app->name, port->pid, app->processes, app->pending_processes);
+ nxt_router_app_shared_port_send(task, port);
+
nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
}
+static nxt_int_t
+nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
+{
+ nxt_buf_t *b;
+ nxt_port_t *port;
+ nxt_port_msg_new_port_t *msg;
+
+ b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
+ sizeof(nxt_port_data_t));
+ if (nxt_slow_path(b == NULL)) {
+ return NXT_ERROR;
+ }
+
+ port = app_port->app->shared_port;
+
+ nxt_debug(task, "send port %FD to process %PI",
+ port->pair[0], app_port->pid);
+
+ b->mem.free += sizeof(nxt_port_msg_new_port_t);
+ msg = (nxt_port_msg_new_port_t *) b->mem.pos;
+
+ msg->id = port->id;
+ msg->pid = port->pid;
+ msg->max_size = port->max_size;
+ msg->max_share = port->max_share;
+ msg->type = port->type;
+
+ return nxt_port_socket_twrite(task, app_port,
+ NXT_PORT_MSG_NEW_PORT,
+ port->pair[0],
+ 0, 0, b, NULL);
+}
+
+
static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_app_t *app;
- nxt_app_joint_t *app_joint;
- nxt_queue_link_t *lnk;
- nxt_request_app_link_t *req_app_link;
+ nxt_app_t *app;
+ nxt_app_joint_t *app_joint;
app_joint = data;
@@ -4070,32 +3899,11 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
app->pending_processes--;
- if (!nxt_queue_is_empty(&app->requests)) {
- lnk = nxt_queue_last(&app->requests);
- nxt_queue_remove(lnk);
- lnk->next = NULL;
-
- req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_app_requests);
-
- } else {
- req_app_link = NULL;
- }
-
nxt_thread_mutex_unlock(&app->mutex);
- if (req_app_link != NULL) {
- nxt_debug(task, "app '%V' %p abort next stream #%uD",
- &app->name, app, req_app_link->stream);
-
- nxt_request_app_link_error(task, app, req_app_link,
- "Failed to start application process");
- nxt_request_app_link_use(task, req_app_link, -1);
- }
+ /* TODO req_app_link to cancel first pending message */
}
-nxt_inline nxt_port_t *
-nxt_router_app_get_port_for_quit(nxt_app_t *app);
void
nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
@@ -4116,63 +3924,6 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
}
-nxt_inline nxt_bool_t
-nxt_router_app_first_port_busy(nxt_app_t *app)
-{
- nxt_port_t *port;
- nxt_queue_link_t *lnk;
-
- lnk = nxt_queue_first(&app->ports);
- port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
-
- return port->app_pending_responses > 0;
-}
-
-
-nxt_inline nxt_port_t *
-nxt_router_pop_first_port(nxt_app_t *app)
-{
- nxt_port_t *port;
- nxt_queue_link_t *lnk;
-
- lnk = nxt_queue_first(&app->ports);
- nxt_queue_remove(lnk);
-
- port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
-
- port->app_pending_responses++;
-
- if (nxt_queue_chk_remove(&port->idle_link)) {
- app->idle_processes--;
-
- if (port->idle_start == 0) {
- nxt_assert(app->idle_processes < app->spare_processes);
-
- } else {
- nxt_assert(app->idle_processes >= app->spare_processes);
-
- port->idle_start = 0;
- }
- }
-
- if ((app->max_pending_responses == 0
- || port->app_pending_responses < app->max_pending_responses)
- && (app->max_requests == 0
- || port->app_responses + port->app_pending_responses
- < app->max_requests))
- {
- nxt_queue_insert_tail(&app->ports, lnk);
-
- nxt_port_inc_use(port);
-
- } else {
- lnk->next = NULL;
- }
-
- return port;
-}
-
-
nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_app_t *app)
{
@@ -4184,12 +3935,6 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
- if (port->app_pending_responses > 0) {
- port = NULL;
-
- continue;
- }
-
/* Caller is responsible to decrease port use count. */
nxt_queue_chk_remove(&port->app_link);
@@ -4197,6 +3942,9 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
app->idle_processes--;
}
+ nxt_port_hash_remove(&app->port_hash, port);
+ app->port_hash_count--;
+
port->app = NULL;
app->processes--;
@@ -4222,71 +3970,36 @@ nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
static void
-nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
-{
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = data;
-
-#if (NXT_DEBUG)
- {
- nxt_app_t *app;
-
- app = obj;
-
- nxt_assert(app != NULL);
- nxt_assert(req_app_link != NULL);
- nxt_assert(req_app_link->app_port != NULL);
-
- nxt_debug(task, "app '%V' %p process next stream #%uD",
- &app->name, app, req_app_link->stream);
- }
-#endif
-
- nxt_router_app_prepare_request(task, req_app_link);
-
- nxt_request_app_link_use(task, req_app_link, -1);
-}
-
-
-static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_apr_action_t action)
{
- int inc_use;
- uint32_t dec_pending, got_response;
- nxt_app_t *app;
- nxt_bool_t port_unchained;
- nxt_bool_t send_quit, cancelled, adjust_idle_timer;
- nxt_queue_link_t *lnk;
- nxt_request_app_link_t *req_app_link, *pending_ra, *re_ra;
- nxt_port_select_state_t state;
+ int inc_use;
+ uint32_t got_response, dec_requests;
+ nxt_app_t *app;
+ nxt_bool_t port_unchained, send_quit, adjust_idle_timer;
+ nxt_port_t *main_app_port;
nxt_assert(port != NULL);
nxt_assert(port->app != NULL);
- req_app_link = NULL;
-
app = port->app;
inc_use = 0;
- dec_pending = 0;
got_response = 0;
+ dec_requests = 0;
switch (action) {
case NXT_APR_NEW_PORT:
break;
case NXT_APR_REQUEST_FAILED:
- dec_pending = 1;
+ dec_requests = 1;
inc_use = -1;
break;
case NXT_APR_GOT_RESPONSE:
- dec_pending = 1;
got_response = 1;
inc_use = -1;
break;
case NXT_APR_UPGRADE:
- dec_pending = 1;
got_response = 1;
break;
case NXT_APR_CLOSE:
@@ -4294,120 +4007,49 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
break;
}
- nxt_thread_mutex_lock(&app->mutex);
-
- port->app_pending_responses -= dec_pending;
- port->app_responses += got_response;
+ nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
+ port->pid, port->id,
+ (int) inc_use, (int) got_response);
- if (port->pair[1] != -1
- && (app->max_pending_responses == 0
- || port->app_pending_responses < app->max_pending_responses)
- && (app->max_requests == 0
- || port->app_responses + port->app_pending_responses
- < app->max_requests))
- {
- if (port->app_link.next == NULL) {
- if (port->app_pending_responses > 0) {
- nxt_queue_insert_tail(&app->ports, &port->app_link);
+ if (port == app->shared_port) {
+ nxt_thread_mutex_lock(&app->mutex);
- } else {
- nxt_queue_insert_head(&app->ports, &port->app_link);
- }
+ app->active_requests -= got_response + dec_requests;
- nxt_port_inc_use(port);
+ nxt_thread_mutex_unlock(&app->mutex);
- } else {
- if (port->app_pending_responses == 0
- && nxt_queue_first(&app->ports) != &port->app_link)
- {
- nxt_queue_remove(&port->app_link);
- nxt_queue_insert_head(&app->ports, &port->app_link);
- }
- }
+ goto adjust_use;
}
- if (!nxt_queue_is_empty(&app->ports)
- && !nxt_queue_is_empty(&app->requests))
- {
- lnk = nxt_queue_first(&app->requests);
- nxt_queue_remove(lnk);
- lnk->next = NULL;
-
- req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_app_requests);
+ main_app_port = port->main_app_port;
- req_app_link->app_port = nxt_router_pop_first_port(app);
+ nxt_thread_mutex_lock(&app->mutex);
- if (req_app_link->app_port->app_pending_responses > 1) {
- nxt_request_app_link_pending(task, app, req_app_link);
- }
- }
+ main_app_port->app_responses += got_response;
+ main_app_port->active_requests -= got_response + dec_requests;
+ app->active_requests -= got_response + dec_requests;
- /* Pop first pending request for this port. */
- if (dec_pending > 0
- && !nxt_queue_is_empty(&port->pending_requests))
+ if (main_app_port->pair[1] != -1
+ && (app->max_requests == 0
+ || main_app_port->app_responses < app->max_requests))
{
- lnk = nxt_queue_first(&port->pending_requests);
- nxt_queue_remove(lnk);
- lnk->next = NULL;
-
- pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_port_pending);
-
- nxt_assert(pending_ra->link_app_pending.next != NULL);
-
- nxt_queue_remove(&pending_ra->link_app_pending);
- pending_ra->link_app_pending.next = NULL;
-
- } else {
- pending_ra = NULL;
- }
-
- /* Try to cancel and re-schedule first stalled request for this app. */
- if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
- lnk = nxt_queue_first(&app->pending);
-
- re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_app_pending);
-
- if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
+ if (main_app_port->app_link.next == NULL) {
+ nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
- nxt_debug(task, "app '%V' stalled request #%uD detected",
- &app->name, re_ra->stream);
-
- cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
- re_ra->stream);
-
- if (cancelled) {
- state.req_app_link = re_ra;
- state.app = app;
-
- /*
- * Need to increment use count "in advance" because
- * nxt_router_port_select() will remove re_ra from lists
- * and decrement use count.
- */
- nxt_request_app_link_inc_use(re_ra);
-
- nxt_router_port_select(task, &state);
-
- goto re_ra_cancelled;
- }
+ nxt_port_inc_use(main_app_port);
}
}
- re_ra = NULL;
-
-re_ra_cancelled:
-
send_quit = (app->max_requests > 0
- && port->app_pending_responses == 0
- && port->app_responses >= app->max_requests);
+ && main_app_port->app_responses >= app->max_requests);
if (send_quit) {
- port_unchained = nxt_queue_chk_remove(&port->app_link);
+ port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
- port->app = NULL;
+ nxt_port_hash_remove(&app->port_hash, main_app_port);
+ app->port_hash_count--;
+
+ main_app_port->app = NULL;
app->processes--;
} else {
@@ -4416,9 +4058,10 @@ re_ra_cancelled:
adjust_idle_timer = 0;
- if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0
- && nxt_queue_is_empty(&port->active_websockets)
- && port->idle_link.next == NULL)
+ if (main_app_port->pair[1] != -1 && !send_quit
+ && main_app_port->active_requests == 0
+ && main_app_port->active_websockets == 0
+ && main_app_port->idle_link.next == NULL)
{
if (app->idle_processes == app->spare_processes
&& app->adjust_idle_work.data == NULL)
@@ -4429,12 +4072,12 @@ re_ra_cancelled:
}
if (app->idle_processes < app->spare_processes) {
- nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
+ nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
} else {
- nxt_queue_insert_tail(&app->idle_ports, &port->idle_link);
+ nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
- port->idle_start = task->thread->engine->timers.now;
+ main_app_port->idle_start = task->thread->engine->timers.now;
}
app->idle_processes++;
@@ -4447,60 +4090,22 @@ re_ra_cancelled:
nxt_event_engine_post(app->engine, &app->adjust_idle_work);
}
- if (pending_ra != NULL) {
- nxt_request_app_link_use(task, pending_ra, -1);
- }
-
- if (re_ra != NULL) {
- if (nxt_router_port_post_select(task, &state) == NXT_OK) {
- /*
- * Reference counter already incremented above, this will
- * keep re_ra while nxt_router_app_process_request()
- * task is in queue. Reference counter decreased in
- * nxt_router_app_process_request() after processing.
- */
-
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_router_app_process_request,
- &task->thread->engine->task, app, re_ra);
-
- } else {
- nxt_request_app_link_use(task, re_ra, -1);
- }
- }
-
- if (req_app_link != NULL) {
- /*
- * There should be call nxt_request_app_link_inc_use(req_app_link),
- * because of one more link in the queue. But one link was
- * recently removed from app->requests linked list.
- * Corresponding decrement is in nxt_router_app_process_request().
- */
-
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_router_app_process_request,
- &task->thread->engine->task, app, req_app_link);
-
- goto adjust_use;
- }
-
/* ? */
- if (port->pair[1] == -1) {
+ if (main_app_port->pair[1] == -1) {
nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
- &app->name, app, port, port->pid);
+ &app->name, app, main_app_port, main_app_port->pid);
goto adjust_use;
}
if (send_quit) {
- nxt_debug(task, "app '%V' %p send QUIT to port",
- &app->name, app);
+ nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
- nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
- -1, 0, 0, NULL);
+ nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
+ NULL);
if (port_unchained) {
- nxt_port_use(task, port, -1);
+ nxt_port_use(task, main_app_port, -1);
}
goto adjust_use;
@@ -4529,6 +4134,18 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
nxt_thread_mutex_lock(&app->mutex);
+ nxt_port_hash_remove(&app->port_hash, port);
+ app->port_hash_count--;
+
+ if (port->id != 0) {
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
+ port->pid, port->id);
+
+ return;
+ }
+
unchain = nxt_queue_chk_remove(&port->app_link);
if (nxt_queue_chk_remove(&port->idle_link)) {
@@ -4553,8 +4170,7 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
start_process = !task->thread->engine->shutdown
&& nxt_router_app_can_start(app)
- && (!nxt_queue_is_empty(&app->requests)
- || nxt_router_app_need_start(app));
+ && nxt_router_app_need_start(app);
if (start_process) {
app->pending_processes++;
@@ -4603,6 +4219,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
app->adjust_idle_work.data = NULL;
}
+ nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
+ &app->name,
+ (int) app->idle_processes, (int) app->spare_processes);
+
while (app->idle_processes > app->spare_processes) {
nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
@@ -4612,6 +4232,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
timeout = port->idle_start + app->idle_timeout;
+ nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
+ &app->name, port->pid,
+ port->idle_start, timeout, threshold);
+
if (timeout > threshold) {
break;
}
@@ -4621,6 +4245,9 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
nxt_queue_chk_remove(&port->app_link);
+ nxt_port_hash_remove(&app->port_hash, port);
+ app->port_hash_count--;
+
app->idle_processes--;
app->processes--;
port->app = NULL;
@@ -4704,12 +4331,23 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
}
nxt_assert(app->processes == 0);
+ nxt_assert(app->active_requests == 0);
+ nxt_assert(app->port_hash_count == 0);
nxt_assert(app->idle_processes == 0);
- nxt_assert(nxt_queue_is_empty(&app->requests));
nxt_assert(nxt_queue_is_empty(&app->ports));
nxt_assert(nxt_queue_is_empty(&app->spare_ports));
nxt_assert(nxt_queue_is_empty(&app->idle_ports));
+ nxt_port_mmaps_destroy(&app->outgoing, 1);
+
+ nxt_thread_mutex_destroy(&app->outgoing.mutex);
+
+ if (app->shared_port != NULL) {
+ app->shared_port->app = NULL;
+ nxt_port_close(task, app->shared_port);
+ nxt_port_use(task, app->shared_port, -1);
+ }
+
nxt_thread_mutex_destroy(&app->mutex);
nxt_mp_destroy(app->mem_pool);
@@ -4726,178 +4364,34 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
static void
-nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
-{
- int ra_use_delta;
- nxt_app_t *app;
- nxt_bool_t can_start_process;
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = state->req_app_link;
- app = state->app;
-
- state->failed_port_use_delta = 0;
- ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests);
-
- if (nxt_queue_chk_remove(&req_app_link->link_port_pending))
- {
- nxt_assert(req_app_link->link_app_pending.next != NULL);
-
- nxt_queue_remove(&req_app_link->link_app_pending);
- req_app_link->link_app_pending.next = NULL;
-
- ra_use_delta--;
- }
-
- state->failed_port = req_app_link->app_port;
-
- if (req_app_link->app_port != NULL) {
- state->failed_port_use_delta--;
-
- state->failed_port->app_pending_responses--;
-
- if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
- state->failed_port_use_delta--;
- }
-
- req_app_link->app_port = NULL;
- }
-
- can_start_process = nxt_router_app_can_start(app);
-
- state->port = NULL;
- state->start_process = 0;
-
- if (nxt_queue_is_empty(&app->ports)
- || (can_start_process && nxt_router_app_first_port_busy(app)) )
- {
- req_app_link = nxt_request_app_link_alloc(task, req_app_link,
- req_app_link->req_rpc_data);
- if (nxt_slow_path(req_app_link == NULL)) {
- goto fail;
- }
-
- if (nxt_slow_path(state->failed_port != NULL)) {
- nxt_queue_insert_head(&app->requests,
- &req_app_link->link_app_requests);
-
- } else {
- nxt_queue_insert_tail(&app->requests,
- &req_app_link->link_app_requests);
- }
-
- nxt_request_app_link_inc_use(req_app_link);
-
- nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests",
- req_app_link->stream);
-
- if (can_start_process) {
- app->pending_processes++;
- state->start_process = 1;
- }
-
- } else {
- state->port = nxt_router_pop_first_port(app);
-
- if (state->port->app_pending_responses > 1) {
- req_app_link = nxt_request_app_link_alloc(task, req_app_link,
- req_app_link->req_rpc_data);
- if (nxt_slow_path(req_app_link == NULL)) {
- goto fail;
- }
-
- req_app_link->app_port = state->port;
-
- nxt_request_app_link_pending(task, app, req_app_link);
- }
-
- if (can_start_process && nxt_router_app_need_start(app)) {
- app->pending_processes++;
- state->start_process = 1;
- }
- }
-
- nxt_request_app_link_chk_use(req_app_link, ra_use_delta);
-
-fail:
-
- state->shared_ra = req_app_link;
-}
-
-
-static nxt_int_t
-nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
+nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
+ nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_int_t res;
- nxt_app_t *app;
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = state->shared_ra;
- app = state->app;
-
- if (state->failed_port_use_delta != 0) {
- nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
- }
-
- if (nxt_slow_path(req_app_link == NULL)) {
- if (state->port != NULL) {
- nxt_port_use(task, state->port, -1);
- }
-
- nxt_request_app_link_error(task, app, state->req_app_link,
- "Failed to allocate shared req<->app link");
-
- return NXT_ERROR;
- }
-
- if (state->port != NULL) {
- nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
+ nxt_bool_t start_process;
+ nxt_port_t *port;
- req_app_link->app_port = state->port;
+ start_process = 0;
- if (state->start_process) {
- nxt_router_start_app_process(task, app);
- }
+ nxt_thread_mutex_lock(&app->mutex);
- return NXT_OK;
- }
+ port = app->shared_port;
+ nxt_port_inc_use(port);
- if (!state->start_process) {
- nxt_debug(task, "app '%V' %p too many running or pending processes",
- &app->name, app);
+ app->active_requests++;
- return NXT_AGAIN;
+ if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
+ app->pending_processes++;
+ start_process = 1;
}
- res = nxt_router_start_app_process(task, app);
+ nxt_thread_mutex_unlock(&app->mutex);
- if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(task, app, req_app_link,
- "Failed to start app process");
+ req_rpc_data->app_port = port;
+ req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
- return NXT_ERROR;
+ if (start_process) {
+ nxt_router_start_app_process(task, app);
}
-
- return NXT_AGAIN;
-}
-
-
-static nxt_int_t
-nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
- nxt_request_app_link_t *req_app_link)
-{
- nxt_port_select_state_t state;
-
- state.req_app_link = req_app_link;
- state.app = app;
-
- nxt_thread_mutex_lock(&app->mutex);
-
- nxt_router_port_select(task, &state);
-
- nxt_thread_mutex_unlock(&app->mutex);
-
- return nxt_router_port_post_select(task, &state);
}
@@ -4905,10 +4399,7 @@ void
nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_app_t *app)
{
- nxt_int_t res;
- nxt_port_t *port;
nxt_event_engine_t *engine;
- nxt_request_app_link_t ra_local, *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
engine = task->thread->engine;
@@ -4927,7 +4418,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
* in port handlers. Need to fixup request memory pool. Counterpart
* release will be called via following call chain:
* nxt_request_rpc_data_unlink() ->
- * nxt_router_http_request_done() ->
+ * nxt_router_http_request_release_post() ->
* nxt_router_http_request_release()
*/
nxt_mp_retain(r->mem_pool);
@@ -4939,29 +4430,37 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
req_rpc_data->app = app;
+ req_rpc_data->msg_info.body_fd = -1;
+ req_rpc_data->rpc_cancel = 1;
nxt_router_app_use(task, app, 1);
req_rpc_data->request = r;
r->req_rpc_data = req_rpc_data;
- req_app_link = &ra_local;
- nxt_request_app_link_init(task, req_app_link, req_rpc_data);
+ if (r->last != NULL) {
+ r->last->completion_handler = nxt_router_http_request_done;
+ }
- res = nxt_router_app_port(task, app, req_app_link);
- req_app_link = req_rpc_data->req_app_link;
+ nxt_router_app_port_get(task, app, req_rpc_data);
+ nxt_router_app_prepare_request(task, req_rpc_data);
+}
- if (res == NXT_OK) {
- port = req_app_link->app_port;
- nxt_assert(port != NULL);
+static void
+nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_request_t *r;
+
+ r = data;
- nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid);
+ nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
- nxt_router_app_prepare_request(task, req_app_link);
+ if (r->req_rpc_data) {
+ nxt_request_rpc_data_unlink(task, r->req_rpc_data);
}
- nxt_request_app_link_use(task, req_app_link, -1);
+ nxt_http_request_close_handler(task, r, r->proto.any);
}
@@ -4973,76 +4472,80 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_app_prepare_request(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link)
+ nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_buf_t *buf;
+ nxt_app_t *app;
+ nxt_buf_t *buf, *body;
nxt_int_t res;
nxt_port_t *port, *reply_port;
- nxt_apr_action_t apr_action;
- nxt_assert(req_app_link->app_port != NULL);
+ app = req_rpc_data->app;
- port = req_app_link->app_port;
- reply_port = req_app_link->reply_port;
+ nxt_assert(app != NULL);
- apr_action = NXT_APR_REQUEST_FAILED;
+ port = req_rpc_data->app_port;
- buf = nxt_router_prepare_msg(task, req_app_link->request, port,
- nxt_app_msg_prefix[port->app->type]);
+ nxt_assert(port != NULL);
+
+ reply_port = task->thread->engine->port;
+ buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
+ nxt_app_msg_prefix[app->type]);
if (nxt_slow_path(buf == NULL)) {
- nxt_request_app_link_error(task, port->app, req_app_link,
- "Failed to prepare message for application");
- goto release_port;
+ nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
+ req_rpc_data->stream, &app->name);
+
+ nxt_http_request_error(task, req_rpc_data->request,
+ NXT_HTTP_INTERNAL_SERVER_ERROR);
+
+ return;
}
nxt_debug(task, "about to send %O bytes buffer to app process port %d",
nxt_buf_used_size(buf),
port->socket.fd);
- apr_action = NXT_APR_NEW_PORT;
-
- req_app_link->msg_info.buf = buf;
- req_app_link->msg_info.completion_handler = buf->completion_handler;
+ req_rpc_data->msg_info.buf = buf;
+ req_rpc_data->msg_info.completion_handler = buf->completion_handler;
- for (; buf; buf = buf->next) {
+ do {
buf->completion_handler = nxt_router_dummy_buf_completion;
- }
+ buf = buf->next;
+ } while (buf != NULL);
- buf = req_app_link->msg_info.buf;
+ buf = req_rpc_data->msg_info.buf;
- res = nxt_port_mmap_get_tracking(task, &port->process->outgoing,
- &req_app_link->msg_info.tracking,
- req_app_link->stream);
- if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(task, port->app, req_app_link,
- "Failed to get tracking area");
- goto release_port;
- }
+ body = req_rpc_data->request->body;
- if (req_app_link->body_fd != -1) {
- nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream,
- req_app_link->body_fd);
+ if (body != NULL && nxt_buf_is_file(body)) {
+ req_rpc_data->msg_info.body_fd = body->file->fd;
- lseek(req_app_link->body_fd, 0, SEEK_SET);
+ body->file->fd = -1;
+
+ } else {
+ req_rpc_data->msg_info.body_fd = -1;
}
- res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
- req_app_link->body_fd,
- req_app_link->stream, reply_port->id, buf,
- &req_app_link->msg_info.tracking);
+ if (req_rpc_data->msg_info.body_fd != -1) {
+ nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
+ req_rpc_data->msg_info.body_fd);
- if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(task, port->app, req_app_link,
- "Failed to send message to application");
- goto release_port;
+ lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
}
-release_port:
+ res = nxt_port_socket_twrite(task, port,
+ NXT_PORT_MSG_REQ_HEADERS,
+ req_rpc_data->msg_info.body_fd,
+ req_rpc_data->stream, reply_port->id, buf,
+ NULL);
- nxt_router_app_port_release(task, port, apr_action);
+ if (nxt_slow_path(res != NXT_OK)) {
+ nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
+ req_rpc_data->stream, &app->name);
- nxt_request_app_link_update_peer(task, req_app_link);
+ nxt_http_request_error(task, req_rpc_data->request,
+ NXT_HTTP_INTERNAL_SERVER_ERROR);
+ }
}
@@ -5100,7 +4603,7 @@ nxt_fields_next(nxt_fields_iter_t *i)
static nxt_buf_t *
nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
- nxt_port_t *port, const nxt_str_t *prefix)
+ nxt_app_t *app, const nxt_str_t *prefix)
{
void *target_pos, *query_pos;
u_char *pos, *end, *p, c;
@@ -5141,7 +4644,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
return NULL;
}
- out = nxt_port_mmap_get_buf(task, &port->process->outgoing,
+ out = nxt_port_mmap_get_buf(task, &app->outgoing,
nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
if (nxt_slow_path(out == NULL)) {
return NULL;
@@ -5323,8 +4826,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
if (buf == NULL) {
free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
- buf = nxt_port_mmap_get_buf(task, &port->process->outgoing,
- free_size);
+ buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
if (nxt_slow_path(buf == NULL)) {
while (out != NULL) {
buf = out->next;
@@ -5372,15 +4874,9 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
static void
nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
{
- nxt_app_t *app;
- nxt_bool_t cancelled, unlinked;
- nxt_port_t *port;
nxt_timer_t *timer;
- nxt_queue_link_t *lnk;
nxt_http_request_t *r;
- nxt_request_app_link_t *pending_ra;
nxt_request_rpc_data_t *req_rpc_data;
- nxt_port_select_state_t state;
timer = obj;
@@ -5388,94 +4884,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
r = nxt_timer_data(timer, nxt_http_request_t, timer);
req_rpc_data = r->timer_data;
- app = req_rpc_data->app;
-
- if (app == NULL) {
- goto generate_error;
- }
-
- port = NULL;
- pending_ra = NULL;
-
- if (req_rpc_data->app_port != NULL) {
- port = req_rpc_data->app_port;
- req_rpc_data->app_port = NULL;
- }
-
- if (port == NULL && req_rpc_data->req_app_link != NULL
- && req_rpc_data->req_app_link->app_port != NULL)
- {
- port = req_rpc_data->req_app_link->app_port;
- req_rpc_data->req_app_link->app_port = NULL;
- }
-
- if (port == NULL) {
- goto generate_error;
- }
-
- nxt_thread_mutex_lock(&app->mutex);
-
- unlinked = nxt_queue_chk_remove(&port->app_link);
-
- if (!nxt_queue_is_empty(&port->pending_requests)) {
- lnk = nxt_queue_first(&port->pending_requests);
-
- pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_port_pending);
-
- nxt_assert(pending_ra->link_app_pending.next != NULL);
-
- nxt_debug(task, "app '%V' pending request #%uD found",
- &app->name, pending_ra->stream);
-
- cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info,
- pending_ra->stream);
-
- if (cancelled) {
- state.req_app_link = pending_ra;
- state.app = app;
-
- /*
- * Need to increment use count "in advance" because
- * nxt_router_port_select() will remove pending_ra from lists
- * and decrement use count.
- */
- nxt_request_app_link_inc_use(pending_ra);
-
- nxt_router_port_select(task, &state);
-
- } else {
- pending_ra = NULL;
- }
- }
-
- nxt_thread_mutex_unlock(&app->mutex);
-
- if (pending_ra != NULL) {
- if (nxt_router_port_post_select(task, &state) == NXT_OK) {
- /*
- * Reference counter already incremented above, this will
- * keep pending_ra while nxt_router_app_process_request()
- * task is in queue. Reference counter decreased in
- * nxt_router_app_process_request() after processing.
- */
-
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_router_app_process_request,
- &task->thread->engine->task, app, pending_ra);
-
- } else {
- nxt_request_app_link_use(task, pending_ra, -1);
- }
- }
-
- nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid);
-
- nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
-
- nxt_port_use(task, port, unlinked ? -2 : -1);
-
-generate_error:
nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
@@ -5483,13 +4891,11 @@ generate_error:
}
-static nxt_int_t
-nxt_router_http_request_done(nxt_task_t *task, nxt_http_request_t *r)
+static void
+nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
{
r->timer.handler = nxt_router_http_request_release;
nxt_timer_add(task->thread->engine, &r->timer, 0);
-
- return NXT_OK;
}
@@ -5498,7 +4904,7 @@ nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
- nxt_debug(task, "http app release");
+ nxt_debug(task, "http request pool release");
r = nxt_timer_data(obj, nxt_http_request_t, timer);
@@ -5593,7 +4999,18 @@ nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_assert(port->type == NXT_PROCESS_APP);
- mmaps = &port->process->outgoing;
+ if (nxt_slow_path(port->app == NULL)) {
+ nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
+ port->pid, port->id);
+
+ // FIXME
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
+ -1, msg->port_msg.stream, 0, NULL);
+
+ return;
+ }
+
+ mmaps = &port->app->outgoing;
nxt_thread_mutex_lock(&mmaps->mutex);
if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
@@ -5602,6 +5019,9 @@ nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
(int) get_mmap_msg->id);
+ // FIXME
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
+ -1, msg->port_msg.stream, 0, NULL);
return;
}