diff options
-rw-r--r-- | src/nxt_port.h | 9 | ||||
-rw-r--r-- | src/nxt_process.c | 37 | ||||
-rw-r--r-- | src/nxt_process.h | 9 | ||||
-rw-r--r-- | src/nxt_router.c | 100 | ||||
-rw-r--r-- | src/nxt_runtime.c | 7 | ||||
-rw-r--r-- | src/nxt_unit.c | 276 | ||||
-rw-r--r-- | src/nxt_unit.h | 1 |
7 files changed, 342 insertions, 97 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h index 0e8707f3..838a7ffe 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -25,6 +25,7 @@ struct nxt_port_handlers_s { /* File descriptor exchange. */ nxt_port_handler_t change_file; nxt_port_handler_t new_port; + nxt_port_handler_t get_port; nxt_port_handler_t mmap; /* New process */ @@ -77,6 +78,7 @@ typedef enum { _NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file), _NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port), + _NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port), _NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap), _NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created), @@ -107,6 +109,7 @@ typedef enum { NXT_PORT_MSG_ACCESS_LOG = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG), NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE), NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT), + NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT), NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP) | NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC, @@ -238,6 +241,12 @@ typedef struct { } nxt_port_msg_new_port_t; +typedef struct { + nxt_port_id_t id; + nxt_pid_t pid; +} nxt_port_msg_get_port_t; + + /* * nxt_port_data_t size is allocation size * which enables effective reuse of memory pool cache. diff --git a/src/nxt_process.c b/src/nxt_process.c index 5a01c21e..0b3aa40f 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -1108,43 +1108,6 @@ nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process) void -nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port) -{ - nxt_thread_mutex_lock(&process->cp_mutex); - - nxt_port_hash_add(&process->connected_ports, port); - - nxt_thread_mutex_unlock(&process->cp_mutex); -} - - -void -nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port) -{ - nxt_thread_mutex_lock(&process->cp_mutex); - - nxt_port_hash_remove(&process->connected_ports, port); - - nxt_thread_mutex_unlock(&process->cp_mutex); -} - - -nxt_port_t * -nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *port) -{ - nxt_port_t *res; - - nxt_thread_mutex_lock(&process->cp_mutex); - - res = nxt_port_hash_find(&process->connected_ports, port->pid, port->id); - - nxt_thread_mutex_unlock(&process->cp_mutex); - - return res; -} - - -void nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status) { nxt_uint_t n; diff --git a/src/nxt_process.h b/src/nxt_process.h index d3311722..4076cefc 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -95,7 +95,6 @@ typedef struct { nxt_port_mmaps_t outgoing; nxt_thread_mutex_t cp_mutex; - nxt_lvlhsh_t connected_ports; /* of nxt_port_t */ uint32_t stream; @@ -172,14 +171,6 @@ void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process); -void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port); - -void nxt_process_connected_port_remove(nxt_process_t *process, - nxt_port_t *port); - -nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process, - nxt_port_t *port); - void nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status); void nxt_signal_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); diff --git a/src/nxt_router.c b/src/nxt_router.c index 758310a9..3380e133 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -182,6 +182,8 @@ static void nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs); static void nxt_router_thread_start(void *data); +static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, + void *data); static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data); static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, @@ -253,6 +255,8 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, static void nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data); static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_router_get_port_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); extern const nxt_http_request_state_t nxt_http_websocket; @@ -274,6 +278,7 @@ static const nxt_str_t *nxt_app_msg_prefix[] = { static const nxt_port_handlers_t nxt_router_process_port_handlers = { .quit = nxt_signal_quit_handler, .new_port = nxt_router_new_port_handler, + .get_port = nxt_router_get_port_handler, .change_file = nxt_port_change_log_file_handler, .mmap = nxt_port_mmap_handler, .data = nxt_router_conf_data_handler, @@ -2944,6 +2949,7 @@ nxt_router_thread_start(void *data) nxt_int_t ret; nxt_port_t *port; nxt_task_t *task; + nxt_work_t *work; nxt_thread_t *thread; nxt_thread_link_t *link; nxt_event_engine_t *engine; @@ -2988,11 +2994,43 @@ nxt_router_thread_start(void *data) nxt_port_enable(task, port, &nxt_router_app_port_handlers); + work = nxt_zalloc(sizeof(nxt_work_t)); + if (nxt_slow_path(work == NULL)) { + return; + } + + work->handler = nxt_router_rt_add_port; + work->task = link->work.task; + work->obj = work; + work->data = port; + + nxt_event_engine_post(link->work.task->thread->engine, work); + nxt_event_engine_start(engine); } static void +nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) +{ + nxt_int_t res; + nxt_port_t *port; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + port = data; + + nxt_free(obj); + + res = nxt_port_hash_add(&rt->ports, port); + + if (nxt_fast_path(res == NXT_OK)) { + nxt_port_use(task, port, 1); + } +} + + +static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) { nxt_joint_job_t *job; @@ -3281,7 +3319,6 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) } /* TODO remove engine->port */ - /* TODO excude from connected ports */ if (rtcf != NULL) { nxt_debug(task, "old router conf is destroyed"); @@ -4937,7 +4974,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, { nxt_buf_t *buf; nxt_int_t res; - nxt_port_t *port, *c_port, *reply_port; + nxt_port_t *port, *reply_port; nxt_apr_action_t apr_action; nxt_assert(req_app_link->app_port != NULL); @@ -4947,21 +4984,6 @@ nxt_router_app_prepare_request(nxt_task_t *task, apr_action = NXT_APR_REQUEST_FAILED; - c_port = nxt_process_connected_port_find(port->process, reply_port); - - if (nxt_slow_path(c_port != reply_port)) { - res = nxt_port_send_port(task, port, reply_port, 0); - - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to send reply port to application"); - - goto release_port; - } - - nxt_process_connected_port_add(port->process, reply_port); - } - buf = nxt_router_prepare_msg(task, req_app_link->request, port, nxt_app_msg_prefix[port->app->type]); @@ -5531,3 +5553,47 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) -1, 0, 0, NULL); } } + + +static void +nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_port_t *port, *reply_port; + nxt_runtime_t *rt; + nxt_port_msg_get_port_t *get_port_msg; + + rt = task->thread->runtime; + + reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(reply_port == NULL)) { + nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", + msg->port_msg.pid, msg->port_msg.reply_port); + + return; + } + + if (nxt_slow_path(nxt_buf_used_size(msg->buf) + < (int) sizeof(nxt_port_msg_get_port_t))) + { + nxt_alert(task, "get_port_handler: message buffer too small (%d)", + (int) nxt_buf_used_size(msg->buf)); + + return; + } + + get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; + + port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "get_port_handler: port %PI:%d not found", + get_port_msg->pid, get_port_msg->id); + + return; + } + + nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, + get_port_msg->id); + + (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); +} diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 694ce74d..c25b93cc 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -1389,8 +1389,6 @@ nxt_runtime_process_new(nxt_runtime_t *rt) void nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) { - nxt_port_t *port; - if (process->registered == 1) { nxt_runtime_process_remove(rt, process); } @@ -1401,11 +1399,6 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) nxt_port_mmaps_destroy(&process->incoming, 1); nxt_port_mmaps_destroy(&process->outgoing, 1); - do { - port = nxt_port_hash_retrieve(&process->connected_ports); - - } while (port != NULL); - nxt_thread_mutex_destroy(&process->incoming.mutex); nxt_thread_mutex_destroy(&process->outgoing.mutex); nxt_thread_mutex_destroy(&process->cp_mutex); diff --git a/src/nxt_unit.c b/src/nxt_unit.c index ddfd9c80..c1ef977f 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -55,6 +55,8 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, + nxt_unit_port_id_t *port_id); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); @@ -119,6 +121,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); @@ -138,6 +141,7 @@ static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); static void nxt_unit_quit(nxt_unit_ctx_t *ctx); +static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size); @@ -215,7 +219,10 @@ struct nxt_unit_request_info_impl_s { nxt_unit_req_state_t state; uint8_t websocket; + /* for nxt_unit_ctx_impl_t.free_req or active_req */ nxt_queue_link_t link; + /* for nxt_unit_port_impl_t.awaiting_req */ + nxt_queue_link_t port_wait_link; char extra_data[]; }; @@ -244,6 +251,7 @@ struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; nxt_atomic_t use_count; + nxt_atomic_t wait_items; pthread_mutex_t mutex; @@ -265,6 +273,9 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_request_info_impl_t */ nxt_lvlhsh_t requests; + /* of nxt_unit_request_info_impl_t */ + nxt_queue_t ready_req; + nxt_unit_read_buf_t *pending_read_head; nxt_unit_read_buf_t **pending_read_tail; nxt_unit_read_buf_t *free_read_buf; @@ -309,6 +320,11 @@ struct nxt_unit_port_impl_s { nxt_queue_link_t link; nxt_unit_process_t *process; + + /* of nxt_unit_request_info_impl_t */ + nxt_queue_t awaiting_req; + + int ready; }; @@ -515,10 +531,12 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); ctx_impl->use_count = 1; + ctx_impl->wait_items = 0; nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); + nxt_queue_init(&ctx_impl->ready_req); ctx_impl->free_buf = NULL; nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); @@ -973,8 +991,8 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { + int res; nxt_unit_impl_t *lib; - nxt_unit_port_t *port; nxt_unit_port_id_t port_id; nxt_unit_request_t *r; nxt_unit_mmap_buf_t *b; @@ -1004,28 +1022,8 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - pthread_mutex_lock(&lib->mutex); - - port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0); - - pthread_mutex_unlock(&lib->mutex); - - if (nxt_slow_path(port == NULL)) { - nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found", - recv_msg->stream, - (int) recv_msg->pid, (int) recv_msg->reply_port); - - return NXT_UNIT_ERROR; - } - req = &req_impl->req; - req->response_port = port; - req->request = recv_msg->start; b = recv_msg->incoming_buf; @@ -1076,13 +1074,130 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) (char *) nxt_unit_sptr_get(&r->target), (int) r->content_length); - lib->callbacks.request_handler(req); + nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); + + res = nxt_unit_request_check_response_port(req, &port_id); + + if (nxt_fast_path(res == NXT_UNIT_OK)) { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + lib->callbacks.request_handler(req); + } return NXT_UNIT_OK; } static int +nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, + nxt_unit_port_id_t *port_id) +{ + int res; + nxt_unit_ctx_t *ctx; + nxt_unit_impl_t *lib; + nxt_unit_port_t *port; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *port_impl; + nxt_unit_request_info_impl_t *req_impl; + + ctx = req->ctx; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&lib->mutex); + + port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + if (nxt_fast_path(port != NULL)) { + req->response_port = port; + + if (nxt_fast_path(port_impl->ready)) { + pthread_mutex_unlock(&lib->mutex); + + nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}", + (int) port->id.pid, (int) port->id.id); + + return NXT_UNIT_OK; + } + + nxt_unit_debug(ctx, "check_response_port: " + "port{%d,%d} already requested", + (int) port->id.pid, (int) port->id.id); + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + nxt_queue_insert_tail(&port_impl->awaiting_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&lib->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + return NXT_UNIT_AGAIN; + } + + port_impl = malloc(sizeof(nxt_unit_port_impl_t)); + if (nxt_slow_path(port_impl == NULL)) { + nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed", + (int) sizeof(nxt_unit_port_impl_t)); + + pthread_mutex_unlock(&lib->mutex); + + return NXT_UNIT_ERROR; + } + + port = &port_impl->port; + + port->id = *port_id; + port->in_fd = -1; + port->out_fd = -1; + port->data = NULL; + + res = nxt_unit_port_hash_add(&lib->ports, port); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed", + port->id.pid, port->id.id); + + pthread_mutex_unlock(&lib->mutex); + + free(port); + + return NXT_UNIT_ERROR; + } + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + nxt_queue_insert_tail(&req_impl->process->ports, &port_impl->link); + + port_impl->process = req_impl->process; + + + nxt_queue_init(&port_impl->awaiting_req); + + nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link); + + port_impl->use_count = 2; + port_impl->ready = 0; + + req->response_port = port; + + pthread_mutex_unlock(&lib->mutex); + + nxt_unit_process_use(port_impl->process); + + res = nxt_unit_get_port(ctx, port_id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + return NXT_UNIT_AGAIN; +} + + +static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { size_t hsize; @@ -4041,6 +4156,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) nxt_unit_read_buf_release(ctx, rbuf); + nxt_unit_process_ready_req(ctx_impl); + nxt_unit_ctx_release(ctx_impl); return rc; @@ -4062,6 +4179,39 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) } +static void +nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_queue_t ready_req; + nxt_unit_impl_t *lib; + nxt_unit_request_info_impl_t *req_impl; + + nxt_queue_init(&ready_req); + + pthread_mutex_lock(&ctx_impl->mutex); + + if (nxt_queue_is_empty(&ctx_impl->ready_req)) { + pthread_mutex_unlock(&ctx_impl->mutex); + + return; + } + + nxt_queue_add(&ready_req, &ctx_impl->ready_req); + nxt_queue_init(&ctx_impl->ready_req); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_queue_each(req_impl, &ready_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); + + lib->callbacks.request_handler(&req_impl->req); + + } nxt_queue_loop; +} + + void nxt_unit_done(nxt_unit_ctx_t *ctx) { @@ -4371,11 +4521,14 @@ nxt_unit_port_process(nxt_unit_port_t *port) static nxt_unit_port_t * nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - int rc; - nxt_unit_impl_t *lib; - nxt_unit_port_t *old_port; - nxt_unit_process_t *process; - nxt_unit_port_impl_t *new_port; + int rc; + nxt_queue_t awaiting_req; + nxt_unit_impl_t *lib; + nxt_unit_port_t *old_port; + nxt_unit_process_t *process; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *new_port, *old_port_impl; + nxt_unit_request_info_impl_t *req_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4415,6 +4568,17 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) *port = *old_port; + nxt_queue_init(&awaiting_req); + + old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port); + + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { + nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); + nxt_queue_init(&old_port_impl->awaiting_req); + } + + old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1); + pthread_mutex_unlock(&lib->mutex); if (lib->callbacks.add_port != NULL @@ -4423,6 +4587,25 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) lib->callbacks.add_port(ctx, old_port); } + nxt_queue_each(req_impl, &awaiting_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + nxt_queue_remove(&req_impl->port_wait_link); + + ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, + ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_tail(&ctx_impl->ready_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + } nxt_queue_loop; + return old_port; } @@ -4464,6 +4647,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port->use_count = 2; new_port->process = process; + new_port->ready = (port->in_fd != -1 || port->out_fd != -1); + + nxt_queue_init(&new_port->awaiting_req); process = NULL; @@ -4608,6 +4794,42 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) } +static int +nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +{ + ssize_t res; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + struct { + nxt_port_msg_t msg; + nxt_port_msg_get_port_t get_port; + } m; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.reply_port = ctx_impl->read_port->id.id; + m.msg.type = _NXT_PORT_MSG_GET_PORT; + + m.get_port.id = port_id->id; + m.get_port.pid = port_id->pid; + + nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid, + (int) port_id->id); + + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size) diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 6723026f..8fa64f4e 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -19,6 +19,7 @@ enum { NXT_UNIT_OK = 0, NXT_UNIT_ERROR = 1, + NXT_UNIT_AGAIN = 2, }; enum { |