summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c467
1 files changed, 361 insertions, 106 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index b321a0d4..7fb2826d 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -38,8 +38,8 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
nxt_unit_ctx_impl_t *ctx_impl, void *data);
-nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl);
-nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl);
+nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
+nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
@@ -58,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
nxt_unit_port_id_t *port_id);
+static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
@@ -122,9 +123,12 @@ static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
-static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
- nxt_unit_read_buf_t *rbuf);
-static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl);
+static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
+static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
+static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
+static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_t *port);
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
@@ -150,9 +154,8 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
const void *oob, size_t oob_size);
static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
const void *buf, size_t buf_size, const void *oob, size_t oob_size);
-static ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *ctx,
- nxt_unit_port_t *port, void *buf, size_t buf_size,
- void *oob, size_t oob_size);
+static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
nxt_unit_port_t *port);
@@ -308,6 +311,7 @@ struct nxt_unit_impl_s {
nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
nxt_unit_port_t *router_port;
+ nxt_unit_port_t *shared_port;
nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
@@ -452,7 +456,7 @@ nxt_unit_init(nxt_unit_init_t *init)
fail:
- nxt_unit_ctx_release(&lib->main_ctx);
+ nxt_unit_ctx_release(&lib->main_ctx.ctx);
return NULL;
}
@@ -496,6 +500,7 @@ nxt_unit_create(nxt_unit_init_t *init)
lib->use_count = 0;
lib->router_port = NULL;
+ lib->shared_port = NULL;
rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -570,16 +575,23 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_inline void
-nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl)
+nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
{
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
}
nxt_inline void
-nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl)
+nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
{
- long c;
+ long c;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
@@ -624,6 +636,10 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib)
nxt_unit_port_release(lib->router_port);
}
+ if (nxt_fast_path(lib->shared_port != NULL)) {
+ nxt_unit_port_release(lib->shared_port);
+ }
+
free(lib);
}
}
@@ -805,6 +821,15 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
recv_msg.incoming_buf = NULL;
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
+ if (nxt_slow_path(rbuf->size == 0)) {
+ nxt_unit_debug(ctx, "read port closed");
+
+ nxt_unit_quit(ctx);
+ rc = NXT_UNIT_OK;
+
+ goto fail;
+ }
+
nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
goto fail;
}
@@ -946,6 +971,13 @@ fail:
nxt_unit_process_release(recv_msg.process);
}
+ if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
+#if (NXT_DEBUG)
+ memset(rbuf->buf, 0xAC, rbuf->size);
+#endif
+ nxt_unit_read_buf_release(ctx, rbuf);
+ }
+
return rc;
}
@@ -954,6 +986,7 @@ static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
int nb;
+ nxt_unit_impl_t *lib;
nxt_unit_port_t new_port, *port;
nxt_port_msg_new_port_t *new_port_msg;
@@ -978,21 +1011,33 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
recv_msg->stream, (int) new_port_msg->pid,
(int) new_port_msg->id, recv_msg->fd);
- nb = 0;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ if (new_port_msg->id == (nxt_port_id_t) -1) {
+ nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
- if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
- nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
- "failed: %s (%d)",
- recv_msg->stream, recv_msg->fd, strerror(errno), errno);
+ new_port.in_fd = recv_msg->fd;
+ new_port.out_fd = -1;
- return NXT_UNIT_ERROR;
+ } else {
+ nb = 0;
+
+ if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
+ nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
+ "failed: %s (%d)",
+ recv_msg->stream, recv_msg->fd, strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
+ new_port_msg->id);
+
+ new_port.in_fd = -1;
+ new_port.out_fd = recv_msg->fd;
}
- nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
- new_port_msg->id);
- new_port.in_fd = -1;
- new_port.out_fd = recv_msg->fd;
new_port.data = NULL;
recv_msg->fd = -1;
@@ -1002,7 +1047,12 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- nxt_unit_port_release(port);
+ if (new_port_msg->id == (nxt_port_id_t) -1) {
+ lib->shared_port = port;
+
+ } else {
+ nxt_unit_port_release(port);
+ }
return NXT_UNIT_OK;
}
@@ -1102,6 +1152,11 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
if (nxt_fast_path(res == NXT_UNIT_OK)) {
+ res = nxt_unit_send_req_headers_ack(req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ return res;
+ }
+
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
lib->callbacks.request_handler(req);
@@ -1221,6 +1276,36 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
static int
+nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
+{
+ ssize_t res;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ memset(&msg, 0, sizeof(nxt_port_msg_t));
+
+ msg.stream = req_impl->stream;
+ msg.pid = lib->pid;
+ msg.reply_port = ctx_impl->read_port->id.id;
+ msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
+
+ res = nxt_unit_port_send(req->ctx, req->response_port,
+ &msg, sizeof(msg), NULL, 0);
+ if (nxt_slow_path(res != sizeof(msg))) {
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
size_t hsize;
@@ -3267,7 +3352,9 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
return NXT_UNIT_ERROR;
}
- nxt_unit_read_buf(ctx, rbuf);
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
nxt_unit_read_buf_release(ctx, rbuf);
@@ -4218,26 +4305,23 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
int
nxt_unit_run(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_unit_impl_t *lib;
- nxt_unit_ctx_impl_t *ctx_impl;
-
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+ int rc;
+ nxt_unit_impl_t *lib;
- nxt_unit_ctx_use(ctx_impl);
+ nxt_unit_ctx_use(ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = NXT_UNIT_OK;
while (nxt_fast_path(lib->online)) {
- rc = nxt_unit_run_once(ctx);
+ rc = nxt_unit_run_once_impl(ctx);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
}
- nxt_unit_ctx_release(ctx_impl);
+ nxt_unit_ctx_release(ctx);
return rc;
}
@@ -4246,109 +4330,163 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_queue_link_t *link;
- nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_read_buf_t *rbuf;
+ int rc;
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+ nxt_unit_ctx_use(ctx);
- nxt_unit_ctx_use(ctx_impl);
+ rc = nxt_unit_run_once_impl(ctx);
- pthread_mutex_lock(&ctx_impl->mutex);
+ nxt_unit_ctx_release(ctx);
- if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
+ return rc;
+}
-next_pending:
- link = nxt_queue_first(&ctx_impl->pending_rbuf);
- nxt_queue_remove(link);
+static int
+nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_unit_read_buf_t *rbuf;
- rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
- pthread_mutex_unlock(&ctx_impl->mutex);
+ rc = nxt_unit_read_buf(ctx, rbuf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_read_buf_release(ctx, rbuf);
- } else {
- rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
+ return rc;
+ }
- pthread_mutex_unlock(&ctx_impl->mutex);
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
- if (nxt_slow_path(rbuf == NULL)) {
+ rc = nxt_unit_process_pending_rbuf(ctx);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
- nxt_unit_ctx_release(ctx_impl);
+ nxt_unit_process_ready_req(ctx);
- return NXT_UNIT_ERROR;
- }
+ return rc;
+}
- nxt_unit_read_buf(ctx, rbuf);
- }
- if (nxt_fast_path(rbuf->size > 0)) {
- rc = nxt_unit_process_msg(ctx, rbuf);
+static int
+nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+{
+ int res, err;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ struct pollfd fds[2];
-#if (NXT_DEBUG)
- if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
- memset(rbuf->buf, 0xAC, rbuf->size);
- }
-#endif
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- } else {
- rc = NXT_UNIT_ERROR;
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) {
+ return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
}
- if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) {
- rc = NXT_UNIT_OK;
+retry:
- } else {
- nxt_unit_read_buf_release(ctx, rbuf);
- }
+ fds[0].fd = ctx_impl->read_port->in_fd;
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
- if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) {
- rc = NXT_UNIT_OK;
- }
+ fds[1].fd = lib->shared_port->in_fd;
+ fds[1].events = POLLIN;
+ fds[1].revents = 0;
- if (nxt_fast_path(rc == NXT_UNIT_OK)) {
- pthread_mutex_lock(&ctx_impl->mutex);
+ res = poll(fds, 2, -1);
+ if (nxt_slow_path(res < 0)) {
+ err = errno;
- if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
- goto next_pending;
+ if (err == EINTR) {
+ goto retry;
}
- pthread_mutex_unlock(&ctx_impl->mutex);
+ nxt_unit_alert(ctx, "poll() failed: %s (%d)",
+ strerror(err), err);
- nxt_unit_process_ready_req(ctx_impl);
+ rbuf->size = -1;
+
+ return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
}
- nxt_unit_ctx_release(ctx_impl);
+ if ((fds[0].revents & POLLIN) != 0) {
+ return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
+ }
- return rc;
+ if ((fds[1].revents & POLLIN) != 0) {
+ return nxt_unit_port_recv(ctx, lib->shared_port, rbuf);
+ }
+
+ rbuf->size = -1;
+
+ return NXT_UNIT_ERROR;
}
-static void
-nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+static int
+nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
{
+ int rc;
+ nxt_queue_t pending_rbuf;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
+
+ nxt_queue_init(&pending_rbuf);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return NXT_UNIT_OK;
+ }
+
+ nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
+ nxt_queue_init(&ctx_impl->pending_rbuf);
- rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port,
- rbuf->buf, sizeof(rbuf->buf),
- rbuf->oob, sizeof(rbuf->oob));
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ rc = NXT_UNIT_OK;
+
+ nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
+
+ if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
+ rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf);
+
+ } else {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ }
+
+ } nxt_queue_loop;
+
+ return rc;
}
static void
-nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl)
+nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
{
nxt_queue_t ready_req;
nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_impl_t *req_impl;
nxt_queue_init(&ready_req);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
pthread_mutex_lock(&ctx_impl->mutex);
if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
@@ -4367,20 +4505,121 @@ nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl)
{
lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
+ (void) nxt_unit_send_req_headers_ack(&req_impl->req);
+
lib->callbacks.request_handler(&req_impl->req);
} nxt_queue_loop;
}
-void
-nxt_unit_done(nxt_unit_ctx_t *ctx)
+int
+nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
{
+ int rc;
+ nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_ctx_use(ctx);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- nxt_unit_ctx_release(ctx_impl);
+ rc = NXT_UNIT_OK;
+
+ while (nxt_fast_path(lib->online)) {
+ rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port);
+
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+ }
+
+ nxt_unit_ctx_release(ctx);
+
+ return rc;
+}
+
+
+int
+nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_unit_impl_t *lib;
+
+ nxt_unit_ctx_use(ctx);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ rc = NXT_UNIT_OK;
+
+ while (nxt_fast_path(lib->online)) {
+ rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port);
+
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+ }
+
+ nxt_unit_ctx_release(ctx);
+
+ return rc;
+}
+
+
+int
+nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int rc;
+
+ nxt_unit_ctx_use(ctx);
+
+ rc = nxt_unit_process_port_msg_impl(ctx, port);
+
+ nxt_unit_ctx_release(ctx);
+
+ return rc;
+}
+
+
+static int
+nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int rc;
+ nxt_unit_read_buf_t *rbuf;
+
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ rc = nxt_unit_port_recv(ctx, port, rbuf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ return rc;
+ }
+
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ rc = nxt_unit_process_pending_rbuf(ctx);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_process_ready_req(ctx);
+
+ return rc;
+}
+
+
+void
+nxt_unit_done(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_ctx_release(ctx);
}
@@ -5056,12 +5295,11 @@ retry:
}
-static ssize_t
+static int
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
- void *buf, size_t buf_size, void *oob, size_t oob_size)
+ nxt_unit_read_buf_t *rbuf)
{
- int fd;
- ssize_t res;
+ int fd, err;
struct iovec iov[1];
struct msghdr msg;
nxt_unit_impl_t *lib;
@@ -5069,40 +5307,57 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
if (lib->callbacks.port_recv != NULL) {
- return lib->callbacks.port_recv(ctx, port,
- buf, buf_size, oob, oob_size);
+ rbuf->size = lib->callbacks.port_recv(ctx, port,
+ rbuf->buf, sizeof(rbuf->buf),
+ rbuf->oob, sizeof(rbuf->oob));
+
+ if (nxt_slow_path(rbuf->size < 0)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
}
- iov[0].iov_base = buf;
- iov[0].iov_len = buf_size;
+ iov[0].iov_base = rbuf->buf;
+ iov[0].iov_len = sizeof(rbuf->buf);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_flags = 0;
- msg.msg_control = oob;
- msg.msg_controllen = oob_size;
+ msg.msg_control = rbuf->oob;
+ msg.msg_controllen = sizeof(rbuf->oob);
fd = port->in_fd;
retry:
- res = recvmsg(fd, &msg, 0);
+ rbuf->size = recvmsg(fd, &msg, 0);
- if (nxt_slow_path(res == -1)) {
- if (errno == EINTR) {
+ if (nxt_slow_path(rbuf->size == -1)) {
+ err = errno;
+
+ if (err == EINTR) {
goto retry;
}
+ if (err == EAGAIN) {
+ nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
+ fd, strerror(errno), errno);
+
+ return NXT_UNIT_AGAIN;
+ }
+
nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
fd, strerror(errno), errno);
- } else {
- nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res);
+ return NXT_UNIT_ERROR;
}
- return res;
+ nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
+
+ return NXT_UNIT_OK;
}