diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_http_websocket.c | 3 | ||||
-rw-r--r-- | src/nxt_port.h | 7 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 73 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 12 | ||||
-rw-r--r-- | src/nxt_port_memory_int.h | 1 | ||||
-rw-r--r-- | src/nxt_router.c | 69 | ||||
-rw-r--r-- | src/nxt_unit.c | 377 | ||||
-rw-r--r-- | src/nxt_unit.h | 11 |
8 files changed, 374 insertions, 179 deletions
diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c index fb888f5d..4d31b320 100644 --- a/src/nxt_http_websocket.c +++ b/src/nxt_http_websocket.c @@ -69,7 +69,8 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) if (buf == NULL || buf_free_size == 0) { buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE); - buf = nxt_port_mmap_get_buf(task, req_app_link->app_port, + buf = nxt_port_mmap_get_buf(task, + &req_app_link->app_port->process->outgoing, buf_free_size); *out_tail = buf; diff --git a/src/nxt_port.h b/src/nxt_port.h index 838a7ffe..3a8a200a 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -27,6 +27,7 @@ struct nxt_port_handlers_s { nxt_port_handler_t new_port; nxt_port_handler_t get_port; nxt_port_handler_t mmap; + nxt_port_handler_t get_mmap; /* New process */ nxt_port_handler_t process_created; @@ -80,6 +81,7 @@ typedef enum { _NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port), _NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port), _NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap), + _NXT_PORT_MSG_GET_MMAP = nxt_port_handler_idx(get_mmap), _NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created), _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready), @@ -247,6 +249,11 @@ typedef struct { } nxt_port_msg_get_port_t; +typedef struct { + uint32_t id; +} nxt_port_msg_get_mmap_t; + + /* * nxt_port_data_t size is allocation size * which enables effective reuse of memory pool cache. diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index fd472cc6..1e01629e 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -282,8 +282,8 @@ fail: static nxt_port_mmap_handler_t * -nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, - nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n) +nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps, + nxt_bool_t tracking, nxt_int_t n) { void *mem; nxt_fd_t fd; @@ -295,15 +295,14 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); if (nxt_slow_path(mmap_handler == NULL)) { - nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); + nxt_alert(task, "failed to allocate mmap_handler"); return NULL; } - port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); + port_mmap = nxt_port_mmap_at(mmaps, mmaps->size); if (nxt_slow_path(port_mmap == NULL)) { - nxt_log(task, NXT_LOG_WARN, - "failed to add port mmap to outgoing array"); + nxt_alert(task, "failed to add port mmap to mmaps array"); nxt_free(mmap_handler); return NULL; @@ -322,6 +321,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, } mmap_handler->hdr = mem; + mmap_handler->fd = fd; port_mmap->mmap_handler = mmap_handler; nxt_port_mmap_handler_use(mmap_handler, 1); @@ -331,10 +331,9 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); - hdr->id = process->outgoing.size - 1; + hdr->id = mmaps->size - 1; hdr->src_pid = nxt_pid; - hdr->dst_pid = process->pid; - hdr->sent_over = port->id; + hdr->sent_over = 0xFFFFu; /* Mark first chunk as busy */ free_map = tracking ? hdr->free_tracking_map : hdr->free_map; @@ -347,13 +346,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); - nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); - - /* TODO handle error */ - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); - - nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", - hdr->id, nxt_pid, process->pid); + nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...", + hdr->id, nxt_pid); return mmap_handler; @@ -361,7 +355,7 @@ remove_fail: nxt_free(mmap_handler); - process->outgoing.size--; + mmaps->size--; return NULL; } @@ -445,34 +439,28 @@ nxt_shm_open(nxt_task_t *task, size_t size) static nxt_port_mmap_handler_t * -nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, +nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c, nxt_int_t n, nxt_bool_t tracking) { nxt_int_t i, res, nchunks; - nxt_process_t *process; nxt_free_map_t *free_map; nxt_port_mmap_t *port_mmap; nxt_port_mmap_t *end_port_mmap; nxt_port_mmap_header_t *hdr; nxt_port_mmap_handler_t *mmap_handler; - process = port->process; - if (nxt_slow_path(process == NULL)) { - return NULL; - } - - nxt_thread_mutex_lock(&process->outgoing.mutex); + nxt_thread_mutex_lock(&mmaps->mutex); - end_port_mmap = process->outgoing.elts + process->outgoing.size; + end_port_mmap = mmaps->elts + mmaps->size; - for (port_mmap = process->outgoing.elts; + for (port_mmap = mmaps->elts; port_mmap < end_port_mmap; port_mmap++) { mmap_handler = port_mmap->mmap_handler; hdr = mmap_handler->hdr; - if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) { + if (hdr->sent_over != 0xFFFFu) { continue; } @@ -510,11 +498,11 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, /* TODO introduce port_mmap limit and release wait. */ *c = 0; - mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n); + mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n); unlock_return: - nxt_thread_mutex_unlock(&process->outgoing.mutex); + nxt_thread_mutex_unlock(&mmaps->mutex); return mmap_handler; } @@ -549,7 +537,7 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) nxt_int_t -nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, +nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_port_mmap_tracking_t *tracking, uint32_t stream) { nxt_chunk_id_t c; @@ -558,7 +546,7 @@ nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, nxt_debug(task, "request tracking for stream #%uD", stream); - mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1); + mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1); if (nxt_slow_path(mmap_handler == NULL)) { return NXT_ERROR; } @@ -680,7 +668,7 @@ nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_buf_t * -nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) +nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size) { nxt_mp_t *mp; nxt_buf_t *b; @@ -707,7 +695,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) b->completion_handler = nxt_port_mmap_buf_completion; nxt_buf_set_port_mmap(b); - mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0); + mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0); if (nxt_slow_path(mmap_handler == NULL)) { mp = task->thread->engine->mem_pool; nxt_mp_free(mp, b); @@ -943,9 +931,7 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_port_method_t nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) { - nxt_port_method_t m; - nxt_port_mmap_header_t *hdr; - nxt_port_mmap_handler_t *mmap_handler; + nxt_port_method_t m; m = NXT_PORT_METHOD_ANY; @@ -956,9 +942,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) } if (nxt_buf_is_port_mmap(b)) { - mmap_handler = b->parent; - hdr = mmap_handler->hdr; - if (m == NXT_PORT_METHOD_PLAIN) { nxt_log_error(NXT_LOG_ERR, task->log, "mixing plain and mmap buffers, " @@ -967,16 +950,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) break; } - if (port->pid != hdr->dst_pid) { - nxt_log_error(NXT_LOG_ERR, task->log, - "send mmap buffer for %PI to %PI, " - "using plain mode", hdr->dst_pid, port->pid); - - m = NXT_PORT_METHOD_PLAIN; - - break; - } - if (m == NXT_PORT_METHOD_ANY) { nxt_debug(task, "using mmap mode"); diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index 2cd4bd76..8e71af3d 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -23,7 +23,7 @@ struct nxt_port_mmap_tracking_s { }; nxt_int_t -nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, +nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_port_mmap_tracking_t *tracking, uint32_t stream); nxt_bool_t @@ -37,14 +37,12 @@ nxt_bool_t nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg); /* - * Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem' - * pointers to first available shared mem bucket(s). 'size' used as a hint to - * acquire several successive buckets if possible. - * - * This function assumes that current thread operates the 'port' exclusively. + * Allocates nxt_but_t structure from task's thread engine mem_pool, assigns + * this buf 'mem' pointers to first available shared mem bucket(s). 'size' + * used as a hint to acquire several successive buckets if possible. */ nxt_buf_t * -nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size); +nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size); nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, size_t min_size); diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h index 87c3d833..d2524ee4 100644 --- a/src/nxt_port_memory_int.h +++ b/src/nxt_port_memory_int.h @@ -63,6 +63,7 @@ struct nxt_port_mmap_header_s { struct nxt_port_mmap_handler_s { nxt_port_mmap_header_t *hdr; nxt_atomic_t use_count; + nxt_fd_t fd; }; /* diff --git a/src/nxt_router.c b/src/nxt_router.c index 3380e133..4df1489d 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -257,6 +257,8 @@ static void nxt_router_http_request_release(nxt_task_t *task, void *obj, static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_router_get_mmap_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); extern const nxt_http_request_state_t nxt_http_websocket; @@ -281,6 +283,7 @@ static const nxt_port_handlers_t nxt_router_process_port_handlers = { .get_port = nxt_router_get_port_handler, .change_file = nxt_port_change_log_file_handler, .mmap = nxt_port_mmap_handler, + .get_mmap = nxt_router_get_mmap_handler, .data = nxt_router_conf_data_handler, .remove_pid = nxt_router_remove_pid_handler, .access_log = nxt_router_access_log_reopen_handler, @@ -5008,7 +5011,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, buf = req_app_link->msg_info.buf; - res = nxt_port_mmap_get_tracking(task, port, + res = nxt_port_mmap_get_tracking(task, &port->process->outgoing, &req_app_link->msg_info.tracking, req_app_link->stream); if (nxt_slow_path(res != NXT_OK)) { @@ -5138,7 +5141,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, return NULL; } - out = nxt_port_mmap_get_buf(task, port, + out = nxt_port_mmap_get_buf(task, &port->process->outgoing, nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE)); if (nxt_slow_path(out == NULL)) { return NULL; @@ -5320,7 +5323,8 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, if (buf == NULL) { free_size = nxt_min(size, PORT_MMAP_DATA_SIZE); - buf = nxt_port_mmap_get_buf(task, port, free_size); + buf = nxt_port_mmap_get_buf(task, &port->process->outgoing, + free_size); if (nxt_slow_path(buf == NULL)) { while (out != NULL) { buf = out->next; @@ -5556,6 +5560,65 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void +nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_fd_t fd; + nxt_port_t *port; + nxt_runtime_t *rt; + nxt_port_mmaps_t *mmaps; + nxt_port_msg_get_mmap_t *get_mmap_msg; + nxt_port_mmap_handler_t *mmap_handler; + + rt = task->thread->runtime; + + port = nxt_runtime_port_find(rt, msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(port == NULL)) { + nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found", + msg->port_msg.pid, msg->port_msg.reply_port); + + return; + } + + if (nxt_slow_path(nxt_buf_used_size(msg->buf) + < (int) sizeof(nxt_port_msg_get_mmap_t))) + { + nxt_alert(task, "get_mmap_handler: message buffer too small (%d)", + (int) nxt_buf_used_size(msg->buf)); + + return; + } + + get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos; + + nxt_assert(port->type == NXT_PROCESS_APP); + + mmaps = &port->process->outgoing; + nxt_thread_mutex_lock(&mmaps->mutex); + + if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) { + nxt_thread_mutex_unlock(&mmaps->mutex); + + nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)", + (int) get_mmap_msg->id); + + return; + } + + mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler; + + fd = mmap_handler->fd; + + nxt_thread_mutex_unlock(&mmaps->mutex); + + nxt_debug(task, "get mmap %PI:%d found", + msg->port_msg.pid, (int) get_mmap_msg->id); + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); +} + + +static void nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_t *port, *reply_port; diff --git a/src/nxt_unit.c b/src/nxt_unit.c index c1ef977f..b321a0d4 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -51,6 +51,7 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, uint32_t *shm_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); +static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, @@ -103,12 +104,14 @@ static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); -static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, uint32_t id); static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); + nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, + nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, + nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); + nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, void *start, uint32_t size); @@ -240,7 +243,8 @@ struct nxt_unit_websocket_frame_impl_s { struct nxt_unit_read_buf_s { - nxt_unit_read_buf_t *next; + nxt_queue_link_t link; + nxt_unit_ctx_impl_t *ctx_impl; ssize_t size; char buf[16384]; char oob[256]; @@ -276,9 +280,11 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_request_info_impl_t */ nxt_queue_t ready_req; - nxt_unit_read_buf_t *pending_read_head; - nxt_unit_read_buf_t **pending_read_tail; - nxt_unit_read_buf_t *free_read_buf; + /* of nxt_unit_read_buf_t */ + nxt_queue_t pending_rbuf; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t free_rbuf; nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_read_buf_t ctx_read_buf; @@ -318,6 +324,7 @@ struct nxt_unit_port_impl_s { nxt_atomic_t use_count; + /* for nxt_unit_process_t.ports */ nxt_queue_link_t link; nxt_unit_process_t *process; @@ -330,6 +337,9 @@ struct nxt_unit_port_impl_s { struct nxt_unit_mmap_s { nxt_port_mmap_header_t *hdr; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t awaiting_rbuf; }; @@ -345,7 +355,7 @@ struct nxt_unit_mmaps_s { struct nxt_unit_process_s { pid_t pid; - nxt_queue_t ports; + nxt_queue_t ports; /* of nxt_unit_port_impl_t */ nxt_unit_mmaps_t incoming; nxt_unit_mmaps_t outgoing; @@ -537,17 +547,17 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); nxt_queue_init(&ctx_impl->ready_req); + nxt_queue_init(&ctx_impl->pending_rbuf); + nxt_queue_init(&ctx_impl->free_rbuf); ctx_impl->free_buf = NULL; nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); + nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link); - ctx_impl->pending_read_head = NULL; - ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; - ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf; - ctx_impl->ctx_read_buf.next = NULL; + ctx_impl->ctx_read_buf.ctx_impl = ctx_impl; ctx_impl->req.req.ctx = &ctx_impl->ctx; ctx_impl->req.req.unit = &lib->unit; @@ -767,9 +777,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) } -int -nxt_unit_process_msg(nxt_unit_ctx_t *ctx, - void *buf, size_t buf_size, void *oob, size_t oob_size) +static int +nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { int rc; pid_t pid; @@ -783,11 +792,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, rc = NXT_UNIT_ERROR; recv_msg.fd = -1; recv_msg.process = NULL; - port_msg = buf; - cm = oob; + port_msg = (nxt_port_msg_t *) rbuf->buf; + cm = (struct cmsghdr *) rbuf->oob; - if (oob_size >= CMSG_SPACE(sizeof(int)) - && cm->cmsg_len == CMSG_LEN(sizeof(int)) + if (cm->cmsg_len == CMSG_LEN(sizeof(int)) && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_RIGHTS) { @@ -796,8 +804,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, recv_msg.incoming_buf = NULL; - if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { - nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); + if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); goto fail; } @@ -808,7 +816,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, recv_msg.mmap = port_msg->mmap; recv_msg.start = port_msg + 1; - recv_msg.size = buf_size - sizeof(nxt_port_msg_t); + recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t); if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", @@ -816,10 +824,16 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, goto fail; } - if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) { - rc = NXT_UNIT_OK; + if (port_msg->tracking) { + rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf); - goto fail; + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (rc == NXT_UNIT_AGAIN) { + recv_msg.fd = -1; + } + + goto fail; + } } /* Fragmentation is unsupported. */ @@ -830,7 +844,13 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, } if (port_msg->mmap) { - if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { + rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf); + + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (rc == NXT_UNIT_AGAIN) { + recv_msg.fd = -1; + } + goto fail; } } @@ -1077,6 +1097,9 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); res = nxt_unit_request_check_response_port(req, &port_id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } if (nxt_fast_path(res == NXT_UNIT_OK)) { lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -2376,33 +2399,41 @@ static nxt_unit_read_buf_t * nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) { nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); pthread_mutex_lock(&ctx_impl->mutex); - return nxt_unit_read_buf_get_impl(ctx_impl); + rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + + pthread_mutex_unlock(&ctx_impl->mutex); + + return rbuf; } static nxt_unit_read_buf_t * nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) { + nxt_queue_link_t *link; nxt_unit_read_buf_t *rbuf; - if (ctx_impl->free_read_buf != NULL) { - rbuf = ctx_impl->free_read_buf; - ctx_impl->free_read_buf = rbuf->next; + if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) { + link = nxt_queue_first(&ctx_impl->free_rbuf); + nxt_queue_remove(link); - pthread_mutex_unlock(&ctx_impl->mutex); + rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); return rbuf; } - pthread_mutex_unlock(&ctx_impl->mutex); - rbuf = malloc(sizeof(nxt_unit_read_buf_t)); + if (nxt_fast_path(rbuf != NULL)) { + rbuf->ctx_impl = ctx_impl; + } + return rbuf; } @@ -2417,8 +2448,7 @@ nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, pthread_mutex_lock(&ctx_impl->mutex); - rbuf->next = ctx_impl->free_read_buf; - ctx_impl->free_read_buf = rbuf; + nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link); pthread_mutex_unlock(&ctx_impl->mutex); } @@ -3255,9 +3285,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) pthread_mutex_lock(&ctx_impl->mutex); - *ctx_impl->pending_read_tail = rbuf; - ctx_impl->pending_read_tail = &rbuf->next; - rbuf->next = NULL; + nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link); pthread_mutex_unlock(&ctx_impl->mutex); @@ -3275,7 +3303,12 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) static nxt_unit_mmap_t * nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) { - uint32_t cap; + uint32_t cap, n; + nxt_unit_mmap_t *e; + + if (nxt_fast_path(mmaps->size > i)) { + return mmaps->elts + i; + } cap = mmaps->cap; @@ -3295,13 +3328,19 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) if (cap != mmaps->cap) { - mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts)); - if (nxt_slow_path(mmaps->elts == NULL)) { + e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t)); + if (nxt_slow_path(e == NULL)) { return NULL; } - memset(mmaps->elts + mmaps->cap, 0, - sizeof(*mmaps->elts) * (cap - mmaps->cap)); + mmaps->elts = e; + + for (n = mmaps->cap; n < cap; n++) { + e = mmaps->elts + n; + + e->hdr = NULL; + nxt_queue_init(&e->awaiting_rbuf); + } mmaps->cap = cap; } @@ -3581,13 +3620,16 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) { - int rc; - void *mem; - struct stat mmap_stat; - nxt_unit_mmap_t *mm; - nxt_unit_impl_t *lib; - nxt_unit_process_t *process; - nxt_port_mmap_header_t *hdr; + int rc; + void *mem; + nxt_queue_t awaiting_rbuf; + struct stat mmap_stat; + nxt_unit_mmap_t *mm; + nxt_unit_impl_t *lib; + nxt_unit_process_t *process; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + nxt_port_mmap_header_t *hdr; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -3626,7 +3668,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) hdr = mem; - if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) { + if (nxt_slow_path(hdr->src_pid != pid)) { nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " "detected: %d != %d or %d != %d", (int) hdr->src_pid, @@ -3637,6 +3679,8 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) goto fail; } + nxt_queue_init(&awaiting_rbuf); + pthread_mutex_lock(&process->incoming.mutex); mm = nxt_unit_mmap_at(&process->incoming, hdr->id); @@ -3650,11 +3694,28 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) hdr->sent_over = 0xFFFFu; + nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf); + nxt_queue_init(&mm->awaiting_rbuf); + rc = NXT_UNIT_OK; } pthread_mutex_unlock(&process->incoming.mutex); + nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { + + ctx_impl = rbuf->ctx_impl; + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + } nxt_queue_loop; + fail: nxt_unit_process_release(process); @@ -3719,27 +3780,11 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) } -static nxt_port_mmap_header_t * -nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - uint32_t id) -{ - nxt_port_mmap_header_t *hdr; - - if (nxt_fast_path(process->incoming.size > id)) { - hdr = process->incoming.elts[id].hdr; - - } else { - hdr = NULL; - } - - return hdr; -} - - static int -nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, + nxt_unit_read_buf_t *rbuf) { - int rc; + int res; nxt_chunk_id_t c; nxt_unit_process_t *process; nxt_port_mmap_header_t *hdr; @@ -3749,7 +3794,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", recv_msg->stream, (int) recv_msg->size); - return 0; + return NXT_UNIT_ERROR; } tracking_msg = recv_msg->start; @@ -3759,44 +3804,95 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) process = nxt_unit_msg_get_process(ctx, recv_msg); if (nxt_slow_path(process == NULL)) { - return 0; + return NXT_UNIT_ERROR; } pthread_mutex_lock(&process->incoming.mutex); - hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id); - if (nxt_slow_path(hdr == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); - - nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " - "invalid mmap id %d,%"PRIu32, - recv_msg->stream, (int) process->pid, - tracking_msg->mmap_id); + res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming, + recv_msg->pid, tracking_msg->mmap_id, + &hdr, rbuf); - return 0; + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return res; } c = tracking_msg->tracking_id; - rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); + res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); - if (rc == 0) { + if (res == 0) { nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", recv_msg->stream); nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); + + res = NXT_UNIT_CANCELLED; + + } else { + res = NXT_UNIT_OK; } pthread_mutex_unlock(&process->incoming.mutex); - return rc; + return res; } static int -nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, + pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, + nxt_unit_read_buf_t *rbuf) { + int res, need_rbuf; + nxt_unit_mmap_t *mm; + nxt_unit_ctx_impl_t *ctx_impl; + + mm = nxt_unit_mmap_at(mmaps, id); + if (nxt_slow_path(mm == NULL)) { + nxt_unit_alert(ctx, "failed to allocate mmap"); + + pthread_mutex_unlock(&mmaps->mutex); + + *hdr = NULL; + + return NXT_UNIT_ERROR; + } + + *hdr = mm->hdr; + + if (nxt_fast_path(*hdr != NULL)) { + return NXT_UNIT_OK; + } + + need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf); + + nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link); + + pthread_mutex_unlock(&mmaps->mutex); + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + if (need_rbuf) { + res = nxt_unit_get_mmap(ctx, pid, id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_AGAIN; +} + + +static int +nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, + nxt_unit_read_buf_t *rbuf) +{ + int res; void *start; uint32_t size; + nxt_unit_mmaps_t *mmaps; nxt_unit_process_t *process; nxt_unit_mmap_buf_t *b, **incoming_tail; nxt_port_mmap_msg_t *mmap_msg, *end; @@ -3819,12 +3915,17 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) incoming_tail = &recv_msg->incoming_buf; + /* Allocating buffer structures. */ for (; mmap_msg < end; mmap_msg++) { b = nxt_unit_mmap_buf_get(ctx); if (nxt_slow_path(b == NULL)) { nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", recv_msg->stream); + while (recv_msg->incoming_buf != NULL) { + nxt_unit_mmap_buf_release(recv_msg->incoming_buf); + } + return NXT_UNIT_ERROR; } @@ -3835,19 +3936,21 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) b = recv_msg->incoming_buf; mmap_msg = recv_msg->start; - pthread_mutex_lock(&process->incoming.mutex); + mmaps = &process->incoming; + + pthread_mutex_lock(&mmaps->mutex); for (; mmap_msg < end; mmap_msg++) { - hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id); - if (nxt_slow_path(hdr == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); + res = nxt_unit_check_rbuf_mmap(ctx, mmaps, + recv_msg->pid, mmap_msg->mmap_id, + &hdr, rbuf); - nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " - "invalid mmap id %d,%"PRIu32, - recv_msg->stream, (int) process->pid, - mmap_msg->mmap_id); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + while (recv_msg->incoming_buf != NULL) { + nxt_unit_mmap_buf_release(recv_msg->incoming_buf); + } - return NXT_UNIT_ERROR; + return res; } start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); @@ -3874,7 +3977,41 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) (int) mmap_msg->size); } - pthread_mutex_unlock(&process->incoming.mutex); + pthread_mutex_unlock(&mmaps->mutex); + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id) +{ + ssize_t res; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + struct { + nxt_port_msg_t msg; + nxt_port_msg_get_mmap_t get_mmap; + } m; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.reply_port = ctx_impl->read_port->id.id; + m.msg.type = _NXT_PORT_MSG_GET_MMAP; + + m.get_mmap.id = id; + + nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id); + + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + return NXT_UNIT_ERROR; + } return NXT_UNIT_OK; } @@ -4110,6 +4247,7 @@ int nxt_unit_run_once(nxt_unit_ctx_t *ctx) { int rc; + nxt_queue_link_t *link; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; @@ -4119,18 +4257,22 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) pthread_mutex_lock(&ctx_impl->mutex); - if (ctx_impl->pending_read_head != NULL) { - rbuf = ctx_impl->pending_read_head; - ctx_impl->pending_read_head = rbuf->next; + if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { - if (ctx_impl->pending_read_tail == &rbuf->next) { - ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; - } +next_pending: + + link = nxt_queue_first(&ctx_impl->pending_rbuf); + nxt_queue_remove(link); + + rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); pthread_mutex_unlock(&ctx_impl->mutex); } else { rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + + pthread_mutex_unlock(&ctx_impl->mutex); + if (nxt_slow_path(rbuf == NULL)) { nxt_unit_ctx_release(ctx_impl); @@ -4142,21 +4284,40 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) } if (nxt_fast_path(rbuf->size > 0)) { - rc = nxt_unit_process_msg(ctx, - rbuf->buf, rbuf->size, - rbuf->oob, sizeof(rbuf->oob)); + rc = nxt_unit_process_msg(ctx, rbuf); #if (NXT_DEBUG) - memset(rbuf->buf, 0xAC, rbuf->size); + if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { + memset(rbuf->buf, 0xAC, rbuf->size); + } #endif } else { rc = NXT_UNIT_ERROR; } - nxt_unit_read_buf_release(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) { + rc = NXT_UNIT_OK; + + } else { + nxt_unit_read_buf_release(ctx, rbuf); + } + + if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) { + rc = NXT_UNIT_OK; + } + + if (nxt_fast_path(rc == NXT_UNIT_OK)) { + pthread_mutex_lock(&ctx_impl->mutex); - nxt_unit_process_ready_req(ctx_impl); + if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { + goto next_pending; + } + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_unit_process_ready_req(ctx_impl); + } nxt_unit_ctx_release(ctx_impl); diff --git a/src/nxt_unit.h b/src/nxt_unit.h index 8fa64f4e..79157f5f 100644 --- a/src/nxt_unit.h +++ b/src/nxt_unit.h @@ -20,6 +20,7 @@ enum { NXT_UNIT_OK = 0, NXT_UNIT_ERROR = 1, NXT_UNIT_AGAIN = 2, + NXT_UNIT_CANCELLED = 3, }; enum { @@ -189,16 +190,6 @@ struct nxt_unit_read_info_s { nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *); /* - * Process received message, invoke configured callbacks. - * - * If application implements it's own event loop, each datagram received - * from port socket should be initially processed by unit. This function - * may invoke other application-defined callback for message processing. - */ -int nxt_unit_process_msg(nxt_unit_ctx_t *, - void *buf, size_t buf_size, void *oob, size_t oob_size); - -/* * Main function useful in case when application does not have it's own * event loop. nxt_unit_run() starts infinite message wait and process loop. * |