diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:20:13 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:20:13 +0300 |
commit | 6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7 (patch) | |
tree | 2ef1d993674825791636486a0929ead424c8dd7a | |
parent | 3cbc22a6dc45abdeade4deb364601230ddca02c1 (diff) | |
download | unit-6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7.tar.gz unit-6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7.tar.bz2 |
Changing router to application shared memory exchange protocol.
The application process needs to request the shared memory segment from the
router instead of the latter pushing the segment before sending a request to
the application. This is required to simplify the communication between the
router and the application and to prepare the router for using the application
shared port and then the queue.
-rw-r--r-- | src/nxt_http_websocket.c | 3 | ||||
-rw-r--r-- | src/nxt_port.h | 7 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 73 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 12 | ||||
-rw-r--r-- | src/nxt_port_memory_int.h | 1 | ||||
-rw-r--r-- | src/nxt_router.c | 69 | ||||
-rw-r--r-- | src/nxt_unit.c | 377 | ||||
-rw-r--r-- | src/nxt_unit.h | 11 |
8 files changed, 374 insertions, 179 deletions
diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c index fb888f5d..4d31b320 100644 --- a/src/nxt_http_websocket.c +++ b/src/nxt_http_websocket.c @@ -69,7 +69,8 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) if (buf == NULL || buf_free_size == 0) { buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE); - buf = nxt_port_mmap_get_buf(task, req_app_link->app_port, + buf = nxt_port_mmap_get_buf(task, + &req_app_link->app_port->process->outgoing, buf_free_size); *out_tail = buf; diff --git a/src/nxt_port.h b/src/nxt_port.h index 838a7ffe..3a8a200a 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -27,6 +27,7 @@ struct nxt_port_handlers_s { nxt_port_handler_t new_port; nxt_port_handler_t get_port; nxt_port_handler_t mmap; + nxt_port_handler_t get_mmap; /* New process */ nxt_port_handler_t process_created; @@ -80,6 +81,7 @@ typedef enum { _NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port), _NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port), _NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap), + _NXT_PORT_MSG_GET_MMAP = nxt_port_handler_idx(get_mmap), _NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created), _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready), @@ -247,6 +249,11 @@ typedef struct { } nxt_port_msg_get_port_t; +typedef struct { + uint32_t id; +} nxt_port_msg_get_mmap_t; + + /* * nxt_port_data_t size is allocation size * which enables effective reuse of memory pool cache. diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index fd472cc6..1e01629e 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -282,8 +282,8 @@ fail: static nxt_port_mmap_handler_t * -nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, - nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n) +nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps, + nxt_bool_t tracking, nxt_int_t n) { void *mem; nxt_fd_t fd; @@ -295,15 +295,14 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); if (nxt_slow_path(mmap_handler == NULL)) { - nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); + nxt_alert(task, "failed to allocate mmap_handler"); return NULL; } - port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); + port_mmap = nxt_port_mmap_at(mmaps, mmaps->size); if (nxt_slow_path(port_mmap == NULL)) { - nxt_log(task, NXT_LOG_WARN, - "failed to add port mmap to outgoing array"); + nxt_alert(task, "failed to add port mmap to mmaps array"); nxt_free(mmap_handler); return NULL; @@ -322,6 +321,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, } mmap_handler->hdr = mem; + mmap_handler->fd = fd; port_mmap->mmap_handler = mmap_handler; nxt_port_mmap_handler_use(mmap_handler, 1); @@ -331,10 +331,9 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); - hdr->id = process->outgoing.size - 1; + hdr->id = mmaps->size - 1; hdr->src_pid = nxt_pid; - hdr->dst_pid = process->pid; - hdr->sent_over = port->id; + hdr->sent_over = 0xFFFFu; /* Mark first chunk as busy */ free_map = tracking ? hdr->free_tracking_map : hdr->free_map; @@ -347,13 +346,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); - nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); - - /* TODO handle error */ - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); - - nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", - hdr->id, nxt_pid, process->pid); + nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...", + hdr->id, nxt_pid); return mmap_handler; @@ -361,7 +355,7 @@ remove_fail: nxt_free(mmap_handler); - process->outgoing.size--; + mmaps->size--; return NULL; } @@ -445,34 +439,28 @@ nxt_shm_open(nxt_task_t *task, size_t size) static nxt_port_mmap_handler_t * -nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, +nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c, nxt_int_t n, nxt_bool_t tracking) { nxt_int_t i, res, nchunks; - nxt_process_t *process; nxt_free_map_t *free_map; nxt_port_mmap_t *port_mmap; nxt_port_mmap_t *end_port_mmap; nxt_port_mmap_header_t *hdr; nxt_port_mmap_handler_t *mmap_handler; - process = port->process; - if (nxt_slow_path(process == NULL)) { - return NULL; - } - - nxt_thread_mutex_lock(&process->outgoing.mutex); + nxt_thread_mutex_lock(&mmaps->mutex); - end_port_mmap = process->outgoing.elts + process->outgoing.size; + end_port_mmap = mmaps->elts + mmaps->size; - for (port_mmap = process->outgoing.elts; + for (port_mmap = mmaps->elts; port_mmap < end_port_mmap; port_mmap++) { mmap_handler = port_mmap->mmap_handler; hdr = mmap_handler->hdr; - if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) { + if (hdr->sent_over != 0xFFFFu) { continue; } @@ -510,11 +498,11 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, /* TODO introduce port_mmap limit and release wait. */ *c = 0; - mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n); + mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n); unlock_return: - nxt_thread_mutex_unlock(&process->outgoing.mutex); + nxt_thread_mutex_unlock(&mmaps->mutex); return mmap_handler; } @@ -549,7 +537,7 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) nxt_int_t -nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, +nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_port_mmap_tracking_t *tracking, uint32_t stream) { nxt_chunk_id_t c; @@ -558,7 +546,7 @@ nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, nxt_debug(task, "request tracking for stream #%uD", stream); - mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1); + mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1); if (nxt_slow_path(mmap_handler == NULL)) { return NXT_ERROR; } @@ -680,7 +668,7 @@ nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_buf_t * -nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) +nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size) { nxt_mp_t *mp; nxt_buf_t *b; @@ -707,7 +695,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) b->completion_handler = nxt_port_mmap_buf_completion; nxt_buf_set_port_mmap(b); - mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0); + mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0); if (nxt_slow_path(mmap_handler == NULL)) { mp = task->thread->engine->mem_pool; nxt_mp_free(mp, b); @@ -943,9 +931,7 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_port_method_t nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) { - nxt_port_method_t m; - nxt_port_mmap_header_t *hdr; - nxt_port_mmap_handler_t *mmap_handler; + nxt_port_method_t m; m = NXT_PORT_METHOD_ANY; @@ -956,9 +942,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) } if (nxt_buf_is_port_mmap(b)) { - mmap_handler = b->parent; - hdr = mmap_handler->hdr; - if (m == NXT_PORT_METHOD_PLAIN) { nxt_log_error(NXT_LOG_ERR, task->log, "mixing plain and mmap buffers, " @@ -967,16 +950,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) break; } - if (port->pid != hdr->dst_pid) { - nxt_log_error(NXT_LOG_ERR, task->log, - "send mmap buffer for %PI to %PI, " - "using plain mode", hdr->dst_pid, port->pid); - - m = NXT_PORT_METHOD_PLAIN; - - break; - } - if (m == NXT_PORT_METHOD_ANY) { nxt_debug(task, "using mmap mode"); diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index 2cd4bd76..8e71af3d 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -23,7 +23,7 @@ struct nxt_port_mmap_tracking_s { }; nxt_int_t -nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, +nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_port_mmap_tracking_t *tracking, uint32_t stream); nxt_bool_t @@ -37,14 +37,12 @@ nxt_bool_t nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg); /* - * Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem' - * pointers to first available shared mem bucket(s). 'size' used as a hint to - * acquire several successive buckets if possible. - * - * This function assumes that current thread operates the 'port' exclusively. + * Allocates nxt_but_t structure from task's thread engine mem_pool, assigns + * this buf 'mem' pointers to first available shared mem bucket(s). 'size' + * used as a hint to acquire several successive buckets if possible. */ nxt_buf_t * -nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size); +nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size); nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, size_t min_size); diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h index 87c3d833..d2524ee4 100644 --- a/src/nxt_port_memory_int.h +++ b/src/nxt_port_memory_int.h @@ -63,6 +63,7 @@ struct nxt_port_mmap_header_s { struct nxt_port_mmap_handler_s { nxt_port_mmap_header_t *hdr; nxt_atomic_t use_count; + nxt_fd_t fd; }; /* diff --git a/src/nxt_router.c b/src/nxt_router.c index 3380e133..4df1489d 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -257,6 +257,8 @@ static void nxt_router_http_request_release(nxt_task_t *task, void *obj, static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_router_get_mmap_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); extern const nxt_http_request_state_t nxt_http_websocket; @@ -281,6 +283,7 @@ static const nxt_port_handlers_t nxt_router_process_port_handlers = { .get_port = nxt_router_get_port_handler, .change_file = nxt_port_change_log_file_handler, .mmap = nxt_port_mmap_handler, + .get_mmap = nxt_router_get_mmap_handler, .data = nxt_router_conf_data_handler, .remove_pid = nxt_router_remove_pid_handler, .access_log = nxt_router_access_log_reopen_handler, @@ -5008,7 +5011,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, buf = req_app_link->msg_info.buf; - res = nxt_port_mmap_get_tracking(task, port, + res = nxt_port_mmap_get_tracking(task, &port->process->outgoing, &req_app_link->msg_info.tracking, req_app_link->stream); if (nxt_slow_path(res != NXT_OK)) { @@ -5138,7 +5141,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, return NULL; } - out = nxt_port_mmap_get_buf(task, port, + out = nxt_port_mmap_get_buf(task, &port->process->outgoing, nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); if (nxt_slow_path(out == NULL)) { return NULL; @@ -5320,7 +5323,8 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, if (buf == NULL) { free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); - buf = nxt_port_mmap_get_buf(task, port, free_size); + buf = nxt_port_mmap_get_buf(task, &port->process->outgoing, + free_size); if (nxt_slow_path(buf == NULL)) { while (out != NULL) { buf = out->next; @@ -5556,6 +5560,65 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void +nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_fd_t fd; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_port_mmaps_t *mmaps; + nxt_port_msg_get_mmap_t *get_mmap_msg; + nxt_port_mmap_handler_t *mmap_handler; + + rt = task->thread->runtime; + + port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", + msg->port_msg.pid, msg->port_msg.reply_port); + + return; + } + + if (nxt_slow_path(nxt_buf_used_size(msg->buf) + < (int) sizeof(nxt_port_msg_get_mmap_t))) + { + nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", + (int) nxt_buf_used_size(msg->buf)); + + return; + } + + get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; + + nxt_assert(port->type == NXT_PROCESS_APP); + + mmaps = &port->process->outgoing; + nxt_thread_mutex_lock(&mmaps->mutex); + + if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { + nxt_thread_mutex_unlock(&mmaps->mutex); + + nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", + (int) get_mmap_msg->id); + + return; + } + + mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; + + fd = mmap_handler->fd; + + nxt_thread_mutex_unlock(&mmaps->mutex); + + nxt_debug(task, "get mmap %PI:%d found", + msg->port_msg.pid, (int) get_mmap_msg->id); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); +} + + +static void nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_t *port, *reply_port; diff --git a/src/nxt_unit.c b/src/nxt_unit.c index c1ef977f..b321a0d4 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -51,6 +51,7 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, uint32_t *shm_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); +static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, @@ -103,12 +104,14 @@ static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); -static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, uint32_t id); static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); + nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, + nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, + nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); + nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, void *start, uint32_t size); @@ -240,7 +243,8 @@ struct nxt_unit_websocket_frame_impl_s { struct nxt_unit_read_buf_s { - nxt_unit_read_buf_t *next; + nxt_queue_link_t link; + nxt_unit_ctx_impl_t *ctx_impl; ssize_t size; char buf[16384]; char oob[256]; @@ -276,9 +280,11 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_request_info_impl_t */ nxt_queue_t ready_req; - nxt_unit_read_buf_t *pending_read_head; - nxt_unit_read_buf_t **pending_read_tail; - nxt_unit_read_buf_t *free_read_buf; + /* of nxt_unit_read_buf_t */ + nxt_queue_t pending_rbuf; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t free_rbuf; nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_read_buf_t ctx_read_buf; @@ -318,6 +324,7 @@ struct nxt_unit_port_impl_s { nxt_atomic_t use_count; + /* for nxt_unit_process_t.ports */ nxt_queue_link_t link; nxt_unit_process_t *process; @@ -330,6 +337,9 @@ struct nxt_unit_port_impl_s { struct nxt_unit_mmap_s { nxt_port_mmap_header_t *hdr; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t awaiting_rbuf; }; @@ -345,7 +355,7 @@ struct nxt_unit_mmaps_s { struct nxt_unit_process_s { pid_t pid; - nxt_queue_t ports; + nxt_queue_t ports; /* of nxt_unit_port_impl_t */ nxt_unit_mmaps_t incoming; nxt_unit_mmaps_t outgoing; @@ -537,17 +547,17 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); nxt_queue_init(&ctx_impl->ready_req); + nxt_queue_init(&ctx_impl->pending_rbuf); + nxt_queue_init(&ctx_impl->free_rbuf); ctx_impl->free_buf = NULL; nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); + nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link); - ctx_impl->pending_read_head = NULL; - ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; - ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf; - ctx_impl->ctx_read_buf.next = NULL; + ctx_impl->ctx_read_buf.ctx_impl = ctx_impl; ctx_impl->req.req.ctx = &ctx_impl->ctx; ctx_impl->req.req.unit = &lib->unit; @@ -767,9 +777,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) } -int -nxt_unit_process_msg(nxt_unit_ctx_t *ctx, - void *buf, size_t buf_size, void *oob, size_t oob_size) +static int +nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { int rc; pid_t pid; @@ -783,11 +792,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, rc = NXT_UNIT_ERROR; recv_msg.fd = -1; recv_msg.process = NULL; - port_msg = buf; - cm = oob; + port_msg = (nxt_port_msg_t *) rbuf->buf; + cm = (struct cmsghdr *) rbuf->oob; - if (oob_size >= CMSG_SPACE(sizeof(int)) - && cm->cmsg_len == CMSG_LEN(sizeof(int)) + if (cm->cmsg_len == CMSG_LEN(sizeof(int)) && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_RIGHTS) { @@ -796,8 +804,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, recv_msg.incoming_buf = NULL; - if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { - nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); + if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); goto fail; } @@ -808,7 +816,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, recv_msg.mmap = port_msg->mmap; recv_msg.start = port_msg + 1; - recv_msg.size = buf_size - sizeof(nxt_port_msg_t); + recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t); if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", @@ -816,10 +824,16 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, goto fail; } - if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) { - rc = NXT_UNIT_OK; + if (port_msg->tracking) { + rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf); - goto fail; + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (rc == NXT_UNIT_AGAIN) { + recv_msg.fd = -1; + } + + goto fail; + } } /* Fragmentation is unsupported. */ @@ -830,7 +844,13 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, } if (port_msg->mmap) { - if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { + rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf); + + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (rc == NXT_UNIT_AGAIN) { + recv_msg.fd = -1; + } + goto fail; } } @@ -1077,6 +1097,9 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); res = nxt_unit_request_check_response_port(req, &port_id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } if (nxt_fast_path(res == NXT_UNIT_OK)) { lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -2376,33 +2399,41 @@ static nxt_unit_read_buf_t * nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) { nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); pthread_mutex_lock(&ctx_impl->mutex); - return nxt_unit_read_buf_get_impl(ctx_impl); + rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + + pthread_mutex_unlock(&ctx_impl->mutex); + + return rbuf; } static nxt_unit_read_buf_t * nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) { + nxt_queue_link_t *link; nxt_unit_read_buf_t *rbuf; - if (ctx_impl->free_read_buf != NULL) { - rbuf = ctx_impl->free_read_buf; - ctx_impl->free_read_buf = rbuf->next; + if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) { + link = nxt_queue_first(&ctx_impl->free_rbuf); + nxt_queue_remove(link); - pthread_mutex_unlock(&ctx_impl->mutex); + rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); return rbuf; } - pthread_mutex_unlock(&ctx_impl->mutex); - rbuf = malloc(sizeof(nxt_unit_read_buf_t)); + if (nxt_fast_path(rbuf != NULL)) { + rbuf->ctx_impl = ctx_impl; + } + return rbuf; } @@ -2417,8 +2448,7 @@ nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, pthread_mutex_lock(&ctx_impl->mutex); - rbuf->next = ctx_impl->free_read_buf; - ctx_impl->free_read_buf = rbuf; + nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link); pthread_mutex_unlock(&ctx_impl->mutex); } @@ -3255,9 +3285,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) pthread_mutex_lock(&ctx_impl->mutex); - *ctx_impl->pending_read_tail = rbuf; - ctx_impl->pending_read_tail = &rbuf->next; - rbuf->next = NULL; + nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link); pthread_mutex_unlock(&ctx_impl->mutex); @@ -3275,7 +3303,12 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) static nxt_unit_mmap_t * nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) { - uint32_t cap; + uint32_t cap, n; + nxt_unit_mmap_t *e; + + if (nxt_fast_path(mmaps->size > i)) { + return mmaps->elts + i; + } cap = mmaps->cap; @@ -3295,13 +3328,19 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) if (cap != mmaps->cap) { - mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts)); - if (nxt_slow_path(mmaps->elts == NULL)) { + e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t)); + if (nxt_slow_path(e == NULL)) { return NULL; } - memset(mmaps->elts + mmaps->cap, 0, - sizeof(*mmaps->elts) * (cap - mmaps->cap)); + mmaps->elts = e; + + for (n = mmaps->cap; n < cap; n++) { + e = mmaps->elts + n; + + e->hdr = NULL; + nxt_queue_init(&e->awaiting_rbuf); + } mmaps->cap = cap; } @@ -3581,13 +3620,16 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) { - int rc; - void *mem; - struct stat mmap_stat; - nxt_unit_mmap_t *mm; - nxt_unit_impl_t *lib; - nxt_unit_process_t *process; - nxt_port_mmap_header_t *hdr; + int rc; + void *mem; + nxt_queue_t awaiting_rbuf; + struct stat mmap_stat; + nxt_unit_mmap_t *mm; + nxt_unit_impl_t *lib; + nxt_unit_process_t *process; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + nxt_port_mmap_header_t *hdr; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -3626,7 +3668,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) hdr = mem; - if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) { + if (nxt_slow_path(hdr->src_pid != pid)) { nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " "detected: %d != %d or %d != %d", (int) hdr->src_pid, @@ -3637,6 +3679,8 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) goto fail; } + nxt_queue_init(&awaiting_rbuf); + pthread_mutex_lock(&process->incoming.mutex); mm = nxt_unit_mmap_at(&process->incoming, hdr->id); @@ -3650,11 +3694,28 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) hdr->sent_over = 0xFFFFu; + nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf); + nxt_queue_init(&mm->awaiting_rbuf); + rc = NXT_UNIT_OK; } pthread_mutex_unlock(&process->incoming.mutex); + nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { + + ctx_impl = rbuf->ctx_impl; + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + } nxt_queue_loop; + fail: nxt_unit_process_release(process); @@ -3719,27 +3780,11 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) } -static nxt_port_mmap_header_t * -nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - uint32_t id) -{ - nxt_port_mmap_header_t *hdr; - - if (nxt_fast_path(process->incoming.size > id)) { - hdr = process->incoming.elts[id].hdr; - - } else { - hdr = NULL; - } - - return hdr; -} - - static int -nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, + nxt_unit_read_buf_t *rbuf) { - int rc; + int res; nxt_chunk_id_t c; nxt_unit_process_t *process; nxt_port_mmap_header_t *hdr; @@ -3749,7 +3794,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", recv_msg->stream, (int) recv_msg->size); - return 0; + return NXT_UNIT_ERROR; } tracking_msg = recv_msg->start; @@ -3759,44 +3804,95 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) process = nxt_unit_msg_get_process(ctx, recv_msg); if (nxt_slow_path(process == NULL)) { - return 0; + return NXT_UNIT_ERROR; } pthread_mutex_lock(&process->incoming.mutex); - hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id); - if (nxt_slow_path(hdr == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); - - nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " - "invalid mmap id %d,%"PRIu32, - recv_msg->stream, (int) process->pid, - tracking_msg->mmap_id); + res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming, + recv_msg->pid, tracking_msg->mmap_id, + &hdr, rbuf); - return 0; + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return res; } c = tracking_msg->tracking_id; - rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); + res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); - if (rc == 0) { + if (res == 0) { nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", recv_msg->stream); nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); + + res = NXT_UNIT_CANCELLED; + + } else { + res = NXT_UNIT_OK; } pthread_mutex_unlock(&process->incoming.mutex); - return rc; + return res; } static int -nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, + pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, + nxt_unit_read_buf_t *rbuf) { + int res, need_rbuf; + nxt_unit_mmap_t *mm; + nxt_unit_ctx_impl_t *ctx_impl; + + mm = nxt_unit_mmap_at(mmaps, id); + if (nxt_slow_path(mm == NULL)) { + nxt_unit_alert(ctx, "failed to allocate mmap"); + + pthread_mutex_unlock(&mmaps->mutex); + + *hdr = NULL; + + return NXT_UNIT_ERROR; + } + + *hdr = mm->hdr; + + if (nxt_fast_path(*hdr != NULL)) { + return NXT_UNIT_OK; + } + + need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf); + + nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link); + + pthread_mutex_unlock(&mmaps->mutex); + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + if (need_rbuf) { + res = nxt_unit_get_mmap(ctx, pid, id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_AGAIN; +} + + +static int +nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, + nxt_unit_read_buf_t *rbuf) +{ + int res; void *start; uint32_t size; + nxt_unit_mmaps_t *mmaps; nxt_unit_process_t *process; nxt_unit_mmap_buf_t *b, **incoming_tail; nxt_port_mmap_msg_t *mmap_msg, *end; @@ -3819,12 +3915,17 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) incoming_tail = &recv_msg->incoming_buf; + /* Allocating buffer structures. */ for (; mmap_msg < end; mmap_msg++) { b = nxt_unit_mmap_buf_get(ctx); if (nxt_slow_path(b == NULL)) { nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", recv_msg->stream); + while (recv_msg->incoming_buf != NULL) { + nxt_unit_mmap_buf_release(recv_msg->incoming_buf); + } + return NXT_UNIT_ERROR; } @@ -3835,19 +3936,21 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) b = recv_msg->incoming_buf; mmap_msg = recv_msg->start; - pthread_mutex_lock(&process->incoming.mutex); + mmaps = &process->incoming; + + pthread_mutex_lock(&mmaps->mutex); for (; mmap_msg < end; mmap_msg++) { - hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id); - if (nxt_slow_path(hdr == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); + res = nxt_unit_check_rbuf_mmap(ctx, mmaps, + recv_msg->pid, mmap_msg->mmap_id, + &hdr, rbuf); - nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " - "invalid mmap id %d,%"PRIu32, - recv_msg->stream, (int) process->pid, - mmap_msg->mmap_id); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + while (recv_msg->incoming_buf != NULL) { + nxt_unit_mmap_buf_release(recv_msg->incoming_buf); + } - return NXT_UNIT_ERROR; + return res; } start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); @@ -3874,7 +3977,41 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) (int) mmap_msg->size); } - pthread_mutex_unlock(&process->incoming.mutex); + pthread_mutex_unlock(&mmaps->mutex); + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id) +{ + ssize_t res; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + struct { + nxt_port_msg_t msg; + nxt_port_msg_get_mmap_t get_mmap; + } m; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.reply_port = ctx_impl->read_port->id.id; + m.msg.type = _NXT_PORT_MSG_GET_MMAP; + + m.get_mmap.id = id; + + nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id); + + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + return NXT_UNIT_ERROR; + } return NXT_UNIT_OK; } @@ -4110,6 +4247,7 @@ int nxt_unit_run_once(nxt_unit_ctx_t *ctx) { int rc; + nxt_queue_link_t *link; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; @@ -4119,18 +4257,22 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) pthread_mutex_lock(&ctx_impl->mutex); - if (ctx_impl->pending_read_head != NULL) { - rbuf = ctx_impl->pending_read_head; - ctx_impl->pending_read_head = rbuf->next; + if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { - if (ctx_impl->pending_read_tail == &rbuf->next) { - ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; - } +next_pending: + + link = nxt_queue_first(&ctx_impl->pending_rbuf); + nxt_queue_remove(link); + + rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); pthread_mutex_unlock(&ctx_impl->mutex); } else { rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + + pthread_mutex_unlock(&ctx_impl->mutex); + if (nxt_slow_path(rbuf == NULL)) { nxt_unit_ctx_release(ctx_impl); @@ -4142,21 +4284,40 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) } if (nxt_fast_path(rbuf->size > 0)) { - rc = nxt_unit_process_msg(ctx, - rbuf->buf, rbuf->size, - rbuf->oob, sizeof(rbuf->oob)); + rc = nxt_unit_process_msg(ctx, rbuf); #if (NXT_DEBUG) - memset(rbuf->buf, 0xAC, rbuf->size); + if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { + memset(rbuf->buf, 0xAC, rbuf->size); + } #endif } else { rc = NXT_UNIT_ERROR; } - nxt_unit_read_buf_release(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) { + rc = NXT_UNIT_OK; + + } else { + nxt_unit_read_buf_release(ctx, rbuf); + } + + if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) { + rc = NXT_UNIT_OK; + } + + if (nxt_fast_path(rc == NXT_UNIT_OK)) { + pthread_mutex_lock(&ctx_impl->mutex); - nxt_unit_process_ready_req(ctx_impl); + if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { + goto next_pending; + } + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_unit_process_ready_req(ctx_impl); + } nxt_unit_ctx_release(ctx_impl); diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 8fa64f4e..79157f5f 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -20,6 +20,7 @@ enum { NXT_UNIT_OK = 0, NXT_UNIT_ERROR = 1, NXT_UNIT_AGAIN = 2, + NXT_UNIT_CANCELLED = 3, }; enum { @@ -189,16 +190,6 @@ struct nxt_unit_read_info_s { nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *); /* - * Process received message, invoke configured callbacks. - * - * If application implements it's own event loop, each datagram received - * from port socket should be initially processed by unit. This function - * may invoke other application-defined callback for message processing. - */ -int nxt_unit_process_msg(nxt_unit_ctx_t *, - void *buf, size_t buf_size, void *oob, size_t oob_size); - -/* * Main function useful in case when application does not have it's own * event loop. nxt_unit_run() starts infinite message wait and process loop. * |