summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c377
1 files changed, 269 insertions, 108 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index c1ef977f..b321a0d4 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -51,6 +51,7 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
int *log_fd, uint32_t *stream, uint32_t *shm_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream);
+static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
@@ -103,12 +104,14 @@ static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
-static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, uint32_t id);
static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
+ nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
+ nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
+ nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
+ nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process,
nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
@@ -240,7 +243,8 @@ struct nxt_unit_websocket_frame_impl_s {
struct nxt_unit_read_buf_s {
- nxt_unit_read_buf_t *next;
+ nxt_queue_link_t link;
+ nxt_unit_ctx_impl_t *ctx_impl;
ssize_t size;
char buf[16384];
char oob[256];
@@ -276,9 +280,11 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_request_info_impl_t */
nxt_queue_t ready_req;
- nxt_unit_read_buf_t *pending_read_head;
- nxt_unit_read_buf_t **pending_read_tail;
- nxt_unit_read_buf_t *free_read_buf;
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t pending_rbuf;
+
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t free_rbuf;
nxt_unit_mmap_buf_t ctx_buf[2];
nxt_unit_read_buf_t ctx_read_buf;
@@ -318,6 +324,7 @@ struct nxt_unit_port_impl_s {
nxt_atomic_t use_count;
+ /* for nxt_unit_process_t.ports */
nxt_queue_link_t link;
nxt_unit_process_t *process;
@@ -330,6 +337,9 @@ struct nxt_unit_port_impl_s {
struct nxt_unit_mmap_s {
nxt_port_mmap_header_t *hdr;
+
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t awaiting_rbuf;
};
@@ -345,7 +355,7 @@ struct nxt_unit_mmaps_s {
struct nxt_unit_process_s {
pid_t pid;
- nxt_queue_t ports;
+ nxt_queue_t ports; /* of nxt_unit_port_impl_t */
nxt_unit_mmaps_t incoming;
nxt_unit_mmaps_t outgoing;
@@ -537,17 +547,17 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_queue_init(&ctx_impl->free_ws);
nxt_queue_init(&ctx_impl->active_req);
nxt_queue_init(&ctx_impl->ready_req);
+ nxt_queue_init(&ctx_impl->pending_rbuf);
+ nxt_queue_init(&ctx_impl->free_rbuf);
ctx_impl->free_buf = NULL;
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
+ nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
- ctx_impl->pending_read_head = NULL;
- ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
- ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
- ctx_impl->ctx_read_buf.next = NULL;
+ ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
ctx_impl->req.req.ctx = &ctx_impl->ctx;
ctx_impl->req.req.unit = &lib->unit;
@@ -767,9 +777,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
}
-int
-nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
- void *buf, size_t buf_size, void *oob, size_t oob_size)
+static int
+nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
int rc;
pid_t pid;
@@ -783,11 +792,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
rc = NXT_UNIT_ERROR;
recv_msg.fd = -1;
recv_msg.process = NULL;
- port_msg = buf;
- cm = oob;
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+ cm = (struct cmsghdr *) rbuf->oob;
- if (oob_size >= CMSG_SPACE(sizeof(int))
- && cm->cmsg_len == CMSG_LEN(sizeof(int))
+ if (cm->cmsg_len == CMSG_LEN(sizeof(int))
&& cm->cmsg_level == SOL_SOCKET
&& cm->cmsg_type == SCM_RIGHTS)
{
@@ -796,8 +804,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
recv_msg.incoming_buf = NULL;
- if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
- nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
+ if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
+ nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
goto fail;
}
@@ -808,7 +816,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
recv_msg.mmap = port_msg->mmap;
recv_msg.start = port_msg + 1;
- recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
+ recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
@@ -816,10 +824,16 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
goto fail;
}
- if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
- rc = NXT_UNIT_OK;
+ if (port_msg->tracking) {
+ rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf);
- goto fail;
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ if (rc == NXT_UNIT_AGAIN) {
+ recv_msg.fd = -1;
+ }
+
+ goto fail;
+ }
}
/* Fragmentation is unsupported. */
@@ -830,7 +844,13 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
}
if (port_msg->mmap) {
- if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
+ rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
+
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ if (rc == NXT_UNIT_AGAIN) {
+ recv_msg.fd = -1;
+ }
+
goto fail;
}
}
@@ -1077,6 +1097,9 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
res = nxt_unit_request_check_response_port(req, &port_id);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
if (nxt_fast_path(res == NXT_UNIT_OK)) {
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -2376,33 +2399,41 @@ static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
{
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
- return nxt_unit_read_buf_get_impl(ctx_impl);
+ rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return rbuf;
}
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
{
+ nxt_queue_link_t *link;
nxt_unit_read_buf_t *rbuf;
- if (ctx_impl->free_read_buf != NULL) {
- rbuf = ctx_impl->free_read_buf;
- ctx_impl->free_read_buf = rbuf->next;
+ if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
+ link = nxt_queue_first(&ctx_impl->free_rbuf);
+ nxt_queue_remove(link);
- pthread_mutex_unlock(&ctx_impl->mutex);
+ rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
return rbuf;
}
- pthread_mutex_unlock(&ctx_impl->mutex);
-
rbuf = malloc(sizeof(nxt_unit_read_buf_t));
+ if (nxt_fast_path(rbuf != NULL)) {
+ rbuf->ctx_impl = ctx_impl;
+ }
+
return rbuf;
}
@@ -2417,8 +2448,7 @@ nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
pthread_mutex_lock(&ctx_impl->mutex);
- rbuf->next = ctx_impl->free_read_buf;
- ctx_impl->free_read_buf = rbuf;
+ nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
pthread_mutex_unlock(&ctx_impl->mutex);
}
@@ -3255,9 +3285,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
pthread_mutex_lock(&ctx_impl->mutex);
- *ctx_impl->pending_read_tail = rbuf;
- ctx_impl->pending_read_tail = &rbuf->next;
- rbuf->next = NULL;
+ nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
pthread_mutex_unlock(&ctx_impl->mutex);
@@ -3275,7 +3303,12 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
- uint32_t cap;
+ uint32_t cap, n;
+ nxt_unit_mmap_t *e;
+
+ if (nxt_fast_path(mmaps->size > i)) {
+ return mmaps->elts + i;
+ }
cap = mmaps->cap;
@@ -3295,13 +3328,19 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
if (cap != mmaps->cap) {
- mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
- if (nxt_slow_path(mmaps->elts == NULL)) {
+ e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
+ if (nxt_slow_path(e == NULL)) {
return NULL;
}
- memset(mmaps->elts + mmaps->cap, 0,
- sizeof(*mmaps->elts) * (cap - mmaps->cap));
+ mmaps->elts = e;
+
+ for (n = mmaps->cap; n < cap; n++) {
+ e = mmaps->elts + n;
+
+ e->hdr = NULL;
+ nxt_queue_init(&e->awaiting_rbuf);
+ }
mmaps->cap = cap;
}
@@ -3581,13 +3620,16 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
{
- int rc;
- void *mem;
- struct stat mmap_stat;
- nxt_unit_mmap_t *mm;
- nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
- nxt_port_mmap_header_t *hdr;
+ int rc;
+ void *mem;
+ nxt_queue_t awaiting_rbuf;
+ struct stat mmap_stat;
+ nxt_unit_mmap_t *mm;
+ nxt_unit_impl_t *lib;
+ nxt_unit_process_t *process;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
+ nxt_port_mmap_header_t *hdr;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -3626,7 +3668,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
hdr = mem;
- if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
+ if (nxt_slow_path(hdr->src_pid != pid)) {
nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
"detected: %d != %d or %d != %d", (int) hdr->src_pid,
@@ -3637,6 +3679,8 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
goto fail;
}
+ nxt_queue_init(&awaiting_rbuf);
+
pthread_mutex_lock(&process->incoming.mutex);
mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
@@ -3650,11 +3694,28 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
hdr->sent_over = 0xFFFFu;
+ nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
+ nxt_queue_init(&mm->awaiting_rbuf);
+
rc = NXT_UNIT_OK;
}
pthread_mutex_unlock(&process->incoming.mutex);
+ nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
+
+ ctx_impl = rbuf->ctx_impl;
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+
+ } nxt_queue_loop;
+
fail:
nxt_unit_process_release(process);
@@ -3719,27 +3780,11 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
}
-static nxt_port_mmap_header_t *
-nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- uint32_t id)
-{
- nxt_port_mmap_header_t *hdr;
-
- if (nxt_fast_path(process->incoming.size > id)) {
- hdr = process->incoming.elts[id].hdr;
-
- } else {
- hdr = NULL;
- }
-
- return hdr;
-}
-
-
static int
-nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
+ nxt_unit_read_buf_t *rbuf)
{
- int rc;
+ int res;
nxt_chunk_id_t c;
nxt_unit_process_t *process;
nxt_port_mmap_header_t *hdr;
@@ -3749,7 +3794,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
recv_msg->stream, (int) recv_msg->size);
- return 0;
+ return NXT_UNIT_ERROR;
}
tracking_msg = recv_msg->start;
@@ -3759,44 +3804,95 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
process = nxt_unit_msg_get_process(ctx, recv_msg);
if (nxt_slow_path(process == NULL)) {
- return 0;
+ return NXT_UNIT_ERROR;
}
pthread_mutex_lock(&process->incoming.mutex);
- hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
- if (nxt_slow_path(hdr == NULL)) {
- pthread_mutex_unlock(&process->incoming.mutex);
-
- nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
- "invalid mmap id %d,%"PRIu32,
- recv_msg->stream, (int) process->pid,
- tracking_msg->mmap_id);
+ res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming,
+ recv_msg->pid, tracking_msg->mmap_id,
+ &hdr, rbuf);
- return 0;
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ return res;
}
c = tracking_msg->tracking_id;
- rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
+ res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
- if (rc == 0) {
+ if (res == 0) {
nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
recv_msg->stream);
nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
+
+ res = NXT_UNIT_CANCELLED;
+
+ } else {
+ res = NXT_UNIT_OK;
}
pthread_mutex_unlock(&process->incoming.mutex);
- return rc;
+ return res;
}
static int
-nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
+ pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
+ nxt_unit_read_buf_t *rbuf)
{
+ int res, need_rbuf;
+ nxt_unit_mmap_t *mm;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ mm = nxt_unit_mmap_at(mmaps, id);
+ if (nxt_slow_path(mm == NULL)) {
+ nxt_unit_alert(ctx, "failed to allocate mmap");
+
+ pthread_mutex_unlock(&mmaps->mutex);
+
+ *hdr = NULL;
+
+ return NXT_UNIT_ERROR;
+ }
+
+ *hdr = mm->hdr;
+
+ if (nxt_fast_path(*hdr != NULL)) {
+ return NXT_UNIT_OK;
+ }
+
+ need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
+
+ nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
+
+ pthread_mutex_unlock(&mmaps->mutex);
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
+
+ if (need_rbuf) {
+ res = nxt_unit_get_mmap(ctx, pid, id);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+ }
+
+ return NXT_UNIT_AGAIN;
+}
+
+
+static int
+nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
+ nxt_unit_read_buf_t *rbuf)
+{
+ int res;
void *start;
uint32_t size;
+ nxt_unit_mmaps_t *mmaps;
nxt_unit_process_t *process;
nxt_unit_mmap_buf_t *b, **incoming_tail;
nxt_port_mmap_msg_t *mmap_msg, *end;
@@ -3819,12 +3915,17 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
incoming_tail = &recv_msg->incoming_buf;
+ /* Allocating buffer structures. */
for (; mmap_msg < end; mmap_msg++) {
b = nxt_unit_mmap_buf_get(ctx);
if (nxt_slow_path(b == NULL)) {
nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
recv_msg->stream);
+ while (recv_msg->incoming_buf != NULL) {
+ nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
+ }
+
return NXT_UNIT_ERROR;
}
@@ -3835,19 +3936,21 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
b = recv_msg->incoming_buf;
mmap_msg = recv_msg->start;
- pthread_mutex_lock(&process->incoming.mutex);
+ mmaps = &process->incoming;
+
+ pthread_mutex_lock(&mmaps->mutex);
for (; mmap_msg < end; mmap_msg++) {
- hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
- if (nxt_slow_path(hdr == NULL)) {
- pthread_mutex_unlock(&process->incoming.mutex);
+ res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
+ recv_msg->pid, mmap_msg->mmap_id,
+ &hdr, rbuf);
- nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
- "invalid mmap id %d,%"PRIu32,
- recv_msg->stream, (int) process->pid,
- mmap_msg->mmap_id);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ while (recv_msg->incoming_buf != NULL) {
+ nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
+ }
- return NXT_UNIT_ERROR;
+ return res;
}
start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
@@ -3874,7 +3977,41 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
(int) mmap_msg->size);
}
- pthread_mutex_unlock(&process->incoming.mutex);
+ pthread_mutex_unlock(&mmaps->mutex);
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
+{
+ ssize_t res;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ struct {
+ nxt_port_msg_t msg;
+ nxt_port_msg_get_mmap_t get_mmap;
+ } m;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ memset(&m.msg, 0, sizeof(nxt_port_msg_t));
+
+ m.msg.pid = lib->pid;
+ m.msg.reply_port = ctx_impl->read_port->id.id;
+ m.msg.type = _NXT_PORT_MSG_GET_MMAP;
+
+ m.get_mmap.id = id;
+
+ nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
+
+ res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
+ if (nxt_slow_path(res != sizeof(m))) {
+ return NXT_UNIT_ERROR;
+ }
return NXT_UNIT_OK;
}
@@ -4110,6 +4247,7 @@ int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
int rc;
+ nxt_queue_link_t *link;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
@@ -4119,18 +4257,22 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
pthread_mutex_lock(&ctx_impl->mutex);
- if (ctx_impl->pending_read_head != NULL) {
- rbuf = ctx_impl->pending_read_head;
- ctx_impl->pending_read_head = rbuf->next;
+ if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
- if (ctx_impl->pending_read_tail == &rbuf->next) {
- ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
- }
+next_pending:
+
+ link = nxt_queue_first(&ctx_impl->pending_rbuf);
+ nxt_queue_remove(link);
+
+ rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
pthread_mutex_unlock(&ctx_impl->mutex);
} else {
rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
if (nxt_slow_path(rbuf == NULL)) {
nxt_unit_ctx_release(ctx_impl);
@@ -4142,21 +4284,40 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
}
if (nxt_fast_path(rbuf->size > 0)) {
- rc = nxt_unit_process_msg(ctx,
- rbuf->buf, rbuf->size,
- rbuf->oob, sizeof(rbuf->oob));
+ rc = nxt_unit_process_msg(ctx, rbuf);
#if (NXT_DEBUG)
- memset(rbuf->buf, 0xAC, rbuf->size);
+ if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
+ memset(rbuf->buf, 0xAC, rbuf->size);
+ }
#endif
} else {
rc = NXT_UNIT_ERROR;
}
- nxt_unit_read_buf_release(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) {
+ rc = NXT_UNIT_OK;
+
+ } else {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ }
+
+ if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) {
+ rc = NXT_UNIT_OK;
+ }
+
+ if (nxt_fast_path(rc == NXT_UNIT_OK)) {
+ pthread_mutex_lock(&ctx_impl->mutex);
- nxt_unit_process_ready_req(ctx_impl);
+ if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
+ goto next_pending;
+ }
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_unit_process_ready_req(ctx_impl);
+ }
nxt_unit_ctx_release(ctx_impl);