summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2021-03-25 16:55:16 +0300
committerMax Romanov <max.romanov@nginx.com>2021-03-25 16:55:16 +0300
commitb8052b050e0111400c59f35e76c013d8ee553ea9 (patch)
tree39009fc66870ba1ac60bbe0e0ee996e8b5201562
parent067c6096e2ec306c4fdae6993140fbbdf4f9a6fd (diff)
downloadunit-b8052b050e0111400c59f35e76c013d8ee553ea9.tar.gz
unit-b8052b050e0111400c59f35e76c013d8ee553ea9.tar.bz2
Fixing shm buffer leakage when sending over the port queue.
When the shm buffer is sent over the port queue, it needs to be completed because it's sent over the port socket.
Diffstat (limited to '')
-rw-r--r--docs/changes.xml7
-rw-r--r--src/nxt_port_socket.c97
2 files changed, 91 insertions, 13 deletions
diff --git a/docs/changes.xml b/docs/changes.xml
index cca6bbc4..89c4630d 100644
--- a/docs/changes.xml
+++ b/docs/changes.xml
@@ -38,6 +38,13 @@ appeared in 1.6.
<change type="bugfix">
<para>
+a descriptor and memory leak occurring in the router process when processing
+small WebSocket frames from a client; the bug had appeared in 1.19.0.
+</para>
+</change>
+
+<change type="bugfix">
+<para>
a descriptor leak occurring in the router process when removing or
reconfiguring an application; the bug had appeared in 1.19.0.
</para>
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 9d8096b2..3cf2e79a 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -6,8 +6,16 @@
#include <nxt_main.h>
#include <nxt_port_queue.h>
+#include <nxt_port_memory_int.h>
+#define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \
+ (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))
+
+
+static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b);
+static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm,
+ void *qbuf, nxt_buf_t *b);
static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg);
static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
@@ -151,10 +159,13 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_buf_t *b)
{
int notify;
- uint8_t *p;
+ uint8_t qmsg_size;
nxt_int_t res;
nxt_port_send_msg_t msg;
- uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE];
+ struct {
+ nxt_port_msg_t pm;
+ uint8_t buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE];
+ } qmsg;
msg.link.next = NULL;
msg.link.prev = NULL;
@@ -177,21 +188,31 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {
- if (fd == -1
- && (b == NULL
- || nxt_buf_mem_used_size(&b->mem)
- <= (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))))
- {
- p = nxt_cpymem(qmsg, &msg.port_msg, sizeof(nxt_port_msg_t));
+ if (fd == -1 && nxt_port_can_enqueue_buf(b)) {
+ qmsg.pm = msg.port_msg;
+
+ qmsg_size = sizeof(qmsg.pm);
+
if (b != NULL) {
- p = nxt_cpymem(p, b->mem.pos, nxt_buf_mem_used_size(&b->mem));
+ qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b);
}
- res = nxt_port_queue_send(port->queue, qmsg, p - qmsg, &notify);
+ res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, &notify);
nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
(int) port->pid, (int) port->id, port->socket.fd,
- (int) (p - qmsg), notify, res);
+ (int) qmsg_size, notify, res);
+
+ if (b != NULL && nxt_fast_path(res == NXT_OK)) {
+ if (qmsg.pm.mmap) {
+ b->is_port_mmap_sent = 1;
+ }
+
+ b->mem.pos = b->mem.free;
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ b->completion_handler, task, b, b->parent);
+ }
if (notify == 0) {
return res;
@@ -201,9 +222,9 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.buf = NULL;
} else {
- qmsg[0] = _NXT_PORT_MSG_READ_SOCKET;
+ qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET;
- res = nxt_port_queue_send(port->queue, qmsg, 1, &notify);
+ res = nxt_port_queue_send(port->queue, qmsg.buf, 1, &notify);
nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
(int) port->pid, (int) port->id, port->socket.fd,
@@ -225,6 +246,56 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
}
+static nxt_bool_t
+nxt_port_can_enqueue_buf(nxt_buf_t *b)
+{
+ if (b == NULL) {
+ return 1;
+ }
+
+ if (b->next != NULL) {
+ return 0;
+ }
+
+ return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE
+ || nxt_buf_is_port_mmap(b));
+}
+
+
+static uint8_t
+nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf,
+ nxt_buf_t *b)
+{
+ ssize_t size;
+ nxt_port_mmap_msg_t *mm;
+ nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
+
+ size = nxt_buf_mem_used_size(&b->mem);
+
+ if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) {
+ nxt_memcpy(qbuf, b->mem.pos, size);
+
+ return size;
+ }
+
+ mmap_handler = b->parent;
+ hdr = mmap_handler->hdr;
+ mm = qbuf;
+
+ mm->mmap_id = hdr->id;
+ mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos);
+ mm->size = nxt_buf_mem_used_size(&b->mem);
+
+ pm->mmap = 1;
+
+ nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id,
+ mm->size);
+
+ return sizeof(nxt_port_mmap_msg_t);
+}
+
+
static nxt_int_t
nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg)