/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * The first epoll version has been introduced in Linux 2.5.44.  The
 * interface was changed several times since then and the final version
 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has
 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
 *
 * EPOLLET mode did not work reliable in early implementaions and in
 * Linux 2.4 backport.
 *
 * EPOLLONESHOT             Linux 2.6.2,  glibc 2.3.
 * EPOLLRDHUP               Linux 2.6.17, glibc 2.8.
 * epoll_pwait()            Linux 2.6.19, glibc 2.6.
 * signalfd()               Linux 2.6.22, glibc 2.7.
 * eventfd()                Linux 2.6.22, glibc 2.7.
 * timerfd_create()         Linux 2.6.25, glibc 2.8.
 * epoll_create1()          Linux 2.6.27, glibc 2.9.
 * signalfd4()              Linux 2.6.27, glibc 2.9.
 * eventfd2()               Linux 2.6.27, glibc 2.9.
 * accept4()                Linux 2.6.28, glibc 2.10.
 * eventfd2(EFD_SEMAPHORE)  Linux 2.6.30, glibc 2.10.
 * EPOLLEXCLUSIVE           Linux 4.5, glibc 2.24.
 */


#if (NXT_HAVE_EPOLL_EDGE)
static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
#endif
static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
    nxt_conn_io_t *io);
static void nxt_epoll_free(nxt_event_engine_t *engine);
static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    int op, uint32_t events);
static void nxt_epoll_commit_changes(nxt_event_engine_t *engine);
static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
#if (NXT_HAVE_SIGNALFD)
static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data);
#endif
#if (NXT_HAVE_EVENTFD)
static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
    nxt_work_handler_t handler);
static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
#endif
static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);

#if (NXT_HAVE_ACCEPT4)
static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
    void *data);
#endif


#if (NXT_HAVE_EPOLL_EDGE)

static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
    void *data);
static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
    void *data);
static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);


static nxt_conn_io_t  nxt_epoll_edge_conn_io = {
    .connect = nxt_epoll_edge_conn_io_connect,
    .accept = nxt_conn_io_accept,

    .read = nxt_conn_io_read,
    .recvbuf = nxt_epoll_edge_conn_io_recvbuf,
    .recv = nxt_conn_io_recv,

    .write = nxt_conn_io_write,
    .sendbuf = nxt_conn_io_sendbuf,

#if (NXT_HAVE_LINUX_SENDFILE)
    .old_sendbuf = nxt_linux_event_conn_io_sendfile,
#else
    .old_sendbuf = nxt_event_conn_io_sendbuf,
#endif

    .writev = nxt_event_conn_io_writev,
    .send = nxt_event_conn_io_send,
};


const nxt_event_interface_t  nxt_epoll_edge_engine = {
    "epoll_edge",
    nxt_epoll_edge_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_epoll_edge_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};

#endif


const nxt_event_interface_t  nxt_epoll_level_engine = {
    "epoll_level",
    nxt_epoll_level_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_unix_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};


#if (NXT_HAVE_EPOLL_EDGE)

static nxt_int_t
nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io,
                            EPOLLET | EPOLLRDHUP);
}

#endif


static nxt_int_t
nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    return nxt_epoll_create(engine, mchanges, mevents,
                            &nxt_unix_conn_io, 0);
}


static nxt_int_t
nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
{
    engine->u.epoll.fd = -1;
    engine->u.epoll.mode = mode;
    engine->u.epoll.mchanges = mchanges;
    engine->u.epoll.mevents = mevents;
#if (NXT_HAVE_SIGNALFD)
    engine->u.epoll.signalfd.fd = -1;
#endif

    engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
    if (engine->u.epoll.changes == NULL) {
        goto fail;
    }

    engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
    if (engine->u.epoll.events == NULL) {
        goto fail;
    }

    engine->u.epoll.fd = epoll_create(1);
    if (engine->u.epoll.fd == -1) {
        nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno);
        goto fail;
    }

    nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);

    if (engine->signals != NULL) {

#if (NXT_HAVE_SIGNALFD)

        if (nxt_epoll_add_signal(engine) != NXT_OK) {
            goto fail;
        }

#endif

        nxt_epoll_test_accept4(engine, io);
    }

    return NXT_OK;

fail:

    nxt_epoll_free(engine);

    return NXT_ERROR;
}


static void
nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
{
    static nxt_work_handler_t  handler;

    if (handler == NULL) {

        handler = io->accept;

#if (NXT_HAVE_ACCEPT4)

        (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);

        if (nxt_errno != NXT_ENOSYS) {
            handler = nxt_epoll_conn_io_accept4;

        } else {
            nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
                    NXT_ENOSYS);
        }

#endif
    }

    io->accept = handler;
}


static void
nxt_epoll_free(nxt_event_engine_t *engine)
{
    int  fd;

    nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);

#if (NXT_HAVE_SIGNALFD)

    fd = engine->u.epoll.signalfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno);
    }

#endif

#if (NXT_HAVE_EVENTFD)

    fd = engine->u.epoll.eventfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno);
    }

#endif

    fd = engine->u.epoll.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno);
    }

    nxt_free(engine->u.epoll.events);
    nxt_free(engine->u.epoll.changes);

    nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
}


static void
nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;
    ev->write = NXT_EVENT_ACTIVE;

    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
                     EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
}


static void
nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {

        ev->read = NXT_EVENT_INACTIVE;
        ev->write = NXT_EVENT_INACTIVE;

        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
    }
}


static void
nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {

        ev->read = NXT_EVENT_INACTIVE;
        ev->write = NXT_EVENT_INACTIVE;

        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
    }
}


/*
 * Although calling close() on a file descriptor will remove any epoll
 * events that reference the descriptor, in this case the close() acquires
 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not
 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents
 * only in one epoll set.  Thus removing events explicitly before closing
 * eliminates possible lock contention.
 */

static nxt_bool_t
nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    nxt_epoll_delete(engine, ev);

    return ev->changing;
}


static void
nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    if (ev->read != NXT_EVENT_BLOCKED) {

        op = EPOLL_CTL_MOD;
        events = EPOLLIN | engine->u.epoll.mode;

        if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
            op = EPOLL_CTL_ADD;

        } else if (ev->write >= NXT_EVENT_BLOCKED) {
            events |= EPOLLOUT;
        }

        nxt_epoll_change(engine, ev, op, events);
    }

    ev->read = NXT_EVENT_ACTIVE;
}


static void
nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    if (ev->write != NXT_EVENT_BLOCKED) {

        op = EPOLL_CTL_MOD;
        events = EPOLLOUT | engine->u.epoll.mode;

        if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
            op = EPOLL_CTL_ADD;

        } else if (ev->read >= NXT_EVENT_BLOCKED) {
            events |= EPOLLIN;
        }

        nxt_epoll_change(engine, ev, op, events);
    }

    ev->write = NXT_EVENT_ACTIVE;
}


static void
nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    ev->read = NXT_EVENT_INACTIVE;

    if (ev->write <= NXT_EVENT_DISABLED) {
        ev->write = NXT_EVENT_INACTIVE;
        op = EPOLL_CTL_DEL;
        events = 0;

    } else {
        op = EPOLL_CTL_MOD;
        events = EPOLLOUT | engine->u.epoll.mode;
    }

    nxt_epoll_change(engine, ev, op, events);
}


static void
nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int       op;
    uint32_t  events;

    ev->write = NXT_EVENT_INACTIVE;

    if (ev->read <= NXT_EVENT_DISABLED) {
        ev->read = NXT_EVENT_INACTIVE;
        op = EPOLL_CTL_DEL;
        events = 0;

    } else {
        op = EPOLL_CTL_MOD;
        events = EPOLLIN | engine->u.epoll.mode;
    }

    nxt_epoll_change(engine, ev, op, events);
}


static void
nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE) {
        ev->read = NXT_EVENT_BLOCKED;
    }
}


static void
nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->write != NXT_EVENT_INACTIVE) {
        ev->write = NXT_EVENT_BLOCKED;
    }
}


/*
 * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT
 * event should be added or modified, epoll_ctl(2):
 *
 * EPOLLONESHOT (since Linux 2.6.2)
 *     Sets the one-shot behavior for the associated file descriptor.
 *     This means that after an event is pulled out with epoll_wait(2)
 *     the associated file descriptor is internally disabled and no
 *     other events will be reported by the epoll interface.  The user
 *     must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file
 *     descriptor with a new event mask.
 */

static void
nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int  op;

    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
             EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ev->read = NXT_EVENT_ONESHOT;
    ev->write = NXT_EVENT_INACTIVE;

    nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
}


static void
nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    int  op;

    op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
             EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_ONESHOT;

    nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
}


static void
nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    uint32_t  events;

    ev->read = NXT_EVENT_ACTIVE;

    events = EPOLLIN;

#ifdef EPOLLEXCLUSIVE
    events |= EPOLLEXCLUSIVE;
#endif

    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events);
}


/*
 * epoll changes are batched to improve instruction and data cache
 * locality of several epoll_ctl() calls followed by epoll_wait() call.
 */

static void
nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
    uint32_t events)
{
    nxt_epoll_change_t  *change;

    nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
              engine->u.epoll.fd, ev->fd, op, events);

    if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
        nxt_epoll_commit_changes(engine);
    }

    ev->changing = 1;

    change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
    change->op = op;
    change->event.events = events;
    change->event.data.ptr = ev;
}


static void
nxt_epoll_commit_changes(nxt_event_engine_t *engine)
{
    int                 ret;
    nxt_fd_event_t      *ev;
    nxt_epoll_change_t  *change, *end;

    nxt_debug(&engine->task, "epoll %d changes:%ui",
              engine->u.epoll.fd, engine->u.epoll.nchanges);

    change = engine->u.epoll.changes;
    end = change + engine->u.epoll.nchanges;

    do {
        ev = change->event.data.ptr;
        ev->changing = 0;

        nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
                  engine->u.epoll.fd, ev->fd, change->op,
                  change->event.events);

        ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event);

        if (nxt_slow_path(ret != 0)) {
            nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E",
                      engine->u.epoll.fd, change->op, ev->fd, nxt_errno);

            nxt_work_queue_add(&engine->fast_work_queue,
                               nxt_epoll_error_handler, ev->task, ev, ev->data);

            engine->u.epoll.error = 1;
        }

        change++;

    } while (change < end);

    engine->u.epoll.nchanges = 0;
}


static void
nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_fd_event_t  *ev;

    ev = obj;

    ev->read = NXT_EVENT_INACTIVE;
    ev->write = NXT_EVENT_INACTIVE;

    ev->error_handler(ev->task, ev, data);
}


#if (NXT_HAVE_SIGNALFD)

static nxt_int_t
nxt_epoll_add_signal(nxt_event_engine_t *engine)
{
    int                 fd;
    struct epoll_event  ee;

    if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
        nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
        return NXT_ERROR;
    }

    /*
     * Glibc signalfd() wrapper always has the flags argument.  Glibc 2.7
     * and 2.8 signalfd() wrappers call the original signalfd() syscall
     * without the flags argument.  Glibc 2.9+ signalfd() wrapper at first
     * tries to call signalfd4() syscall and if it fails then calls the
     * original signalfd() syscall.  For this reason the non-blocking mode
     * is set separately.
     */

    fd = signalfd(-1, &engine->signals->sigmask, 0);

    if (fd == -1) {
        nxt_alert(&engine->task, "signalfd(%d) failed %E",
                  engine->u.epoll.signalfd.fd, nxt_errno);
        return NXT_ERROR;
    }

    engine->u.epoll.signalfd.fd = fd;

    if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) {
        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "signalfd(): %d", fd);

    engine->u.epoll.signalfd.data = engine->signals->handler;
    engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
    engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
    engine->u.epoll.signalfd.log = engine->task.log;
    engine->u.epoll.signalfd.task = &engine->task;

    ee.events = EPOLLIN;
    ee.data.ptr = &engine->u.epoll.signalfd;

    if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
        nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
                  engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);

        return NXT_ERROR;
    }

    return NXT_OK;
}


static void
nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
{
    int                      n;
    nxt_fd_event_t           *ev;
    nxt_work_handler_t       handler;
    struct signalfd_siginfo  sfd;

    ev = obj;
    handler = data;

    nxt_debug(task, "signalfd handler");

    n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo));

    nxt_debug(task, "read signalfd(%d): %d", ev->fd, n);

    if (n != sizeof(struct signalfd_siginfo)) {
        nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno);
        return;
    }

    nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo);

    handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL);
}

#endif


#if (NXT_HAVE_EVENTFD)

static nxt_int_t
nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
{
    int                 ret;
    struct epoll_event  ee;

    engine->u.epoll.post_handler = handler;

    /*
     * Glibc eventfd() wrapper always has the flags argument.  Glibc 2.7
     * and 2.8 eventfd() wrappers call the original eventfd() syscall
     * without the flags argument.  Glibc 2.9+ eventfd() wrapper at first
     * tries to call eventfd2() syscall and if it fails then calls the
     * original eventfd() syscall.  For this reason the non-blocking mode
     * is set separately.
     */

    engine->u.epoll.eventfd.fd = eventfd(0, 0);

    if (engine->u.epoll.eventfd.fd == -1) {
        nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno);
        return NXT_ERROR;
    }

    ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd);
    if (nxt_slow_path(ret != NXT_OK)) {
        return NXT_ERROR;
    }

    nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);

    engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
    engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
    engine->u.epoll.eventfd.data = engine;
    engine->u.epoll.eventfd.log = engine->task.log;
    engine->u.epoll.eventfd.task = &engine->task;

    ee.events = EPOLLIN | EPOLLET;
    ee.data.ptr = &engine->u.epoll.eventfd;

    ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
                    engine->u.epoll.eventfd.fd, &ee);

    if (nxt_fast_path(ret == 0)) {
        return NXT_OK;
    }

    nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E",
              engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
              nxt_errno);

    return NXT_ERROR;
}


static void
nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
{
    int                 n;
    uint64_t            events;
    nxt_event_engine_t  *engine;

    engine = data;

    nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);

    /*
     * The maximum value after write() to a eventfd() descriptor will
     * block or return EAGAIN is 0xFFFFFFFFFFFFFFFE, so the descriptor
     * can be read once per many notifications, for example, once per
     * 2^32-2 noticifcations.  Since the eventfd() file descriptor is
     * always registered in EPOLLET mode, epoll returns event about
     * only the latest write() to the descriptor.
     */

    if (engine->u.epoll.neventfd++ >= 0xFFFFFFFE) {
        engine->u.epoll.neventfd = 0;

        n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));

        nxt_debug(task, "read(%d): %d events:%uL",
                  engine->u.epoll.eventfd.fd, n, events);

        if (n != sizeof(uint64_t)) {
            nxt_alert(task, "read eventfd(%d) failed %E",
                      engine->u.epoll.eventfd.fd, nxt_errno);
        }
    }

    engine->u.epoll.post_handler(task, NULL, NULL);
}


static void
nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
    size_t    ret;
    uint64_t  event;

    /*
     * eventfd() presents along with signalfd(), so the function
     * is used only to post events and the signo argument is ignored.
     */

    event = 1;

    ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));

    if (nxt_slow_path(ret != sizeof(uint64_t))) {
        nxt_alert(&engine->task, "write(%d) to eventfd failed %E",
                  engine->u.epoll.eventfd.fd, nxt_errno);
    }
}

#endif


static void
nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
{
    int                 nevents;
    uint32_t            events;
    nxt_int_t           i;
    nxt_err_t           err;
    nxt_bool_t          error;
    nxt_uint_t          level;
    nxt_fd_event_t      *ev;
    struct epoll_event  *event;

    if (engine->u.epoll.nchanges != 0) {
        nxt_epoll_commit_changes(engine);
    }

    if (engine->u.epoll.error) {
        engine->u.epoll.error = 0;
        /* Error handlers have been enqueued on failure. */
        timeout = 0;
    }

    nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
              engine->u.epoll.fd, timeout);

    nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
                         engine->u.epoll.mevents, timeout);

    err = (nevents == -1) ? nxt_errno : 0;

    nxt_thread_time_update(engine->task.thread);

    nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents);

    if (nevents == -1) {
        level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;

        nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
                engine->u.epoll.fd, err);

        return;
    }

    for (i = 0; i < nevents; i++) {

        event = &engine->u.epoll.events[i];
        events = event->events;
        ev = event->data.ptr;

        nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d",
                  ev->fd, events, ev, ev->read, ev->write);

        /*
         * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN
         * or EPOLLOUT, so the "error" variable enqueues only error handler.
         */
        error = ((events & (EPOLLERR | EPOLLHUP)) != 0);
        ev->epoll_error = error;

        if (error
            && ev->read <= NXT_EVENT_BLOCKED
            && ev->write <= NXT_EVENT_BLOCKED)
        {
            error = 0;
        }

#if (NXT_HAVE_EPOLL_EDGE)

        ev->epoll_eof = ((events & EPOLLRDHUP) != 0);

#endif

        if ((events & EPOLLIN) != 0) {
            ev->read_ready = 1;

            if (ev->read != NXT_EVENT_BLOCKED) {

                if (ev->read == NXT_EVENT_ONESHOT) {
                    ev->read = NXT_EVENT_DISABLED;
                }

                nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
                                   ev->task, ev, ev->data);

                error = 0;

            } else if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable_read(engine, ev);
            }
        }

        if ((events & EPOLLOUT) != 0) {
            ev->write_ready = 1;

            if (ev->write != NXT_EVENT_BLOCKED) {

                if (ev->write == NXT_EVENT_ONESHOT) {
                    ev->write = NXT_EVENT_DISABLED;
                }

                nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
                                   ev->task, ev, ev->data);

                error = 0;

            } else if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable_write(engine, ev);
            }
        }

        if (!error) {
            continue;
        }

        ev->read_ready = 1;
        ev->write_ready = 1;

        if (ev->read == NXT_EVENT_BLOCKED && ev->write == NXT_EVENT_BLOCKED) {

            if (engine->u.epoll.mode == 0) {
                /* Level-triggered mode. */
                nxt_epoll_disable(engine, ev);
            }

            continue;
        }

        nxt_work_queue_add(&engine->fast_work_queue, nxt_epoll_error_handler,
                           ev->task, ev, ev->data);
    }
}


#if (NXT_HAVE_ACCEPT4)

static void
nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
{
    socklen_t           socklen;
    nxt_conn_t          *c;
    nxt_socket_t        s;
    struct sockaddr     *sa;
    nxt_listen_event_t  *lev;

    lev = obj;
    c = lev->next;

    lev->ready--;
    lev->socket.read_ready = (lev->ready != 0);

    sa = &c->remote->u.sockaddr;
    socklen = c->remote->socklen;
    /*
     * The returned socklen is ignored here,
     * see comment in nxt_conn_io_accept().
     */
    s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK);

    if (s != -1) {
        c->socket.fd = s;

        nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s);

        nxt_conn_accept(task, lev, c);
        return;
    }

    nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
}

#endif


#if (NXT_HAVE_EPOLL_EDGE)

/*
 * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt()
 * syscall to test pending connect() error.  Although this special
 * interface can work in both edge-triggered and level-triggered
 * modes it is enabled only for the former mode because this mode is
 * available in all modern Linux distributions.  For the latter mode
 * it is required to create additional nxt_epoll_level_event_conn_io
 * with single non-generic connect() interface.
 */

static void
nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t                    *c;
    nxt_event_engine_t            *engine;
    nxt_work_handler_t            handler;
    const nxt_event_conn_state_t  *state;

    c = obj;

    state = c->write_state;

    switch (nxt_socket_connect(task, c->socket.fd, c->remote)) {

    case NXT_OK:
        c->socket.write_ready = 1;
        handler = state->ready_handler;
        break;

    case NXT_AGAIN:
        c->socket.write_handler = nxt_epoll_edge_conn_connected;
        c->socket.error_handler = nxt_conn_connect_error;

        engine = task->thread->engine;
        nxt_conn_timer(engine, c, state, &c->write_timer);

        nxt_epoll_enable(engine, &c->socket);
        c->socket.read = NXT_EVENT_BLOCKED;
        return;

#if 0
    case NXT_AGAIN:
        nxt_conn_timer(engine, c, state, &c->write_timer);

        /* Fall through. */

    case NXT_OK:
        /*
         * Mark both read and write directions as ready and try to perform
         * I/O operations before receiving readiness notifications.
         * On unconnected socket Linux send() and recv() return EAGAIN
         * instead of ENOTCONN.
         */
        c->socket.read_ready = 1;
        c->socket.write_ready = 1;
        /*
         * Enabling both read and write notifications on a getting
         * connected socket eliminates one epoll_ctl() syscall.
         */
        c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
        c->socket.error_handler = state->error_handler;

        nxt_epoll_enable(engine, &c->socket);
        c->socket.read = NXT_EVENT_BLOCKED;

        handler = state->ready_handler;
        break;
#endif

    case NXT_ERROR:
        handler = state->error_handler;
        break;

    default:  /* NXT_DECLINED: connection refused. */
        handler = state->close_handler;
        break;
    }

    nxt_work_queue_add(c->write_work_queue, handler, task, c, data);
}


static void
nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t  *c;

    c = obj;

    nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd);

    if (!c->socket.epoll_error) {
        c->socket.write = NXT_EVENT_BLOCKED;

        if (c->write_state->timer_autoreset) {
            nxt_timer_disable(task->thread->engine, &c->write_timer);
        }

        nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
                           task, c, data);
        return;
    }

    nxt_conn_connect_test(task, c, data);
}


/*
 * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around
 * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF
 * in edge-triggered mode.
 */

static ssize_t
nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
{
    ssize_t  n;

    n = nxt_conn_io_recvbuf(c, b);

    if (n > 0 && c->socket.epoll_eof) {
        c->socket.read_ready = 1;
    }

    return n;
}

#endif