summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2021-03-25 16:55:16 +0300
committerMax Romanov <max.romanov@nginx.com>2021-03-25 16:55:16 +0300
commitb8052b050e0111400c59f35e76c013d8ee553ea9 (patch)
tree39009fc66870ba1ac60bbe0e0ee996e8b5201562 /src
parent067c6096e2ec306c4fdae6993140fbbdf4f9a6fd (diff)
downloadunit-b8052b050e0111400c59f35e76c013d8ee553ea9.tar.gz
unit-b8052b050e0111400c59f35e76c013d8ee553ea9.tar.bz2
Fixing shm buffer leakage when sending over the port queue.
When the shm buffer is sent over the port queue, it needs to be completed because it's sent over the port socket.
Diffstat (limited to '')
-rw-r--r--src/nxt_port_socket.c97
1 files changed, 84 insertions, 13 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 9d8096b2..3cf2e79a 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -6,8 +6,16 @@
#include <nxt_main.h>
#include <nxt_port_queue.h>
+#include <nxt_port_memory_int.h>
+#define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \
+ (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))
+
+
+static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b);
+static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm,
+ void *qbuf, nxt_buf_t *b);
static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg);
static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
@@ -151,10 +159,13 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_buf_t *b)
{
int notify;
- uint8_t *p;
+ uint8_t qmsg_size;
nxt_int_t res;
nxt_port_send_msg_t msg;
- uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE];
+ struct {
+ nxt_port_msg_t pm;
+ uint8_t buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE];
+ } qmsg;
msg.link.next = NULL;
msg.link.prev = NULL;
@@ -177,21 +188,31 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {
- if (fd == -1
- && (b == NULL
- || nxt_buf_mem_used_size(&b->mem)
- <= (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))))
- {
- p = nxt_cpymem(qmsg, &msg.port_msg, sizeof(nxt_port_msg_t));
+ if (fd == -1 && nxt_port_can_enqueue_buf(b)) {
+ qmsg.pm = msg.port_msg;
+
+ qmsg_size = sizeof(qmsg.pm);
+
if (b != NULL) {
- p = nxt_cpymem(p, b->mem.pos, nxt_buf_mem_used_size(&b->mem));
+ qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b);
}
- res = nxt_port_queue_send(port->queue, qmsg, p - qmsg, &notify);
+ res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, &notify);
nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
(int) port->pid, (int) port->id, port->socket.fd,
- (int) (p - qmsg), notify, res);
+ (int) qmsg_size, notify, res);
+
+ if (b != NULL && nxt_fast_path(res == NXT_OK)) {
+ if (qmsg.pm.mmap) {
+ b->is_port_mmap_sent = 1;
+ }
+
+ b->mem.pos = b->mem.free;
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ b->completion_handler, task, b, b->parent);
+ }
if (notify == 0) {
return res;
@@ -201,9 +222,9 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.buf = NULL;
} else {
- qmsg[0] = _NXT_PORT_MSG_READ_SOCKET;
+ qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET;
- res = nxt_port_queue_send(port->queue, qmsg, 1, &notify);
+ res = nxt_port_queue_send(port->queue, qmsg.buf, 1, &notify);
nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
(int) port->pid, (int) port->id, port->socket.fd,
@@ -225,6 +246,56 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
}
+static nxt_bool_t
+nxt_port_can_enqueue_buf(nxt_buf_t *b)
+{
+ if (b == NULL) {
+ return 1;
+ }
+
+ if (b->next != NULL) {
+ return 0;
+ }
+
+ return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE
+ || nxt_buf_is_port_mmap(b));
+}
+
+
+static uint8_t
+nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf,
+ nxt_buf_t *b)
+{
+ ssize_t size;
+ nxt_port_mmap_msg_t *mm;
+ nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
+
+ size = nxt_buf_mem_used_size(&b->mem);
+
+ if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) {
+ nxt_memcpy(qbuf, b->mem.pos, size);
+
+ return size;
+ }
+
+ mmap_handler = b->parent;
+ hdr = mmap_handler->hdr;
+ mm = qbuf;
+
+ mm->mmap_id = hdr->id;
+ mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos);
+ mm->size = nxt_buf_mem_used_size(&b->mem);
+
+ pm->mmap = 1;
+
+ nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id,
+ mm->size);
+
+ return sizeof(nxt_port_mmap_msg_t);
+}
+
+
static nxt_int_t
nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg)