summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:15 +0300
committerMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:15 +0300
commit83595606121a821f9e3cef0f0b7e7fe87eb1e50a (patch)
tree2374867dd2f69654a71e95b7abec3fdad13ffd1a /src/nxt_unit.c
parent6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7 (diff)
downloadunit-83595606121a821f9e3cef0f0b7e7fe87eb1e50a.tar.gz
unit-83595606121a821f9e3cef0f0b7e7fe87eb1e50a.tar.bz2
Introducing the shared application port.
This is the port shared between all application processes which use it to pass requests for processing. Using it significantly simplifies the request processing code in the router. The drawback is 2 more file descriptors per each configured application and more complex libunit message wait/read code.
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c467
1 files changed, 361 insertions, 106 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index b321a0d4..7fb2826d 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -38,8 +38,8 @@ typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
nxt_unit_ctx_impl_t *ctx_impl, void *data);
-nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl);
-nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl);
+nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
+nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
@@ -58,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
nxt_unit_port_id_t *port_id);
+static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
@@ -122,9 +123,12 @@ static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
-static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
- nxt_unit_read_buf_t *rbuf);
-static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl);
+static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
+static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
+static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
+static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
+ nxt_unit_port_t *port);
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
@@ -150,9 +154,8 @@ static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
const void *oob, size_t oob_size);
static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
const void *buf, size_t buf_size, const void *oob, size_t oob_size);
-static ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *ctx,
- nxt_unit_port_t *port, void *buf, size_t buf_size,
- void *oob, size_t oob_size);
+static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
+ nxt_unit_read_buf_t *rbuf);
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
nxt_unit_port_t *port);
@@ -308,6 +311,7 @@ struct nxt_unit_impl_s {
nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
nxt_unit_port_t *router_port;
+ nxt_unit_port_t *shared_port;
nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
@@ -452,7 +456,7 @@ nxt_unit_init(nxt_unit_init_t *init)
fail:
- nxt_unit_ctx_release(&lib->main_ctx);
+ nxt_unit_ctx_release(&lib->main_ctx.ctx);
return NULL;
}
@@ -496,6 +500,7 @@ nxt_unit_create(nxt_unit_init_t *init)
lib->use_count = 0;
lib->router_port = NULL;
+ lib->shared_port = NULL;
rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
@@ -570,16 +575,23 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_inline void
-nxt_unit_ctx_use(nxt_unit_ctx_impl_t *ctx_impl)
+nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
{
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
}
nxt_inline void
-nxt_unit_ctx_release(nxt_unit_ctx_impl_t *ctx_impl)
+nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
{
- long c;
+ long c;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
@@ -624,6 +636,10 @@ nxt_unit_lib_release(nxt_unit_impl_t *lib)
nxt_unit_port_release(lib->router_port);
}
+ if (nxt_fast_path(lib->shared_port != NULL)) {
+ nxt_unit_port_release(lib->shared_port);
+ }
+
free(lib);
}
}
@@ -805,6 +821,15 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
recv_msg.incoming_buf = NULL;
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
+ if (nxt_slow_path(rbuf->size == 0)) {
+ nxt_unit_debug(ctx, "read port closed");
+
+ nxt_unit_quit(ctx);
+ rc = NXT_UNIT_OK;
+
+ goto fail;
+ }
+
nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
goto fail;
}
@@ -946,6 +971,13 @@ fail:
nxt_unit_process_release(recv_msg.process);
}
+ if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
+#if (NXT_DEBUG)
+ memset(rbuf->buf, 0xAC, rbuf->size);
+#endif
+ nxt_unit_read_buf_release(ctx, rbuf);
+ }
+
return rc;
}
@@ -954,6 +986,7 @@ static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
int nb;
+ nxt_unit_impl_t *lib;
nxt_unit_port_t new_port, *port;
nxt_port_msg_new_port_t *new_port_msg;
@@ -978,21 +1011,33 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
recv_msg->stream, (int) new_port_msg->pid,
(int) new_port_msg->id, recv_msg->fd);
- nb = 0;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ if (new_port_msg->id == (nxt_port_id_t) -1) {
+ nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
- if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
- nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
- "failed: %s (%d)",
- recv_msg->stream, recv_msg->fd, strerror(errno), errno);
+ new_port.in_fd = recv_msg->fd;
+ new_port.out_fd = -1;
- return NXT_UNIT_ERROR;
+ } else {
+ nb = 0;
+
+ if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
+ nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
+ "failed: %s (%d)",
+ recv_msg->stream, recv_msg->fd, strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
+ new_port_msg->id);
+
+ new_port.in_fd = -1;
+ new_port.out_fd = recv_msg->fd;
}
- nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
- new_port_msg->id);
- new_port.in_fd = -1;
- new_port.out_fd = recv_msg->fd;
new_port.data = NULL;
recv_msg->fd = -1;
@@ -1002,7 +1047,12 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- nxt_unit_port_release(port);
+ if (new_port_msg->id == (nxt_port_id_t) -1) {
+ lib->shared_port = port;
+
+ } else {
+ nxt_unit_port_release(port);
+ }
return NXT_UNIT_OK;
}
@@ -1102,6 +1152,11 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
if (nxt_fast_path(res == NXT_UNIT_OK)) {
+ res = nxt_unit_send_req_headers_ack(req);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ return res;
+ }
+
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
lib->callbacks.request_handler(req);
@@ -1221,6 +1276,36 @@ nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
static int
+nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
+{
+ ssize_t res;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_request_info_impl_t *req_impl;
+
+ lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
+
+ memset(&msg, 0, sizeof(nxt_port_msg_t));
+
+ msg.stream = req_impl->stream;
+ msg.pid = lib->pid;
+ msg.reply_port = ctx_impl->read_port->id.id;
+ msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
+
+ res = nxt_unit_port_send(req->ctx, req->response_port,
+ &msg, sizeof(msg), NULL, 0);
+ if (nxt_slow_path(res != sizeof(msg))) {
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
size_t hsize;
@@ -3267,7 +3352,9 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
return NXT_UNIT_ERROR;
}
- nxt_unit_read_buf(ctx, rbuf);
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
nxt_unit_read_buf_release(ctx, rbuf);
@@ -4218,26 +4305,23 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
int
nxt_unit_run(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_unit_impl_t *lib;
- nxt_unit_ctx_impl_t *ctx_impl;
-
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+ int rc;
+ nxt_unit_impl_t *lib;
- nxt_unit_ctx_use(ctx_impl);
+ nxt_unit_ctx_use(ctx);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = NXT_UNIT_OK;
while (nxt_fast_path(lib->online)) {
- rc = nxt_unit_run_once(ctx);
+ rc = nxt_unit_run_once_impl(ctx);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
break;
}
}
- nxt_unit_ctx_release(ctx_impl);
+ nxt_unit_ctx_release(ctx);
return rc;
}
@@ -4246,109 +4330,163 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
- int rc;
- nxt_queue_link_t *link;
- nxt_unit_ctx_impl_t *ctx_impl;
- nxt_unit_read_buf_t *rbuf;
+ int rc;
- ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+ nxt_unit_ctx_use(ctx);
- nxt_unit_ctx_use(ctx_impl);
+ rc = nxt_unit_run_once_impl(ctx);
- pthread_mutex_lock(&ctx_impl->mutex);
+ nxt_unit_ctx_release(ctx);
- if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
+ return rc;
+}
-next_pending:
- link = nxt_queue_first(&ctx_impl->pending_rbuf);
- nxt_queue_remove(link);
+static int
+nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_unit_read_buf_t *rbuf;
- rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
- pthread_mutex_unlock(&ctx_impl->mutex);
+ rc = nxt_unit_read_buf(ctx, rbuf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_read_buf_release(ctx, rbuf);
- } else {
- rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
+ return rc;
+ }
- pthread_mutex_unlock(&ctx_impl->mutex);
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
- if (nxt_slow_path(rbuf == NULL)) {
+ rc = nxt_unit_process_pending_rbuf(ctx);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
- nxt_unit_ctx_release(ctx_impl);
+ nxt_unit_process_ready_req(ctx);
- return NXT_UNIT_ERROR;
- }
+ return rc;
+}
- nxt_unit_read_buf(ctx, rbuf);
- }
- if (nxt_fast_path(rbuf->size > 0)) {
- rc = nxt_unit_process_msg(ctx, rbuf);
+static int
+nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+{
+ int res, err;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ struct pollfd fds[2];
-#if (NXT_DEBUG)
- if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
- memset(rbuf->buf, 0xAC, rbuf->size);
- }
-#endif
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- } else {
- rc = NXT_UNIT_ERROR;
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) {
+ return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
}
- if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) {
- rc = NXT_UNIT_OK;
+retry:
- } else {
- nxt_unit_read_buf_release(ctx, rbuf);
- }
+ fds[0].fd = ctx_impl->read_port->in_fd;
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
- if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) {
- rc = NXT_UNIT_OK;
- }
+ fds[1].fd = lib->shared_port->in_fd;
+ fds[1].events = POLLIN;
+ fds[1].revents = 0;
- if (nxt_fast_path(rc == NXT_UNIT_OK)) {
- pthread_mutex_lock(&ctx_impl->mutex);
+ res = poll(fds, 2, -1);
+ if (nxt_slow_path(res < 0)) {
+ err = errno;
- if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
- goto next_pending;
+ if (err == EINTR) {
+ goto retry;
}
- pthread_mutex_unlock(&ctx_impl->mutex);
+ nxt_unit_alert(ctx, "poll() failed: %s (%d)",
+ strerror(err), err);
- nxt_unit_process_ready_req(ctx_impl);
+ rbuf->size = -1;
+
+ return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
}
- nxt_unit_ctx_release(ctx_impl);
+ if ((fds[0].revents & POLLIN) != 0) {
+ return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
+ }
- return rc;
+ if ((fds[1].revents & POLLIN) != 0) {
+ return nxt_unit_port_recv(ctx, lib->shared_port, rbuf);
+ }
+
+ rbuf->size = -1;
+
+ return NXT_UNIT_ERROR;
}
-static void
-nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+static int
+nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
{
+ int rc;
+ nxt_queue_t pending_rbuf;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
+
+ nxt_queue_init(&pending_rbuf);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return NXT_UNIT_OK;
+ }
+
+ nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
+ nxt_queue_init(&ctx_impl->pending_rbuf);
- rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port,
- rbuf->buf, sizeof(rbuf->buf),
- rbuf->oob, sizeof(rbuf->oob));
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ rc = NXT_UNIT_OK;
+
+ nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
+
+ if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
+ rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf);
+
+ } else {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ }
+
+ } nxt_queue_loop;
+
+ return rc;
}
static void
-nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl)
+nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
{
nxt_queue_t ready_req;
nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_request_info_impl_t *req_impl;
nxt_queue_init(&ready_req);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
pthread_mutex_lock(&ctx_impl->mutex);
if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
@@ -4367,20 +4505,121 @@ nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl)
{
lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
+ (void) nxt_unit_send_req_headers_ack(&req_impl->req);
+
lib->callbacks.request_handler(&req_impl->req);
} nxt_queue_loop;
}
-void
-nxt_unit_done(nxt_unit_ctx_t *ctx)
+int
+nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
{
+ int rc;
+ nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_ctx_use(ctx);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- nxt_unit_ctx_release(ctx_impl);
+ rc = NXT_UNIT_OK;
+
+ while (nxt_fast_path(lib->online)) {
+ rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port);
+
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+ }
+
+ nxt_unit_ctx_release(ctx);
+
+ return rc;
+}
+
+
+int
+nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
+{
+ int rc;
+ nxt_unit_impl_t *lib;
+
+ nxt_unit_ctx_use(ctx);
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ rc = NXT_UNIT_OK;
+
+ while (nxt_fast_path(lib->online)) {
+ rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port);
+
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ break;
+ }
+ }
+
+ nxt_unit_ctx_release(ctx);
+
+ return rc;
+}
+
+
+int
+nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int rc;
+
+ nxt_unit_ctx_use(ctx);
+
+ rc = nxt_unit_process_port_msg_impl(ctx, port);
+
+ nxt_unit_ctx_release(ctx);
+
+ return rc;
+}
+
+
+static int
+nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
+{
+ int rc;
+ nxt_unit_read_buf_t *rbuf;
+
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ rc = nxt_unit_port_recv(ctx, port, rbuf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ return rc;
+ }
+
+ rc = nxt_unit_process_msg(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ rc = nxt_unit_process_pending_rbuf(ctx);
+ if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_process_ready_req(ctx);
+
+ return rc;
+}
+
+
+void
+nxt_unit_done(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_ctx_release(ctx);
}
@@ -5056,12 +5295,11 @@ retry:
}
-static ssize_t
+static int
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
- void *buf, size_t buf_size, void *oob, size_t oob_size)
+ nxt_unit_read_buf_t *rbuf)
{
- int fd;
- ssize_t res;
+ int fd, err;
struct iovec iov[1];
struct msghdr msg;
nxt_unit_impl_t *lib;
@@ -5069,40 +5307,57 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
if (lib->callbacks.port_recv != NULL) {
- return lib->callbacks.port_recv(ctx, port,
- buf, buf_size, oob, oob_size);
+ rbuf->size = lib->callbacks.port_recv(ctx, port,
+ rbuf->buf, sizeof(rbuf->buf),
+ rbuf->oob, sizeof(rbuf->oob));
+
+ if (nxt_slow_path(rbuf->size < 0)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
}
- iov[0].iov_base = buf;
- iov[0].iov_len = buf_size;
+ iov[0].iov_base = rbuf->buf;
+ iov[0].iov_len = sizeof(rbuf->buf);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_flags = 0;
- msg.msg_control = oob;
- msg.msg_controllen = oob_size;
+ msg.msg_control = rbuf->oob;
+ msg.msg_controllen = sizeof(rbuf->oob);
fd = port->in_fd;
retry:
- res = recvmsg(fd, &msg, 0);
+ rbuf->size = recvmsg(fd, &msg, 0);
- if (nxt_slow_path(res == -1)) {
- if (errno == EINTR) {
+ if (nxt_slow_path(rbuf->size == -1)) {
+ err = errno;
+
+ if (err == EINTR) {
goto retry;
}
+ if (err == EAGAIN) {
+ nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
+ fd, strerror(errno), errno);
+
+ return NXT_UNIT_AGAIN;
+ }
+
nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
fd, strerror(errno), errno);
- } else {
- nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) res);
+ return NXT_UNIT_ERROR;
}
- return res;
+ nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
+
+ return NXT_UNIT_OK;
}