diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_kqueue_engine.c (renamed from src/nxt_kqueue.c) | 481 |
1 files changed, 219 insertions, 262 deletions
diff --git a/src/nxt_kqueue.c b/src/nxt_kqueue_engine.c index a5a596b9..b4057389 100644 --- a/src/nxt_kqueue.c +++ b/src/nxt_kqueue_engine.c @@ -51,55 +51,52 @@ #endif -static nxt_event_set_t *nxt_kqueue_create(nxt_event_signals_t *signals, +static nxt_int_t nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents); -static void nxt_kqueue_free(nxt_event_set_t *event_set); -static void nxt_kqueue_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev); -static void nxt_kqueue_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev); -static void nxt_kqueue_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev); -static void nxt_kqueue_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev); -static void nxt_kqueue_drop_changes(nxt_event_set_t *event_set, - uintptr_t ident); -static void nxt_kqueue_enable_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_kqueue_enable_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_kqueue_disable_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_kqueue_disable_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_kqueue_block_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_kqueue_block_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_kqueue_oneshot_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_kqueue_oneshot_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_kqueue_enable_accept(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_kqueue_enable_file(nxt_event_set_t *event_set, +static void nxt_kqueue_free(nxt_event_engine_t *engine); +static void nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static void nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static void nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static nxt_bool_t nxt_kqueue_close(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_kqueue_enable_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_kqueue_enable_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_kqueue_disable_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_kqueue_disable_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_kqueue_block_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_kqueue_block_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_event_file_t *ev); -static void nxt_kqueue_close_file(nxt_event_set_t *event_set, +static void nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_event_file_t *ev); -static void nxt_kqueue_fd_set(nxt_event_set_t *event_set, nxt_event_fd_t *ev, +static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev, nxt_int_t filter, nxt_uint_t flags); -static struct kevent *nxt_kqueue_get_kevent(nxt_kqueue_event_set_t *ks); -static void nxt_kqueue_commit_changes(nxt_kqueue_event_set_t *ks); -static void nxt_kqueue_error(nxt_kqueue_event_set_t *ks); +static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine); +static void nxt_kqueue_error(nxt_event_engine_t *engine); static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data); static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data); -static nxt_int_t nxt_kqueue_add_signal(nxt_kqueue_event_set_t *kq, - const nxt_event_sig_t *sigev); +static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine, + const nxt_sig_event_t *sigev); #if (NXT_HAVE_EVFILT_USER) -static nxt_int_t nxt_kqueue_enable_post(nxt_event_set_t *event_set, +static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler); -static void nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo); +static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo); #endif -static void nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, - nxt_msec_t timeout); +static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); static void nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data); @@ -140,7 +137,7 @@ static nxt_event_conn_io_t nxt_kqueue_event_conn_io = { }; -const nxt_event_set_ops_t nxt_kqueue_event_set = { +const nxt_event_interface_t nxt_kqueue_engine = { "kqueue", nxt_kqueue_create, nxt_kqueue_free, @@ -175,91 +172,83 @@ const nxt_event_set_ops_t nxt_kqueue_event_set = { }; -static nxt_event_set_t * -nxt_kqueue_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, +static nxt_int_t +nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents) { - nxt_event_set_t *event_set; - const nxt_event_sig_t *sigev; - nxt_kqueue_event_set_t *ks; - - event_set = nxt_zalloc(sizeof(nxt_kqueue_event_set_t)); - if (event_set == NULL) { - return NULL; - } + const nxt_sig_event_t *sigev; - ks = &event_set->kqueue; + engine->u.kqueue.fd = -1; + engine->u.kqueue.mchanges = mchanges; + engine->u.kqueue.mevents = mevents; + engine->u.kqueue.pid = nxt_pid; - ks->kqueue = -1; - ks->mchanges = mchanges; - ks->mevents = mevents; - ks->pid = nxt_pid; - - ks->changes = nxt_malloc(sizeof(struct kevent) * mchanges); - if (ks->changes == NULL) { + engine->u.kqueue.changes = nxt_malloc(sizeof(struct kevent) * mchanges); + if (engine->u.kqueue.changes == NULL) { goto fail; } - ks->events = nxt_malloc(sizeof(struct kevent) * mevents); - if (ks->events == NULL) { + engine->u.kqueue.events = nxt_malloc(sizeof(struct kevent) * mevents); + if (engine->u.kqueue.events == NULL) { goto fail; } - ks->kqueue = kqueue(); - if (ks->kqueue == -1) { - nxt_main_log_emerg("kqueue() failed %E", nxt_errno); + engine->u.kqueue.fd = kqueue(); + if (engine->u.kqueue.fd == -1) { + nxt_log(&engine->task, NXT_LOG_CRIT, "kqueue() failed %E", nxt_errno); goto fail; } - nxt_main_log_debug("kqueue(): %d", ks->kqueue); + nxt_debug(&engine->task, "kqueue(): %d", engine->u.kqueue.fd); - if (signals != NULL) { - for (sigev = signals->sigev; sigev->signo != 0; sigev++) { - if (nxt_kqueue_add_signal(ks, sigev) != NXT_OK) { + if (engine->signals != NULL) { + for (sigev = engine->signals->sigev; sigev->signo != 0; sigev++) { + if (nxt_kqueue_add_signal(engine, sigev) != NXT_OK) { goto fail; } } } - return event_set; + return NXT_OK; fail: - nxt_kqueue_free(event_set); + nxt_kqueue_free(engine); - return NULL; + return NXT_ERROR; } static void -nxt_kqueue_free(nxt_event_set_t *event_set) +nxt_kqueue_free(nxt_event_engine_t *engine) { - nxt_kqueue_event_set_t *ks; + nxt_fd_t fd; - ks = &event_set->kqueue; + fd = engine->u.kqueue.fd; - nxt_main_log_debug("kqueue %d free", ks->kqueue); + nxt_debug(&engine->task, "kqueue %d free", fd); - if (ks->kqueue != -1 && ks->pid == nxt_pid) { + if (fd != -1 && engine->u.kqueue.pid == nxt_pid) { /* kqueue is not inherited by fork() */ - if (close(ks->kqueue) != 0) { - nxt_main_log_emerg("kqueue close(%d) failed %E", - ks->kqueue, nxt_errno); + if (close(fd) != 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, "kqueue close(%d) failed %E", + fd, nxt_errno); } } - nxt_free(ks->events); - nxt_free(ks->changes); - nxt_free(ks); + nxt_free(engine->u.kqueue.events); + nxt_free(engine->u.kqueue.changes); + + nxt_memzero(&engine->u.kqueue, sizeof(nxt_kqueue_engine_t)); } static void -nxt_kqueue_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { - nxt_kqueue_enable_read(event_set, ev); - nxt_kqueue_enable_write(event_set, ev); + nxt_kqueue_enable_read(engine, ev); + nxt_kqueue_enable_write(engine, ev); } @@ -269,31 +258,31 @@ nxt_kqueue_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) */ static void -nxt_kqueue_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->read != NXT_EVENT_INACTIVE) { ev->read = NXT_EVENT_INACTIVE; - nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, EV_DISABLE); + nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE); } if (ev->write != NXT_EVENT_INACTIVE) { ev->write = NXT_EVENT_INACTIVE; - nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, EV_DISABLE); + nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE); } } static void -nxt_kqueue_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->read != NXT_EVENT_INACTIVE) { ev->read = NXT_EVENT_INACTIVE; - nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, EV_DELETE); + nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DELETE); } if (ev->write != NXT_EVENT_INACTIVE) { ev->write = NXT_EVENT_INACTIVE; - nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, EV_DELETE); + nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DELETE); } } @@ -303,104 +292,72 @@ nxt_kqueue_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev) * * Calling close() on a file descriptor will remove any kevents that * reference the descriptor. + * + * nxt_kqueue_close() always returns true as there are pending events on + * closing file descriptor because kevent() passes whole change list at once. */ -static void -nxt_kqueue_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +static nxt_bool_t +nxt_kqueue_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { ev->read = NXT_EVENT_INACTIVE; ev->write = NXT_EVENT_INACTIVE; - nxt_kqueue_drop_changes(event_set, ev->fd); -} - - -static void -nxt_kqueue_drop_changes(nxt_event_set_t *event_set, uintptr_t ident) -{ - struct kevent *dst, *src, *end; - nxt_kqueue_event_set_t *ks; - - ks = &event_set->kqueue; - - dst = ks->changes; - end = dst + ks->nchanges; - - for (src = dst; src < end; src++) { - - if (src->ident == ident) { - - switch (src->filter) { - - case EVFILT_READ: - case EVFILT_WRITE: - case EVFILT_VNODE: - continue; - } - } - - if (dst != src) { - *dst = *src; - } - - dst++; - } - - ks->nchanges -= end - dst; + return 1; } /* - * The kqueue event set uses only three states: inactive, blocked, and - * default. An active oneshot event is marked as it is in the default - * state. The event will eventually be converted to the default EV_CLEAR + * The kqueue event engine uses only three states: inactive, blocked, and + * active. An active oneshot event is marked as it is in the default + * state. The event will be converted eventually to the default EV_CLEAR * mode after it will become inactive after delivery. */ static void -nxt_kqueue_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_kqueue_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->read == NXT_EVENT_INACTIVE) { - nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, + nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR); } - ev->read = NXT_EVENT_DEFAULT; + ev->read = NXT_EVENT_ACTIVE; } static void -nxt_kqueue_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_kqueue_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->write == NXT_EVENT_INACTIVE) { - nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, + nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR); } - ev->write = NXT_EVENT_DEFAULT; + ev->write = NXT_EVENT_ACTIVE; } static void -nxt_kqueue_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_kqueue_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { ev->read = NXT_EVENT_INACTIVE; - nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, EV_DISABLE); + nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE); } static void -nxt_kqueue_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_kqueue_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { ev->write = NXT_EVENT_INACTIVE; - nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, EV_DISABLE); + nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE); } static void -nxt_kqueue_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_kqueue_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->read != NXT_EVENT_INACTIVE) { ev->read = NXT_EVENT_BLOCKED; @@ -409,7 +366,7 @@ nxt_kqueue_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) static void -nxt_kqueue_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_kqueue_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->write != NXT_EVENT_INACTIVE) { ev->write = NXT_EVENT_BLOCKED; @@ -418,79 +375,75 @@ nxt_kqueue_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) static void -nxt_kqueue_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { - ev->write = NXT_EVENT_DEFAULT; + ev->write = NXT_EVENT_ACTIVE; - nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, + nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT); } static void -nxt_kqueue_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { - ev->write = NXT_EVENT_DEFAULT; + ev->write = NXT_EVENT_ACTIVE; - nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, + nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT); } static void -nxt_kqueue_enable_accept(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { - ev->read = NXT_EVENT_DEFAULT; + ev->read = NXT_EVENT_ACTIVE; ev->read_handler = nxt_kqueue_listen_handler; - nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, EV_ADD | EV_ENABLE); + nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE); } static void -nxt_kqueue_enable_file(nxt_event_set_t *event_set, nxt_event_file_t *ev) +nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_event_file_t *ev) { - struct kevent *kev; - nxt_kqueue_event_set_t *ks; + struct kevent *kev; + + const nxt_int_t flags = EV_ADD | EV_ENABLE | EV_ONESHOT; + const nxt_uint_t fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND + | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE; - ks = &event_set->kqueue; + nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD", + engine->u.kqueue.fd, ev->file->fd, EVFILT_VNODE, flags, fflags); - kev = nxt_kqueue_get_kevent(ks); + kev = nxt_kqueue_get_kevent(engine); kev->ident = ev->file->fd; kev->filter = EVFILT_VNODE; - kev->flags = EV_ADD | EV_ENABLE | EV_ONESHOT; - kev->fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND - | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE; + kev->flags = flags; + kev->fflags = fflags; kev->data = 0; kev->udata = nxt_kevent_set_udata(ev); - - nxt_thread_log_debug("kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD", - ks->kqueue, ev->file->fd, EVFILT_VNODE, - kev->flags, kev->fflags); } static void -nxt_kqueue_close_file(nxt_event_set_t *event_set, nxt_event_file_t *ev) +nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_event_file_t *ev) { - nxt_kqueue_drop_changes(event_set, ev->file->fd); + /* TODO: pending event. */ } static void -nxt_kqueue_fd_set(nxt_event_set_t *event_set, nxt_event_fd_t *ev, +nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev, nxt_int_t filter, nxt_uint_t flags) { - struct kevent *kev; - nxt_kqueue_event_set_t *ks; - - ks = &event_set->kqueue; + struct kevent *kev; - nxt_log_debug(ev->log, "kevent(%d) set event: id:%d ft:%i fl:%04Xui", - ks->kqueue, ev->fd, filter, flags); + nxt_debug(ev->task, "kevent(%d) set event: id:%d ft:%i fl:%04Xui", + engine->u.kqueue.fd, ev->fd, filter, flags); - kev = nxt_kqueue_get_kevent(ks); + kev = nxt_kqueue_get_kevent(engine); kev->ident = ev->fd; kev->filter = filter; @@ -502,45 +455,46 @@ nxt_kqueue_fd_set(nxt_event_set_t *event_set, nxt_event_fd_t *ev, static struct kevent * -nxt_kqueue_get_kevent(nxt_kqueue_event_set_t *ks) +nxt_kqueue_get_kevent(nxt_event_engine_t *engine) { - if (nxt_slow_path(ks->nchanges >= ks->mchanges)) { - nxt_kqueue_commit_changes(ks); - } + int ret, nchanges; - return &ks->changes[ks->nchanges++]; -} + nchanges = engine->u.kqueue.nchanges; + if (nxt_slow_path(nchanges >= engine->u.kqueue.mchanges)) { -static void -nxt_kqueue_commit_changes(nxt_kqueue_event_set_t *ks) -{ - nxt_thread_log_debug("kevent(%d) changes:%d", ks->kqueue, ks->nchanges); + nxt_debug(&engine->task, "kevent(%d) changes:%d", + engine->u.kqueue.fd, nchanges); + + ret = kevent(engine->u.kqueue.fd, engine->u.kqueue.changes, nchanges, + NULL, 0, NULL); + + if (nxt_slow_path(ret != 0)) { + nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E", + engine->u.kqueue.fd, nxt_errno); - if (kevent(ks->kqueue, ks->changes, ks->nchanges, NULL, 0, NULL) != 0) { - nxt_thread_log_alert("kevent(%d) failed %E", ks->kqueue, nxt_errno); + nxt_kqueue_error(engine); + } - nxt_kqueue_error(ks); + engine->u.kqueue.nchanges = 0; } - ks->nchanges = 0; + return &engine->u.kqueue.changes[engine->u.kqueue.nchanges++]; } static void -nxt_kqueue_error(nxt_kqueue_event_set_t *ks) +nxt_kqueue_error(nxt_event_engine_t *engine) { struct kevent *kev, *end; - nxt_thread_t *thread; - nxt_event_fd_t *ev; + nxt_fd_event_t *ev; nxt_event_file_t *fev; nxt_work_queue_t *wq; - thread = nxt_thread(); - wq = &thread->engine->fast_work_queue; - end = &ks->changes[ks->nchanges]; + wq = &engine->fast_work_queue; + end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges]; - for (kev = ks->changes; kev < end; kev++) { + for (kev = engine->u.kqueue.changes; kev < end; kev++) { switch (kev->filter) { @@ -564,7 +518,7 @@ nxt_kqueue_error(nxt_kqueue_event_set_t *ks) static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data) { - nxt_event_fd_t *ev; + nxt_fd_event_t *ev; ev = obj; @@ -595,7 +549,7 @@ nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data) static nxt_int_t -nxt_kqueue_add_signal(nxt_kqueue_event_set_t *ks, const nxt_event_sig_t *sigev) +nxt_kqueue_add_signal(nxt_event_engine_t *engine, const nxt_sig_event_t *sigev) { int signo; struct kevent kev; @@ -615,12 +569,14 @@ nxt_kqueue_add_signal(nxt_kqueue_event_set_t *ks, const nxt_event_sig_t *sigev) sa.sa_handler = (signo == SIGCHLD) ? SIG_DFL : SIG_IGN; if (sigaction(signo, &sa, NULL) != 0) { - nxt_main_log_alert("sigaction(%d) failed %E", signo, nxt_errno); + nxt_log(&engine->task, NXT_LOG_CRIT, "sigaction(%d) failed %E", + signo, nxt_errno); + return NXT_ERROR; } - nxt_main_log_debug("kevent(%d) signo:%d (%s)", - ks->kqueue, signo, sigev->name); + nxt_debug(&engine->task, "kevent(%d) signo:%d (%s)", + engine->u.kqueue.fd, signo, sigev->name); kev.ident = signo; kev.filter = EVFILT_SIGNAL; @@ -629,11 +585,13 @@ nxt_kqueue_add_signal(nxt_kqueue_event_set_t *ks, const nxt_event_sig_t *sigev) kev.data = 0; kev.udata = nxt_kevent_set_udata(sigev); - if (kevent(ks->kqueue, &kev, 1, NULL, 0, NULL) == 0) { + if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) { return NXT_OK; } - nxt_main_log_alert("kevent(%d) failed %E", ks->kqueue, nxt_errno); + nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E", + kqueue, nxt_errno); + return NXT_ERROR; } @@ -641,10 +599,9 @@ nxt_kqueue_add_signal(nxt_kqueue_event_set_t *ks, const nxt_event_sig_t *sigev) #if (NXT_HAVE_EVFILT_USER) static nxt_int_t -nxt_kqueue_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler) +nxt_kqueue_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) { - struct kevent kev; - nxt_kqueue_event_set_t *ks; + struct kevent kev; /* EVFILT_USER must be added to a kqueue before it can be triggered. */ @@ -655,23 +612,23 @@ nxt_kqueue_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler) kev.data = 0; kev.udata = NULL; - ks = &event_set->kqueue; - ks->post_handler = handler; + engine->u.kqueue.post_handler = handler; - if (kevent(ks->kqueue, &kev, 1, NULL, 0, NULL) == 0) { + if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) { return NXT_OK; } - nxt_main_log_alert("kevent(%d) failed %E", ks->kqueue, nxt_errno); + nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E", + engine->u.kqueue.fd, nxt_errno); + return NXT_ERROR; } static void -nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo) +nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo) { - struct kevent kev; - nxt_kqueue_event_set_t *ks; + struct kevent kev; /* * kqueue has a builtin signal processing support, so the function @@ -685,10 +642,9 @@ nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo) kev.data = 0; kev.udata = NULL; - ks = &event_set->kqueue; - - if (kevent(ks->kqueue, &kev, 1, NULL, 0, NULL) != 0) { - nxt_thread_log_alert("kevent(%d) failed %E", ks->kqueue, nxt_errno); + if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) != 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E", + engine->u.kqueue.fd, nxt_errno); } } @@ -696,24 +652,22 @@ nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo) static void -nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, - nxt_msec_t timeout) -{ - int nevents; - void *obj, *data; - nxt_int_t i; - nxt_err_t err; - nxt_uint_t level; - nxt_bool_t error, eof; - nxt_task_t *event_task; - struct kevent *kev; - nxt_event_fd_t *ev; - nxt_event_sig_t *sigev; - struct timespec ts, *tp; - nxt_event_file_t *fev; - nxt_work_queue_t *wq; - nxt_work_handler_t handler; - nxt_kqueue_event_set_t *ks; +nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) +{ + int nevents; + void *obj, *data; + nxt_int_t i; + nxt_err_t err; + nxt_uint_t level; + nxt_bool_t error, eof; + nxt_task_t *task; + struct kevent *kev; + nxt_fd_event_t *ev; + nxt_sig_event_t *sigev; + struct timespec ts, *tp; + nxt_event_file_t *fev; + nxt_work_queue_t *wq; + nxt_work_handler_t handler; if (timeout == NXT_INFINITE_MSEC) { tp = NULL; @@ -724,35 +678,36 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, tp = &ts; } - ks = &event_set->kqueue; + nxt_debug(&engine->task, "kevent(%d) changes:%d timeout:%M", + engine->u.kqueue.fd, engine->u.kqueue.nchanges, timeout); - nxt_debug(task, "kevent(%d) changes:%d timeout:%M", - ks->kqueue, ks->nchanges, timeout); - - nevents = kevent(ks->kqueue, ks->changes, ks->nchanges, - ks->events, ks->mevents, tp); + nevents = kevent(engine->u.kqueue.fd, + engine->u.kqueue.changes, engine->u.kqueue.nchanges, + engine->u.kqueue.events, engine->u.kqueue.mevents, tp); err = (nevents == -1) ? nxt_errno : 0; - nxt_thread_time_update(task->thread); + nxt_thread_time_update(engine->task.thread); - nxt_debug(task, "kevent(%d): %d", ks->kqueue, nevents); + nxt_debug(&engine->task, "kevent(%d): %d", engine->u.kqueue.fd, nevents); if (nevents == -1) { - level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; - nxt_log(task, level, "kevent(%d) failed %E", ks->kqueue, err); + level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT; + + nxt_log(&engine->task, level, "kevent(%d) failed %E", + engine->u.kqueue.fd, err); - nxt_kqueue_error(ks); + nxt_kqueue_error(engine); return; } - ks->nchanges = 0; + engine->u.kqueue.nchanges = 0; for (i = 0; i < nevents; i++) { - kev = &ks->events[i]; + kev = &engine->u.kqueue.events[i]; - nxt_debug(task, + nxt_debug(&engine->task, (kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ? "kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p": "kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p", @@ -762,13 +717,13 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, error = (kev->flags & EV_ERROR); if (nxt_slow_path(error)) { - nxt_log(task, NXT_LOG_CRIT, + nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) error %E on ident:%d filter:%d", - ks->kqueue, kev->data, kev->ident, kev->filter); + engine->u.kqueue.fd, kev->data, kev->ident, kev->filter); } - event_task = task; - wq = &task->thread->engine->fast_work_queue; + task = &engine->task; + wq = &engine->fast_work_queue; handler = nxt_kqueue_fd_error_handler; obj = nxt_kevent_get_udata(kev->udata); @@ -801,7 +756,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, wq = ev->read_work_queue; } - event_task = ev->task; + task = ev->task; data = ev->data; break; @@ -832,7 +787,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, wq = ev->write_work_queue; } - event_task = ev->task; + task = ev->task; data = ev->data; break; @@ -840,7 +795,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, case EVFILT_VNODE: fev = obj; handler = fev->handler; - event_task = fev->task; + task = fev->task; data = fev->data; break; @@ -854,7 +809,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, #if (NXT_HAVE_EVFILT_USER) case EVFILT_USER: - handler = ks->post_handler; + handler = engine->u.kqueue.post_handler; data = NULL; break; @@ -863,15 +818,15 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set, default: #if (NXT_DEBUG) - nxt_log(task, NXT_LOG_CRIT, + nxt_log(&engine->task, NXT_LOG_CRIT, "unexpected kevent(%d) filter %d on ident %d", - ks->kqueue, kev->filter, kev->ident); + engine->u.kqueue.fd, kev->filter, kev->ident); #endif continue; } - nxt_work_queue_add(wq, handler, event_task, obj, data); + nxt_work_queue_add(wq, handler, task, obj, data); } } @@ -885,6 +840,7 @@ static void nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; + nxt_event_engine_t *engine; nxt_work_handler_t handler; const nxt_event_conn_state_t *state; @@ -903,9 +859,10 @@ nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) c->socket.write_handler = nxt_kqueue_event_conn_connected; c->socket.error_handler = nxt_event_conn_connect_error; - nxt_event_conn_timer(task->thread->engine, c, state, &c->write_timer); + engine = task->thread->engine; + nxt_event_conn_timer(engine, c, state, &c->write_timer); - nxt_kqueue_enable_write(task->thread->engine->event_set, &c->socket); + nxt_kqueue_enable_write(engine, &c->socket); return; case NXT_DECLINED: @@ -1053,8 +1010,8 @@ nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) c->socket.kq_available = 0; } - nxt_log_debug(c->socket.log, "kevent fd:%d avail:%D eof:%d", - c->socket.fd, c->socket.kq_available, c->socket.kq_eof); + nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d", + c->socket.fd, c->socket.kq_available, c->socket.kq_eof); c->socket.read_ready = (c->socket.kq_available != 0 || c->socket.kq_eof); |