summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_port.h2
-rw-r--r--src/nxt_port_socket.c142
2 files changed, 102 insertions, 42 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h
index f6679bb2..fdb5a561 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -118,8 +118,6 @@ typedef struct {
nxt_port_msg_t port_msg;
nxt_work_t work;
- nxt_event_engine_t *engine;
- nxt_mp_t *mem_pool;
} nxt_port_send_msg_t;
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index d5ed493d..880cdf0f 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -142,9 +142,9 @@ nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
engine = data;
#if (NXT_DEBUG)
- if (nxt_slow_path(data != msg->engine)) {
+ if (nxt_slow_path(data != msg->work.data)) {
nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)",
- data, msg->engine);
+ data, msg->work.data);
nxt_abort();
}
#endif
@@ -159,29 +159,28 @@ nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
return;
}
- nxt_mp_release(msg->mem_pool, obj);
+ nxt_mp_release(engine->mem_pool, obj);
}
-nxt_int_t
-nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
- nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
+static nxt_port_send_msg_t *
+nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m)
{
nxt_port_send_msg_t *msg;
msg = nxt_mp_retain(task->thread->engine->mem_pool,
sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) {
- return NXT_ERROR;
+ return NULL;
}
msg->link.next = NULL;
msg->link.prev = NULL;
- msg->buf = b;
- msg->fd = fd;
- msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
- msg->share = 0;
+ msg->buf = m->buf;
+ msg->fd = m->fd;
+ msg->close_fd = m->close_fd;
+ msg->port_msg = m->port_msg;
msg->work.next = NULL;
msg->work.handler = nxt_port_release_send_msg;
@@ -189,26 +188,77 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->work.obj = msg;
msg->work.data = task->thread->engine;
- msg->engine = task->thread->engine;
- msg->mem_pool = msg->engine->mem_pool;
+ return msg;
+}
- msg->port_msg.stream = stream;
- msg->port_msg.pid = nxt_pid;
- msg->port_msg.reply_port = reply_port;
- msg->port_msg.type = type & NXT_PORT_MSG_MASK;
- msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
- msg->port_msg.mmap = 0;
- nxt_thread_mutex_lock(&port->write_mutex);
+static nxt_port_send_msg_t *
+nxt_port_msg_push(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
+{
+ if (msg->work.data == NULL) {
+ msg = nxt_port_msg_create(task, msg);
+ }
- nxt_queue_insert_tail(&port->messages, &msg->link);
+ if (msg != NULL) {
+ nxt_queue_insert_tail(&port->messages, &msg->link);
+ }
- nxt_thread_mutex_unlock(&port->write_mutex);
+ return msg;
+}
+
+
+static nxt_port_send_msg_t *
+nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
+{
+ nxt_queue_link_t *lnk;
+
+ lnk = nxt_queue_first(&port->messages);
+
+ if (lnk == nxt_queue_tail(&port->messages)) {
+ return msg;
+ }
+
+ return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
+}
+
+
+nxt_int_t
+nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
+ nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
+{
+ nxt_port_send_msg_t msg, *res;
+
+ msg.link.next = NULL;
+ msg.link.prev = NULL;
+
+ msg.buf = b;
+ msg.fd = fd;
+ msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
+ msg.share = 0;
+
+ msg.port_msg.stream = stream;
+ msg.port_msg.pid = nxt_pid;
+ msg.port_msg.reply_port = reply_port;
+ msg.port_msg.type = type & NXT_PORT_MSG_MASK;
+ msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
+ msg.port_msg.mmap = 0;
- nxt_port_use(task, port, 1);
+ msg.work.data = NULL;
if (port->socket.write_ready) {
- nxt_port_write_handler(task, &port->socket, NULL);
+ nxt_port_write_handler(task, &port->socket, &msg);
+ } else {
+ nxt_thread_mutex_lock(&port->write_mutex);
+
+ res = nxt_port_msg_push(task, port, &msg);
+
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ if (res == NULL) {
+ return NXT_ERROR;
+ }
+
+ nxt_port_use(task, port, 1);
}
return NXT_OK;
@@ -239,7 +289,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
nxt_port_t *port;
struct iovec *iov;
nxt_work_queue_t *wq;
- nxt_queue_link_t *link;
nxt_port_method_t m;
nxt_port_send_msg_t *msg;
nxt_sendbuf_coalesce_t sb;
@@ -254,16 +303,16 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
iov = port->iov;
+ wq = &task->thread->engine->fast_work_queue;
+
do {
- link = nxt_queue_first(&port->messages);
+ msg = nxt_port_msg_first(task, port, data);
- if (link == nxt_queue_tail(&port->messages)) {
+ if (msg == NULL) {
block_write = 1;
goto unlock_mutex;
}
- msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link);
-
iov[0].iov_base = &msg->port_msg;
iov[0].iov_len = sizeof(nxt_port_msg_t);
@@ -315,8 +364,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
msg->fd = -1;
}
- wq = &task->thread->engine->fast_work_queue;
-
msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size,
m == NXT_PORT_METHOD_MMAP);
@@ -330,18 +377,34 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
if (msg->share >= port->max_share) {
msg->share = 0;
- nxt_queue_remove(link);
- nxt_queue_insert_tail(&port->messages, link);
+
+ if (msg->link.next != NULL) {
+ nxt_queue_remove(&msg->link);
+ use_delta--;
+ }
+ data = NULL;
+
+ if (nxt_port_msg_push(task, port, msg) != NULL) {
+ use_delta++;
+ }
}
} else {
- nxt_queue_remove(link);
- use_delta--;
- nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
- msg->engine);
+ if (msg->link.next != NULL) {
+ nxt_queue_remove(&msg->link);
+ use_delta--;
+ nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
+ msg->work.data);
+ }
+ data = NULL;
}
} else if (nxt_slow_path(n == NXT_ERROR)) {
+ if (msg->link.next == NULL) {
+ if (nxt_port_msg_push(task, port, msg) != NULL) {
+ use_delta++;
+ }
+ }
goto fail;
}
@@ -359,8 +422,7 @@ fail:
use_delta++;
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_port_error_handler, task, &port->socket,
+ nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
&port->socket);
unlock_mutex:
@@ -595,7 +657,7 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
nxt_queue_remove(&msg->link);
use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
- msg->engine);
+ msg->work.data);
} nxt_queue_loop;