diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
commit | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch) | |
tree | e6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_kqueue.c | |
download | unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2 |
Initial version.
Diffstat (limited to 'src/nxt_kqueue.c')
-rw-r--r-- | src/nxt_kqueue.c | 1063 |
1 files changed, 1063 insertions, 0 deletions
diff --git a/src/nxt_kqueue.c b/src/nxt_kqueue.c new file mode 100644 index 00000000..25d9eefe --- /dev/null +++ b/src/nxt_kqueue.c @@ -0,0 +1,1063 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +/* + * kqueue() has been introduced in FreeBSD 4.1 and then was ported + * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0. + * DragonFlyBSD inherited it with FreeBSD 4 code base. + * + * NOTE_REVOKE has been introduced in FreeBSD 4.3 and then was ported + * to OpenBSD 2.9, MacOSX 10.3 (Panther), and NetBSD 2.0. + * DragonFlyBSD inherited it with FreeBSD 4 code base. + * + * EVFILT_TIMER has been introduced in FreeBSD 4.4-STABLE and then was + * ported to NetBSD 2.0, MacOSX 10.4 (Tiger), and OpenBSD 4.2. + * DragonFlyBSD inherited it with FreeBSD 4 code base. + * + * EVFILT_USER and EV_DISPATCH have been introduced in MacOSX 10.6 (Snow + * Leopard) as part of the Grand Central Dispatch framework + * and then were ported to FreeBSD 8.0-STABLE as part of the + * libdispatch support. + */ + + +/* + * EV_DISPATCH is better because it just disables an event on delivery + * whilst EV_ONESHOT deletes the event. This eliminates in-kernel memory + * deallocation and probable subsequent allocation with a lock acquiring. + */ +#ifdef EV_DISPATCH +#define NXT_KEVENT_ONESHOT EV_DISPATCH +#else +#define NXT_KEVENT_ONESHOT EV_ONESHOT +#endif + + +#if (NXT_NETBSD) +/* NetBSD defines the kevent.udata field as intptr_t. */ + +#define nxt_kevent_set_udata(udata) (intptr_t) (udata) +#define nxt_kevent_get_udata(udata) (void *) (udata) + +#else +#define nxt_kevent_set_udata(udata) (void *) (udata) +#define nxt_kevent_get_udata(udata) (udata) +#endif + + +static nxt_event_set_t *nxt_kqueue_create(nxt_event_signals_t *signals, + nxt_uint_t mchanges, nxt_uint_t mevents); +static void nxt_kqueue_free(nxt_event_set_t *event_set); +static void nxt_kqueue_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev); +static void nxt_kqueue_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev); +static void nxt_kqueue_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev); +static void nxt_kqueue_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev); +static void nxt_kqueue_drop_changes(nxt_event_set_t *event_set, + uintptr_t ident); +static void nxt_kqueue_enable_read(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_kqueue_enable_write(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_kqueue_disable_read(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_kqueue_disable_write(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_kqueue_block_read(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_kqueue_block_write(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_kqueue_oneshot_read(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_kqueue_oneshot_write(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_kqueue_enable_accept(nxt_event_set_t *event_set, + nxt_event_fd_t *ev); +static void nxt_kqueue_enable_file(nxt_event_set_t *event_set, + nxt_event_file_t *ev); +static void nxt_kqueue_close_file(nxt_event_set_t *event_set, + nxt_event_file_t *ev); +static void nxt_kqueue_fd_set(nxt_event_set_t *event_set, nxt_event_fd_t *ev, + nxt_int_t filter, nxt_uint_t flags); +static struct kevent *nxt_kqueue_get_kevent(nxt_kqueue_event_set_t *ks); +static void nxt_kqueue_commit_changes(nxt_kqueue_event_set_t *ks); +static void nxt_kqueue_error(nxt_kqueue_event_set_t *ks); +static void nxt_kqueue_fd_error_handler(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_kqueue_file_error_handler(nxt_thread_t *thr, void *obj, + void *data); +static nxt_int_t nxt_kqueue_add_signal(nxt_kqueue_event_set_t *kq, + const nxt_event_sig_t *sigev); +#if (NXT_HAVE_EVFILT_USER) +static nxt_int_t nxt_kqueue_enable_post(nxt_event_set_t *event_set, + nxt_work_handler_t handler); +static void nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo); +#endif +static void nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, + nxt_msec_t timeout); + +static void nxt_kqueue_event_conn_io_connect(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_kqueue_event_conn_connected(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_kqueue_listen_handler(nxt_thread_t *thr, void *obj, void *data); +static void nxt_kqueue_event_conn_io_accept(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_kqueue_event_conn_io_read(nxt_thread_t *thr, void *obj, + void *data); +static ssize_t nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, + nxt_buf_t *b); + + +static nxt_event_conn_io_t nxt_kqueue_event_conn_io = { + nxt_kqueue_event_conn_io_connect, + nxt_kqueue_event_conn_io_accept, + + nxt_kqueue_event_conn_io_read, + nxt_kqueue_event_conn_io_recvbuf, + nxt_event_conn_io_recv, + + nxt_event_conn_io_write, + nxt_event_conn_io_write_chunk, + +#if (NXT_HAVE_FREEBSD_SENDFILE) + nxt_freebsd_event_conn_io_sendfile, +#elif (NXT_HAVE_MACOSX_SENDFILE) + nxt_macosx_event_conn_io_sendfile, +#else + nxt_event_conn_io_sendbuf, +#endif + + nxt_event_conn_io_writev, + nxt_event_conn_io_send, + + nxt_event_conn_io_shutdown, +}; + + +const nxt_event_set_ops_t nxt_kqueue_event_set = { + "kqueue", + nxt_kqueue_create, + nxt_kqueue_free, + nxt_kqueue_enable, + nxt_kqueue_disable, + nxt_kqueue_delete, + nxt_kqueue_close, + nxt_kqueue_enable_read, + nxt_kqueue_enable_write, + nxt_kqueue_disable_read, + nxt_kqueue_disable_write, + nxt_kqueue_block_read, + nxt_kqueue_block_write, + nxt_kqueue_oneshot_read, + nxt_kqueue_oneshot_write, + nxt_kqueue_enable_accept, + nxt_kqueue_enable_file, + nxt_kqueue_close_file, +#if (NXT_HAVE_EVFILT_USER) + nxt_kqueue_enable_post, + nxt_kqueue_signal, +#else + NULL, + NULL, +#endif + nxt_kqueue_poll, + + &nxt_kqueue_event_conn_io, + + NXT_FILE_EVENTS, + NXT_SIGNAL_EVENTS, +}; + + +static nxt_event_set_t * +nxt_kqueue_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, + nxt_uint_t mevents) +{ + nxt_event_set_t *event_set; + const nxt_event_sig_t *sigev; + nxt_kqueue_event_set_t *ks; + + event_set = nxt_zalloc(sizeof(nxt_kqueue_event_set_t)); + if (event_set == NULL) { + return NULL; + } + + ks = &event_set->kqueue; + + ks->kqueue = -1; + ks->mchanges = mchanges; + ks->mevents = mevents; + ks->pid = nxt_pid; + + ks->changes = nxt_malloc(sizeof(struct kevent) * mchanges); + if (ks->changes == NULL) { + goto fail; + } + + ks->events = nxt_malloc(sizeof(struct kevent) * mevents); + if (ks->events == NULL) { + goto fail; + } + + ks->kqueue = kqueue(); + if (ks->kqueue == -1) { + nxt_main_log_emerg("kqueue() failed %E", nxt_errno); + goto fail; + } + + nxt_main_log_debug("kqueue(): %d", ks->kqueue); + + if (signals != NULL) { + for (sigev = signals->sigev; sigev->signo != 0; sigev++) { + if (nxt_kqueue_add_signal(ks, sigev) != NXT_OK) { + goto fail; + } + } + } + + return event_set; + +fail: + + nxt_kqueue_free(event_set); + + return NULL; +} + + +static void +nxt_kqueue_free(nxt_event_set_t *event_set) +{ + nxt_kqueue_event_set_t *ks; + + ks = &event_set->kqueue; + + nxt_main_log_debug("kqueue %d free", ks->kqueue); + + if (ks->kqueue != -1 && ks->pid == nxt_pid) { + /* kqueue is not inherited by fork() */ + + if (close(ks->kqueue) != 0) { + nxt_main_log_emerg("kqueue close(%d) failed %E", + ks->kqueue, nxt_errno); + } + } + + nxt_free(ks->events); + nxt_free(ks->changes); + nxt_free(ks); +} + + +static void +nxt_kqueue_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + nxt_kqueue_enable_read(event_set, ev); + nxt_kqueue_enable_write(event_set, ev); +} + + +/* + * EV_DISABLE is better because it eliminates in-kernel memory + * deallocation and probable subsequent allocation with a lock acquiring. + */ + +static void +nxt_kqueue_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + if (ev->read != NXT_EVENT_INACTIVE) { + ev->read = NXT_EVENT_INACTIVE; + nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, EV_DISABLE); + } + + if (ev->write != NXT_EVENT_INACTIVE) { + ev->write = NXT_EVENT_INACTIVE; + nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, EV_DISABLE); + } +} + + +static void +nxt_kqueue_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + if (ev->read != NXT_EVENT_INACTIVE) { + ev->read = NXT_EVENT_INACTIVE; + nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, EV_DELETE); + } + + if (ev->write != NXT_EVENT_INACTIVE) { + ev->write = NXT_EVENT_INACTIVE; + nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, EV_DELETE); + } +} + + +/* + * kqueue(2): + * + * Calling close() on a file descriptor will remove any kevents that + * reference the descriptor. + */ + +static void +nxt_kqueue_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_INACTIVE; + + nxt_kqueue_drop_changes(event_set, ev->fd); +} + + +static void +nxt_kqueue_drop_changes(nxt_event_set_t *event_set, uintptr_t ident) +{ + struct kevent *dst, *src, *end; + nxt_kqueue_event_set_t *ks; + + ks = &event_set->kqueue; + + dst = ks->changes; + end = dst + ks->nchanges; + + for (src = dst; src < end; src++) { + + if (src->ident == ident) { + + switch (src->filter) { + + case EVFILT_READ: + case EVFILT_WRITE: + case EVFILT_VNODE: + continue; + } + } + + if (dst != src) { + *dst = *src; + } + + dst++; + } + + ks->nchanges -= end - dst; +} + + +/* + * The kqueue event set uses only three states: inactive, blocked, and + * default. An active oneshot event is marked as it is in the default + * state. The event will eventually be converted to the default EV_CLEAR + * mode after it will become inactive after delivery. + */ + +static void +nxt_kqueue_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + if (ev->read == NXT_EVENT_INACTIVE) { + nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, + EV_ADD | EV_ENABLE | EV_CLEAR); + } + + ev->read = NXT_EVENT_DEFAULT; +} + + +static void +nxt_kqueue_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + if (ev->write == NXT_EVENT_INACTIVE) { + nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, + EV_ADD | EV_ENABLE | EV_CLEAR); + } + + ev->write = NXT_EVENT_DEFAULT; +} + + +static void +nxt_kqueue_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + ev->read = NXT_EVENT_INACTIVE; + + nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, EV_DISABLE); +} + + +static void +nxt_kqueue_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + ev->write = NXT_EVENT_INACTIVE; + + nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, EV_DISABLE); +} + + +static void +nxt_kqueue_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + if (ev->read != NXT_EVENT_INACTIVE) { + ev->read = NXT_EVENT_BLOCKED; + } +} + + +static void +nxt_kqueue_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + if (ev->write != NXT_EVENT_INACTIVE) { + ev->write = NXT_EVENT_BLOCKED; + } +} + + +static void +nxt_kqueue_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + ev->write = NXT_EVENT_DEFAULT; + + nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, + EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT); +} + + +static void +nxt_kqueue_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + ev->write = NXT_EVENT_DEFAULT; + + nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, + EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT); +} + + +static void +nxt_kqueue_enable_accept(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +{ + ev->read = NXT_EVENT_DEFAULT; + ev->read_handler = nxt_kqueue_listen_handler; + + nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, EV_ADD | EV_ENABLE); +} + + +static void +nxt_kqueue_enable_file(nxt_event_set_t *event_set, nxt_event_file_t *ev) +{ + struct kevent *kev; + nxt_kqueue_event_set_t *ks; + + ks = &event_set->kqueue; + + kev = nxt_kqueue_get_kevent(ks); + + kev->ident = ev->file->fd; + kev->filter = EVFILT_VNODE; + kev->flags = EV_ADD | EV_ENABLE | EV_ONESHOT; + kev->fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND + | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE; + kev->data = 0; + kev->udata = nxt_kevent_set_udata(ev); + + nxt_thread_log_debug("kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD", + ks->kqueue, ev->file->fd, EVFILT_VNODE, + kev->flags, kev->fflags); +} + + +static void +nxt_kqueue_close_file(nxt_event_set_t *event_set, nxt_event_file_t *ev) +{ + nxt_kqueue_drop_changes(event_set, ev->file->fd); +} + + +static void +nxt_kqueue_fd_set(nxt_event_set_t *event_set, nxt_event_fd_t *ev, + nxt_int_t filter, nxt_uint_t flags) +{ + struct kevent *kev; + nxt_kqueue_event_set_t *ks; + + ks = &event_set->kqueue; + + nxt_log_debug(ev->log, "kevent(%d) set event: id:%d ft:%i fl:%04Xui", + ks->kqueue, ev->fd, filter, flags); + + kev = nxt_kqueue_get_kevent(ks); + + kev->ident = ev->fd; + kev->filter = filter; + kev->flags = flags; + kev->fflags = 0; + kev->data = 0; + kev->udata = nxt_kevent_set_udata(ev); +} + + +static struct kevent * +nxt_kqueue_get_kevent(nxt_kqueue_event_set_t *ks) +{ + if (nxt_slow_path(ks->nchanges >= ks->mchanges)) { + nxt_kqueue_commit_changes(ks); + } + + return &ks->changes[ks->nchanges++]; +} + + +static void +nxt_kqueue_commit_changes(nxt_kqueue_event_set_t *ks) +{ + nxt_thread_log_debug("kevent(%d) changes:%d", ks->kqueue, ks->nchanges); + + if (kevent(ks->kqueue, ks->changes, ks->nchanges, NULL, 0, NULL) != 0) { + nxt_thread_log_alert("kevent(%d) failed %E", ks->kqueue, nxt_errno); + + nxt_kqueue_error(ks); + } + + ks->nchanges = 0; +} + + +static void +nxt_kqueue_error(nxt_kqueue_event_set_t *ks) +{ + struct kevent *kev, *end; + nxt_thread_t *thr; + nxt_event_fd_t *ev; + nxt_event_file_t *fev; + + thr = nxt_thread(); + end = &ks->changes[ks->nchanges]; + + for (kev = ks->changes; kev < end; kev++) { + + switch (kev->filter) { + + case EVFILT_READ: + case EVFILT_WRITE: + ev = nxt_kevent_get_udata(kev->udata); + nxt_thread_work_queue_add(thr, &thr->work_queue.main, + nxt_kqueue_fd_error_handler, + ev, ev->data, ev->log); + break; + + case EVFILT_VNODE: + fev = nxt_kevent_get_udata(kev->udata); + nxt_thread_work_queue_add(thr, &thr->work_queue.main, + nxt_kqueue_file_error_handler, + fev, fev->data, thr->log); + break; + } + } +} + + +static void +nxt_kqueue_fd_error_handler(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_fd_t *ev; + + ev = obj; + + if (ev->kq_eof && ev->kq_errno != 0) { + ev->error = ev->kq_errno; + nxt_log_error(nxt_socket_error_level(ev->kq_errno, ev->log_error), + thr->log, "kevent() reported error on descriptor %d %E", + ev->fd, ev->kq_errno); + } + + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_INACTIVE; + ev->error = ev->kq_errno; + + ev->error_handler(thr, ev, data); +} + + +static void +nxt_kqueue_file_error_handler(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_file_t *ev; + + ev = obj; + + ev->handler(thr, ev, data); +} + + +static nxt_int_t +nxt_kqueue_add_signal(nxt_kqueue_event_set_t *ks, const nxt_event_sig_t *sigev) +{ + int signo; + struct kevent kev; + struct sigaction sa; + + signo = sigev->signo; + + nxt_memzero(&sa, sizeof(struct sigaction)); + sigemptyset(&sa.sa_mask); + + /* + * SIGCHLD must not be set to SIG_IGN, since kqueue cannot catch + * this signal. It should be set to SIG_DFL instead. And although + * SIGCHLD default action is also ignoring, nevertheless SIG_DFL + * allows kqueue to catch the signal. + */ + sa.sa_handler = (signo == SIGCHLD) ? SIG_DFL : SIG_IGN; + + if (sigaction(signo, &sa, NULL) != 0) { + nxt_main_log_alert("sigaction(%d) failed %E", signo, nxt_errno); + return NXT_ERROR; + } + + nxt_main_log_debug("kevent(%d) signo:%d (%s)", + ks->kqueue, signo, sigev->name); + + kev.ident = signo; + kev.filter = EVFILT_SIGNAL; + kev.flags = EV_ADD; + kev.fflags = 0; + kev.data = 0; + kev.udata = nxt_kevent_set_udata(sigev); + + if (kevent(ks->kqueue, &kev, 1, NULL, 0, NULL) == 0) { + return NXT_OK; + } + + nxt_main_log_alert("kevent(%d) failed %E", ks->kqueue, nxt_errno); + return NXT_ERROR; +} + + +#if (NXT_HAVE_EVFILT_USER) + +static nxt_int_t +nxt_kqueue_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler) +{ + struct kevent kev; + nxt_kqueue_event_set_t *ks; + + /* EVFILT_USER must be added to a kqueue before it can be triggered. */ + + kev.ident = 0; + kev.filter = EVFILT_USER; + kev.flags = EV_ADD | EV_CLEAR; + kev.fflags = 0; + kev.data = 0; + kev.udata = NULL; + + ks = &event_set->kqueue; + ks->post_handler = handler; + + if (kevent(ks->kqueue, &kev, 1, NULL, 0, NULL) == 0) { + return NXT_OK; + } + + nxt_main_log_alert("kevent(%d) failed %E", ks->kqueue, nxt_errno); + return NXT_ERROR; +} + + +static void +nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo) +{ + struct kevent kev; + nxt_kqueue_event_set_t *ks; + + /* + * kqueue has a builtin signal processing support, so the function + * is used only to post events and the signo argument is ignored. + */ + + kev.ident = 0; + kev.filter = EVFILT_USER; + kev.flags = 0; + kev.fflags = NOTE_TRIGGER; + kev.data = 0; + kev.udata = NULL; + + ks = &event_set->kqueue; + + if (kevent(ks->kqueue, &kev, 1, NULL, 0, NULL) != 0) { + nxt_thread_log_alert("kevent(%d) failed %E", ks->kqueue, nxt_errno); + } +} + +#endif + + +static void +nxt_kqueue_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, + nxt_msec_t timeout) +{ + int nevents; + void *obj, *data; + nxt_int_t i; + nxt_err_t err; + nxt_log_t *log; + nxt_uint_t level; + nxt_bool_t error, eof; + struct kevent *kev; + nxt_event_fd_t *ev; + nxt_event_sig_t *sigev; + struct timespec ts, *tp; + nxt_event_file_t *fev; + nxt_work_queue_t *wq; + nxt_work_handler_t handler; + nxt_kqueue_event_set_t *ks; + + if (timeout == NXT_INFINITE_MSEC) { + tp = NULL; + + } else { + ts.tv_sec = timeout / 1000; + ts.tv_nsec = (timeout % 1000) * 1000000; + tp = &ts; + } + + ks = &event_set->kqueue; + + nxt_log_debug(thr->log, "kevent(%d) changes:%d timeout:%M", + ks->kqueue, ks->nchanges, timeout); + + nevents = kevent(ks->kqueue, ks->changes, ks->nchanges, + ks->events, ks->mevents, tp); + + err = (nevents == -1) ? nxt_errno : 0; + + nxt_thread_time_update(thr); + + nxt_log_debug(thr->log, "kevent(%d): %d", ks->kqueue, nevents); + + if (nevents == -1) { + level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; + nxt_log_error(level, thr->log, "kevent(%d) failed %E", ks->kqueue, err); + + nxt_kqueue_error(ks); + return; + } + + ks->nchanges = 0; + + for (i = 0; i < nevents; i++) { + + kev = &ks->events[i]; + + nxt_log_debug(thr->log, + (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ? + "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p": + "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p", + kev->ident, kev->filter, kev->flags, kev->fflags, + kev->data, kev->udata); + + error = (kev->flags & EV_ERROR); + + if (nxt_slow_path(error)) { + nxt_log_alert(thr->log, "kevent(%d) error %E on ident:%d filter:%d", + ks->kqueue, kev->data, kev->ident, kev->filter); + } + + wq = &thr->work_queue.main; + handler = nxt_kqueue_fd_error_handler; + obj = nxt_kevent_get_udata(kev->udata); + log = thr->log; + + switch (kev->filter) { + + case EVFILT_READ: + ev = obj; + ev->read_ready = 1; + ev->kq_available = (int32_t) kev->data; + err = kev->fflags; + eof = (kev->flags & EV_EOF) != 0; + ev->kq_errno = err; + ev->kq_eof = eof; + + if (ev->read == NXT_EVENT_BLOCKED) { + nxt_log_debug(ev->log, "blocked read event fd:%d", ev->fd); + continue; + } + + if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) { + ev->read = NXT_EVENT_INACTIVE; + } + + if (nxt_slow_path(ev->kq_available == 0 && eof && err != 0)) { + error = 1; + } + + if (nxt_fast_path(!error)) { + handler = ev->read_handler; + wq = ev->read_work_queue; + } + + data = ev->data; + log = ev->log; + + break; + + case EVFILT_WRITE: + ev = obj; + ev->write_ready = 1; + err = kev->fflags; + eof = (kev->flags & EV_EOF) != 0; + ev->kq_errno = err; + ev->kq_eof = eof; + + if (ev->write == NXT_EVENT_BLOCKED) { + nxt_log_debug(ev->log, "blocked write event fd:%d", ev->fd); + continue; + } + + if ((kev->flags & NXT_KEVENT_ONESHOT) != 0) { + ev->write = NXT_EVENT_INACTIVE; + } + + if (nxt_slow_path(eof && err != 0)) { + error = 1; + } + + if (nxt_fast_path(!error)) { + handler = ev->write_handler; + wq = ev->write_work_queue; + } + + data = ev->data; + log = ev->log; + + break; + + case EVFILT_VNODE: + fev = obj; + handler = fev->handler; + data = fev->data; + break; + + case EVFILT_SIGNAL: + sigev = obj; + obj = (void *) kev->ident; + handler = sigev->handler; + data = (void *) sigev->name; + break; + +#if (NXT_HAVE_EVFILT_USER) + + case EVFILT_USER: + handler = ks->post_handler; + data = NULL; + break; + +#endif + + default: + +#if (NXT_DEBUG) + nxt_log_alert(thr->log, + "unexpected kevent(%d) filter %d on ident %d", + ks->kqueue, kev->filter, kev->ident); +#endif + + continue; + } + + nxt_thread_work_queue_add(thr, wq, handler, obj, data, log); + } +} + + +/* + * nxt_kqueue_event_conn_io_connect() eliminates the + * getsockopt() syscall to test pending connect() error. + */ + +static void +nxt_kqueue_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_conn_t *c; + nxt_work_handler_t handler; + const nxt_event_conn_state_t *state; + + c = obj; + + state = c->write_state; + + switch (nxt_socket_connect(c->socket.fd, c->remote) ){ + + case NXT_OK: + c->socket.write_ready = 1; + handler = state->ready_handler; + break; + + case NXT_AGAIN: + c->socket.write_handler = nxt_kqueue_event_conn_connected; + c->socket.error_handler = nxt_event_conn_connect_error; + + nxt_event_conn_timer(thr->engine, c, state, &c->write_timer); + + nxt_kqueue_enable_write(thr->engine->event_set, &c->socket); + return; + + case NXT_DECLINED: + handler = state->close_handler; + break; + + default: /* NXT_ERROR */ + handler = state->error_handler; + break; + } + + nxt_event_conn_io_handle(thr, c->write_work_queue, handler, c, data); +} + + +static void +nxt_kqueue_event_conn_connected(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_log_debug(thr->log, "kqueue event conn connected fd:%d", c->socket.fd); + + c->socket.write = NXT_EVENT_BLOCKED; + + if (c->write_state->autoreset_timer) { + nxt_event_timer_disable(&c->write_timer); + } + + nxt_thread_work_queue_add(thr, c->write_work_queue, + c->write_state->ready_handler, + c, data, c->socket.log); +} + + +static void +nxt_kqueue_listen_handler(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_conn_listen_t *cls; + + cls = obj; + + nxt_log_debug(thr->log, "kevent fd:%d avail:%D", + cls->socket.fd, cls->socket.kq_available); + + cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available); + + nxt_kqueue_event_conn_io_accept(thr, cls, data); +} + + +static void +nxt_kqueue_event_conn_io_accept(nxt_thread_t *thr, void *obj, void *data) +{ + socklen_t len; + nxt_socket_t s; + struct sockaddr *sa; + nxt_event_conn_t *c; + nxt_event_conn_listen_t *cls; + + cls = obj; + c = data; + + cls->ready--; + cls->socket.read_ready = (cls->ready != 0); + + cls->socket.kq_available--; + cls->socket.read_ready = (cls->socket.kq_available != 0); + + len = nxt_socklen(c->remote); + + if (len >= sizeof(struct sockaddr)) { + sa = &c->remote->u.sockaddr; + + } else { + sa = NULL; + len = 0; + } + + s = accept(cls->socket.fd, sa, &len); + + if (s != -1) { + c->socket.fd = s; + + nxt_log_debug(thr->log, "accept(%d): %d", cls->socket.fd, s); + + nxt_event_conn_accept(thr, cls, c); + return; + } + + nxt_event_conn_accept_error(thr, cls, "accept", nxt_errno); +} + + +/* + * nxt_kqueue_event_conn_io_read() is just a wrapper to eliminate the + * readv() or recv() syscall if a remote side just closed connection. + */ + +static void +nxt_kqueue_event_conn_io_read(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_log_debug(thr->log, "kqueue event conn read fd:%d", c->socket.fd); + + if (c->socket.kq_available == 0 && c->socket.kq_eof) { + nxt_log_debug(thr->log, "kevent fd:%d eof", c->socket.fd); + + c->socket.closed = 1; + nxt_thread_work_queue_add(thr, c->read_work_queue, + c->read_state->close_handler, + c, data, c->socket.log); + return; + } + + nxt_event_conn_io_read(thr, c, data); +} + + +/* + * nxt_kqueue_event_conn_io_recvbuf() is just wrapper around standard + * nxt_event_conn_io_recvbuf() to eliminate the readv() or recv() syscalls + * if there is no pending data or a remote side closed connection. + */ + +static ssize_t +nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) +{ + ssize_t n; + + if (c->socket.kq_available == 0 && c->socket.kq_eof) { + c->socket.closed = 1; + return 0; + } + + n = nxt_event_conn_io_recvbuf(c, b); + + if (n > 0) { + c->socket.kq_available -= n; + + if (c->socket.kq_available < 0) { + c->socket.kq_available = 0; + } + + nxt_log_debug(c->socket.log, "kevent fd:%d avail:%D eof:%d", + c->socket.fd, c->socket.kq_available, c->socket.kq_eof); + + c->socket.read_ready = (c->socket.kq_available != 0 + || c->socket.kq_eof); + } + + return n; +} |