summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r--src/nxt_port_socket.c308
1 files changed, 159 insertions, 149 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index fe113a68..4edc423a 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -7,9 +7,15 @@
#include <nxt_main.h>
+static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_send_msg_t *msg);
+static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
+static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
+static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
+ nxt_port_send_msg_t *msg);
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_recv_msg_t *msg);
@@ -116,13 +122,6 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
port->socket.write_work_queue = &port->engine->fast_work_queue;
port->socket.write_handler = nxt_port_write_handler;
port->socket.error_handler = nxt_port_error_handler;
-
- if (port->iov == NULL) {
- port->iov = nxt_mp_get(port->mem_pool,
- sizeof(struct iovec) * NXT_IOBUF_MAX * 10);
- port->mmsg_buf = nxt_mp_get(port->mem_pool,
- sizeof(uint32_t) * 3 * NXT_IOBUF_MAX * 10);
- }
}
@@ -135,109 +134,11 @@ nxt_port_write_close(nxt_port_t *port)
static void
-nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_engine_t *engine;
- nxt_port_send_msg_t *msg;
-
- msg = obj;
- engine = data;
-
- nxt_assert(data == msg->work.data);
-
- if (engine != task->thread->engine) {
-
- nxt_debug(task, "current thread is %PT, expected %PT",
- task->thread->tid, engine->task.thread->tid);
-
- nxt_event_engine_post(engine, &msg->work);
-
- return;
- }
-
- nxt_mp_free(engine->mem_pool, obj);
- nxt_mp_release(engine->mem_pool);
-}
-
-
-static nxt_port_send_msg_t *
-nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m)
-{
- nxt_mp_t *mp;
- nxt_port_send_msg_t *msg;
-
- mp = task->thread->engine->mem_pool;
-
- msg = nxt_mp_alloc(mp, sizeof(nxt_port_send_msg_t));
- if (nxt_slow_path(msg == NULL)) {
- return NULL;
- }
-
- nxt_mp_retain(mp);
-
- msg->link.next = NULL;
- msg->link.prev = NULL;
-
- msg->buf = m->buf;
- msg->share = m->share;
- msg->fd = m->fd;
- msg->close_fd = m->close_fd;
- msg->port_msg = m->port_msg;
-
- msg->work.next = NULL;
- msg->work.handler = nxt_port_release_send_msg;
- msg->work.task = task;
- msg->work.obj = msg;
- msg->work.data = task->thread->engine;
-
- return msg;
-}
-
-
-static nxt_port_send_msg_t *
-nxt_port_msg_insert_head(nxt_task_t *task, nxt_port_t *port,
- nxt_port_send_msg_t *msg)
+nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
{
- if (msg->work.data == NULL) {
- msg = nxt_port_msg_create(task, msg);
+ if (msg->allocated) {
+ nxt_free(msg);
}
-
- if (msg != NULL) {
- nxt_queue_insert_head(&port->messages, &msg->link);
- }
-
- return msg;
-}
-
-
-static nxt_port_send_msg_t *
-nxt_port_msg_insert_tail(nxt_task_t *task, nxt_port_t *port,
- nxt_port_send_msg_t *msg)
-{
- if (msg->work.data == NULL) {
- msg = nxt_port_msg_create(task, msg);
- }
-
- if (msg != NULL) {
- nxt_queue_insert_tail(&port->messages, &msg->link);
- }
-
- return msg;
-}
-
-
-static nxt_port_send_msg_t *
-nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
-{
- nxt_queue_link_t *lnk;
-
- lnk = nxt_queue_first(&port->messages);
-
- if (lnk == nxt_queue_tail(&port->messages)) {
- return msg;
- }
-
- return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
}
@@ -246,15 +147,17 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b,
void *tracking)
{
- nxt_port_send_msg_t msg, *res;
+ nxt_int_t res;
+ nxt_port_send_msg_t msg;
msg.link.next = NULL;
msg.link.prev = NULL;
msg.buf = b;
+ msg.share = 0;
msg.fd = fd;
msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
- msg.share = 0;
+ msg.allocated = 0;
if (tracking != NULL) {
nxt_port_mmap_tracking_write(msg.tracking_msg, tracking);
@@ -270,25 +173,63 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.port_msg.mf = 0;
msg.port_msg.tracking = tracking != NULL;
- msg.work.data = NULL;
-
- if (port->socket.write_ready) {
+ res = nxt_port_msg_chk_insert(task, port, &msg);
+ if (nxt_fast_path(res == NXT_DECLINED)) {
nxt_port_write_handler(task, &port->socket, &msg);
- } else {
- nxt_thread_mutex_lock(&port->write_mutex);
+ res = NXT_OK;
+ }
+
+ return res;
+}
+
- res = nxt_port_msg_insert_tail(task, port, &msg);
+static nxt_int_t
+nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_send_msg_t *msg)
+{
+ nxt_int_t res;
+
+ nxt_thread_mutex_lock(&port->write_mutex);
+
+ if (nxt_fast_path(port->socket.write_ready
+ && nxt_queue_is_empty(&port->messages)))
+ {
+ res = NXT_DECLINED;
+
+ } else {
+ msg = nxt_port_msg_alloc(msg);
- nxt_thread_mutex_unlock(&port->write_mutex);
+ if (nxt_fast_path(msg != NULL)) {
+ nxt_queue_insert_tail(&port->messages, &msg->link);
+ nxt_port_use(task, port, 1);
+ res = NXT_OK;
- if (res == NULL) {
- return NXT_ERROR;
+ } else {
+ res = NXT_ERROR;
}
+ }
+
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ return res;
+}
+
+
+static nxt_port_send_msg_t *
+nxt_port_msg_alloc(nxt_port_send_msg_t *m)
+{
+ nxt_port_send_msg_t *msg;
- nxt_port_use(task, port, 1);
+ msg = nxt_malloc(sizeof(nxt_port_send_msg_t));
+ if (nxt_slow_path(msg == NULL)) {
+ return NULL;
}
- return NXT_OK;
+ *msg = *m;
+
+ msg->allocated = 1;
+
+ return msg;
}
@@ -312,9 +253,10 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
int use_delta;
size_t plain_size;
ssize_t n;
+ uint32_t mmsg_buf[3 * NXT_IOBUF_MAX * 10];
nxt_bool_t block_write, enable_write;
nxt_port_t *port;
- struct iovec *iov;
+ struct iovec iov[NXT_IOBUF_MAX * 10];
nxt_work_queue_t *wq;
nxt_port_method_t m;
nxt_port_send_msg_t *msg;
@@ -326,20 +268,23 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
enable_write = 0;
use_delta = 0;
- nxt_thread_mutex_lock(&port->write_mutex);
-
- iov = port->iov;
-
wq = &task->thread->engine->fast_work_queue;
do {
- msg = nxt_port_msg_first(task, port, data);
+ if (data) {
+ msg = data;
- if (msg == NULL) {
- block_write = 1;
- goto unlock_mutex;
+ } else {
+ msg = nxt_port_msg_first(port);
+
+ if (msg == NULL) {
+ block_write = 1;
+ goto cleanup;
+ }
}
+next_fragment:
+
iov[0].iov_base = &msg->port_msg;
iov[0].iov_len = sizeof(nxt_port_msg_t);
@@ -377,7 +322,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
* is bigger than PORT_MMAP_MIN_SIZE.
*/
if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
- nxt_port_mmap_write(task, port, msg, &sb);
+ nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);
} else {
m = NXT_PORT_METHOD_PLAIN;
@@ -402,7 +347,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
}
msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
- m == NXT_PORT_METHOD_MMAP);
+ m == NXT_PORT_METHOD_MMAP);
if (msg->buf != NULL) {
nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
@@ -421,36 +366,58 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
msg->share = 0;
if (msg->link.next != NULL) {
+ nxt_thread_mutex_lock(&port->write_mutex);
+
nxt_queue_remove(&msg->link);
- use_delta--;
- }
- data = NULL;
+ nxt_queue_insert_tail(&port->messages, &msg->link);
+
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ } else {
+ msg = nxt_port_msg_insert_tail(port, msg);
+ if (nxt_slow_path(msg == NULL)) {
+ goto fail;
+ }
- if (nxt_port_msg_insert_tail(task, port, msg) != NULL) {
use_delta++;
}
+
+ } else {
+ goto next_fragment;
}
} else {
if (msg->link.next != NULL) {
+ nxt_thread_mutex_lock(&port->write_mutex);
+
nxt_queue_remove(&msg->link);
+ msg->link.next = NULL;
+
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
use_delta--;
- nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
- msg->work.data);
}
- data = NULL;
+
+ nxt_port_release_send_msg(msg);
}
- } else {
- if (msg->link.next == NULL) {
- if (nxt_port_msg_insert_head(task, port, msg) != NULL) {
- use_delta++;
- }
+ if (data != NULL) {
+ goto cleanup;
}
+ } else {
if (nxt_slow_path(n == NXT_ERROR)) {
goto fail;
}
+
+ if (msg->link.next == NULL) {
+ msg = nxt_port_msg_insert_tail(port, msg);
+ if (nxt_slow_path(msg == NULL)) {
+ goto fail;
+ }
+
+ use_delta++;
+ }
}
} while (port->socket.write_ready);
@@ -459,7 +426,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
enable_write = 1;
}
- goto unlock_mutex;
+ goto cleanup;
fail:
@@ -468,8 +435,7 @@ fail:
nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
&port->socket);
-unlock_mutex:
- nxt_thread_mutex_unlock(&port->write_mutex);
+cleanup:
if (block_write && nxt_fd_event_is_active(port->socket.write)) {
nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
@@ -485,6 +451,29 @@ unlock_mutex:
}
+static nxt_port_send_msg_t *
+nxt_port_msg_first(nxt_port_t *port)
+{
+ nxt_queue_link_t *lnk;
+ nxt_port_send_msg_t *msg;
+
+ nxt_thread_mutex_lock(&port->write_mutex);
+
+ lnk = nxt_queue_first(&port->messages);
+
+ if (lnk == nxt_queue_tail(&port->messages)) {
+ msg = NULL;
+
+ } else {
+ msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
+ }
+
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ return msg;
+}
+
+
static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
size_t sent, nxt_bool_t mmap_mode)
@@ -546,6 +535,27 @@ nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
}
+static nxt_port_send_msg_t *
+nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
+{
+ if (msg->allocated == 0) {
+ msg = nxt_port_msg_alloc(msg);
+
+ if (nxt_slow_path(msg == NULL)) {
+ return NULL;
+ }
+ }
+
+ nxt_thread_mutex_lock(&port->write_mutex);
+
+ nxt_queue_insert_tail(&port->messages, &msg->link);
+
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ return msg;
+}
+
+
void
nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
{
@@ -986,8 +996,8 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
nxt_queue_remove(&msg->link);
use_delta--;
- nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
- msg->work.data);
+
+ nxt_port_release_send_msg(msg);
} nxt_queue_loop;