summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c828
1 files changed, 675 insertions, 153 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 0cf32916..95874db3 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -19,12 +19,21 @@
#include <linux/memfd.h>
#endif
+#define NXT_UNIT_MAX_PLAIN_SIZE 1024
+#define NXT_UNIT_LOCAL_BUF_SIZE \
+ (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
+
+#define NXT_UNIT_MAX_PLAIN_SIZE 1024
+#define NXT_UNIT_LOCAL_BUF_SIZE \
+ (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
+
typedef struct nxt_unit_impl_s nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
typedef struct nxt_unit_process_s nxt_unit_process_t;
typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
+typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t;
typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
@@ -37,9 +46,10 @@ nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
nxt_unit_mmap_buf_t *mmap_buf);
nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
nxt_unit_mmap_buf_t *mmap_buf);
-nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf);
+nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
- nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream);
+ nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
+ uint32_t *shm_limit);
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
uint32_t stream);
static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
@@ -48,6 +58,7 @@ static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
+static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
nxt_unit_ctx_t *ctx);
static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
@@ -63,11 +74,19 @@ static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
+static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
+static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
+static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
+ nxt_unit_ctx_impl_t *ctx_impl);
+static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
+ nxt_unit_read_buf_t *rbuf);
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
- nxt_chunk_id_t *c, int n);
+ nxt_chunk_id_t *c, int *n, int min_n);
+static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
+static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
@@ -75,7 +94,7 @@ static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
- nxt_unit_mmap_buf_t *mmap_buf);
+ uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
@@ -88,14 +107,18 @@ static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
nxt_unit_recv_msg_t *recv_msg);
-static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
- uint32_t size);
+static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
+ nxt_unit_process_t *process,
+ nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
+static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
pid_t pid);
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
+static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
+ nxt_unit_read_buf_t *rbuf);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id, int *fd);
@@ -135,10 +158,12 @@ struct nxt_unit_mmap_buf_s {
nxt_unit_mmap_buf_t **prev;
nxt_port_mmap_header_t *hdr;
-// nxt_queue_link_t link;
nxt_unit_port_id_t port_id;
nxt_unit_request_info_t *req;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_process_t *process;
+ char *free_ptr;
+ char *plain_ptr;
};
@@ -196,8 +221,14 @@ struct nxt_unit_websocket_frame_impl_s {
nxt_queue_link_t link;
nxt_unit_ctx_impl_t *ctx_impl;
+};
+
- void *retain_buf;
+struct nxt_unit_read_buf_s {
+ nxt_unit_read_buf_t *next;
+ ssize_t size;
+ char buf[16384];
+ char oob[256];
};
@@ -225,7 +256,12 @@ struct nxt_unit_ctx_impl_s {
/* of nxt_unit_request_info_impl_t */
nxt_lvlhsh_t requests;
+ nxt_unit_read_buf_t *pending_read_head;
+ nxt_unit_read_buf_t **pending_read_tail;
+ nxt_unit_read_buf_t *free_read_buf;
+
nxt_unit_mmap_buf_t ctx_buf[2];
+ nxt_unit_read_buf_t ctx_read_buf;
nxt_unit_request_info_impl_t req;
};
@@ -236,6 +272,7 @@ struct nxt_unit_impl_s {
nxt_unit_callbacks_t callbacks;
uint32_t request_data_size;
+ uint32_t shm_mmap_limit;
pthread_mutex_t mutex;
@@ -271,6 +308,7 @@ struct nxt_unit_mmaps_s {
pthread_mutex_t mutex;
uint32_t size;
uint32_t cap;
+ nxt_atomic_t allocated_chunks;
nxt_unit_mmap_t *elts;
};
@@ -302,7 +340,7 @@ nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
int rc;
- uint32_t ready_stream;
+ uint32_t ready_stream, shm_limit;
nxt_unit_ctx_t *ctx;
nxt_unit_impl_t *lib;
nxt_unit_port_t ready_port, read_port;
@@ -325,12 +363,20 @@ nxt_unit_init(nxt_unit_init_t *init)
ready_port.id.id);
nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
read_port.id.id);
+
} else {
rc = nxt_unit_read_env(&ready_port, &read_port, &lib->log_fd,
- &ready_stream);
+ &ready_stream, &shm_limit);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
+
+ lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
+ / PORT_MMAP_DATA_SIZE;
+ }
+
+ if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
+ lib->shm_mmap_limit = 1;
}
lib->pid = read_port.id.pid;
@@ -395,6 +441,8 @@ nxt_unit_create(nxt_unit_init_t *init)
lib->callbacks = init->callbacks;
lib->request_data_size = init->request_data_size;
+ lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
+ / PORT_MMAP_DATA_SIZE;
lib->processes.slot = NULL;
lib->ports.slot = NULL;
@@ -479,6 +527,11 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
+ ctx_impl->pending_read_head = NULL;
+ ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
+ ctx_impl->free_read_buf = &ctx_impl->ctx_read_buf;
+ ctx_impl->ctx_read_buf.next = NULL;
+
ctx_impl->req.req.ctx = &ctx_impl->ctx;
ctx_impl->req.req.unit = &lib->unit;
@@ -517,7 +570,7 @@ nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
nxt_inline void
-nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf)
+nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
{
nxt_unit_mmap_buf_t **prev;
@@ -535,7 +588,7 @@ nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf)
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
- int *log_fd, uint32_t *stream)
+ int *log_fd, uint32_t *stream, uint32_t *shm_limit)
{
int rc;
int ready_fd, read_fd;
@@ -570,14 +623,14 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
"%"PRIu32";"
"%"PRId64",%"PRIu32",%d;"
"%"PRId64",%"PRIu32",%d;"
- "%d",
+ "%d,%"PRIu32,
&ready_stream,
&ready_pid, &ready_id, &ready_fd,
&read_pid, &read_id, &read_fd,
- log_fd);
+ log_fd, shm_limit);
- if (nxt_slow_path(rc != 8)) {
- nxt_unit_alert(NULL, "failed to scan variables");
+ if (nxt_slow_path(rc != 9)) {
+ nxt_unit_alert(NULL, "failed to scan variables: %d", rc);
return NXT_UNIT_ERROR;
}
@@ -756,6 +809,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
rc = NXT_UNIT_OK;
break;
+ case _NXT_PORT_MSG_SHM_ACK:
+ rc = nxt_unit_process_shm_ack(ctx);
+ break;
+
default:
nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
port_msg->stream, (int) port_msg->type);
@@ -961,7 +1018,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
ws_impl->ws.req = req;
ws_impl->buf = NULL;
- ws_impl->retain_buf = NULL;
if (recv_msg->mmap) {
for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
@@ -986,7 +1042,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- b->hdr = NULL;
b->req = req;
b->buf.start = recv_msg->start;
b->buf.free = b->buf.start;
@@ -1038,6 +1093,23 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
+static int
+nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_impl_t *lib;
+ nxt_unit_callbacks_t *cb;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+ cb = &lib->callbacks;
+
+ if (cb->shm_ack_handler != NULL) {
+ cb->shm_ack_handler(ctx);
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
{
@@ -1193,12 +1265,6 @@ nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
ws->req = NULL;
- if (ws_impl->retain_buf != NULL) {
- free(ws_impl->retain_buf);
-
- ws_impl->retain_buf = NULL;
- }
-
pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
@@ -1649,7 +1715,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req)
req->response_buf = NULL;
req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
- nxt_unit_mmap_buf_release(mmap_buf);
+ nxt_unit_mmap_buf_free(mmap_buf);
}
return rc;
@@ -1697,7 +1763,8 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port, size, mmap_buf);
+ &req->response_port, size, size, mmap_buf,
+ NULL);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
@@ -1755,13 +1822,16 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
} else {
mmap_buf = ctx_impl->free_buf;
- nxt_unit_mmap_buf_remove(mmap_buf);
+ nxt_unit_mmap_buf_unlink(mmap_buf);
pthread_mutex_unlock(&ctx_impl->mutex);
}
mmap_buf->ctx_impl = ctx_impl;
+ mmap_buf->hdr = NULL;
+ mmap_buf->free_ptr = NULL;
+
return mmap_buf;
}
@@ -1769,7 +1839,7 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
{
- nxt_unit_mmap_buf_remove(mmap_buf);
+ nxt_unit_mmap_buf_unlink(mmap_buf);
pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
@@ -1896,7 +1966,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf)
}
}
- nxt_unit_mmap_buf_release(mmap_buf);
+ nxt_unit_mmap_buf_free(mmap_buf);
return NXT_UNIT_OK;
}
@@ -1917,7 +1987,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
if (nxt_slow_path(rc == NXT_UNIT_OK)) {
- nxt_unit_mmap_buf_release(mmap_buf);
+ nxt_unit_mmap_buf_free(mmap_buf);
nxt_unit_request_info_release(req);
@@ -1936,7 +2006,8 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_port_mmap_msg_t mmap_msg;
} m;
- u_char *end, *last_used, *first_free;
+ int rc;
+ u_char *last_used, *first_free;
ssize_t res;
nxt_chunk_id_t first_free_chunk;
nxt_unit_buf_t *buf;
@@ -1960,38 +2031,85 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
m.msg.mf = 0;
m.msg.tracking = 0;
- if (hdr != NULL) {
+ rc = NXT_UNIT_ERROR;
+
+ if (m.msg.mmap) {
m.mmap_msg.mmap_id = hdr->id;
m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
(u_char *) buf->start);
- }
- nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
- stream,
- (int) m.mmap_msg.mmap_id,
- (int) m.mmap_msg.chunk_id,
- (int) m.mmap_msg.size);
+ nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
+ stream,
+ (int) m.mmap_msg.mmap_id,
+ (int) m.mmap_msg.chunk_id,
+ (int) m.mmap_msg.size);
- res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m,
- m.msg.mmap ? sizeof(m) : sizeof(m.msg),
- NULL, 0);
- if (nxt_slow_path(res != sizeof(m))) {
- return NXT_UNIT_ERROR;
- }
+ res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
+ NULL, 0);
+ if (nxt_slow_path(res != sizeof(m))) {
+ goto free_buf;
+ }
- if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) {
last_used = (u_char *) buf->free - 1;
-
first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
- first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
- end = (u_char *) buf->end;
- nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free));
+ if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
+ first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
+
+ buf->start = (char *) first_free;
+ buf->free = buf->start;
+
+ if (buf->end < buf->start) {
+ buf->end = buf->start;
+ }
+
+ } else {
+ buf->start = NULL;
+ buf->free = NULL;
+ buf->end = NULL;
+
+ mmap_buf->hdr = NULL;
+ }
+
+ nxt_atomic_fetch_add(&mmap_buf->process->outgoing.allocated_chunks,
+ (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
+
+ nxt_unit_debug(ctx, "process %d allocated_chunks %d",
+ mmap_buf->process->pid,
+ mmap_buf->process->outgoing.allocated_chunks);
+
+ } else {
+ if (nxt_slow_path(mmap_buf->plain_ptr == NULL
+ || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
+ {
+ nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer"
+ ": no space reserved for message header", stream);
+
+ goto free_buf;
+ }
+
+ memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
+
+ nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d",
+ stream,
+ (int) (sizeof(m.msg) + m.mmap_msg.size));
- buf->end = (char *) first_free;
+ res = lib->callbacks.port_send(ctx, &mmap_buf->port_id,
+ buf->start - sizeof(m.msg),
+ m.mmap_msg.size + sizeof(m.msg),
+ NULL, 0);
+ if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
+ goto free_buf;
+ }
}
- return NXT_UNIT_OK;
+ rc = NXT_UNIT_OK;
+
+free_buf:
+
+ nxt_unit_free_outgoing_buf(mmap_buf);
+
+ return rc;
}
@@ -2005,12 +2123,83 @@ nxt_unit_buf_free(nxt_unit_buf_t *buf)
static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
{
- if (nxt_fast_path(mmap_buf->hdr != NULL)) {
- nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start,
+ nxt_unit_free_outgoing_buf(mmap_buf);
+
+ nxt_unit_mmap_buf_release(mmap_buf);
+}
+
+
+static void
+nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
+{
+ if (mmap_buf->hdr != NULL) {
+ nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
+ mmap_buf->process,
+ mmap_buf->hdr, mmap_buf->buf.start,
mmap_buf->buf.end - mmap_buf->buf.start);
+
+ mmap_buf->hdr = NULL;
+
+ return;
}
- nxt_unit_mmap_buf_release(mmap_buf);
+ if (mmap_buf->free_ptr != NULL) {
+ free(mmap_buf->free_ptr);
+
+ mmap_buf->free_ptr = NULL;
+ }
+}
+
+
+static nxt_unit_read_buf_t *
+nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
+{
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ return nxt_unit_read_buf_get_impl(ctx_impl);
+}
+
+
+static nxt_unit_read_buf_t *
+nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
+{
+ nxt_unit_read_buf_t *rbuf;
+
+ if (ctx_impl->free_read_buf != NULL) {
+ rbuf = ctx_impl->free_read_buf;
+ ctx_impl->free_read_buf = rbuf->next;
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ return rbuf;
+ }
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ rbuf = malloc(sizeof(nxt_unit_read_buf_t));
+
+ return rbuf;
+}
+
+
+static void
+nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
+ nxt_unit_read_buf_t *rbuf)
+{
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ rbuf->next = ctx_impl->free_read_buf;
+ ctx_impl->free_read_buf = rbuf;
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
}
@@ -2047,61 +2236,93 @@ int
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
size_t size)
{
+ ssize_t res;
+
+ res = nxt_unit_response_write_nb(req, start, size, size);
+
+ return res < 0 ? -res : NXT_UNIT_OK;
+}
+
+
+ssize_t
+nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
+ size_t size, size_t min_size)
+{
int rc;
- uint32_t part_size;
+ ssize_t sent;
+ uint32_t part_size, min_part_size, buf_size;
const char *part_start;
nxt_unit_mmap_buf_t mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
+ char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
part_start = start;
+ sent = 0;
+
+ if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
+ nxt_unit_req_warn(req, "write: response not initialized yet");
+
+ return -NXT_UNIT_ERROR;
+ }
/* Check if response is not send yet. */
- if (nxt_slow_path(req->response_buf)) {
+ if (nxt_slow_path(req->response_buf != NULL)) {
part_size = req->response_buf->end - req->response_buf->free;
part_size = nxt_min(size, part_size);
rc = nxt_unit_response_add_content(req, part_start, part_size);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- return rc;
+ return -rc;
}
rc = nxt_unit_response_send(req);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- return rc;
+ return -rc;
}
size -= part_size;
part_start += part_size;
+ sent += part_size;
+
+ min_size -= nxt_min(min_size, part_size);
}
while (size > 0) {
part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
+ min_part_size = nxt_min(min_size, part_size);
+ min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port, part_size,
- &mmap_buf);
+ min_part_size, &mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- return rc;
+ return -rc;
}
+ buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
+ if (nxt_slow_path(buf_size == 0)) {
+ return sent;
+ }
+ part_size = nxt_min(buf_size, part_size);
+
mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
part_start, part_size);
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
- mmap_buf.buf.end - mmap_buf.buf.start);
-
- return rc;
+ return -rc;
}
size -= part_size;
part_start += part_size;
+ sent += part_size;
+
+ min_size -= nxt_min(min_size, part_size);
}
- return NXT_UNIT_OK;
+ return sent;
}
@@ -2109,9 +2330,15 @@ int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
nxt_unit_read_info_t *read_info)
{
- int rc;
- ssize_t n;
- nxt_unit_buf_t *buf;
+ int rc;
+ ssize_t n;
+ uint32_t buf_size;
+ nxt_unit_buf_t *buf;
+ nxt_unit_mmap_buf_t mmap_buf;
+ nxt_unit_request_info_impl_t *req_impl;
+ char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
/* Check if response is not send yet. */
if (nxt_slow_path(req->response_buf)) {
@@ -2159,20 +2386,24 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
read_info->buf_size);
- buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size,
- PORT_MMAP_DATA_SIZE));
- if (nxt_slow_path(buf == NULL)) {
- nxt_unit_req_error(req, "Failed to allocate buf for content");
+ buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
- return NXT_UNIT_ERROR;
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
+ &req->response_port,
+ buf_size, buf_size,
+ &mmap_buf, local_buf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
}
+ buf = &mmap_buf.buf;
+
while (!read_info->eof && buf->end > buf->free) {
n = read_info->read(read_info, buf->free, buf->end - buf->free);
if (nxt_slow_path(n < 0)) {
nxt_unit_req_error(req, "Read error");
- nxt_unit_buf_free(buf);
+ nxt_unit_free_outgoing_buf(&mmap_buf);
return NXT_UNIT_ERROR;
}
@@ -2180,7 +2411,7 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
buf->free += n;
}
- rc = nxt_unit_buf_send(buf);
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_error(req, "Failed to send content");
@@ -2325,12 +2556,17 @@ int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
uint8_t last, const struct iovec *iov, int iovcnt)
{
- int i, rc;
- size_t l, copy;
- uint32_t payload_len, buf_size;
- const uint8_t *b;
- nxt_unit_buf_t *buf;
- nxt_websocket_header_t *wh;
+ int i, rc;
+ size_t l, copy;
+ uint32_t payload_len, buf_size, alloc_size;
+ const uint8_t *b;
+ nxt_unit_buf_t *buf;
+ nxt_unit_mmap_buf_t mmap_buf;
+ nxt_websocket_header_t *wh;
+ nxt_unit_request_info_impl_t *req_impl;
+ char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
payload_len = 0;
@@ -2339,18 +2575,23 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
}
buf_size = 10 + payload_len;
+ alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
- buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
- PORT_MMAP_DATA_SIZE));
- if (nxt_slow_path(buf == NULL)) {
- nxt_unit_req_error(req, "Failed to allocate buf for content");
-
- return NXT_UNIT_ERROR;
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
+ &req->response_port,
+ alloc_size, alloc_size,
+ &mmap_buf, local_buf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
}
+ buf = &mmap_buf.buf;
+
buf->start[0] = 0;
buf->start[1] = 0;
+ buf_size -= buf->end - buf->start;
+
wh = (void *) buf->free;
buf->free = nxt_websocket_frame_init(wh, payload_len);
@@ -2370,32 +2611,33 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
l -= copy;
if (l > 0) {
- buf_size -= buf->end - buf->start;
-
- rc = nxt_unit_buf_send(buf);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_req_error(req, "Failed to send content");
+ if (nxt_fast_path(buf->free > buf->start)) {
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
+ &mmap_buf, 0);
- return NXT_UNIT_ERROR;
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
+ }
}
- buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
- PORT_MMAP_DATA_SIZE));
- if (nxt_slow_path(buf == NULL)) {
- nxt_unit_req_error(req,
- "Failed to allocate buf for content");
+ alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
- return NXT_UNIT_ERROR;
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
+ &req->response_port,
+ alloc_size, alloc_size,
+ &mmap_buf, local_buf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
}
+
+ buf_size -= buf->end - buf->start;
}
}
}
if (buf->free > buf->start) {
- rc = nxt_unit_buf_send(buf);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_req_error(req, "Failed to send content");
- }
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
+ &mmap_buf, 0);
}
return rc;
@@ -2437,7 +2679,7 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
- if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) {
+ if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
return NXT_UNIT_OK;
}
@@ -2454,7 +2696,7 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
ws_impl->buf->buf.free = b;
ws_impl->buf->buf.end = b + size;
- ws_impl->retain_buf = b;
+ ws_impl->buf->free_ptr = b;
return NXT_UNIT_OK;
}
@@ -2469,15 +2711,23 @@ nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
+ nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int *n, int min_n)
{
int res, nchunks, i;
+ uint32_t outgoing_size;
nxt_unit_mmap_t *mm, *mm_end;
+ nxt_unit_impl_t *lib;
nxt_port_mmap_header_t *hdr;
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
pthread_mutex_lock(&process->outgoing.mutex);
- mm_end = process->outgoing.elts + process->outgoing.size;
+retry:
+
+ outgoing_size = process->outgoing.size;
+
+ mm_end = process->outgoing.elts + outgoing_size;
for (mm = process->outgoing.elts; mm < mm_end; mm++) {
hdr = mm->hdr;
@@ -2491,11 +2741,17 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
nchunks = 1;
- while (nchunks < n) {
+ while (nchunks < *n) {
res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
*c + nchunks);
if (res == 0) {
+ if (nchunks >= min_n) {
+ *n = nchunks;
+
+ goto unlock;
+ }
+
for (i = 0; i < nchunks; i++) {
nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
}
@@ -2508,23 +2764,155 @@ nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nchunks++;
}
- if (nchunks == n) {
+ if (nchunks >= min_n) {
+ *n = nchunks;
+
goto unlock;
}
}
+
+ hdr->oosm = 1;
+ }
+
+ if (outgoing_size >= lib->shm_mmap_limit) {
+ /* Cannot allocate more shared memory. */
+ pthread_mutex_unlock(&process->outgoing.mutex);
+
+ if (min_n == 0) {
+ *n = 0;
+ }
+
+ if (nxt_slow_path(process->outgoing.allocated_chunks + min_n
+ >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
+ {
+ /* Memory allocated by application, but not send to router. */
+ return NULL;
+ }
+
+ /* Notify router about OOSM condition. */
+
+ res = nxt_unit_send_oosm(ctx, port_id);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ return NULL;
+ }
+
+ /* Return if caller can handle OOSM condition. Non-blocking mode. */
+
+ if (min_n == 0) {
+ return NULL;
+ }
+
+ nxt_unit_debug(ctx, "oosm: waiting for ACK");
+
+ res = nxt_unit_wait_shm_ack(ctx);
+ if (nxt_slow_path(res != NXT_UNIT_OK)) {
+ return NULL;
+ }
+
+ nxt_unit_debug(ctx, "oosm: retry");
+
+ pthread_mutex_lock(&process->outgoing.mutex);
+
+ goto retry;
}
*c = 0;
- hdr = nxt_unit_new_mmap(ctx, process, port_id, n);
+ hdr = nxt_unit_new_mmap(ctx, process, port_id, *n);
unlock:
+ nxt_atomic_fetch_add(&process->outgoing.allocated_chunks, *n);
+
+ nxt_unit_debug(ctx, "process %d allocated_chunks %d",
+ process->pid,
+ process->outgoing.allocated_chunks);
+
pthread_mutex_unlock(&process->outgoing.mutex);
return hdr;
}
+static int
+nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
+{
+ ssize_t res;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ msg.stream = 0;
+ msg.pid = lib->pid;
+ msg.reply_port = 0;
+ msg.type = _NXT_PORT_MSG_OOSM;
+ msg.last = 0;
+ msg.mmap = 0;
+ msg.nf = 0;
+ msg.mf = 0;
+ msg.tracking = 0;
+
+ res = lib->callbacks.port_send(ctx, port_id, &msg, sizeof(msg), NULL, 0);
+ if (nxt_slow_path(res != sizeof(msg))) {
+ nxt_unit_warn(ctx, "failed to send oosm to %d: %s (%d)",
+ (int) port_id->pid, strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
+static int
+nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
+{
+ nxt_port_msg_t *port_msg;
+ nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ while (1) {
+ rbuf = nxt_unit_read_buf_get(ctx);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_read_buf(ctx, rbuf);
+ if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ port_msg = (nxt_port_msg_t *) rbuf->buf;
+
+ if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
+ nxt_unit_read_buf_release(ctx, rbuf);
+
+ break;
+ }
+
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ *ctx_impl->pending_read_tail = rbuf;
+ ctx_impl->pending_read_tail = &rbuf->next;
+ rbuf->next = NULL;
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
+
+ if (port_msg->type == _NXT_PORT_MSG_QUIT) {
+ nxt_unit_debug(ctx, "oosm: quit received");
+
+ return NXT_UNIT_ERROR;
+ }
+ }
+
+ return NXT_UNIT_OK;
+}
+
+
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
@@ -2759,17 +3147,55 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
- nxt_unit_port_id_t *port_id, uint32_t size,
- nxt_unit_mmap_buf_t *mmap_buf)
+ nxt_unit_port_id_t *port_id, uint32_t size, uint32_t min_size,
+ nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
{
- uint32_t nchunks;
+ int nchunks, min_nchunks;
nxt_chunk_id_t c;
nxt_port_mmap_header_t *hdr;
+ if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
+ if (local_buf != NULL) {
+ mmap_buf->free_ptr = NULL;
+ mmap_buf->plain_ptr = local_buf;
+
+ } else {
+ mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t));
+ if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ mmap_buf->plain_ptr = mmap_buf->free_ptr;
+ }
+
+ mmap_buf->hdr = NULL;
+ mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
+ mmap_buf->buf.free = mmap_buf->buf.start;
+ mmap_buf->buf.end = mmap_buf->buf.start + size;
+ mmap_buf->port_id = *port_id;
+ mmap_buf->process = process;
+
+ nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
+ mmap_buf->buf.start, (int) size);
+
+ return NXT_UNIT_OK;
+ }
+
nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
+ min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
- hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks);
+ hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, &nchunks, min_nchunks);
if (nxt_slow_path(hdr == NULL)) {
+ if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
+ mmap_buf->hdr = NULL;
+ mmap_buf->buf.start = NULL;
+ mmap_buf->buf.free = NULL;
+ mmap_buf->buf.end = NULL;
+ mmap_buf->free_ptr = NULL;
+
+ return NXT_UNIT_OK;
+ }
+
return NXT_UNIT_ERROR;
}
@@ -2778,6 +3204,8 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
mmap_buf->port_id = *port_id;
+ mmap_buf->process = process;
+ mmap_buf->free_ptr = NULL;
nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
(int) hdr->id, (int) c,
@@ -2880,6 +3308,7 @@ nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
mmaps->size = 0;
mmaps->cap = 0;
mmaps->elts = NULL;
+ mmaps->allocated_chunks = 0;
}
@@ -3020,6 +3449,22 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
incoming_tail = &recv_msg->incoming_buf;
+ for (; mmap_msg < end; mmap_msg++) {
+ b = nxt_unit_mmap_buf_get(ctx);
+ if (nxt_slow_path(b == NULL)) {
+ nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
+ recv_msg->stream);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_mmap_buf_insert(incoming_tail, b);
+ incoming_tail = &b->next;
+ }
+
+ b = recv_msg->incoming_buf;
+ mmap_msg = recv_msg->start;
+
pthread_mutex_lock(&process->incoming.mutex);
for (; mmap_msg < end; mmap_msg++) {
@@ -3043,25 +3488,13 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
recv_msg->size = size;
}
- b = nxt_unit_mmap_buf_get(ctx);
- if (nxt_slow_path(b == NULL)) {
- pthread_mutex_unlock(&process->incoming.mutex);
-
- nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
- recv_msg->stream);
-
- nxt_unit_mmap_release(hdr, start, size);
-
- return NXT_UNIT_ERROR;
- }
-
- nxt_unit_mmap_buf_insert(incoming_tail, b);
- incoming_tail = &b->next;
-
b->buf.start = start;
b->buf.free = start;
b->buf.end = b->buf.start + size;
b->hdr = hdr;
+ b->process = process;
+
+ b = b->next;
nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
recv_msg->stream,
@@ -3077,23 +3510,79 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
}
-static int
-nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start, uint32_t size)
+static void
+nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
+ nxt_unit_process_t *process, nxt_port_mmap_header_t *hdr,
+ void *start, uint32_t size)
{
- u_char *p, *end;
- nxt_chunk_id_t c;
+ int freed_chunks;
+ u_char *p, *end;
+ nxt_chunk_id_t c;
+ nxt_unit_impl_t *lib;
memset(start, 0xA5, size);
p = start;
end = p + size;
c = nxt_port_mmap_chunk_id(hdr, p);
+ freed_chunks = 0;
while (p < end) {
nxt_port_mmap_set_chunk_free(hdr->free_map, c);
p += PORT_MMAP_CHUNK_SIZE;
c++;
+ freed_chunks++;
+ }
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ if (hdr->src_pid == lib->pid && freed_chunks != 0) {
+ nxt_atomic_fetch_add(&process->outgoing.allocated_chunks,
+ -freed_chunks);
+
+ nxt_unit_debug(ctx, "process %d allocated_chunks %d",
+ process->pid,
+ process->outgoing.allocated_chunks);
+ }
+
+ if (hdr->dst_pid == lib->pid
+ && freed_chunks != 0
+ && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
+ {
+ nxt_unit_send_shm_ack(ctx, hdr->src_pid);
+ }
+}
+
+
+static int
+nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
+{
+ ssize_t res;
+ nxt_port_msg_t msg;
+ nxt_unit_impl_t *lib;
+ nxt_unit_port_id_t port_id;
+
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ nxt_unit_port_id_init(&port_id, pid, 0);
+
+ msg.stream = 0;
+ msg.pid = lib->pid;
+ msg.reply_port = 0;
+ msg.type = _NXT_PORT_MSG_SHM_ACK;
+ msg.last = 0;
+ msg.mmap = 0;
+ msg.nf = 0;
+ msg.mf = 0;
+ msg.tracking = 0;
+
+ res = lib->callbacks.port_send(ctx, &port_id, &msg, sizeof(msg), NULL, 0);
+ if (nxt_slow_path(res != sizeof(msg))) {
+ nxt_unit_warn(ctx, "failed to send ack to %d: %s (%d)",
+ (int) port_id.pid, strerror(errno), errno);
+
+ return NXT_UNIT_ERROR;
}
return NXT_UNIT_OK;
@@ -3255,43 +3744,76 @@ int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
int rc;
- char buf[4096];
- char oob[256];
- ssize_t rsize;
- nxt_unit_impl_t *lib;
nxt_unit_ctx_impl_t *ctx_impl;
+ nxt_unit_read_buf_t *rbuf;
- lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
- memset(oob, 0, sizeof(struct cmsghdr));
+ pthread_mutex_lock(&ctx_impl->mutex);
+
+ if (ctx_impl->pending_read_head != NULL) {
+ rbuf = ctx_impl->pending_read_head;
+ ctx_impl->pending_read_head = rbuf->next;
+
+ if (ctx_impl->pending_read_tail == &rbuf->next) {
+ ctx_impl->pending_read_tail = &ctx_impl->pending_read_head;
+ }
+
+ pthread_mutex_unlock(&ctx_impl->mutex);
- if (ctx_impl->read_port_fd != -1) {
- rsize = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
- buf, sizeof(buf),
- oob, sizeof(oob));
} else {
- rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
- buf, sizeof(buf),
- oob, sizeof(oob));
+ rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
+ if (nxt_slow_path(rbuf == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_read_buf(ctx, rbuf);
}
- if (nxt_fast_path(rsize > 0)) {
- rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize,
- oob, sizeof(oob));
+ if (nxt_fast_path(rbuf->size > 0)) {
+ rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id,
+ rbuf->buf, rbuf->size,
+ rbuf->oob, sizeof(rbuf->oob));
#if (NXT_DEBUG)
- memset(buf, 0xAC, rsize);
+ memset(rbuf->buf, 0xAC, rbuf->size);
#endif
} else {
rc = NXT_UNIT_ERROR;
}
+ nxt_unit_read_buf_release(ctx, rbuf);
+
return rc;
}
+static void
+nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
+{
+ nxt_unit_impl_t *lib;
+ nxt_unit_ctx_impl_t *ctx_impl;
+
+ ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
+
+ memset(rbuf->oob, 0, sizeof(struct cmsghdr));
+
+ if (ctx_impl->read_port_fd != -1) {
+ rbuf->size = nxt_unit_port_recv(ctx, ctx_impl->read_port_fd,
+ rbuf->buf, sizeof(rbuf->buf),
+ rbuf->oob, sizeof(rbuf->oob));
+
+ } else {
+ lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
+
+ rbuf->size = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
+ rbuf->buf, sizeof(rbuf->buf),
+ rbuf->oob, sizeof(rbuf->oob));
+ }
+}
+
+
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
@@ -3399,12 +3921,12 @@ nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
} nxt_queue_loop;
- nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]);
- nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]);
+ nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
+ nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);
while (ctx_impl->free_buf != NULL) {
mmap_buf = ctx_impl->free_buf;
- nxt_unit_mmap_buf_remove(mmap_buf);
+ nxt_unit_mmap_buf_unlink(mmap_buf);
free(mmap_buf);
}