diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-02-07 20:04:56 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-02-07 20:04:56 +0300 |
commit | 059a8642898a6bd4b47d13a1c1d599cd44af7e1c (patch) | |
tree | e3c8c530a04f1ae44777d5ea4fd6901dc55a8ebf /src/nxt_epoll_engine.c | |
parent | e57b95a92333fa7ff558737b0ba2b76894cc0412 (diff) | |
download | unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.gz unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.bz2 |
Event engines refactoring.
Diffstat (limited to 'src/nxt_epoll_engine.c')
-rw-r--r-- | src/nxt_epoll_engine.c | 1158 |
1 file changed, 1158 insertions, 0 deletions
diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c new file mode 100644 index 00000000..19e0389b --- /dev/null +++ b/src/nxt_epoll_engine.c @@ -0,0 +1,1158 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +/* + * The first epoll version has been introduced in Linux 2.5.44. The + * interface was changed several times since then and the final version + * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has + * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2. + * + * EPOLLET mode did not work reliable in early implementaions and in + * Linux 2.4 backport. + * + * EPOLLONESHOT Linux 2.6.2, glibc 2.3. + * EPOLLRDHUP Linux 2.6.17, glibc 2.8. + * epoll_pwait() Linux 2.6.19, glibc 2.6. + * signalfd() Linux 2.6.22, glibc 2.7. + * eventfd() Linux 2.6.22, glibc 2.7. + * timerfd_create() Linux 2.6.25, glibc 2.8. + * epoll_create1() Linux 2.6.27, glibc 2.9. + * signalfd4() Linux 2.6.27, glibc 2.9. + * eventfd2() Linux 2.6.27, glibc 2.9. + * accept4() Linux 2.6.28, glibc 2.10. + * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10. + * EPOLLEXCLUSIVE Linux 4.5. 
+ */ + + +#if (NXT_HAVE_EPOLL_EDGE) +static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, + nxt_uint_t mchanges, nxt_uint_t mevents); +#endif +static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, + nxt_uint_t mchanges, nxt_uint_t mevents); +static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, + nxt_uint_t mchanges, nxt_uint_t mevents, nxt_event_conn_io_t *io, + uint32_t mode); +static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, + nxt_event_conn_io_t *io); +static void nxt_epoll_free(nxt_event_engine_t *engine); +static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_enable_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_enable_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_disable_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_disable_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_block_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_block_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_enable_accept(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, + int op, uint32_t events); +static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine); +static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data); +#if (NXT_HAVE_SIGNALFD) +static nxt_int_t 
nxt_epoll_add_signal(nxt_event_engine_t *engine); +static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data); +#endif +#if (NXT_HAVE_EVENTFD) +static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, + nxt_work_handler_t handler); +static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); +#endif +static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); + +#if (NXT_HAVE_ACCEPT4) +static void nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, + void *data); +#endif + + +#if (NXT_HAVE_EPOLL_EDGE) + +static void nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, + void *data); +static void nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, + void *data); +static ssize_t nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, + nxt_buf_t *b); + + +static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = { + nxt_epoll_edge_event_conn_io_connect, + nxt_event_conn_io_accept, + + nxt_event_conn_io_read, + nxt_epoll_edge_event_conn_io_recvbuf, + nxt_event_conn_io_recv, + + nxt_event_conn_io_write, + nxt_event_conn_io_write_chunk, + +#if (NXT_HAVE_LINUX_SENDFILE) + nxt_linux_event_conn_io_sendfile, +#else + nxt_event_conn_io_sendbuf, +#endif + + nxt_event_conn_io_writev, + nxt_event_conn_io_send, + + nxt_event_conn_io_shutdown, +}; + + +const nxt_event_interface_t nxt_epoll_edge_engine = { + "epoll_edge", + nxt_epoll_edge_create, + nxt_epoll_free, + nxt_epoll_enable, + nxt_epoll_disable, + nxt_epoll_delete, + nxt_epoll_close, + nxt_epoll_enable_read, + nxt_epoll_enable_write, + nxt_epoll_disable_read, + nxt_epoll_disable_write, + nxt_epoll_block_read, + nxt_epoll_block_write, + nxt_epoll_oneshot_read, + nxt_epoll_oneshot_write, + nxt_epoll_enable_accept, + NULL, + NULL, +#if (NXT_HAVE_EVENTFD) + nxt_epoll_enable_post, + nxt_epoll_signal, +#else + NULL, + NULL, +#endif + nxt_epoll_poll, 
+ + &nxt_epoll_edge_event_conn_io, + +#if (NXT_HAVE_INOTIFY) + NXT_FILE_EVENTS, +#else + NXT_NO_FILE_EVENTS, +#endif + +#if (NXT_HAVE_SIGNALFD) + NXT_SIGNAL_EVENTS, +#else + NXT_NO_SIGNAL_EVENTS, +#endif +}; + +#endif + + +const nxt_event_interface_t nxt_epoll_level_engine = { + "epoll_level", + nxt_epoll_level_create, + nxt_epoll_free, + nxt_epoll_enable, + nxt_epoll_disable, + nxt_epoll_delete, + nxt_epoll_close, + nxt_epoll_enable_read, + nxt_epoll_enable_write, + nxt_epoll_disable_read, + nxt_epoll_disable_write, + nxt_epoll_block_read, + nxt_epoll_block_write, + nxt_epoll_oneshot_read, + nxt_epoll_oneshot_write, + nxt_epoll_enable_accept, + NULL, + NULL, +#if (NXT_HAVE_EVENTFD) + nxt_epoll_enable_post, + nxt_epoll_signal, +#else + NULL, + NULL, +#endif + nxt_epoll_poll, + + &nxt_unix_event_conn_io, + +#if (NXT_HAVE_INOTIFY) + NXT_FILE_EVENTS, +#else + NXT_NO_FILE_EVENTS, +#endif + +#if (NXT_HAVE_SIGNALFD) + NXT_SIGNAL_EVENTS, +#else + NXT_NO_SIGNAL_EVENTS, +#endif +}; + + +#if (NXT_HAVE_EPOLL_EDGE) + +static nxt_int_t +nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, + nxt_uint_t mevents) +{ + return nxt_epoll_create(engine, mchanges, mevents, + &nxt_epoll_edge_event_conn_io, + EPOLLET | EPOLLRDHUP); +} + +#endif + + +static nxt_int_t +nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, + nxt_uint_t mevents) +{ + return nxt_epoll_create(engine, mchanges, mevents, + &nxt_unix_event_conn_io, 0); +} + + +static nxt_int_t +nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, + nxt_uint_t mevents, nxt_event_conn_io_t *io, uint32_t mode) +{ + engine->u.epoll.fd = -1; + engine->u.epoll.mode = mode; + engine->u.epoll.mchanges = mchanges; + engine->u.epoll.mevents = mevents; +#if (NXT_HAVE_SIGNALFD) + engine->u.epoll.signalfd.fd = -1; +#endif + + engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges); + if (engine->u.epoll.changes == NULL) { + goto fail; + } + + engine->u.epoll.events = 
nxt_malloc(sizeof(struct epoll_event) * mevents); + if (engine->u.epoll.events == NULL) { + goto fail; + } + + engine->u.epoll.fd = epoll_create(1); + if (engine->u.epoll.fd == -1) { + nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_create() failed %E", + nxt_errno); + goto fail; + } + + nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd); + + if (engine->signals != NULL) { + +#if (NXT_HAVE_SIGNALFD) + + if (nxt_epoll_add_signal(engine) != NXT_OK) { + goto fail; + } + +#endif + + nxt_epoll_test_accept4(engine, io); + } + + return NXT_OK; + +fail: + + nxt_epoll_free(engine); + + return NXT_ERROR; +} + + +static void +nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io) +{ + static nxt_work_handler_t handler; + + if (handler == NULL) { + + handler = io->accept; + +#if (NXT_HAVE_ACCEPT4) + + (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); + + if (nxt_errno != NXT_ENOSYS) { + handler = nxt_epoll_event_conn_io_accept4; + + } else { + nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", + NXT_ENOSYS); + } + +#endif + } + + io->accept = handler; +} + + +static void +nxt_epoll_free(nxt_event_engine_t *engine) +{ + int fd; + + nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd); + +#if (NXT_HAVE_SIGNALFD) + + fd = engine->u.epoll.signalfd.fd; + + if (fd != -1 && close(fd) != 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd close(%d) failed %E", + fd, nxt_errno); + } + +#endif + +#if (NXT_HAVE_EVENTFD) + + fd = engine->u.epoll.eventfd.fd; + + if (fd != -1 && close(fd) != 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd close(%d) failed %E", + fd, nxt_errno); + } + +#endif + + fd = engine->u.epoll.fd; + + if (fd != -1 && close(fd) != 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, "epoll close(%d) failed %E", + fd, nxt_errno); + } + + nxt_free(engine->u.epoll.events); + + nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t)); +} + + +static void +nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + 
ev->read = NXT_EVENT_ACTIVE; + ev->write = NXT_EVENT_ACTIVE; + + nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, + EPOLLIN | EPOLLOUT | engine->u.epoll.mode); +} + + +static void +nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) { + + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_INACTIVE; + + nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); + } +} + + +static void +nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { + + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_INACTIVE; + + nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); + } +} + + +/* + * Although calling close() on a file descriptor will remove any epoll + * events that reference the descriptor, in this case the close() acquires + * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not + * acquire the "epmutex" since Linux 3.13 if the file descriptor presents + * only in one epoll set. Thus removing events explicitly before closing + * eliminates possible lock contention. 
+ */ + +static nxt_bool_t +nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_epoll_delete(engine, ev); + + return ev->changing; +} + + +static void +nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + int op; + uint32_t events; + + if (ev->read != NXT_EVENT_BLOCKED) { + + op = EPOLL_CTL_MOD; + events = EPOLLIN | engine->u.epoll.mode; + + if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { + op = EPOLL_CTL_ADD; + + } else if (ev->write >= NXT_EVENT_BLOCKED) { + events |= EPOLLOUT; + } + + nxt_epoll_change(engine, ev, op, events); + } + + ev->read = NXT_EVENT_ACTIVE; +} + + +static void +nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + int op; + uint32_t events; + + if (ev->write != NXT_EVENT_BLOCKED) { + + op = EPOLL_CTL_MOD; + events = EPOLLOUT | engine->u.epoll.mode; + + if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { + op = EPOLL_CTL_ADD; + + } else if (ev->read >= NXT_EVENT_BLOCKED) { + events |= EPOLLIN; + } + + nxt_epoll_change(engine, ev, op, events); + } + + ev->write = NXT_EVENT_ACTIVE; +} + + +static void +nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + int op; + uint32_t events; + + ev->read = NXT_EVENT_INACTIVE; + + if (ev->write <= NXT_EVENT_DISABLED) { + ev->write = NXT_EVENT_INACTIVE; + op = EPOLL_CTL_DEL; + events = 0; + + } else { + op = EPOLL_CTL_MOD; + events = EPOLLOUT | engine->u.epoll.mode; + } + + nxt_epoll_change(engine, ev, op, events); +} + + +static void +nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + int op; + uint32_t events; + + ev->write = NXT_EVENT_INACTIVE; + + if (ev->read <= NXT_EVENT_DISABLED) { + ev->write = NXT_EVENT_INACTIVE; + op = EPOLL_CTL_DEL; + events = 0; + + } else { + op = EPOLL_CTL_MOD; + events = EPOLLIN | engine->u.epoll.mode; + } + + nxt_epoll_change(engine, ev, op, events); +} + + +static void +nxt_epoll_block_read(nxt_event_engine_t 
*engine, nxt_fd_event_t *ev) +{ + if (ev->read != NXT_EVENT_INACTIVE) { + ev->read = NXT_EVENT_BLOCKED; + } +} + + +static void +nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + if (ev->write != NXT_EVENT_INACTIVE) { + ev->write = NXT_EVENT_BLOCKED; + } +} + + +/* + * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT + * event should be added or modified, epoll_ctl(2): + * + * EPOLLONESHOT (since Linux 2.6.2) + * Sets the one-shot behavior for the associated file descriptor. + * This means that after an event is pulled out with epoll_wait(2) + * the associated file descriptor is internally disabled and no + * other events will be reported by the epoll interface. The user + * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file + * descriptor with a new event mask. + */ + +static void +nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + int op; + + op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? + EPOLL_CTL_ADD : EPOLL_CTL_MOD; + + ev->read = NXT_EVENT_ONESHOT; + ev->write = NXT_EVENT_INACTIVE; + + nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT); +} + + +static void +nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + int op; + + op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? + EPOLL_CTL_ADD : EPOLL_CTL_MOD; + + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_ONESHOT; + + nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT); +} + + +static void +nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + ev->read = NXT_EVENT_ACTIVE; + + nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, EPOLLIN); +} + + +/* + * epoll changes are batched to improve instruction and data cache + * locality of several epoll_ctl() calls followed by epoll_wait() call. 
+ */ + +static void +nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, + uint32_t events) +{ + nxt_epoll_change_t *change; + + nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD", + engine->u.epoll.fd, ev->fd, op, events); + + if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) { + (void) nxt_epoll_commit_changes(engine); + } + + ev->changing = 1; + + change = &engine->u.epoll.changes[engine->u.epoll.nchanges++]; + change->op = op; + change->event.events = events; + change->event.data.ptr = ev; +} + + +static nxt_int_t +nxt_epoll_commit_changes(nxt_event_engine_t *engine) +{ + int ret; + nxt_int_t retval; + nxt_fd_event_t *ev; + nxt_epoll_change_t *change, *end; + + nxt_debug(&engine->task, "epoll %d changes:%ui", + engine->u.epoll.fd, engine->u.epoll.nchanges); + + retval = NXT_OK; + change = engine->u.epoll.changes; + end = change + engine->u.epoll.nchanges; + + do { + ev = change->event.data.ptr; + ev->changing = 0; + + nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD", + engine->u.epoll.fd, ev->fd, change->op, + change->event.events); + + ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event); + + if (nxt_slow_path(ret != 0)) { + nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", + engine->u.epoll.fd, change->op, ev->fd, nxt_errno); + + nxt_work_queue_add(&engine->fast_work_queue, + nxt_epoll_error_handler, ev->task, ev, ev->data); + + retval = NXT_ERROR; + } + + change++; + + } while (change < end); + + engine->u.epoll.nchanges = 0; + + return retval; +} + + +static void +nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_fd_event_t *ev; + + ev = obj; + + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_INACTIVE; + + ev->error_handler(ev->task, ev, data); +} + + +#if (NXT_HAVE_SIGNALFD) + +static nxt_int_t +nxt_epoll_add_signal(nxt_event_engine_t *engine) +{ + int fd; + struct epoll_event ee; + + if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) 
!= 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, + "sigprocmask(SIG_BLOCK) failed %E", nxt_errno); + return NXT_ERROR; + } + + /* + * Glibc signalfd() wrapper always has the flags argument. Glibc 2.7 + * and 2.8 signalfd() wrappers call the original signalfd() syscall + * without the flags argument. Glibc 2.9+ signalfd() wrapper at first + * tries to call signalfd4() syscall and if it fails then calls the + * original signalfd() syscall. For this reason the non-blocking mode + * is set separately. + */ + + fd = signalfd(-1, &engine->signals->sigmask, 0); + + if (fd == -1) { + nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd(%d) failed %E", + engine->u.epoll.signalfd.fd, nxt_errno); + return NXT_ERROR; + } + + engine->u.epoll.signalfd.fd = fd; + + if (nxt_fd_nonblocking(fd) != NXT_OK) { + return NXT_ERROR; + } + + nxt_debug(&engine->task, "signalfd(): %d", fd); + + engine->u.epoll.signalfd.data = engine->signals->handler; + engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue; + engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler; + engine->u.epoll.signalfd.log = engine->task.log; + engine->u.epoll.signalfd.task = &engine->task; + + ee.events = EPOLLIN; + ee.data.ptr = &engine->u.epoll.signalfd; + + if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", + engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno); + + return NXT_ERROR; + } + + return NXT_OK; +} + + +static void +nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data) +{ + int n; + nxt_fd_event_t *ev; + nxt_work_handler_t handler; + struct signalfd_siginfo sfd; + + ev = obj; + handler = data; + + nxt_debug(task, "signalfd handler"); + + n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo)); + + nxt_debug(task, "read signalfd(%d): %d", ev->fd, n); + + if (n != sizeof(struct signalfd_siginfo)) { + nxt_log(task, NXT_LOG_CRIT, "read signalfd(%d) failed %E", + ev->fd, nxt_errno); + } + + 
nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo); + + handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL); +} + +#endif + + +#if (NXT_HAVE_EVENTFD) + +static nxt_int_t +nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) +{ + int ret; + struct epoll_event ee; + + engine->u.epoll.post_handler = handler; + + /* + * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7 + * and 2.8 eventfd() wrappers call the original eventfd() syscall + * without the flags argument. Glibc 2.9+ eventfd() wrapper at first + * tries to call eventfd2() syscall and if it fails then calls the + * original eventfd() syscall. For this reason the non-blocking mode + * is set separately. + */ + + engine->u.epoll.eventfd.fd = eventfd(0, 0); + + if (engine->u.epoll.eventfd.fd == -1) { + nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd() failed %E", nxt_errno); + return NXT_ERROR; + } + + if (nxt_fd_nonblocking(engine->u.epoll.eventfd.fd) != NXT_OK) { + return NXT_ERROR; + } + + nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd); + + engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue; + engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler; + engine->u.epoll.eventfd.data = engine; + engine->u.epoll.eventfd.log = engine->task.log; + engine->u.epoll.eventfd.task = &engine->task; + + ee.events = EPOLLIN | EPOLLET; + ee.data.ptr = &engine->u.epoll.eventfd; + + ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, + engine->u.epoll.eventfd.fd, &ee); + + if (nxt_fast_path(ret == 0)) { + return NXT_OK; + } + + nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", + engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd, + nxt_errno); + + return NXT_ERROR; +} + + +static void +nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data) +{ + int n; + uint64_t events; + nxt_event_engine_t *engine; + + engine = data; + + nxt_debug(task, "eventfd handler, times:%ui", 
engine->u.epoll.neventfd); + + /* + * The maximum value after write() to a eventfd() descriptor will + * block or return EAGAIN is 0xfffffffffffffffe, so the descriptor + * can be read once per many notifications, for example, once per + * 2^32-2 noticifcations. Since the eventfd() file descriptor is + * always registered in EPOLLET mode, epoll returns event about + * only the latest write() to the descriptor. + */ + + if (engine->u.epoll.neventfd++ >= 0xfffffffe) { + engine->u.epoll.neventfd = 0; + + n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t)); + + nxt_debug(task, "read(%d): %d events:%uL", + engine->u.epoll.eventfd.fd, n, events); + + if (n != sizeof(uint64_t)) { + nxt_log(task, NXT_LOG_CRIT, "read eventfd(%d) failed %E", + engine->u.epoll.eventfd.fd, nxt_errno); + } + } + + engine->u.epoll.post_handler(task, NULL, NULL); +} + + +static void +nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo) +{ + size_t ret; + uint64_t event; + + /* + * eventfd() presents along with signalfd(), so the function + * is used only to post events and the signo argument is ignored. + */ + + event = 1; + + ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t)); + + if (nxt_slow_path(ret != sizeof(uint64_t))) { + nxt_log(&engine->task, NXT_LOG_CRIT, "write(%d) to eventfd failed %E", + engine->u.epoll.eventfd.fd, nxt_errno); + } +} + +#endif + + +static void +nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) +{ + int nevents; + uint32_t events; + nxt_int_t i; + nxt_err_t err; + nxt_bool_t error; + nxt_uint_t level; + nxt_fd_event_t *ev; + struct epoll_event *event; + + if (engine->u.epoll.nchanges != 0) { + if (nxt_epoll_commit_changes(engine) != NXT_OK) { + /* Error handlers have been enqueued on failure. 
*/ + timeout = 0; + } + } + + nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M", + engine->u.epoll.fd, timeout); + + nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events, + engine->u.epoll.mevents, timeout); + + err = (nevents == -1) ? nxt_errno : 0; + + nxt_thread_time_update(engine->task.thread); + + nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents); + + if (nevents == -1) { + level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT; + + nxt_log(&engine->task, level, "epoll_wait(%d) failed %E", + engine->u.epoll.fd, err); + + return; + } + + for (i = 0; i < nevents; i++) { + + event = &engine->u.epoll.events[i]; + events = event->events; + ev = event->data.ptr; + + nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d", + ev->fd, events, ev, ev->read, ev->write); + + /* + * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN or + * EPOLLOUT, so the "error" variable enqueues only one active handler. + */ + error = ((events & (EPOLLERR | EPOLLHUP)) != 0); + ev->epoll_error = error; + +#if (NXT_HAVE_EPOLL_EDGE) + + ev->epoll_eof = ((events & EPOLLRDHUP) != 0); + +#endif + + if ((events & EPOLLIN) || error) { + ev->read_ready = 1; + + if (ev->read != NXT_EVENT_BLOCKED) { + + if (ev->read == NXT_EVENT_ONESHOT) { + ev->read = NXT_EVENT_DISABLED; + } + + error = 0; + + nxt_work_queue_add(ev->read_work_queue, ev->read_handler, + ev->task, ev, ev->data); + + } else if (engine->u.epoll.mode == 0) { + /* Level-triggered mode. */ + nxt_epoll_disable_read(engine, ev); + } + } + + if ((events & EPOLLOUT) || error) { + ev->write_ready = 1; + + if (ev->write != NXT_EVENT_BLOCKED) { + + if (ev->write == NXT_EVENT_ONESHOT) { + ev->write = NXT_EVENT_DISABLED; + } + + error = 0; + + nxt_work_queue_add(ev->write_work_queue, ev->write_handler, + ev->task, ev, ev->data); + + } else if (engine->u.epoll.mode == 0) { + /* Level-triggered mode. 
*/ + nxt_epoll_disable_write(engine, ev); + } + } + + if (error) { + ev->read_ready = 1; + ev->write_ready = 1; + } + } +} + + +#if (NXT_HAVE_ACCEPT4) + +static void +nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data) +{ + socklen_t len; + nxt_socket_t s; + struct sockaddr *sa; + nxt_event_conn_t *c; + nxt_event_conn_listen_t *cls; + + cls = obj; + c = data; + + cls->ready--; + cls->socket.read_ready = (cls->ready != 0); + + len = nxt_socklen(c->remote); + + if (len >= sizeof(struct sockaddr)) { + sa = &c->remote->u.sockaddr; + + } else { + sa = NULL; + len = 0; + } + + s = accept4(cls->socket.fd, sa, &len, SOCK_NONBLOCK); + + if (s != -1) { + c->socket.fd = s; + + nxt_debug(task, "accept4(%d): %d", cls->socket.fd, s); + + nxt_event_conn_accept(task, cls, c); + return; + } + + nxt_event_conn_accept_error(task, cls, "accept4", nxt_errno); +} + +#endif + + +#if (NXT_HAVE_EPOLL_EDGE) + +/* + * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() + * syscall to test pending connect() error. Although this special + * interface can work in both edge-triggered and level-triggered + * modes it is enabled only for the former mode because this mode is + * available in all modern Linux distributions. For the latter mode + * it is required to create additional nxt_epoll_level_event_conn_io + * with single non-generic connect() interface. 
+ */ + +static void +nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + nxt_event_engine_t *engine; + nxt_work_handler_t handler; + const nxt_event_conn_state_t *state; + + c = obj; + + state = c->write_state; + + switch (nxt_socket_connect(c->socket.fd, c->remote) ){ + + case NXT_OK: + c->socket.write_ready = 1; + handler = state->ready_handler; + break; + + case NXT_AGAIN: + c->socket.write_handler = nxt_epoll_edge_event_conn_connected; + c->socket.error_handler = nxt_event_conn_connect_error; + + engine = task->thread->engine; + nxt_event_conn_timer(engine, c, state, &c->write_timer); + + nxt_epoll_enable(engine, &c->socket); + c->socket.read = NXT_EVENT_BLOCKED; + return; + +#if 0 + case NXT_AGAIN: + nxt_event_conn_timer(engine, c, state, &c->write_timer); + + /* Fall through. */ + + case NXT_OK: + /* + * Mark both read and write directions as ready and try to perform + * I/O operations before receiving readiness notifications. + * On unconnected socket Linux send() and recv() return EAGAIN + * instead of ENOTCONN. + */ + c->socket.read_ready = 1; + c->socket.write_ready = 1; + /* + * Enabling both read and write notifications on a getting + * connected socket eliminates one epoll_ctl() syscall. + */ + c->socket.write_handler = nxt_epoll_edge_event_conn_connected; + c->socket.error_handler = state->error_handler; + + nxt_epoll_enable(engine, &c->socket); + c->socket.read = NXT_EVENT_BLOCKED; + + handler = state->ready_handler; + break; +#endif + + case NXT_ERROR: + handler = state->error_handler; + break; + + default: /* NXT_DECLINED: connection refused. 
*/ + handler = state->close_handler; + break; + } + + nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, + task, c, data); +} + + +static void +nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); + + if (!c->socket.epoll_error) { + c->socket.write = NXT_EVENT_BLOCKED; + + if (c->write_state->autoreset_timer) { + nxt_timer_disable(task->thread->engine, &c->write_timer); + } + + nxt_event_conn_io_handle(task->thread, c->write_work_queue, + c->write_state->ready_handler, task, c, data); + return; + } + + nxt_event_conn_connect_test(task, c, data); +} + + +/* + * nxt_epoll_edge_event_conn_io_recvbuf() is just wrapper around + * standard nxt_event_conn_io_recvbuf() to enforce to read a pending EOF + * in edge-triggered mode. + */ + +static ssize_t +nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) +{ + ssize_t n; + + n = nxt_event_conn_io_recvbuf(c, b); + + if (n > 0 && c->socket.epoll_eof) { + c->socket.read_ready = 1; + } + + return n; +} + +#endif |