diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_unit.c | 467 |
1 files changed, 361 insertions, 106 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c index b321a0d4..7fb2826d 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -38,8 +38,8 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, void *data); -nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl); -nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl); +nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx); +nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx); nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, @@ -58,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, nxt_unit_port_id_t *port_id); +static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg); static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); @@ -122,9 +123,12 @@ static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib, static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); -static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx, - nxt_unit_read_buf_t *rbuf); -static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl); +static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); +static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); +static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); +static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); +static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, + nxt_unit_port_t *port); static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); @@ -150,9 +154,8 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, const void *oob, size_t oob_size); static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, const void *buf, size_t buf_size, const void *oob, size_t oob_size); -static ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *ctx, - nxt_unit_port_t *port, void *buf, size_t buf_size, - void *oob, size_t oob_size); +static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, + nxt_unit_read_buf_t *rbuf); static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port); @@ -308,6 +311,7 @@ struct nxt_unit_impl_s { nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ nxt_unit_port_t *router_port; + nxt_unit_port_t *shared_port; nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ @@ -452,7 +456,7 @@ nxt_unit_init(nxt_unit_init_t *init) fail: - nxt_unit_ctx_release(&lib->main_ctx); + nxt_unit_ctx_release(&lib->main_ctx.ctx); return NULL; } @@ -496,6 +500,7 @@ nxt_unit_create(nxt_unit_init_t *init) lib->use_count = 0; lib->router_port = NULL; + lib->shared_port = NULL; rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); if (nxt_slow_path(rc != NXT_UNIT_OK)) { @@ -570,16 +575,23 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, nxt_inline void -nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl) +nxt_unit_ctx_use(nxt_unit_ctx_t *ctx) { + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + nxt_atomic_fetch_add(&ctx_impl->use_count, 1); } nxt_inline void -nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl) +nxt_unit_ctx_release(nxt_unit_ctx_t *ctx) { - long c; + long c; + nxt_unit_ctx_impl_t *ctx_impl; + + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1); @@ -624,6 +636,10 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib) nxt_unit_port_release(lib->router_port); } + if (nxt_fast_path(lib->shared_port != NULL)) { + nxt_unit_port_release(lib->shared_port); + } + free(lib); } } @@ -805,6 +821,15 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) recv_msg.incoming_buf = NULL; if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { + if (nxt_slow_path(rbuf->size == 0)) { + nxt_unit_debug(ctx, "read port closed"); + + nxt_unit_quit(ctx); + rc = NXT_UNIT_OK; + + goto fail; + } + nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); goto fail; } @@ -946,6 +971,13 @@ fail: nxt_unit_process_release(recv_msg.process); } + if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { +#if (NXT_DEBUG) + memset(rbuf->buf, 0xAC, rbuf->size); +#endif + nxt_unit_read_buf_release(ctx, rbuf); + } + return rc; } @@ -954,6 +986,7 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { int nb; + nxt_unit_impl_t *lib; nxt_unit_port_t new_port, *port; nxt_port_msg_new_port_t *new_port_msg; @@ -978,21 +1011,33 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) recv_msg->stream, (int) new_port_msg->pid, (int) new_port_msg->id, recv_msg->fd); - nb = 0; + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + + if (new_port_msg->id == (nxt_port_id_t) -1) { + nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id); - if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { - nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " - "failed: %s (%d)", - recv_msg->stream, recv_msg->fd, strerror(errno), errno); + new_port.in_fd = recv_msg->fd; + new_port.out_fd = -1; - return NXT_UNIT_ERROR; + } else { + nb = 0; + + if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) { + nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) " + "failed: %s (%d)", + recv_msg->stream, recv_msg->fd, strerror(errno), errno); + + return NXT_UNIT_ERROR; + } + + nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, + new_port_msg->id); + + new_port.in_fd = -1; + new_port.out_fd = recv_msg->fd; } - nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, - new_port_msg->id); - new_port.in_fd = -1; - new_port.out_fd = recv_msg->fd; new_port.data = NULL; recv_msg->fd = -1; @@ -1002,7 +1047,12 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) return NXT_UNIT_ERROR; } - nxt_unit_port_release(port); + if (new_port_msg->id == (nxt_port_id_t) -1) { + lib->shared_port = port; + + } else { + nxt_unit_port_release(port); + } return NXT_UNIT_OK; } @@ -1102,6 +1152,11 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) } if (nxt_fast_path(res == NXT_UNIT_OK)) { + res = nxt_unit_send_req_headers_ack(req); + if (nxt_slow_path(res != NXT_UNIT_OK)) { + return res; + } + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib->callbacks.request_handler(req); @@ -1221,6 +1276,36 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, static int +nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req) +{ + ssize_t res; + nxt_port_msg_t msg; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_request_info_impl_t *req_impl; + + lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); + req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); + + memset(&msg, 0, sizeof(nxt_port_msg_t)); + + msg.stream = req_impl->stream; + msg.pid = lib->pid; + msg.reply_port = ctx_impl->read_port->id.id; + msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK; + + res = nxt_unit_port_send(req->ctx, req->response_port, + &msg, sizeof(msg), NULL, 0); + if (nxt_slow_path(res != sizeof(msg))) { + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; +} + + +static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) { size_t hsize; @@ -3267,7 +3352,9 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx) return NXT_UNIT_ERROR; } - nxt_unit_read_buf(ctx, rbuf); + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { nxt_unit_read_buf_release(ctx, rbuf); @@ -4218,26 +4305,23 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib) int nxt_unit_run(nxt_unit_ctx_t *ctx) { - int rc; - nxt_unit_impl_t *lib; - nxt_unit_ctx_impl_t *ctx_impl; - - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + int rc; + nxt_unit_impl_t *lib; - nxt_unit_ctx_use(ctx_impl); + nxt_unit_ctx_use(ctx); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); rc = NXT_UNIT_OK; while (nxt_fast_path(lib->online)) { - rc = nxt_unit_run_once(ctx); + rc = nxt_unit_run_once_impl(ctx); - if (nxt_slow_path(rc != NXT_UNIT_OK)) { + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { break; } } - nxt_unit_ctx_release(ctx_impl); + nxt_unit_ctx_release(ctx); return rc; } @@ -4246,109 +4330,163 @@ nxt_unit_run(nxt_unit_ctx_t *ctx) int nxt_unit_run_once(nxt_unit_ctx_t *ctx) { - int rc; - nxt_queue_link_t *link; - nxt_unit_ctx_impl_t *ctx_impl; - nxt_unit_read_buf_t *rbuf; + int rc; - ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + nxt_unit_ctx_use(ctx); - nxt_unit_ctx_use(ctx_impl); + rc = nxt_unit_run_once_impl(ctx); - pthread_mutex_lock(&ctx_impl->mutex); + nxt_unit_ctx_release(ctx); - if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { + return rc; +} -next_pending: - link = nxt_queue_first(&ctx_impl->pending_rbuf); - nxt_queue_remove(link); +static int +nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_unit_read_buf_t *rbuf; - rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } - pthread_mutex_unlock(&ctx_impl->mutex); + rc = nxt_unit_read_buf(ctx, rbuf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_read_buf_release(ctx, rbuf); - } else { - rbuf = nxt_unit_read_buf_get_impl(ctx_impl); + return rc; + } - pthread_mutex_unlock(&ctx_impl->mutex); + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } - if (nxt_slow_path(rbuf == NULL)) { + rc = nxt_unit_process_pending_rbuf(ctx); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } - nxt_unit_ctx_release(ctx_impl); + nxt_unit_process_ready_req(ctx); - return NXT_UNIT_ERROR; - } + return rc; +} - nxt_unit_read_buf(ctx, rbuf); - } - if (nxt_fast_path(rbuf->size > 0)) { - rc = nxt_unit_process_msg(ctx, rbuf); +static int +nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +{ + int res, err; + nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; + struct pollfd fds[2]; -#if (NXT_DEBUG) - if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { - memset(rbuf->buf, 0xAC, rbuf->size); - } -#endif + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - } else { - rc = NXT_UNIT_ERROR; + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) { + return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); } - if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) { - rc = NXT_UNIT_OK; +retry: - } else { - nxt_unit_read_buf_release(ctx, rbuf); - } + fds[0].fd = ctx_impl->read_port->in_fd; + fds[0].events = POLLIN; + fds[0].revents = 0; - if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) { - rc = NXT_UNIT_OK; - } + fds[1].fd = lib->shared_port->in_fd; + fds[1].events = POLLIN; + fds[1].revents = 0; - if (nxt_fast_path(rc == NXT_UNIT_OK)) { - pthread_mutex_lock(&ctx_impl->mutex); + res = poll(fds, 2, -1); + if (nxt_slow_path(res < 0)) { + err = errno; - if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { - goto next_pending; + if (err == EINTR) { + goto retry; } - pthread_mutex_unlock(&ctx_impl->mutex); + nxt_unit_alert(ctx, "poll() failed: %s (%d)", + strerror(err), err); - nxt_unit_process_ready_req(ctx_impl); + rbuf->size = -1; + + return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR; } - nxt_unit_ctx_release(ctx_impl); + if ((fds[0].revents & POLLIN) != 0) { + return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf); + } - return rc; + if ((fds[1].revents & POLLIN) != 0) { + return nxt_unit_port_recv(ctx, lib->shared_port, rbuf); + } + + rbuf->size = -1; + + return NXT_UNIT_ERROR; } -static void -nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf) +static int +nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx) { + int rc; + nxt_queue_t pending_rbuf; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_read_buf_t *rbuf; + + nxt_queue_init(&pending_rbuf); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + pthread_mutex_lock(&ctx_impl->mutex); + + if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) { + pthread_mutex_unlock(&ctx_impl->mutex); + + return NXT_UNIT_OK; + } + + nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf); + nxt_queue_init(&ctx_impl->pending_rbuf); - rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port, - rbuf->buf, sizeof(rbuf->buf), - rbuf->oob, sizeof(rbuf->oob)); + pthread_mutex_unlock(&ctx_impl->mutex); + + rc = NXT_UNIT_OK; + + nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) { + + if (nxt_fast_path(rc != NXT_UNIT_ERROR)) { + rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf); + + } else { + nxt_unit_read_buf_release(ctx, rbuf); + } + + } nxt_queue_loop; + + return rc; } static void -nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl) +nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx) { nxt_queue_t ready_req; nxt_unit_impl_t *lib; + nxt_unit_ctx_impl_t *ctx_impl; nxt_unit_request_info_impl_t *req_impl; nxt_queue_init(&ready_req); + ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); + pthread_mutex_lock(&ctx_impl->mutex); if (nxt_queue_is_empty(&ctx_impl->ready_req)) { @@ -4367,20 +4505,121 @@ nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl) { lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit); + (void) nxt_unit_send_req_headers_ack(&req_impl->req); + lib->callbacks.request_handler(&req_impl->req); } nxt_queue_loop; } -void -nxt_unit_done(nxt_unit_ctx_t *ctx) +int +nxt_unit_run_ctx(nxt_unit_ctx_t *ctx) { + int rc; + nxt_unit_impl_t *lib; nxt_unit_ctx_impl_t *ctx_impl; + nxt_unit_ctx_use(ctx); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); - nxt_unit_ctx_release(ctx_impl); + rc = NXT_UNIT_OK; + + while (nxt_fast_path(lib->online)) { + rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port); + + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + } + + nxt_unit_ctx_release(ctx); + + return rc; +} + + +int +nxt_unit_run_shared(nxt_unit_ctx_t *ctx) +{ + int rc; + nxt_unit_impl_t *lib; + + nxt_unit_ctx_use(ctx); + + lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); + rc = NXT_UNIT_OK; + + while (nxt_fast_path(lib->online)) { + rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port); + + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + break; + } + } + + nxt_unit_ctx_release(ctx); + + return rc; +} + + +int +nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int rc; + + nxt_unit_ctx_use(ctx); + + rc = nxt_unit_process_port_msg_impl(ctx, port); + + nxt_unit_ctx_release(ctx); + + return rc; +} + + +static int +nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) +{ + int rc; + nxt_unit_read_buf_t *rbuf; + + rbuf = nxt_unit_read_buf_get(ctx); + if (nxt_slow_path(rbuf == NULL)) { + return NXT_UNIT_ERROR; + } + + memset(rbuf->oob, 0, sizeof(struct cmsghdr)); + + rc = nxt_unit_port_recv(ctx, port, rbuf); + if (nxt_slow_path(rc != NXT_UNIT_OK)) { + nxt_unit_read_buf_release(ctx, rbuf); + return rc; + } + + rc = nxt_unit_process_msg(ctx, rbuf); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + rc = nxt_unit_process_pending_rbuf(ctx); + if (nxt_slow_path(rc == NXT_UNIT_ERROR)) { + return NXT_UNIT_ERROR; + } + + nxt_unit_process_ready_req(ctx); + + return rc; +} + + +void +nxt_unit_done(nxt_unit_ctx_t *ctx) +{ + nxt_unit_ctx_release(ctx); } @@ -5056,12 +5295,11 @@ retry: } -static ssize_t +static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, - void *buf, size_t buf_size, void *oob, size_t oob_size) + nxt_unit_read_buf_t *rbuf) { - int fd; - ssize_t res; + int fd, err; struct iovec iov[1]; struct msghdr msg; nxt_unit_impl_t *lib; @@ -5069,40 +5307,57 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); if (lib->callbacks.port_recv != NULL) { - return lib->callbacks.port_recv(ctx, port, - buf, buf_size, oob, oob_size); + rbuf->size = lib->callbacks.port_recv(ctx, port, + rbuf->buf, sizeof(rbuf->buf), + rbuf->oob, sizeof(rbuf->oob)); + + if (nxt_slow_path(rbuf->size < 0)) { + return NXT_UNIT_ERROR; + } + + return NXT_UNIT_OK; } - iov[0].iov_base = buf; - iov[0].iov_len = buf_size; + iov[0].iov_base = rbuf->buf; + iov[0].iov_len = sizeof(rbuf->buf); msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; msg.msg_flags = 0; - msg.msg_control = oob; - msg.msg_controllen = oob_size; + msg.msg_control = rbuf->oob; + msg.msg_controllen = sizeof(rbuf->oob); fd = port->in_fd; retry: - res = recvmsg(fd, &msg, 0); + rbuf->size = recvmsg(fd, &msg, 0); - if (nxt_slow_path(res == -1)) { - if (errno == EINTR) { + if (nxt_slow_path(rbuf->size == -1)) { + err = errno; + + if (err == EINTR) { goto retry; } + if (err == EAGAIN) { + nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)", + fd, strerror(errno), errno); + + return NXT_UNIT_AGAIN; + } + nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)", fd, strerror(errno), errno); - } else { - nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res); + return NXT_UNIT_ERROR; } - return res; + nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size); + + return NXT_UNIT_OK; } |