summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_socket.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-01 20:03:45 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-01 20:03:45 +0300
commite57b95a92333fa7ff558737b0ba2b76894cc0412 (patch)
treeb2de846a4fd958517be379b552f38652dd6d9078 /src/nxt_port_socket.c
parent6e67bee0f4f96ad0a7d1a231dfdae8431714c66a (diff)
downloadunit-e57b95a92333fa7ff558737b0ba2b76894cc0412.tar.gz
unit-e57b95a92333fa7ff558737b0ba2b76894cc0412.tar.bz2
Process channels have been renamed to ports.
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r--src/nxt_port_socket.c464
1 files changed, 464 insertions, 0 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
new file mode 100644
index 00000000..4dc72d64
--- /dev/null
+++ b/src/nxt_port_socket.c
@@ -0,0 +1,464 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size);
+static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
+static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
+static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
+
+
+nxt_port_t *
+nxt_port_alloc(void)
+{
+ nxt_port_t *port;
+ nxt_mem_pool_t *mp;
+
+ mp = nxt_mem_pool_create(1024);
+
+ if (nxt_fast_path(mp != NULL)) {
+ /* This allocation cannot fail. */
+ port = nxt_mem_zalloc(mp, sizeof(nxt_port_t));
+ port->mem_pool = mp;
+
+ port->pair[0] = -1;
+ port->pair[1] = -1;
+
+ nxt_queue_init(&port->messages);
+
+ return port;
+ }
+
+ return NULL;
+}
+
+
+nxt_port_t *
+nxt_port_create(size_t max_size)
+{
+ nxt_int_t sndbuf, rcvbuf, size;
+ nxt_port_t *port;
+ nxt_socket_t snd, rcv;
+
+ port = nxt_port_alloc();
+ if (nxt_slow_path(port == NULL)) {
+ return NULL;
+ }
+
+ if (nxt_slow_path(nxt_socketpair_create(port->pair) != NXT_OK)) {
+ goto socketpair_fail;
+ }
+
+ snd = port->pair[1];
+
+ sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF);
+ if (nxt_slow_path(sndbuf < 0)) {
+ goto getsockopt_fail;
+ }
+
+ rcv = port->pair[0];
+
+ rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF);
+ if (nxt_slow_path(rcvbuf < 0)) {
+ goto getsockopt_fail;
+ }
+
+ if (max_size == 0) {
+ max_size = 16 * 1024;
+ }
+
+ if ((size_t) sndbuf < max_size) {
+ /*
+ * On Unix domain sockets
+ * Linux uses 224K on both send and receive directions;
+ * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
+ * on send direction and 4K buffer size on receive direction;
+ * Solaris uses 16K on send direction and 5K on receive direction.
+ */
+ (void) nxt_socket_setsockopt(snd, SOL_SOCKET, SO_SNDBUF, max_size);
+
+ sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF);
+ if (nxt_slow_path(sndbuf < 0)) {
+ goto getsockopt_fail;
+ }
+
+ size = sndbuf * 4;
+
+ if (rcvbuf < size) {
+ (void) nxt_socket_setsockopt(rcv, SOL_SOCKET, SO_RCVBUF, size);
+
+ rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF);
+ if (nxt_slow_path(rcvbuf < 0)) {
+ goto getsockopt_fail;
+ }
+ }
+ }
+
+ port->max_size = nxt_min(max_size, (size_t) sndbuf);
+ port->max_share = (64 * 1024);
+
+ return port;
+
+getsockopt_fail:
+
+ nxt_socket_close(port->pair[0]);
+ nxt_socket_close(port->pair[1]);
+
+socketpair_fail:
+
+ nxt_mem_pool_destroy(port->mem_pool);
+
+ return NULL;
+}
+
+
+void
+nxt_port_destroy(nxt_port_t *port)
+{
+ nxt_socket_close(port->socket.fd);
+ nxt_mem_pool_destroy(port->mem_pool);
+}
+
+
+void
+nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
+{
+ port->socket.fd = port->pair[1];
+ port->socket.log = &nxt_main_log;
+ port->socket.write_ready = 1;
+
+ port->task.thread = task->thread;
+ port->task.log = port->socket.log;
+ port->task.ident = nxt_task_next_ident();
+
+ port->socket.task = &port->task;
+
+ port->socket.write_work_queue = &task->thread->engine->fast_work_queue;
+ port->socket.write_handler = nxt_port_write_handler;
+ port->socket.error_handler = nxt_port_error_handler;
+}
+
+
+void
+nxt_port_write_close(nxt_port_t *port)
+{
+ nxt_socket_close(port->pair[1]);
+ port->pair[1] = -1;
+}
+
+
+nxt_int_t
+nxt_port_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
+ nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
+{
+ nxt_queue_link_t *link;
+ nxt_port_send_msg_t *msg;
+
+ for (link = nxt_queue_first(&port->messages);
+ link != nxt_queue_tail(&port->messages);
+ link = nxt_queue_next(link))
+ {
+ msg = (nxt_port_send_msg_t *) link;
+
+ if (msg->port_msg.stream == stream) {
+ /*
+ * An fd is ignored since a file descriptor
+ * must be sent only in the first message of a stream.
+ */
+ nxt_buf_chain_add(&msg->buf, b);
+
+ return NXT_OK;
+ }
+ }
+
+ msg = nxt_mem_cache_zalloc0(port->mem_pool, sizeof(nxt_port_send_msg_t));
+ if (nxt_slow_path(msg == NULL)) {
+ return NXT_ERROR;
+ }
+
+ msg->buf = b;
+ msg->fd = fd;
+ msg->share = 0;
+
+ msg->port_msg.stream = stream;
+ msg->port_msg.type = type;
+ msg->port_msg.last = 0;
+
+ nxt_queue_insert_tail(&port->messages, &msg->link);
+
+ if (port->socket.write_ready) {
+ nxt_port_write_handler(task, port, NULL);
+ }
+
+ return NXT_OK;
+}
+
+
+static void
+nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
+{
+ ssize_t n;
+ nxt_uint_t niob;
+ nxt_port_t *port;
+ struct iovec iob[NXT_IOBUF_MAX];
+ nxt_queue_link_t *link;
+ nxt_port_send_msg_t *msg;
+ nxt_sendbuf_coalesce_t sb;
+
+ port = obj;
+
+ do {
+ link = nxt_queue_first(&port->messages);
+
+ if (link == nxt_queue_tail(&port->messages)) {
+ nxt_event_fd_block_write(task->thread->engine, &port->socket);
+ return;
+ }
+
+ msg = (nxt_port_send_msg_t *) link;
+
+ nxt_iobuf_set(&iob[0], &msg->port_msg, sizeof(nxt_port_msg_t));
+
+ sb.buf = msg->buf;
+ sb.iobuf = &iob[1];
+ sb.nmax = NXT_IOBUF_MAX - 1;
+ sb.sync = 0;
+ sb.last = 0;
+ sb.size = sizeof(nxt_port_msg_t);
+ sb.limit = port->max_size;
+
+ niob = nxt_sendbuf_mem_coalesce(task, &sb);
+
+ msg->port_msg.last = sb.last;
+
+ n = nxt_socketpair_send(&port->socket, msg->fd, iob, niob + 1);
+
+ if (n > 0) {
+ if (nxt_slow_path((size_t) n != sb.size)) {
+ nxt_log(task, NXT_LOG_CRIT,
+ "port %d: short write: %z instead of %uz",
+ port->socket.fd, n, sb.size);
+ goto fail;
+ }
+
+ msg->buf = nxt_sendbuf_completion(task,
+ port->socket.write_work_queue,
+ msg->buf,
+ n - sizeof(nxt_port_msg_t));
+
+ if (msg->buf != NULL) {
+ /*
+ * A file descriptor is sent only
+ * in the first message of a stream.
+ */
+ msg->fd = -1;
+ msg->share += n;
+
+ if (msg->share >= port->max_share) {
+ msg->share = 0;
+ nxt_queue_remove(link);
+ nxt_queue_insert_tail(&port->messages, link);
+ }
+
+ } else {
+ nxt_queue_remove(link);
+ nxt_mem_cache_free0(port->mem_pool, msg,
+ sizeof(nxt_port_send_msg_t));
+ }
+
+ } else if (nxt_slow_path(n == NXT_ERROR)) {
+ goto fail;
+ }
+
+ /* n == NXT_AGAIN */
+
+ } while (port->socket.write_ready);
+
+ if (nxt_event_fd_is_disabled(port->socket.write)) {
+ nxt_event_fd_enable_write(task->thread->engine, &port->socket);
+ }
+
+ return;
+
+fail:
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_port_error_handler, task, &port->socket, NULL);
+}
+
+
+void
+nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
+{
+ port->socket.fd = port->pair[0];
+ port->socket.log = &nxt_main_log;
+
+ port->task.thread = task->thread;
+ port->task.log = port->socket.log;
+ port->task.ident = nxt_task_next_ident();
+
+ port->socket.task = &port->task;
+
+ port->socket.read_work_queue = &task->thread->engine->fast_work_queue;
+ port->socket.read_handler = nxt_port_read_handler;
+ port->socket.error_handler = nxt_port_error_handler;
+
+ nxt_event_fd_enable_read(task->thread->engine, &port->socket);
+}
+
+
+void
+nxt_port_read_close(nxt_port_t *port)
+{
+ nxt_socket_close(port->pair[0]);
+ port->pair[0] = -1;
+}
+
+
+static void
+nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
+{
+ ssize_t n;
+ nxt_fd_t fd;
+ nxt_buf_t *b;
+ nxt_port_t *port;
+ nxt_iobuf_t iob[2];
+ nxt_port_msg_t msg;
+
+ port = obj;
+
+ for ( ;; ) {
+
+ b = nxt_port_buf_alloc(port);
+
+ if (nxt_slow_path(b == NULL)) {
+ /* TODO: disable event for some time */
+ }
+
+ nxt_iobuf_set(&iob[0], &msg, sizeof(nxt_port_msg_t));
+ nxt_iobuf_set(&iob[1], b->mem.pos, port->max_size);
+
+ n = nxt_socketpair_recv(&port->socket, &fd, iob, 2);
+
+ if (n > 0) {
+ nxt_port_read_msg_process(task, port, &msg, fd, b, n);
+
+ if (b->mem.pos == b->mem.free) {
+
+ if (b->next != NULL) {
+ /* A sync buffer */
+ nxt_buf_free(port->mem_pool, b->next);
+ }
+
+ nxt_port_buf_free(port, b);
+ }
+
+ if (port->socket.read_ready) {
+ continue;
+ }
+
+ return;
+ }
+
+ if (n == NXT_AGAIN) {
+ nxt_port_buf_free(port, b);
+
+ nxt_event_fd_enable_read(task->thread->engine, &port->socket);
+ return;
+ }
+
+ /* n == 0 || n == NXT_ERROR */
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_port_error_handler, task, &port->socket, NULL);
+ return;
+ }
+}
+
+
+static void
+nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size)
+{
+ nxt_buf_t *sync;
+ nxt_port_recv_msg_t recv_msg;
+
+ if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) {
+ nxt_log(port->socket.task, NXT_LOG_CRIT,
+ "port %d: too small message:%uz", port->socket.fd, size);
+ goto fail;
+ }
+
+ recv_msg.stream = msg->stream;
+ recv_msg.type = msg->type;
+ recv_msg.fd = fd;
+ recv_msg.buf = b;
+ recv_msg.port = port;
+
+ b->mem.free += size - sizeof(nxt_port_msg_t);
+
+ if (msg->last) {
+ sync = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
+ if (nxt_slow_path(sync == NULL)) {
+ goto fail;
+ }
+
+ b->next = sync;
+ }
+
+ port->handler(task, &recv_msg);
+
+ return;
+
+fail:
+
+ if (fd != -1) {
+ nxt_fd_close(fd);
+ }
+}
+
+
+static nxt_buf_t *
+nxt_port_buf_alloc(nxt_port_t *port)
+{
+ nxt_buf_t *b;
+
+ if (port->free_bufs != NULL) {
+ b = port->free_bufs;
+ port->free_bufs = b->next;
+
+ b->mem.pos = b->mem.start;
+ b->mem.free = b->mem.start;
+
+ } else {
+ b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
+ if (nxt_slow_path(b == NULL)) {
+ return NULL;
+ }
+ }
+
+ return b;
+}
+
+
+static void
+nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
+{
+ b->next = port->free_bufs;
+ port->free_bufs = b;
+}
+
+
+static void
+nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
+{
+ /* TODO */
+}