diff options
-rw-r--r-- | src/nxt_port.h | 2 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 142 |
2 files changed, 102 insertions, 42 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h index f6679bb2..fdb5a561 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -118,8 +118,6 @@ typedef struct { nxt_port_msg_t port_msg; nxt_work_t work; - nxt_event_engine_t *engine; - nxt_mp_t *mem_pool; } nxt_port_send_msg_t; diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index d5ed493d..880cdf0f 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -142,9 +142,9 @@ nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data) engine = data; #if (NXT_DEBUG) - if (nxt_slow_path(data != msg->engine)) { + if (nxt_slow_path(data != msg->work.data)) { nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)", - data, msg->engine); + data, msg->work.data); nxt_abort(); } #endif @@ -159,29 +159,28 @@ nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data) return; } - nxt_mp_release(msg->mem_pool, obj); + nxt_mp_release(engine->mem_pool, obj); } -nxt_int_t -nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) +static nxt_port_send_msg_t * +nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m) { nxt_port_send_msg_t *msg; msg = nxt_mp_retain(task->thread->engine->mem_pool, sizeof(nxt_port_send_msg_t)); if (nxt_slow_path(msg == NULL)) { - return NXT_ERROR; + return NULL; } msg->link.next = NULL; msg->link.prev = NULL; - msg->buf = b; - msg->fd = fd; - msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; - msg->share = 0; + msg->buf = m->buf; + msg->fd = m->fd; + msg->close_fd = m->close_fd; + msg->port_msg = m->port_msg; msg->work.next = NULL; msg->work.handler = nxt_port_release_send_msg; @@ -189,26 +188,77 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg->work.obj = msg; msg->work.data = task->thread->engine; - msg->engine = task->thread->engine; - msg->mem_pool = msg->engine->mem_pool; + return msg; +} - msg->port_msg.stream = stream; - msg->port_msg.pid = nxt_pid; - msg->port_msg.reply_port = reply_port; - msg->port_msg.type = type & NXT_PORT_MSG_MASK; - msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; - msg->port_msg.mmap = 0; - nxt_thread_mutex_lock(&port->write_mutex); +static nxt_port_send_msg_t * +nxt_port_msg_push(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) +{ + if (msg->work.data == NULL) { + msg = nxt_port_msg_create(task, msg); + } - nxt_queue_insert_tail(&port->messages, &msg->link); + if (msg != NULL) { + nxt_queue_insert_tail(&port->messages, &msg->link); + } - nxt_thread_mutex_unlock(&port->write_mutex); + return msg; +} + + +static nxt_port_send_msg_t * +nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) +{ + nxt_queue_link_t *lnk; + + lnk = nxt_queue_first(&port->messages); + + if (lnk == nxt_queue_tail(&port->messages)) { + return msg; + } + + return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); +} + + +nxt_int_t +nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, + nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) +{ + nxt_port_send_msg_t msg, *res; + + msg.link.next = NULL; + msg.link.prev = NULL; + + msg.buf = b; + msg.fd = fd; + msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; + msg.share = 0; + + msg.port_msg.stream = stream; + msg.port_msg.pid = nxt_pid; + msg.port_msg.reply_port = reply_port; + msg.port_msg.type = type & NXT_PORT_MSG_MASK; + msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; + msg.port_msg.mmap = 0; - nxt_port_use(task, port, 1); + msg.work.data = NULL; if (port->socket.write_ready) { - nxt_port_write_handler(task, &port->socket, NULL); + nxt_port_write_handler(task, &port->socket, &msg); + } else { + nxt_thread_mutex_lock(&port->write_mutex); + + res = nxt_port_msg_push(task, port, &msg); + + nxt_thread_mutex_unlock(&port->write_mutex); + + if (res == NULL) { + return NXT_ERROR; + } + + nxt_port_use(task, port, 1); } return NXT_OK; @@ -239,7 +289,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) nxt_port_t *port; struct iovec *iov; nxt_work_queue_t *wq; - nxt_queue_link_t *link; nxt_port_method_t m; nxt_port_send_msg_t *msg; nxt_sendbuf_coalesce_t sb; @@ -254,16 +303,16 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) iov = port->iov; + wq = &task->thread->engine->fast_work_queue; + do { - link = nxt_queue_first(&port->messages); + msg = nxt_port_msg_first(task, port, data); - if (link == nxt_queue_tail(&port->messages)) { + if (msg == NULL) { block_write = 1; goto unlock_mutex; } - msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link); - iov[0].iov_base = &msg->port_msg; iov[0].iov_len = sizeof(nxt_port_msg_t); @@ -315,8 +364,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) msg->fd = -1; } - wq = &task->thread->engine->fast_work_queue; - msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size, m == NXT_PORT_METHOD_MMAP); @@ -330,18 +377,34 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) if (msg->share >= port->max_share) { msg->share = 0; - nxt_queue_remove(link); - nxt_queue_insert_tail(&port->messages, link); + + if (msg->link.next != NULL) { + nxt_queue_remove(&msg->link); + use_delta--; + } + data = NULL; + + if (nxt_port_msg_push(task, port, msg) != NULL) { + use_delta++; + } } } else { - nxt_queue_remove(link); - use_delta--; - nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, - msg->engine); + if (msg->link.next != NULL) { + nxt_queue_remove(&msg->link); + use_delta--; + nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, + msg->work.data); + } + data = NULL; } } else if (nxt_slow_path(n == NXT_ERROR)) { + if (msg->link.next == NULL) { + if (nxt_port_msg_push(task, port, msg) != NULL) { + use_delta++; + } + } goto fail; } @@ -359,8 +422,7 @@ fail: use_delta++; - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_port_error_handler, task, &port->socket, + nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket, &port->socket); unlock_mutex: @@ -595,7 +657,7 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) nxt_queue_remove(&msg->link); use_delta--; nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, - msg->engine); + msg->work.data); } nxt_queue_loop; |