summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_unit.c304
1 files changed, 203 insertions, 101 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index 0cf32916..d97d19f2 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -19,6 +19,10 @@
#include <linux/memfd.h>
#endif
+#define NXT_UNIT_MAX_PLAIN_SIZE 1024
+#define NXT_UNIT_LOCAL_BUF_SIZE \
+ (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
+
typedef struct nxt_unit_impl_s nxt_unit_impl_t;
typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
@@ -63,6 +67,7 @@ static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_unit_mmap_buf_t *mmap_buf, int last);
static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
+static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
size_t size);
static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
@@ -75,7 +80,7 @@ static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
int fd);
static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, uint32_t size,
- nxt_unit_mmap_buf_t *mmap_buf);
+ nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
@@ -135,10 +140,11 @@ struct nxt_unit_mmap_buf_s {
nxt_unit_mmap_buf_t **prev;
nxt_port_mmap_header_t *hdr;
-// nxt_queue_link_t link;
nxt_unit_port_id_t port_id;
nxt_unit_request_info_t *req;
nxt_unit_ctx_impl_t *ctx_impl;
+ char *free_ptr;
+ char *plain_ptr;
};
@@ -196,8 +202,6 @@ struct nxt_unit_websocket_frame_impl_s {
nxt_queue_link_t link;
nxt_unit_ctx_impl_t *ctx_impl;
-
- void *retain_buf;
};
@@ -961,7 +965,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
ws_impl->ws.req = req;
ws_impl->buf = NULL;
- ws_impl->retain_buf = NULL;
if (recv_msg->mmap) {
for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
@@ -986,7 +989,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR;
}
- b->hdr = NULL;
b->req = req;
b->buf.start = recv_msg->start;
b->buf.free = b->buf.start;
@@ -1193,12 +1195,6 @@ nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
ws->req = NULL;
- if (ws_impl->retain_buf != NULL) {
- free(ws_impl->retain_buf);
-
- ws_impl->retain_buf = NULL;
- }
-
pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
@@ -1649,7 +1645,7 @@ nxt_unit_response_send(nxt_unit_request_info_t *req)
req->response_buf = NULL;
req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
- nxt_unit_mmap_buf_release(mmap_buf);
+ nxt_unit_mmap_buf_free(mmap_buf);
}
return rc;
@@ -1697,7 +1693,7 @@ nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
- &req->response_port, size, mmap_buf);
+ &req->response_port, size, mmap_buf, NULL);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_mmap_buf_release(mmap_buf);
@@ -1762,6 +1758,9 @@ nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
mmap_buf->ctx_impl = ctx_impl;
+ mmap_buf->hdr = NULL;
+ mmap_buf->free_ptr = NULL;
+
return mmap_buf;
}
@@ -1896,7 +1895,7 @@ nxt_unit_buf_send(nxt_unit_buf_t *buf)
}
}
- nxt_unit_mmap_buf_release(mmap_buf);
+ nxt_unit_mmap_buf_free(mmap_buf);
return NXT_UNIT_OK;
}
@@ -1917,7 +1916,7 @@ nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
if (nxt_slow_path(rc == NXT_UNIT_OK)) {
- nxt_unit_mmap_buf_release(mmap_buf);
+ nxt_unit_mmap_buf_free(mmap_buf);
nxt_unit_request_info_release(req);
@@ -1936,7 +1935,7 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
nxt_port_mmap_msg_t mmap_msg;
} m;
- u_char *end, *last_used, *first_free;
+ u_char *last_used, *first_free;
ssize_t res;
nxt_chunk_id_t first_free_chunk;
nxt_unit_buf_t *buf;
@@ -1960,35 +1959,67 @@ nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
m.msg.mf = 0;
m.msg.tracking = 0;
- if (hdr != NULL) {
+ if (m.msg.mmap) {
m.mmap_msg.mmap_id = hdr->id;
m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
(u_char *) buf->start);
- }
- nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
- stream,
- (int) m.mmap_msg.mmap_id,
- (int) m.mmap_msg.chunk_id,
- (int) m.mmap_msg.size);
+ nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
+ stream,
+ (int) m.mmap_msg.mmap_id,
+ (int) m.mmap_msg.chunk_id,
+ (int) m.mmap_msg.size);
- res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m,
- m.msg.mmap ? sizeof(m) : sizeof(m.msg),
- NULL, 0);
- if (nxt_slow_path(res != sizeof(m))) {
- return NXT_UNIT_ERROR;
- }
+ res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m, sizeof(m),
+ NULL, 0);
+ if (nxt_slow_path(res != sizeof(m))) {
+ return NXT_UNIT_ERROR;
+ }
+
+ if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
+ last_used = (u_char *) buf->free - 1;
+
+ first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
+ first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
- if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) {
- last_used = (u_char *) buf->free - 1;
+ buf->start = (char *) first_free;
+ buf->free = buf->start;
- first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
- first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
- end = (u_char *) buf->end;
+ if (buf->end < buf->start) {
+ buf->end = buf->start;
+ }
- nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free));
+ } else {
+ buf->start = NULL;
+ buf->free = NULL;
+ buf->end = NULL;
- buf->end = (char *) first_free;
+ mmap_buf->hdr = NULL;
+ }
+
+ } else {
+ if (nxt_slow_path(mmap_buf->plain_ptr == NULL
+ || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
+ {
+ nxt_unit_warn(ctx, "#%"PRIu32": failed to send plain memory buffer"
+ ": no space reserved for message header", stream);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
+
+ nxt_unit_debug(ctx, "#%"PRIu32": send plain: %d",
+ stream,
+ (int) (sizeof(m.msg) + m.mmap_msg.size));
+
+ res = lib->callbacks.port_send(ctx, &mmap_buf->port_id,
+ buf->start - sizeof(m.msg),
+ m.mmap_msg.size + sizeof(m.msg),
+ NULL, 0);
+ if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
+ return NXT_UNIT_ERROR;
+ }
}
return NXT_UNIT_OK;
@@ -2005,12 +2036,22 @@ nxt_unit_buf_free(nxt_unit_buf_t *buf)
static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
{
- if (nxt_fast_path(mmap_buf->hdr != NULL)) {
+ nxt_unit_free_outgoing_buf(mmap_buf);
+
+ nxt_unit_mmap_buf_release(mmap_buf);
+}
+
+
+static void
+nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
+{
+ if (mmap_buf->hdr != NULL) {
nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start,
mmap_buf->buf.end - mmap_buf->buf.start);
- }
- nxt_unit_mmap_buf_release(mmap_buf);
+ } else if (mmap_buf->free_ptr != NULL) {
+ free(mmap_buf->free_ptr);
+ }
}
@@ -2052,11 +2093,18 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
const char *part_start;
nxt_unit_mmap_buf_t mmap_buf;
nxt_unit_request_info_impl_t *req_impl;
+ char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
part_start = start;
+ if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
+ nxt_unit_req_warn(req, "write: response not initialized yet");
+
+ return NXT_UNIT_ERROR;
+ }
+
/* Check if response is not send yet. */
if (nxt_slow_path(req->response_buf)) {
part_size = req->response_buf->end - req->response_buf->free;
@@ -2081,7 +2129,7 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
&req->response_port, part_size,
- &mmap_buf);
+ &mmap_buf, local_buf);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
@@ -2090,10 +2138,10 @@ nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
part_start, part_size);
rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
- mmap_buf.buf.end - mmap_buf.buf.start);
+ nxt_unit_free_outgoing_buf(&mmap_buf);
+
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return rc;
}
@@ -2109,9 +2157,14 @@ int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
nxt_unit_read_info_t *read_info)
{
- int rc;
- ssize_t n;
- nxt_unit_buf_t *buf;
+ int rc;
+ ssize_t n;
+ nxt_unit_buf_t *buf;
+ nxt_unit_mmap_buf_t mmap_buf;
+ nxt_unit_request_info_impl_t *req_impl;
+ char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
/* Check if response is not send yet. */
if (nxt_slow_path(req->response_buf)) {
@@ -2159,20 +2212,23 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
read_info->buf_size);
- buf = nxt_unit_response_buf_alloc(req, nxt_min(read_info->buf_size,
- PORT_MMAP_DATA_SIZE));
- if (nxt_slow_path(buf == NULL)) {
- nxt_unit_req_error(req, "Failed to allocate buf for content");
-
- return NXT_UNIT_ERROR;
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
+ &req->response_port,
+ nxt_min(read_info->buf_size,
+ PORT_MMAP_DATA_SIZE),
+ &mmap_buf, local_buf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
}
+ buf = &mmap_buf.buf;
+
while (!read_info->eof && buf->end > buf->free) {
n = read_info->read(read_info, buf->free, buf->end - buf->free);
if (nxt_slow_path(n < 0)) {
nxt_unit_req_error(req, "Read error");
- nxt_unit_buf_free(buf);
+ nxt_unit_free_outgoing_buf(&mmap_buf);
return NXT_UNIT_ERROR;
}
@@ -2180,7 +2236,10 @@ nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
buf->free += n;
}
- rc = nxt_unit_buf_send(buf);
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
+
+ nxt_unit_free_outgoing_buf(&mmap_buf);
+
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_unit_req_error(req, "Failed to send content");
@@ -2325,12 +2384,17 @@ int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
uint8_t last, const struct iovec *iov, int iovcnt)
{
- int i, rc;
- size_t l, copy;
- uint32_t payload_len, buf_size;
- const uint8_t *b;
- nxt_unit_buf_t *buf;
- nxt_websocket_header_t *wh;
+ int i, rc;
+ size_t l, copy;
+ uint32_t payload_len, buf_size;
+ const uint8_t *b;
+ nxt_unit_buf_t *buf;
+ nxt_unit_mmap_buf_t mmap_buf;
+ nxt_websocket_header_t *wh;
+ nxt_unit_request_info_impl_t *req_impl;
+ char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
+
+ req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
payload_len = 0;
@@ -2340,17 +2404,21 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
buf_size = 10 + payload_len;
- buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
- PORT_MMAP_DATA_SIZE));
- if (nxt_slow_path(buf == NULL)) {
- nxt_unit_req_error(req, "Failed to allocate buf for content");
-
- return NXT_UNIT_ERROR;
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
+ &req->response_port,
+ nxt_min(buf_size, PORT_MMAP_DATA_SIZE),
+ &mmap_buf, local_buf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
}
+ buf = &mmap_buf.buf;
+
buf->start[0] = 0;
buf->start[1] = 0;
+ buf_size -= buf->end - buf->start;
+
wh = (void *) buf->free;
buf->free = nxt_websocket_frame_init(wh, payload_len);
@@ -2370,32 +2438,36 @@ nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
l -= copy;
if (l > 0) {
- buf_size -= buf->end - buf->start;
+ if (nxt_fast_path(buf->free > buf->start)) {
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
+ &mmap_buf, 0);
- rc = nxt_unit_buf_send(buf);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_req_error(req, "Failed to send content");
+ nxt_unit_free_outgoing_buf(&mmap_buf);
- return NXT_UNIT_ERROR;
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
+ }
}
- buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
- PORT_MMAP_DATA_SIZE));
- if (nxt_slow_path(buf == NULL)) {
- nxt_unit_req_error(req,
- "Failed to allocate buf for content");
-
- return NXT_UNIT_ERROR;
+ rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
+ &req->response_port,
+ nxt_min(buf_size,
+ PORT_MMAP_DATA_SIZE),
+ &mmap_buf, local_buf);
+ if (nxt_slow_path(rc != NXT_UNIT_OK)) {
+ return rc;
}
+
+ buf_size -= buf->end - buf->start;
}
}
}
if (buf->free > buf->start) {
- rc = nxt_unit_buf_send(buf);
- if (nxt_slow_path(rc != NXT_UNIT_OK)) {
- nxt_unit_req_error(req, "Failed to send content");
- }
+ rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream,
+ &mmap_buf, 0);
+
+ nxt_unit_free_outgoing_buf(&mmap_buf);
}
return rc;
@@ -2437,7 +2509,7 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
- if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) {
+ if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
return NXT_UNIT_OK;
}
@@ -2454,7 +2526,7 @@ nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
ws_impl->buf->buf.free = b;
ws_impl->buf->buf.end = b + size;
- ws_impl->retain_buf = b;
+ ws_impl->buf->free_ptr = b;
return NXT_UNIT_OK;
}
@@ -2760,12 +2832,38 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id, int fd)
static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
nxt_unit_port_id_t *port_id, uint32_t size,
- nxt_unit_mmap_buf_t *mmap_buf)
+ nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
{
uint32_t nchunks;
nxt_chunk_id_t c;
nxt_port_mmap_header_t *hdr;
+ if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
+ if (local_buf != NULL) {
+ mmap_buf->free_ptr = NULL;
+ mmap_buf->plain_ptr = local_buf;
+
+ } else {
+ mmap_buf->free_ptr = malloc(size + sizeof(nxt_port_msg_t));
+ if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
+ return NXT_UNIT_ERROR;
+ }
+
+ mmap_buf->plain_ptr = mmap_buf->free_ptr;
+ }
+
+ mmap_buf->hdr = NULL;
+ mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
+ mmap_buf->buf.free = mmap_buf->buf.start;
+ mmap_buf->buf.end = mmap_buf->buf.start + size;
+ mmap_buf->port_id = *port_id;
+
+ nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
+ mmap_buf->buf.start, (int) size);
+
+ return NXT_UNIT_OK;
+ }
+
nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
hdr = nxt_unit_mmap_get(ctx, process, port_id, &c, nchunks);
@@ -2778,6 +2876,7 @@ nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
mmap_buf->buf.free = mmap_buf->buf.start;
mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
mmap_buf->port_id = *port_id;
+ mmap_buf->free_ptr = NULL;
nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
(int) hdr->id, (int) c,
@@ -3020,6 +3119,22 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
incoming_tail = &recv_msg->incoming_buf;
+ for (; mmap_msg < end; mmap_msg++) {
+ b = nxt_unit_mmap_buf_get(ctx);
+ if (nxt_slow_path(b == NULL)) {
+ nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
+ recv_msg->stream);
+
+ return NXT_UNIT_ERROR;
+ }
+
+ nxt_unit_mmap_buf_insert(incoming_tail, b);
+ incoming_tail = &b->next;
+ }
+
+ b = recv_msg->incoming_buf;
+ mmap_msg = recv_msg->start;
+
pthread_mutex_lock(&process->incoming.mutex);
for (; mmap_msg < end; mmap_msg++) {
@@ -3043,26 +3158,13 @@ nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
recv_msg->size = size;
}
- b = nxt_unit_mmap_buf_get(ctx);
- if (nxt_slow_path(b == NULL)) {
- pthread_mutex_unlock(&process->incoming.mutex);
-
- nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
- recv_msg->stream);
-
- nxt_unit_mmap_release(hdr, start, size);
-
- return NXT_UNIT_ERROR;
- }
-
- nxt_unit_mmap_buf_insert(incoming_tail, b);
- incoming_tail = &b->next;
-
b->buf.start = start;
b->buf.free = start;
b->buf.end = b->buf.start + size;
b->hdr = hdr;
+ b = b->next;
+
nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
recv_msg->stream,
start, (int) size,