diff options
Diffstat (limited to 'src/nxt_chan.c')
-rw-r--r-- | src/nxt_chan.c | 464 |
1 files changed, 0 insertions, 464 deletions
diff --git a/src/nxt_chan.c b/src/nxt_chan.c deleted file mode 100644 index 6832ecdc..00000000 --- a/src/nxt_chan.c +++ /dev/null @@ -1,464 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -static void nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_chan_read_msg_process(nxt_task_t *task, nxt_chan_t *chan, - nxt_chan_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size); -static nxt_buf_t *nxt_chan_buf_alloc(nxt_chan_t *chan); -static void nxt_chan_buf_free(nxt_chan_t *chan, nxt_buf_t *b); -static void nxt_chan_error_handler(nxt_task_t *task, void *obj, void *data); - - -nxt_chan_t * -nxt_chan_alloc(void) -{ - nxt_chan_t *chan; - nxt_mem_pool_t *mp; - - mp = nxt_mem_pool_create(1024); - - if (nxt_fast_path(mp != NULL)) { - /* This allocation cannot fail. */ - chan = nxt_mem_zalloc(mp, sizeof(nxt_chan_t)); - chan->mem_pool = mp; - - chan->pair[0] = -1; - chan->pair[1] = -1; - - nxt_queue_init(&chan->messages); - - return chan; - } - - return NULL; -} - - -nxt_chan_t * -nxt_chan_create(size_t max_size) -{ - nxt_int_t sndbuf, rcvbuf, size; - nxt_chan_t *chan; - nxt_socket_t snd, rcv; - - chan = nxt_chan_alloc(); - if (nxt_slow_path(chan == NULL)) { - return NULL; - } - - if (nxt_slow_path(nxt_socketpair_create(chan->pair) != NXT_OK)) { - goto socketpair_fail; - } - - snd = chan->pair[1]; - - sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF); - if (nxt_slow_path(sndbuf < 0)) { - goto getsockopt_fail; - } - - rcv = chan->pair[0]; - - rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF); - if (nxt_slow_path(rcvbuf < 0)) { - goto getsockopt_fail; - } - - if (max_size == 0) { - max_size = 16 * 1024; - } - - if ((size_t) sndbuf < max_size) { - /* - * On Unix domain sockets - * Linux uses 224K on both send and receive directions; - * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size - * on send direction and 4K buffer size on receive direction; - * Solaris uses 16K on send direction and 5K on receive direction. - */ - (void) nxt_socket_setsockopt(snd, SOL_SOCKET, SO_SNDBUF, max_size); - - sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF); - if (nxt_slow_path(sndbuf < 0)) { - goto getsockopt_fail; - } - - size = sndbuf * 4; - - if (rcvbuf < size) { - (void) nxt_socket_setsockopt(rcv, SOL_SOCKET, SO_RCVBUF, size); - - rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF); - if (nxt_slow_path(rcvbuf < 0)) { - goto getsockopt_fail; - } - } - } - - chan->max_size = nxt_min(max_size, (size_t) sndbuf); - chan->max_share = (64 * 1024); - - return chan; - -getsockopt_fail: - - nxt_socket_close(chan->pair[0]); - nxt_socket_close(chan->pair[1]); - -socketpair_fail: - - nxt_mem_pool_destroy(chan->mem_pool); - - return NULL; -} - - -void -nxt_chan_destroy(nxt_chan_t *chan) -{ - nxt_socket_close(chan->socket.fd); - nxt_mem_pool_destroy(chan->mem_pool); -} - - -void -nxt_chan_write_enable(nxt_task_t *task, nxt_chan_t *chan) -{ - chan->socket.fd = chan->pair[1]; - chan->socket.log = &nxt_main_log; - chan->socket.write_ready = 1; - - chan->task.thread = task->thread; - chan->task.log = chan->socket.log; - chan->task.ident = nxt_task_next_ident(); - - chan->socket.task = &chan->task; - - chan->socket.write_work_queue = &task->thread->engine->fast_work_queue; - chan->socket.write_handler = nxt_chan_write_handler; - chan->socket.error_handler = nxt_chan_error_handler; -} - - -void -nxt_chan_write_close(nxt_chan_t *chan) -{ - nxt_socket_close(chan->pair[1]); - chan->pair[1] = -1; -} - - -nxt_int_t -nxt_chan_write(nxt_task_t *task, nxt_chan_t *chan, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) -{ - nxt_queue_link_t *link; - nxt_chan_send_msg_t *msg; - - for (link = nxt_queue_first(&chan->messages); - link != nxt_queue_tail(&chan->messages); - link = nxt_queue_next(link)) - { - msg = (nxt_chan_send_msg_t *) link; - - if (msg->chan_msg.stream == stream) { - /* - * An fd is ignored since a file descriptor - * must be sent only in the first message of a stream. - */ - nxt_buf_chain_add(&msg->buf, b); - - return NXT_OK; - } - } - - msg = nxt_mem_cache_zalloc0(chan->mem_pool, sizeof(nxt_chan_send_msg_t)); - if (nxt_slow_path(msg == NULL)) { - return NXT_ERROR; - } - - msg->buf = b; - msg->fd = fd; - msg->share = 0; - - msg->chan_msg.stream = stream; - msg->chan_msg.type = type; - msg->chan_msg.last = 0; - - nxt_queue_insert_tail(&chan->messages, &msg->link); - - if (chan->socket.write_ready) { - nxt_chan_write_handler(task, chan, NULL); - } - - return NXT_OK; -} - - -static void -nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data) -{ - ssize_t n; - nxt_uint_t niob; - nxt_chan_t *chan; - struct iovec iob[NXT_IOBUF_MAX]; - nxt_queue_link_t *link; - nxt_chan_send_msg_t *msg; - nxt_sendbuf_coalesce_t sb; - - chan = obj; - - do { - link = nxt_queue_first(&chan->messages); - - if (link == nxt_queue_tail(&chan->messages)) { - nxt_event_fd_block_write(task->thread->engine, &chan->socket); - return; - } - - msg = (nxt_chan_send_msg_t *) link; - - nxt_iobuf_set(&iob[0], &msg->chan_msg, sizeof(nxt_chan_msg_t)); - - sb.buf = msg->buf; - sb.iobuf = &iob[1]; - sb.nmax = NXT_IOBUF_MAX - 1; - sb.sync = 0; - sb.last = 0; - sb.size = sizeof(nxt_chan_msg_t); - sb.limit = chan->max_size; - - niob = nxt_sendbuf_mem_coalesce(task, &sb); - - msg->chan_msg.last = sb.last; - - n = nxt_socketpair_send(&chan->socket, msg->fd, iob, niob + 1); - - if (n > 0) { - if (nxt_slow_path((size_t) n != sb.size)) { - nxt_log(task, NXT_LOG_CRIT, - "chan %d: short write: %z instead of %uz", - chan->socket.fd, n, sb.size); - goto fail; - } - - msg->buf = nxt_sendbuf_completion(task, - chan->socket.write_work_queue, - msg->buf, - n - sizeof(nxt_chan_msg_t)); - - if (msg->buf != NULL) { - /* - * A file descriptor is sent only - * in the first message of a stream. - */ - msg->fd = -1; - msg->share += n; - - if (msg->share >= chan->max_share) { - msg->share = 0; - nxt_queue_remove(link); - nxt_queue_insert_tail(&chan->messages, link); - } - - } else { - nxt_queue_remove(link); - nxt_mem_cache_free0(chan->mem_pool, msg, - sizeof(nxt_chan_send_msg_t)); - } - - } else if (nxt_slow_path(n == NXT_ERROR)) { - goto fail; - } - - /* n == NXT_AGAIN */ - - } while (chan->socket.write_ready); - - if (nxt_event_fd_is_disabled(chan->socket.write)) { - nxt_event_fd_enable_write(task->thread->engine, &chan->socket); - } - - return; - -fail: - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_chan_error_handler, task, &chan->socket, NULL); -} - - -void -nxt_chan_read_enable(nxt_task_t *task, nxt_chan_t *chan) -{ - chan->socket.fd = chan->pair[0]; - chan->socket.log = &nxt_main_log; - - chan->task.thread = task->thread; - chan->task.log = chan->socket.log; - chan->task.ident = nxt_task_next_ident(); - - chan->socket.task = &chan->task; - - chan->socket.read_work_queue = &task->thread->engine->fast_work_queue; - chan->socket.read_handler = nxt_chan_read_handler; - chan->socket.error_handler = nxt_chan_error_handler; - - nxt_event_fd_enable_read(task->thread->engine, &chan->socket); -} - - -void -nxt_chan_read_close(nxt_chan_t *chan) -{ - nxt_socket_close(chan->pair[0]); - chan->pair[0] = -1; -} - - -static void -nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data) -{ - ssize_t n; - nxt_fd_t fd; - nxt_buf_t *b; - nxt_chan_t *chan; - nxt_iobuf_t iob[2]; - nxt_chan_msg_t msg; - - chan = obj; - - for ( ;; ) { - - b = nxt_chan_buf_alloc(chan); - - if (nxt_slow_path(b == NULL)) { - /* TODO: disable event for some time */ - } - - nxt_iobuf_set(&iob[0], &msg, sizeof(nxt_chan_msg_t)); - nxt_iobuf_set(&iob[1], b->mem.pos, chan->max_size); - - n = nxt_socketpair_recv(&chan->socket, &fd, iob, 2); - - if (n > 0) { - nxt_chan_read_msg_process(task, chan, &msg, fd, b, n); - - if (b->mem.pos == b->mem.free) { - - if (b->next != NULL) { - /* A sync buffer */ - nxt_buf_free(chan->mem_pool, b->next); - } - - nxt_chan_buf_free(chan, b); - } - - if (chan->socket.read_ready) { - continue; - } - - return; - } - - if (n == NXT_AGAIN) { - nxt_chan_buf_free(chan, b); - - nxt_event_fd_enable_read(task->thread->engine, &chan->socket); - return; - } - - /* n == 0 || n == NXT_ERROR */ - - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_chan_error_handler, task, &chan->socket, NULL); - return; - } -} - - -static void -nxt_chan_read_msg_process(nxt_task_t *task, nxt_chan_t *chan, - nxt_chan_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size) -{ - nxt_buf_t *sync; - nxt_chan_recv_msg_t recv_msg; - - if (nxt_slow_path(size < sizeof(nxt_chan_msg_t))) { - nxt_log(chan->socket.task, NXT_LOG_CRIT, - "chan %d: too small message:%uz", chan->socket.fd, size); - goto fail; - } - - recv_msg.stream = msg->stream; - recv_msg.type = msg->type; - recv_msg.fd = fd; - recv_msg.buf = b; - recv_msg.chan = chan; - - b->mem.free += size - sizeof(nxt_chan_msg_t); - - if (msg->last) { - sync = nxt_buf_sync_alloc(chan->mem_pool, NXT_BUF_SYNC_LAST); - if (nxt_slow_path(sync == NULL)) { - goto fail; - } - - b->next = sync; - } - - chan->handler(task, &recv_msg); - - return; - -fail: - - if (fd != -1) { - nxt_fd_close(fd); - } -} - - -static nxt_buf_t * -nxt_chan_buf_alloc(nxt_chan_t *chan) -{ - nxt_buf_t *b; - - if (chan->free_bufs != NULL) { - b = chan->free_bufs; - chan->free_bufs = b->next; - - b->mem.pos = b->mem.start; - b->mem.free = b->mem.start; - - } else { - b = nxt_buf_mem_alloc(chan->mem_pool, chan->max_size, 0); - if (nxt_slow_path(b == NULL)) { - return NULL; - } - } - - return b; -} - - -static void -nxt_chan_buf_free(nxt_chan_t *chan, nxt_buf_t *b) -{ - b->next = chan->free_bufs; - chan->free_bufs = b; -} - - -static void -nxt_chan_error_handler(nxt_task_t *task, void *obj, void *data) -{ - /* TODO */ -} |