summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_port.h9
-rw-r--r--src/nxt_port_memory.c19
-rw-r--r--src/nxt_port_memory_int.h1
-rw-r--r--src/nxt_router.c56
-rw-r--r--src/nxt_unit.c544
-rw-r--r--src/nxt_unit.h6
6 files changed, 567 insertions, 68 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 3f9302e8..c6f15238 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -44,6 +44,9 @@ struct nxt_port_handlers_s {
/* Various data. */
nxt_port_handler_t data;
+
+ nxt_port_handler_t oosm;
+ nxt_port_handler_t shm_ack;
};
@@ -82,6 +85,9 @@ typedef enum {
_NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
+ _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
+ _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
+
NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t)
/ sizeof(nxt_port_handler_t),
@@ -114,6 +120,9 @@ typedef enum {
NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST,
+
+ NXT_PORT_MSG_OOSM = _NXT_PORT_MSG_OOSM | NXT_PORT_MSG_LAST,
+ NXT_PORT_MSG_SHM_ACK = _NXT_PORT_MSG_SHM_ACK | NXT_PORT_MSG_LAST,
} nxt_port_msg_type_t;
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index b7068c88..24a40406 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -112,6 +112,8 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
u_char *p;
nxt_mp_t *mp;
nxt_buf_t *b;
+ nxt_port_t *port;
+ nxt_process_t *process;
nxt_chunk_id_t c;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
@@ -163,6 +165,21 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
c++;
}
+ if (hdr->dst_pid == nxt_pid
+ && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
+ {
+ process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid);
+
+ if (process != NULL && !nxt_queue_is_empty(&process->ports)) {
+ port = nxt_process_port_first(process);
+
+ if (port->type == NXT_PROCESS_WORKER) {
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
+ -1, 0, 0, NULL);
+ }
+ }
+ }
+
release_buf:
nxt_port_mmap_handler_use(mmap_handler, -1);
@@ -454,6 +471,8 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
goto unlock_return;
}
}
+
+ hdr->oosm = 1;
}
/* TODO introduce port_mmap limit and release wait. */
diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h
index 53dfaebf..87c3d833 100644
--- a/src/nxt_port_memory_int.h
+++ b/src/nxt_port_memory_int.h
@@ -51,6 +51,7 @@ struct nxt_port_mmap_header_s {
nxt_pid_t src_pid; /* For sanity check. */
nxt_pid_t dst_pid; /* For sanity check. */
nxt_port_id_t sent_over;
+ nxt_atomic_t oosm;
nxt_free_map_t free_map[MAX_FREE_IDX];
nxt_free_map_t free_map_padding;
nxt_free_map_t free_tracking_map[MAX_FREE_IDX];
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 38396e86..6a1f3792 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -248,6 +248,7 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
void *data);
+static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
extern const nxt_http_request_state_t nxt_http_websocket;
@@ -276,6 +277,7 @@ nxt_port_handlers_t nxt_router_process_port_handlers = {
.access_log = nxt_router_access_log_reopen_handler,
.rpc_ready = nxt_port_rpc_handler,
.rpc_error = nxt_port_rpc_handler,
+ .oosm = nxt_router_oosm_handler,
};
@@ -2748,6 +2750,7 @@ static nxt_port_handlers_t nxt_router_app_port_handlers = {
.rpc_error = nxt_port_rpc_handler,
.mmap = nxt_port_mmap_handler,
.data = nxt_port_rpc_handler,
+ .oosm = nxt_router_oosm_handler,
};
@@ -5241,3 +5244,56 @@ nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
nxt_mp_release(r->mem_pool);
}
+
+
+static void
+nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ size_t mi;
+ uint32_t i;
+ nxt_bool_t ack;
+ nxt_process_t *process;
+ nxt_free_map_t *m;
+ nxt_port_mmap_header_t *hdr;
+
+ nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
+
+ process = nxt_runtime_process_find(task->thread->runtime,
+ msg->port_msg.pid);
+ if (nxt_slow_path(process == NULL)) {
+ return;
+ }
+
+ ack = 0;
+
+ /*
+ * To mitigate possible racing condition (when OOSM message received
+ * after some of the memory was already freed), need to try to find
+ * first free segment in shared memory and send ACK if found.
+ */
+
+ nxt_thread_mutex_lock(&process->incoming.mutex);
+
+ for (i = 0; i < process->incoming.size; i++) {
+ hdr = process->incoming.elts[i].mmap_handler->hdr;
+ m = hdr->free_map;
+
+ for (mi = 0; mi < MAX_FREE_IDX; mi++) {
+ if (m[mi] != 0) {
+ ack = 1;
+
+ nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
+ i, mi, m[mi]);
+
+ break;
+ }
+ }
+ }
+
+ nxt_thread_mutex_unlock(&process->incoming.mutex);
+
+ if (ack) {
+ (void) nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_SHM_ACK,
+ -1, 0, 0, NULL);
+ }
+}
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 8bc7b679..95874db3 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -23,12 +23,17 @@
#define NXT_UNIT_LOCAL_BUF_SIZE \
(NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
+#define NXT_UNIT_MAX_PLAIN_SIZE 1024
+#define NXT_UNIT_LOCAL_BUF_SIZE \
+ (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
+
typedef struct nxt_unit_impl_s nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s nxt_unit_process_t;
typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
+typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t;
typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
@@ -53,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
nxt_unit_ctx_t *ctx);
static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
@@ -69,11 +75,18 @@ static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
+static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
+static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
+ nxt_unit_ctx_impl_t *ctx_impl);
+static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
+ nxt_unit_read_buf_t *rbuf);
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
- nxt_chunk_id_t *c, int n);
+ nxt_chunk_id_t *c, int *n, int min_n);
+static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
+static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
@@ -81,7 +94,7 @@ static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
- nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
+ uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
@@ -94,14 +107,18 @@ static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
-static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
- uint32_t size);
+static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
+ nxt_unit_process_t *process,
+ nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
+static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
pid_t pid);
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
+static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
+ nxt_unit_read_buf_t *rbuf);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id, int *fd);
@@ -144,6 +161,7 @@ struct nxt_unit_mmap_buf_s {
nxt_unit_port_id_t port_id;
nxt_unit_request_info_t *req;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_process_t *process;
char *free_ptr;
char *plain_ptr;
};
@@ -206,6 +224,14 @@ struct nxt_unit_websocket_frame_impl_s {
};
+struct nxt_unit_read_buf_s {
+ nxt_unit_read_buf_t *next;
+ ssize_t size;
+ char buf[16384];
+ char oob[256];
+};
+
+
struct nxt_unit_ctx_impl_s {
nxt_unit_ctx_t ctx;
@@ -230,7 +256,12 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_request_info_impl_t */
nxt_lvlhsh_t requests;
+ nxt_unit_read_buf_t *pending_read_head;
+ nxt_unit_read_buf_t **pending_read_tail;
+ nxt_unit_read_buf_t *free_read_buf;
+
nxt_unit_mmap_buf_t ctx_buf[2];
+ nxt_unit_read_buf_t ctx_read_buf;
nxt_unit_request_info_impl_t req;
};
@@ -277,6 +308,7 @@ struct nxt_unit_mmaps_s {
pthread_mutex_t mutex;
uint32_t size;
uint32_t cap;
+ nxt_atomic_t allocated_chunks;
nxt_unit_mmap_t *elts;
};
@@ -495,6 +527,11 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
+ ctx_impl->pending_read_head = NULL;
+ ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
+ ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
+ ctx_impl->ctx_read_buf.next = NULL;
+
ctx_impl->req.req.ctx = &ctx_impl->ctx;
ctx_impl->req.req.unit = &lib->unit;
@@ -772,6 +809,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
rc = NXT_UNIT_OK;
break;
+ case _NXT_PORT_MSG_SHM_ACK:
+ rc = nxt_unit_process_shm_ack(ctx);
+ break;
+
default:
nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
port_msg->stream, (int) port_msg->type);
@@ -1052,6 +1093,23 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
+static int
+nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_impl_t *lib;
+ nxt_unit_callbacks_t *cb;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ cb = &lib->callbacks;
+
+ if (cb->shm_ack_handler != NULL) {
+ cb->shm_ack_handler(ctx);
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
{
@@ -1705,7 +1763,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port, size, mmap_buf, NULL);
+ &req->response_port, size, size, mmap_buf,
+ NULL);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
@@ -1947,6 +2006,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_port_mmap_msg_t mmap_msg;
} m;
+ int rc;
u_char *last_used, *first_free;
ssize_t res;
nxt_chunk_id_t first_free_chunk;
@@ -1971,6 +2031,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
m.msg.mf = 0;
m.msg.tracking = 0;
+ rc = NXT_UNIT_ERROR;
+
if (m.msg.mmap) {
m.mmap_msg.mmap_id = hdr->id;
m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
@@ -1985,13 +2047,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
NULL, 0);
if (nxt_slow_path(res != sizeof(m))) {
- return NXT_UNIT_ERROR;
+ goto free_buf;
}
- if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
- last_used = (u_char *) buf->free - 1;
+ last_used = (u_char *) buf->free - 1;
+ first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
- first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
+ if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
buf->start = (char *) first_free;
@@ -2009,6 +2071,13 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
mmap_buf->hdr = NULL;
}
+ nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
+ (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
+
+ nxt_unit_debug(ctx, "process %d allocated_chunks %d",
+ mmap_buf->process->pid,
+ mmap_buf->process->outgoing.allocated_chunks);
+
} else {
if (nxt_slow_path(mmap_buf->plain_ptr == NULL
|| mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
@@ -2016,7 +2085,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer"
": no space reserved for message header", stream);
- return NXT_UNIT_ERROR;
+ goto free_buf;
}
memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
@@ -2030,11 +2099,17 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
m.mmap_msg.size + sizeof(m.msg),
NULL, 0);
if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
- return NXT_UNIT_ERROR;
+ goto free_buf;
}
}
- return NXT_UNIT_OK;
+ rc = NXT_UNIT_OK;
+
+free_buf:
+
+ nxt_unit_free_outgoing_buf(mmap_buf);
+
+ return rc;
}
@@ -2058,15 +2133,76 @@ static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
{
if (mmap_buf->hdr != NULL) {
- nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start,
+ nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
+ mmap_buf->process,
+ mmap_buf->hdr, mmap_buf->buf.start,
mmap_buf->buf.end - mmap_buf->buf.start);
- } else if (mmap_buf->free_ptr != NULL) {
+ mmap_buf->hdr = NULL;
+
+ return;
+ }
+
+ if (mmap_buf->free_ptr != NULL) {
free(mmap_buf->free_ptr);
+
+ mmap_buf->free_ptr = NULL;
}
}
+static nxt_unit_read_buf_t *
+nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ return nxt_unit_read_buf_get_impl(ctx_impl);
+}
+
+
+static nxt_unit_read_buf_t *
+nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
+{
+ nxt_unit_read_buf_t *rbuf;
+
+ if (ctx_impl->free_read_buf != NULL) {
+ rbuf = ctx_impl->free_read_buf;
+ ctx_impl->free_read_buf = rbuf->next;
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return rbuf;
+ }
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ rbuf = malloc(sizeof(nxt_unit_read_buf_t));
+
+ return rbuf;
+}
+
+
+static void
+nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
+ nxt_unit_read_buf_t *rbuf)
+{
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ rbuf->next = ctx_impl->free_read_buf;
+ ctx_impl->free_read_buf = rbuf;
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+}
+
+
nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t *buf)
{
@@ -2100,8 +2236,21 @@ int
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
size_t size)
{
+ ssize_t res;
+
+ res = nxt_unit_response_write_nb(req, start, size, size);
+
+ return res < 0 ? -res : NXT_UNIT_OK;
+}
+
+
+ssize_t
+nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
+ size_t size, size_t min_size)
+{
int rc;
- uint32_t part_size;
+ ssize_t sent;
+ uint32_t part_size, min_part_size, buf_size;
const char *part_start;
nxt_unit_mmap_buf_t mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
@@ -2110,58 +2259,70 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
part_start = start;
+ sent = 0;
if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
nxt_unit_req_warn(req, "write: response not initialized yet");
- return NXT_UNIT_ERROR;
+ return -NXT_UNIT_ERROR;
}
/* Check if response is not send yet. */
- if (nxt_slow_path(req->response_buf)) {
+ if (nxt_slow_path(req->response_buf != NULL)) {
part_size = req->response_buf->end - req->response_buf->free;
part_size = nxt_min(size, part_size);
rc = nxt_unit_response_add_content(req, part_start, part_size);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- return rc;
+ return -rc;
}
rc = nxt_unit_response_send(req);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- return rc;
+ return -rc;
}
size -= part_size;
part_start += part_size;
+ sent += part_size;
+
+ min_size -= nxt_min(min_size, part_size);
}
while (size > 0) {
part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
+ min_part_size = nxt_min(min_size, part_size);
+ min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port, part_size,
- &mmap_buf, local_buf);
+ min_part_size, &mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- return rc;
+ return -rc;
+ }
+
+ buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
+ if (nxt_slow_path(buf_size == 0)) {
+ return sent;
}
+ part_size = nxt_min(buf_size, part_size);
mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
part_start, part_size);
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
-
- nxt_unit_free_outgoing_buf(&mmap_buf);
-
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- return rc;
+ return -rc;
}
size -= part_size;
part_start += part_size;
+ sent += part_size;
+
+ min_size -= nxt_min(min_size, part_size);
}
- return NXT_UNIT_OK;
+ return sent;
}
@@ -2171,6 +2332,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
{
int rc;
ssize_t n;
+ uint32_t buf_size;
nxt_unit_buf_t *buf;
nxt_unit_mmap_buf_t mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
@@ -2224,10 +2386,11 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
read_info->buf_size);
+ buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
+
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port,
- nxt_min(read_info->buf_size,
- PORT_MMAP_DATA_SIZE),
+ buf_size, buf_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
@@ -2249,9 +2412,6 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
}
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
-
- nxt_unit_free_outgoing_buf(&mmap_buf);
-
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_error(req, "Failed to send content");
@@ -2398,7 +2558,7 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
{
int i, rc;
size_t l, copy;
- uint32_t payload_len, buf_size;
+ uint32_t payload_len, buf_size, alloc_size;
const uint8_t *b;
nxt_unit_buf_t *buf;
nxt_unit_mmap_buf_t mmap_buf;
@@ -2415,10 +2575,11 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
}
buf_size = 10 + payload_len;
+ alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port,
- nxt_min(buf_size, PORT_MMAP_DATA_SIZE),
+ alloc_size, alloc_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
@@ -2454,17 +2615,16 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
&mmap_buf, 0);
- nxt_unit_free_outgoing_buf(&mmap_buf);
-
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
}
+ alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
+
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port,
- nxt_min(buf_size,
- PORT_MMAP_DATA_SIZE),
+ alloc_size, alloc_size,
&mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
@@ -2478,8 +2638,6 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
if (buf->free > buf->start) {
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
&mmap_buf, 0);
-
- nxt_unit_free_outgoing_buf(&mmap_buf);
}
return rc;
@@ -2553,15 +2711,23 @@ nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
+ nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n)
{
int res, nchunks, i;
+ uint32_t outgoing_size;
nxt_unit_mmap_t *mm, *mm_end;
+ nxt_unit_impl_t *lib;
nxt_port_mmap_header_t *hdr;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
pthread_mutex_lock(&process->outgoing.mutex);
- mm_end = process->outgoing.elts + process->outgoing.size;
+retry:
+
+ outgoing_size = process->outgoing.size;
+
+ mm_end = process->outgoing.elts + outgoing_size;
for (mm = process->outgoing.elts; mm < mm_end; mm++) {
hdr = mm->hdr;
@@ -2575,11 +2741,17 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
nchunks = 1;
- while (nchunks < n) {
+ while (nchunks < *n) {
res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
*c + nchunks);
if (res == 0) {
+ if (nchunks >= min_n) {
+ *n = nchunks;
+
+ goto unlock;
+ }
+
for (i = 0; i < nchunks; i++) {
nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
}
@@ -2592,23 +2764,155 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nchunks++;
}
- if (nchunks == n) {
+ if (nchunks >= min_n) {
+ *n = nchunks;
+
goto unlock;
}
}
+
+ hdr->oosm = 1;
+ }
+
+ if (outgoing_size >= lib->shm_mmap_limit) {
+ /* Cannot allocate more shared memory. */
+ pthread_mutex_unlock(&process->outgoing.mutex);
+
+ if (min_n == 0) {
+ *n = 0;
+ }
+
+ if (nxt_slow_path(process->outgoing.allocated_chunks + min_n
+ >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
+ {
+ /* Memory allocated by application, but not send to router. */
+ return NULL;
+ }
+
+ /* Notify router about OOSM condition. */
+
+ res = nxt_unit_send_oosm(ctx, port_id);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ return NULL;
+ }
+
+ /* Return if caller can handle OOSM condition. Non-blocking mode. */
+
+ if (min_n == 0) {
+ return NULL;
+ }
+
+ nxt_unit_debug(ctx, "oosm: waiting for ACK");
+
+ res = nxt_unit_wait_shm_ack(ctx);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ return NULL;
+ }
+
+ nxt_unit_debug(ctx, "oosm: retry");
+
+ pthread_mutex_lock(&process->outgoing.mutex);
+
+ goto retry;
}
*c = 0;
- hdr = nxt_unit_new_mmap(ctx, process, port_id, n);
+ hdr = nxt_unit_new_mmap(ctx, process, port_id, *n);
unlock:
+ nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n);
+
+ nxt_unit_debug(ctx, "process %d allocated_chunks %d",
+ process->pid,
+ process->outgoing.allocated_chunks);
+
pthread_mutex_unlock(&process->outgoing.mutex);
return hdr;
}
+static int
+nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
+{
+ ssize_t res;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ msg.stream = 0;
+ msg.pid = lib->pid;
+ msg.reply_port = 0;
+ msg.type = _NXT_PORT_MSG_OOSM;
+ msg.last = 0;
+ msg.mmap = 0;
+ msg.nf = 0;
+ msg.mf = 0;
+ msg.tracking = 0;
+
+ res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
+ if (nxt_slow_path(res != sizeof(msg))) {
+ nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)",
+ (int) port_id->pid, strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
+{
+ nxt_port_msg_t *port_msg;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ while (1) {
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_read_buf(ctx, rbuf);
+ if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+
+ break;
+ }
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ *ctx_impl->pending_read_tail = rbuf;
+ ctx_impl->pending_read_tail = &rbuf->next;
+ rbuf->next = NULL;
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ if (port_msg->type == _NXT_PORT_MSG_QUIT) {
+ nxt_unit_debug(ctx, "oosm: quit received");
+
+ return NXT_UNIT_ERROR;
+ }
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
@@ -2843,10 +3147,10 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- nxt_unit_port_id_t *port_id, uint32_t size,
+ nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size,
nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
{
- uint32_t nchunks;
+ int nchunks, min_nchunks;
nxt_chunk_id_t c;
nxt_port_mmap_header_t *hdr;
@@ -2869,6 +3173,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + size;
mmap_buf->port_id = *port_id;
+ mmap_buf->process = process;
nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
mmap_buf->buf.start, (int) size);
@@ -2877,9 +3182,20 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
}
nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
+ min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
- hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks);
+ hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks);
if (nxt_slow_path(hdr == NULL)) {
+ if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
+ mmap_buf->hdr = NULL;
+ mmap_buf->buf.start = NULL;
+ mmap_buf->buf.free = NULL;
+ mmap_buf->buf.end = NULL;
+ mmap_buf->free_ptr = NULL;
+
+ return NXT_UNIT_OK;
+ }
+
return NXT_UNIT_ERROR;
}
@@ -2888,6 +3204,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
mmap_buf->port_id = *port_id;
+ mmap_buf->process = process;
mmap_buf->free_ptr = NULL;
nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
@@ -2991,6 +3308,7 @@ nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
mmaps->size = 0;
mmaps->cap = 0;
mmaps->elts = NULL;
+ mmaps->allocated_chunks = 0;
}
@@ -3174,6 +3492,7 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
b->buf.free = start;
b->buf.end = b->buf.start + size;
b->hdr = hdr;
+ b->process = process;
b = b->next;
@@ -3191,23 +3510,79 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
-static int
-nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size)
+static void
+nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
+ nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
+ void *start, uint32_t size)
{
- u_char *p, *end;
- nxt_chunk_id_t c;
+ int freed_chunks;
+ u_char *p, *end;
+ nxt_chunk_id_t c;
+ nxt_unit_impl_t *lib;
memset(start, 0xA5, size);
p = start;
end = p + size;
c = nxt_port_mmap_chunk_id(hdr, p);
+ freed_chunks = 0;
while (p < end) {
nxt_port_mmap_set_chunk_free(hdr->free_map, c);
p += PORT_MMAP_CHUNK_SIZE;
c++;
+ freed_chunks++;
+ }
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ if (hdr->src_pid == lib->pid && freed_chunks != 0) {
+ nxt_atomic_fetch_add(&process->outgoing.allocated_chunks,
+ -freed_chunks);
+
+ nxt_unit_debug(ctx, "process %d allocated_chunks %d",
+ process->pid,
+ process->outgoing.allocated_chunks);
+ }
+
+ if (hdr->dst_pid == lib->pid
+ && freed_chunks != 0
+ && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
+ {
+ nxt_unit_send_shm_ack(ctx, hdr->src_pid);
+ }
+}
+
+
+static int
+nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
+{
+ ssize_t res;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_id_t port_id;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ nxt_unit_port_id_init(&port_id, pid, 0);
+
+ msg.stream = 0;
+ msg.pid = lib->pid;
+ msg.reply_port = 0;
+ msg.type = _NXT_PORT_MSG_SHM_ACK;
+ msg.last = 0;
+ msg.mmap = 0;
+ msg.nf = 0;
+ msg.mf = 0;
+ msg.tracking = 0;
+
+ res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
+ if (nxt_slow_path(res != sizeof(msg))) {
+ nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)",
+ (int) port_id.pid, strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
}
return NXT_UNIT_OK;
@@ -3369,43 +3744,76 @@ int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
int rc;
- char buf[4096];
- char oob[256];
- ssize_t rsize;
- nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- memset(oob, 0, sizeof(struct cmsghdr));
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ if (ctx_impl->pending_read_head != NULL) {
+ rbuf = ctx_impl->pending_read_head;
+ ctx_impl->pending_read_head = rbuf->next;
+
+ if (ctx_impl->pending_read_tail == &rbuf->next) {
+ ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
+ }
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
- if (ctx_impl->read_port_fd != -1) {
- rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
- buf, sizeof(buf),
- oob, sizeof(oob));
} else {
- rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
- buf, sizeof(buf),
- oob, sizeof(oob));
+ rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_read_buf(ctx, rbuf);
}
- if (nxt_fast_path(rsize > 0)) {
- rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize,
- oob, sizeof(oob));
+ if (nxt_fast_path(rbuf->size > 0)) {
+ rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id,
+ rbuf->buf, rbuf->size,
+ rbuf->oob, sizeof(rbuf->oob));
#if (NXT_DEBUG)
- memset(buf, 0xAC, rsize);
+ memset(rbuf->buf, 0xAC, rbuf->size);
#endif
} else {
rc = NXT_UNIT_ERROR;
}
+ nxt_unit_read_buf_release(ctx, rbuf);
+
return rc;
}
+static void
+nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+{
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ if (ctx_impl->read_port_fd != -1) {
+ rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
+ rbuf->buf, sizeof(rbuf->buf),
+ rbuf->oob, sizeof(rbuf->oob));
+
+ } else {
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
+ rbuf->buf, sizeof(rbuf->buf),
+ rbuf->oob, sizeof(rbuf->oob));
+ }
+}
+
+
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
diff --git a/src/nxt_unit.h b/src/nxt_unit.h
index 6948b253..c8aaa124 100644
--- a/src/nxt_unit.h
+++ b/src/nxt_unit.h
@@ -137,6 +137,9 @@ struct nxt_unit_callbacks_s {
/* Gracefully quit the application. Optional. */
void (*quit)(nxt_unit_ctx_t *);
+ /* Shared memory release acknowledgement. */
+ void (*shm_ack_handler)(nxt_unit_ctx_t *);
+
/* Send data and control to process pid using port id. Optional. */
ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
const void *buf, size_t buf_size,
@@ -323,6 +326,9 @@ uint32_t nxt_unit_buf_min(void);
int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
size_t size);
+ssize_t nxt_unit_response_write_nb(nxt_unit_request_info_t *req,
+ const void *start, size_t size, size_t min_size);
+
int nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
nxt_unit_read_info_t *read_info);