diff options
-rw-r--r-- | docs/changes.xml | 7 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 97 |
2 files changed, 91 insertions, 13 deletions
diff --git a/docs/changes.xml b/docs/changes.xml index cca6bbc4..89c4630d 100644 --- a/docs/changes.xml +++ b/docs/changes.xml @@ -38,6 +38,13 @@ appeared in 1.6. <change type="bugfix"> <para> +a descriptor and memory leak occurring in the router process when processing +small WebSocket frames from a client; the bug had appeared in 1.19.0. +</para> +</change> + +<change type="bugfix"> +<para> a descriptor leak occurring in the router process when removing or reconfiguring an application; the bug had appeared in 1.19.0. </para> diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 9d8096b2..3cf2e79a 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -6,8 +6,16 @@ #include <nxt_main.h> #include <nxt_port_queue.h> +#include <nxt_port_memory_int.h> +#define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \ + (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t)) + + +static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b); +static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, + void *qbuf, nxt_buf_t *b); static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg); static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m); @@ -151,10 +159,13 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_buf_t *b) { int notify; - uint8_t *p; + uint8_t qmsg_size; nxt_int_t res; nxt_port_send_msg_t msg; - uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE]; + struct { + nxt_port_msg_t pm; + uint8_t buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE]; + } qmsg; msg.link.next = NULL; msg.link.prev = NULL; @@ -177,21 +188,31 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) { - if (fd == -1 - && (b == NULL - || nxt_buf_mem_used_size(&b->mem) - <= (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t)))) - { - p = nxt_cpymem(qmsg, &msg.port_msg, sizeof(nxt_port_msg_t)); + if (fd == -1 && nxt_port_can_enqueue_buf(b)) { + qmsg.pm = msg.port_msg; + + qmsg_size = sizeof(qmsg.pm); + if (b != NULL) { - p = nxt_cpymem(p, b->mem.pos, nxt_buf_mem_used_size(&b->mem)); + qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b); } - res = nxt_port_queue_send(port->queue, qmsg, p - qmsg, ¬ify); + res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, ¬ify); nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d", (int) port->pid, (int) port->id, port->socket.fd, - (int) (p - qmsg), notify, res); + (int) qmsg_size, notify, res); + + if (b != NULL && nxt_fast_path(res == NXT_OK)) { + if (qmsg.pm.mmap) { + b->is_port_mmap_sent = 1; + } + + b->mem.pos = b->mem.free; + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + b->completion_handler, task, b, b->parent); + } if (notify == 0) { return res; @@ -201,9 +222,9 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.buf = NULL; } else { - qmsg[0] = _NXT_PORT_MSG_READ_SOCKET; + qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET; - res = nxt_port_queue_send(port->queue, qmsg, 1, ¬ify); + res = nxt_port_queue_send(port->queue, qmsg.buf, 1, ¬ify); nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d", (int) port->pid, (int) port->id, port->socket.fd, @@ -225,6 +246,56 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, } +static nxt_bool_t +nxt_port_can_enqueue_buf(nxt_buf_t *b) +{ + if (b == NULL) { + return 1; + } + + if (b->next != NULL) { + return 0; + } + + return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE + || nxt_buf_is_port_mmap(b)); +} + + +static uint8_t +nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf, + nxt_buf_t *b) +{ + ssize_t size; + nxt_port_mmap_msg_t *mm; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; + + size = nxt_buf_mem_used_size(&b->mem); + + if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) { + nxt_memcpy(qbuf, b->mem.pos, size); + + return size; + } + + mmap_handler = b->parent; + hdr = mmap_handler->hdr; + mm = qbuf; + + mm->mmap_id = hdr->id; + mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos); + mm->size = nxt_buf_mem_used_size(&b->mem); + + pm->mmap = 1; + + nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id, + mm->size); + + return sizeof(nxt_port_mmap_msg_t); +} + + static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) |