diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_epoll.c | 1167 |
1 files changed, 1167 insertions, 0 deletions
diff --git a/src/nxt_epoll.c b/src/nxt_epoll.c new file mode 100644 index 00000000..65e9eb8d --- /dev/null +++ b/src/nxt_epoll.c @@ -0,0 +1,1167 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +/* + * The first epoll version has been introduced in Linux 2.5.44. The + * interface was changed several times since then and the final version + * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has + * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2. + * + * EPOLLET mode did not work reliable in early implementaions and in + * Linux 2.4 backport. + * + * EPOLLONESHOT Linux 2.6.2, glibc 2.3. + * EPOLLRDHUP Linux 2.6.17, glibc 2.8. + * epoll_pwait() Linux 2.6.19, glibc 2.6. + * signalfd() Linux 2.6.22, glibc 2.7. + * eventfd() Linux 2.6.22, glibc 2.7. + * timerfd_create() Linux 2.6.25, glibc 2.8. + * epoll_create1() Linux 2.6.27, glibc 2.9. + * signalfd4() Linux 2.6.27, glibc 2.9. + * eventfd2() Linux 2.6.27, glibc 2.9. + * accept4() Linux 2.6.28, glibc 2.10. + * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10. + */ + + +#if (NXT_HAVE_EPOLL_EDGE) +static nxt_event_set_t *nxt_epoll_edge_create(nxt_event_signals_t *signals, + nxt_uint_t mchanges, nxt_uint_t mevents); +#endif +static nxt_event_set_t *nxt_epoll_level_create(nxt_event_signals_t *signals, + nxt_uint_t mchanges, nxt_uint_t mevents); +static nxt_event_set_t *nxt_epoll_create(nxt_event_signals_t *signals, + nxt_uint_t mchanges, nxt_uint_t mevents, nxt_event_conn_io_t *io, + uint32_t mode); +static void nxt_epoll_test_accept4(nxt_event_conn_io_t *io); +static void nxt_epoll_free(nxt_event_set_t *event_set); +static void nxt_epoll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev); +static void nxt_epoll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev); +static void nxt_epoll_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev); +static void nxt_epoll_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev); +static void nxt_epoll_enable_read(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_epoll_enable_write(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_epoll_disable_read(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_epoll_disable_write(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_epoll_block_read(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_epoll_block_write(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_epoll_oneshot_read(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_epoll_oneshot_write(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_epoll_enable_accept(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_epoll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, + int op, uint32_t events); +static nxt_int_t nxt_epoll_commit_changes(nxt_thread_t *thr, + nxt_epoll_event_set_t *es); +static void nxt_epoll_error_handler(nxt_thread_t *thr, void *obj, + void *data); +#if (NXT_HAVE_SIGNALFD) +static nxt_int_t nxt_epoll_add_signal(nxt_epoll_event_set_t *es, + nxt_event_signals_t *signals); +static void nxt_epoll_signalfd_handler(nxt_thread_t *thr, void *obj, + void *data); +#endif +#if (NXT_HAVE_EVENTFD) +static nxt_int_t nxt_epoll_enable_post(nxt_event_set_t *event_set, + nxt_work_handler_t handler); +static void nxt_epoll_eventfd_handler(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_epoll_signal(nxt_event_set_t *event_set, nxt_uint_t signo); +#endif +static void nxt_epoll_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, + nxt_msec_t timeout); + +#if (NXT_HAVE_ACCEPT4) +static void nxt_epoll_event_conn_io_accept4(nxt_thread_t *thr, void *obj, + void *data); +#endif + + +#if (NXT_HAVE_EPOLL_EDGE) + +static void nxt_epoll_edge_event_conn_io_connect(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_epoll_edge_event_conn_connected(nxt_thread_t *thr, void *obj, + void *data); +static ssize_t nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, + nxt_buf_t *b); + + +static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = { + nxt_epoll_edge_event_conn_io_connect, + nxt_event_conn_io_accept, + + nxt_event_conn_io_read, + nxt_epoll_edge_event_conn_io_recvbuf, + nxt_event_conn_io_recv, + + nxt_event_conn_io_write, + nxt_event_conn_io_write_chunk, + +#if (NXT_HAVE_LINUX_SENDFILE) + nxt_linux_event_conn_io_sendfile, +#else + nxt_event_conn_io_sendbuf, +#endif + + nxt_event_conn_io_writev, + nxt_event_conn_io_send, + + nxt_event_conn_io_shutdown, +}; + + +const nxt_event_set_ops_t nxt_epoll_edge_event_set = { + "epoll_edge", + nxt_epoll_edge_create, + nxt_epoll_free, + nxt_epoll_enable, + nxt_epoll_disable, + nxt_epoll_delete, + nxt_epoll_close, + nxt_epoll_enable_read, + nxt_epoll_enable_write, + nxt_epoll_disable_read, + nxt_epoll_disable_write, + nxt_epoll_block_read, + nxt_epoll_block_write, + nxt_epoll_oneshot_read, + nxt_epoll_oneshot_write, + nxt_epoll_enable_accept, + NULL, + NULL, +#if (NXT_HAVE_EVENTFD) + nxt_epoll_enable_post, + nxt_epoll_signal, +#else + NULL, + NULL, +#endif + nxt_epoll_poll, + + &nxt_epoll_edge_event_conn_io, + +#if (NXT_HAVE_INOTIFY) + NXT_FILE_EVENTS, +#else + NXT_NO_FILE_EVENTS, +#endif + +#if (NXT_HAVE_SIGNALFD) + NXT_SIGNAL_EVENTS, +#else + NXT_NO_SIGNAL_EVENTS, +#endif +}; + +#endif + + +const nxt_event_set_ops_t nxt_epoll_level_event_set = { + "epoll_level", + nxt_epoll_level_create, + nxt_epoll_free, + nxt_epoll_enable, + nxt_epoll_disable, + nxt_epoll_delete, + nxt_epoll_close, + nxt_epoll_enable_read, + nxt_epoll_enable_write, + nxt_epoll_disable_read, + nxt_epoll_disable_write, + nxt_epoll_block_read, + nxt_epoll_block_write, + nxt_epoll_oneshot_read, + nxt_epoll_oneshot_write, + nxt_epoll_enable_accept, + NULL, + NULL, +#if (NXT_HAVE_EVENTFD) + nxt_epoll_enable_post, + nxt_epoll_signal, +#else + NULL, + NULL, +#endif + nxt_epoll_poll, + + &nxt_unix_event_conn_io, + +#if (NXT_HAVE_INOTIFY) + NXT_FILE_EVENTS, +#else + NXT_NO_FILE_EVENTS, +#endif + +#if (NXT_HAVE_SIGNALFD) + NXT_SIGNAL_EVENTS, +#else + NXT_NO_SIGNAL_EVENTS, +#endif +}; + + +#if (NXT_HAVE_EPOLL_EDGE) + +static nxt_event_set_t * +nxt_epoll_edge_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, + nxt_uint_t mevents) +{ + return nxt_epoll_create(signals, mchanges, mevents, + &nxt_epoll_edge_event_conn_io, + EPOLLET | EPOLLRDHUP); +} + +#endif + + +static nxt_event_set_t * +nxt_epoll_level_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, + nxt_uint_t mevents) +{ + return nxt_epoll_create(signals, mchanges, mevents, + &nxt_unix_event_conn_io, 0); +} + + +static nxt_event_set_t * +nxt_epoll_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, + nxt_uint_t mevents, nxt_event_conn_io_t *io, uint32_t mode) +{ + nxt_event_set_t *event_set; + nxt_epoll_event_set_t *es; + + event_set = nxt_zalloc(sizeof(nxt_epoll_event_set_t)); + if (event_set == NULL) { + return NULL; + } + + es = &event_set->epoll; + + es->epoll = -1; + es->mode = mode; + es->mchanges = mchanges; + es->mevents = mevents; +#if (NXT_HAVE_SIGNALFD) + es->signalfd.fd = -1; +#endif + + es->changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges); + if (es->changes == NULL) { + goto fail; + } + + es->events = nxt_malloc(sizeof(struct epoll_event) * mevents); + if (es->events == NULL) { + goto fail; + } + + es->epoll = epoll_create(1); + if (es->epoll == -1) { + nxt_main_log_emerg("epoll_create() failed %E", nxt_errno); + goto fail; + } + + nxt_main_log_debug("epoll_create(): %d", es->epoll); + +#if (NXT_HAVE_SIGNALFD) + + if (signals != NULL) { + if (nxt_epoll_add_signal(es, signals) != NXT_OK) { + goto fail; + } + } + +#endif + + nxt_epoll_test_accept4(io); + + return event_set; + +fail: + + nxt_epoll_free(event_set); + + return NULL; +} + + +static void +nxt_epoll_test_accept4(nxt_event_conn_io_t *io) +{ + static nxt_work_handler_t handler; + + if (handler == NULL) { + + handler = io->accept; + +#if (NXT_HAVE_ACCEPT4) + + (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); + + if (nxt_errno != NXT_ENOSYS) { + handler = nxt_epoll_event_conn_io_accept4; + + } else { + nxt_main_log_error(NXT_LOG_NOTICE, "accept4() failed %E", + NXT_ENOSYS); + } + +#endif + } + + io->accept = handler; +} + + +static void +nxt_epoll_free(nxt_event_set_t *event_set) +{ + nxt_epoll_event_set_t *es; + + es = &event_set->epoll; + + nxt_main_log_debug("epoll %d free", es->epoll); + +#if (NXT_HAVE_SIGNALFD) + + if (es->signalfd.fd != -1) { + if (close(es->signalfd.fd) != 0) { + nxt_main_log_emerg("signalfd close(%d) failed %E", + es->signalfd.fd, nxt_errno); + } + } + +#endif + +#if (NXT_HAVE_EVENTFD) + + if (es->eventfd.fd != -1) { + if (close(es->eventfd.fd) != 0) { + nxt_main_log_emerg("eventfd close(%d) failed %E", + es->eventfd.fd, nxt_errno); + } + } + +#endif + + if (es->epoll != -1) { + if (close(es->epoll) != 0) { + nxt_main_log_emerg("epoll close(%d) failed %E", + es->epoll, nxt_errno); + } + } + + nxt_free(es->events); + nxt_free(es); +} + + +static void +nxt_epoll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + ev->read = NXT_EVENT_DEFAULT; + ev->write = NXT_EVENT_DEFAULT; + + nxt_epoll_change(event_set, ev, EPOLL_CTL_ADD, + EPOLLIN | EPOLLOUT | event_set->epoll.mode); +} + + +static void +nxt_epoll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) { + + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_INACTIVE; + + nxt_epoll_change(event_set, ev, EPOLL_CTL_DEL, 0); + } +} + + +static void +nxt_epoll_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { + + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_INACTIVE; + + nxt_epoll_change(event_set, ev, EPOLL_CTL_DEL, 0); + } +} + + +/* + * Although calling close() on a file descriptor will remove any epoll + * events that reference the descriptor, in this case the close() acquires + * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not + * acquire the "epmutex" since Linux 3.13 if the file descriptor presents + * only in one epoll set. Thus removing events explicitly before closing + * eliminates possible lock contention. + */ + +static void +nxt_epoll_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + nxt_epoll_event_set_t *es; + + nxt_epoll_delete(event_set, ev); + + es = &event_set->epoll; + + if (es->nchanges != 0) { + (void) nxt_epoll_commit_changes(nxt_thread(), &event_set->epoll); + } +} + + +static void +nxt_epoll_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + int op; + uint32_t events; + + if (ev->read != NXT_EVENT_BLOCKED) { + + op = EPOLL_CTL_MOD; + events = EPOLLIN | event_set->epoll.mode; + + if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { + op = EPOLL_CTL_ADD; + + } else if (ev->write >= NXT_EVENT_BLOCKED) { + events |= EPOLLOUT; + } + + nxt_epoll_change(event_set, ev, op, events); + } + + ev->read = NXT_EVENT_DEFAULT; +} + + +static void +nxt_epoll_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + int op; + uint32_t events; + + if (ev->write != NXT_EVENT_BLOCKED) { + + op = EPOLL_CTL_MOD; + events = EPOLLOUT | event_set->epoll.mode; + + if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { + op = EPOLL_CTL_ADD; + + } else if (ev->read >= NXT_EVENT_BLOCKED) { + events |= EPOLLIN; + } + + nxt_epoll_change(event_set, ev, op, events); + } + + ev->write = NXT_EVENT_DEFAULT; +} + + +static void +nxt_epoll_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + int op; + uint32_t events; + + ev->read = NXT_EVENT_INACTIVE; + + if (ev->write <= NXT_EVENT_DISABLED) { + ev->write = NXT_EVENT_INACTIVE; + op = EPOLL_CTL_DEL; + events = 0; + + } else { + op = EPOLL_CTL_MOD; + events = EPOLLOUT | event_set->epoll.mode; + } + + nxt_epoll_change(event_set, ev, op, events); +} + + +static void +nxt_epoll_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + int op; + uint32_t events; + + ev->write = NXT_EVENT_INACTIVE; + + if (ev->read <= NXT_EVENT_DISABLED) { + ev->write = NXT_EVENT_INACTIVE; + op = EPOLL_CTL_DEL; + events = 0; + + } else { + op = EPOLL_CTL_MOD; + events = EPOLLIN | event_set->epoll.mode; + } + + nxt_epoll_change(event_set, ev, op, events); +} + + +static void +nxt_epoll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + if (ev->read != NXT_EVENT_INACTIVE) { + ev->read = NXT_EVENT_BLOCKED; + } +} + + +static void +nxt_epoll_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + if (ev->write != NXT_EVENT_INACTIVE) { + ev->write = NXT_EVENT_BLOCKED; + } +} + + +/* + * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT + * event should be added or modified, epoll_ctl(2): + * + * EPOLLONESHOT (since Linux 2.6.2) + * Sets the one-shot behavior for the associated file descriptor. + * This means that after an event is pulled out with epoll_wait(2) + * the associated file descriptor is internally disabled and no + * other events will be reported by the epoll interface. The user + * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file + * descriptor with a new event mask. + */ + +static void +nxt_epoll_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + int op; + + op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? + EPOLL_CTL_ADD : EPOLL_CTL_MOD; + + ev->read = NXT_EVENT_ONESHOT; + ev->write = NXT_EVENT_INACTIVE; + + nxt_epoll_change(event_set, ev, op, EPOLLIN | EPOLLONESHOT); +} + + +static void +nxt_epoll_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + int op; + + op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? + EPOLL_CTL_ADD : EPOLL_CTL_MOD; + + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_ONESHOT; + + nxt_epoll_change(event_set, ev, op, EPOLLOUT | EPOLLONESHOT); +} + + +static void +nxt_epoll_enable_accept(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + ev->read = NXT_EVENT_DEFAULT; + + nxt_epoll_change(event_set, ev, EPOLL_CTL_ADD, EPOLLIN); +} + + +/* + * epoll changes are batched to improve instruction and data cache + * locality of several epoll_ctl() calls followed by epoll_wait() call. + */ + +static void +nxt_epoll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, int op, + uint32_t events) +{ + nxt_epoll_change_t *ch; + nxt_epoll_event_set_t *es; + + es = &event_set->epoll; + + nxt_log_debug(ev->log, "epoll %d set event: fd:%d op:%d ev:%XD", + es->epoll, ev->fd, op, events); + + if (es->nchanges >= es->mchanges) { + (void) nxt_epoll_commit_changes(nxt_thread(), es); + } + + ch = &es->changes[es->nchanges++]; + ch->op = op; + ch->fd = ev->fd; + ch->event.events = events; + ch->event.data.ptr = ev; +} + + +static nxt_int_t +nxt_epoll_commit_changes(nxt_thread_t *thr, nxt_epoll_event_set_t *es) +{ + nxt_int_t ret; + nxt_event_fd_t *ev; + nxt_epoll_change_t *ch, *end; + + nxt_log_debug(thr->log, "epoll %d changes:%ui", es->epoll, es->nchanges); + + ret = NXT_OK; + ch = es->changes; + end = ch + es->nchanges; + + do { + ev = ch->event.data.ptr; + + nxt_log_debug(ev->log, "epoll_ctl(%d): fd:%d op:%d ev:%XD", + es->epoll, ch->fd, ch->op, ch->event.events); + + if (epoll_ctl(es->epoll, ch->op, ch->fd, &ch->event) != 0) { + nxt_log_alert(ev->log, "epoll_ctl(%d, %d, %d) failed %E", + es->epoll, ch->op, ch->fd, nxt_errno); + + nxt_thread_work_queue_add(thr, &thr->work_queue.main, + nxt_epoll_error_handler, + ev, ev->data, ev->log); + + ret = NXT_ERROR; + } + + ch++; + + } while (ch < end); + + es->nchanges = 0; + + return ret; +} + + +static void +nxt_epoll_error_handler(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_fd_t *ev; + + ev = obj; + + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_INACTIVE; + + ev->error_handler(thr, ev, data); +} + + +#if (NXT_HAVE_SIGNALFD) + +static nxt_int_t +nxt_epoll_add_signal(nxt_epoll_event_set_t *es, nxt_event_signals_t *signals) +{ + int fd; + struct epoll_event ee; + + if (sigprocmask(SIG_BLOCK, &signals->sigmask, NULL) != 0) { + nxt_main_log_alert("sigprocmask(SIG_BLOCK) failed %E", nxt_errno); + return NXT_ERROR; + } + + /* + * Glibc signalfd() wrapper always has the flags argument. Glibc 2.7 + * and 2.8 signalfd() wrappers call the original signalfd() syscall + * without the flags argument. Glibc 2.9+ signalfd() wrapper at first + * tries to call signalfd4() syscall and if it fails then calls the + * original signalfd() syscall. For this reason the non-blocking mode + * is set separately. + */ + + fd = signalfd(-1, &signals->sigmask, 0); + + if (fd == -1) { + nxt_main_log_emerg("signalfd(%d) failed %E", + es->signalfd.fd, nxt_errno); + return NXT_ERROR; + } + + es->signalfd.fd = fd; + + if (nxt_fd_nonblocking(fd) != NXT_OK) { + return NXT_ERROR; + } + + nxt_main_log_debug("signalfd(): %d", fd); + + es->signalfd.data = signals->handler; + es->signalfd.read_work_queue = nxt_thread_main_work_queue(); + es->signalfd.read_handler = nxt_epoll_signalfd_handler; + es->signalfd.log = &nxt_main_log; + + ee.events = EPOLLIN; + ee.data.ptr = &es->signalfd; + + if (epoll_ctl(es->epoll, EPOLL_CTL_ADD, fd, &ee) != 0) { + nxt_main_log_alert("epoll_ctl(%d, %d, %d) failed %E", + es->epoll, EPOLL_CTL_ADD, fd, nxt_errno); + + return NXT_ERROR; + } + + return NXT_OK; +} + + +static void +nxt_epoll_signalfd_handler(nxt_thread_t *thr, void *obj, void *data) +{ + int n; + nxt_event_fd_t *ev; + nxt_work_handler_t handler; + struct signalfd_siginfo sfd; + + ev = obj; + handler = data; + + nxt_log_debug(thr->log, "signalfd handler"); + + n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo)); + + nxt_log_debug(thr->log, "read signalfd(%d): %d", ev->fd, n); + + if (n != sizeof(struct signalfd_siginfo)) { + nxt_log_alert(thr->log, "read signalfd(%d) failed %E", + ev->fd, nxt_errno); + } + + nxt_log_debug(thr->log, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo); + + handler(thr, (void *) (uintptr_t) sfd.ssi_signo, NULL); +} + +#endif + + +#if (NXT_HAVE_EVENTFD) + +static nxt_int_t +nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler) +{ + struct epoll_event ee; + nxt_epoll_event_set_t *es; + + es = &event_set->epoll; + es->post_handler = handler; + + /* + * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7 + * and 2.8 eventfd() wrappers call the original eventfd() syscall + * without the flags argument. Glibc 2.9+ eventfd() wrapper at first + * tries to call eventfd2() syscall and if it fails then calls the + * original eventfd() syscall. For this reason the non-blocking mode + * is set separately. + */ + + es->eventfd.fd = eventfd(0, 0); + + if (es->eventfd.fd == -1) { + nxt_main_log_emerg("eventfd() failed %E", nxt_errno); + return NXT_ERROR; + } + + if (nxt_fd_nonblocking(es->eventfd.fd) != NXT_OK) { + return NXT_ERROR; + } + + nxt_main_log_debug("eventfd(): %d", es->eventfd.fd); + + es->eventfd.read_work_queue = nxt_thread_main_work_queue(); + es->eventfd.read_handler = nxt_epoll_eventfd_handler; + es->eventfd.data = es; + es->eventfd.log = &nxt_main_log; + + ee.events = EPOLLIN | EPOLLET; + ee.data.ptr = &es->eventfd; + + if (epoll_ctl(es->epoll, EPOLL_CTL_ADD, es->eventfd.fd, &ee) == 0) { + return NXT_OK; + } + + nxt_main_log_alert("epoll_ctl(%d, %d, %d) failed %E", + es->epoll, EPOLL_CTL_ADD, es->eventfd.fd, nxt_errno); + + return NXT_ERROR; +} + + +static void +nxt_epoll_eventfd_handler(nxt_thread_t *thr, void *obj, void *data) +{ + int n; + uint64_t events; + nxt_epoll_event_set_t *es; + + es = data; + + nxt_log_debug(thr->log, "eventfd handler, times:%ui", es->neventfd); + + /* + * The maximum value after write() to a eventfd() descriptor will + * block or return EAGAIN is 0xfffffffffffffffe, so the descriptor + * can be read once per many notifications, for example, once per + * 2^32-2 noticifcations. Since the eventfd() file descriptor is + * always registered in EPOLLET mode, epoll returns event about + * only the latest write() to the descriptor. + */ + + if (es->neventfd++ >= 0xfffffffe) { + es->neventfd = 0; + + n = read(es->eventfd.fd, &events, sizeof(uint64_t)); + + nxt_log_debug(thr->log, "read(%d): %d events:%uL", + es->eventfd.fd, n, events); + + if (n != sizeof(uint64_t)) { + nxt_log_alert(thr->log, "read eventfd(%d) failed %E", + es->eventfd.fd, nxt_errno); + } + } + + es->post_handler(thr, NULL, NULL); +} + + +static void +nxt_epoll_signal(nxt_event_set_t *event_set, nxt_uint_t signo) +{ + uint64_t event; + nxt_epoll_event_set_t *es; + + es = &event_set->epoll; + + /* + * eventfd() presents along with signalfd(), so the function + * is used only to post events and the signo argument is ignored. + */ + + event = 1; + + if (write(es->eventfd.fd, &event, sizeof(uint64_t)) != sizeof(uint64_t)) { + nxt_thread_log_alert("write(%d) to eventfd failed %E", + es->eventfd.fd, nxt_errno); + } +} + +#endif + + +static void +nxt_epoll_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, + nxt_msec_t timeout) +{ + int nevents; + uint32_t events; + nxt_int_t i; + nxt_err_t err; + nxt_bool_t error; + nxt_uint_t level; + nxt_event_fd_t *ev; + struct epoll_event *event; + nxt_epoll_event_set_t *es; + + es = &event_set->epoll; + + if (es->nchanges != 0) { + if (nxt_epoll_commit_changes(thr, es) != NXT_OK) { + /* Error handlers have been enqueued on failure. */ + timeout = 0; + } + } + + nxt_log_debug(thr->log, "epoll_wait(%d) timeout:%M", es->epoll, timeout); + + nevents = epoll_wait(es->epoll, es->events, es->mevents, timeout); + + err = (nevents == -1) ? nxt_errno : 0; + + nxt_thread_time_update(thr); + + nxt_log_debug(thr->log, "epoll_wait(%d): %d", es->epoll, nevents); + + if (nevents == -1) { + level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; + nxt_log_error(level, thr->log, "epoll_wait(%d) failed %E", + es->epoll, err); + return; + } + + for (i = 0; i < nevents; i++) { + + event = &es->events[i]; + events = event->events; + ev = event->data.ptr; + + nxt_log_debug(ev->log, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d", + ev->fd, events, ev, ev->read, ev->write); + + /* + * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or + * EPOLLOUT, so the "error" variable enqueues only one active handler. + */ + error = ((events & (EPOLLERR | EPOLLHUP)) != 0); + ev->epoll_error = error; + +#if (NXT_HAVE_EPOLL_EDGE) + + ev->epoll_eof = ((events & EPOLLRDHUP) != 0); + +#endif + + if ((events & EPOLLIN) || error) { + ev->read_ready = 1; + + if (ev->read != NXT_EVENT_BLOCKED) { + + if (ev->read == NXT_EVENT_ONESHOT) { + ev->read = NXT_EVENT_DISABLED; + } + + error = 0; + + nxt_thread_work_queue_add(thr, ev->read_work_queue, + ev->read_handler, + ev, ev->data, ev->log); + + } else if (event_set->epoll.mode == 0) { + /* Level-triggered mode. */ + nxt_epoll_disable_read(event_set, ev); + } + } + + if ((events & EPOLLOUT) || error) { + ev->write_ready = 1; + + if (ev->write != NXT_EVENT_BLOCKED) { + + if (ev->write == NXT_EVENT_ONESHOT) { + ev->write = NXT_EVENT_DISABLED; + } + + error = 0; + + nxt_thread_work_queue_add(thr, ev->write_work_queue, + ev->write_handler, + ev, ev->data, ev->log); + + } else if (event_set->epoll.mode == 0) { + /* Level-triggered mode. */ + nxt_epoll_disable_write(event_set, ev); + } + } + + if (error) { + ev->read_ready = 1; + ev->write_ready = 1; + } + } +} + + +#if (NXT_HAVE_ACCEPT4) + +static void +nxt_epoll_event_conn_io_accept4(nxt_thread_t *thr, void *obj, void *data) +{ + socklen_t len; + nxt_socket_t s; + struct sockaddr *sa; + nxt_event_conn_t *c; + nxt_event_conn_listen_t *cls; + + cls = obj; + c = data; + + cls->ready--; + cls->socket.read_ready = (cls->ready != 0); + + len = nxt_socklen(c->remote); + + if (len >= sizeof(struct sockaddr)) { + sa = &c->remote->u.sockaddr; + + } else { + sa = NULL; + len = 0; + } + + s = accept4(cls->socket.fd, sa, &len, SOCK_NONBLOCK); + + if (s != -1) { + c->socket.fd = s; + + nxt_log_debug(thr->log, "accept4(%d): %d", cls->socket.fd, s); + + nxt_event_conn_accept(thr, cls, c); + return; + } + + nxt_event_conn_accept_error(thr, cls, "accept4", nxt_errno); +} + +#endif + + +#if (NXT_HAVE_EPOLL_EDGE) + +/* + * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() + * syscall to test pending connect() error. Although this special + * interface can work in both edge-triggered and level-triggered + * modes it is enabled only for the former mode because this mode is + * available in all modern Linux distributions. For the latter mode + * it is required to create additional nxt_epoll_level_event_conn_io + * with single non-generic connect() interface. + */ + +static void +nxt_epoll_edge_event_conn_io_connect(nxt_thread_t *thr, void *obj, + void *data) +{ + nxt_event_conn_t *c; + nxt_work_handler_t handler; + const nxt_event_conn_state_t *state; + + c = obj; + + state = c->write_state; + + switch (nxt_socket_connect(c->socket.fd, c->remote) ){ + + case NXT_OK: + c->socket.write_ready = 1; + handler = state->ready_handler; + break; + + case NXT_AGAIN: + c->socket.write_handler = nxt_epoll_edge_event_conn_connected; + c->socket.error_handler = nxt_event_conn_connect_error; + + nxt_event_conn_timer(thr->engine, c, state, &c->write_timer); + + nxt_epoll_enable(thr->engine->event_set, &c->socket); + c->socket.read = NXT_EVENT_BLOCKED; + return; + +#if 0 + case NXT_AGAIN: + nxt_event_conn_timer(thr->engine, c, state, &c->write_timer); + + /* Fall through. */ + + case NXT_OK: + /* + * Mark both read and write directions as ready and try to perform + * I/O operations before receiving readiness notifications. + * On unconnected socket Linux send() and recv() return EAGAIN + * instead of ENOTCONN. + */ + c->socket.read_ready = 1; + c->socket.write_ready = 1; + /* + * Enabling both read and write notifications on a getting + * connected socket eliminates one epoll_ctl() syscall. + */ + c->socket.write_handler = nxt_epoll_edge_event_conn_connected; + c->socket.error_handler = state->error_handler; + + nxt_epoll_enable(thr->engine->event_set, &c->socket); + c->socket.read = NXT_EVENT_BLOCKED; + + handler = state->ready_handler; + break; +#endif + + case NXT_ERROR: + handler = state->error_handler; + break; + + default: /* NXT_DECLINED: connection refused. */ + handler = state->close_handler; + break; + } + + nxt_event_conn_io_handle(thr, c->write_work_queue, handler, c, data); +} + + +static void +nxt_epoll_edge_event_conn_connected(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_log_debug(thr->log, "epoll event conn connected fd:%d", c->socket.fd); + + if (!c->socket.epoll_error) { + c->socket.write = NXT_EVENT_BLOCKED; + + if (c->write_state->autoreset_timer) { + nxt_event_timer_disable(&c->write_timer); + } + + nxt_event_conn_io_handle(thr, c->write_work_queue, + c->write_state->ready_handler, c, data); + return; + } + + nxt_event_conn_connect_test(thr, c, data); +} + + +/* + * nxt_epoll_edge_event_conn_io_recvbuf() is just wrapper around + * standard nxt_event_conn_io_recvbuf() to enforce to read a pending EOF + * in edge-triggered mode. + */ + +static ssize_t +nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) +{ + ssize_t n; + + n = nxt_event_conn_io_recvbuf(c, b); + + if (n > 0 && c->socket.epoll_eof) { + c->socket.read_ready = 1; + } + + return n; +} + +#endif |