diff options
author | Max Romanov <max.romanov@nginx.com> | 2021-03-25 16:55:16 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2021-03-25 16:55:16 +0300 |
commit | b8052b050e0111400c59f35e76c013d8ee553ea9 (patch) | |
tree | 39009fc66870ba1ac60bbe0e0ee996e8b5201562 | |
parent | 067c6096e2ec306c4fdae6993140fbbdf4f9a6fd (diff) | |
download | unit-b8052b050e0111400c59f35e76c013d8ee553ea9.tar.gz unit-b8052b050e0111400c59f35e76c013d8ee553ea9.tar.bz2 |
Fixing shm buffer leakage when sending over the port queue.
When the shm buffer is sent over the port queue, it needs to be completed
because it's sent over the port socket.
-rw-r--r-- | docs/changes.xml | 7 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 97 |
2 files changed, 91 insertions, 13 deletions
diff --git a/docs/changes.xml b/docs/changes.xml index cca6bbc4..89c4630d 100644 --- a/docs/changes.xml +++ b/docs/changes.xml @@ -38,6 +38,13 @@ appeared in 1.6. <change type="bugfix"> <para> +a descriptor and memory leak occurring in the router process when processing +small WebSocket frames from a client; the bug had appeared in 1.19.0. +</para> +</change> + +<change type="bugfix"> +<para> a descriptor leak occurring in the router process when removing or reconfiguring an application; the bug had appeared in 1.19.0. </para> diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 9d8096b2..3cf2e79a 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -6,8 +6,16 @@ #include <nxt_main.h> #include <nxt_port_queue.h> +#include <nxt_port_memory_int.h> +#define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \ + (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t)) + + +static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b); +static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, + void *qbuf, nxt_buf_t *b); static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg); static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m); @@ -151,10 +159,13 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_buf_t *b) { int notify; - uint8_t *p; + uint8_t qmsg_size; nxt_int_t res; nxt_port_send_msg_t msg; - uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE]; + struct { + nxt_port_msg_t pm; + uint8_t buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE]; + } qmsg; msg.link.next = NULL; msg.link.prev = NULL; @@ -177,21 +188,31 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) { - if (fd == -1 - && (b == NULL - || nxt_buf_mem_used_size(&b->mem) - <= (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t)))) - { - p = nxt_cpymem(qmsg, &msg.port_msg, sizeof(nxt_port_msg_t)); + if (fd == -1 && nxt_port_can_enqueue_buf(b)) { + qmsg.pm = msg.port_msg; + + qmsg_size = sizeof(qmsg.pm); + if (b != NULL) { - p = nxt_cpymem(p, b->mem.pos, nxt_buf_mem_used_size(&b->mem)); + qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b); } - res = nxt_port_queue_send(port->queue, qmsg, p - qmsg, ¬ify); + res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, ¬ify); nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d", (int) port->pid, (int) port->id, port->socket.fd, - (int) (p - qmsg), notify, res); + (int) qmsg_size, notify, res); + + if (b != NULL && nxt_fast_path(res == NXT_OK)) { + if (qmsg.pm.mmap) { + b->is_port_mmap_sent = 1; + } + + b->mem.pos = b->mem.free; + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + b->completion_handler, task, b, b->parent); + } if (notify == 0) { return res; @@ -201,9 +222,9 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.buf = NULL; } else { - qmsg[0] = _NXT_PORT_MSG_READ_SOCKET; + qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET; - res = nxt_port_queue_send(port->queue, qmsg, 1, ¬ify); + res = nxt_port_queue_send(port->queue, qmsg.buf, 1, ¬ify); nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d", (int) port->pid, (int) port->id, port->socket.fd, @@ -225,6 +246,56 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, } +static nxt_bool_t +nxt_port_can_enqueue_buf(nxt_buf_t *b) +{ + if (b == NULL) { + return 1; + } + + if (b->next != NULL) { + return 0; + } + + return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE + || nxt_buf_is_port_mmap(b)); +} + + +static uint8_t +nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf, + nxt_buf_t *b) +{ + ssize_t size; + nxt_port_mmap_msg_t *mm; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; + + size = nxt_buf_mem_used_size(&b->mem); + + if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) { + nxt_memcpy(qbuf, b->mem.pos, size); + + return size; + } + + mmap_handler = b->parent; + hdr = mmap_handler->hdr; + mm = qbuf; + + mm->mmap_id = hdr->id; + mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos); + mm->size = nxt_buf_mem_used_size(&b->mem); + + pm->mmap = 1; + + nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id, + mm->size); + + return sizeof(nxt_port_mmap_msg_t); +} + + static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) |