diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_port_socket.c | 54 |
1 files changed, 38 insertions, 16 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 30c64a94..01e46f93 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -114,6 +114,13 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) port->socket.write_work_queue = &port->engine->fast_work_queue; port->socket.write_handler = nxt_port_write_handler; port->socket.error_handler = nxt_port_error_handler; + + if (port->iov == NULL) { + port->iov = nxt_mp_get(port->mem_pool, sizeof(struct iovec) * + NXT_IOBUF_MAX * 10); + port->mmsg_buf = nxt_mp_get(port->mem_pool, sizeof(uint32_t) * 3 * + NXT_IOBUF_MAX * 10); + } } @@ -224,20 +231,20 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) { + size_t plain_size; ssize_t n; nxt_port_t *port; - struct iovec iov[NXT_IOBUF_MAX * 10]; + struct iovec *iov; nxt_work_queue_t *wq; nxt_queue_link_t *link; nxt_port_method_t m; nxt_port_send_msg_t *msg; nxt_sendbuf_coalesce_t sb; - size_t plain_size; - nxt_buf_t *plain_buf; - port = nxt_container_of(obj, nxt_port_t, socket); + iov = port->iov; + do { link = nxt_queue_first(&port->messages); @@ -269,7 +276,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) nxt_sendbuf_mem_coalesce(task, &sb); plain_size = sb.size; - plain_buf = msg->buf; /* * Send through mmap enabled only when payload @@ -302,16 +308,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) wq = &task->thread->engine->fast_work_queue; - if (msg->buf != plain_buf) { - /* - * Complete crafted mmap_msgs buf and restore msg->buf - * for regular completion call. - */ - nxt_port_mmap_completion(task, wq, msg->buf); - - msg->buf = plain_buf; - } - msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size); if (msg->buf != NULL) { @@ -330,7 +326,8 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } else { nxt_queue_remove(link); - nxt_port_release_send_msg(task, msg, msg->engine); + nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, + msg->engine); } } else if (nxt_slow_path(n == NXT_ERROR)) { @@ -536,6 +533,31 @@ nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) { + nxt_buf_t *b; + nxt_port_t *port; + nxt_work_queue_t *wq; + nxt_port_send_msg_t *msg; + nxt_debug(task, "port error handler %p", obj); /* TODO */ + + port = nxt_container_of(obj, nxt_port_t, socket); + + nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { + + wq = &task->thread->engine->fast_work_queue; + + for(b = msg->buf; b != NULL; b = b->next) { + if (nxt_buf_is_sync(b)) { + continue; + } + + nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); + } + + nxt_queue_remove(&msg->link); + nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, + msg->engine); + + } nxt_queue_loop; } |