summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_http_websocket.c3
-rw-r--r--src/nxt_port.h7
-rw-r--r--src/nxt_port_memory.c73
-rw-r--r--src/nxt_port_memory.h12
-rw-r--r--src/nxt_port_memory_int.h1
-rw-r--r--src/nxt_router.c69
-rw-r--r--src/nxt_unit.c377
-rw-r--r--src/nxt_unit.h11
8 files changed, 374 insertions, 179 deletions
diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c
index fb888f5d..4d31b320 100644
--- a/src/nxt_http_websocket.c
+++ b/src/nxt_http_websocket.c
@@ -69,7 +69,8 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
if (buf == NULL || buf_free_size == 0) {
buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
- buf = nxt_port_mmap_get_buf(task, req_app_link->app_port,
+ buf = nxt_port_mmap_get_buf(task,
+ &req_app_link->app_port->process->outgoing,
buf_free_size);
*out_tail = buf;
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 838a7ffe..3a8a200a 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -27,6 +27,7 @@ struct nxt_port_handlers_s {
nxt_port_handler_t new_port;
nxt_port_handler_t get_port;
nxt_port_handler_t mmap;
+ nxt_port_handler_t get_mmap;
/* New process */
nxt_port_handler_t process_created;
@@ -80,6 +81,7 @@ typedef enum {
_NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
_NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port),
_NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
+ _NXT_PORT_MSG_GET_MMAP = nxt_port_handler_idx(get_mmap),
_NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created),
_NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready),
@@ -247,6 +249,11 @@ typedef struct {
} nxt_port_msg_get_port_t;
+typedef struct {
+ uint32_t id;
+} nxt_port_msg_get_mmap_t;
+
+
/*
* nxt_port_data_t size is allocation size
* which enables effective reuse of memory pool cache.
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index fd472cc6..1e01629e 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -282,8 +282,8 @@ fail:
static nxt_port_mmap_handler_t *
-nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
- nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n)
+nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
+ nxt_bool_t tracking, nxt_int_t n)
{
void *mem;
nxt_fd_t fd;
@@ -295,15 +295,14 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
if (nxt_slow_path(mmap_handler == NULL)) {
- nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
+ nxt_alert(task, "failed to allocate mmap_handler");
return NULL;
}
- port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
+ port_mmap = nxt_port_mmap_at(mmaps, mmaps->size);
if (nxt_slow_path(port_mmap == NULL)) {
- nxt_log(task, NXT_LOG_WARN,
- "failed to add port mmap to outgoing array");
+ nxt_alert(task, "failed to add port mmap to mmaps array");
nxt_free(mmap_handler);
return NULL;
@@ -322,6 +321,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
}
mmap_handler->hdr = mem;
+ mmap_handler->fd = fd;
port_mmap->mmap_handler = mmap_handler;
nxt_port_mmap_handler_use(mmap_handler, 1);
@@ -331,10 +331,9 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
- hdr->id = process->outgoing.size - 1;
+ hdr->id = mmaps->size - 1;
hdr->src_pid = nxt_pid;
- hdr->dst_pid = process->pid;
- hdr->sent_over = port->id;
+ hdr->sent_over = 0xFFFFu;
/* Mark first chunk as busy */
free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
@@ -347,13 +346,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
- nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
-
- /* TODO handle error */
- (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
-
- nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
- hdr->id, nxt_pid, process->pid);
+ nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...",
+ hdr->id, nxt_pid);
return mmap_handler;
@@ -361,7 +355,7 @@ remove_fail:
nxt_free(mmap_handler);
- process->outgoing.size--;
+ mmaps->size--;
return NULL;
}
@@ -445,34 +439,28 @@ nxt_shm_open(nxt_task_t *task, size_t size)
static nxt_port_mmap_handler_t *
-nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
+nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c,
nxt_int_t n, nxt_bool_t tracking)
{
nxt_int_t i, res, nchunks;
- nxt_process_t *process;
nxt_free_map_t *free_map;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_t *end_port_mmap;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
- process = port->process;
- if (nxt_slow_path(process == NULL)) {
- return NULL;
- }
-
- nxt_thread_mutex_lock(&process->outgoing.mutex);
+ nxt_thread_mutex_lock(&mmaps->mutex);
- end_port_mmap = process->outgoing.elts + process->outgoing.size;
+ end_port_mmap = mmaps->elts + mmaps->size;
- for (port_mmap = process->outgoing.elts;
+ for (port_mmap = mmaps->elts;
port_mmap < end_port_mmap;
port_mmap++)
{
mmap_handler = port_mmap->mmap_handler;
hdr = mmap_handler->hdr;
- if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
+ if (hdr->sent_over != 0xFFFFu) {
continue;
}
@@ -510,11 +498,11 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
/* TODO introduce port_mmap limit and release wait. */
*c = 0;
- mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n);
+ mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n);
unlock_return:
- nxt_thread_mutex_unlock(&process->outgoing.mutex);
+ nxt_thread_mutex_unlock(&mmaps->mutex);
return mmap_handler;
}
@@ -549,7 +537,7 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
nxt_int_t
-nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
+nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
nxt_port_mmap_tracking_t *tracking, uint32_t stream)
{
nxt_chunk_id_t c;
@@ -558,7 +546,7 @@ nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
nxt_debug(task, "request tracking for stream #%uD", stream);
- mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1);
+ mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1);
if (nxt_slow_path(mmap_handler == NULL)) {
return NXT_ERROR;
}
@@ -680,7 +668,7 @@ nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_buf_t *
-nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
+nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size)
{
nxt_mp_t *mp;
nxt_buf_t *b;
@@ -707,7 +695,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
b->completion_handler = nxt_port_mmap_buf_completion;
nxt_buf_set_port_mmap(b);
- mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0);
+ mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0);
if (nxt_slow_path(mmap_handler == NULL)) {
mp = task->thread->engine->mem_pool;
nxt_mp_free(mp, b);
@@ -943,9 +931,7 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
{
- nxt_port_method_t m;
- nxt_port_mmap_header_t *hdr;
- nxt_port_mmap_handler_t *mmap_handler;
+ nxt_port_method_t m;
m = NXT_PORT_METHOD_ANY;
@@ -956,9 +942,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
}
if (nxt_buf_is_port_mmap(b)) {
- mmap_handler = b->parent;
- hdr = mmap_handler->hdr;
-
if (m == NXT_PORT_METHOD_PLAIN) {
nxt_log_error(NXT_LOG_ERR, task->log,
"mixing plain and mmap buffers, "
@@ -967,16 +950,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
break;
}
- if (port->pid != hdr->dst_pid) {
- nxt_log_error(NXT_LOG_ERR, task->log,
- "send mmap buffer for %PI to %PI, "
- "using plain mode", hdr->dst_pid, port->pid);
-
- m = NXT_PORT_METHOD_PLAIN;
-
- break;
- }
-
if (m == NXT_PORT_METHOD_ANY) {
nxt_debug(task, "using mmap mode");
diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h
index 2cd4bd76..8e71af3d 100644
--- a/src/nxt_port_memory.h
+++ b/src/nxt_port_memory.h
@@ -23,7 +23,7 @@ struct nxt_port_mmap_tracking_s {
};
nxt_int_t
-nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
+nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
nxt_port_mmap_tracking_t *tracking, uint32_t stream);
nxt_bool_t
@@ -37,14 +37,12 @@ nxt_bool_t
nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg);
/*
- * Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem'
- * pointers to first available shared mem bucket(s). 'size' used as a hint to
- * acquire several successive buckets if possible.
- *
- * This function assumes that current thread operates the 'port' exclusively.
+ * Allocates nxt_but_t structure from task's thread engine mem_pool, assigns
+ * this buf 'mem' pointers to first available shared mem bucket(s). 'size'
+ * used as a hint to acquire several successive buckets if possible.
*/
nxt_buf_t *
-nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size);
+nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size);
nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b,
size_t size, size_t min_size);
diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h
index 87c3d833..d2524ee4 100644
--- a/src/nxt_port_memory_int.h
+++ b/src/nxt_port_memory_int.h
@@ -63,6 +63,7 @@ struct nxt_port_mmap_header_s {
struct nxt_port_mmap_handler_s {
nxt_port_mmap_header_t *hdr;
nxt_atomic_t use_count;
+ nxt_fd_t fd;
};
/*
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 3380e133..4df1489d 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -257,6 +257,8 @@ static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
static void nxt_router_get_port_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
+static void nxt_router_get_mmap_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
extern const nxt_http_request_state_t nxt_http_websocket;
@@ -281,6 +283,7 @@ static const nxt_port_handlers_t nxt_router_process_port_handlers = {
.get_port = nxt_router_get_port_handler,
.change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler,
+ .get_mmap = nxt_router_get_mmap_handler,
.data = nxt_router_conf_data_handler,
.remove_pid = nxt_router_remove_pid_handler,
.access_log = nxt_router_access_log_reopen_handler,
@@ -5008,7 +5011,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
buf = req_app_link->msg_info.buf;
- res = nxt_port_mmap_get_tracking(task, port,
+ res = nxt_port_mmap_get_tracking(task, &port->process->outgoing,
&req_app_link->msg_info.tracking,
req_app_link->stream);
if (nxt_slow_path(res != NXT_OK)) {
@@ -5138,7 +5141,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
return NULL;
}
- out = nxt_port_mmap_get_buf(task, port,
+ out = nxt_port_mmap_get_buf(task, &port->process->outgoing,
nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
if (nxt_slow_path(out == NULL)) {
return NULL;
@@ -5320,7 +5323,8 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
if (buf == NULL) {
free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
- buf = nxt_port_mmap_get_buf(task, port, free_size);
+ buf = nxt_port_mmap_get_buf(task, &port->process->outgoing,
+ free_size);
if (nxt_slow_path(buf == NULL)) {
while (out != NULL) {
buf = out->next;
@@ -5556,6 +5560,65 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
static void
+nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_fd_t fd;
+ nxt_port_t *port;
+ nxt_runtime_t *rt;
+ nxt_port_mmaps_t *mmaps;
+ nxt_port_msg_get_mmap_t *get_mmap_msg;
+ nxt_port_mmap_handler_t *mmap_handler;
+
+ rt = task->thread->runtime;
+
+ port = nxt_runtime_port_find(rt, msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(port == NULL)) {
+ nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
+ msg->port_msg.pid, msg->port_msg.reply_port);
+
+ return;
+ }
+
+ if (nxt_slow_path(nxt_buf_used_size(msg->buf)
+ < (int) sizeof(nxt_port_msg_get_mmap_t)))
+ {
+ nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
+ (int) nxt_buf_used_size(msg->buf));
+
+ return;
+ }
+
+ get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
+
+ nxt_assert(port->type == NXT_PROCESS_APP);
+
+ mmaps = &port->process->outgoing;
+ nxt_thread_mutex_lock(&mmaps->mutex);
+
+ if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
+ nxt_thread_mutex_unlock(&mmaps->mutex);
+
+ nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
+ (int) get_mmap_msg->id);
+
+ return;
+ }
+
+ mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
+
+ fd = mmap_handler->fd;
+
+ nxt_thread_mutex_unlock(&mmaps->mutex);
+
+ nxt_debug(task, "get mmap %PI:%d found",
+ msg->port_msg.pid, (int) get_mmap_msg->id);
+
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
+}
+
+
+static void
nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_port_t *port, *reply_port;
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index c1ef977f..b321a0d4 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -51,6 +51,7 @@ static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
int *log_fd, uint32_t *stream, uint32_t *shm_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream);
+static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
@@ -103,12 +104,14 @@ static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
-static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
- nxt_unit_process_t *process, uint32_t id);
static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
+ nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
+ nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
+ nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
- nxt_unit_recv_msg_t *recv_msg);
+ nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
+static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process,
nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
@@ -240,7 +243,8 @@ struct nxt_unit_websocket_frame_impl_s {
struct nxt_unit_read_buf_s {
- nxt_unit_read_buf_t *next;
+ nxt_queue_link_t link;
+ nxt_unit_ctx_impl_t *ctx_impl;
ssize_t size;
char buf[16384];
char oob[256];
@@ -276,9 +280,11 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_request_info_impl_t */
nxt_queue_t ready_req;
- nxt_unit_read_buf_t *pending_read_head;
- nxt_unit_read_buf_t **pending_read_tail;
- nxt_unit_read_buf_t *free_read_buf;
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t pending_rbuf;
+
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t free_rbuf;
nxt_unit_mmap_buf_t ctx_buf[2];
nxt_unit_read_buf_t ctx_read_buf;
@@ -318,6 +324,7 @@ struct nxt_unit_port_impl_s {
nxt_atomic_t use_count;
+ /* for nxt_unit_process_t.ports */
nxt_queue_link_t link;
nxt_unit_process_t *process;
@@ -330,6 +337,9 @@ struct nxt_unit_port_impl_s {
struct nxt_unit_mmap_s {
nxt_port_mmap_header_t *hdr;
+
+ /* of nxt_unit_read_buf_t */
+ nxt_queue_t awaiting_rbuf;
};
@@ -345,7 +355,7 @@ struct nxt_unit_mmaps_s {
struct nxt_unit_process_s {
pid_t pid;
- nxt_queue_t ports;
+ nxt_queue_t ports; /* of nxt_unit_port_impl_t */
nxt_unit_mmaps_t incoming;
nxt_unit_mmaps_t outgoing;
@@ -537,17 +547,17 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_queue_init(&ctx_impl->free_ws);
nxt_queue_init(&ctx_impl->active_req);
nxt_queue_init(&ctx_impl->ready_req);
+ nxt_queue_init(&ctx_impl->pending_rbuf);
+ nxt_queue_init(&ctx_impl->free_rbuf);
ctx_impl->free_buf = NULL;
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
+ nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
- ctx_impl->pending_read_head = NULL;
- ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
- ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
- ctx_impl->ctx_read_buf.next = NULL;
+ ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
ctx_impl->req.req.ctx = &ctx_impl->ctx;
ctx_impl->req.req.unit = &lib->unit;
@@ -767,9 +777,8 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
}
-int
-nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
- void *buf, size_t buf_size, void *oob, size_t oob_size)
+static int
+nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
int rc;
pid_t pid;
@@ -783,11 +792,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
rc = NXT_UNIT_ERROR;
recv_msg.fd = -1;
recv_msg.process = NULL;
- port_msg = buf;
- cm = oob;
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+ cm = (struct cmsghdr *) rbuf->oob;
- if (oob_size >= CMSG_SPACE(sizeof(int))
- && cm->cmsg_len == CMSG_LEN(sizeof(int))
+ if (cm->cmsg_len == CMSG_LEN(sizeof(int))
&& cm->cmsg_level == SOL_SOCKET
&& cm->cmsg_type == SCM_RIGHTS)
{
@@ -796,8 +804,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
recv_msg.incoming_buf = NULL;
- if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
- nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
+ if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
+ nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
goto fail;
}
@@ -808,7 +816,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
recv_msg.mmap = port_msg->mmap;
recv_msg.start = port_msg + 1;
- recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
+ recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
@@ -816,10 +824,16 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
goto fail;
}
- if (port_msg->tracking && nxt_unit_tracking_read(ctx, &recv_msg) == 0) {
- rc = NXT_UNIT_OK;
+ if (port_msg->tracking) {
+ rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf);
- goto fail;
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ if (rc == NXT_UNIT_AGAIN) {
+ recv_msg.fd = -1;
+ }
+
+ goto fail;
+ }
}
/* Fragmentation is unsupported. */
@@ -830,7 +844,13 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx,
}
if (port_msg->mmap) {
- if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
+ rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
+
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ if (rc == NXT_UNIT_AGAIN) {
+ recv_msg.fd = -1;
+ }
+
goto fail;
}
}
@@ -1077,6 +1097,9 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
res = nxt_unit_request_check_response_port(req, &port_id);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
if (nxt_fast_path(res == NXT_UNIT_OK)) {
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -2376,33 +2399,41 @@ static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
{
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
pthread_mutex_lock(&ctx_impl->mutex);
- return nxt_unit_read_buf_get_impl(ctx_impl);
+ rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return rbuf;
}
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
{
+ nxt_queue_link_t *link;
nxt_unit_read_buf_t *rbuf;
- if (ctx_impl->free_read_buf != NULL) {
- rbuf = ctx_impl->free_read_buf;
- ctx_impl->free_read_buf = rbuf->next;
+ if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
+ link = nxt_queue_first(&ctx_impl->free_rbuf);
+ nxt_queue_remove(link);
- pthread_mutex_unlock(&ctx_impl->mutex);
+ rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
return rbuf;
}
- pthread_mutex_unlock(&ctx_impl->mutex);
-
rbuf = malloc(sizeof(nxt_unit_read_buf_t));
+ if (nxt_fast_path(rbuf != NULL)) {
+ rbuf->ctx_impl = ctx_impl;
+ }
+
return rbuf;
}
@@ -2417,8 +2448,7 @@ nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
pthread_mutex_lock(&ctx_impl->mutex);
- rbuf->next = ctx_impl->free_read_buf;
- ctx_impl->free_read_buf = rbuf;
+ nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
pthread_mutex_unlock(&ctx_impl->mutex);
}
@@ -3255,9 +3285,7 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
pthread_mutex_lock(&ctx_impl->mutex);
- *ctx_impl->pending_read_tail = rbuf;
- ctx_impl->pending_read_tail = &rbuf->next;
- rbuf->next = NULL;
+ nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
pthread_mutex_unlock(&ctx_impl->mutex);
@@ -3275,7 +3303,12 @@ nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
- uint32_t cap;
+ uint32_t cap, n;
+ nxt_unit_mmap_t *e;
+
+ if (nxt_fast_path(mmaps->size > i)) {
+ return mmaps->elts + i;
+ }
cap = mmaps->cap;
@@ -3295,13 +3328,19 @@ nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
if (cap != mmaps->cap) {
- mmaps->elts = realloc(mmaps->elts, cap * sizeof(*mmaps->elts));
- if (nxt_slow_path(mmaps->elts == NULL)) {
+ e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
+ if (nxt_slow_path(e == NULL)) {
return NULL;
}
- memset(mmaps->elts + mmaps->cap, 0,
- sizeof(*mmaps->elts) * (cap - mmaps->cap));
+ mmaps->elts = e;
+
+ for (n = mmaps->cap; n < cap; n++) {
+ e = mmaps->elts + n;
+
+ e->hdr = NULL;
+ nxt_queue_init(&e->awaiting_rbuf);
+ }
mmaps->cap = cap;
}
@@ -3581,13 +3620,16 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
{
- int rc;
- void *mem;
- struct stat mmap_stat;
- nxt_unit_mmap_t *mm;
- nxt_unit_impl_t *lib;
- nxt_unit_process_t *process;
- nxt_port_mmap_header_t *hdr;
+ int rc;
+ void *mem;
+ nxt_queue_t awaiting_rbuf;
+ struct stat mmap_stat;
+ nxt_unit_mmap_t *mm;
+ nxt_unit_impl_t *lib;
+ nxt_unit_process_t *process;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
+ nxt_port_mmap_header_t *hdr;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
@@ -3626,7 +3668,7 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
hdr = mem;
- if (nxt_slow_path(hdr->src_pid != pid || hdr->dst_pid != lib->pid)) {
+ if (nxt_slow_path(hdr->src_pid != pid)) {
nxt_unit_warn(ctx, "incoming_mmap: unexpected pid in mmap header "
"detected: %d != %d or %d != %d", (int) hdr->src_pid,
@@ -3637,6 +3679,8 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
goto fail;
}
+ nxt_queue_init(&awaiting_rbuf);
+
pthread_mutex_lock(&process->incoming.mutex);
mm = nxt_unit_mmap_at(&process->incoming, hdr->id);
@@ -3650,11 +3694,28 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
hdr->sent_over = 0xFFFFu;
+ nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
+ nxt_queue_init(&mm->awaiting_rbuf);
+
rc = NXT_UNIT_OK;
}
pthread_mutex_unlock(&process->incoming.mutex);
+ nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
+
+ ctx_impl = rbuf->ctx_impl;
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
+
+ } nxt_queue_loop;
+
fail:
nxt_unit_process_release(process);
@@ -3719,27 +3780,11 @@ nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
}
-static nxt_port_mmap_header_t *
-nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- uint32_t id)
-{
- nxt_port_mmap_header_t *hdr;
-
- if (nxt_fast_path(process->incoming.size > id)) {
- hdr = process->incoming.elts[id].hdr;
-
- } else {
- hdr = NULL;
- }
-
- return hdr;
-}
-
-
static int
-nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
+ nxt_unit_read_buf_t *rbuf)
{
- int rc;
+ int res;
nxt_chunk_id_t c;
nxt_unit_process_t *process;
nxt_port_mmap_header_t *hdr;
@@ -3749,7 +3794,7 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
recv_msg->stream, (int) recv_msg->size);
- return 0;
+ return NXT_UNIT_ERROR;
}
tracking_msg = recv_msg->start;
@@ -3759,44 +3804,95 @@ nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
process = nxt_unit_msg_get_process(ctx, recv_msg);
if (nxt_slow_path(process == NULL)) {
- return 0;
+ return NXT_UNIT_ERROR;
}
pthread_mutex_lock(&process->incoming.mutex);
- hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
- if (nxt_slow_path(hdr == NULL)) {
- pthread_mutex_unlock(&process->incoming.mutex);
-
- nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
- "invalid mmap id %d,%"PRIu32,
- recv_msg->stream, (int) process->pid,
- tracking_msg->mmap_id);
+ res = nxt_unit_check_rbuf_mmap(ctx, &process->incoming,
+ recv_msg->pid, tracking_msg->mmap_id,
+ &hdr, rbuf);
- return 0;
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ return res;
}
c = tracking_msg->tracking_id;
- rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
+ res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
- if (rc == 0) {
+ if (res == 0) {
nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
recv_msg->stream);
nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
+
+ res = NXT_UNIT_CANCELLED;
+
+ } else {
+ res = NXT_UNIT_OK;
}
pthread_mutex_unlock(&process->incoming.mutex);
- return rc;
+ return res;
}
static int
-nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
+nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
+ pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
+ nxt_unit_read_buf_t *rbuf)
{
+ int res, need_rbuf;
+ nxt_unit_mmap_t *mm;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ mm = nxt_unit_mmap_at(mmaps, id);
+ if (nxt_slow_path(mm == NULL)) {
+ nxt_unit_alert(ctx, "failed to allocate mmap");
+
+ pthread_mutex_unlock(&mmaps->mutex);
+
+ *hdr = NULL;
+
+ return NXT_UNIT_ERROR;
+ }
+
+ *hdr = mm->hdr;
+
+ if (nxt_fast_path(*hdr != NULL)) {
+ return NXT_UNIT_OK;
+ }
+
+ need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
+
+ nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
+
+ pthread_mutex_unlock(&mmaps->mutex);
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
+
+ if (need_rbuf) {
+ res = nxt_unit_get_mmap(ctx, pid, id);
+ if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
+ return NXT_UNIT_ERROR;
+ }
+ }
+
+ return NXT_UNIT_AGAIN;
+}
+
+
+static int
+nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
+ nxt_unit_read_buf_t *rbuf)
+{
+ int res;
void *start;
uint32_t size;
+ nxt_unit_mmaps_t *mmaps;
nxt_unit_process_t *process;
nxt_unit_mmap_buf_t *b, **incoming_tail;
nxt_port_mmap_msg_t *mmap_msg, *end;
@@ -3819,12 +3915,17 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
incoming_tail = &recv_msg->incoming_buf;
+ /* Allocating buffer structures. */
for (; mmap_msg < end; mmap_msg++) {
b = nxt_unit_mmap_buf_get(ctx);
if (nxt_slow_path(b == NULL)) {
nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
recv_msg->stream);
+ while (recv_msg->incoming_buf != NULL) {
+ nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
+ }
+
return NXT_UNIT_ERROR;
}
@@ -3835,19 +3936,21 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
b = recv_msg->incoming_buf;
mmap_msg = recv_msg->start;
- pthread_mutex_lock(&process->incoming.mutex);
+ mmaps = &process->incoming;
+
+ pthread_mutex_lock(&mmaps->mutex);
for (; mmap_msg < end; mmap_msg++) {
- hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
- if (nxt_slow_path(hdr == NULL)) {
- pthread_mutex_unlock(&process->incoming.mutex);
+ res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
+ recv_msg->pid, mmap_msg->mmap_id,
+ &hdr, rbuf);
- nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
- "invalid mmap id %d,%"PRIu32,
- recv_msg->stream, (int) process->pid,
- mmap_msg->mmap_id);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ while (recv_msg->incoming_buf != NULL) {
+ nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
+ }
- return NXT_UNIT_ERROR;
+ return res;
}
start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
@@ -3874,7 +3977,41 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
(int) mmap_msg->size);
}
- pthread_mutex_unlock(&process->incoming.mutex);
+ pthread_mutex_unlock(&mmaps->mutex);
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
+{
+ ssize_t res;
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ struct {
+ nxt_port_msg_t msg;
+ nxt_port_msg_get_mmap_t get_mmap;
+ } m;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ memset(&m.msg, 0, sizeof(nxt_port_msg_t));
+
+ m.msg.pid = lib->pid;
+ m.msg.reply_port = ctx_impl->read_port->id.id;
+ m.msg.type = _NXT_PORT_MSG_GET_MMAP;
+
+ m.get_mmap.id = id;
+
+ nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
+
+ res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
+ if (nxt_slow_path(res != sizeof(m))) {
+ return NXT_UNIT_ERROR;
+ }
return NXT_UNIT_OK;
}
@@ -4110,6 +4247,7 @@ int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
int rc;
+ nxt_queue_link_t *link;
nxt_unit_ctx_impl_t *ctx_impl;
nxt_unit_read_buf_t *rbuf;
@@ -4119,18 +4257,22 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
pthread_mutex_lock(&ctx_impl->mutex);
- if (ctx_impl->pending_read_head != NULL) {
- rbuf = ctx_impl->pending_read_head;
- ctx_impl->pending_read_head = rbuf->next;
+ if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
- if (ctx_impl->pending_read_tail == &rbuf->next) {
- ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
- }
+next_pending:
+
+ link = nxt_queue_first(&ctx_impl->pending_rbuf);
+ nxt_queue_remove(link);
+
+ rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
pthread_mutex_unlock(&ctx_impl->mutex);
} else {
rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
if (nxt_slow_path(rbuf == NULL)) {
nxt_unit_ctx_release(ctx_impl);
@@ -4142,21 +4284,40 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
}
if (nxt_fast_path(rbuf->size > 0)) {
- rc = nxt_unit_process_msg(ctx,
- rbuf->buf, rbuf->size,
- rbuf->oob, sizeof(rbuf->oob));
+ rc = nxt_unit_process_msg(ctx, rbuf);
#if (NXT_DEBUG)
- memset(rbuf->buf, 0xAC, rbuf->size);
+ if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
+ memset(rbuf->buf, 0xAC, rbuf->size);
+ }
#endif
} else {
rc = NXT_UNIT_ERROR;
}
- nxt_unit_read_buf_release(ctx, rbuf);
+ if (nxt_slow_path(rc == NXT_UNIT_AGAIN)) {
+ rc = NXT_UNIT_OK;
+
+ } else {
+ nxt_unit_read_buf_release(ctx, rbuf);
+ }
+
+ if (nxt_slow_path(rc == NXT_UNIT_CANCELLED)) {
+ rc = NXT_UNIT_OK;
+ }
+
+ if (nxt_fast_path(rc == NXT_UNIT_OK)) {
+ pthread_mutex_lock(&ctx_impl->mutex);
- nxt_unit_process_ready_req(ctx_impl);
+ if (!nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
+ goto next_pending;
+ }
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ nxt_unit_process_ready_req(ctx_impl);
+ }
nxt_unit_ctx_release(ctx_impl);
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 8fa64f4e..79157f5f 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -20,6 +20,7 @@ enum {
NXT_UNIT_OK = 0,
NXT_UNIT_ERROR = 1,
NXT_UNIT_AGAIN = 2,
+ NXT_UNIT_CANCELLED = 3,
};
enum {
@@ -189,16 +190,6 @@ struct nxt_unit_read_info_s {
nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *);
/*
- * Process received message, invoke configured callbacks.
- *
- * If application implements it's own event loop, each datagram received
- * from port socket should be initially processed by unit. This function
- * may invoke other application-defined callback for message processing.
- */
-int nxt_unit_process_msg(nxt_unit_ctx_t *,
- void *buf, size_t buf_size, void *oob, size_t oob_size);
-
-/*
* Main function useful in case when application does not have it's own
* event loop. nxt_unit_run() starts infinite message wait and process loop.
*