diff options
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r-- | src/nxt_port_socket.c | 104 |
1 files changed, 70 insertions, 34 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 75706459..d5ed493d 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -169,31 +169,6 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, { nxt_port_send_msg_t *msg; - nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { - - if ((type & NXT_PORT_MSG_SYNC) != 0) { - msg->opened = 0; - continue; - } - - if (msg->port_msg.stream == stream - && msg->port_msg.reply_port == reply_port - && msg->port_msg.last == 0 - && msg->opened) { - - /* - * An fd is ignored since a file descriptor - * must be sent only in the first message of a stream. - */ - nxt_buf_chain_add(&msg->buf, b); - - msg->port_msg.last |= (type & NXT_PORT_MSG_LAST) != 0; - - return NXT_OK; - } - - } nxt_queue_loop; - msg = nxt_mp_retain(task->thread->engine->mem_pool, sizeof(nxt_port_send_msg_t)); if (nxt_slow_path(msg == NULL)) { @@ -207,7 +182,6 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg->fd = fd; msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; msg->share = 0; - msg->opened = 1; msg->work.next = NULL; msg->work.handler = nxt_port_release_send_msg; @@ -225,8 +199,14 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; msg->port_msg.mmap = 0; + nxt_thread_mutex_lock(&port->write_mutex); + nxt_queue_insert_tail(&port->messages, &msg->link); + nxt_thread_mutex_unlock(&port->write_mutex); + + nxt_port_use(task, port, 1); + if (port->socket.write_ready) { nxt_port_write_handler(task, &port->socket, NULL); } @@ -236,10 +216,26 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, static void +nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data) +{ + nxt_fd_event_block_write(task->thread->engine, &port->socket); +} + + +static void +nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data) +{ + nxt_fd_event_enable_write(task->thread->engine, &port->socket); +} + + +static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) { + int use_delta; size_t plain_size; ssize_t n; + nxt_bool_t block_write, enable_write; nxt_port_t *port; struct iovec *iov; nxt_work_queue_t *wq; @@ -250,14 +246,20 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) port = nxt_container_of(obj, nxt_port_t, socket); + block_write = 0; + enable_write = 0; + use_delta = 0; + + nxt_thread_mutex_lock(&port->write_mutex); + iov = port->iov; do { link = nxt_queue_first(&port->messages); if (link == nxt_queue_tail(&port->messages)) { - nxt_fd_event_block_write(task->thread->engine, &port->socket); - return; + block_write = 1; + goto unlock_mutex; } msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link); @@ -334,6 +336,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } else { nxt_queue_remove(link); + use_delta--; nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, msg->engine); } @@ -347,16 +350,33 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } while (port->socket.write_ready); if (nxt_fd_event_is_disabled(port->socket.write)) { - /* TODO task->thread->engine or port->engine ? */ - nxt_fd_event_enable_write(task->thread->engine, &port->socket); + enable_write = 1; } - return; + goto unlock_mutex; fail: + use_delta++; + nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_port_error_handler, task, &port->socket, NULL); + nxt_port_error_handler, task, &port->socket, + &port->socket); + +unlock_mutex: + nxt_thread_mutex_unlock(&port->write_mutex); + + if (block_write && nxt_fd_event_is_active(port->socket.write)) { + nxt_port_post(task, port, nxt_port_fd_block_write, NULL); + } + + if (enable_write) { + nxt_port_post(task, port, nxt_port_fd_enable_write, NULL); + } + + if (use_delta != 0) { + nxt_port_use(task, port, use_delta); + } } @@ -541,6 +561,7 @@ nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) { + int use_delta; nxt_buf_t *b; nxt_port_t *port; nxt_work_queue_t *wq; @@ -551,9 +572,17 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) port = nxt_container_of(obj, nxt_port_t, socket); - nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { + use_delta = 0; + + if (obj == data) { + use_delta--; + } + + wq = &task->thread->engine->fast_work_queue; - wq = &task->thread->engine->fast_work_queue; + nxt_thread_mutex_lock(&port->write_mutex); + + nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { for(b = msg->buf; b != NULL; b = b->next) { if (nxt_buf_is_sync(b)) { @@ -564,8 +593,15 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) } nxt_queue_remove(&msg->link); + use_delta--; nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, msg->engine); } nxt_queue_loop; + + nxt_thread_mutex_unlock(&port->write_mutex); + + if (use_delta != 0) { + nxt_port_use(task, port, use_delta); + } } |