summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c2295
1 files changed, 1158 insertions, 1137 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 788199c7..0e1de6fa 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -15,6 +15,8 @@
#include <nxt_unit_request.h>
#include <nxt_unit_response.h>
#include <nxt_router_request.h>
+#include <nxt_app_queue.h>
+#include <nxt_port_queue.h>
typedef struct {
nxt_str_t type;
@@ -61,59 +63,22 @@ typedef struct {
} nxt_app_rpc_t;
-struct nxt_port_select_state_s {
- nxt_app_t *app;
- nxt_request_app_link_t *req_app_link;
-
- nxt_port_t *failed_port;
- int failed_port_use_delta;
-
- uint8_t start_process; /* 1 bit */
- nxt_request_app_link_t *shared_ra;
- nxt_port_t *port;
-};
-
-typedef struct nxt_port_select_state_s nxt_port_select_state_t;
-
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
static void nxt_router_greet_controller(nxt_task_t *task,
nxt_port_t *controller_port);
-static void nxt_router_port_select(nxt_task_t *task,
- nxt_port_select_state_t *state);
-
-static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
- nxt_port_select_state_t *state);
-
static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
-static void nxt_request_app_link_update_peer(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link);
-
-nxt_inline void
-nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link)
-{
- nxt_atomic_fetch_add(&req_app_link->use_count, 1);
-}
-
-nxt_inline void
-nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i)
-{
-#if (NXT_DEBUG)
- int c;
-
- c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
-
- nxt_assert((c + i) > 0);
-#else
- (void) nxt_atomic_fetch_add(&req_app_link->use_count, i);
-#endif
-}
-
-static void nxt_request_app_link_use(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link, int i);
+static void nxt_router_new_port_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
+static void nxt_router_conf_data_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
+static void nxt_router_remove_pid_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
+static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
@@ -128,7 +93,22 @@ static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
+
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
+static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
+static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
+ nxt_app_t *app);
+static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
+ nxt_str_t *name);
+static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
+ int i);
+
+static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
+ nxt_port_t *port);
+static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
+ nxt_port_t *port);
+static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
+ nxt_port_t *port, nxt_fd_t fd);
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
static void nxt_router_listen_socket_ready(nxt_task_t *task,
@@ -182,6 +162,8 @@ static void nxt_router_engine_post(nxt_event_engine_t *engine,
nxt_work_t *jobs);
static void nxt_router_thread_start(void *data);
+static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
@@ -194,6 +176,8 @@ static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
void *data);
+static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
nxt_socket_conf_t *skcf);
@@ -218,20 +202,29 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task,
static void nxt_router_app_port_ready(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
+static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
+ nxt_port_t *app_port);
static void nxt_router_app_port_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
+static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_apr_action_t action);
-static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
- nxt_request_app_link_t *req_app_link);
+static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
+ nxt_request_rpc_data_t *req_rpc_data);
+static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_router_app_prepare_request(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link);
+ nxt_request_rpc_data_t *req_rpc_data);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
- nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix);
+ nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
@@ -248,11 +241,15 @@ static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
static void nxt_router_app_joint_use(nxt_task_t *task,
nxt_app_joint_t *app_joint, int i);
-static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
+static void nxt_router_http_request_release_post(nxt_task_t *task,
nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+static void nxt_router_get_port_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
+static void nxt_router_get_mmap_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
extern const nxt_http_request_state_t nxt_http_websocket;
@@ -274,8 +271,10 @@ static const nxt_str_t *nxt_app_msg_prefix[] = {
static const nxt_port_handlers_t nxt_router_process_port_handlers = {
.quit = nxt_signal_quit_handler,
.new_port = nxt_router_new_port_handler,
+ .get_port = nxt_router_get_port_handler,
.change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler,
+ .get_mmap = nxt_router_get_mmap_handler,
.data = nxt_router_conf_data_handler,
.remove_pid = nxt_router_remove_pid_handler,
.access_log = nxt_router_access_log_reopen_handler,
@@ -297,6 +296,14 @@ const nxt_process_init_t nxt_router_process = {
};
+/* Queues of nxt_socket_conf_t */
+nxt_queue_t creating_sockets;
+nxt_queue_t pending_sockets;
+nxt_queue_t updating_sockets;
+nxt_queue_t keeping_sockets;
+nxt_queue_t deleting_sockets;
+
+
static nxt_int_t
nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
{
@@ -461,6 +468,8 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
nxt_port_t *router_port;
nxt_runtime_t *rt;
+ nxt_debug(task, "app '%V' start process", &app->name);
+
rt = task->thread->runtime;
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
@@ -485,99 +494,26 @@ nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
}
-nxt_inline void
-nxt_request_app_link_init(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data)
-{
- nxt_buf_t *body;
- nxt_event_engine_t *engine;
-
- engine = task->thread->engine;
-
- nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t));
-
- req_app_link->stream = req_rpc_data->stream;
- req_app_link->use_count = 1;
- req_app_link->req_rpc_data = req_rpc_data;
- req_rpc_data->req_app_link = req_app_link;
- req_app_link->reply_port = engine->port;
- req_app_link->request = req_rpc_data->request;
- req_app_link->apr_action = NXT_APR_GOT_RESPONSE;
-
- req_app_link->work.handler = NULL;
- req_app_link->work.task = &engine->task;
- req_app_link->work.obj = req_app_link;
- req_app_link->work.data = engine;
-
- body = req_rpc_data->request->body;
-
- if (body != NULL && nxt_buf_is_file(body)) {
- req_app_link->body_fd = body->file->fd;
-
- body->file->fd = -1;
-
- } else {
- req_app_link->body_fd = -1;
- }
-}
-
-
-nxt_inline nxt_request_app_link_t *
-nxt_request_app_link_alloc(nxt_task_t *task,
- nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data)
-{
- nxt_mp_t *mp;
- nxt_request_app_link_t *req_app_link;
-
- if (ra_src != NULL && ra_src->mem_pool != NULL) {
- return ra_src;
- }
-
- mp = req_rpc_data->request->mem_pool;
-
- req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t));
-
- if (nxt_slow_path(req_app_link == NULL)) {
-
- req_rpc_data->req_app_link = NULL;
-
- if (ra_src != NULL) {
- ra_src->req_rpc_data = NULL;
- }
-
- return NULL;
- }
-
- nxt_mp_retain(mp);
-
- nxt_request_app_link_init(task, req_app_link, req_rpc_data);
-
- if (ra_src != NULL) {
- req_app_link->body_fd = ra_src->body_fd;
- }
-
- req_app_link->mem_pool = mp;
-
- return req_app_link;
-}
-
-
nxt_inline nxt_bool_t
-nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
- uint32_t stream)
+nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_buf_t *b, *next;
- nxt_bool_t cancelled;
+ nxt_buf_t *b, *next;
+ nxt_bool_t cancelled;
+ nxt_msg_info_t *msg_info;
+
+ msg_info = &req_rpc_data->msg_info;
if (msg_info->buf == NULL) {
return 0;
}
- cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
- stream);
+ cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
+ msg_info->tracking_cookie,
+ req_rpc_data->stream);
if (cancelled) {
- nxt_debug(task, "stream #%uD: cancelled by router", stream);
+ nxt_debug(task, "stream #%uD: cancelled by router",
+ req_rpc_data->stream);
}
for (b = msg_info->buf; b != NULL; b = next) {
@@ -598,320 +534,204 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
}
-static void
-nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj,
- void *data)
+nxt_inline nxt_bool_t
+nxt_queue_chk_remove(nxt_queue_link_t *lnk)
{
- nxt_request_app_link_t *req_app_link;
+ if (lnk->next != NULL) {
+ nxt_queue_remove(lnk);
- req_app_link = obj;
+ lnk->next = NULL;
- nxt_request_app_link_update_peer(task, req_app_link);
+ return 1;
+ }
- nxt_request_app_link_use(task, req_app_link, -1);
+ return 0;
}
-static void
-nxt_request_app_link_update_peer(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link)
+nxt_inline void
+nxt_request_rpc_data_unlink(nxt_task_t *task,
+ nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_event_engine_t *engine;
- nxt_request_rpc_data_t *req_rpc_data;
-
- engine = req_app_link->work.data;
-
- if (task->thread->engine != engine) {
- nxt_request_app_link_inc_use(req_app_link);
-
- req_app_link->work.handler = nxt_request_app_link_update_peer_handler;
- req_app_link->work.task = &engine->task;
- req_app_link->work.next = NULL;
+ nxt_app_t *app;
+ nxt_bool_t unlinked;
+ nxt_http_request_t *r;
- nxt_debug(task, "req_app_link stream #%uD post update peer to %p",
- req_app_link->stream, engine);
+ nxt_router_msg_cancel(task, req_rpc_data);
- nxt_event_engine_post(engine, &req_app_link->work);
+ if (req_rpc_data->app_port != NULL) {
+ nxt_router_app_port_release(task, req_rpc_data->app_port,
+ req_rpc_data->apr_action);
- return;
+ req_rpc_data->app_port = NULL;
}
- nxt_debug(task, "req_app_link stream #%uD update peer",
- req_app_link->stream);
-
- req_rpc_data = req_app_link->req_rpc_data;
-
- if (req_rpc_data != NULL && req_app_link->app_port != NULL) {
- nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data,
- req_app_link->app_port->pid);
- }
-}
+ app = req_rpc_data->app;
+ r = req_rpc_data->request;
+ if (r != NULL) {
+ r->timer_data = NULL;
-static void
-nxt_request_app_link_release(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link)
-{
- nxt_mp_t *mp;
- nxt_http_request_t *r;
- nxt_request_rpc_data_t *req_rpc_data;
+ nxt_router_http_request_release_post(task, r);
- nxt_assert(task->thread->engine == req_app_link->work.data);
- nxt_assert(req_app_link->use_count == 0);
+ r->req_rpc_data = NULL;
+ req_rpc_data->request = NULL;
- nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream);
+ if (app != NULL) {
+ unlinked = 0;
- req_rpc_data = req_app_link->req_rpc_data;
+ nxt_thread_mutex_lock(&app->mutex);
- if (req_rpc_data != NULL) {
- if (nxt_slow_path(req_app_link->err_code != 0)) {
- nxt_http_request_error(task, req_rpc_data->request,
- req_app_link->err_code);
+ if (r->app_link.next != NULL) {
+ nxt_queue_remove(&r->app_link);
+ r->app_link.next = NULL;
- } else {
- req_rpc_data->app_port = req_app_link->app_port;
- req_rpc_data->apr_action = req_app_link->apr_action;
- req_rpc_data->msg_info = req_app_link->msg_info;
+ unlinked = 1;
+ }
- if (req_rpc_data->app->timeout != 0) {
- r = req_rpc_data->request;
+ nxt_thread_mutex_unlock(&app->mutex);
- r->timer.handler = nxt_router_app_timeout;
- r->timer_data = req_rpc_data;
- nxt_timer_add(task->thread->engine, &r->timer,
- req_rpc_data->app->timeout);
+ if (unlinked) {
+ nxt_mp_release(r->mem_pool);
}
-
- req_app_link->app_port = NULL;
- req_app_link->msg_info.buf = NULL;
}
-
- req_rpc_data->req_app_link = NULL;
- req_app_link->req_rpc_data = NULL;
}
- if (req_app_link->app_port != NULL) {
- nxt_router_app_port_release(task, req_app_link->app_port,
- req_app_link->apr_action);
+ if (app != NULL) {
+ nxt_router_app_use(task, app, -1);
- req_app_link->app_port = NULL;
+ req_rpc_data->app = NULL;
}
- if (req_app_link->body_fd != -1) {
- nxt_fd_close(req_app_link->body_fd);
+ if (req_rpc_data->msg_info.body_fd != -1) {
+ nxt_fd_close(req_rpc_data->msg_info.body_fd);
- req_app_link->body_fd = -1;
+ req_rpc_data->msg_info.body_fd = -1;
}
- nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream);
+ if (req_rpc_data->rpc_cancel) {
+ req_rpc_data->rpc_cancel = 0;
- mp = req_app_link->mem_pool;
-
- if (mp != NULL) {
- nxt_mp_free(mp, req_app_link);
- nxt_mp_release(mp);
+ nxt_port_rpc_cancel(task, task->thread->engine->port,
+ req_rpc_data->stream);
}
}
static void
-nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data)
+nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = obj;
-
- nxt_assert(req_app_link->work.data == data);
-
- nxt_atomic_fetch_add(&req_app_link->use_count, -1);
-
- nxt_request_app_link_release(task, req_app_link);
-}
-
+ nxt_int_t res;
+ nxt_app_t *app;
+ nxt_port_t *port, *main_app_port;
+ nxt_runtime_t *rt;
-static void
-nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link,
- int i)
-{
- int c;
- nxt_event_engine_t *engine;
+ nxt_port_new_port_handler(task, msg);
- c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
+ port = msg->u.new_port;
- if (i < 0 && c == -i) {
- engine = req_app_link->work.data;
+ if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
+ nxt_router_greet_controller(task, msg->u.new_port);
+ }
- if (task->thread->engine == engine) {
- nxt_request_app_link_release(task, req_app_link);
+ if (port == NULL || port->type != NXT_PROCESS_APP) {
+ if (msg->port_msg.stream == 0) {
return;
}
- nxt_request_app_link_inc_use(req_app_link);
-
- req_app_link->work.handler = nxt_request_app_link_release_handler;
- req_app_link->work.task = &engine->task;
- req_app_link->work.next = NULL;
+ msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
- nxt_debug(task, "req_app_link stream #%uD post release to %p",
- req_app_link->stream, engine);
+ } else {
+ if (msg->fd[1] != -1) {
+ res = nxt_router_port_queue_map(task, port, msg->fd[1]);
+ if (nxt_slow_path(res != NXT_OK)) {
+ return;
+ }
- nxt_event_engine_post(engine, &req_app_link->work);
+ nxt_fd_close(msg->fd[1]);
+ msg->fd[1] = -1;
+ }
}
-}
-
-
-nxt_inline void
-nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app,
- nxt_request_app_link_t *req_app_link, const char *str)
-{
- req_app_link->app_port = NULL;
- req_app_link->err_code = 500;
- req_app_link->err_str = str;
-
- nxt_alert(task, "app \"%V\" internal error: %s on #%uD",
- &app->name, str, req_app_link->stream);
-}
-
-nxt_inline void
-nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app,
- nxt_request_app_link_t *req_app_link)
-{
- nxt_queue_insert_tail(&req_app_link->app_port->pending_requests,
- &req_app_link->link_port_pending);
- nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending);
+ if (msg->port_msg.stream != 0) {
+ nxt_port_rpc_handler(task, msg);
+ return;
+ }
- nxt_request_app_link_inc_use(req_app_link);
+ /*
+ * Port with "id == 0" is application 'main' port and it always
+ * should come with non-zero stream.
+ */
+ nxt_assert(port->id != 0);
- req_app_link->res_time = nxt_thread_monotonic_time(task->thread)
- + app->res_timeout;
+ /* Find 'main' app port and get app reference. */
+ rt = task->thread->runtime;
- nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests",
- req_app_link->stream);
-}
+ /*
+ * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
+ * sent to main port (with id == 0) and processed in main thread.
+ */
+ main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
+ nxt_assert(main_app_port != NULL);
+ app = main_app_port->app;
+ nxt_assert(app != NULL);
-nxt_inline nxt_bool_t
-nxt_queue_chk_remove(nxt_queue_link_t *lnk)
-{
- if (lnk->next != NULL) {
- nxt_queue_remove(lnk);
+ nxt_thread_mutex_lock(&app->mutex);
- lnk->next = NULL;
+ /* TODO here should be find-and-add code because there can be
+ port waiters in port_hash */
+ nxt_port_hash_add(&app->port_hash, port);
+ app->port_hash_count++;
- return 1;
- }
+ nxt_thread_mutex_unlock(&app->mutex);
- return 0;
+ port->app = app;
+ port->main_app_port = main_app_port;
}
-nxt_inline void
-nxt_request_rpc_data_unlink(nxt_task_t *task,
- nxt_request_rpc_data_t *req_rpc_data)
+static void
+nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
- int ra_use_delta;
- nxt_request_app_link_t *req_app_link;
-
- if (req_rpc_data->app_port != NULL) {
- nxt_router_app_port_release(task, req_rpc_data->app_port,
- req_rpc_data->apr_action);
-
- req_rpc_data->app_port = NULL;
- }
-
- nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
-
- req_app_link = req_rpc_data->req_app_link;
- if (req_app_link != NULL) {
- req_rpc_data->req_app_link = NULL;
- req_app_link->req_rpc_data = NULL;
-
- ra_use_delta = 0;
-
- nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
-
- if (req_app_link->link_app_requests.next == NULL
- && req_app_link->link_port_pending.next == NULL
- && req_app_link->link_app_pending.next == NULL
- && req_app_link->link_port_websockets.next == NULL)
- {
- req_app_link = NULL;
-
- } else {
- ra_use_delta -=
- nxt_queue_chk_remove(&req_app_link->link_app_requests)
- + nxt_queue_chk_remove(&req_app_link->link_port_pending)
- + nxt_queue_chk_remove(&req_app_link->link_port_websockets);
-
- nxt_queue_chk_remove(&req_app_link->link_app_pending);
- }
-
- nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
-
- if (req_app_link != NULL) {
- nxt_request_app_link_use(task, req_app_link, ra_use_delta);
- }
- }
-
- if (req_rpc_data->app != NULL) {
- nxt_router_app_use(task, req_rpc_data->app, -1);
+ void *p;
+ size_t size;
+ nxt_int_t ret;
+ nxt_router_temp_conf_t *tmcf;
- req_rpc_data->app = NULL;
+ tmcf = nxt_router_temp_conf(task);
+ if (nxt_slow_path(tmcf == NULL)) {
+ return;
}
- if (req_rpc_data->request != NULL) {
- req_rpc_data->request->timer_data = NULL;
-
- nxt_router_http_request_done(task, req_rpc_data->request);
-
- req_rpc_data->request->req_rpc_data = NULL;
- req_rpc_data->request = NULL;
+ if (nxt_slow_path(msg->fd[0] == -1)) {
+ nxt_alert(task, "conf_data_handler: invalid file shm fd");
+ return;
}
-}
+ if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
+ nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
+ (int) nxt_buf_mem_used_size(&msg->buf->mem));
-void
-nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
-{
- nxt_port_new_port_handler(task, msg);
-
- if (msg->u.new_port != NULL
- && msg->u.new_port->type == NXT_PROCESS_CONTROLLER)
- {
- nxt_router_greet_controller(task, msg->u.new_port);
- }
+ nxt_fd_close(msg->fd[0]);
+ msg->fd[0] = -1;
- if (msg->port_msg.stream == 0) {
return;
}
- if (msg->u.new_port == NULL
- || msg->u.new_port->type != NXT_PROCESS_APP)
- {
- msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
- }
-
- nxt_port_rpc_handler(task, msg);
-}
+ nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
+ p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
-void
-nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
-{
- nxt_int_t ret;
- nxt_buf_t *b;
- nxt_router_temp_conf_t *tmcf;
+ nxt_fd_close(msg->fd[0]);
+ msg->fd[0] = -1;
- tmcf = nxt_router_temp_conf(task);
- if (nxt_slow_path(tmcf == NULL)) {
+ if (nxt_slow_path(p == MAP_FAILED)) {
return;
}
- nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s",
- nxt_buf_used_size(msg->buf),
- (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
+ nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
tmcf->router_conf->router = nxt_router;
tmcf->stream = msg->port_msg.stream;
@@ -922,20 +742,12 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
if (nxt_slow_path(tmcf->port == NULL)) {
nxt_alert(task, "reply port not found");
- return;
+ goto fail;
}
nxt_port_use(task, tmcf->port, 1);
- b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool,
- msg->buf, msg->size);
- if (nxt_slow_path(b == NULL)) {
- nxt_router_conf_error(task, tmcf);
-
- return;
- }
-
- ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
+ ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
if (nxt_fast_path(ret == NXT_OK)) {
nxt_router_conf_apply(task, tmcf, NULL);
@@ -943,6 +755,10 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
} else {
nxt_router_conf_error(task, tmcf);
}
+
+fail:
+
+ nxt_mem_munmap(p, size);
}
@@ -961,7 +777,7 @@ nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
}
-void
+static void
nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_event_engine_t *engine;
@@ -1027,11 +843,11 @@ nxt_router_temp_conf(nxt_task_t *task)
goto temp_fail;
}
- nxt_queue_init(&tmcf->deleting);
- nxt_queue_init(&tmcf->keeping);
- nxt_queue_init(&tmcf->updating);
- nxt_queue_init(&tmcf->pending);
- nxt_queue_init(&tmcf->creating);
+ nxt_queue_init(&creating_sockets);
+ nxt_queue_init(&pending_sockets);
+ nxt_queue_init(&updating_sockets);
+ nxt_queue_init(&keeping_sockets);
+ nxt_queue_init(&deleting_sockets);
#if (NXT_TLS)
nxt_queue_init(&tmcf->tls);
@@ -1065,8 +881,10 @@ nxt_router_app_can_start(nxt_app_t *app)
nxt_inline nxt_bool_t
nxt_router_app_need_start(nxt_app_t *app)
{
- return app->idle_processes + app->pending_processes
- < app->spare_processes;
+ return (app->active_requests
+ > app->port_hash_count + app->pending_processes)
+ || (app->spare_processes
+ > app->idle_processes + app->pending_processes);
}
@@ -1088,11 +906,11 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
tmcf = obj;
- qlk = nxt_queue_first(&tmcf->pending);
+ qlk = nxt_queue_first(&pending_sockets);
- if (qlk != nxt_queue_tail(&tmcf->pending)) {
+ if (qlk != nxt_queue_tail(&pending_sockets)) {
nxt_queue_remove(qlk);
- nxt_queue_insert_tail(&tmcf->creating, qlk);
+ nxt_queue_insert_tail(&creating_sockets, qlk);
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
@@ -1148,10 +966,12 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
nxt_router_apps_sort(task, router, tmcf);
+ nxt_router_apps_hash_use(task, rtcf, 1);
+
nxt_router_engines_post(router, tmcf);
- nxt_queue_add(&router->sockets, &tmcf->updating);
- nxt_queue_add(&router->sockets, &tmcf->creating);
+ nxt_queue_add(&router->sockets, &updating_sockets);
+ nxt_queue_add(&router->sockets, &creating_sockets);
router->access_log = rtcf->access_log;
@@ -1181,11 +1001,39 @@ nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
{
- nxt_debug(task, "temp conf count:%D", tmcf->count);
+ uint32_t count;
+ nxt_router_conf_t *rtcf;
+ nxt_thread_spinlock_t *lock;
+
+ nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
- if (--tmcf->count == 0) {
- nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
+ if (--tmcf->count > 0) {
+ return;
}
+
+ nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
+
+ rtcf = tmcf->router_conf;
+
+ lock = &rtcf->router->lock;
+
+ nxt_thread_spin_lock(lock);
+
+ count = rtcf->count;
+
+ nxt_thread_spin_unlock(lock);
+
+ nxt_debug(task, "rtcf %p: %D", rtcf, count);
+
+ if (count == 0) {
+ nxt_router_apps_hash_use(task, rtcf, -1);
+
+ nxt_router_access_log_release(task, lock, rtcf->access_log);
+
+ nxt_mp_destroy(rtcf->mem_pool);
+ }
+
+ nxt_mp_destroy(tmcf->mem_pool);
}
@@ -1202,8 +1050,8 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
nxt_alert(task, "failed to apply new conf");
- for (qlk = nxt_queue_first(&tmcf->creating);
- qlk != nxt_queue_tail(&tmcf->creating);
+ for (qlk = nxt_queue_first(&creating_sockets);
+ qlk != nxt_queue_tail(&creating_sockets);
qlk = nxt_queue_next(qlk))
{
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
@@ -1217,22 +1065,12 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
}
nxt_queue_init(&new_socket_confs);
- nxt_queue_add(&new_socket_confs, &tmcf->updating);
- nxt_queue_add(&new_socket_confs, &tmcf->pending);
- nxt_queue_add(&new_socket_confs, &tmcf->creating);
+ nxt_queue_add(&new_socket_confs, &updating_sockets);
+ nxt_queue_add(&new_socket_confs, &pending_sockets);
+ nxt_queue_add(&new_socket_confs, &creating_sockets);
rtcf = tmcf->router_conf;
- nxt_http_routes_cleanup(task, rtcf->routes);
-
- nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {
-
- if (skcf->action != NULL) {
- nxt_http_action_cleanup(task, skcf->action);
- }
-
- } nxt_queue_loop;
-
nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
nxt_router_app_unlink(task, app);
@@ -1241,8 +1079,8 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
router = rtcf->router;
- nxt_queue_add(&router->sockets, &tmcf->keeping);
- nxt_queue_add(&router->sockets, &tmcf->deleting);
+ nxt_queue_add(&router->sockets, &keeping_sockets);
+ nxt_queue_add(&router->sockets, &deleting_sockets);
nxt_queue_add(&router->apps, &tmcf->previous);
@@ -1253,6 +1091,8 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
nxt_mp_destroy(rtcf->mem_pool);
nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
+
+ nxt_mp_destroy(tmcf->mem_pool);
}
@@ -1465,6 +1305,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_app_t *app, *prev;
nxt_str_t *t, *s, *targets;
nxt_uint_t n, i;
+ nxt_port_t *port;
nxt_router_t *router;
nxt_app_joint_t *app_joint;
nxt_conf_value_t *conf, *http, *value, *websocket;
@@ -1569,6 +1410,12 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_queue_remove(&prev->link);
nxt_queue_insert_tail(&tmcf->previous, &prev->link);
+
+ ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto fail;
+ }
+
continue;
}
@@ -1679,8 +1526,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_queue_init(&app->ports);
nxt_queue_init(&app->spare_ports);
nxt_queue_init(&app->idle_ports);
- nxt_queue_init(&app->requests);
- nxt_queue_init(&app->pending);
+ nxt_queue_init(&app->ack_waiting_req);
app->name.length = name.length;
nxt_memcpy(app->name.start, name.start, name.length);
@@ -1693,7 +1539,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->timeout = apcf.timeout;
app->res_timeout = apcf.res_timeout * 1000000;
app->idle_timeout = apcf.idle_timeout;
- app->max_pending_responses = 2;
app->max_requests = apcf.requests;
app->targets = targets;
@@ -1708,6 +1553,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_queue_insert_tail(&tmcf->apps, &app->link);
+ ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto app_fail;
+ }
+
nxt_router_app_use(task, app, 1);
app->joint = app_joint;
@@ -1724,6 +1574,31 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app_joint->free_app_work.handler = nxt_router_free_app;
app_joint->free_app_work.task = &engine->task;
app_joint->free_app_work.obj = app_joint;
+
+ port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
+ NXT_PROCESS_APP);
+ if (nxt_slow_path(port == NULL)) {
+ return NXT_ERROR;
+ }
+
+ ret = nxt_port_socket_init(task, port, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, port, -1);
+ return NXT_ERROR;
+ }
+
+ ret = nxt_router_app_queue_init(task, port);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, port, -1);
+ return NXT_ERROR;
+ }
+
+ nxt_port_write_enable(task, port);
+ port->app = app;
+
+ app->shared_port = port;
+
+ nxt_thread_mutex_create(&app->outgoing.mutex);
}
}
@@ -1857,7 +1732,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
/* COMPATIBILITY: listener application. */
} else if (lscf.application.length > 0) {
- skcf->action = nxt_http_pass_application(task, tmcf,
+ skcf->action = nxt_http_pass_application(task,
+ tmcf->router_conf,
&lscf.application);
}
}
@@ -1902,7 +1778,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
tmcf->router_conf->access_log = access_log;
}
- nxt_queue_add(&tmcf->deleting, &router->sockets);
+ nxt_queue_add(&deleting_sockets, &router->sockets);
nxt_queue_init(&router->sockets);
return NXT_OK;
@@ -2023,20 +1899,182 @@ nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
}
-void
-nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name,
+static nxt_int_t
+nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
+{
+ void *mem;
+ nxt_int_t fd;
+
+ fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
+ if (nxt_slow_path(fd == -1)) {
+ return NXT_ERROR;
+ }
+
+ mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_fd_close(fd);
+
+ return NXT_ERROR;
+ }
+
+ nxt_app_queue_init(mem);
+
+ port->queue_fd = fd;
+ port->queue = mem;
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
+{
+ void *mem;
+ nxt_int_t fd;
+
+ fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
+ if (nxt_slow_path(fd == -1)) {
+ return NXT_ERROR;
+ }
+
+ mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_fd_close(fd);
+
+ return NXT_ERROR;
+ }
+
+ nxt_port_queue_init(mem);
+
+ port->queue_fd = fd;
+ port->queue = mem;
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
+{
+ void *mem;
+
+ nxt_assert(fd != -1);
+
+ mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+
+ return NXT_ERROR;
+ }
+
+ port->queue = mem;
+
+ return NXT_OK;
+}
+
+
+static const nxt_lvlhsh_proto_t nxt_router_apps_hash_proto nxt_aligned(64) = {
+ NXT_LVLHSH_DEFAULT,
+ nxt_router_apps_hash_test,
+ nxt_mp_lvlhsh_alloc,
+ nxt_mp_lvlhsh_free,
+};
+
+
+static nxt_int_t
+nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ nxt_app_t *app;
+
+ app = data;
+
+ return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
+}
+
+
+static nxt_int_t
+nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
+{
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
+ lhq.replace = 0;
+ lhq.key = app->name;
+ lhq.value = app;
+ lhq.proto = &nxt_router_apps_hash_proto;
+ lhq.pool = rtcf->mem_pool;
+
+ switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
+
+ case NXT_OK:
+ return NXT_OK;
+
+ case NXT_DECLINED:
+ nxt_thread_log_alert("router app hash adding failed: "
+ "\"%V\" is already in hash", &lhq.key);
+ /* Fall through. */
+ default:
+ return NXT_ERROR;
+ }
+}
+
+
+static nxt_app_t *
+nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
+{
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_djb_hash(name->start, name->length);
+ lhq.key = *name;
+ lhq.proto = &nxt_router_apps_hash_proto;
+
+ if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
+ return NULL;
+ }
+
+ return lhq.value;
+}
+
+
+static void
+nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
+{
+ nxt_app_t *app;
+ nxt_lvlhsh_each_t lhe;
+
+ nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
+
+ for ( ;; ) {
+ app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
+
+ if (app == NULL) {
+ break;
+ }
+
+ nxt_router_app_use(task, app, i);
+ }
+}
+
+
+
+nxt_int_t
+nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name,
nxt_http_action_t *action)
{
nxt_app_t *app;
- app = nxt_router_app_find(&tmcf->apps, name);
+ app = nxt_router_apps_hash_get(rtcf, name);
if (app == NULL) {
- app = nxt_router_app_find(&tmcf->previous, name);
+ return NXT_DECLINED;
}
action->u.application = app;
action->handler = nxt_http_application_handler;
+
+ return NXT_OK;
}
@@ -2141,15 +2179,15 @@ nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
nskcf->listen = skcf->listen;
nxt_queue_remove(qlk);
- nxt_queue_insert_tail(&tmcf->keeping, qlk);
+ nxt_queue_insert_tail(&keeping_sockets, qlk);
- nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
+ nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
return NXT_OK;
}
}
- nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
+ nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
return NXT_DECLINED;
}
@@ -2182,6 +2220,8 @@ nxt_router_listen_socket_rpc_create(nxt_task_t *task,
goto fail;
}
+ b->completion_handler = nxt_router_dummy_buf_completion;
+
b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
rt = task->thread->runtime;
@@ -2222,7 +2262,7 @@ nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
rpc = data;
- s = msg->fd;
+ s = msg->fd[0];
ret = nxt_socket_nonblocking(task, s);
if (nxt_slow_path(ret != NXT_OK)) {
@@ -2360,7 +2400,7 @@ nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
goto fail;
}
- tlscf->chain_file = msg->fd;
+ tlscf->chain_file = msg->fd[0];
ret = task->thread->runtime->tls->server_init(task, tlscf);
if (nxt_slow_path(ret != NXT_OK)) {
@@ -2410,6 +2450,8 @@ nxt_router_app_rpc_create(nxt_task_t *task,
goto fail;
}
+ b->completion_handler = nxt_router_dummy_buf_completion;
+
nxt_buf_cpystr(b, &app->name);
*b->mem.free++ = '\0';
nxt_buf_cpystr(b, &app->conf);
@@ -2457,7 +2499,13 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
app = rpc->app;
port = msg->u.new_port;
+
+ nxt_assert(port != NULL);
+ nxt_assert(port->type == NXT_PROCESS_APP);
+ nxt_assert(port->id == 0);
+
port->app = app;
+ port->main_app_port = port;
app->pending_processes--;
app->processes++;
@@ -2468,10 +2516,18 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_queue_insert_tail(&app->ports, &port->app_link);
nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
+ nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
+ &app->name, port->pid, port->id);
+
+ nxt_port_hash_add(&app->port_hash, port);
+ app->port_hash_count++;
+
port->idle_start = 0;
nxt_port_inc_use(port);
+ nxt_router_app_shared_port_send(task, port);
+
nxt_work_queue_add(&engine->fast_work_queue,
nxt_router_conf_apply, task, rpc->temp_conf, NULL);
}
@@ -2577,13 +2633,13 @@ nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
{
nxt_int_t ret;
- ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
+ ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
- ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
+ ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
@@ -2599,19 +2655,19 @@ nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
{
nxt_int_t ret;
- ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
+ ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
- ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
+ ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
nxt_router_listen_socket_update);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
- ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
+ ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
@@ -2631,12 +2687,12 @@ nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
return ret;
}
- ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
+ ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
- return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
+ return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
}
@@ -2874,10 +2930,11 @@ nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
static nxt_port_handlers_t nxt_router_app_port_handlers = {
- .rpc_error = nxt_port_rpc_handler,
- .mmap = nxt_port_mmap_handler,
- .data = nxt_port_rpc_handler,
- .oosm = nxt_router_oosm_handler,
+ .rpc_error = nxt_port_rpc_handler,
+ .mmap = nxt_port_mmap_handler,
+ .data = nxt_port_rpc_handler,
+ .oosm = nxt_router_oosm_handler,
+ .req_headers_ack = nxt_port_rpc_handler,
};
@@ -2887,6 +2944,7 @@ nxt_router_thread_start(void *data)
nxt_int_t ret;
nxt_port_t *port;
nxt_task_t *task;
+ nxt_work_t *work;
nxt_thread_t *thread;
nxt_thread_link_t *link;
nxt_event_engine_t *engine;
@@ -2927,15 +2985,53 @@ nxt_router_thread_start(void *data)
return;
}
+ ret = nxt_router_port_queue_init(task, port);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, port, -1);
+ return;
+ }
+
engine->port = port;
nxt_port_enable(task, port, &nxt_router_app_port_handlers);
+ work = nxt_zalloc(sizeof(nxt_work_t));
+ if (nxt_slow_path(work == NULL)) {
+ return;
+ }
+
+ work->handler = nxt_router_rt_add_port;
+ work->task = link->work.task;
+ work->obj = work;
+ work->data = port;
+
+ nxt_event_engine_post(link->work.task->thread->engine, work);
+
nxt_event_engine_start(engine);
}
static void
+nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_int_t res;
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+ port = data;
+
+ nxt_free(obj);
+
+ res = nxt_port_hash_add(&rt->ports, port);
+
+ if (nxt_fast_path(res == NXT_OK)) {
+ nxt_port_use(task, port, 1);
+ }
+}
+
+
+static void
nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
{
nxt_joint_job_t *job;
@@ -3146,7 +3242,15 @@ nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
nxt_debug(task, "listen event count: %D", lev->count);
+ engine = task->thread->engine;
+
if (--lev->count == 0) {
+ if (lev->next != NULL) {
+ nxt_sockaddr_cache_free(engine, lev->next);
+
+ nxt_conn_free(task, lev->next);
+ }
+
nxt_free(lev);
}
@@ -3154,8 +3258,6 @@ nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
nxt_router_conf_release(task, joint);
}
- engine = task->thread->engine;
-
if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
nxt_thread_exit(task->thread);
}
@@ -3205,25 +3307,18 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
nxt_thread_spin_unlock(lock);
- if (skcf != NULL) {
- if (skcf->action != NULL) {
- nxt_http_action_cleanup(task, skcf->action);
- }
-
#if (NXT_TLS)
- if (skcf->tls != NULL) {
- task->thread->runtime->tls->server_free(task, skcf->tls);
- }
-#endif
+ if (skcf != NULL && skcf->tls != NULL) {
+ task->thread->runtime->tls->server_free(task, skcf->tls);
}
+#endif
/* TODO remove engine->port */
- /* TODO excude from connected ports */
if (rtcf != NULL) {
nxt_debug(task, "old router conf is destroyed");
- nxt_http_routes_cleanup(task, rtcf->routes);
+ nxt_router_apps_hash_use(task, rtcf, -1);
nxt_router_access_log_release(task, lock, rtcf->access_log);
@@ -3380,6 +3475,8 @@ nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
goto fail;
}
+ b->completion_handler = nxt_router_dummy_buf_completion;
+
nxt_buf_cpystr(b, &access_log->path);
*b->mem.free++ = '\0';
@@ -3422,7 +3519,7 @@ nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
access_log = tmcf->router_conf->access_log;
- access_log->fd = msg->fd;
+ access_log->fd = msg->fd[0];
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_conf_apply, task, tmcf, NULL);
@@ -3474,7 +3571,7 @@ typedef struct {
} nxt_router_access_log_reopen_t;
-void
+static void
nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_mp_t *mp;
@@ -3571,13 +3668,13 @@ nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
if (access_log == nxt_router->access_log) {
- if (nxt_slow_path(dup2(msg->fd, access_log->fd) == -1)) {
+ if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
nxt_alert(task, "dup2(%FD, %FD) failed %E",
- msg->fd, access_log->fd, nxt_errno);
+ msg->fd[0], access_log->fd, nxt_errno);
}
}
- nxt_fd_close(msg->fd);
+ nxt_fd_close(msg->fd[0]);
nxt_mp_release(reopen->mem_pool);
}
@@ -3633,22 +3730,17 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_int_t ret;
+ nxt_app_t *app;
nxt_buf_t *b, *next;
nxt_port_t *app_port;
nxt_unit_field_t *f;
nxt_http_field_t *field;
nxt_http_request_t *r;
nxt_unit_response_t *resp;
- nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
- b = msg->buf;
req_rpc_data = data;
- if (msg->size == 0) {
- b = NULL;
- }
-
r = req_rpc_data->request;
if (nxt_slow_path(r == NULL)) {
return;
@@ -3659,19 +3751,32 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
return;
}
+ app = req_rpc_data->app;
+ nxt_assert(app != NULL);
+
+ if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
+ nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
+
+ return;
+ }
+
+ b = (msg->size == 0) ? NULL : msg->buf;
+
if (msg->port_msg.last != 0) {
nxt_debug(task, "router data create last buf");
nxt_buf_chain_add(&b, nxt_http_buf_last(r));
+ req_rpc_data->rpc_cancel = 0;
+ req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
+
nxt_request_rpc_data_unlink(task, req_rpc_data);
} else {
- if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) {
+ if (app->timeout != 0) {
r->timer.handler = nxt_router_app_timeout;
r->timer_data = req_rpc_data;
- nxt_timer_add(task->thread->engine, &r->timer,
- req_rpc_data->app->timeout);
+ nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
}
}
@@ -3767,39 +3872,21 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
if (r->websocket_handshake
&& r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
{
- req_app_link = nxt_request_app_link_alloc(task,
- req_rpc_data->req_app_link,
- req_rpc_data);
- if (nxt_slow_path(req_app_link == NULL)) {
- goto fail;
- }
-
- app_port = req_app_link->app_port;
-
- if (app_port == NULL && req_rpc_data->app_port != NULL) {
- req_app_link->app_port = req_rpc_data->app_port;
- app_port = req_app_link->app_port;
- req_app_link->apr_action = req_rpc_data->apr_action;
-
- req_rpc_data->app_port = NULL;
- }
-
+ app_port = req_rpc_data->app_port;
if (nxt_slow_path(app_port == NULL)) {
goto fail;
}
- nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
+ nxt_thread_mutex_lock(&app->mutex);
- nxt_queue_insert_tail(&app_port->active_websockets,
- &req_app_link->link_port_websockets);
+ app_port->main_app_port->active_websockets++;
- nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
+ nxt_thread_mutex_unlock(&app->mutex);
nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
- req_app_link->apr_action = NXT_APR_CLOSE;
+ req_rpc_data->apr_action = NXT_APR_CLOSE;
- nxt_debug(task, "req_app_link stream #%uD upgrade",
- req_app_link->stream);
+ nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
r->state = &nxt_http_websocket;
@@ -3818,6 +3905,133 @@ fail:
}
+static void
+nxt_router_req_headers_ack_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
+{
+ int res;
+ nxt_app_t *app;
+ nxt_bool_t start_process, unlinked;
+ nxt_port_t *app_port, *main_app_port, *idle_port;
+ nxt_queue_link_t *idle_lnk;
+ nxt_http_request_t *r;
+
+ nxt_debug(task, "stream #%uD: got ack from %PI:%d",
+ req_rpc_data->stream,
+ msg->port_msg.pid, msg->port_msg.reply_port);
+
+ nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
+ msg->port_msg.pid);
+
+ app = req_rpc_data->app;
+ r = req_rpc_data->request;
+
+ start_process = 0;
+ unlinked = 0;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ if (r->app_link.next != NULL) {
+ nxt_queue_remove(&r->app_link);
+ r->app_link.next = NULL;
+
+ unlinked = 1;
+ }
+
+ app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(app_port == NULL)) {
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+
+ if (unlinked) {
+ nxt_mp_release(r->mem_pool);
+ }
+
+ return;
+ }
+
+ main_app_port = app_port->main_app_port;
+
+ if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
+ app->idle_processes--;
+
+ nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
+ &app->name, main_app_port->pid, main_app_port->id,
+ (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
+
+ /* Check port was in 'spare_ports' using idle_start field. */
+ if (main_app_port->idle_start == 0
+ && app->idle_processes >= app->spare_processes)
+ {
+ /*
+ * If there is a vacant space in spare ports,
+ * move the last idle to spare_ports.
+ */
+ nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
+
+ idle_lnk = nxt_queue_last(&app->idle_ports);
+ idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
+ nxt_queue_remove(idle_lnk);
+
+ nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
+
+ idle_port->idle_start = 0;
+
+ nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
+ "to spare_ports",
+ &app->name, idle_port->pid, idle_port->id);
+ }
+
+ if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
+ app->pending_processes++;
+ start_process = 1;
+ }
+ }
+
+ main_app_port->active_requests++;
+
+ nxt_port_inc_use(app_port);
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ if (unlinked) {
+ nxt_mp_release(r->mem_pool);
+ }
+
+ if (start_process) {
+ nxt_router_start_app_process(task, app);
+ }
+
+ nxt_port_use(task, req_rpc_data->app_port, -1);
+
+ req_rpc_data->app_port = app_port;
+
+ if (req_rpc_data->msg_info.body_fd != -1) {
+ nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
+ req_rpc_data->msg_info.body_fd);
+
+ lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
+
+ res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
+ req_rpc_data->msg_info.body_fd,
+ req_rpc_data->stream,
+ task->thread->engine->port->id, NULL);
+
+ if (nxt_slow_path(res != NXT_OK)) {
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ if (app->timeout != 0) {
+ r->timer.handler = nxt_router_app_timeout;
+ r->timer_data = req_rpc_data;
+ nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
+ }
+}
+
+
static const nxt_http_request_state_t nxt_http_request_send_state
nxt_aligned(64) =
{
@@ -3846,42 +4060,14 @@ static void
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_int_t res;
- nxt_port_t *port;
- nxt_bool_t cancelled;
- nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
req_rpc_data = data;
- req_app_link = req_rpc_data->req_app_link;
-
- if (req_app_link != NULL) {
- cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info,
- req_app_link->stream);
- if (cancelled) {
- res = nxt_router_app_port(task, req_rpc_data->app, req_app_link);
+ req_rpc_data->rpc_cancel = 0;
- if (res == NXT_OK) {
- port = req_app_link->app_port;
-
- if (nxt_slow_path(port == NULL)) {
- nxt_log(task, NXT_LOG_ERR,
- "port is NULL in cancelled req_app_link");
- return;
- }
-
- nxt_port_rpc_ex_set_peer(task, task->thread->engine->port,
- req_rpc_data, port->pid);
-
- nxt_router_app_prepare_request(task, req_app_link);
- }
-
- msg->port_msg.last = 0;
-
- return;
- }
- }
+ /* TODO cancel message and return if cancelled. */
+ // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
if (req_rpc_data->request != NULL) {
nxt_http_request_error(task, req_rpc_data->request,
@@ -3905,6 +4091,8 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_assert(app_joint != NULL);
nxt_assert(port != NULL);
+ nxt_assert(port->type == NXT_PROCESS_APP);
+ nxt_assert(port->id == 0);
app = app_joint->app;
@@ -3919,6 +4107,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
port->app = app;
+ port->main_app_port = port;
nxt_thread_mutex_lock(&app->mutex);
@@ -3926,24 +4115,62 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
app->pending_processes--;
app->processes++;
+ nxt_port_hash_add(&app->port_hash, port);
+ app->port_hash_count++;
nxt_thread_mutex_unlock(&app->mutex);
nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
&app->name, port->pid, app->processes, app->pending_processes);
+ nxt_router_app_shared_port_send(task, port);
+
nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
}
+static nxt_int_t
+nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
+{
+ nxt_buf_t *b;
+ nxt_port_t *port;
+ nxt_port_msg_new_port_t *msg;
+
+ b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
+ sizeof(nxt_port_data_t));
+ if (nxt_slow_path(b == NULL)) {
+ return NXT_ERROR;
+ }
+
+ port = app_port->app->shared_port;
+
+ nxt_debug(task, "send port %FD to process %PI",
+ port->pair[0], app_port->pid);
+
+ b->mem.free += sizeof(nxt_port_msg_new_port_t);
+ msg = (nxt_port_msg_new_port_t *) b->mem.pos;
+
+ msg->id = port->id;
+ msg->pid = port->pid;
+ msg->max_size = port->max_size;
+ msg->max_share = port->max_share;
+ msg->type = port->type;
+
+ return nxt_port_socket_write2(task, app_port,
+ NXT_PORT_MSG_NEW_PORT,
+ port->pair[0], port->queue_fd,
+ 0, 0, b);
+}
+
+
static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_app_t *app;
- nxt_app_joint_t *app_joint;
- nxt_queue_link_t *lnk;
- nxt_request_app_link_t *req_app_link;
+ nxt_app_t *app;
+ nxt_app_joint_t *app_joint;
+ nxt_queue_link_t *link;
+ nxt_http_request_t *r;
app_joint = data;
@@ -3961,117 +4188,49 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_debug(task, "app '%V' %p start error", &app->name, app);
+ link = NULL;
+
nxt_thread_mutex_lock(&app->mutex);
nxt_assert(app->pending_processes != 0);
app->pending_processes--;
- if (!nxt_queue_is_empty(&app->requests)) {
- lnk = nxt_queue_last(&app->requests);
- nxt_queue_remove(lnk);
- lnk->next = NULL;
+ if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
+ link = nxt_queue_first(&app->ack_waiting_req);
- req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_app_requests);
-
- } else {
- req_app_link = NULL;
+ nxt_queue_remove(link);
+ link->next = NULL;
}
nxt_thread_mutex_unlock(&app->mutex);
- if (req_app_link != NULL) {
- nxt_debug(task, "app '%V' %p abort next stream #%uD",
- &app->name, app, req_app_link->stream);
-
- nxt_request_app_link_error(task, app, req_app_link,
- "Failed to start application process");
- nxt_request_app_link_use(task, req_app_link, -1);
- }
-}
-
-nxt_inline nxt_port_t *
-nxt_router_app_get_port_for_quit(nxt_app_t *app);
-
-void
-nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
-{
- int c;
-
- c = nxt_atomic_fetch_add(&app->use_count, i);
-
- if (i < 0 && c == -i) {
-
- if (task->thread->engine != app->engine) {
- nxt_event_engine_post(app->engine, &app->joint->free_app_work);
-
- } else {
- nxt_router_free_app(task, app->joint, NULL);
- }
- }
-}
-
-
-nxt_inline nxt_bool_t
-nxt_router_app_first_port_busy(nxt_app_t *app)
-{
- nxt_port_t *port;
- nxt_queue_link_t *lnk;
-
- lnk = nxt_queue_first(&app->ports);
- port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
-
- return port->app_pending_responses > 0;
-}
-
-
-nxt_inline nxt_port_t *
-nxt_router_pop_first_port(nxt_app_t *app)
-{
- nxt_port_t *port;
- nxt_queue_link_t *lnk;
-
- lnk = nxt_queue_first(&app->ports);
- nxt_queue_remove(lnk);
+ while (link != NULL) {
+ r = nxt_container_of(link, nxt_http_request_t, app_link);
- port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
+ nxt_event_engine_post(r->engine, &r->err_work);
- port->app_pending_responses++;
+ link = NULL;
- if (nxt_queue_chk_remove(&port->idle_link)) {
- app->idle_processes--;
-
- if (port->idle_start == 0) {
- nxt_assert(app->idle_processes < app->spare_processes);
+ nxt_thread_mutex_lock(&app->mutex);
- } else {
- nxt_assert(app->idle_processes >= app->spare_processes);
+ if (app->processes == 0 && app->pending_processes == 0
+ && !nxt_queue_is_empty(&app->ack_waiting_req))
+ {
+ link = nxt_queue_first(&app->ack_waiting_req);
- port->idle_start = 0;
+ nxt_queue_remove(link);
+ link->next = NULL;
}
- }
- if ((app->max_pending_responses == 0
- || port->app_pending_responses < app->max_pending_responses)
- && (app->max_requests == 0
- || port->app_responses + port->app_pending_responses
- < app->max_requests))
- {
- nxt_queue_insert_tail(&app->ports, lnk);
-
- nxt_port_inc_use(port);
-
- } else {
- lnk->next = NULL;
+ nxt_thread_mutex_unlock(&app->mutex);
}
-
- return port;
}
+
nxt_inline nxt_port_t *
-nxt_router_app_get_port_for_quit(nxt_app_t *app)
+nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
{
nxt_port_t *port;
@@ -4081,19 +4240,20 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
- if (port->app_pending_responses > 0) {
- port = NULL;
-
- continue;
- }
-
/* Caller is responsible to decrease port use count. */
nxt_queue_chk_remove(&port->app_link);
if (nxt_queue_chk_remove(&port->idle_link)) {
app->idle_processes--;
+
+ nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
+ &app->name, port->pid, port->id,
+ (port->idle_start ? "idle_ports" : "spare_ports"));
}
+ nxt_port_hash_remove(&app->port_hash, port);
+ app->port_hash_count--;
+
port->app = NULL;
app->processes--;
@@ -4108,41 +4268,32 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
static void
-nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
+nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
{
- nxt_debug(task, "app '%V' %p unlink", &app->name, app);
+ int c;
- nxt_queue_remove(&app->link);
+ c = nxt_atomic_fetch_add(&app->use_count, i);
- nxt_router_app_use(task, app, -1);
+ if (i < 0 && c == -i) {
+
+ if (task->thread->engine != app->engine) {
+ nxt_event_engine_post(app->engine, &app->joint->free_app_work);
+
+ } else {
+ nxt_router_free_app(task, app->joint, NULL);
+ }
+ }
}
static void
-nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
+nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
{
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = data;
-
-#if (NXT_DEBUG)
- {
- nxt_app_t *app;
-
- app = obj;
-
- nxt_assert(app != NULL);
- nxt_assert(req_app_link != NULL);
- nxt_assert(req_app_link->app_port != NULL);
-
- nxt_debug(task, "app '%V' %p process next stream #%uD",
- &app->name, app, req_app_link->stream);
- }
-#endif
+ nxt_debug(task, "app '%V' %p unlink", &app->name, app);
- nxt_router_app_prepare_request(task, req_app_link);
+ nxt_queue_remove(&app->link);
- nxt_request_app_link_use(task, req_app_link, -1);
+ nxt_router_app_use(task, app, -1);
}
@@ -4150,40 +4301,33 @@ static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_apr_action_t action)
{
- int inc_use;
- uint32_t dec_pending, got_response;
- nxt_app_t *app;
- nxt_bool_t port_unchained;
- nxt_bool_t send_quit, cancelled, adjust_idle_timer;
- nxt_queue_link_t *lnk;
- nxt_request_app_link_t *req_app_link, *pending_ra, *re_ra;
- nxt_port_select_state_t state;
+ int inc_use;
+ uint32_t got_response, dec_requests;
+ nxt_app_t *app;
+ nxt_bool_t port_unchained, send_quit, adjust_idle_timer;
+ nxt_port_t *main_app_port;
nxt_assert(port != NULL);
nxt_assert(port->app != NULL);
- req_app_link = NULL;
-
app = port->app;
inc_use = 0;
- dec_pending = 0;
got_response = 0;
+ dec_requests = 0;
switch (action) {
case NXT_APR_NEW_PORT:
break;
case NXT_APR_REQUEST_FAILED:
- dec_pending = 1;
+ dec_requests = 1;
inc_use = -1;
break;
case NXT_APR_GOT_RESPONSE:
- dec_pending = 1;
got_response = 1;
inc_use = -1;
break;
case NXT_APR_UPGRADE:
- dec_pending = 1;
got_response = 1;
break;
case NXT_APR_CLOSE:
@@ -4191,120 +4335,49 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
break;
}
- nxt_thread_mutex_lock(&app->mutex);
-
- port->app_pending_responses -= dec_pending;
- port->app_responses += got_response;
+ nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
+ port->pid, port->id,
+ (int) inc_use, (int) got_response);
- if (port->pair[1] != -1
- && (app->max_pending_responses == 0
- || port->app_pending_responses < app->max_pending_responses)
- && (app->max_requests == 0
- || port->app_responses + port->app_pending_responses
- < app->max_requests))
- {
- if (port->app_link.next == NULL) {
- if (port->app_pending_responses > 0) {
- nxt_queue_insert_tail(&app->ports, &port->app_link);
+ if (port == app->shared_port) {
+ nxt_thread_mutex_lock(&app->mutex);
- } else {
- nxt_queue_insert_head(&app->ports, &port->app_link);
- }
+ app->active_requests -= got_response + dec_requests;
- nxt_port_inc_use(port);
+ nxt_thread_mutex_unlock(&app->mutex);
- } else {
- if (port->app_pending_responses == 0
- && nxt_queue_first(&app->ports) != &port->app_link)
- {
- nxt_queue_remove(&port->app_link);
- nxt_queue_insert_head(&app->ports, &port->app_link);
- }
- }
+ goto adjust_use;
}
- if (!nxt_queue_is_empty(&app->ports)
- && !nxt_queue_is_empty(&app->requests))
- {
- lnk = nxt_queue_first(&app->requests);
- nxt_queue_remove(lnk);
- lnk->next = NULL;
+ main_app_port = port->main_app_port;
- req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_app_requests);
+ nxt_thread_mutex_lock(&app->mutex);
- req_app_link->app_port = nxt_router_pop_first_port(app);
+ main_app_port->app_responses += got_response;
+ main_app_port->active_requests -= got_response + dec_requests;
+ app->active_requests -= got_response + dec_requests;
- if (req_app_link->app_port->app_pending_responses > 1) {
- nxt_request_app_link_pending(task, app, req_app_link);
- }
- }
-
- /* Pop first pending request for this port. */
- if (dec_pending > 0
- && !nxt_queue_is_empty(&port->pending_requests))
+ if (main_app_port->pair[1] != -1
+ && (app->max_requests == 0
+ || main_app_port->app_responses < app->max_requests))
{
- lnk = nxt_queue_first(&port->pending_requests);
- nxt_queue_remove(lnk);
- lnk->next = NULL;
-
- pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_port_pending);
-
- nxt_assert(pending_ra->link_app_pending.next != NULL);
-
- nxt_queue_remove(&pending_ra->link_app_pending);
- pending_ra->link_app_pending.next = NULL;
-
- } else {
- pending_ra = NULL;
- }
-
- /* Try to cancel and re-schedule first stalled request for this app. */
- if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
- lnk = nxt_queue_first(&app->pending);
+ if (main_app_port->app_link.next == NULL) {
+ nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
- re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_app_pending);
-
- if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
-
- nxt_debug(task, "app '%V' stalled request #%uD detected",
- &app->name, re_ra->stream);
-
- cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
- re_ra->stream);
-
- if (cancelled) {
- state.req_app_link = re_ra;
- state.app = app;
-
- /*
- * Need to increment use count "in advance" because
- * nxt_router_port_select() will remove re_ra from lists
- * and decrement use count.
- */
- nxt_request_app_link_inc_use(re_ra);
-
- nxt_router_port_select(task, &state);
-
- goto re_ra_cancelled;
- }
+ nxt_port_inc_use(main_app_port);
}
}
- re_ra = NULL;
-
-re_ra_cancelled:
-
send_quit = (app->max_requests > 0
- && port->app_pending_responses == 0
- && port->app_responses >= app->max_requests);
+ && main_app_port->app_responses >= app->max_requests);
if (send_quit) {
- port_unchained = nxt_queue_chk_remove(&port->app_link);
+ port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
- port->app = NULL;
+ nxt_port_hash_remove(&app->port_hash, main_app_port);
+ app->port_hash_count--;
+
+ main_app_port->app = NULL;
app->processes--;
} else {
@@ -4313,9 +4386,10 @@ re_ra_cancelled:
adjust_idle_timer = 0;
- if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0
- && nxt_queue_is_empty(&port->active_websockets)
- && port->idle_link.next == NULL)
+ if (main_app_port->pair[1] != -1 && !send_quit
+ && main_app_port->active_requests == 0
+ && main_app_port->active_websockets == 0
+ && main_app_port->idle_link.next == NULL)
{
if (app->idle_processes == app->spare_processes
&& app->adjust_idle_work.data == NULL)
@@ -4326,12 +4400,17 @@ re_ra_cancelled:
}
if (app->idle_processes < app->spare_processes) {
- nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
+ nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
+ nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
+ &app->name, main_app_port->pid, main_app_port->id);
} else {
- nxt_queue_insert_tail(&app->idle_ports, &port->idle_link);
+ nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
- port->idle_start = task->thread->engine->timers.now;
+ main_app_port->idle_start = task->thread->engine->timers.now;
+
+ nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
+ &app->name, main_app_port->pid, main_app_port->id);
}
app->idle_processes++;
@@ -4344,60 +4423,22 @@ re_ra_cancelled:
nxt_event_engine_post(app->engine, &app->adjust_idle_work);
}
- if (pending_ra != NULL) {
- nxt_request_app_link_use(task, pending_ra, -1);
- }
-
- if (re_ra != NULL) {
- if (nxt_router_port_post_select(task, &state) == NXT_OK) {
- /*
- * Reference counter already incremented above, this will
- * keep re_ra while nxt_router_app_process_request()
- * task is in queue. Reference counter decreased in
- * nxt_router_app_process_request() after processing.
- */
-
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_router_app_process_request,
- &task->thread->engine->task, app, re_ra);
-
- } else {
- nxt_request_app_link_use(task, re_ra, -1);
- }
- }
-
- if (req_app_link != NULL) {
- /*
- * There should be call nxt_request_app_link_inc_use(req_app_link),
- * because of one more link in the queue. But one link was
- * recently removed from app->requests linked list.
- * Corresponding decrement is in nxt_router_app_process_request().
- */
-
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_router_app_process_request,
- &task->thread->engine->task, app, req_app_link);
-
- goto adjust_use;
- }
-
/* ? */
- if (port->pair[1] == -1) {
+ if (main_app_port->pair[1] == -1) {
nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
- &app->name, app, port, port->pid);
+ &app->name, app, main_app_port, main_app_port->pid);
goto adjust_use;
}
if (send_quit) {
- nxt_debug(task, "app '%V' %p send QUIT to port",
- &app->name, app);
+ nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
- nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
- -1, 0, 0, NULL);
+ nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
+ NULL);
if (port_unchained) {
- nxt_port_use(task, port, -1);
+ nxt_port_use(task, main_app_port, -1);
}
goto adjust_use;
@@ -4426,11 +4467,27 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
nxt_thread_mutex_lock(&app->mutex);
+ nxt_port_hash_remove(&app->port_hash, port);
+ app->port_hash_count--;
+
+ if (port->id != 0) {
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
+ port->pid, port->id);
+
+ return;
+ }
+
unchain = nxt_queue_chk_remove(&port->app_link);
if (nxt_queue_chk_remove(&port->idle_link)) {
app->idle_processes--;
+ nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
+ &app->name, port->pid, port->id,
+ (port->idle_start ? "idle_ports" : "spare_ports"));
+
if (port->idle_start == 0
&& app->idle_processes >= app->spare_processes)
{
@@ -4443,6 +4500,10 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
idle_port->idle_start = 0;
+
+ nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
+ "to spare_ports",
+ &app->name, idle_port->pid, idle_port->id);
}
}
@@ -4450,8 +4511,7 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
start_process = !task->thread->engine->shutdown
&& nxt_router_app_can_start(app)
- && (!nxt_queue_is_empty(&app->requests)
- || nxt_router_app_need_start(app));
+ && nxt_router_app_need_start(app);
if (start_process) {
app->pending_processes++;
@@ -4500,6 +4560,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
app->adjust_idle_work.data = NULL;
}
+ nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
+ &app->name,
+ (int) app->idle_processes, (int) app->spare_processes);
+
while (app->idle_processes > app->spare_processes) {
nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
@@ -4509,6 +4573,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
timeout = port->idle_start + app->idle_timeout;
+ nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
+ &app->name, port->pid,
+ port->idle_start, timeout, threshold);
+
if (timeout > threshold) {
break;
}
@@ -4516,8 +4584,14 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
nxt_queue_remove(lnk);
lnk->next = NULL;
+ nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
+ &app->name, port->pid, port->id);
+
nxt_queue_chk_remove(&port->app_link);
+ nxt_port_hash_remove(&app->port_hash, port);
+ app->port_hash_count--;
+
app->idle_processes--;
app->processes--;
port->app = NULL;
@@ -4588,7 +4662,7 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
app = app_joint->app;
for ( ;; ) {
- port = nxt_router_app_get_port_for_quit(app);
+ port = nxt_router_app_get_port_for_quit(task, app);
if (port == NULL) {
break;
}
@@ -4601,12 +4675,23 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
}
nxt_assert(app->processes == 0);
+ nxt_assert(app->active_requests == 0);
+ nxt_assert(app->port_hash_count == 0);
nxt_assert(app->idle_processes == 0);
- nxt_assert(nxt_queue_is_empty(&app->requests));
nxt_assert(nxt_queue_is_empty(&app->ports));
nxt_assert(nxt_queue_is_empty(&app->spare_ports));
nxt_assert(nxt_queue_is_empty(&app->idle_ports));
+ nxt_port_mmaps_destroy(&app->outgoing, 1);
+
+ nxt_thread_mutex_destroy(&app->outgoing.mutex);
+
+ if (app->shared_port != NULL) {
+ app->shared_port->app = NULL;
+ nxt_port_close(task, app->shared_port);
+ nxt_port_use(task, app->shared_port, -1);
+ }
+
nxt_thread_mutex_destroy(&app->mutex);
nxt_mp_destroy(app->mem_pool);
@@ -4623,178 +4708,49 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
static void
-nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
-{
- int ra_use_delta;
- nxt_app_t *app;
- nxt_bool_t can_start_process;
- nxt_request_app_link_t *req_app_link;
-
- req_app_link = state->req_app_link;
- app = state->app;
-
- state->failed_port_use_delta = 0;
- ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests);
-
- if (nxt_queue_chk_remove(&req_app_link->link_port_pending))
- {
- nxt_assert(req_app_link->link_app_pending.next != NULL);
-
- nxt_queue_remove(&req_app_link->link_app_pending);
- req_app_link->link_app_pending.next = NULL;
-
- ra_use_delta--;
- }
-
- state->failed_port = req_app_link->app_port;
-
- if (req_app_link->app_port != NULL) {
- state->failed_port_use_delta--;
-
- state->failed_port->app_pending_responses--;
-
- if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
- state->failed_port_use_delta--;
- }
-
- req_app_link->app_port = NULL;
- }
-
- can_start_process = nxt_router_app_can_start(app);
-
- state->port = NULL;
- state->start_process = 0;
-
- if (nxt_queue_is_empty(&app->ports)
- || (can_start_process && nxt_router_app_first_port_busy(app)) )
- {
- req_app_link = nxt_request_app_link_alloc(task, req_app_link,
- req_app_link->req_rpc_data);
- if (nxt_slow_path(req_app_link == NULL)) {
- goto fail;
- }
-
- if (nxt_slow_path(state->failed_port != NULL)) {
- nxt_queue_insert_head(&app->requests,
- &req_app_link->link_app_requests);
-
- } else {
- nxt_queue_insert_tail(&app->requests,
- &req_app_link->link_app_requests);
- }
-
- ra_use_delta++;
-
- nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests",
- req_app_link->stream);
-
- if (can_start_process) {
- app->pending_processes++;
- state->start_process = 1;
- }
-
- } else {
- state->port = nxt_router_pop_first_port(app);
-
- if (state->port->app_pending_responses > 1) {
- req_app_link = nxt_request_app_link_alloc(task, req_app_link,
- req_app_link->req_rpc_data);
- if (nxt_slow_path(req_app_link == NULL)) {
- goto fail;
- }
-
- req_app_link->app_port = state->port;
-
- nxt_request_app_link_pending(task, app, req_app_link);
- }
-
- if (can_start_process && nxt_router_app_need_start(app)) {
- app->pending_processes++;
- state->start_process = 1;
- }
- }
-
- nxt_request_app_link_chk_use(req_app_link, ra_use_delta);
-
-fail:
-
- state->shared_ra = req_app_link;
-}
-
-
-static nxt_int_t
-nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
+nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
+ nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_int_t res;
- nxt_app_t *app;
- nxt_request_app_link_t *req_app_link;
+ nxt_bool_t start_process;
+ nxt_port_t *port;
+ nxt_http_request_t *r;
- req_app_link = state->shared_ra;
- app = state->app;
+ start_process = 0;
- if (state->failed_port_use_delta != 0) {
- nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
- }
+ nxt_thread_mutex_lock(&app->mutex);
- if (nxt_slow_path(req_app_link == NULL)) {
- if (state->port != NULL) {
- nxt_port_use(task, state->port, -1);
- }
+ port = app->shared_port;
+ nxt_port_inc_use(port);
- nxt_request_app_link_error(task, app, state->req_app_link,
- "Failed to allocate shared req<->app link");
+ app->active_requests++;
- return NXT_ERROR;
+ if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
+ app->pending_processes++;
+ start_process = 1;
}
- if (state->port != NULL) {
- nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
-
- req_app_link->app_port = state->port;
-
- if (state->start_process) {
- nxt_router_start_app_process(task, app);
- }
-
- return NXT_OK;
- }
+ r = req_rpc_data->request;
- if (!state->start_process) {
- nxt_debug(task, "app '%V' %p too many running or pending processes",
- &app->name, app);
+ /*
+ * Put request into application-wide list to be able to cancel request
+ * if something goes wrong with application processes.
+ */
+ nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
- return NXT_AGAIN;
- }
+ nxt_thread_mutex_unlock(&app->mutex);
- res = nxt_router_start_app_process(task, app);
+ /*
+ * Retain request memory pool while request is linked in ack_waiting_req
+ * to guarantee request structure memory is accessble.
+ */
+ nxt_mp_retain(r->mem_pool);
- if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(task, app, req_app_link,
- "Failed to start app process");
+ req_rpc_data->app_port = port;
+ req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
- return NXT_ERROR;
+ if (start_process) {
+ nxt_router_start_app_process(task, app);
}
-
- return NXT_AGAIN;
-}
-
-
-static nxt_int_t
-nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
- nxt_request_app_link_t *req_app_link)
-{
- nxt_port_select_state_t state;
-
- state.req_app_link = req_app_link;
- state.app = app;
-
- nxt_thread_mutex_lock(&app->mutex);
-
- nxt_router_port_select(task, &state);
-
- nxt_thread_mutex_unlock(&app->mutex);
-
- return nxt_router_port_post_select(task, &state);
}
@@ -4802,10 +4758,7 @@ void
nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_app_t *app)
{
- nxt_int_t res;
- nxt_port_t *port;
nxt_event_engine_t *engine;
- nxt_request_app_link_t ra_local, *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
engine = task->thread->engine;
@@ -4824,7 +4777,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
* in port handlers. Need to fixup request memory pool. Counterpart
* release will be called via following call chain:
* nxt_request_rpc_data_unlink() ->
- * nxt_router_http_request_done() ->
+ * nxt_router_http_request_release_post() ->
* nxt_router_http_request_release()
*/
nxt_mp_retain(r->mem_pool);
@@ -4834,31 +4787,63 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
r->timer.log = engine->task.log;
r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
+ r->engine = engine;
+ r->err_work.handler = nxt_router_http_request_error;
+ r->err_work.task = task;
+ r->err_work.obj = r;
+
req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
req_rpc_data->app = app;
+ req_rpc_data->msg_info.body_fd = -1;
+ req_rpc_data->rpc_cancel = 1;
nxt_router_app_use(task, app, 1);
req_rpc_data->request = r;
r->req_rpc_data = req_rpc_data;
- req_app_link = &ra_local;
- nxt_request_app_link_init(task, req_app_link, req_rpc_data);
+ if (r->last != NULL) {
+ r->last->completion_handler = nxt_router_http_request_done;
+ }
- res = nxt_router_app_port(task, app, req_app_link);
- req_app_link = req_rpc_data->req_app_link;
+ nxt_router_app_port_get(task, app, req_rpc_data);
+ nxt_router_app_prepare_request(task, req_rpc_data);
+}
- if (res == NXT_OK) {
- port = req_app_link->app_port;
- nxt_assert(port != NULL);
+static void
+nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_request_t *r;
+
+ r = obj;
+
+ nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
- nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid);
+ nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
+
+ if (r->req_rpc_data != NULL) {
+ nxt_request_rpc_data_unlink(task, r->req_rpc_data);
+ }
+
+ nxt_mp_release(r->mem_pool);
+}
+
+
+static void
+nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_request_t *r;
+
+ r = data;
- nxt_router_app_prepare_request(task, req_app_link);
+ nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
+
+ if (r->req_rpc_data != NULL) {
+ nxt_request_rpc_data_unlink(task, r->req_rpc_data);
}
- nxt_request_app_link_use(task, req_app_link, -1);
+ nxt_http_request_close_handler(task, r, r->proto.any);
}
@@ -4870,91 +4855,106 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_app_prepare_request(nxt_task_t *task,
- nxt_request_app_link_t *req_app_link)
+ nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_buf_t *buf;
+ nxt_app_t *app;
+ nxt_buf_t *buf, *body;
nxt_int_t res;
- nxt_port_t *port, *c_port, *reply_port;
- nxt_apr_action_t apr_action;
+ nxt_port_t *port, *reply_port;
- nxt_assert(req_app_link->app_port != NULL);
+ int notify;
+ struct {
+ nxt_port_msg_t pm;
+ nxt_port_mmap_msg_t mm;
+ } msg;
- port = req_app_link->app_port;
- reply_port = req_app_link->reply_port;
- apr_action = NXT_APR_REQUEST_FAILED;
+ app = req_rpc_data->app;
- c_port = nxt_process_connected_port_find(port->process, reply_port);
+ nxt_assert(app != NULL);
- if (nxt_slow_path(c_port != reply_port)) {
- res = nxt_port_send_port(task, port, reply_port, 0);
+ port = req_rpc_data->app_port;
- if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(task, port->app, req_app_link,
- "Failed to send reply port to application");
+ nxt_assert(port != NULL);
+ nxt_assert(port->queue != NULL);
- goto release_port;
- }
+ reply_port = task->thread->engine->port;
- nxt_process_connected_port_add(port->process, reply_port);
- }
+ buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
+ nxt_app_msg_prefix[app->type]);
+ if (nxt_slow_path(buf == NULL)) {
+ nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
+ req_rpc_data->stream, &app->name);
- buf = nxt_router_prepare_msg(task, req_app_link->request, port,
- nxt_app_msg_prefix[port->app->type]);
+ nxt_http_request_error(task, req_rpc_data->request,
+ NXT_HTTP_INTERNAL_SERVER_ERROR);
- if (nxt_slow_path(buf == NULL)) {
- nxt_request_app_link_error(task, port->app, req_app_link,
- "Failed to prepare message for application");
- goto release_port;
+ return;
}
nxt_debug(task, "about to send %O bytes buffer to app process port %d",
nxt_buf_used_size(buf),
port->socket.fd);
- apr_action = NXT_APR_NEW_PORT;
-
- req_app_link->msg_info.buf = buf;
- req_app_link->msg_info.completion_handler = buf->completion_handler;
+ req_rpc_data->msg_info.buf = buf;
+ req_rpc_data->msg_info.completion_handler = buf->completion_handler;
- for (; buf; buf = buf->next) {
+ do {
buf->completion_handler = nxt_router_dummy_buf_completion;
- }
+ buf = buf->next;
+ } while (buf != NULL);
- buf = req_app_link->msg_info.buf;
+ buf = req_rpc_data->msg_info.buf;
- res = nxt_port_mmap_get_tracking(task, port,
- &req_app_link->msg_info.tracking,
- req_app_link->stream);
- if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(task, port->app, req_app_link,
- "Failed to get tracking area");
- goto release_port;
- }
-
- if (req_app_link->body_fd != -1) {
- nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream,
- req_app_link->body_fd);
+ body = req_rpc_data->request->body;
- lseek(req_app_link->body_fd, 0, SEEK_SET);
- }
+ if (body != NULL && nxt_buf_is_file(body)) {
+ req_rpc_data->msg_info.body_fd = body->file->fd;
- res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
- req_app_link->body_fd,
- req_app_link->stream, reply_port->id, buf,
- &req_app_link->msg_info.tracking);
+ body->file->fd = -1;
- if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(task, port->app, req_app_link,
- "Failed to send message to application");
- goto release_port;
- }
+ } else {
+ req_rpc_data->msg_info.body_fd = -1;
+ }
+
+ msg.pm.stream = req_rpc_data->stream;
+ msg.pm.pid = reply_port->pid;
+ msg.pm.reply_port = reply_port->id;
+ msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
+ msg.pm.last = 0;
+ msg.pm.mmap = 1;
+ msg.pm.nf = 0;
+ msg.pm.mf = 0;
+ msg.pm.tracking = 0;
+
+ nxt_port_mmap_handler_t *mmap_handler = buf->parent;
+ nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
+
+ msg.mm.mmap_id = hdr->id;
+ msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
+ msg.mm.size = nxt_buf_used_size(buf);
+
+ res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
+ req_rpc_data->stream, &notify,
+ &req_rpc_data->msg_info.tracking_cookie);
+ if (nxt_fast_path(res == NXT_OK)) {
+ if (notify != 0) {
+ (void) nxt_port_socket_write(task, port,
+ NXT_PORT_MSG_READ_QUEUE,
+ -1, req_rpc_data->stream,
+ reply_port->id, NULL);
-release_port:
+ } else {
+ nxt_debug(task, "queue is not empty");
+ }
- nxt_router_app_port_release(task, port, apr_action);
+ } else {
+ nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
+ req_rpc_data->stream, &app->name);
- nxt_request_app_link_update_peer(task, req_app_link);
+ nxt_http_request_error(task, req_rpc_data->request,
+ NXT_HTTP_INTERNAL_SERVER_ERROR);
+ }
}
@@ -5012,7 +5012,7 @@ nxt_fields_next(nxt_fields_iter_t *i)
static nxt_buf_t *
nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
- nxt_port_t *port, const nxt_str_t *prefix)
+ nxt_app_t *app, const nxt_str_t *prefix)
{
void *target_pos, *query_pos;
u_char *pos, *end, *p, c;
@@ -5053,7 +5053,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
return NULL;
}
- out = nxt_port_mmap_get_buf(task, port,
+ out = nxt_port_mmap_get_buf(task, &app->outgoing,
nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
if (nxt_slow_path(out == NULL)) {
return NULL;
@@ -5235,7 +5235,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
if (buf == NULL) {
free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
- buf = nxt_port_mmap_get_buf(task, port, free_size);
+ buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
if (nxt_slow_path(buf == NULL)) {
while (out != NULL) {
buf = out->next;
@@ -5283,15 +5283,9 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
static void
nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
{
- nxt_app_t *app;
- nxt_bool_t cancelled, unlinked;
- nxt_port_t *port;
nxt_timer_t *timer;
- nxt_queue_link_t *lnk;
nxt_http_request_t *r;
- nxt_request_app_link_t *pending_ra;
nxt_request_rpc_data_t *req_rpc_data;
- nxt_port_select_state_t state;
timer = obj;
@@ -5299,94 +5293,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
r = nxt_timer_data(timer, nxt_http_request_t, timer);
req_rpc_data = r->timer_data;
- app = req_rpc_data->app;
-
- if (app == NULL) {
- goto generate_error;
- }
-
- port = NULL;
- pending_ra = NULL;
-
- if (req_rpc_data->app_port != NULL) {
- port = req_rpc_data->app_port;
- req_rpc_data->app_port = NULL;
- }
-
- if (port == NULL && req_rpc_data->req_app_link != NULL
- && req_rpc_data->req_app_link->app_port != NULL)
- {
- port = req_rpc_data->req_app_link->app_port;
- req_rpc_data->req_app_link->app_port = NULL;
- }
-
- if (port == NULL) {
- goto generate_error;
- }
-
- nxt_thread_mutex_lock(&app->mutex);
-
- unlinked = nxt_queue_chk_remove(&port->app_link);
-
- if (!nxt_queue_is_empty(&port->pending_requests)) {
- lnk = nxt_queue_first(&port->pending_requests);
-
- pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
- link_port_pending);
-
- nxt_assert(pending_ra->link_app_pending.next != NULL);
-
- nxt_debug(task, "app '%V' pending request #%uD found",
- &app->name, pending_ra->stream);
-
- cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info,
- pending_ra->stream);
-
- if (cancelled) {
- state.req_app_link = pending_ra;
- state.app = app;
-
- /*
- * Need to increment use count "in advance" because
- * nxt_router_port_select() will remove pending_ra from lists
- * and decrement use count.
- */
- nxt_request_app_link_inc_use(pending_ra);
-
- nxt_router_port_select(task, &state);
-
- } else {
- pending_ra = NULL;
- }
- }
-
- nxt_thread_mutex_unlock(&app->mutex);
-
- if (pending_ra != NULL) {
- if (nxt_router_port_post_select(task, &state) == NXT_OK) {
- /*
- * Reference counter already incremented above, this will
- * keep pending_ra while nxt_router_app_process_request()
- * task is in queue. Reference counter decreased in
- * nxt_router_app_process_request() after processing.
- */
-
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_router_app_process_request,
- &task->thread->engine->task, app, pending_ra);
-
- } else {
- nxt_request_app_link_use(task, pending_ra, -1);
- }
- }
-
- nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid);
-
- nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
-
- nxt_port_use(task, port, unlinked ? -2 : -1);
-
-generate_error:
nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
@@ -5394,13 +5300,11 @@ generate_error:
}
-static nxt_int_t
-nxt_router_http_request_done(nxt_task_t *task, nxt_http_request_t *r)
+static void
+nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
{
r->timer.handler = nxt_router_http_request_release;
nxt_timer_add(task->thread->engine, &r->timer, 0);
-
- return NXT_OK;
}
@@ -5409,7 +5313,7 @@ nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
- nxt_debug(task, "http app release");
+ nxt_debug(task, "http request pool release");
r = nxt_timer_data(obj, nxt_http_request_t, timer);
@@ -5468,3 +5372,120 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
-1, 0, 0, NULL);
}
}
+
+
+static void
+nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_fd_t fd;
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+ nxt_port_mmaps_t *mmaps;
+ nxt_port_msg_get_mmap_t *get_mmap_msg;
+ nxt_port_mmap_handler_t *mmap_handler;
+
+ rt = task->thread->runtime;
+
+ port = nxt_runtime_port_find(rt, msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(port == NULL)) {
+ nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
+ msg->port_msg.pid, msg->port_msg.reply_port);
+
+ return;
+ }
+
+ if (nxt_slow_path(nxt_buf_used_size(msg->buf)
+ < (int) sizeof(nxt_port_msg_get_mmap_t)))
+ {
+ nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
+ (int) nxt_buf_used_size(msg->buf));
+
+ return;
+ }
+
+ get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
+
+ nxt_assert(port->type == NXT_PROCESS_APP);
+
+ if (nxt_slow_path(port->app == NULL)) {
+ nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
+ port->pid, port->id);
+
+ // FIXME
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
+ -1, msg->port_msg.stream, 0, NULL);
+
+ return;
+ }
+
+ mmaps = &port->app->outgoing;
+ nxt_thread_mutex_lock(&mmaps->mutex);
+
+ if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
+ nxt_thread_mutex_unlock(&mmaps->mutex);
+
+ nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
+ (int) get_mmap_msg->id);
+
+ // FIXME
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
+ -1, msg->port_msg.stream, 0, NULL);
+ return;
+ }
+
+ mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
+
+ fd = mmap_handler->fd;
+
+ nxt_thread_mutex_unlock(&mmaps->mutex);
+
+ nxt_debug(task, "get mmap %PI:%d found",
+ msg->port_msg.pid, (int) get_mmap_msg->id);
+
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
+}
+
+
+static void
+nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_port_t *port, *reply_port;
+ nxt_runtime_t *rt;
+ nxt_port_msg_get_port_t *get_port_msg;
+
+ rt = task->thread->runtime;
+
+ reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(reply_port == NULL)) {
+ nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
+ msg->port_msg.pid, msg->port_msg.reply_port);
+
+ return;
+ }
+
+ if (nxt_slow_path(nxt_buf_used_size(msg->buf)
+ < (int) sizeof(nxt_port_msg_get_port_t)))
+ {
+ nxt_alert(task, "get_port_handler: message buffer too small (%d)",
+ (int) nxt_buf_used_size(msg->buf));
+
+ return;
+ }
+
+ get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
+
+ port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
+ if (nxt_slow_path(port == NULL)) {
+ nxt_alert(task, "get_port_handler: port %PI:%d not found",
+ get_port_msg->pid, get_port_msg->id);
+
+ return;
+ }
+
+ nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
+ get_port_msg->id);
+
+ (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
+}