summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r--src/nxt_port_socket.c104
1 files changed, 70 insertions, 34 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 75706459..d5ed493d 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -169,31 +169,6 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
{
nxt_port_send_msg_t *msg;
- nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
-
- if ((type & NXT_PORT_MSG_SYNC) != 0) {
- msg->opened = 0;
- continue;
- }
-
- if (msg->port_msg.stream == stream
- && msg->port_msg.reply_port == reply_port
- && msg->port_msg.last == 0
- && msg->opened) {
-
- /*
- * An fd is ignored since a file descriptor
- * must be sent only in the first message of a stream.
- */
- nxt_buf_chain_add(&msg->buf, b);
-
- msg->port_msg.last |= (type & NXT_PORT_MSG_LAST) != 0;
-
- return NXT_OK;
- }
-
- } nxt_queue_loop;
-
msg = nxt_mp_retain(task->thread->engine->mem_pool,
sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) {
@@ -207,7 +182,6 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->fd = fd;
msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg->share = 0;
- msg->opened = 1;
msg->work.next = NULL;
msg->work.handler = nxt_port_release_send_msg;
@@ -225,8 +199,14 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
msg->port_msg.mmap = 0;
+ nxt_thread_mutex_lock(&port->write_mutex);
+
nxt_queue_insert_tail(&port->messages, &msg->link);
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ nxt_port_use(task, port, 1);
+
if (port->socket.write_ready) {
nxt_port_write_handler(task, &port->socket, NULL);
}
@@ -236,10 +216,26 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
static void
+nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ nxt_fd_event_block_write(task->thread->engine, &port->socket);
+}
+
+
+static void
+nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ nxt_fd_event_enable_write(task->thread->engine, &port->socket);
+}
+
+
+static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
+ int use_delta;
size_t plain_size;
ssize_t n;
+ nxt_bool_t block_write, enable_write;
nxt_port_t *port;
struct iovec *iov;
nxt_work_queue_t *wq;
@@ -250,14 +246,20 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
port = nxt_container_of(obj, nxt_port_t, socket);
+ block_write = 0;
+ enable_write = 0;
+ use_delta = 0;
+
+ nxt_thread_mutex_lock(&port->write_mutex);
+
iov = port->iov;
do {
link = nxt_queue_first(&port->messages);
if (link == nxt_queue_tail(&port->messages)) {
- nxt_fd_event_block_write(task->thread->engine, &port->socket);
- return;
+ block_write = 1;
+ goto unlock_mutex;
}
msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link);
@@ -334,6 +336,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
} else {
nxt_queue_remove(link);
+ use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->engine);
}
@@ -347,16 +350,33 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
} while (port->socket.write_ready);
if (nxt_fd_event_is_disabled(port->socket.write)) {
- /* TODO task->thread->engine or port->engine ? */
- nxt_fd_event_enable_write(task->thread->engine, &port->socket);
+ enable_write = 1;
}
- return;
+ goto unlock_mutex;
fail:
+ use_delta++;
+
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_port_error_handler, task, &port->socket, NULL);
+ nxt_port_error_handler, task, &port->socket,
+ &port->socket);
+
+unlock_mutex:
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ if (block_write && nxt_fd_event_is_active(port->socket.write)) {
+ nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
+ }
+
+ if (enable_write) {
+ nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
+ }
+
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
}
@@ -541,6 +561,7 @@ nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
+ int use_delta;
nxt_buf_t *b;
nxt_port_t *port;
nxt_work_queue_t *wq;
@@ -551,9 +572,17 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
port = nxt_container_of(obj, nxt_port_t, socket);
- nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
+ use_delta = 0;
+
+ if (obj == data) {
+ use_delta--;
+ }
+
+ wq = &task->thread->engine->fast_work_queue;
- wq = &task->thread->engine->fast_work_queue;
+ nxt_thread_mutex_lock(&port->write_mutex);
+
+ nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
for(b = msg->buf; b != NULL; b = b->next) {
if (nxt_buf_is_sync(b)) {
@@ -564,8 +593,15 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
}
nxt_queue_remove(&msg->link);
+ use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->engine);
} nxt_queue_loop;
+
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
}