summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:13 +0300
committerMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:13 +0300
commit6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7 (patch)
tree2ef1d993674825791636486a0929ead424c8dd7a /src/nxt_unit.c
parent3cbc22a6dc45abdeade4deb364601230ddca02c1 (diff)
downloadunit-6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7.tar.gz
unit-6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7.tar.bz2
Changing router to application shared memory exchange protocol.
The application process needs to request the shared memory segment from the router instead of the latter pushing the segment before sending a request to the application. This is required to simplify the communication between the router and the application and to prepare the router for using the application shared port and then the queue.
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c377
1 files changed, 269 insertions, 108 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index c1ef977f..b321a0d4 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -51,6 +51,7 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
int *log_fd, uint32_t *stream, uint32_t *shm_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream);
+static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
@@ -103,12 +104,14 @@ static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
-static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, uint32_t id);
static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
+ nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
+ nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
+ nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
+ nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process,
nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
@@ -240,7 +243,8 @@ struct nxt_unit_websocket_frame_impl_s {
struct nxt_unit_read_buf_s {
- nxt_unit_read_buf_t *next;
+ nxt_queue_link_t link;
+ nxt_unit_ctx_impl_t *ctx_impl;
ssize_t size;
char buf[16384];
char oob[256];
@@ -276,9 +280,11 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_request_info_impl_t */
nxt_queue_t ready_req;
- nxt_unit_read_buf_t *pending_read_head;
- nxt_unit_read_buf_t **pending_read_tail;
- nxt_unit_read_buf_t *free_read_buf;
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t pending_rbuf;
+
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t free_rbuf;
nxt_unit_mmap_buf_t ctx_buf[2];
nxt_unit_read_buf_t ctx_read_buf;
@@ -318,6 +324,7 @@ struct nxt_unit_port_impl_s {
nxt_atomic_t use_count;
+ /* for nxt_unit_process_t.ports */
nxt_queue_link_t link;
nxt_unit_process_t *process;
@@ -330,6 +337,9 @@ struct nxt_unit_port_impl_s {
struct nxt_unit_mmap_s {
nxt_port_mmap_header_t *hdr;
+
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t awaiting_rbuf;
};
@@ -345,7 +355,7 @@ struct nxt_unit_mmaps_s {
struct nxt_unit_process_s {
pid_t pid;
- nxt_queue_t ports;
+ nxt_queue_t ports; /* of nxt_unit_port_impl_t */
nxt_unit_mmaps_t incoming;
nxt_unit_mmaps_t outgoing;
@@ -537,17 +547,17 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_queue_init(&ctx_impl->free_ws);
nxt_queue_init(&ctx_impl->active_req);
nxt_queue_init(&ctx_impl->ready_req);
+ nxt_queue_init(&ctx_impl->pending_rbuf);
+ nxt_queue_init(&ctx_impl->free_rbuf);
ctx_impl->free_buf = NULL;
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
+ nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
- ctx_impl->pending_read_head = NULL;
- ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
- ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
- ctx_impl->ctx_read_buf.next = NULL;
+ ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
ctx_impl->req.req.ctx = &ctx_impl->ctx;
ctx_impl->req.req.unit = &lib->unit;
@@ -767,9 +777,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
}
-int
-nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
- void *buf, size_t buf_size, void *oob, size_t oob_size)
+static int
+nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
int rc;
pid_t pid;
@@ -783,11 +792,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
rc = NXT_UNIT_ERROR;
recv_msg.fd = -1;
recv_msg.process = NULL;
- port_msg = buf;
- cm = oob;
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+ cm = (struct cmsghdr *) rbuf->oob;
- if (oob_size >= CMSG_SPACE(sizeof(int))
- && cm->cmsg_len == CMSG_LEN(sizeof(int))
+ if (cm->cmsg_len == CMSG_LEN(sizeof(int))
&& cm->cmsg_level == SOL_SOCKET
&& cm->cmsg_type == SCM_RIGHTS)
{
@@ -796,8 +804,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
recv_msg.incoming_buf = NULL;
- if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
- nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
+ if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
+ nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
goto fail;
}
@@ -808,7 +816,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
recv_msg.mmap = port_msg->mmap;
recv_msg.start = port_msg + 1;
- recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
+ recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
@@ -816,10 +824,16 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
goto fail;
}
- if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
- rc = NXT_UNIT_OK;
+ if (port_msg->tracking) {
+ rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf);
- goto fail;
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ if (rc == NXT_UNIT_AGAIN) {
+ recv_msg.fd = -1;
+ }
+
+ goto fail;
+ }
}
/* Fragmentation is unsupported. */
@@ -830,7 +844,13 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
}
if (port_msg->mmap) {
- if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
+ rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
+
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ if (rc == NXT_UNIT_AGAIN) {
+ recv_msg.fd = -1;
+ }
+
goto fail;
}
}
@@ -1077,6 +1097,9 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
res = nxt_unit_request_check_response_port(req, &port_id);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
if (nxt_fast_path(res == NXT_UNIT_OK)) {
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -2376,33 +2399,41 @@ static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
{
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
- return nxt_unit_read_buf_get_impl(ctx_impl);
+ rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return rbuf;
}
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
{
+ nxt_queue_link_t *link;
nxt_unit_read_buf_t *rbuf;
- if (ctx_impl->free_read_buf != NULL) {
- rbuf = ctx_impl->free_read_buf;
- ctx_impl->free_read_buf = rbuf->next;
+ if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
+ link = nxt_queue_first(&ctx_impl->free_rbuf);
+ nxt_queue_remove(link);
- pthread_mutex_unlock(&ctx_impl->mutex);
+ rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
return rbuf;
}
- pthread_mutex_unlock(&ctx_impl->mutex);
-
rbuf = malloc(sizeof(nxt_unit_read_buf_t));
+ if (nxt_fast_path(rbuf != NULL)) {
+ rbuf->ctx_impl = ctx_impl;
+ }
+
return rbuf;
}
@@ -2417,8 +2448,7 @@ nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
pthread_mutex_lock(&ctx_impl->mutex);
- rbuf->next = ctx_impl->free_read_buf;
- ctx_impl->free_read_buf = rbuf;
+ nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
pthread_mutex_unlock(&ctx_impl->mutex);
}
@@ -3255,9 +3285,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
pthread_mutex_lock(&ctx_impl->mutex);
- *ctx_impl->pending_read_tail = rbuf;
- ctx_impl->pending_read_tail = &rbuf->next;
- rbuf->next = NULL;
+ nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
pthread_mutex_unlock(&ctx_impl->mutex);
@@ -3275,7 +3303,12 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
- uint32_t cap;
+ uint32_t cap, n;
+ nxt_unit_mmap_t *e;
+
+ if (nxt_fast_path(mmaps->size > i)) {
+ return mmaps->elts + i;
+ }
cap = mmaps->cap;
@@ -3295,13 +3328,19 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
if (cap != mmaps->cap) {
- mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
- if (nxt_slow_path(mmaps->elts == NULL)) {
+ e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
+ if (nxt_slow_path(e == NULL)) {
return NULL;
}
- memset(mmaps->elts + mmaps->cap, 0,
- sizeof(*mmaps->elts) * (cap - mmaps->cap));
+ mmaps->elts = e;
+
+ for (n = mmaps->cap; n < cap; n++) {
+ e = mmaps->elts + n;
+
+ e->hdr = NULL;
+ nxt_queue_init(&e->awaiting_rbuf);
+ }
mmaps->cap = cap;
}
@@ -3581,13 +3620,16 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
{
- int rc;
- void *mem;
- struct stat mmap_stat;
- nxt_unit_mmap_t *mm;
- nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
- nxt_port_mmap_header_t *hdr;
+ int rc;
+ void *mem;
+ nxt_queue_t awaiting_rbuf;
+ struct stat mmap_stat;
+ nxt_unit_mmap_t *mm;
+ nxt_unit_impl_t *lib;
+ nxt_unit_process_t *process;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
+ nxt_port_mmap_header_t *hdr;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -3626,7 +3668,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
hdr = mem;
- if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
+ if (nxt_slow_path(hdr->src_pid != pid)) {
nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
"detected: %d != %d or %d != %d", (int) hdr->src_pid,
@@ -3637,6 +3679,8 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
goto fail;
}
+ nxt_queue_init(&awaiting_rbuf);
+
pthread_mutex_lock(&process->incoming.mutex);
mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
@@ -3650,11 +3694,28 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
hdr->sent_over = 0xFFFFu;
+ nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
+ nxt_queue_init(&mm->awaiting_rbuf);
+
rc = NXT_UNIT_OK;
}
pthread_mutex_unlock(&process->incoming.mutex);
+ nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
+
+ ctx_impl = rbuf->ctx_impl;
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+
+ } nxt_queue_loop;
+
fail:
nxt_unit_process_release(process);
@@ -3719,27 +3780,11 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
}
-static nxt_port_mmap_header_t *
-nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- uint32_t id)
-{
- nxt_port_mmap_header_t *hdr;
-
- if (nxt_fast_path(process->incoming.size > id)) {
- hdr = process->incoming.elts[id].hdr;
-
- } else {
- hdr = NULL;
- }
-
- return hdr;
-}
-
-
static int
-nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
+ nxt_unit_read_buf_t *rbuf)
{
- int rc;
+ int res;
nxt_chunk_id_t c;
nxt_unit_process_t *process;
nxt_port_mmap_header_t *hdr;
@@ -3749,7 +3794,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
recv_msg->stream, (int) recv_msg->size);
- return 0;
+ return NXT_UNIT_ERROR;
}
tracking_msg = recv_msg->start;
@@ -3759,44 +3804,95 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
process = nxt_unit_msg_get_process(ctx, recv_msg);
if (nxt_slow_path(process == NULL)) {
- return 0;
+ return NXT_UNIT_ERROR;
}
pthread_mutex_lock(&process->incoming.mutex);
- hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
- if (nxt_slow_path(hdr == NULL)) {
- pthread_mutex_unlock(&process->incoming.mutex);
-
- nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
- "invalid mmap id %d,%"PRIu32,
- recv_msg->stream, (int) process->pid,
- tracking_msg->mmap_id);
+ res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming,
+ recv_msg->pid, tracking_msg->mmap_id,
+ &hdr, rbuf);
- return 0;
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ return res;
}
c = tracking_msg->tracking_id;
- rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
+ res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
- if (rc == 0) {
+ if (res == 0) {
nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
recv_msg->stream);
nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
+
+ res = NXT_UNIT_CANCELLED;
+
+ } else {
+ res = NXT_UNIT_OK;
}
pthread_mutex_unlock(&process->incoming.mutex);
- return rc;
+ return res;
}
static int
-nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
+ pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
+ nxt_unit_read_buf_t *rbuf)
{
+ int res, need_rbuf;
+ nxt_unit_mmap_t *mm;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ mm = nxt_unit_mmap_at(mmaps, id);
+ if (nxt_slow_path(mm == NULL)) {
+ nxt_unit_alert(ctx, "failed to allocate mmap");
+
+ pthread_mutex_unlock(&mmaps->mutex);
+
+ *hdr = NULL;
+
+ return NXT_UNIT_ERROR;
+ }
+
+ *hdr = mm->hdr;
+
+ if (nxt_fast_path(*hdr != NULL)) {
+ return NXT_UNIT_OK;
+ }
+
+ need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
+
+ nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
+
+ pthread_mutex_unlock(&mmaps->mutex);
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
+
+ if (need_rbuf) {
+ res = nxt_unit_get_mmap(ctx, pid, id);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+ }
+
+ return NXT_UNIT_AGAIN;
+}
+
+
+static int
+nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
+ nxt_unit_read_buf_t *rbuf)
+{
+ int res;
void *start;
uint32_t size;
+ nxt_unit_mmaps_t *mmaps;
nxt_unit_process_t *process;
nxt_unit_mmap_buf_t *b, **incoming_tail;
nxt_port_mmap_msg_t *mmap_msg, *end;
@@ -3819,12 +3915,17 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
incoming_tail = &recv_msg->incoming_buf;
+ /* Allocating buffer structures. */
for (; mmap_msg < end; mmap_msg++) {
b = nxt_unit_mmap_buf_get(ctx);
if (nxt_slow_path(b == NULL)) {
nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
recv_msg->stream);
+ while (recv_msg->incoming_buf != NULL) {
+ nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
+ }
+
return NXT_UNIT_ERROR;
}
@@ -3835,19 +3936,21 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
b = recv_msg->incoming_buf;
mmap_msg = recv_msg->start;
- pthread_mutex_lock(&process->incoming.mutex);
+ mmaps = &process->incoming;
+
+ pthread_mutex_lock(&mmaps->mutex);
for (; mmap_msg < end; mmap_msg++) {
- hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
- if (nxt_slow_path(hdr == NULL)) {
- pthread_mutex_unlock(&process->incoming.mutex);
+ res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
+ recv_msg->pid, mmap_msg->mmap_id,
+ &hdr, rbuf);
- nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
- "invalid mmap id %d,%"PRIu32,
- recv_msg->stream, (int) process->pid,
- mmap_msg->mmap_id);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ while (recv_msg->incoming_buf != NULL) {
+ nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
+ }
- return NXT_UNIT_ERROR;
+ return res;
}
start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
@@ -3874,7 +3977,41 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
(int) mmap_msg->size);
}
- pthread_mutex_unlock(&process->incoming.mutex);
+ pthread_mutex_unlock(&mmaps->mutex);
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
+{
+ ssize_t res;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ struct {
+ nxt_port_msg_t msg;
+ nxt_port_msg_get_mmap_t get_mmap;
+ } m;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ memset(&m.msg, 0, sizeof(nxt_port_msg_t));
+
+ m.msg.pid = lib->pid;
+ m.msg.reply_port = ctx_impl->read_port->id.id;
+ m.msg.type = _NXT_PORT_MSG_GET_MMAP;
+
+ m.get_mmap.id = id;
+
+ nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
+
+ res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
+ if (nxt_slow_path(res != sizeof(m))) {
+ return NXT_UNIT_ERROR;
+ }
return NXT_UNIT_OK;
}
@@ -4110,6 +4247,7 @@ int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
int rc;
+ nxt_queue_link_t *link;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
@@ -4119,18 +4257,22 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
pthread_mutex_lock(&ctx_impl->mutex);
- if (ctx_impl->pending_read_head != NULL) {
- rbuf = ctx_impl->pending_read_head;
- ctx_impl->pending_read_head = rbuf->next;
+ if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
- if (ctx_impl->pending_read_tail == &rbuf->next) {
- ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
- }
+next_pending:
+
+ link = nxt_queue_first(&ctx_impl->pending_rbuf);
+ nxt_queue_remove(link);
+
+ rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
pthread_mutex_unlock(&ctx_impl->mutex);
} else {
rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
if (nxt_slow_path(rbuf == NULL)) {
nxt_unit_ctx_release(ctx_impl);
@@ -4142,21 +4284,40 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
}
if (nxt_fast_path(rbuf->size > 0)) {
- rc = nxt_unit_process_msg(ctx,
- rbuf->buf, rbuf->size,
- rbuf->oob, sizeof(rbuf->oob));
+ rc = nxt_unit_process_msg(ctx, rbuf);
#if (NXT_DEBUG)
- memset(rbuf->buf, 0xAC, rbuf->size);
+ if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
+ memset(rbuf->buf, 0xAC, rbuf->size);
+ }
#endif
} else {
rc = NXT_UNIT_ERROR;
}
- nxt_unit_read_buf_release(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) {
+ rc = NXT_UNIT_OK;
+
+ } else {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ }
+
+ if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) {
+ rc = NXT_UNIT_OK;
+ }
+
+ if (nxt_fast_path(rc == NXT_UNIT_OK)) {
+ pthread_mutex_lock(&ctx_impl->mutex);
- nxt_unit_process_ready_req(ctx_impl);
+ if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
+ goto next_pending;
+ }
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_unit_process_ready_req(ctx_impl);
+ }
nxt_unit_ctx_release(ctx_impl);