diff options
-rw-r--r-- | src/nxt_port.h | 8 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 6 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 2 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 308 |
4 files changed, 165 insertions, 159 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h index 76faa7d2..e4e76693 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -134,11 +134,10 @@ typedef struct { nxt_buf_t *buf; size_t share; nxt_fd_t fd; - nxt_bool_t close_fd; nxt_port_msg_t port_msg; uint32_t tracking_msg[2]; - - nxt_work_t work; + uint8_t close_fd; /* 1 bit */ + uint8_t allocated; /* 1 bit */ } nxt_port_send_msg_t; @@ -202,9 +201,6 @@ struct nxt_port_s { nxt_atomic_t use_count; nxt_process_type_t type; - - struct iovec *iov; - void *mmsg_buf; }; diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index b908041c..b7068c88 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -798,7 +798,7 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, void nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, - nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) + nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf) { size_t bsize; nxt_buf_t *bmem; @@ -811,7 +811,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, "via shared memory", sb->size, port->pid); bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); - mmap_msg = port->mmsg_buf; + mmap_msg = mmsg_buf; bmem = msg->buf; @@ -841,7 +841,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, port->pid); } - sb->iobuf[0].iov_base = port->mmsg_buf; + sb->iobuf[0].iov_base = mmsg_buf; sb->iobuf[0].iov_len = bsize; sb->niov = 1; sb->size = bsize; diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index c6a49ccf..748549b1 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -55,7 +55,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, void nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, - nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb); + nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf); void nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg); diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index fe113a68..4edc423a 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -7,9 +7,15 @@ #include <nxt_main.h> +static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, + nxt_port_send_msg_t *msg); +static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m); static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); +static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port); static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); +static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port, + nxt_port_send_msg_t *msg); static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg); @@ -116,13 +122,6 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) port->socket.write_work_queue = &port->engine->fast_work_queue; port->socket.write_handler = nxt_port_write_handler; port->socket.error_handler = nxt_port_error_handler; - - if (port->iov == NULL) { - port->iov = nxt_mp_get(port->mem_pool, - sizeof(struct iovec) * NXT_IOBUF_MAX * 10); - port->mmsg_buf = nxt_mp_get(port->mem_pool, - sizeof(uint32_t) * 3 * NXT_IOBUF_MAX * 10); - } } @@ -135,109 +134,11 @@ nxt_port_write_close(nxt_port_t *port) static void -nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_engine_t *engine; - nxt_port_send_msg_t *msg; - - msg = obj; - engine = data; - - nxt_assert(data == msg->work.data); - - if (engine != task->thread->engine) { - - nxt_debug(task, "current thread is %PT, expected %PT", - task->thread->tid, engine->task.thread->tid); - - nxt_event_engine_post(engine, &msg->work); - - return; - } - - nxt_mp_free(engine->mem_pool, obj); - nxt_mp_release(engine->mem_pool); -} - - -static nxt_port_send_msg_t * -nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m) -{ - nxt_mp_t *mp; - nxt_port_send_msg_t *msg; - - mp = task->thread->engine->mem_pool; - - msg = nxt_mp_alloc(mp, sizeof(nxt_port_send_msg_t)); - if (nxt_slow_path(msg == NULL)) { - return NULL; - } - - nxt_mp_retain(mp); - - msg->link.next = NULL; - msg->link.prev = NULL; - - msg->buf = m->buf; - msg->share = m->share; - msg->fd = m->fd; - msg->close_fd = m->close_fd; - msg->port_msg = m->port_msg; - - msg->work.next = NULL; - msg->work.handler = nxt_port_release_send_msg; - msg->work.task = task; - msg->work.obj = msg; - msg->work.data = task->thread->engine; - - return msg; -} - - -static nxt_port_send_msg_t * -nxt_port_msg_insert_head(nxt_task_t *task, nxt_port_t *port, - nxt_port_send_msg_t *msg) +nxt_port_release_send_msg(nxt_port_send_msg_t *msg) { - if (msg->work.data == NULL) { - msg = nxt_port_msg_create(task, msg); + if (msg->allocated) { + nxt_free(msg); } - - if (msg != NULL) { - nxt_queue_insert_head(&port->messages, &msg->link); - } - - return msg; -} - - -static nxt_port_send_msg_t * -nxt_port_msg_insert_tail(nxt_task_t *task, nxt_port_t *port, - nxt_port_send_msg_t *msg) -{ - if (msg->work.data == NULL) { - msg = nxt_port_msg_create(task, msg); - } - - if (msg != NULL) { - nxt_queue_insert_tail(&port->messages, &msg->link); - } - - return msg; -} - - -static nxt_port_send_msg_t * -nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) -{ - nxt_queue_link_t *lnk; - - lnk = nxt_queue_first(&port->messages); - - if (lnk == nxt_queue_tail(&port->messages)) { - return msg; - } - - return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); } @@ -246,15 +147,17 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, void *tracking) { - nxt_port_send_msg_t msg, *res; + nxt_int_t res; + nxt_port_send_msg_t msg; msg.link.next = NULL; msg.link.prev = NULL; msg.buf = b; + msg.share = 0; msg.fd = fd; msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; - msg.share = 0; + msg.allocated = 0; if (tracking != NULL) { nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); @@ -270,25 +173,63 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.port_msg.mf = 0; msg.port_msg.tracking = tracking != NULL; - msg.work.data = NULL; - - if (port->socket.write_ready) { + res = nxt_port_msg_chk_insert(task, port, &msg); + if (nxt_fast_path(res == NXT_DECLINED)) { nxt_port_write_handler(task, &port->socket, &msg); - } else { - nxt_thread_mutex_lock(&port->write_mutex); + res = NXT_OK; + } + + return res; +} + - res = nxt_port_msg_insert_tail(task, port, &msg); +static nxt_int_t +nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, + nxt_port_send_msg_t *msg) +{ + nxt_int_t res; + + nxt_thread_mutex_lock(&port->write_mutex); + + if (nxt_fast_path(port->socket.write_ready + && nxt_queue_is_empty(&port->messages))) + { + res = NXT_DECLINED; + + } else { + msg = nxt_port_msg_alloc(msg); - nxt_thread_mutex_unlock(&port->write_mutex); + if (nxt_fast_path(msg != NULL)) { + nxt_queue_insert_tail(&port->messages, &msg->link); + nxt_port_use(task, port, 1); + res = NXT_OK; - if (res == NULL) { - return NXT_ERROR; + } else { + res = NXT_ERROR; } + } + + nxt_thread_mutex_unlock(&port->write_mutex); + + return res; +} + + +static nxt_port_send_msg_t * +nxt_port_msg_alloc(nxt_port_send_msg_t *m) +{ + nxt_port_send_msg_t *msg; - nxt_port_use(task, port, 1); + msg = nxt_malloc(sizeof(nxt_port_send_msg_t)); + if (nxt_slow_path(msg == NULL)) { + return NULL; } - return NXT_OK; + *msg = *m; + + msg->allocated = 1; + + return msg; } @@ -312,9 +253,10 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) int use_delta; size_t plain_size; ssize_t n; + uint32_t mmsg_buf[3 * NXT_IOBUF_MAX * 10]; nxt_bool_t block_write, enable_write; nxt_port_t *port; - struct iovec *iov; + struct iovec iov[NXT_IOBUF_MAX * 10]; nxt_work_queue_t *wq; nxt_port_method_t m; nxt_port_send_msg_t *msg; @@ -326,20 +268,23 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) enable_write = 0; use_delta = 0; - nxt_thread_mutex_lock(&port->write_mutex); - - iov = port->iov; - wq = &task->thread->engine->fast_work_queue; do { - msg = nxt_port_msg_first(task, port, data); + if (data) { + msg = data; - if (msg == NULL) { - block_write = 1; - goto unlock_mutex; + } else { + msg = nxt_port_msg_first(port); + + if (msg == NULL) { + block_write = 1; + goto cleanup; + } } +next_fragment: + iov[0].iov_base = &msg->port_msg; iov[0].iov_len = sizeof(nxt_port_msg_t); @@ -377,7 +322,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) * is bigger than PORT_MMAP_MIN_SIZE. */ if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { - nxt_port_mmap_write(task, port, msg, &sb); + nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf); } else { m = NXT_PORT_METHOD_PLAIN; @@ -402,7 +347,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) } msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, - m == NXT_PORT_METHOD_MMAP); + m == NXT_PORT_METHOD_MMAP); if (msg->buf != NULL) { nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd, @@ -421,36 +366,58 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) msg->share = 0; if (msg->link.next != NULL) { + nxt_thread_mutex_lock(&port->write_mutex); + nxt_queue_remove(&msg->link); - use_delta--; - } - data = NULL; + nxt_queue_insert_tail(&port->messages, &msg->link); + + nxt_thread_mutex_unlock(&port->write_mutex); + + } else { + msg = nxt_port_msg_insert_tail(port, msg); + if (nxt_slow_path(msg == NULL)) { + goto fail; + } - if (nxt_port_msg_insert_tail(task, port, msg) != NULL) { use_delta++; } + + } else { + goto next_fragment; } } else { if (msg->link.next != NULL) { + nxt_thread_mutex_lock(&port->write_mutex); + nxt_queue_remove(&msg->link); + msg->link.next = NULL; + + nxt_thread_mutex_unlock(&port->write_mutex); + use_delta--; - nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, - msg->work.data); } - data = NULL; + + nxt_port_release_send_msg(msg); } - } else { - if (msg->link.next == NULL) { - if (nxt_port_msg_insert_head(task, port, msg) != NULL) { - use_delta++; - } + if (data != NULL) { + goto cleanup; } + } else { if (nxt_slow_path(n == NXT_ERROR)) { goto fail; } + + if (msg->link.next == NULL) { + msg = nxt_port_msg_insert_tail(port, msg); + if (nxt_slow_path(msg == NULL)) { + goto fail; + } + + use_delta++; + } } } while (port->socket.write_ready); @@ -459,7 +426,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) enable_write = 1; } - goto unlock_mutex; + goto cleanup; fail: @@ -468,8 +435,7 @@ fail: nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket, &port->socket); -unlock_mutex: - nxt_thread_mutex_unlock(&port->write_mutex); +cleanup: if (block_write && nxt_fd_event_is_active(port->socket.write)) { nxt_port_post(task, port, nxt_port_fd_block_write, NULL); @@ -485,6 +451,29 @@ unlock_mutex: } +static nxt_port_send_msg_t * +nxt_port_msg_first(nxt_port_t *port) +{ + nxt_queue_link_t *lnk; + nxt_port_send_msg_t *msg; + + nxt_thread_mutex_lock(&port->write_mutex); + + lnk = nxt_queue_first(&port->messages); + + if (lnk == nxt_queue_tail(&port->messages)) { + msg = NULL; + + } else { + msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); + } + + nxt_thread_mutex_unlock(&port->write_mutex); + + return msg; +} + + static nxt_buf_t * nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode) @@ -546,6 +535,27 @@ nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, } +static nxt_port_send_msg_t * +nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg) +{ + if (msg->allocated == 0) { + msg = nxt_port_msg_alloc(msg); + + if (nxt_slow_path(msg == NULL)) { + return NULL; + } + } + + nxt_thread_mutex_lock(&port->write_mutex); + + nxt_queue_insert_tail(&port->messages, &msg->link); + + nxt_thread_mutex_unlock(&port->write_mutex); + + return msg; +} + + void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) { @@ -986,8 +996,8 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) nxt_queue_remove(&msg->link); use_delta--; - nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, - msg->work.data); + + nxt_port_release_send_msg(msg); } nxt_queue_loop; |