/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * The first epoll version has been introduced in Linux 2.5.44.  The
 * interface was changed several times since then and the final version
 * of epoll_create(), epoll_ctl(), epoll_wait(), and EPOLLET mode has
 * been introduced in Linux 2.6.0 and is supported since glibc 2.3.2.
 *
 * EPOLLET mode did not work reliably in early implementations and in
 * Linux 2.4 backport.
 *
 * EPOLLONESHOT             Linux 2.6.2,  glibc 2.3.
 * EPOLLRDHUP               Linux 2.6.17, glibc 2.8.
 * epoll_pwait()            Linux 2.6.19, glibc 2.6.
 * signalfd()               Linux 2.6.22, glibc 2.7.
 * eventfd()                Linux 2.6.22, glibc 2.7.
 * timerfd_create()         Linux 2.6.25, glibc 2.8.
 * epoll_create1()          Linux 2.6.27, glibc 2.9.
 * signalfd4()              Linux 2.6.27, glibc 2.9.
 * eventfd2()               Linux 2.6.27, glibc 2.9.
 * accept4()                Linux 2.6.28, glibc 2.10.
 * eventfd2(EFD_SEMAPHORE)  Linux 2.6.30, glibc 2.10.
 * EPOLLEXCLUSIVE           Linux 4.5,    glibc 2.24.
 */


#if (NXT_HAVE_EPOLL_EDGE)
static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
#endif
static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
    nxt_conn_io_t *io);
static void nxt_epoll_free(nxt_event_engine_t *engine);
static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_block_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    int op, uint32_t events);
static void nxt_epoll_commit_changes(nxt_event_engine_t *engine);
static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
#if (NXT_HAVE_SIGNALFD)
static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj,
    void *data);
#endif
#if (NXT_HAVE_EVENTFD)
static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
    nxt_work_handler_t handler);
static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
#endif
static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);

#if (NXT_HAVE_ACCEPT4)
static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
    void *data);
#endif


#if (NXT_HAVE_EPOLL_EDGE)

static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
    void *data);
static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
    void *data);
static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);


/*
 * Connection I/O operations used by the edge-triggered engine.  Only
 * .connect and .recvbuf are implemented in this file; .accept may be
 * replaced at run time by nxt_epoll_test_accept4().
 */
static nxt_conn_io_t  nxt_epoll_edge_conn_io = {
    .connect = nxt_epoll_edge_conn_io_connect,
    .accept = nxt_conn_io_accept,

    .read = nxt_conn_io_read,
    .recvbuf = nxt_epoll_edge_conn_io_recvbuf,
    .recv = nxt_conn_io_recv,

    .write = nxt_conn_io_write,
    .sendbuf = nxt_conn_io_sendbuf,

#if (NXT_HAVE_LINUX_SENDFILE)
    .old_sendbuf = nxt_linux_event_conn_io_sendfile,
#else
    .old_sendbuf = nxt_event_conn_io_sendbuf,
#endif

    .writev = nxt_event_conn_io_writev,
    .send = nxt_event_conn_io_send,
};


/*
 * Edge-triggered engine interface.  The initializer is positional, so the
 * entry order must follow the nxt_event_interface_t declaration, which is
 * not part of this file.
 */
const nxt_event_interface_t  nxt_epoll_edge_engine = {
    "epoll_edge",
    nxt_epoll_edge_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_epoll_edge_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};

#endif


/*
 * Level-triggered engine interface.  It differs from the edge-triggered
 * one only in its name, its create function and its connection I/O table.
 */
const nxt_event_interface_t  nxt_epoll_level_engine = {
    "epoll_level",
    nxt_epoll_level_create,
    nxt_epoll_free,
    nxt_epoll_enable,
    nxt_epoll_disable,
    nxt_epoll_delete,
    nxt_epoll_close,
    nxt_epoll_enable_read,
    nxt_epoll_enable_write,
    nxt_epoll_disable_read,
    nxt_epoll_disable_write,
    nxt_epoll_block_read,
    nxt_epoll_block_write,
    nxt_epoll_oneshot_read,
    nxt_epoll_oneshot_write,
    nxt_epoll_enable_accept,
    NULL,
    NULL,
#if (NXT_HAVE_EVENTFD)
    nxt_epoll_enable_post,
    nxt_epoll_signal,
#else
    NULL,
    NULL,
#endif
    nxt_epoll_poll,

    &nxt_unix_conn_io,

#if (NXT_HAVE_INOTIFY)
    NXT_FILE_EVENTS,
#else
    NXT_NO_FILE_EVENTS,
#endif

#if (NXT_HAVE_SIGNALFD)
    NXT_SIGNAL_EVENTS,
#else
    NXT_NO_SIGNAL_EVENTS,
#endif
};


#if (NXT_HAVE_EPOLL_EDGE)

/* Creates the engine with EPOLLET | EPOLLRDHUP added to every event mask. */

static nxt_int_t
nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io,
                            EPOLLET | EPOLLRDHUP);
}

#endif


/* Creates the engine with no extra mode bits, i.e. level-triggered. */

static nxt_int_t
nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents)
{
    return nxt_epoll_create(engine, mchanges, mevents,
                            &nxt_unix_conn_io, 0);
}


/*
 * Common engine constructor.
 *
 * Allocates the array of "mchanges" batched changes and the array of
 * "mevents" epoll_wait() results, creates the epoll descriptor and, if
 * the engine has signals configured, registers the signalfd (when
 * available) and selects the accept handler stored in "io".
 *
 * "mode" is OR-ed into event masks by the enable/disable operations.
 *
 * Returns NXT_OK on success.  On any failure everything acquired so far
 * is released with nxt_epoll_free() and NXT_ERROR is returned.
 */

static nxt_int_t
nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
    nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
{
    engine->u.epoll.fd = -1;
    engine->u.epoll.mode = mode;
    engine->u.epoll.mchanges = mchanges;
    engine->u.epoll.mevents = mevents;
#if (NXT_HAVE_SIGNALFD)
    engine->u.epoll.signalfd.fd = -1;
#endif
#if (NXT_HAVE_EVENTFD)
    /*
     * The eventfd is created later by nxt_epoll_enable_post(), but
     * nxt_epoll_free() closes every descriptor that is not -1.  Without
     * this initialization a failure below would close whatever value
     * the field happened to hold.
     */
    engine->u.epoll.eventfd.fd = -1;
#endif

    engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
    if (engine->u.epoll.changes == NULL) {
        goto fail;
    }

    engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
    if (engine->u.epoll.events == NULL) {
        goto fail;
    }

    engine->u.epoll.fd = epoll_create(1);
    if (engine->u.epoll.fd == -1) {
        nxt_alert(&engine->task, "epoll_create() failed %E", nxt_errno);
        goto fail;
    }

    nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);

    if (engine->signals != NULL) {

#if (NXT_HAVE_SIGNALFD)

        if (nxt_epoll_add_signal(engine) != NXT_OK) {
            goto fail;
        }

#endif

        nxt_epoll_test_accept4(engine, io);
    }

    return NXT_OK;

fail:

    nxt_epoll_free(engine);

    return NXT_ERROR;
}


/*
 * Chooses the accept handler for "io".  The choice is made once and is
 * cached in a function-local static; later calls only copy it to
 * io->accept.
 *
 * When accept4() is compiled in, it is probed with an invalid descriptor:
 * any errno other than ENOSYS means the syscall exists, so the accept4()
 * based handler is used.  Otherwise the handler already set in "io" is
 * kept.
 */

static void
nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
{
    static nxt_work_handler_t  handler;

    if (handler == NULL) {

        handler = io->accept;

#if (NXT_HAVE_ACCEPT4)

        (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);

        if (nxt_errno != NXT_ENOSYS) {
            handler = nxt_epoll_conn_io_accept4;

        } else {
            nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
                    NXT_ENOSYS);
        }

#endif
    }

    io->accept = handler;
}


/*
 * Releases everything owned by the epoll engine: the signalfd, eventfd
 * and epoll descriptors (each only if it is not -1), and both arrays.
 * The whole engine->u.epoll structure is zeroed afterwards.
 */

static void
nxt_epoll_free(nxt_event_engine_t *engine)
{
    int  fd;

    nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);

#if (NXT_HAVE_SIGNALFD)

    fd = engine->u.epoll.signalfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "signalfd close(%d) failed %E", fd, nxt_errno);
    }

#endif

#if (NXT_HAVE_EVENTFD)

    fd = engine->u.epoll.eventfd.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "eventfd close(%d) failed %E", fd, nxt_errno);
    }

#endif

    fd = engine->u.epoll.fd;

    if (fd != -1 && close(fd) != 0) {
        nxt_alert(&engine->task, "epoll close(%d) failed %E", fd, nxt_errno);
    }

    nxt_free(engine->u.epoll.events);
    nxt_free(engine->u.epoll.changes);

    nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
}


/*
 * Marks both directions active and queues EPOLL_CTL_ADD for
 * EPOLLIN | EPOLLOUT plus the engine mode bits.
 */

static void
nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    ev->read = NXT_EVENT_ACTIVE;
    ev->write = NXT_EVENT_ACTIVE;

    nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
                     EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
}


/*
 * Queues EPOLL_CTL_DEL and marks both directions inactive, but only if
 * at least one direction is in a state above NXT_EVENT_DISABLED.
 */

static void
nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {

        ev->read = NXT_EVENT_INACTIVE;
        ev->write = NXT_EVENT_INACTIVE;

        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
    }
}


/*
 * Queues EPOLL_CTL_DEL and marks both directions inactive unless both
 * are already inactive.  Unlike nxt_epoll_disable(), any state other
 * than NXT_EVENT_INACTIVE triggers the deletion.
 */

static void
nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
    if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {

        ev->read = NXT_EVENT_INACTIVE;
        ev->write = NXT_EVENT_INACTIVE;

        nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
    }
}


/*
 * Although calling close() on a file descriptor will remove any epoll
 * events that reference the descriptor, in this case the close() acquires
 * the kernel global "epmutex" while epoll_ctl(EPOLL_CTL_DEL) does not
 * acquire the "epmutex" since Linux 3.13 if the file descriptor presents
 * only in one epoll set.  Thus removing events explicitly before closing
 * eliminates possible lock contention.
*/ static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { nxt_epoll_delete(engine, ev); return ev->changing; } static void nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { int op; uint32_t events; if (ev->read != NXT_EVENT_BLOCKED) { op = EPOLL_CTL_MOD; events = EPOLLIN | engine->u.epoll.mode; if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { op = EPOLL_CTL_ADD; } else if (ev->write >= NXT_EVENT_BLOCKED) { events |= EPOLLOUT; } nxt_epoll_change(engine, ev, op, events); } ev->read = NXT_EVENT_ACTIVE; } static void nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { int op; uint32_t events; if (ev->write != NXT_EVENT_BLOCKED) { op = EPOLL_CTL_MOD; events = EPOLLOUT | engine->u.epoll.mode; if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { op = EPOLL_CTL_ADD; } else if (ev->read >= NXT_EVENT_BLOCKED) { events |= EPOLLIN; } nxt_epoll_change(engine, ev, op, events); } ev->write = NXT_EVENT_ACTIVE; } static void nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { int op; uint32_t events; ev->read = NXT_EVENT_INACTIVE; if (ev->write <= NXT_EVENT_DISABLED) { ev->write = NXT_EVENT_INACTIVE; op = EPOLL_CTL_DEL; events = 0; } else { op = EPOLL_CTL_MOD; events = EPOLLOUT | engine->u.epoll.mode; } nxt_epoll_change(engine, ev, op, events); } static void nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { int op; uint32_t events; ev->write = NXT_EVENT_INACTIVE; if (ev->read <= NXT_EVENT_DISABLED) { ev->read = NXT_EVENT_INACTIVE; op = EPOLL_CTL_DEL; events = 0; } else { op = EPOLL_CTL_MOD; events = EPOLLIN | engine->u.epoll.mode; } nxt_epoll_change(engine, ev, op, events); } static void nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->read != NXT_EVENT_INACTIVE) { ev->read = NXT_EVENT_BLOCKED; } } static void nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->write 
!= NXT_EVENT_INACTIVE) { ev->write = NXT_EVENT_BLOCKED; } } /* * NXT_EVENT_DISABLED state is used to track whether EPOLLONESHOT * event should be added or modified, epoll_ctl(2): * * EPOLLONESHOT (since Linux 2.6.2) * Sets the one-shot behavior for the associated file descriptor. * This means that after an event is pulled out with epoll_wait(2) * the associated file descriptor is internally disabled and no * other events will be reported by the epoll interface. The user * must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file * descriptor with a new event mask. */ static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { int op; op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ev->read = NXT_EVENT_ONESHOT; ev->write = NXT_EVENT_INACTIVE; nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT); } static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { int op; op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ev->read = NXT_EVENT_INACTIVE; ev->write = NXT_EVENT_ONESHOT; nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT); } static void nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { uint32_t events; ev->read = NXT_EVENT_ACTIVE; events = EPOLLIN; #ifdef EPOLLEXCLUSIVE events |= EPOLLEXCLUSIVE; #endif nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, events); } /* * epoll changes are batched to improve instruction and data cache * locality of several epoll_ctl() calls followed by epoll_wait() call. 
*/ static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, uint32_t events) { nxt_epoll_change_t *change; nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD", engine->u.epoll.fd, ev->fd, op, events); if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) { nxt_epoll_commit_changes(engine); } ev->changing = 1; change = &engine->u.epoll.changes[engine->u.epoll.nchanges++]; change->op = op; change->event.events = events; change->event.data.ptr = ev; } static void nxt_epoll_commit_changes(nxt_event_engine_t *engine) { int ret; nxt_fd_event_t *ev; nxt_epoll_change_t *change, *end; nxt_debug(&engine->task, "epoll %d changes:%ui", engine->u.epoll.fd, engine->u.epoll.nchanges); change = engine->u.epoll.changes; end = change + engine->u.epoll.nchanges; do { ev = change->event.data.ptr; ev->changing = 0; nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD", engine->u.epoll.fd, ev->fd, change->op, change->event.events); ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event); if (nxt_slow_path(ret != 0)) { nxt_alert(ev->task, "epoll_ctl(%d, %d, %d) failed %E", engine->u.epoll.fd, change->op, ev->fd, nxt_errno); nxt_work_queue_add(&engine->fast_work_queue, nxt_epoll_error_handler, ev->task, ev, ev->data); engine->u.epoll.error = 1; } change++; } while (change < end); engine->u.epoll.nchanges = 0; } static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) { nxt_fd_event_t *ev; ev = obj; ev->read = NXT_EVENT_INACTIVE; ev->write = NXT_EVENT_INACTIVE; ev->error_handler(ev->task, ev, data); } #if (NXT_HAVE_SIGNALFD) static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine) { int fd; struct epoll_event ee; if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) { nxt_alert(&engine->task, "sigprocmask(SIG_BLOCK) failed %E", nxt_errno); return NXT_ERROR; } /* * Glibc signalfd() wrapper always has the flags argument. 
Glibc 2.7 * and 2.8 signalfd() wrappers call the original signalfd() syscall * without the flags argument. Glibc 2.9+ signalfd() wrapper at first * tries to call signalfd4() syscall and if it fails then calls the * original signalfd() syscall. For this reason the non-blocking mode * is set separately. */ fd = signalfd(-1, &engine->signals->sigmask, 0); if (fd == -1) { nxt_alert(&engine->task, "signalfd(%d) failed %E", engine->u.epoll.signalfd.fd, nxt_errno); return NXT_ERROR; } engine->u.epoll.signalfd.fd = fd; if (nxt_fd_nonblocking(&engine->task, fd) != NXT_OK) { return NXT_ERROR; } nxt_debug(&engine->task, "signalfd(): %d", fd); engine->u.epoll.signalfd.data = engine->signals->handler; engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue; engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler; engine->u.epoll.signalfd.log = engine->task.log; engine->u.epoll.signalfd.task = &engine->task; ee.events = EPOLLIN; ee.data.ptr = &engine->u.epoll.signalfd; if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) { nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E", engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno); return NXT_ERROR; } return NXT_OK; } static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data) { int n; nxt_fd_event_t *ev; nxt_work_handler_t handler; struct signalfd_siginfo sfd; ev = obj; handler = data; nxt_debug(task, "signalfd handler"); n = read(ev->fd, &sfd, sizeof(struct signalfd_siginfo)); nxt_debug(task, "read signalfd(%d): %d", ev->fd, n); if (n != sizeof(struct signalfd_siginfo)) { nxt_alert(task, "read signalfd(%d) failed %E", ev->fd, nxt_errno); return; } nxt_debug(task, "signalfd(%d) signo:%d", ev->fd, sfd.ssi_signo); handler(task, (void *) (uintptr_t) sfd.ssi_signo, NULL); } #endif #if (NXT_HAVE_EVENTFD) static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) { int ret; struct epoll_event ee; engine->u.epoll.post_handler = handler; /* * 
Glibc eventfd() wrapper always has the flags argument. Glibc 2.7 * and 2.8 eventfd() wrappers call the original eventfd() syscall * without the flags argument. Glibc 2.9+ eventfd() wrapper at first * tries to call eventfd2() syscall and if it fails then calls the * original eventfd() syscall. For this reason the non-blocking mode * is set separately. */ engine->u.epoll.eventfd.fd = eventfd(0, 0); if (engine->u.epoll.eventfd.fd == -1) { nxt_alert(&engine->task, "eventfd() failed %E", nxt_errno); return NXT_ERROR; } ret = nxt_fd_nonblocking(&engine->task, engine->u.epoll.eventfd.fd); if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd); engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue; engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler; engine->u.epoll.eventfd.data = engine; engine->u.epoll.eventfd.log = engine->task.log; engine->u.epoll.eventfd.task = &engine->task; ee.events = EPOLLIN | EPOLLET; ee.data.ptr = &engine->u.epoll.eventfd; ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd, &ee); if (nxt_fast_path(ret == 0)) { return NXT_OK; } nxt_alert(&engine->task, "epoll_ctl(%d, %d, %d) failed %E", engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd, nxt_errno); return NXT_ERROR; } static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data) { int n; uint64_t events; nxt_event_engine_t *engine; engine = data; nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd); /* * The maximum value after write() to a eventfd() descriptor will * block or return EAGAIN is 0xFFFFFFFFFFFFFFFE, so the descriptor * can be read once per many notifications, for example, once per * 2^32-2 noticifcations. Since the eventfd() file descriptor is * always registered in EPOLLET mode, epoll returns event about * only the latest write() to the descriptor. 
*/ if (engine->u.epoll.neventfd++ >= 0xFFFFFFFE) { engine->u.epoll.neventfd = 0; n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t)); nxt_debug(task, "read(%d): %d events:%uL", engine->u.epoll.eventfd.fd, n, events); if (n != sizeof(uint64_t)) { nxt_alert(task, "read eventfd(%d) failed %E", engine->u.epoll.eventfd.fd, nxt_errno); } } engine->u.epoll.post_handler(task, NULL, NULL); } static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo) { size_t ret; uint64_t event; /* * eventfd() presents along with signalfd(), so the function * is used only to post events and the signo argument is ignored. */ event = 1; ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t)); if (nxt_slow_path(ret != sizeof(uint64_t))) { nxt_alert(&engine->task, "write(%d) to eventfd failed %E", engine->u.epoll.eventfd.fd, nxt_errno); } } #endif static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) { int nevents; uint32_t events; nxt_int_t i; nxt_err_t err; nxt_bool_t error; nxt_uint_t level; nxt_fd_event_t *ev; struct epoll_event *event; if (engine->u.epoll.nchanges != 0) { nxt_epoll_commit_changes(engine); } if (engine->u.epoll.error) { engine->u.epoll.error = 0; /* Error handlers have been enqueued on failure. */ timeout = 0; } nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M", engine->u.epoll.fd, timeout); nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events, engine->u.epoll.mevents, timeout); err = (nevents == -1) ? nxt_errno : 0; nxt_thread_time_update(engine->task.thread); nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents); if (nevents == -1) { level = (err == NXT_EINTR) ? 
NXT_LOG_INFO : NXT_LOG_ALERT; nxt_log(&engine->task, level, "epoll_wait(%d) failed %E", engine->u.epoll.fd, err); return; } for (i = 0; i < nevents; i++) { event = &engine->u.epoll.events[i]; events = event->events; ev = event->data.ptr; nxt_debug(ev->task, "epoll: fd:%d ev:%04XD d:%p rd:%d wr:%d", ev->fd, events, ev, ev->read, ev->write); /* * On error epoll may set EPOLLERR and EPOLLHUP only without EPOLLIN * or EPOLLOUT, so the "error" variable enqueues only error handler. */ error = ((events & (EPOLLERR | EPOLLHUP)) != 0); ev->epoll_error = error; if (error && ev->read <= NXT_EVENT_BLOCKED && ev->write <= NXT_EVENT_BLOCKED) { error = 0; } #if (NXT_HAVE_EPOLL_EDGE) ev->epoll_eof = ((events & EPOLLRDHUP) != 0); #endif if ((events & EPOLLIN) != 0) { ev->read_ready = 1; if (ev->read != NXT_EVENT_BLOCKED) { if (ev->read == NXT_EVENT_ONESHOT) { ev->read = NXT_EVENT_DISABLED; } nxt_work_queue_add(ev->read_work_queue, ev->read_handler, ev->task, ev, ev->data); error = 0; } else if (engine->u.epoll.mode == 0) { /* Level-triggered mode. */ nxt_epoll_disable_read(engine, ev); } } if ((events & EPOLLOUT) != 0) { ev->write_ready = 1; if (ev->write != NXT_EVENT_BLOCKED) { if (ev->write == NXT_EVENT_ONESHOT) { ev->write = NXT_EVENT_DISABLED; } nxt_work_queue_add(ev->write_work_queue, ev->write_handler, ev->task, ev, ev->data); error = 0; } else if (engine->u.epoll.mode == 0) { /* Level-triggered mode. */ nxt_epoll_disable_write(engine, ev); } } if (!error) { continue; } ev->read_ready = 1; ev->write_ready = 1; if (ev->read == NXT_EVENT_BLOCKED && ev->write == NXT_EVENT_BLOCKED) { if (engine->u.epoll.mode == 0) { /* Level-triggered mode. 
*/ nxt_epoll_disable(engine, ev); } continue; } nxt_work_queue_add(&engine->fast_work_queue, nxt_epoll_error_handler, ev->task, ev, ev->data); } } #if (NXT_HAVE_ACCEPT4) static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data) { socklen_t socklen; nxt_conn_t *c; nxt_socket_t s; struct sockaddr *sa; nxt_listen_event_t *lev; lev = obj; c = lev->next; lev->ready--; lev->socket.read_ready = (lev->ready != 0); sa = &c->remote->u.sockaddr; socklen = c->remote->socklen; /* * The returned socklen is ignored here, * see comment in nxt_conn_io_accept(). */ s = accept4(lev->socket.fd, sa, &socklen, SOCK_NONBLOCK); if (s != -1) { c->socket.fd = s; nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s); nxt_conn_accept(task, lev, c); return; } nxt_conn_accept_error(task, lev, "accept4", nxt_errno); } #endif #if (NXT_HAVE_EPOLL_EDGE) /* * nxt_epoll_edge_event_conn_io_connect() eliminates the getsockopt() * syscall to test pending connect() error. Although this special * interface can work in both edge-triggered and level-triggered * modes it is enabled only for the former mode because this mode is * available in all modern Linux distributions. For the latter mode * it is required to create additional nxt_epoll_level_event_conn_io * with single non-generic connect() interface. 
*/ static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; nxt_event_engine_t *engine; nxt_work_handler_t handler; const nxt_event_conn_state_t *state; c = obj; state = c->write_state; switch (nxt_socket_connect(task, c->socket.fd, c->remote)) { case NXT_OK: c->socket.write_ready = 1; handler = state->ready_handler; break; case NXT_AGAIN: c->socket.write_handler = nxt_epoll_edge_conn_connected; c->socket.error_handler = nxt_conn_connect_error; engine = task->thread->engine; nxt_conn_timer(engine, c, state, &c->write_timer); nxt_epoll_enable(engine, &c->socket); c->socket.read = NXT_EVENT_BLOCKED; return; #if 0 case NXT_AGAIN: nxt_conn_timer(engine, c, state, &c->write_timer); /* Fall through. */ case NXT_OK: /* * Mark both read and write directions as ready and try to perform * I/O operations before receiving readiness notifications. * On unconnected socket Linux send() and recv() return EAGAIN * instead of ENOTCONN. */ c->socket.read_ready = 1; c->socket.write_ready = 1; /* * Enabling both read and write notifications on a getting * connected socket eliminates one epoll_ctl() syscall. */ c->socket.write_handler = nxt_epoll_edge_event_conn_connected; c->socket.error_handler = state->error_handler; nxt_epoll_enable(engine, &c->socket); c->socket.read = NXT_EVENT_BLOCKED; handler = state->ready_handler; break; #endif case NXT_ERROR: handler = state->error_handler; break; default: /* NXT_DECLINED: connection refused. 
*/ handler = state->close_handler; break; } nxt_work_queue_add(c->write_work_queue, handler, task, c, data); } static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data) { nxt_conn_t *c; c = obj; nxt_debug(task, "epoll event conn connected fd:%d", c->socket.fd); if (!c->socket.epoll_error) { c->socket.write = NXT_EVENT_BLOCKED; if (c->write_state->timer_autoreset) { nxt_timer_disable(task->thread->engine, &c->write_timer); } nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, task, c, data); return; } nxt_conn_connect_test(task, c, data); } /* * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF * in edge-triggered mode. */ static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) { ssize_t n; n = nxt_conn_io_recvbuf(c, b); if (n > 0 && c->socket.epoll_eof) { c->socket.read_ready = 1; } return n; } #endif