summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_socket.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-05-12 20:32:41 +0300
committerMax Romanov <max.romanov@nginx.com>2017-05-12 20:32:41 +0300
commitf7b4bdfd892a0b479dc946896435a3ba7f9615dd (patch)
treea6f0c4ebaeed2d9f0fcb1c07178b52a684a53280 /src/nxt_port_socket.c
parent1782c771fab999b37a8c04ed72760e3528205be7 (diff)
downloadunit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.gz
unit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.bz2
Using shared memory to send data via nxt_port.
Usage: b = nxt_port_mmap_get_buf(task, port, size); b->mem.free = nxt_cpymem(b->mem.free, data, size); nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, b);
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r--src/nxt_port_socket.c141
1 files changed, 101 insertions, 40 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 671c3334..51d88ff7 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -10,7 +10,7 @@
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
- nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size);
+ nxt_port_recv_msg_t *msg, size_t size);
static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
@@ -137,7 +137,7 @@ nxt_port_write_close(nxt_port_t *port)
nxt_int_t
nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
- nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
+ nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
{
nxt_queue_link_t *link;
nxt_port_send_msg_t *msg;
@@ -169,8 +169,11 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->share = 0;
msg->port_msg.stream = stream;
+ msg->port_msg.pid = nxt_pid;
+ msg->port_msg.reply_port = reply_port;
msg->port_msg.type = type;
msg->port_msg.last = 0;
+ msg->port_msg.mmap = 0;
nxt_queue_insert_tail(&port->messages, &msg->link);
@@ -186,12 +189,15 @@ static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
- nxt_uint_t niov;
nxt_port_t *port;
struct iovec iov[NXT_IOBUF_MAX];
nxt_queue_link_t *link;
nxt_port_send_msg_t *msg;
nxt_sendbuf_coalesce_t sb;
+ nxt_port_method_t m;
+
+ size_t plain_size;
+ nxt_buf_t *plain_buf;
port = obj;
@@ -213,27 +219,59 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
sb.nmax = NXT_IOBUF_MAX - 1;
sb.sync = 0;
sb.last = 0;
- sb.size = sizeof(nxt_port_msg_t);
+ sb.size = 0;
sb.limit = port->max_size;
- niov = nxt_sendbuf_mem_coalesce(task, &sb);
+ m = nxt_port_mmap_get_method(task, port, msg->buf);
+
+ if (m == NXT_PORT_METHOD_MMAP) {
+ sb.limit = (1ULL << 31) - 1;
+ }
+
+ nxt_sendbuf_mem_coalesce(task, &sb);
+
+ plain_size = sb.size;
+ plain_buf = msg->buf;
+
+ /*
+ * Send through mmap enabled only when payload
+ * is bigger than PORT_MMAP_MIN_SIZE.
+ */
+ if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
+ nxt_port_mmap_write(task, port, msg, &sb);
+
+ } else {
+ m = NXT_PORT_METHOD_PLAIN;
+ }
msg->port_msg.last = sb.last;
- n = nxt_socketpair_send(&port->socket, msg->fd, iov, niov + 1);
+ n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
if (n > 0) {
- if (nxt_slow_path((size_t) n != sb.size)) {
+ if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
nxt_log(task, NXT_LOG_CRIT,
"port %d: short write: %z instead of %uz",
- port->socket.fd, n, sb.size);
+ port->socket.fd, n, sb.size + iov[0].iov_len);
goto fail;
}
+ if (msg->buf != plain_buf) {
+ /*
+ * Complete crafted mmap_msgs buf and restore msg->buf
+ * for regular completion call.
+ */
+ nxt_port_mmap_completion(task,
+ port->socket.write_work_queue,
+ msg->buf);
+
+ msg->buf = plain_buf;
+ }
+
msg->buf = nxt_sendbuf_completion(task,
port->socket.write_work_queue,
msg->buf,
- n - sizeof(nxt_port_msg_t));
+ plain_size);
if (msg->buf != NULL) {
/*
@@ -301,14 +339,13 @@ nxt_port_read_close(nxt_port_t *port)
static void
nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
{
- ssize_t n;
- nxt_fd_t fd;
- nxt_buf_t *b;
- nxt_port_t *port;
- struct iovec iov[2];
- nxt_port_msg_t msg;
+ ssize_t n;
+ nxt_buf_t *b;
+ nxt_port_t *port;
+ struct iovec iov[2];
+ nxt_port_recv_msg_t msg;
- port = obj;
+ port = msg.port = obj;
for ( ;; ) {
@@ -318,24 +355,21 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
/* TODO: disable event for some time */
}
- iov[0].iov_base = &msg;
+ iov[0].iov_base = &msg.port_msg;
iov[0].iov_len = sizeof(nxt_port_msg_t);
iov[1].iov_base = b->mem.pos;
iov[1].iov_len = port->max_size;
- n = nxt_socketpair_recv(&port->socket, &fd, iov, 2);
+ n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
if (n > 0) {
- nxt_port_read_msg_process(task, port, &msg, fd, b, n);
- if (b->mem.pos == b->mem.free) {
+ msg.buf = b;
- if (b->next != NULL) {
- /* A sync buffer */
- nxt_buf_free(port->mem_pool, b->next);
- }
+ nxt_port_read_msg_process(task, port, &msg, n);
+ if (b->mem.pos == b->mem.free) {
nxt_port_buf_free(port, b);
}
@@ -364,10 +398,11 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
- nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size)
+ nxt_port_recv_msg_t *msg, size_t size)
{
- nxt_buf_t *sync;
- nxt_port_recv_msg_t recv_msg;
+ nxt_buf_t *b;
+ nxt_buf_t *orig_b;
+ nxt_buf_t **last_next;
if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) {
nxt_log(port->socket.task, NXT_LOG_CRIT,
@@ -375,31 +410,56 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
goto fail;
}
- recv_msg.stream = msg->stream;
- recv_msg.type = msg->type;
- recv_msg.fd = fd;
- recv_msg.buf = b;
- recv_msg.port = port;
+ /* adjust size to actual buffer used size */
+ size -= sizeof(nxt_port_msg_t);
+
+ b = orig_b = msg->buf;
+ b->mem.free += size;
+
+ if (msg->port_msg.mmap) {
+ nxt_port_mmap_read(task, port, msg, size);
+ b = msg->buf;
+ }
- b->mem.free += size - sizeof(nxt_port_msg_t);
+ last_next = &b->next;
- if (msg->last) {
- sync = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
- if (nxt_slow_path(sync == NULL)) {
+ if (msg->port_msg.last) {
+ /* find reference to last next, the NULL one */
+ while (*last_next) {
+ last_next = &(*last_next)->next;
+ }
+
+ *last_next = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
+ if (nxt_slow_path(*last_next == NULL)) {
goto fail;
}
+ }
+
+ port->handler(task, msg);
- b->next = sync;
+ if (*last_next != NULL) {
+ /* A sync buffer */
+ nxt_buf_free(port->mem_pool, *last_next);
+ *last_next = NULL;
}
- port->handler(task, &recv_msg);
+ if (orig_b != b) {
+ /* complete mmap buffers */
+ for (; b && nxt_buf_used_size(b) == 0;
+ b = b->next) {
+ nxt_debug(task, "complete buffer %p", b);
+
+ nxt_work_queue_add(port->socket.read_work_queue,
+ b->completion_handler, task, b, b->parent);
+ }
+ }
return;
fail:
- if (fd != -1) {
- nxt_fd_close(fd);
+ if (msg->fd != -1) {
+ nxt_fd_close(msg->fd);
}
}
@@ -415,6 +475,7 @@ nxt_port_buf_alloc(nxt_port_t *port)
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
+ b->next = NULL;
} else {
b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);