summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_port_socket.c97
1 files changed, 84 insertions, 13 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 9d8096b2..3cf2e79a 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -6,8 +6,16 @@
#include <nxt_main.h>
#include <nxt_port_queue.h>
+#include <nxt_port_memory_int.h>
+#define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \
+ (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))
+
+
+static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b);
+static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm,
+ void *qbuf, nxt_buf_t *b);
static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg);
static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
@@ -151,10 +159,13 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_buf_t *b)
{
int notify;
- uint8_t *p;
+ uint8_t qmsg_size;
nxt_int_t res;
nxt_port_send_msg_t msg;
- uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE];
+ struct {
+ nxt_port_msg_t pm;
+ uint8_t buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE];
+ } qmsg;
msg.link.next = NULL;
msg.link.prev = NULL;
@@ -177,21 +188,31 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {
- if (fd == -1
- && (b == NULL
- || nxt_buf_mem_used_size(&b->mem)
- <= (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))))
- {
- p = nxt_cpymem(qmsg, &msg.port_msg, sizeof(nxt_port_msg_t));
+ if (fd == -1 && nxt_port_can_enqueue_buf(b)) {
+ qmsg.pm = msg.port_msg;
+
+ qmsg_size = sizeof(qmsg.pm);
+
if (b != NULL) {
- p = nxt_cpymem(p, b->mem.pos, nxt_buf_mem_used_size(&b->mem));
+ qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b);
}
- res = nxt_port_queue_send(port->queue, qmsg, p - qmsg, &notify);
+ res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, &notify);
nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
(int) port->pid, (int) port->id, port->socket.fd,
- (int) (p - qmsg), notify, res);
+ (int) qmsg_size, notify, res);
+
+ if (b != NULL && nxt_fast_path(res == NXT_OK)) {
+ if (qmsg.pm.mmap) {
+ b->is_port_mmap_sent = 1;
+ }
+
+ b->mem.pos = b->mem.free;
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ b->completion_handler, task, b, b->parent);
+ }
if (notify == 0) {
return res;
@@ -201,9 +222,9 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.buf = NULL;
} else {
- qmsg[0] = _NXT_PORT_MSG_READ_SOCKET;
+ qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET;
- res = nxt_port_queue_send(port->queue, qmsg, 1, &notify);
+ res = nxt_port_queue_send(port->queue, qmsg.buf, 1, &notify);
nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
(int) port->pid, (int) port->id, port->socket.fd,
@@ -225,6 +246,56 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
}
+static nxt_bool_t
+nxt_port_can_enqueue_buf(nxt_buf_t *b)
+{
+ if (b == NULL) {
+ return 1;
+ }
+
+ if (b->next != NULL) {
+ return 0;
+ }
+
+ return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE
+ || nxt_buf_is_port_mmap(b));
+}
+
+
+static uint8_t
+nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf,
+ nxt_buf_t *b)
+{
+ ssize_t size;
+ nxt_port_mmap_msg_t *mm;
+ nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
+
+ size = nxt_buf_mem_used_size(&b->mem);
+
+ if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) {
+ nxt_memcpy(qbuf, b->mem.pos, size);
+
+ return size;
+ }
+
+ mmap_handler = b->parent;
+ hdr = mmap_handler->hdr;
+ mm = qbuf;
+
+ mm->mmap_id = hdr->id;
+ mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos);
+ mm->size = nxt_buf_mem_used_size(&b->mem);
+
+ pm->mmap = 1;
+
+ nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id,
+ mm->size);
+
+ return sizeof(nxt_port_mmap_msg_t);
+}
+
+
static nxt_int_t
nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg)