summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c100
1 files changed, 83 insertions, 17 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 758310a9..3380e133 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -182,6 +182,8 @@ static void nxt_router_engine_post(nxt_event_engine_t *engine,
nxt_work_t *jobs);
static void nxt_router_thread_start(void *data);
+static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
@@ -253,6 +255,8 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+static void nxt_router_get_port_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
extern const nxt_http_request_state_t nxt_http_websocket;
@@ -274,6 +278,7 @@ static const nxt_str_t *nxt_app_msg_prefix[] = {
static const nxt_port_handlers_t nxt_router_process_port_handlers = {
.quit = nxt_signal_quit_handler,
.new_port = nxt_router_new_port_handler,
+ .get_port = nxt_router_get_port_handler,
.change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler,
.data = nxt_router_conf_data_handler,
@@ -2944,6 +2949,7 @@ nxt_router_thread_start(void *data)
nxt_int_t ret;
nxt_port_t *port;
nxt_task_t *task;
+ nxt_work_t *work;
nxt_thread_t *thread;
nxt_thread_link_t *link;
nxt_event_engine_t *engine;
@@ -2988,11 +2994,43 @@ nxt_router_thread_start(void *data)
nxt_port_enable(task, port, &nxt_router_app_port_handlers);
+ work = nxt_zalloc(sizeof(nxt_work_t));
+ if (nxt_slow_path(work == NULL)) {
+ return;
+ }
+
+ work->handler = nxt_router_rt_add_port;
+ work->task = link->work.task;
+ work->obj = work;
+ work->data = port;
+
+ nxt_event_engine_post(link->work.task->thread->engine, work);
+
nxt_event_engine_start(engine);
}
static void
+nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_int_t res;
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+ port = data;
+
+ nxt_free(obj);
+
+ res = nxt_port_hash_add(&rt->ports, port);
+
+ if (nxt_fast_path(res == NXT_OK)) {
+ nxt_port_use(task, port, 1);
+ }
+}
+
+
+static void
nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
{
nxt_joint_job_t *job;
@@ -3281,7 +3319,6 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
}
/* TODO remove engine->port */
- /* TODO excude from connected ports */
if (rtcf != NULL) {
nxt_debug(task, "old router conf is destroyed");
@@ -4937,7 +4974,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
{
nxt_buf_t *buf;
nxt_int_t res;
- nxt_port_t *port, *c_port, *reply_port;
+ nxt_port_t *port, *reply_port;
nxt_apr_action_t apr_action;
nxt_assert(req_app_link->app_port != NULL);
@@ -4947,21 +4984,6 @@ nxt_router_app_prepare_request(nxt_task_t *task,
apr_action = NXT_APR_REQUEST_FAILED;
- c_port = nxt_process_connected_port_find(port->process, reply_port);
-
- if (nxt_slow_path(c_port != reply_port)) {
- res = nxt_port_send_port(task, port, reply_port, 0);
-
- if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(task, port->app, req_app_link,
- "Failed to send reply port to application");
-
- goto release_port;
- }
-
- nxt_process_connected_port_add(port->process, reply_port);
- }
-
buf = nxt_router_prepare_msg(task, req_app_link->request, port,
nxt_app_msg_prefix[port->app->type]);
@@ -5531,3 +5553,47 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
-1, 0, 0, NULL);
}
}
+
+
+static void
+nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_port_t *port, *reply_port;
+ nxt_runtime_t *rt;
+ nxt_port_msg_get_port_t *get_port_msg;
+
+ rt = task->thread->runtime;
+
+ reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(reply_port == NULL)) {
+ nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
+ msg->port_msg.pid, msg->port_msg.reply_port);
+
+ return;
+ }
+
+ if (nxt_slow_path(nxt_buf_used_size(msg->buf)
+ < (int) sizeof(nxt_port_msg_get_port_t)))
+ {
+ nxt_alert(task, "get_port_handler: message buffer too small (%d)",
+ (int) nxt_buf_used_size(msg->buf));
+
+ return;
+ }
+
+ get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
+
+ port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
+ if (nxt_slow_path(port == NULL)) {
+ nxt_alert(task, "get_port_handler: port %PI:%d not found",
+ get_port_msg->pid, get_port_msg->id);
+
+ return;
+ }
+
+ nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
+ get_port_msg->id);
+
+ (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
+}