diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-05-12 20:32:41 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-05-12 20:32:41 +0300 |
commit | f7b4bdfd892a0b479dc946896435a3ba7f9615dd (patch) | |
tree | a6f0c4ebaeed2d9f0fcb1c07178b52a684a53280 /src/nxt_port_socket.c | |
parent | 1782c771fab999b37a8c04ed72760e3528205be7 (diff) | |
download | unit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.gz unit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.bz2 |
Using shared memory to send data via nxt_port.
Usage:
b = nxt_port_mmap_get_buf(task, port, size);
b->mem.free = nxt_cpymem(b->mem.free, data, size);
nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, b);
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r-- | src/nxt_port_socket.c | 141 |
1 files changed, 101 insertions, 40 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 671c3334..51d88ff7 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -10,7 +10,7 @@ static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, - nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size); + nxt_port_recv_msg_t *msg, size_t size); static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); @@ -137,7 +137,7 @@ nxt_port_write_close(nxt_port_t *port) nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) + nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) { nxt_queue_link_t *link; nxt_port_send_msg_t *msg; @@ -169,8 +169,11 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg->share = 0; msg->port_msg.stream = stream; + msg->port_msg.pid = nxt_pid; + msg->port_msg.reply_port = reply_port; msg->port_msg.type = type; msg->port_msg.last = 0; + msg->port_msg.mmap = 0; nxt_queue_insert_tail(&port->messages, &msg->link); @@ -186,12 +189,15 @@ static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) { ssize_t n; - nxt_uint_t niov; nxt_port_t *port; struct iovec iov[NXT_IOBUF_MAX]; nxt_queue_link_t *link; nxt_port_send_msg_t *msg; nxt_sendbuf_coalesce_t sb; + nxt_port_method_t m; + + size_t plain_size; + nxt_buf_t *plain_buf; port = obj; @@ -213,27 +219,59 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) sb.nmax = NXT_IOBUF_MAX - 1; sb.sync = 0; sb.last = 0; - sb.size = sizeof(nxt_port_msg_t); + sb.size = 0; sb.limit = port->max_size; - niov = nxt_sendbuf_mem_coalesce(task, &sb); + m = nxt_port_mmap_get_method(task, port, msg->buf); + + if (m == NXT_PORT_METHOD_MMAP) { + sb.limit = (1ULL << 31) - 1; + } + + nxt_sendbuf_mem_coalesce(task, &sb); + + plain_size = sb.size; + plain_buf = msg->buf; + + /* + * Send through mmap enabled only when payload + * is bigger than PORT_MMAP_MIN_SIZE. + */ + if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { + nxt_port_mmap_write(task, port, msg, &sb); + + } else { + m = NXT_PORT_METHOD_PLAIN; + } msg->port_msg.last = sb.last; - n = nxt_socketpair_send(&port->socket, msg->fd, iov, niov + 1); + n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); if (n > 0) { - if (nxt_slow_path((size_t) n != sb.size)) { + if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { nxt_log(task, NXT_LOG_CRIT, "port %d: short write: %z instead of %uz", - port->socket.fd, n, sb.size); + port->socket.fd, n, sb.size + iov[0].iov_len); goto fail; } + if (msg->buf != plain_buf) { + /* + * Complete crafted mmap_msgs buf and restore msg->buf + * for regular completion call. + */ + nxt_port_mmap_completion(task, + port->socket.write_work_queue, + msg->buf); + + msg->buf = plain_buf; + } + msg->buf = nxt_sendbuf_completion(task, port->socket.write_work_queue, msg->buf, - n - sizeof(nxt_port_msg_t)); + plain_size); if (msg->buf != NULL) { /* @@ -301,14 +339,13 @@ nxt_port_read_close(nxt_port_t *port) static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) { - ssize_t n; - nxt_fd_t fd; - nxt_buf_t *b; - nxt_port_t *port; - struct iovec iov[2]; - nxt_port_msg_t msg; + ssize_t n; + nxt_buf_t *b; + nxt_port_t *port; + struct iovec iov[2]; + nxt_port_recv_msg_t msg; - port = obj; + port = msg.port = obj; for ( ;; ) { @@ -318,24 +355,21 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) /* TODO: disable event for some time */ } - iov[0].iov_base = &msg; + iov[0].iov_base = &msg.port_msg; iov[0].iov_len = sizeof(nxt_port_msg_t); iov[1].iov_base = b->mem.pos; iov[1].iov_len = port->max_size; - n = nxt_socketpair_recv(&port->socket, &fd, iov, 2); + n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); if (n > 0) { - nxt_port_read_msg_process(task, port, &msg, fd, b, n); - if (b->mem.pos == b->mem.free) { + msg.buf = b; - if (b->next != NULL) { - /* A sync buffer */ - nxt_buf_free(port->mem_pool, b->next); - } + nxt_port_read_msg_process(task, port, &msg, n); + if (b->mem.pos == b->mem.free) { nxt_port_buf_free(port, b); } @@ -364,10 +398,11 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, - nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size) + nxt_port_recv_msg_t *msg, size_t size) { - nxt_buf_t *sync; - nxt_port_recv_msg_t recv_msg; + nxt_buf_t *b; + nxt_buf_t *orig_b; + nxt_buf_t **last_next; if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) { nxt_log(port->socket.task, NXT_LOG_CRIT, @@ -375,31 +410,56 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, goto fail; } - recv_msg.stream = msg->stream; - recv_msg.type = msg->type; - recv_msg.fd = fd; - recv_msg.buf = b; - recv_msg.port = port; + /* adjust size to actual buffer used size */ + size -= sizeof(nxt_port_msg_t); + + b = orig_b = msg->buf; + b->mem.free += size; + + if (msg->port_msg.mmap) { + nxt_port_mmap_read(task, port, msg, size); + b = msg->buf; + } - b->mem.free += size - sizeof(nxt_port_msg_t); + last_next = &b->next; - if (msg->last) { - sync = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST); - if (nxt_slow_path(sync == NULL)) { + if (msg->port_msg.last) { + /* find reference to last next, the NULL one */ + while (*last_next) { + last_next = &(*last_next)->next; + } + + *last_next = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST); + if (nxt_slow_path(*last_next == NULL)) { goto fail; } + } + + port->handler(task, msg); - b->next = sync; + if (*last_next != NULL) { + /* A sync buffer */ + nxt_buf_free(port->mem_pool, *last_next); + *last_next = NULL; } - port->handler(task, &recv_msg); + if (orig_b != b) { + /* complete mmap buffers */ + for (; b && nxt_buf_used_size(b) == 0; + b = b->next) { + nxt_debug(task, "complete buffer %p", b); + + nxt_work_queue_add(port->socket.read_work_queue, + b->completion_handler, task, b, b->parent); + } + } return; fail: - if (fd != -1) { - nxt_fd_close(fd); + if (msg->fd != -1) { + nxt_fd_close(msg->fd); } } @@ -415,6 +475,7 @@ nxt_port_buf_alloc(nxt_port_t *port) b->mem.pos = b->mem.start; b->mem.free = b->mem.start; + b->next = NULL; } else { b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); |