summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_socket.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-10-04 14:58:47 +0300
committerMax Romanov <max.romanov@nginx.com>2017-10-04 14:58:47 +0300
commit6a64533fa3b96bb64bfde4615e40376d65a292cb (patch)
treea18ed8059158d833290519e1135209747e28af21 /src/nxt_port_socket.c
parent414d508e04d26ebef0e3e1ba4ed518b11d3af1a0 (diff)
downloadunit-6a64533fa3b96bb64bfde4615e40376d65a292cb.tar.gz
unit-6a64533fa3b96bb64bfde4615e40376d65a292cb.tar.bz2
Introducing use counters for port and app. Thread safe port write.
Use counter helps to simplify logic around port and application free. Port 'post' function introduced to simplify post execution of particular function to original port engine's thread. Write message queue is protected by mutex which makes port write operation thread safe.
Diffstat (limited to '')
-rw-r--r--src/nxt_port_socket.c104
1 files changed, 70 insertions, 34 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 75706459..d5ed493d 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -169,31 +169,6 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
{
nxt_port_send_msg_t *msg;
- nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
-
- if ((type & NXT_PORT_MSG_SYNC) != 0) {
- msg->opened = 0;
- continue;
- }
-
- if (msg->port_msg.stream == stream
- && msg->port_msg.reply_port == reply_port
- && msg->port_msg.last == 0
- && msg->opened) {
-
- /*
- * An fd is ignored since a file descriptor
- * must be sent only in the first message of a stream.
- */
- nxt_buf_chain_add(&msg->buf, b);
-
- msg->port_msg.last |= (type & NXT_PORT_MSG_LAST) != 0;
-
- return NXT_OK;
- }
-
- } nxt_queue_loop;
-
msg = nxt_mp_retain(task->thread->engine->mem_pool,
sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) {
@@ -207,7 +182,6 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->fd = fd;
msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg->share = 0;
- msg->opened = 1;
msg->work.next = NULL;
msg->work.handler = nxt_port_release_send_msg;
@@ -225,8 +199,14 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
msg->port_msg.mmap = 0;
+ nxt_thread_mutex_lock(&port->write_mutex);
+
nxt_queue_insert_tail(&port->messages, &msg->link);
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ nxt_port_use(task, port, 1);
+
if (port->socket.write_ready) {
nxt_port_write_handler(task, &port->socket, NULL);
}
@@ -236,10 +216,26 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
static void
+nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ nxt_fd_event_block_write(task->thread->engine, &port->socket);
+}
+
+
+static void
+nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ nxt_fd_event_enable_write(task->thread->engine, &port->socket);
+}
+
+
+static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
+ int use_delta;
size_t plain_size;
ssize_t n;
+ nxt_bool_t block_write, enable_write;
nxt_port_t *port;
struct iovec *iov;
nxt_work_queue_t *wq;
@@ -250,14 +246,20 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
port = nxt_container_of(obj, nxt_port_t, socket);
+ block_write = 0;
+ enable_write = 0;
+ use_delta = 0;
+
+ nxt_thread_mutex_lock(&port->write_mutex);
+
iov = port->iov;
do {
link = nxt_queue_first(&port->messages);
if (link == nxt_queue_tail(&port->messages)) {
- nxt_fd_event_block_write(task->thread->engine, &port->socket);
- return;
+ block_write = 1;
+ goto unlock_mutex;
}
msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link);
@@ -334,6 +336,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
} else {
nxt_queue_remove(link);
+ use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->engine);
}
@@ -347,16 +350,33 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
} while (port->socket.write_ready);
if (nxt_fd_event_is_disabled(port->socket.write)) {
- /* TODO task->thread->engine or port->engine ? */
- nxt_fd_event_enable_write(task->thread->engine, &port->socket);
+ enable_write = 1;
}
- return;
+ goto unlock_mutex;
fail:
+ use_delta++;
+
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_port_error_handler, task, &port->socket, NULL);
+ nxt_port_error_handler, task, &port->socket,
+ &port->socket);
+
+unlock_mutex:
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ if (block_write && nxt_fd_event_is_active(port->socket.write)) {
+ nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
+ }
+
+ if (enable_write) {
+ nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
+ }
+
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
}
@@ -541,6 +561,7 @@ nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
+ int use_delta;
nxt_buf_t *b;
nxt_port_t *port;
nxt_work_queue_t *wq;
@@ -551,9 +572,17 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
port = nxt_container_of(obj, nxt_port_t, socket);
- nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
+ use_delta = 0;
+
+ if (obj == data) {
+ use_delta--;
+ }
+
+ wq = &task->thread->engine->fast_work_queue;
- wq = &task->thread->engine->fast_work_queue;
+ nxt_thread_mutex_lock(&port->write_mutex);
+
+ nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
for(b = msg->buf; b != NULL; b = b->next) {
if (nxt_buf_is_sync(b)) {
@@ -564,8 +593,15 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
}
nxt_queue_remove(&msg->link);
+ use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->engine);
} nxt_queue_loop;
+
+ nxt_thread_mutex_unlock(&port->write_mutex);
+
+ if (use_delta != 0) {
+ nxt_port_use(task, port, use_delta);
+ }
}