summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_poll.c
blob: 2cc0200b75d2a893002ce26c8c502bece0a6f2ac (plain) (tree)



























































                                                                                
                                                                           


















































































































































































































































































































































                                                                              
                                                                             






































































































































                                                                          
                                                               




















                                                                  
                                                                       




                                                    
                                         
 
                                           


                                                                  
                                                      
















                                                    


                                                                        







                                                                 

                                                                 

                                                      


                                                             



                                                                 



                                                                     








































                                                                             

                                                                                









                                                                         


                                                                         



















































































                                                                           

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * Operation codes stored in nxt_poll_change_t.op by nxt_poll_change()
 * and dispatched by nxt_poll_commit_changes().
 */
enum {
    NXT_POLL_ADD = 0,
    NXT_POLL_CHANGE = 1,
    NXT_POLL_DELETE = 2,
};


/*
 * Entry of the fd hash (ps->fd_hash): maps a file descriptor to its
 * struct pollfd slot in ps->poll_set and to the event it was added for.
 */
typedef struct {
    /*
     * A file descriptor is stored in hash entry to allow
     * nxt_poll_fd_hash_test() to not dereference a pointer to
     * nxt_event_fd_t which may be invalid if the file descriptor has
     * been already closed and the nxt_event_fd_t's memory has been freed.
     */
    nxt_socket_t         fd;

    /* Index of this descriptor's entry in ps->poll_set. */
    uint32_t             index;
    /* The nxt_event_fd_t from the change that added this descriptor. */
    void                 *event;
} nxt_poll_hash_entry_t;


/* Event set operations referenced from the nxt_poll_event_set table. */
static nxt_event_set_t *nxt_poll_create(nxt_event_signals_t *signals,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_poll_free(nxt_event_set_t *event_set);
static void nxt_poll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
static void nxt_poll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
static void nxt_poll_drop_changes(nxt_event_set_t *event_set,
    nxt_event_fd_t *ev);
static void nxt_poll_enable_read(nxt_event_set_t *event_set,
    nxt_event_fd_t *ev);
static void nxt_poll_enable_write(nxt_event_set_t *event_set,
    nxt_event_fd_t *ev);
static void nxt_poll_disable_read(nxt_event_set_t *event_set,
    nxt_event_fd_t *ev);
static void nxt_poll_disable_write(nxt_event_set_t *event_set,
    nxt_event_fd_t *ev);
static void nxt_poll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
static void nxt_poll_block_write(nxt_event_set_t *event_set,
    nxt_event_fd_t *ev);
static void nxt_poll_oneshot_read(nxt_event_set_t *event_set,
    nxt_event_fd_t *ev);
static void nxt_poll_oneshot_write(nxt_event_set_t *event_set,
    nxt_event_fd_t *ev);
/* Change batching and application to the poll set / fd hash. */
static void nxt_poll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev,
    nxt_uint_t op, nxt_uint_t events);
static nxt_int_t nxt_poll_commit_changes(nxt_thread_t *thr,
    nxt_poll_event_set_t *ps);
static nxt_int_t nxt_poll_set_add(nxt_thread_t *thr, nxt_poll_event_set_t *ps,
    nxt_poll_change_t *ch);
static nxt_int_t nxt_poll_set_change(nxt_thread_t *thr,
    nxt_poll_event_set_t *ps, nxt_poll_change_t *ch);
static nxt_int_t nxt_poll_set_delete(nxt_thread_t *thr,
    nxt_poll_event_set_t *ps, nxt_poll_change_t *ch);
/* poll() wait and event dispatch. */
static void nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set,
    nxt_msec_t timeout);
/* Helpers for the fd -> nxt_poll_hash_entry_t lvlhsh map. */
static nxt_poll_hash_entry_t *nxt_poll_fd_hash_get(nxt_poll_event_set_t *ps,
    nxt_fd_t fd);
static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
static void nxt_poll_fd_hash_destroy(nxt_lvlhsh_t *lh);


/*
 * Operation table of the poll(2) event set backend.  The initializer is
 * positional, so the meaning of each slot is fixed by the field order of
 * nxt_event_set_ops_t (declared elsewhere).  As written here,
 * nxt_poll_disable fills three consecutive slots, nxt_poll_enable_read is
 * reused for a later slot, and four slots are NULL — presumably operations
 * this backend does not implement; confirm against nxt_event_set_ops_t.
 * NXT_NO_FILE_EVENTS / NXT_NO_SIGNAL_EVENTS presumably declare that file
 * and signal events are not provided by this backend.
 */
const nxt_event_set_ops_t  nxt_poll_event_set = {
    "poll",
    nxt_poll_create,
    nxt_poll_free,
    nxt_poll_enable,
    nxt_poll_disable,
    nxt_poll_disable,
    nxt_poll_disable,
    nxt_poll_enable_read,
    nxt_poll_enable_write,
    nxt_poll_disable_read,
    nxt_poll_disable_write,
    nxt_poll_block_read,
    nxt_poll_block_write,
    nxt_poll_oneshot_read,
    nxt_poll_oneshot_write,
    nxt_poll_enable_read,
    NULL,
    NULL,
    NULL,
    NULL,
    nxt_poll_set_poll,

    &nxt_unix_event_conn_io,

    NXT_NO_FILE_EVENTS,
    NXT_NO_SIGNAL_EVENTS,
};


/*
 * lvlhsh prototype of the fd -> nxt_poll_hash_entry_t map.  Its test
 * callback, nxt_poll_fd_hash_test(), also validates an entry against the
 * poll set passed in lhq->data, so every query using this prototype must
 * set lhq.data to the current ps->poll_set.
 */
static const nxt_lvlhsh_proto_t  nxt_poll_fd_hash_proto  nxt_aligned(64) =
{
    NXT_LVLHSH_LARGE_MEMALIGN,
    0,
    nxt_poll_fd_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};


/*
 * Allocates a poll event set and its pending-changes buffer, which can
 * hold up to "mchanges" queued changes.  Only sizeof(nxt_poll_event_set_t)
 * bytes are allocated, so only the "poll" member of the returned event set
 * may be used.  All other fields (poll set, fd hash, counters) are left
 * zeroed by nxt_zalloc().  "signals" and "mevents" are not used.
 *
 * Returns NULL if either allocation fails.
 */

static nxt_event_set_t *
nxt_poll_create(nxt_event_signals_t *signals, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    nxt_event_set_t    *set;
    nxt_poll_change_t  *changes;

    set = nxt_zalloc(sizeof(nxt_poll_event_set_t));

    if (set != NULL) {
        changes = nxt_malloc(sizeof(nxt_poll_change_t) * mchanges);

        if (changes != NULL) {
            set->poll.mchanges = mchanges;
            set->poll.changes = changes;

            return set;
        }

        nxt_free(set);
    }

    return NULL;
}


/*
 * Releases everything owned by a poll event set: the poll set array,
 * the pending-changes buffer, all fd hash entries, and finally the
 * event set allocation itself.
 */

static void
nxt_poll_free(nxt_event_set_t *event_set)
{
    nxt_main_log_debug("poll free");

    nxt_free(event_set->poll.poll_set);
    nxt_free(event_set->poll.changes);
    nxt_poll_fd_hash_destroy(&event_set->poll.fd_hash);

    /* The whole allocation starts at the "poll" member. */
    nxt_free(&event_set->poll);
}


/*
 * Activates both directions of "ev" and queues adding its descriptor
 * to the poll set with POLLIN and POLLOUT.
 */

static void
nxt_poll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
{
    ev->write = NXT_EVENT_DEFAULT;
    ev->read = NXT_EVENT_DEFAULT;

    nxt_poll_change(event_set, ev, NXT_POLL_ADD, POLLOUT | POLLIN);
}


/*
 * Deactivates both directions of "ev".  Any change still queued for it
 * is discarded, and a single delete is queued in its place.
 */

static void
nxt_poll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
{
    nxt_poll_drop_changes(event_set, ev);

    ev->write = NXT_EVENT_INACTIVE;
    ev->read = NXT_EVENT_INACTIVE;

    /*
     * POLLHUP is not a requested event here.  Any non-zero value makes
     * nxt_poll_set_delete() tolerate a descriptor missing from the poll
     * set.  That matters because "ev" may already be freed when the
     * delete is committed, so its error_handler cannot be relied on.
     */
    nxt_poll_change(event_set, ev, NXT_POLL_DELETE, POLLHUP);
}


/*
 * Removes every queued change that refers to "ev" from the pending batch.
 * The remaining changes are compacted in place and keep their relative
 * order.
 */

static void
nxt_poll_drop_changes(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
{
    nxt_uint_t            i, kept;
    nxt_poll_event_set_t  *ps;

    ps = &event_set->poll;
    kept = 0;

    for (i = 0; i < ps->nchanges; i++) {

        if (ps->changes[i].event == ev) {
            continue;
        }

        if (kept != i) {
            ps->changes[kept] = ps->changes[i];
        }

        kept++;
    }

    ps->nchanges = kept;
}


/*
 * Activates reading on "ev".  If writing is inactive, an add with POLLIN
 * only is queued.  Otherwise the existing entry is changed to watch both
 * directions.
 */

static void
nxt_poll_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
{
    ev->read = NXT_EVENT_DEFAULT;

    if (ev->write != NXT_EVENT_INACTIVE) {
        nxt_poll_change(event_set, ev, NXT_POLL_CHANGE, POLLIN | POLLOUT);
        return;
    }

    nxt_poll_change(event_set, ev, NXT_POLL_ADD, POLLIN);
}


/*
 * Activates writing on "ev".  If reading is inactive, an add with POLLOUT
 * only is queued.  Otherwise the existing entry is changed to watch both
 * directions.
 */

static void
nxt_poll_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
{
    ev->write = NXT_EVENT_DEFAULT;

    if (ev->read != NXT_EVENT_INACTIVE) {
        nxt_poll_change(event_set, ev, NXT_POLL_CHANGE, POLLIN | POLLOUT);
        return;
    }

    nxt_poll_change(event_set, ev, NXT_POLL_ADD, POLLOUT);
}


/*
 * Deactivates reading on "ev".  If writing is still active, the entry is
 * narrowed to POLLOUT.  Otherwise the descriptor is queued for deletion.
 */

static void
nxt_poll_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
{
    ev->read = NXT_EVENT_INACTIVE;

    if (ev->write != NXT_EVENT_INACTIVE) {
        nxt_poll_change(event_set, ev, NXT_POLL_CHANGE, POLLOUT);
        return;
    }

    /*
     * Zero events: nxt_poll_set_delete() reports a descriptor missing
     * from the poll set as an error.
     */
    nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0);
}


/*
 * Deactivates writing on "ev".  If reading is still active, the entry is
 * narrowed to POLLIN.  Otherwise the descriptor is queued for deletion.
 */

static void
nxt_poll_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
{
    ev->write = NXT_EVENT_INACTIVE;

    if (ev->read != NXT_EVENT_INACTIVE) {
        nxt_poll_change(event_set, ev, NXT_POLL_CHANGE, POLLIN);
        return;
    }

    /*
     * Zero events: nxt_poll_set_delete() reports a descriptor missing
     * from the poll set as an error.
     */
    nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0);
}


/* Disables reading on "ev" unless it is already inactive. */

static void
nxt_poll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
{
    if (ev->read == NXT_EVENT_INACTIVE) {
        return;
    }

    nxt_poll_disable_read(event_set, ev);
}


/* Disables writing on "ev" unless it is already inactive. */

static void
nxt_poll_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
{
    if (ev->write == NXT_EVENT_INACTIVE) {
        return;
    }

    nxt_poll_disable_write(event_set, ev);
}


/*
 * Arms a one-shot read on "ev" and deactivates writing.  The descriptor
 * is added if both directions were inactive, and changed otherwise.
 */

static void
nxt_poll_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
{
    nxt_bool_t  active;

    active = (ev->read != NXT_EVENT_INACTIVE
              || ev->write != NXT_EVENT_INACTIVE);

    ev->read = NXT_EVENT_ONESHOT;
    ev->write = NXT_EVENT_INACTIVE;

    nxt_poll_change(event_set, ev, active ? NXT_POLL_CHANGE : NXT_POLL_ADD,
                    POLLIN);
}


/*
 * Arms a one-shot write on "ev" and deactivates reading.  The descriptor
 * is added if both directions were inactive, and changed otherwise.
 */

static void
nxt_poll_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
{
    nxt_bool_t  active;

    active = (ev->read != NXT_EVENT_INACTIVE
              || ev->write != NXT_EVENT_INACTIVE);

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_ONESHOT;

    nxt_poll_change(event_set, ev, active ? NXT_POLL_CHANGE : NXT_POLL_ADD,
                    POLLOUT);
}


/*
 * poll changes are batched to improve instruction and data cache
 * locality of several lvlhsh operations followed by poll() call.
 *
 * nxt_poll_change() appends one change (operation "op" with poll mask
 * "events") for "ev" to the pending batch.  If the batch buffer is already
 * full, it is flushed first with nxt_poll_commit_changes().  The flush
 * result is deliberately ignored: the commit itself enqueues the error
 * handlers of any failed changes.
 */

static void
nxt_poll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, nxt_uint_t op,
    nxt_uint_t events)
{
    nxt_poll_change_t     *ch;
    nxt_poll_event_set_t  *ps;

    /*
     * "op" and "events" are nxt_uint_t, so they are formatted with %ui and
     * %Xi (as elsewhere in this file) rather than the int-sized %d / %XD.
     */
    nxt_log_debug(ev->log, "poll change: fd:%d op:%ui ev:%04Xi",
                  ev->fd, op, events);

    ps = &event_set->poll;

    if (ps->nchanges >= ps->mchanges) {
        (void) nxt_poll_commit_changes(nxt_thread(), ps);
    }

    ch = &ps->changes[ps->nchanges++];
    ch->op = op;
    ch->fd = ev->fd;
    ch->events = events;
    ch->event = ev;
}


/*
 * Applies the pending batch of changes to the poll set and the fd hash.
 * For every change that fails, the event's error_handler is enqueued on
 * the thread's main work queue.  The batch is always emptied.
 *
 * Returns NXT_OK if every change succeeded, NXT_ERROR otherwise.
 */

static nxt_int_t
nxt_poll_commit_changes(nxt_thread_t *thr, nxt_poll_event_set_t *ps)
{
    nxt_int_t          ret;
    nxt_event_fd_t     *ev;
    nxt_poll_change_t  *ch, *end;

    nxt_log_debug(thr->log, "poll changes:%ui", ps->nchanges);

    ret = NXT_OK;
    end = ps->changes + ps->nchanges;

    /*
     * A pre-tested loop: an empty batch processes nothing.  A do/while
     * would read the uninitialized first slot of ps->changes.
     */
    for (ch = ps->changes; ch < end; ch++) {
        ev = ch->event;

        switch (ch->op) {

        case NXT_POLL_ADD:
            if (nxt_fast_path(nxt_poll_set_add(thr, ps, ch) == NXT_OK)) {
                continue;
            }
            break;

        case NXT_POLL_CHANGE:
            if (nxt_fast_path(nxt_poll_set_change(thr, ps, ch) == NXT_OK)) {
                continue;
            }
            break;

        case NXT_POLL_DELETE:
            if (nxt_fast_path(nxt_poll_set_delete(thr, ps, ch) == NXT_OK)) {
                continue;
            }
            break;

        default:
            /* An unknown operation is handled as a failed change. */
            break;
        }

        nxt_thread_work_queue_add(thr, &thr->work_queue.main,
                                  ev->error_handler, ev->task, ev, ev->data);

        ret = NXT_ERROR;
    }

    ps->nchanges = 0;

    return ret;
}


/*
 * Appends ch->fd with mask ch->events to the poll set and records it in
 * the fd hash.  The poll set grows on demand.
 *
 * Returns NXT_OK on success.  Returns NXT_ERROR on allocation or hash
 * insertion failure, in which case the poll set and hash are unchanged
 * (apart from a possibly enlarged poll set array).
 */

static nxt_int_t
nxt_poll_set_add(nxt_thread_t *thr, nxt_poll_event_set_t *ps,
    nxt_poll_change_t *ch)
{
    nxt_uint_t             max_nfds;
    struct pollfd          *pfd;
    nxt_lvlhsh_query_t     lhq;
    nxt_poll_hash_entry_t  *phe;

    nxt_log_debug(thr->log, "poll add event: fd:%d ev:%04Xi",
                  ch->fd, ch->events);

    if (ps->nfds >= ps->max_nfds) {
        /* Grow by 512 entries: 4K with the usual 8-byte struct pollfd. */
        max_nfds = ps->max_nfds + 512;

        pfd = nxt_realloc(ps->poll_set, sizeof(struct pollfd) * max_nfds);
        if (nxt_slow_path(pfd == NULL)) {
            /* ps->poll_set is presumably kept intact on realloc failure. */
            return NXT_ERROR;
        }

        ps->poll_set = pfd;
        ps->max_nfds = max_nfds;
    }

    phe = nxt_malloc(sizeof(nxt_poll_hash_entry_t));
    if (nxt_slow_path(phe == NULL)) {
        return NXT_ERROR;
    }

    phe->fd = ch->fd;
    phe->index = ps->nfds;
    phe->event = ch->event;

    pfd = &ps->poll_set[ps->nfds++];
    pfd->fd = ch->fd;
    pfd->events = ch->events;
    pfd->revents = 0;

    lhq.key_hash = nxt_murmur_hash2(&ch->fd, sizeof(nxt_fd_t));
    lhq.replace = 0;
    lhq.key.len = sizeof(nxt_fd_t);
    lhq.key.data = (u_char *) &ch->fd;
    lhq.value = phe;
    lhq.proto = &nxt_poll_fd_hash_proto;
    lhq.data = ps->poll_set;

    if (nxt_fast_path(nxt_lvlhsh_insert(&ps->fd_hash, &lhq) == NXT_OK)) {
        return NXT_OK;
    }

    /*
     * The insertion failed (presumably a duplicate fd, since lhq.replace
     * is 0, or an allocation failure).  Release the poll set slot claimed
     * above.  Otherwise poll() would keep watching a descriptor that has
     * no hash entry, and the slot could never be reclaimed by a delete.
     */
    ps->nfds--;

    nxt_free(phe);

    return NXT_ERROR;
}


/*
 * Replaces the requested poll mask of an already added descriptor.
 * Returns NXT_ERROR if ch->fd is not in the fd hash.
 */

static nxt_int_t
nxt_poll_set_change(nxt_thread_t *thr, nxt_poll_event_set_t *ps,
    nxt_poll_change_t *ch)
{
    nxt_poll_hash_entry_t  *entry;

    nxt_log_debug(thr->log, "poll change event: fd:%d ev:%04Xi",
                  ch->fd, ch->events);

    entry = nxt_poll_fd_hash_get(ps, ch->fd);

    if (nxt_slow_path(entry == NULL)) {
        return NXT_ERROR;
    }

    ps->poll_set[entry->index].events = ch->events;

    return NXT_OK;
}


/*
 * Removes ch->fd from the fd hash and the poll set.  The poll set is kept
 * dense by moving its last entry into the freed slot.
 *
 * If the fd is not in the hash, the result depends on ch->events: a
 * non-zero value (nxt_poll_disable() passes POLLHUP) means the caller
 * tolerates this, and NXT_OK is returned.  Zero yields NXT_ERROR.
 */

static nxt_int_t
nxt_poll_set_delete(nxt_thread_t *thr, nxt_poll_event_set_t *ps,
    nxt_poll_change_t *ch)
{
    nxt_uint_t             index;
    nxt_lvlhsh_query_t     lhq;
    nxt_poll_hash_entry_t  *phe, *moved;

    nxt_log_debug(thr->log, "poll delete event: fd:%d", ch->fd);

    lhq.key_hash = nxt_murmur_hash2(&ch->fd, sizeof(nxt_fd_t));
    lhq.key.len = sizeof(nxt_fd_t);
    lhq.key.data = (u_char *) &ch->fd;
    lhq.proto = &nxt_poll_fd_hash_proto;
    lhq.data = ps->poll_set;

    if (nxt_slow_path(nxt_lvlhsh_delete(&ps->fd_hash, &lhq) != NXT_OK)) {
        /*
         * Ignore NXT_DECLINED error if ch->events
         * has the special value POLLHUP.
         */
        return (ch->events != 0) ? NXT_OK : NXT_ERROR;
    }

    /* The removed hash entry; freed below. */
    phe = lhq.value;

    index = phe->index;
    ps->nfds--;

    if (index != ps->nfds) {
        ps->poll_set[index] = ps->poll_set[ps->nfds];

        /*
         * The moved fd is looked up while poll_set[ps->nfds] still holds
         * it.  nxt_poll_fd_hash_test() validates an entry against
         * poll_set[entry->index], and that is still the old last slot for
         * the moved fd until its index is updated here.
         */
        moved = nxt_poll_fd_hash_get(ps, ps->poll_set[ps->nfds].fd);

        /*
         * A miss means the hash and poll set are out of sync.
         * nxt_poll_fd_hash_get() has already logged an alert, so do not
         * dereference NULL.
         */
        if (nxt_fast_path(moved != NULL)) {
            moved->index = index;
        }
    }

    nxt_free(phe);

    return NXT_OK;
}


/*
 * Commits any pending changes, waits in poll() for at most "timeout", and
 * dispatches reported events.  Read/write handlers of the matching
 * nxt_event_fd_t are enqueued on its read/write work queues, and its
 * error_handler is enqueued on the thread's main work queue for POLLNVAL.
 * One-shot directions are deactivated and a delete is queued for them.
 */

static void
nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set,
    nxt_msec_t timeout)
{
    int                    nevents;
    nxt_fd_t               fd;
    nxt_err_t              err;
    nxt_bool_t             error;
    nxt_uint_t             i, events, level;
    struct pollfd          *pfd;
    nxt_event_fd_t         *ev;
    nxt_poll_event_set_t   *ps;
    nxt_poll_hash_entry_t  *phe;

    ps = &event_set->poll;

    if (ps->nchanges != 0) {
        if (nxt_poll_commit_changes(nxt_thread(), ps) != NXT_OK) {
            /* Error handlers have been enqueued on failure. */
            timeout = 0;
        }
    }

    nxt_debug(task, "poll() events:%ui timeout:%M", ps->nfds, timeout);

    nevents = poll(ps->poll_set, ps->nfds, timeout);

    /* Capture errno before the following calls can overwrite it. */
    err = (nevents == -1) ? nxt_errno : 0;

    nxt_thread_time_update(task->thread);

    nxt_debug(task, "poll(): %d", nevents);

    if (nevents == -1) {
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
        nxt_log(task, level, "poll() failed %E", err);
        return;
    }

    /*
     * nevents counts the entries with non-zero revents that remain to be
     * handled, so the scan stops as soon as all of them are dispatched.
     */
    for (i = 0; i < ps->nfds && nevents != 0; i++) {

        pfd = &ps->poll_set[i];
        events = pfd->revents;

        if (events == 0) {
            continue;
        }

        fd = pfd->fd;

        phe = nxt_poll_fd_hash_get(ps, fd);

        if (nxt_slow_path(phe == NULL)) {
            nxt_log(task, NXT_LOG_CRIT,
                    "poll() returned invalid fd:%d ev:%04Xd rev:%04uXi",
                    fd, pfd->events, events);

            /* Mark the poll entry to ignore it by the kernel. */
            pfd->fd = -1;
            goto next;
        }

        ev = phe->event;

        /* NOTE(review): "%wr" looks like a typo for "wr" — confirm. */
        nxt_debug(ev->task, "poll: fd:%d ev:%04uXi rd:%d %wr:%d",
                  fd, events, ev->read, ev->write);

        if (nxt_slow_path((events & POLLNVAL) != 0)) {
            nxt_log(ev->task, NXT_LOG_CRIT,
                    "poll() error fd:%d ev:%04Xd rev:%04uXi",
                    fd, pfd->events, events);

            /* Mark the poll entry to ignore it by the kernel. */
            pfd->fd = -1;

            nxt_thread_work_queue_add(task->thread,
                                      &task->thread->work_queue.main,
                                      ev->error_handler,
                                      ev->task, ev, ev->data);
            goto next;
        }

        /*
         * On a socket's remote end close:
         *
         *   Linux, FreeBSD, and Solaris set POLLIN;
         *   MacOSX sets POLLIN and POLLHUP;
         *   NetBSD sets POLLIN, and poll(2) claims this explicitly:
         *
         *     If the remote end of a socket is closed, poll()
         *     returns a POLLIN event, rather than a POLLHUP.
         *
         * On error:
         *
         *   Linux sets POLLHUP and POLLERR only;
         *   FreeBSD adds POLLHUP to POLLIN or POLLOUT, although poll(2)
         *   claims the opposite:
         *
         *     Note that POLLHUP and POLLOUT should never be
         *     present in the revents bitmask at the same time.
         *
         *   Solaris and NetBSD do not add POLLHUP or POLLERR;
         *   MacOSX sets POLLHUP only.
         *
         * If an implementation sets POLLERR or POLLHUP only without POLLIN
         * or POLLOUT, the "error" variable enqueues only one active handler.
         */

        error = (((events & (POLLERR | POLLHUP)) != 0)
                 && ((events & (POLLIN | POLLOUT)) == 0));

        if ((events & POLLIN) || (error && ev->read_handler != NULL)) {
            error = 0;
            ev->read_ready = 1;

            if (ev->read == NXT_EVENT_ONESHOT) {
                ev->read = NXT_EVENT_INACTIVE;
                /*
                 * NOTE(review): if the change batch is full,
                 * nxt_poll_change() commits it right here.  A committed
                 * delete moves the last poll_set entry into the freed slot
                 * and decrements ps->nfds while this loop is iterating,
                 * which may skip a reported entry — confirm.
                 */
                nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0);
            }

            nxt_thread_work_queue_add(task->thread, ev->read_work_queue,
                                      ev->read_handler, ev->task, ev, ev->data);
        }

        if ((events & POLLOUT) || (error && ev->write_handler != NULL)) {
            ev->write_ready = 1;

            if (ev->write == NXT_EVENT_ONESHOT) {
                ev->write = NXT_EVENT_INACTIVE;
                /* Same mid-iteration flush caveat as for one-shot read. */
                nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0);
            }

            nxt_thread_work_queue_add(task->thread, ev->write_work_queue,
                                      ev->write_handler,
                                      ev->task, ev, ev->data);
        }

    next:

        nevents--;
    }
}


/*
 * Looks up the hash entry of "fd", validating it against the current poll
 * set.  Logs an alert and returns NULL if it is not found.
 */

static nxt_poll_hash_entry_t *
nxt_poll_fd_hash_get(nxt_poll_event_set_t *ps, nxt_fd_t fd)
{
    nxt_lvlhsh_query_t  lhq;

    lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
    lhq.key.len = sizeof(nxt_fd_t);
    lhq.key.data = (u_char *) &fd;
    lhq.proto = &nxt_poll_fd_hash_proto;
    lhq.data = ps->poll_set;

    if (nxt_lvlhsh_find(&ps->fd_hash, &lhq) != NXT_OK) {
        nxt_thread_log_alert("fd %d not found in hash", fd);
        return NULL;
    }

    return lhq.value;
}


/*
 * lvlhsh test callback.  An entry matches when its fd equals the query
 * key AND the poll set slot it points to (lhq->data is the poll set)
 * holds the same fd.  A mismatch in the latter is logged as an alert.
 */

static nxt_int_t
nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_fd_t               key_fd;
    struct pollfd          *poll_set;
    nxt_poll_hash_entry_t  *phe;

    phe = data;
    key_fd = *(nxt_fd_t *) lhq->key.data;

    if (key_fd != phe->fd) {
        return NXT_DECLINED;
    }

    poll_set = lhq->data;

    if (nxt_slow_path(poll_set[phe->index].fd != phe->fd)) {
        nxt_thread_log_alert("fd %d in hash mismatches fd %d in poll set",
                             phe->fd, poll_set[phe->index].fd);
        return NXT_DECLINED;
    }

    return NXT_OK;
}


static void
nxt_poll_fd_hash_destroy(nxt_lvlhsh_t *lh)
{
    nxt_lvlhsh_each_t      lhe;
    nxt_lvlhsh_query_t     lhq;
    nxt_poll_hash_entry_t  *phe;

    nxt_memzero(&lhe, sizeof(nxt_lvlhsh_each_t));
    lhe.proto = &nxt_poll_fd_hash_proto;
    lhq.proto = &nxt_poll_fd_hash_proto;

    for ( ;; ) {
        phe = nxt_lvlhsh_each(lh, &lhe);

        if (phe == NULL) {
            return;
        }

        lhq.key_hash = nxt_murmur_hash2(&phe->fd, sizeof(nxt_fd_t));
        lhq.key.len = sizeof(nxt_fd_t);
        lhq.key.data = (u_char *) &phe->fd;

        if (nxt_lvlhsh_delete(lh, &lhq) != NXT_OK) {
            nxt_thread_log_alert("event fd %d not found in hash", phe->fd);
        }

        nxt_free(phe);
    }
}