diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-10-04 14:59:35 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-10-04 14:59:35 +0300 |
commit | ebbe89bd5c22ef903e5cd1a898ad75c6739ae4bc (patch) | |
tree | b9f5ad9dde7c95561946cf6531ef6203691aa3f5 /src/nxt_port_socket.c | |
parent | 6a64533fa3b96bb64bfde4615e40376d65a292cb (diff) | |
download | unit-ebbe89bd5c22ef903e5cd1a898ad75c6739ae4bc.tar.gz unit-ebbe89bd5c22ef903e5cd1a898ad75c6739ae4bc.tar.bz2 |
Optimized send message allocations.
For empty write queue cases, it is possible to avoid allocation and enqueue
send message structures. Send message initialized on stack and passed to
write handler. If immediate write fails, send message allocated from engine
pool and enqueued.
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r-- | src/nxt_port_socket.c | 142 |
1 files changed, 102 insertions, 40 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index d5ed493d..880cdf0f 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -142,9 +142,9 @@ nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data) engine = data; #if (NXT_DEBUG) - if (nxt_slow_path(data != msg->engine)) { + if (nxt_slow_path(data != msg->work.data)) { nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)", - data, msg->engine); + data, msg->work.data); nxt_abort(); } #endif @@ -159,29 +159,28 @@ nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data) return; } - nxt_mp_release(msg->mem_pool, obj); + nxt_mp_release(engine->mem_pool, obj); } -nxt_int_t -nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) +static nxt_port_send_msg_t * +nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m) { nxt_port_send_msg_t *msg; msg = nxt_mp_retain(task->thread->engine->mem_pool, sizeof(nxt_port_send_msg_t)); if (nxt_slow_path(msg == NULL)) { - return NXT_ERROR; + return NULL; } msg->link.next = NULL; msg->link.prev = NULL; - msg->buf = b; - msg->fd = fd; - msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; - msg->share = 0; + msg->buf = m->buf; + msg->fd = m->fd; + msg->close_fd = m->close_fd; + msg->port_msg = m->port_msg; msg->work.next = NULL; msg->work.handler = nxt_port_release_send_msg; @@ -189,26 +188,77 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg->work.obj = msg; msg->work.data = task->thread->engine; - msg->engine = task->thread->engine; - msg->mem_pool = msg->engine->mem_pool; + return msg; +} - msg->port_msg.stream = stream; - msg->port_msg.pid = nxt_pid; - msg->port_msg.reply_port = reply_port; - msg->port_msg.type = type & NXT_PORT_MSG_MASK; - msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; - msg->port_msg.mmap = 0; - nxt_thread_mutex_lock(&port->write_mutex); +static nxt_port_send_msg_t * +nxt_port_msg_push(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) +{ + if (msg->work.data == NULL) { + msg = nxt_port_msg_create(task, msg); + } - nxt_queue_insert_tail(&port->messages, &msg->link); + if (msg != NULL) { + nxt_queue_insert_tail(&port->messages, &msg->link); + } - nxt_thread_mutex_unlock(&port->write_mutex); + return msg; +} + + +static nxt_port_send_msg_t * +nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) +{ + nxt_queue_link_t *lnk; + + lnk = nxt_queue_first(&port->messages); + + if (lnk == nxt_queue_tail(&port->messages)) { + return msg; + } + + return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); +} + + +nxt_int_t +nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, + nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) +{ + nxt_port_send_msg_t msg, *res; + + msg.link.next = NULL; + msg.link.prev = NULL; + + msg.buf = b; + msg.fd = fd; + msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; + msg.share = 0; + + msg.port_msg.stream = stream; + msg.port_msg.pid = nxt_pid; + msg.port_msg.reply_port = reply_port; + msg.port_msg.type = type & NXT_PORT_MSG_MASK; + msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; + msg.port_msg.mmap = 0; - nxt_port_use(task, port, 1); + msg.work.data = NULL; if (port->socket.write_ready) { - nxt_port_write_handler(task, &port->socket, NULL); + nxt_port_write_handler(task, &port->socket, &msg); + } else { + nxt_thread_mutex_lock(&port->write_mutex); + + res = nxt_port_msg_push(task, port, &msg); + + nxt_thread_mutex_unlock(&port->write_mutex); + + if (res == NULL) { + return NXT_ERROR; + } + + nxt_port_use(task, port, 1); } return NXT_OK; @@ -239,7 +289,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) nxt_port_t *port; struct iovec *iov; nxt_work_queue_t *wq; - nxt_queue_link_t *link; nxt_port_method_t m; nxt_port_send_msg_t *msg; nxt_sendbuf_coalesce_t sb; @@ -254,16 +303,16 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) iov = port->iov; + wq = &task->thread->engine->fast_work_queue; + do { - link = nxt_queue_first(&port->messages); + msg = nxt_port_msg_first(task, port, data); - if (link == nxt_queue_tail(&port->messages)) { + if (msg == NULL) { block_write = 1; goto unlock_mutex; } - msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link); - iov[0].iov_base = &msg->port_msg; iov[0].iov_len = sizeof(nxt_port_msg_t); @@ -315,8 +364,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) msg->fd = -1; } - wq = &task->thread->engine->fast_work_queue; - msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size, m == NXT_PORT_METHOD_MMAP); @@ -330,18 +377,34 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) if (msg->share >= port->max_share) { msg->share = 0; - nxt_queue_remove(link); - nxt_queue_insert_tail(&port->messages, link); + + if (msg->link.next != NULL) { + nxt_queue_remove(&msg->link); + use_delta--; + } + data = NULL; + + if (nxt_port_msg_push(task, port, msg) != NULL) { + use_delta++; + } } } else { - nxt_queue_remove(link); - use_delta--; - nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, - msg->engine); + if (msg->link.next != NULL) { + nxt_queue_remove(&msg->link); + use_delta--; + nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, + msg->work.data); + } + data = NULL; } } else if (nxt_slow_path(n == NXT_ERROR)) { + if (msg->link.next == NULL) { + if (nxt_port_msg_push(task, port, msg) != NULL) { + use_delta++; + } + } goto fail; } @@ -359,8 +422,7 @@ fail: use_delta++; - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_port_error_handler, task, &port->socket, + nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket, &port->socket); unlock_mutex: @@ -595,7 +657,7 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) nxt_queue_remove(&msg->link); use_delta--; nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, - msg->engine); + msg->work.data); } nxt_queue_loop; |