summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_chan.c
blob: 6832ecdccc1bf2572c32aa1c4bf9044e1d849564 (plain) (tree)
1
2
3
4
5
6
7
8
9








                            


                                                                            


                                                                 
                                                                            


















































































































                                                                            
                                                         




                                     





                                             
                                                                           













                                                        

                                                                   
 



































                                                                             
                                                 






                  
                                                               














                                                      
                                                                          














                                                                       
                                                   






                                                                       


                                                                  


                          
                                                   
































                                                                            
                                                                       





           

                                                                          



    
                                                        



                                     





                                             
                                                                          


                                                        
                                                                  











                                     
                                                              























                                                             
                                                                  




















                                                          
                                                                          




                                      

                                                                              





               
                                                             





                                                                

                                                                         



















                                                                     
                                   










































                                                                 
                                                               


              

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


static void nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_chan_read_msg_process(nxt_task_t *task, nxt_chan_t *chan,
    nxt_chan_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size);
static nxt_buf_t *nxt_chan_buf_alloc(nxt_chan_t *chan);
static void nxt_chan_buf_free(nxt_chan_t *chan, nxt_buf_t *b);
static void nxt_chan_error_handler(nxt_task_t *task, void *obj, void *data);


nxt_chan_t *
nxt_chan_alloc(void)
{
    nxt_chan_t      *chan;
    nxt_mem_pool_t  *mp;

    mp = nxt_mem_pool_create(1024);

    if (nxt_fast_path(mp != NULL)) {
        /* This allocation cannot fail. */
        chan = nxt_mem_zalloc(mp, sizeof(nxt_chan_t));
        chan->mem_pool = mp;

        chan->pair[0] = -1;
        chan->pair[1] = -1;

        nxt_queue_init(&chan->messages);

        return chan;
    }

    return NULL;
}


nxt_chan_t *
nxt_chan_create(size_t max_size)
{
    nxt_int_t     sndbuf, rcvbuf, size;
    nxt_chan_t    *chan;
    nxt_socket_t  snd, rcv;

    chan = nxt_chan_alloc();
    if (nxt_slow_path(chan == NULL)) {
        return NULL;
    }

    if (nxt_slow_path(nxt_socketpair_create(chan->pair) != NXT_OK)) {
        goto socketpair_fail;
    }

    snd = chan->pair[1];

    sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF);
    if (nxt_slow_path(sndbuf < 0)) {
        goto getsockopt_fail;
    }

    rcv = chan->pair[0];

    rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF);
    if (nxt_slow_path(rcvbuf < 0)) {
        goto getsockopt_fail;
    }

    if (max_size == 0) {
        max_size = 16 * 1024;
    }

    if ((size_t) sndbuf < max_size) {
        /*
         * On Unix domain sockets
         *   Linux uses 224K on both send and receive directions;
         *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
         *   on send direction and 4K buffer size on receive direction;
         *   Solaris uses 16K on send direction and 5K on receive direction.
         */
        (void) nxt_socket_setsockopt(snd, SOL_SOCKET, SO_SNDBUF, max_size);

        sndbuf = nxt_socket_getsockopt(snd, SOL_SOCKET, SO_SNDBUF);
        if (nxt_slow_path(sndbuf < 0)) {
            goto getsockopt_fail;
        }

        size = sndbuf * 4;

        if (rcvbuf < size) {
            (void) nxt_socket_setsockopt(rcv, SOL_SOCKET, SO_RCVBUF, size);

            rcvbuf = nxt_socket_getsockopt(rcv, SOL_SOCKET, SO_RCVBUF);
            if (nxt_slow_path(rcvbuf < 0)) {
                goto getsockopt_fail;
            }
        }
    }

    chan->max_size = nxt_min(max_size, (size_t) sndbuf);
    chan->max_share = (64 * 1024);

    return chan;

getsockopt_fail:

    nxt_socket_close(chan->pair[0]);
    nxt_socket_close(chan->pair[1]);

socketpair_fail:

    nxt_mem_pool_destroy(chan->mem_pool);

    return NULL;
}


void
nxt_chan_destroy(nxt_chan_t *chan)
{
    nxt_socket_close(chan->socket.fd);
    nxt_mem_pool_destroy(chan->mem_pool);
}


void
nxt_chan_write_enable(nxt_task_t *task, nxt_chan_t *chan)
{
    chan->socket.fd = chan->pair[1];
    chan->socket.log = &nxt_main_log;
    chan->socket.write_ready = 1;

    chan->task.thread = task->thread;
    chan->task.log = chan->socket.log;
    chan->task.ident = nxt_task_next_ident();

    chan->socket.task = &chan->task;

    chan->socket.write_work_queue = &task->thread->engine->fast_work_queue;
    chan->socket.write_handler = nxt_chan_write_handler;
    chan->socket.error_handler = nxt_chan_error_handler;
}


void
nxt_chan_write_close(nxt_chan_t *chan)
{
    nxt_socket_close(chan->pair[1]);
    chan->pair[1] = -1;
}


nxt_int_t
nxt_chan_write(nxt_task_t *task, nxt_chan_t *chan, nxt_uint_t type,
    nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
{
    nxt_queue_link_t     *link;
    nxt_chan_send_msg_t  *msg;

    for (link = nxt_queue_first(&chan->messages);
         link != nxt_queue_tail(&chan->messages);
         link = nxt_queue_next(link))
    {
        msg = (nxt_chan_send_msg_t *) link;

        if (msg->chan_msg.stream == stream) {
            /*
             * An fd is ignored since a file descriptor
             * must be sent only in the first message of a stream.
             */
            nxt_buf_chain_add(&msg->buf, b);

            return NXT_OK;
        }
    }

    msg = nxt_mem_cache_zalloc0(chan->mem_pool, sizeof(nxt_chan_send_msg_t));
    if (nxt_slow_path(msg == NULL)) {
        return NXT_ERROR;
    }

    msg->buf = b;
    msg->fd = fd;
    msg->share = 0;

    msg->chan_msg.stream = stream;
    msg->chan_msg.type = type;
    msg->chan_msg.last = 0;

    nxt_queue_insert_tail(&chan->messages, &msg->link);

    if (chan->socket.write_ready) {
        nxt_chan_write_handler(task, chan, NULL);
    }

    return NXT_OK;
}


static void
nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data)
{
    ssize_t                 n;
    nxt_uint_t              niob;
    nxt_chan_t              *chan;
    struct iovec            iob[NXT_IOBUF_MAX];
    nxt_queue_link_t        *link;
    nxt_chan_send_msg_t     *msg;
    nxt_sendbuf_coalesce_t  sb;

    chan = obj;

    do {
        link = nxt_queue_first(&chan->messages);

        if (link == nxt_queue_tail(&chan->messages)) {
            nxt_event_fd_block_write(task->thread->engine, &chan->socket);
            return;
        }

        msg = (nxt_chan_send_msg_t *) link;

        nxt_iobuf_set(&iob[0], &msg->chan_msg, sizeof(nxt_chan_msg_t));

        sb.buf = msg->buf;
        sb.iobuf = &iob[1];
        sb.nmax = NXT_IOBUF_MAX - 1;
        sb.sync = 0;
        sb.last = 0;
        sb.size = sizeof(nxt_chan_msg_t);
        sb.limit = chan->max_size;

        niob = nxt_sendbuf_mem_coalesce(task, &sb);

        msg->chan_msg.last = sb.last;

        n = nxt_socketpair_send(&chan->socket, msg->fd, iob, niob + 1);

        if (n > 0) {
            if (nxt_slow_path((size_t) n != sb.size)) {
                nxt_log(task, NXT_LOG_CRIT,
                        "chan %d: short write: %z instead of %uz",
                        chan->socket.fd, n, sb.size);
                goto fail;
            }

            msg->buf = nxt_sendbuf_completion(task,
                                              chan->socket.write_work_queue,
                                              msg->buf,
                                              n - sizeof(nxt_chan_msg_t));

            if (msg->buf != NULL) {
                /*
                 * A file descriptor is sent only
                 * in the first message of a stream.
                 */
                msg->fd = -1;
                msg->share += n;

                if (msg->share >= chan->max_share) {
                    msg->share = 0;
                    nxt_queue_remove(link);
                    nxt_queue_insert_tail(&chan->messages, link);
                }

            } else {
                nxt_queue_remove(link);
                nxt_mem_cache_free0(chan->mem_pool, msg,
                                    sizeof(nxt_chan_send_msg_t));
            }

        } else if (nxt_slow_path(n == NXT_ERROR)) {
            goto fail;
        }

        /* n == NXT_AGAIN */

    } while (chan->socket.write_ready);

    if (nxt_event_fd_is_disabled(chan->socket.write)) {
        nxt_event_fd_enable_write(task->thread->engine, &chan->socket);
    }

    return;

fail:

    nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                       nxt_chan_error_handler, task, &chan->socket, NULL);
}


void
nxt_chan_read_enable(nxt_task_t *task, nxt_chan_t *chan)
{
    chan->socket.fd = chan->pair[0];
    chan->socket.log = &nxt_main_log;

    chan->task.thread = task->thread;
    chan->task.log = chan->socket.log;
    chan->task.ident = nxt_task_next_ident();

    chan->socket.task = &chan->task;

    chan->socket.read_work_queue = &task->thread->engine->fast_work_queue;
    chan->socket.read_handler = nxt_chan_read_handler;
    chan->socket.error_handler = nxt_chan_error_handler;

    nxt_event_fd_enable_read(task->thread->engine, &chan->socket);
}


void
nxt_chan_read_close(nxt_chan_t *chan)
{
    nxt_socket_close(chan->pair[0]);
    chan->pair[0] = -1;
}


static void
nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data)
{
    ssize_t         n;
    nxt_fd_t        fd;
    nxt_buf_t       *b;
    nxt_chan_t      *chan;
    nxt_iobuf_t     iob[2];
    nxt_chan_msg_t  msg;

    chan = obj;

    for ( ;; ) {

        b = nxt_chan_buf_alloc(chan);

        if (nxt_slow_path(b == NULL)) {
            /* TODO: disable event for some time */
        }

        nxt_iobuf_set(&iob[0], &msg, sizeof(nxt_chan_msg_t));
        nxt_iobuf_set(&iob[1], b->mem.pos, chan->max_size);

        n = nxt_socketpair_recv(&chan->socket, &fd, iob, 2);

        if (n > 0) {
            nxt_chan_read_msg_process(task, chan, &msg, fd, b, n);

            if (b->mem.pos == b->mem.free) {

                if (b->next != NULL) {
                    /* A sync buffer */
                    nxt_buf_free(chan->mem_pool, b->next);
                }

                nxt_chan_buf_free(chan, b);
            }

            if (chan->socket.read_ready) {
                continue;
            }

            return;
        }

        if (n == NXT_AGAIN) {
            nxt_chan_buf_free(chan, b);

            nxt_event_fd_enable_read(task->thread->engine, &chan->socket);
            return;
        }

        /* n == 0 || n == NXT_ERROR */

        nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                           nxt_chan_error_handler, task, &chan->socket, NULL);
        return;
    }
}


static void
nxt_chan_read_msg_process(nxt_task_t *task, nxt_chan_t *chan,
    nxt_chan_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size)
{
    nxt_buf_t            *sync;
    nxt_chan_recv_msg_t  recv_msg;

    if (nxt_slow_path(size < sizeof(nxt_chan_msg_t))) {
        nxt_log(chan->socket.task, NXT_LOG_CRIT,
                "chan %d: too small message:%uz", chan->socket.fd, size);
        goto fail;
    }

    recv_msg.stream = msg->stream;
    recv_msg.type = msg->type;
    recv_msg.fd = fd;
    recv_msg.buf = b;
    recv_msg.chan = chan;

    b->mem.free += size - sizeof(nxt_chan_msg_t);

    if (msg->last) {
        sync = nxt_buf_sync_alloc(chan->mem_pool, NXT_BUF_SYNC_LAST);
        if (nxt_slow_path(sync == NULL)) {
            goto fail;
        }

        b->next = sync;
    }

    chan->handler(task, &recv_msg);

    return;

fail:

    if (fd != -1) {
        nxt_fd_close(fd);
    }
}


static nxt_buf_t *
nxt_chan_buf_alloc(nxt_chan_t *chan)
{
    nxt_buf_t  *b;

    if (chan->free_bufs != NULL) {
        b = chan->free_bufs;
        chan->free_bufs = b->next;

        b->mem.pos = b->mem.start;
        b->mem.free = b->mem.start;

    } else {
        b = nxt_buf_mem_alloc(chan->mem_pool, chan->max_size, 0);
        if (nxt_slow_path(b == NULL)) {
            return NULL;
        }
    }

    return b;
}


static void
nxt_chan_buf_free(nxt_chan_t *chan, nxt_buf_t *b)
{
    b->next = chan->free_bufs;
    chan->free_bufs = b;
}


static void
nxt_chan_error_handler(nxt_task_t *task, void *obj, void *data)
{
    /* TODO */
}