diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:20:10 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:20:10 +0300 |
commit | 3cbc22a6dc45abdeade4deb364601230ddca02c1 (patch) | |
tree | 3eff409de1a4405c646a2c11633d50711fa22746 /src | |
parent | bf647588ff781e606651f001b53a4e83bb34c000 (diff) | |
download | unit-3cbc22a6dc45abdeade4deb364601230ddca02c1.tar.gz unit-3cbc22a6dc45abdeade4deb364601230ddca02c1.tar.bz2 |
Changing router to application port exchange protocol.
The application process needs to request the port from the router instead of the
latter pushing the port before sending a request to the application. This is
required to simplify the communication between the router and the application
and to prepare the router to use the application shared port and then the queue.
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_port.h | 9 | ||||
-rw-r--r-- | src/nxt_process.c | 37 | ||||
-rw-r--r-- | src/nxt_process.h | 9 | ||||
-rw-r--r-- | src/nxt_router.c | 100 | ||||
-rw-r--r-- | src/nxt_runtime.c | 7 | ||||
-rw-r--r-- | src/nxt_unit.c | 276 | ||||
-rw-r--r-- | src/nxt_unit.h | 1 |
7 files changed, 342 insertions, 97 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h index 0e8707f3..838a7ffe 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -25,6 +25,7 @@ struct nxt_port_handlers_s { /* File descriptor exchange. */ nxt_port_handler_t change_file; nxt_port_handler_t new_port; + nxt_port_handler_t get_port; nxt_port_handler_t mmap; /* New process */ @@ -77,6 +78,7 @@ typedef enum { _NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file), _NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port), + _NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port), _NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap), _NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created), @@ -107,6 +109,7 @@ typedef enum { NXT_PORT_MSG_ACCESS_LOG = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG), NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE), NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT), + NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT), NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP) | NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC, @@ -238,6 +241,12 @@ typedef struct { } nxt_port_msg_new_port_t; +typedef struct { + nxt_port_id_t id; + nxt_pid_t pid; +} nxt_port_msg_get_port_t; + + /* * nxt_port_data_t size is allocation size * which enables effective reuse of memory pool cache. diff --git a/src/nxt_process.c b/src/nxt_process.c index 5a01c21e..0b3aa40f 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -1108,43 +1108,6 @@ nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process) void -nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port) -{ - nxt_thread_mutex_lock(&process->cp_mutex); - - nxt_port_hash_add(&process->connected_ports, port); - - nxt_thread_mutex_unlock(&process->cp_mutex); -} - - -void -nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port) -{ - nxt_thread_mutex_lock(&process->cp_mutex); - - nxt_port_hash_remove(&process->connected_ports, port); - - nxt_thread_mutex_unlock(&process->cp_mutex); -} - - -nxt_port_t * -nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *port) -{ - nxt_port_t *res; - - nxt_thread_mutex_lock(&process->cp_mutex); - - res = nxt_port_hash_find(&process->connected_ports, port->pid, port->id); - - nxt_thread_mutex_unlock(&process->cp_mutex); - - return res; -} - - -void nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status) { nxt_uint_t n; diff --git a/src/nxt_process.h b/src/nxt_process.h index d3311722..4076cefc 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -95,7 +95,6 @@ typedef struct { nxt_port_mmaps_t outgoing; nxt_thread_mutex_t cp_mutex; - nxt_lvlhsh_t connected_ports; /* of nxt_port_t */ uint32_t stream; @@ -172,14 +171,6 @@ void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i); void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process); -void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port); - -void nxt_process_connected_port_remove(nxt_process_t *process, - nxt_port_t *port); - -nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process, - nxt_port_t *port); - void nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status); void nxt_signal_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); diff --git a/src/nxt_router.c b/src/nxt_router.c index 758310a9..3380e133 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -182,6 +182,8 @@ static void nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs); static void nxt_router_thread_start(void *data); +static void nxt_router_rt_add_port(nxt_task_t *task, void *obj, + void *data); static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data); static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, @@ -253,6 +255,8 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, static void nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data); static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_router_get_port_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); extern const nxt_http_request_state_t nxt_http_websocket; @@ -274,6 +278,7 @@ static const nxt_str_t *nxt_app_msg_prefix[] = { static const nxt_port_handlers_t nxt_router_process_port_handlers = { .quit = nxt_signal_quit_handler, .new_port = nxt_router_new_port_handler, + .get_port = nxt_router_get_port_handler, .change_file = nxt_port_change_log_file_handler, .mmap = nxt_port_mmap_handler, .data = nxt_router_conf_data_handler, @@ -2944,6 +2949,7 @@ nxt_router_thread_start(void *data) nxt_int_t ret; nxt_port_t *port; nxt_task_t *task; + nxt_work_t *work; nxt_thread_t *thread; nxt_thread_link_t *link; nxt_event_engine_t *engine; @@ -2988,11 +2994,43 @@ nxt_router_thread_start(void *data) nxt_port_enable(task, port, &nxt_router_app_port_handlers); + work = nxt_zalloc(sizeof(nxt_work_t)); + if (nxt_slow_path(work == NULL)) { + return; + } + + work->handler = nxt_router_rt_add_port; + work->task = link->work.task; + work->obj = work; + work->data = port; + + nxt_event_engine_post(link->work.task->thread->engine, work); + nxt_event_engine_start(engine); } static void +nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data) +{ + nxt_int_t res; + nxt_port_t *port; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + port = data; + + nxt_free(obj); + + res = nxt_port_hash_add(&rt->ports, port); + + if (nxt_fast_path(res == NXT_OK)) { + nxt_port_use(task, port, 1); + } +} + + +static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) { nxt_joint_job_t *job; @@ -3281,7 +3319,6 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) } /* TODO remove engine->port */ - /* TODO excude from connected ports */ if (rtcf != NULL) { nxt_debug(task, "old router conf is destroyed"); @@ -4937,7 +4974,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, { nxt_buf_t *buf; nxt_int_t res; - nxt_port_t *port, *c_port, *reply_port; + nxt_port_t *port, *reply_port; nxt_apr_action_t apr_action; nxt_assert(req_app_link->app_port != NULL); @@ -4947,21 +4984,6 @@ nxt_router_app_prepare_request(nxt_task_t *task, apr_action = NXT_APR_REQUEST_FAILED; - c_port = nxt_process_connected_port_find(port->process, reply_port); - - if (nxt_slow_path(c_port != reply_port)) { - res = nxt_port_send_port(task, port, reply_port, 0); - - if (nxt_slow_path(res != NXT_OK)) { - nxt_request_app_link_error(task, port->app, req_app_link, - "Failed to send reply port to application"); - - goto release_port; - } - - nxt_process_connected_port_add(port->process, reply_port); - } - buf = nxt_router_prepare_msg(task, req_app_link->request, port, nxt_app_msg_prefix[port->app->type]); @@ -5531,3 +5553,47 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) -1, 0, 0, NULL); } } + + +static void +nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_port_t *port, *reply_port; + nxt_runtime_t *rt; + nxt_port_msg_get_port_t *get_port_msg; + + rt = task->thread->runtime; + + reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(reply_port == NULL)) { + nxt_alert(task, "get_port_handler: reply_port %PI:%d not found", + msg->port_msg.pid, msg->port_msg.reply_port); + + return; + } + + if (nxt_slow_path(nxt_buf_used_size(msg->buf) + < (int) sizeof(nxt_port_msg_get_port_t))) + { + nxt_alert(task, "get_port_handler: message buffer too small (%d)", + (int) nxt_buf_used_size(msg->buf)); + + return; + } + + get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos; + + port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id); + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "get_port_handler: port %PI:%d not found", + get_port_msg->pid, get_port_msg->id); + + return; + } + + nxt_debug(task, "get port %PI:%d found", get_port_msg->pid, + get_port_msg->id); + + (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream); +} diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 694ce74d..c25b93cc 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -1389,8 +1389,6 @@ nxt_runtime_process_new(nxt_runtime_t *rt) void nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) { - nxt_port_t *port; - if (process->registered == 1) { nxt_runtime_process_remove(rt, process); } @@ -1401,11 +1399,6 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process) nxt_port_mmaps_destroy(&process->incoming, 1); nxt_port_mmaps_destroy(&process->outgoing, 1); - do { - port = nxt_port_hash_retrieve(&process->connected_ports); - - } while (port != NULL); - nxt_thread_mutex_destroy(&process->incoming.mutex); nxt_thread_mutex_destroy(&process->outgoing.mutex); nxt_thread_mutex_destroy(&process->cp_mutex); diff --git a/src/nxt_unit.c b/src/nxt_unit.c index ddfd9c80..c1ef977f 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -55,6 +55,8 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); +static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, + nxt_unit_port_id_t *port_id); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); @@ -119,6 +121,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); @@ -138,6 +141,7 @@ static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); static void nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process); static void nxt_unit_quit(nxt_unit_ctx_t *ctx); +static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size); @@ -215,7 +219,10 @@ struct nxt_unit_request_info_impl_s { nxt_unit_req_state_t state; uint8_t websocket; + /* for nxt_unit_ctx_impl_t.free_req or active_req */ nxt_queue_link_t link; + /* for nxt_unit_port_impl_t.awaiting_req */ + nxt_queue_link_t port_wait_link; char extra_data[]; }; @@ -244,6 +251,7 @@ struct nxt_unit_ctx_impl_s { nxt_unit_ctx_t ctx; nxt_atomic_t use_count; + nxt_atomic_t wait_items; pthread_mutex_t mutex; @@ -265,6 +273,9 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_request_info_impl_t */ nxt_lvlhsh_t requests; + /* of nxt_unit_request_info_impl_t */ + nxt_queue_t ready_req; + nxt_unit_read_buf_t *pending_read_head; nxt_unit_read_buf_t **pending_read_tail; nxt_unit_read_buf_t *free_read_buf; @@ -309,6 +320,11 @@ struct nxt_unit_port_impl_s { nxt_queue_link_t link; nxt_unit_process_t *process; + + /* of nxt_unit_request_info_impl_t */ + nxt_queue_t awaiting_req; + + int ready; }; @@ -515,10 +531,12 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); ctx_impl->use_count = 1; + ctx_impl->wait_items = 0; nxt_queue_init(&ctx_impl->free_req); nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); + nxt_queue_init(&ctx_impl->ready_req); ctx_impl->free_buf = NULL; nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); @@ -973,8 +991,8 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { + int res; nxt_unit_impl_t *lib; - nxt_unit_port_t *port; nxt_unit_port_id_t port_id; nxt_unit_request_t *r; nxt_unit_mmap_buf_t *b; @@ -1004,28 +1022,8 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); - - lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); - - pthread_mutex_lock(&lib->mutex); - - port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0); - - pthread_mutex_unlock(&lib->mutex); - - if (nxt_slow_path(port == NULL)) { - nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found", - recv_msg->stream, - (int) recv_msg->pid, (int) recv_msg->reply_port); - - return NXT_UNIT_ERROR; - } - req = &req_impl->req; - req->response_port = port; - req->request = recv_msg->start; b = recv_msg->incoming_buf; @@ -1076,13 +1074,130 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) (char *) nxt_unit_sptr_get(&r->target), (int) r->content_length); - lib->callbacks.request_handler(req); + nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); + + res = nxt_unit_request_check_response_port(req, &port_id); + + if (nxt_fast_path(res == NXT_UNIT_OK)) { + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + lib->callbacks.request_handler(req); + } return NXT_UNIT_OK; } static int +nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, + nxt_unit_port_id_t *port_id) +{ + int res; + nxt_unit_ctx_t *ctx; + nxt_unit_impl_t *lib; + nxt_unit_port_t *port; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *port_impl; + nxt_unit_request_info_impl_t *req_impl; + + ctx = req->ctx; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + pthread_mutex_lock(&lib->mutex); + + port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); + port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); + + if (nxt_fast_path(port != NULL)) { + req->response_port = port; + + if (nxt_fast_path(port_impl->ready)) { + pthread_mutex_unlock(&lib->mutex); + + nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}", + (int) port->id.pid, (int) port->id.id); + + return NXT_UNIT_OK; + } + + nxt_unit_debug(ctx, "check_response_port: " + "port{%d,%d} already requested", + (int) port->id.pid, (int) port->id.id); + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + nxt_queue_insert_tail(&port_impl->awaiting_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&lib->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + return NXT_UNIT_AGAIN; + } + + port_impl = malloc(sizeof(nxt_unit_port_impl_t)); + if (nxt_slow_path(port_impl == NULL)) { + nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed", + (int) sizeof(nxt_unit_port_impl_t)); + + pthread_mutex_unlock(&lib->mutex); + + return NXT_UNIT_ERROR; + } + + port = &port_impl->port; + + port->id = *port_id; + port->in_fd = -1; + port->out_fd = -1; + port->data = NULL; + + res = nxt_unit_port_hash_add(&lib->ports, port); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed", + port->id.pid, port->id.id); + + pthread_mutex_unlock(&lib->mutex); + + free(port); + + return NXT_UNIT_ERROR; + } + + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + nxt_queue_insert_tail(&req_impl->process->ports, &port_impl->link); + + port_impl->process = req_impl->process; + + + nxt_queue_init(&port_impl->awaiting_req); + + nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link); + + port_impl->use_count = 2; + port_impl->ready = 0; + + req->response_port = port; + + pthread_mutex_unlock(&lib->mutex); + + nxt_unit_process_use(port_impl->process); + + res = nxt_unit_get_port(ctx, port_id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + return NXT_UNIT_AGAIN; +} + + +static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { size_t hsize; @@ -4041,6 +4156,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) nxt_unit_read_buf_release(ctx, rbuf); + nxt_unit_process_ready_req(ctx_impl); + nxt_unit_ctx_release(ctx_impl); return rc; @@ -4062,6 +4179,39 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) } +static void +nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl) +{ + nxt_queue_t ready_req; + nxt_unit_impl_t *lib; + nxt_unit_request_info_impl_t *req_impl; + + nxt_queue_init(&ready_req); + + pthread_mutex_lock(&ctx_impl->mutex); + + if (nxt_queue_is_empty(&ctx_impl->ready_req)) { + pthread_mutex_unlock(&ctx_impl->mutex); + + return; + } + + nxt_queue_add(&ready_req, &ctx_impl->ready_req); + nxt_queue_init(&ctx_impl->ready_req); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_queue_each(req_impl, &ready_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); + + lib->callbacks.request_handler(&req_impl->req); + + } nxt_queue_loop; +} + + void nxt_unit_done(nxt_unit_ctx_t *ctx) { @@ -4371,11 +4521,14 @@ nxt_unit_port_process(nxt_unit_port_t *port) static nxt_unit_port_t * nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) { - int rc; - nxt_unit_impl_t *lib; - nxt_unit_port_t *old_port; - nxt_unit_process_t *process; - nxt_unit_port_impl_t *new_port; + int rc; + nxt_queue_t awaiting_req; + nxt_unit_impl_t *lib; + nxt_unit_port_t *old_port; + nxt_unit_process_t *process; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_port_impl_t *new_port, *old_port_impl; + nxt_unit_request_info_impl_t *req_impl; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -4415,6 +4568,17 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) *port = *old_port; + nxt_queue_init(&awaiting_req); + + old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port); + + if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) { + nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req); + nxt_queue_init(&old_port_impl->awaiting_req); + } + + old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1); + pthread_mutex_unlock(&lib->mutex); if (lib->callbacks.add_port != NULL @@ -4423,6 +4587,25 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) lib->callbacks.add_port(ctx, old_port); } + nxt_queue_each(req_impl, &awaiting_req, + nxt_unit_request_info_impl_t, port_wait_link) + { + nxt_queue_remove(&req_impl->port_wait_link); + + ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, + ctx); + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_tail(&ctx_impl->ready_req, + &req_impl->port_wait_link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + } nxt_queue_loop; + return old_port; } @@ -4464,6 +4647,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) new_port->use_count = 2; new_port->process = process; + new_port->ready = (port->in_fd != -1 || port->out_fd != -1); + + nxt_queue_init(&new_port->awaiting_req); process = NULL; @@ -4608,6 +4794,42 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx) } +static int +nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) +{ + ssize_t res; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + struct { + nxt_port_msg_t msg; + nxt_port_msg_get_port_t get_port; + } m; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.reply_port = ctx_impl->read_port->id.id; + m.msg.type = _NXT_PORT_MSG_GET_PORT; + + m.get_port.id = port_id->id; + m.get_port.pid = port_id->pid; + + nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid, + (int) port_id->id); + + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, const void *buf, size_t buf_size, const void *oob, size_t oob_size) diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 6723026f..8fa64f4e 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -19,6 +19,7 @@ enum { NXT_UNIT_OK = 0, NXT_UNIT_ERROR = 1, + NXT_UNIT_AGAIN = 2, }; enum { |