diff options
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r-- | src/nxt_unit.c | 377 |
1 files changed, 269 insertions, 108 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index c1ef977f..b321a0d4 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -51,6 +51,7 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream, uint32_t *shm_limit); static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream); +static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, @@ -103,12 +104,14 @@ static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); -static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, - nxt_unit_process_t *process, uint32_t id); static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); + nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, + nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, + nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, - nxt_unit_recv_msg_t *recv_msg); + nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr, void *start, uint32_t size); @@ -240,7 +243,8 @@ struct nxt_unit_websocket_frame_impl_s { struct nxt_unit_read_buf_s { - nxt_unit_read_buf_t *next; + nxt_queue_link_t link; + nxt_unit_ctx_impl_t *ctx_impl; ssize_t size; char buf[16384]; char oob[256]; @@ -276,9 +280,11 @@ struct nxt_unit_ctx_impl_s { /* of nxt_unit_request_info_impl_t */ nxt_queue_t ready_req; - nxt_unit_read_buf_t *pending_read_head; - nxt_unit_read_buf_t **pending_read_tail; - nxt_unit_read_buf_t *free_read_buf; + /* of nxt_unit_read_buf_t */ + nxt_queue_t pending_rbuf; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t free_rbuf; nxt_unit_mmap_buf_t ctx_buf[2]; nxt_unit_read_buf_t ctx_read_buf; @@ -318,6 +324,7 @@ struct nxt_unit_port_impl_s { nxt_atomic_t use_count; + /* for nxt_unit_process_t.ports */ nxt_queue_link_t link; nxt_unit_process_t *process; @@ -330,6 +337,9 @@ struct nxt_unit_port_impl_s { struct nxt_unit_mmap_s { nxt_port_mmap_header_t *hdr; + + /* of nxt_unit_read_buf_t */ + nxt_queue_t awaiting_rbuf; }; @@ -345,7 +355,7 @@ struct nxt_unit_mmaps_s { struct nxt_unit_process_s { pid_t pid; - nxt_queue_t ports; + nxt_queue_t ports; /* of nxt_unit_port_impl_t */ nxt_unit_mmaps_t incoming; nxt_unit_mmaps_t outgoing; @@ -537,17 +547,17 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_queue_init(&ctx_impl->free_ws); nxt_queue_init(&ctx_impl->active_req); nxt_queue_init(&ctx_impl->ready_req); + nxt_queue_init(&ctx_impl->pending_rbuf); + nxt_queue_init(&ctx_impl->free_rbuf); ctx_impl->free_buf = NULL; nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); + nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link); - ctx_impl->pending_read_head = NULL; - ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; - ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf; - ctx_impl->ctx_read_buf.next = NULL; + ctx_impl->ctx_read_buf.ctx_impl = ctx_impl; ctx_impl->req.req.ctx = &ctx_impl->ctx; ctx_impl->req.req.unit = &lib->unit; @@ -767,9 +777,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream) } -int -nxt_unit_process_msg(nxt_unit_ctx_t *ctx, - void *buf, size_t buf_size, void *oob, size_t oob_size) +static int +nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) { int rc; pid_t pid; @@ -783,11 +792,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, rc = NXT_UNIT_ERROR; recv_msg.fd = -1; recv_msg.process = NULL; - port_msg = buf; - cm = oob; + port_msg = (nxt_port_msg_t *) rbuf->buf; + cm = (struct cmsghdr *) rbuf->oob; - if (oob_size >= CMSG_SPACE(sizeof(int)) - && cm->cmsg_len == CMSG_LEN(sizeof(int)) + if (cm->cmsg_len == CMSG_LEN(sizeof(int)) && cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_RIGHTS) { @@ -796,8 +804,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, recv_msg.incoming_buf = NULL; - if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) { - nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size); + if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); goto fail; } @@ -808,7 +816,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, recv_msg.mmap = port_msg->mmap; recv_msg.start = port_msg + 1; - recv_msg.size = buf_size - sizeof(nxt_port_msg_t); + recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t); if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)", @@ -816,10 +824,16 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, goto fail; } - if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) { - rc = NXT_UNIT_OK; + if (port_msg->tracking) { + rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf); - goto fail; + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (rc == NXT_UNIT_AGAIN) { + recv_msg.fd = -1; + } + + goto fail; + } } /* Fragmentation is unsupported. */ @@ -830,7 +844,13 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, } if (port_msg->mmap) { - if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) { + rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf); + + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (rc == NXT_UNIT_AGAIN) { + recv_msg.fd = -1; + } + goto fail; } } @@ -1077,6 +1097,9 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); res = nxt_unit_request_check_response_port(req, &port_id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } if (nxt_fast_path(res == NXT_UNIT_OK)) { lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -2376,33 +2399,41 @@ static nxt_unit_read_buf_t * nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) { nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); pthread_mutex_lock(&ctx_impl->mutex); - return nxt_unit_read_buf_get_impl(ctx_impl); + rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + + pthread_mutex_unlock(&ctx_impl->mutex); + + return rbuf; } static nxt_unit_read_buf_t * nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) { + nxt_queue_link_t *link; nxt_unit_read_buf_t *rbuf; - if (ctx_impl->free_read_buf != NULL) { - rbuf = ctx_impl->free_read_buf; - ctx_impl->free_read_buf = rbuf->next; + if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) { + link = nxt_queue_first(&ctx_impl->free_rbuf); + nxt_queue_remove(link); - pthread_mutex_unlock(&ctx_impl->mutex); + rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); return rbuf; } - pthread_mutex_unlock(&ctx_impl->mutex); - rbuf = malloc(sizeof(nxt_unit_read_buf_t)); + if (nxt_fast_path(rbuf != NULL)) { + rbuf->ctx_impl = ctx_impl; + } + return rbuf; } @@ -2417,8 +2448,7 @@ nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, pthread_mutex_lock(&ctx_impl->mutex); - rbuf->next = ctx_impl->free_read_buf; - ctx_impl->free_read_buf = rbuf; + nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link); pthread_mutex_unlock(&ctx_impl->mutex); } @@ -3255,9 +3285,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) pthread_mutex_lock(&ctx_impl->mutex); - *ctx_impl->pending_read_tail = rbuf; - ctx_impl->pending_read_tail = &rbuf->next; - rbuf->next = NULL; + nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link); pthread_mutex_unlock(&ctx_impl->mutex); @@ -3275,7 +3303,12 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) static nxt_unit_mmap_t * nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) { - uint32_t cap; + uint32_t cap, n; + nxt_unit_mmap_t *e; + + if (nxt_fast_path(mmaps->size > i)) { + return mmaps->elts + i; + } cap = mmaps->cap; @@ -3295,13 +3328,19 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i) if (cap != mmaps->cap) { - mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts)); - if (nxt_slow_path(mmaps->elts == NULL)) { + e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t)); + if (nxt_slow_path(e == NULL)) { return NULL; } - memset(mmaps->elts + mmaps->cap, 0, - sizeof(*mmaps->elts) * (cap - mmaps->cap)); + mmaps->elts = e; + + for (n = mmaps->cap; n < cap; n++) { + e = mmaps->elts + n; + + e->hdr = NULL; + nxt_queue_init(&e->awaiting_rbuf); + } mmaps->cap = cap; } @@ -3581,13 +3620,16 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) { - int rc; - void *mem; - struct stat mmap_stat; - nxt_unit_mmap_t *mm; - nxt_unit_impl_t *lib; - nxt_unit_process_t *process; - nxt_port_mmap_header_t *hdr; + int rc; + void *mem; + nxt_queue_t awaiting_rbuf; + struct stat mmap_stat; + nxt_unit_mmap_t *mm; + nxt_unit_impl_t *lib; + nxt_unit_process_t *process; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + nxt_port_mmap_header_t *hdr; lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); @@ -3626,7 +3668,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) hdr = mem; - if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) { + if (nxt_slow_path(hdr->src_pid != pid)) { nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header " "detected: %d != %d or %d != %d", (int) hdr->src_pid, @@ -3637,6 +3679,8 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) goto fail; } + nxt_queue_init(&awaiting_rbuf); + pthread_mutex_lock(&process->incoming.mutex); mm = nxt_unit_mmap_at(&process->incoming, hdr->id); @@ -3650,11 +3694,28 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) hdr->sent_over = 0xFFFFu; + nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf); + nxt_queue_init(&mm->awaiting_rbuf); + rc = NXT_UNIT_OK; } pthread_mutex_unlock(&process->incoming.mutex); + nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { + + ctx_impl = rbuf->ctx_impl; + + pthread_mutex_lock(&ctx_impl->mutex); + + nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link); + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); + + } nxt_queue_loop; + fail: nxt_unit_process_release(process); @@ -3719,27 +3780,11 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps) } -static nxt_port_mmap_header_t * -nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process, - uint32_t id) -{ - nxt_port_mmap_header_t *hdr; - - if (nxt_fast_path(process->incoming.size > id)) { - hdr = process->incoming.elts[id].hdr; - - } else { - hdr = NULL; - } - - return hdr; -} - - static int -nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, + nxt_unit_read_buf_t *rbuf) { - int rc; + int res; nxt_chunk_id_t c; nxt_unit_process_t *process; nxt_port_mmap_header_t *hdr; @@ -3749,7 +3794,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)", recv_msg->stream, (int) recv_msg->size); - return 0; + return NXT_UNIT_ERROR; } tracking_msg = recv_msg->start; @@ -3759,44 +3804,95 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) process = nxt_unit_msg_get_process(ctx, recv_msg); if (nxt_slow_path(process == NULL)) { - return 0; + return NXT_UNIT_ERROR; } pthread_mutex_lock(&process->incoming.mutex); - hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id); - if (nxt_slow_path(hdr == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); - - nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: " - "invalid mmap id %d,%"PRIu32, - recv_msg->stream, (int) process->pid, - tracking_msg->mmap_id); + res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming, + recv_msg->pid, tracking_msg->mmap_id, + &hdr, rbuf); - return 0; + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return res; } c = tracking_msg->tracking_id; - rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); + res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0); - if (rc == 0) { + if (res == 0) { nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled", recv_msg->stream); nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); + + res = NXT_UNIT_CANCELLED; + + } else { + res = NXT_UNIT_OK; } pthread_mutex_unlock(&process->incoming.mutex); - return rc; + return res; } static int -nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) +nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps, + pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr, + nxt_unit_read_buf_t *rbuf) { + int res, need_rbuf; + nxt_unit_mmap_t *mm; + nxt_unit_ctx_impl_t *ctx_impl; + + mm = nxt_unit_mmap_at(mmaps, id); + if (nxt_slow_path(mm == NULL)) { + nxt_unit_alert(ctx, "failed to allocate mmap"); + + pthread_mutex_unlock(&mmaps->mutex); + + *hdr = NULL; + + return NXT_UNIT_ERROR; + } + + *hdr = mm->hdr; + + if (nxt_fast_path(*hdr != NULL)) { + return NXT_UNIT_OK; + } + + need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf); + + nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link); + + pthread_mutex_unlock(&mmaps->mutex); + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); + + if (need_rbuf) { + res = nxt_unit_get_mmap(ctx, pid, id); + if (nxt_slow_path(res == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + } + + return NXT_UNIT_AGAIN; +} + + +static int +nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, + nxt_unit_read_buf_t *rbuf) +{ + int res; void *start; uint32_t size; + nxt_unit_mmaps_t *mmaps; nxt_unit_process_t *process; nxt_unit_mmap_buf_t *b, **incoming_tail; nxt_port_mmap_msg_t *mmap_msg, *end; @@ -3819,12 +3915,17 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) incoming_tail = &recv_msg->incoming_buf; + /* Allocating buffer structures. */ for (; mmap_msg < end; mmap_msg++) { b = nxt_unit_mmap_buf_get(ctx); if (nxt_slow_path(b == NULL)) { nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf", recv_msg->stream); + while (recv_msg->incoming_buf != NULL) { + nxt_unit_mmap_buf_release(recv_msg->incoming_buf); + } + return NXT_UNIT_ERROR; } @@ -3835,19 +3936,21 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) b = recv_msg->incoming_buf; mmap_msg = recv_msg->start; - pthread_mutex_lock(&process->incoming.mutex); + mmaps = &process->incoming; + + pthread_mutex_lock(&mmaps->mutex); for (; mmap_msg < end; mmap_msg++) { - hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id); - if (nxt_slow_path(hdr == NULL)) { - pthread_mutex_unlock(&process->incoming.mutex); + res = nxt_unit_check_rbuf_mmap(ctx, mmaps, + recv_msg->pid, mmap_msg->mmap_id, + &hdr, rbuf); - nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: " - "invalid mmap id %d,%"PRIu32, - recv_msg->stream, (int) process->pid, - mmap_msg->mmap_id); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + while (recv_msg->incoming_buf != NULL) { + nxt_unit_mmap_buf_release(recv_msg->incoming_buf); + } - return NXT_UNIT_ERROR; + return res; } start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); @@ -3874,7 +3977,41 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) (int) mmap_msg->size); } - pthread_mutex_unlock(&process->incoming.mutex); + pthread_mutex_unlock(&mmaps->mutex); + + return NXT_UNIT_OK; +} + + +static int +nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id) +{ + ssize_t res; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + + struct { + nxt_port_msg_t msg; + nxt_port_msg_get_mmap_t get_mmap; + } m; + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + + memset(&m.msg, 0, sizeof(nxt_port_msg_t)); + + m.msg.pid = lib->pid; + m.msg.reply_port = ctx_impl->read_port->id.id; + m.msg.type = _NXT_PORT_MSG_GET_MMAP; + + m.get_mmap.id = id; + + nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id); + + res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0); + if (nxt_slow_path(res != sizeof(m))) { + return NXT_UNIT_ERROR; + } return NXT_UNIT_OK; } @@ -4110,6 +4247,7 @@ int nxt_unit_run_once(nxt_unit_ctx_t *ctx) { int rc; + nxt_queue_link_t *link; nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_read_buf_t *rbuf; @@ -4119,18 +4257,22 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) pthread_mutex_lock(&ctx_impl->mutex); - if (ctx_impl->pending_read_head != NULL) { - rbuf = ctx_impl->pending_read_head; - ctx_impl->pending_read_head = rbuf->next; + if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { - if (ctx_impl->pending_read_tail == &rbuf->next) { - ctx_impl->pending_read_tail = &ctx_impl->pending_read_head; - } +next_pending: + + link = nxt_queue_first(&ctx_impl->pending_rbuf); + nxt_queue_remove(link); + + rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); pthread_mutex_unlock(&ctx_impl->mutex); } else { rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + + pthread_mutex_unlock(&ctx_impl->mutex); + if (nxt_slow_path(rbuf == NULL)) { nxt_unit_ctx_release(ctx_impl); @@ -4142,21 +4284,40 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx) } if (nxt_fast_path(rbuf->size > 0)) { - rc = nxt_unit_process_msg(ctx, - rbuf->buf, rbuf->size, - rbuf->oob, sizeof(rbuf->oob)); + rc = nxt_unit_process_msg(ctx, rbuf); #if (NXT_DEBUG) - memset(rbuf->buf, 0xAC, rbuf->size); + if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { + memset(rbuf->buf, 0xAC, rbuf->size); + } #endif } else { rc = NXT_UNIT_ERROR; } - nxt_unit_read_buf_release(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) { + rc = NXT_UNIT_OK; + + } else { + nxt_unit_read_buf_release(ctx, rbuf); + } + + if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) { + rc = NXT_UNIT_OK; + } + + if (nxt_fast_path(rc == NXT_UNIT_OK)) { + pthread_mutex_lock(&ctx_impl->mutex); - nxt_unit_process_ready_req(ctx_impl); + if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { + goto next_pending; + } + + pthread_mutex_unlock(&ctx_impl->mutex); + + nxt_unit_process_ready_req(ctx_impl); + } nxt_unit_ctx_release(ctx_impl); |