summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-10-04 14:59:35 +0300
committerMax Romanov <max.romanov@nginx.com>2017-10-04 14:59:35 +0300
commitebbe89bd5c22ef903e5cd1a898ad75c6739ae4bc (patch)
treeb9f5ad9dde7c95561946cf6531ef6203691aa3f5
parent6a64533fa3b96bb64bfde4615e40376d65a292cb (diff)
downloadunit-ebbe89bd5c22ef903e5cd1a898ad75c6739ae4bc.tar.gz
unit-ebbe89bd5c22ef903e5cd1a898ad75c6739ae4bc.tar.bz2
Optimized send message allocations.
For empty write queue cases, it is possible to avoid allocation and enqueue send message structures. Send message initialized on stack and passed to write handler. If immediate write fails, send message allocated from engine pool and enqueued.
Diffstat (limited to '')
-rw-r--r--src/nxt_port.h2
-rw-r--r--src/nxt_port_socket.c142
2 files changed, 102 insertions, 42 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h
index f6679bb2..fdb5a561 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -118,8 +118,6 @@ typedef struct {
nxt_port_msg_t port_msg;
nxt_work_t work;
- nxt_event_engine_t *engine;
- nxt_mp_t *mem_pool;
} nxt_port_send_msg_t;
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index d5ed493d..880cdf0f 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -142,9 +142,9 @@ nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
engine = data;
#if (NXT_DEBUG)
- if (nxt_slow_path(data != msg->engine)) {
+ if (nxt_slow_path(data != msg->work.data)) {
nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)",
- data, msg->engine);
+ data, msg->work.data);
nxt_abort();
}
#endif
@@ -159,29 +159,28 @@ nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
return;
}
- nxt_mp_release(msg->mem_pool, obj);
+ nxt_mp_release(engine->mem_pool, obj);
}
-nxt_int_t
-nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
- nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
+static nxt_port_send_msg_t *
+nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m)
{
nxt_port_send_msg_t *msg;
msg = nxt_mp_retain(task->thread->engine->mem_pool,
sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) {
- return NXT_ERROR;
+ return NULL;
}
msg->link.next = NULL;
msg->link.prev = NULL;
- msg->buf = b;
- msg->fd = fd;
- msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
- msg->share = 0;
+ msg->buf = m->buf;
+ msg->fd = m->fd;
+ msg->close_fd = m->close_fd;
+ msg->port_msg = m->port_msg;
msg->work.next = NULL;
msg->work.handler = nxt_port_release_send_msg;
@@ -189,26 +188,77 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->work.obj = msg;
msg->work.data = task->thread->engine;
- msg->engine = task->thread->engine;
- msg->mem_pool = msg->engine->mem_pool;
+ return msg;
+}
- msg->port_msg.stream = stream;
- msg->port_msg.pid = nxt_pid;
- msg->port_msg.reply_port = reply_port;
- msg->port_msg.type = type & NXT_PORT_MSG_MASK;
- msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
- msg->port_msg.mmap = 0;
- nxt_thread_mutex_lock(&port->write_mutex);
+static nxt_port_send_msg_t *
+nxt_port_msg_push(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
+{
+ if (msg->work.data == NULL) {
+ msg = nxt_port_msg_create(task, msg);
+ }
- nxt_queue_insert_tail(&port->messages, &msg->link);
+ if (msg != NULL) {
+ nxt_queue_insert_tail(&port->messages, &msg->link);
+ }
- nxt_thread_mutex_unlock(&port->write_mutex);
+ return msg;
+}
+
+
+static nxt_port_send_msg_t *
+nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
+{
+ nxt_queue_link_t *lnk;
+
+ lnk = nxt_queue_first(&port->messages);
+
+ if (lnk == nxt_queue_tail(&port->messages)) {
+ return msg;
+ }
+
+ return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
+}
+
+
+nxt_int_t
+nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
+ nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
+{
+ nxt_port_send_msg_t msg, *res;
+
+ msg.link.next = NULL;
+ msg.link.prev = NULL;
+
+ msg.buf = b;
+ msg.fd = fd;
+ msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
+ msg.share = 0;
+
+ msg.port_msg.stream = stream;
+ msg.port_msg.pid = nxt_pid;
+ msg.port_msg.reply_port = reply_port;
+ msg.port_msg.type = type & NXT_PORT_MSG_MASK;
+ msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
+ msg.port_msg.mmap = 0;
- nxt_port_use(task, port, 1);
+ msg.work.data = NULL;
if (port->socket.write_ready) {
- nxt_port_write_handler(task, &port->socket, NULL);
+ nxt_port_write_handler(task, &port->socket, &msg);
+ } else {
+ nxt_thread_mutex_lock(&port->write_mutex);
+
+ res = nxt_port_msg_push(task, port, &msg);
+
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ if (res == NULL) {
+ return NXT_ERROR;
+ }
+
+ nxt_port_use(task, port, 1);
}
return NXT_OK;
@@ -239,7 +289,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
nxt_port_t *port;
struct iovec *iov;
nxt_work_queue_t *wq;
- nxt_queue_link_t *link;
nxt_port_method_t m;
nxt_port_send_msg_t *msg;
nxt_sendbuf_coalesce_t sb;
@@ -254,16 +303,16 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
iov = port->iov;
+ wq = &task->thread->engine->fast_work_queue;
+
do {
- link = nxt_queue_first(&port->messages);
+ msg = nxt_port_msg_first(task, port, data);
- if (link == nxt_queue_tail(&port->messages)) {
+ if (msg == NULL) {
block_write = 1;
goto unlock_mutex;
}
- msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link);
-
iov[0].iov_base = &msg->port_msg;
iov[0].iov_len = sizeof(nxt_port_msg_t);
@@ -315,8 +364,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
msg->fd = -1;
}
- wq = &task->thread->engine->fast_work_queue;
-
msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size,
m == NXT_PORT_METHOD_MMAP);
@@ -330,18 +377,34 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
if (msg->share >= port->max_share) {
msg->share = 0;
- nxt_queue_remove(link);
- nxt_queue_insert_tail(&port->messages, link);
+
+ if (msg->link.next != NULL) {
+ nxt_queue_remove(&msg->link);
+ use_delta--;
+ }
+ data = NULL;
+
+ if (nxt_port_msg_push(task, port, msg) != NULL) {
+ use_delta++;
+ }
}
} else {
- nxt_queue_remove(link);
- use_delta--;
- nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
- msg->engine);
+ if (msg->link.next != NULL) {
+ nxt_queue_remove(&msg->link);
+ use_delta--;
+ nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
+ msg->work.data);
+ }
+ data = NULL;
}
} else if (nxt_slow_path(n == NXT_ERROR)) {
+ if (msg->link.next == NULL) {
+ if (nxt_port_msg_push(task, port, msg) != NULL) {
+ use_delta++;
+ }
+ }
goto fail;
}
@@ -359,8 +422,7 @@ fail:
use_delta++;
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_port_error_handler, task, &port->socket,
+ nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
&port->socket);
unlock_mutex:
@@ -595,7 +657,7 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
nxt_queue_remove(&msg->link);
use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
- msg->engine);
+ msg->work.data);
} nxt_queue_loop;