summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_port.h9
-rw-r--r--src/nxt_process.c37
-rw-r--r--src/nxt_process.h9
-rw-r--r--src/nxt_router.c100
-rw-r--r--src/nxt_runtime.c7
-rw-r--r--src/nxt_unit.c276
-rw-r--r--src/nxt_unit.h1
7 files changed, 342 insertions, 97 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 0e8707f3..838a7ffe 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -25,6 +25,7 @@ struct nxt_port_handlers_s {
/* File descriptor exchange. */
nxt_port_handler_t change_file;
nxt_port_handler_t new_port;
+ nxt_port_handler_t get_port;
nxt_port_handler_t mmap;
/* New process */
@@ -77,6 +78,7 @@ typedef enum {
_NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file),
_NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
+ _NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port),
_NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
_NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created),
@@ -107,6 +109,7 @@ typedef enum {
NXT_PORT_MSG_ACCESS_LOG = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG),
NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE),
NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
+ NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT),
NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
| NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
@@ -238,6 +241,12 @@ typedef struct {
} nxt_port_msg_new_port_t;
+typedef struct {
+ nxt_port_id_t id;
+ nxt_pid_t pid;
+} nxt_port_msg_get_port_t;
+
+
/*
* nxt_port_data_t size is allocation size
* which enables effective reuse of memory pool cache.
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 5a01c21e..0b3aa40f 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -1108,43 +1108,6 @@ nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
void
-nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port)
-{
- nxt_thread_mutex_lock(&process->cp_mutex);
-
- nxt_port_hash_add(&process->connected_ports, port);
-
- nxt_thread_mutex_unlock(&process->cp_mutex);
-}
-
-
-void
-nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
-{
- nxt_thread_mutex_lock(&process->cp_mutex);
-
- nxt_port_hash_remove(&process->connected_ports, port);
-
- nxt_thread_mutex_unlock(&process->cp_mutex);
-}
-
-
-nxt_port_t *
-nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *port)
-{
- nxt_port_t *res;
-
- nxt_thread_mutex_lock(&process->cp_mutex);
-
- res = nxt_port_hash_find(&process->connected_ports, port->pid, port->id);
-
- nxt_thread_mutex_unlock(&process->cp_mutex);
-
- return res;
-}
-
-
-void
nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status)
{
nxt_uint_t n;
diff --git a/src/nxt_process.h b/src/nxt_process.h
index d3311722..4076cefc 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -95,7 +95,6 @@ typedef struct {
nxt_port_mmaps_t outgoing;
nxt_thread_mutex_t cp_mutex;
- nxt_lvlhsh_t connected_ports; /* of nxt_port_t */
uint32_t stream;
@@ -172,14 +171,6 @@ void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
-void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
-
-void nxt_process_connected_port_remove(nxt_process_t *process,
- nxt_port_t *port);
-
-nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process,
- nxt_port_t *port);
-
void nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status);
void nxt_signal_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 758310a9..3380e133 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -182,6 +182,8 @@ static void nxt_router_engine_post(nxt_event_engine_t *engine,
nxt_work_t *jobs);
static void nxt_router_thread_start(void *data);
+static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
@@ -253,6 +255,8 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+static void nxt_router_get_port_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
extern const nxt_http_request_state_t nxt_http_websocket;
@@ -274,6 +278,7 @@ static const nxt_str_t *nxt_app_msg_prefix[] = {
static const nxt_port_handlers_t nxt_router_process_port_handlers = {
.quit = nxt_signal_quit_handler,
.new_port = nxt_router_new_port_handler,
+ .get_port = nxt_router_get_port_handler,
.change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler,
.data = nxt_router_conf_data_handler,
@@ -2944,6 +2949,7 @@ nxt_router_thread_start(void *data)
nxt_int_t ret;
nxt_port_t *port;
nxt_task_t *task;
+ nxt_work_t *work;
nxt_thread_t *thread;
nxt_thread_link_t *link;
nxt_event_engine_t *engine;
@@ -2988,11 +2994,43 @@ nxt_router_thread_start(void *data)
nxt_port_enable(task, port, &nxt_router_app_port_handlers);
+ work = nxt_zalloc(sizeof(nxt_work_t));
+ if (nxt_slow_path(work == NULL)) {
+ return;
+ }
+
+ work->handler = nxt_router_rt_add_port;
+ work->task = link->work.task;
+ work->obj = work;
+ work->data = port;
+
+ nxt_event_engine_post(link->work.task->thread->engine, work);
+
nxt_event_engine_start(engine);
}
static void
+nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_int_t res;
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+ port = data;
+
+ nxt_free(obj);
+
+ res = nxt_port_hash_add(&rt->ports, port);
+
+ if (nxt_fast_path(res == NXT_OK)) {
+ nxt_port_use(task, port, 1);
+ }
+}
+
+
+static void
nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
{
nxt_joint_job_t *job;
@@ -3281,7 +3319,6 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
}
/* TODO remove engine->port */
- /* TODO excude from connected ports */
if (rtcf != NULL) {
nxt_debug(task, "old router conf is destroyed");
@@ -4937,7 +4974,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
{
nxt_buf_t *buf;
nxt_int_t res;
- nxt_port_t *port, *c_port, *reply_port;
+ nxt_port_t *port, *reply_port;
nxt_apr_action_t apr_action;
nxt_assert(req_app_link->app_port != NULL);
@@ -4947,21 +4984,6 @@ nxt_router_app_prepare_request(nxt_task_t *task,
apr_action = NXT_APR_REQUEST_FAILED;
- c_port = nxt_process_connected_port_find(port->process, reply_port);
-
- if (nxt_slow_path(c_port != reply_port)) {
- res = nxt_port_send_port(task, port, reply_port, 0);
-
- if (nxt_slow_path(res != NXT_OK)) {
- nxt_request_app_link_error(task, port->app, req_app_link,
- "Failed to send reply port to application");
-
- goto release_port;
- }
-
- nxt_process_connected_port_add(port->process, reply_port);
- }
-
buf = nxt_router_prepare_msg(task, req_app_link->request, port,
nxt_app_msg_prefix[port->app->type]);
@@ -5531,3 +5553,47 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
-1, 0, 0, NULL);
}
}
+
+
+static void
+nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_port_t *port, *reply_port;
+ nxt_runtime_t *rt;
+ nxt_port_msg_get_port_t *get_port_msg;
+
+ rt = task->thread->runtime;
+
+ reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(reply_port == NULL)) {
+ nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
+ msg->port_msg.pid, msg->port_msg.reply_port);
+
+ return;
+ }
+
+ if (nxt_slow_path(nxt_buf_used_size(msg->buf)
+ < (int) sizeof(nxt_port_msg_get_port_t)))
+ {
+ nxt_alert(task, "get_port_handler: message buffer too small (%d)",
+ (int) nxt_buf_used_size(msg->buf));
+
+ return;
+ }
+
+ get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
+
+ port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
+ if (nxt_slow_path(port == NULL)) {
+ nxt_alert(task, "get_port_handler: port %PI:%d not found",
+ get_port_msg->pid, get_port_msg->id);
+
+ return;
+ }
+
+ nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
+ get_port_msg->id);
+
+ (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
+}
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index 694ce74d..c25b93cc 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -1389,8 +1389,6 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
void
nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
{
- nxt_port_t *port;
-
if (process->registered == 1) {
nxt_runtime_process_remove(rt, process);
}
@@ -1401,11 +1399,6 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
nxt_port_mmaps_destroy(&process->incoming, 1);
nxt_port_mmaps_destroy(&process->outgoing, 1);
- do {
- port = nxt_port_hash_retrieve(&process->connected_ports);
-
- } while (port != NULL);
-
nxt_thread_mutex_destroy(&process->incoming.mutex);
nxt_thread_mutex_destroy(&process->outgoing.mutex);
nxt_thread_mutex_destroy(&process->cp_mutex);
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index ddfd9c80..c1ef977f 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -55,6 +55,8 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
+ nxt_unit_port_id_t *port_id);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
@@ -119,6 +121,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
nxt_unit_read_buf_t *rbuf);
+static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl);
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
@@ -138,6 +141,7 @@ static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
nxt_unit_process_t *process);
static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
+static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
nxt_unit_port_t *port, const void *buf, size_t buf_size,
const void *oob, size_t oob_size);
@@ -215,7 +219,10 @@ struct nxt_unit_request_info_impl_s {
nxt_unit_req_state_t state;
uint8_t websocket;
+ /* for nxt_unit_ctx_impl_t.free_req or active_req */
nxt_queue_link_t link;
+ /* for nxt_unit_port_impl_t.awaiting_req */
+ nxt_queue_link_t port_wait_link;
char extra_data[];
};
@@ -244,6 +251,7 @@ struct nxt_unit_ctx_impl_s {
nxt_unit_ctx_t ctx;
nxt_atomic_t use_count;
+ nxt_atomic_t wait_items;
pthread_mutex_t mutex;
@@ -265,6 +273,9 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_request_info_impl_t */
nxt_lvlhsh_t requests;
+ /* of nxt_unit_request_info_impl_t */
+ nxt_queue_t ready_req;
+
nxt_unit_read_buf_t *pending_read_head;
nxt_unit_read_buf_t **pending_read_tail;
nxt_unit_read_buf_t *free_read_buf;
@@ -309,6 +320,11 @@ struct nxt_unit_port_impl_s {
nxt_queue_link_t link;
nxt_unit_process_t *process;
+
+ /* of nxt_unit_request_info_impl_t */
+ nxt_queue_t awaiting_req;
+
+ int ready;
};
@@ -515,10 +531,12 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
ctx_impl->use_count = 1;
+ ctx_impl->wait_items = 0;
nxt_queue_init(&ctx_impl->free_req);
nxt_queue_init(&ctx_impl->free_ws);
nxt_queue_init(&ctx_impl->active_req);
+ nxt_queue_init(&ctx_impl->ready_req);
ctx_impl->free_buf = NULL;
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
@@ -973,8 +991,8 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
+ int res;
nxt_unit_impl_t *lib;
- nxt_unit_port_t *port;
nxt_unit_port_id_t port_id;
nxt_unit_request_t *r;
nxt_unit_mmap_buf_t *b;
@@ -1004,28 +1022,8 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
-
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
-
- pthread_mutex_lock(&lib->mutex);
-
- port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0);
-
- pthread_mutex_unlock(&lib->mutex);
-
- if (nxt_slow_path(port == NULL)) {
- nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found",
- recv_msg->stream,
- (int) recv_msg->pid, (int) recv_msg->reply_port);
-
- return NXT_UNIT_ERROR;
- }
-
req = &req_impl->req;
- req->response_port = port;
-
req->request = recv_msg->start;
b = recv_msg->incoming_buf;
@@ -1076,13 +1074,130 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
(char *) nxt_unit_sptr_get(&r->target),
(int) r->content_length);
- lib->callbacks.request_handler(req);
+ nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
+
+ res = nxt_unit_request_check_response_port(req, &port_id);
+
+ if (nxt_fast_path(res == NXT_UNIT_OK)) {
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ lib->callbacks.request_handler(req);
+ }
return NXT_UNIT_OK;
}
static int
+nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
+ nxt_unit_port_id_t *port_id)
+{
+ int res;
+ nxt_unit_ctx_t *ctx;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *port;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_port_impl_t *port_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ ctx = req->ctx;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&lib->mutex);
+
+ port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
+
+ if (nxt_fast_path(port != NULL)) {
+ req->response_port = port;
+
+ if (nxt_fast_path(port_impl->ready)) {
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
+ (int) port->id.pid, (int) port->id.id);
+
+ return NXT_UNIT_OK;
+ }
+
+ nxt_unit_debug(ctx, "check_response_port: "
+ "port{%d,%d} already requested",
+ (int) port->id.pid, (int) port->id.id);
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ nxt_queue_insert_tail(&port_impl->awaiting_req,
+ &req_impl->port_wait_link);
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
+
+ return NXT_UNIT_AGAIN;
+ }
+
+ port_impl = malloc(sizeof(nxt_unit_port_impl_t));
+ if (nxt_slow_path(port_impl == NULL)) {
+ nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
+ (int) sizeof(nxt_unit_port_impl_t));
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ port = &port_impl->port;
+
+ port->id = *port_id;
+ port->in_fd = -1;
+ port->out_fd = -1;
+ port->data = NULL;
+
+ res = nxt_unit_port_hash_add(&lib->ports, port);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
+ port->id.pid, port->id.id);
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ free(port);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ nxt_queue_insert_tail(&req_impl->process->ports, &port_impl->link);
+
+ port_impl->process = req_impl->process;
+
+
+ nxt_queue_init(&port_impl->awaiting_req);
+
+ nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
+
+ port_impl->use_count = 2;
+ port_impl->ready = 0;
+
+ req->response_port = port;
+
+ pthread_mutex_unlock(&lib->mutex);
+
+ nxt_unit_process_use(port_impl->process);
+
+ res = nxt_unit_get_port(ctx, port_id);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
+
+ return NXT_UNIT_AGAIN;
+}
+
+
+static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
size_t hsize;
@@ -4041,6 +4156,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
nxt_unit_read_buf_release(ctx, rbuf);
+ nxt_unit_process_ready_req(ctx_impl);
+
nxt_unit_ctx_release(ctx_impl);
return rc;
@@ -4062,6 +4179,39 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
}
+static void
+nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl)
+{
+ nxt_queue_t ready_req;
+ nxt_unit_impl_t *lib;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ nxt_queue_init(&ready_req);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return;
+ }
+
+ nxt_queue_add(&ready_req, &ctx_impl->ready_req);
+ nxt_queue_init(&ctx_impl->ready_req);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_queue_each(req_impl, &ready_req,
+ nxt_unit_request_info_impl_t, port_wait_link)
+ {
+ lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
+
+ lib->callbacks.request_handler(&req_impl->req);
+
+ } nxt_queue_loop;
+}
+
+
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
@@ -4371,11 +4521,14 @@ nxt_unit_port_process(nxt_unit_port_t *port)
static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
- int rc;
- nxt_unit_impl_t *lib;
- nxt_unit_port_t *old_port;
- nxt_unit_process_t *process;
- nxt_unit_port_impl_t *new_port;
+ int rc;
+ nxt_queue_t awaiting_req;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_t *old_port;
+ nxt_unit_process_t *process;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_port_impl_t *new_port, *old_port_impl;
+ nxt_unit_request_info_impl_t *req_impl;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -4415,6 +4568,17 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
*port = *old_port;
+ nxt_queue_init(&awaiting_req);
+
+ old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
+
+ if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
+ nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
+ nxt_queue_init(&old_port_impl->awaiting_req);
+ }
+
+ old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1);
+
pthread_mutex_unlock(&lib->mutex);
if (lib->callbacks.add_port != NULL
@@ -4423,6 +4587,25 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
lib->callbacks.add_port(ctx, old_port);
}
+ nxt_queue_each(req_impl, &awaiting_req,
+ nxt_unit_request_info_impl_t, port_wait_link)
+ {
+ nxt_queue_remove(&req_impl->port_wait_link);
+
+ ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
+ ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ nxt_queue_insert_tail(&ctx_impl->ready_req,
+ &req_impl->port_wait_link);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+
+ } nxt_queue_loop;
+
return old_port;
}
@@ -4464,6 +4647,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
new_port->use_count = 2;
new_port->process = process;
+ new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
+
+ nxt_queue_init(&new_port->awaiting_req);
process = NULL;
@@ -4608,6 +4794,42 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx)
}
+static int
+nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
+{
+ ssize_t res;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ struct {
+ nxt_port_msg_t msg;
+ nxt_port_msg_get_port_t get_port;
+ } m;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ memset(&m.msg, 0, sizeof(nxt_port_msg_t));
+
+ m.msg.pid = lib->pid;
+ m.msg.reply_port = ctx_impl->read_port->id.id;
+ m.msg.type = _NXT_PORT_MSG_GET_PORT;
+
+ m.get_port.id = port_id->id;
+ m.get_port.pid = port_id->pid;
+
+ nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
+ (int) port_id->id);
+
+ res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
+ if (nxt_slow_path(res != sizeof(m))) {
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 6723026f..8fa64f4e 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -19,6 +19,7 @@
enum {
NXT_UNIT_OK = 0,
NXT_UNIT_ERROR = 1,
+ NXT_UNIT_AGAIN = 2,
};
enum {