summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_kqueue_engine.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
commit059a8642898a6bd4b47d13a1c1d599cd44af7e1c (patch)
treee3c8c530a04f1ae44777d5ea4fd6901dc55a8ebf /src/nxt_kqueue_engine.c
parente57b95a92333fa7ff558737b0ba2b76894cc0412 (diff)
downloadunit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.gz
unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.bz2
Event engines refactoring.
Diffstat (limited to '')
-rw-r--r--src/nxt_kqueue_engine.c (renamed from src/nxt_kqueue.c)481
1 files changed, 219 insertions, 262 deletions
diff --git a/src/nxt_kqueue.c b/src/nxt_kqueue_engine.c
index a5a596b9..b4057389 100644
--- a/src/nxt_kqueue.c
+++ b/src/nxt_kqueue_engine.c
@@ -51,55 +51,52 @@
#endif
-static nxt_event_set_t *nxt_kqueue_create(nxt_event_signals_t *signals,
+static nxt_int_t nxt_kqueue_create(nxt_event_engine_t *engine,
nxt_uint_t mchanges, nxt_uint_t mevents);
-static void nxt_kqueue_free(nxt_event_set_t *event_set);
-static void nxt_kqueue_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
-static void nxt_kqueue_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
-static void nxt_kqueue_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
-static void nxt_kqueue_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
-static void nxt_kqueue_drop_changes(nxt_event_set_t *event_set,
- uintptr_t ident);
-static void nxt_kqueue_enable_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_kqueue_enable_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_kqueue_disable_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_kqueue_disable_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_kqueue_block_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_kqueue_block_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_kqueue_oneshot_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_kqueue_oneshot_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_kqueue_enable_accept(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_kqueue_enable_file(nxt_event_set_t *event_set,
+static void nxt_kqueue_free(nxt_event_engine_t *engine);
+static void nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
+static void nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
+static void nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
+static nxt_bool_t nxt_kqueue_close(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_kqueue_enable_read(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_kqueue_enable_write(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_kqueue_disable_read(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_kqueue_disable_write(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_kqueue_block_read(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_kqueue_block_write(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_kqueue_oneshot_read(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_kqueue_enable_file(nxt_event_engine_t *engine,
nxt_event_file_t *ev);
-static void nxt_kqueue_close_file(nxt_event_set_t *event_set,
+static void nxt_kqueue_close_file(nxt_event_engine_t *engine,
nxt_event_file_t *ev);
-static void nxt_kqueue_fd_set(nxt_event_set_t *event_set, nxt_event_fd_t *ev,
+static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
nxt_int_t filter, nxt_uint_t flags);
-static struct kevent *nxt_kqueue_get_kevent(nxt_kqueue_event_set_t *ks);
-static void nxt_kqueue_commit_changes(nxt_kqueue_event_set_t *ks);
-static void nxt_kqueue_error(nxt_kqueue_event_set_t *ks);
+static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine);
+static void nxt_kqueue_error(nxt_event_engine_t *engine);
static void nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj,
void *data);
-static nxt_int_t nxt_kqueue_add_signal(nxt_kqueue_event_set_t *kq,
- const nxt_event_sig_t *sigev);
+static nxt_int_t nxt_kqueue_add_signal(nxt_event_engine_t *engine,
+ const nxt_sig_event_t *sigev);
#if (NXT_HAVE_EVFILT_USER)
-static nxt_int_t nxt_kqueue_enable_post(nxt_event_set_t *event_set,
+static nxt_int_t nxt_kqueue_enable_post(nxt_event_engine_t *engine,
nxt_work_handler_t handler);
-static void nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo);
+static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
#endif
-static void nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
- nxt_msec_t timeout);
+static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
static void nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj,
void *data);
@@ -140,7 +137,7 @@ static nxt_event_conn_io_t nxt_kqueue_event_conn_io = {
};
-const nxt_event_set_ops_t nxt_kqueue_event_set = {
+const nxt_event_interface_t nxt_kqueue_engine = {
"kqueue",
nxt_kqueue_create,
nxt_kqueue_free,
@@ -175,91 +172,83 @@ const nxt_event_set_ops_t nxt_kqueue_event_set = {
};
-static nxt_event_set_t *
-nxt_kqueue_create(nxt_event_signals_t *signals, nxt_uint_t mchanges,
+static nxt_int_t
+nxt_kqueue_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
nxt_uint_t mevents)
{
- nxt_event_set_t *event_set;
- const nxt_event_sig_t *sigev;
- nxt_kqueue_event_set_t *ks;
-
- event_set = nxt_zalloc(sizeof(nxt_kqueue_event_set_t));
- if (event_set == NULL) {
- return NULL;
- }
+ const nxt_sig_event_t *sigev;
- ks = &event_set->kqueue;
+ engine->u.kqueue.fd = -1;
+ engine->u.kqueue.mchanges = mchanges;
+ engine->u.kqueue.mevents = mevents;
+ engine->u.kqueue.pid = nxt_pid;
- ks->kqueue = -1;
- ks->mchanges = mchanges;
- ks->mevents = mevents;
- ks->pid = nxt_pid;
-
- ks->changes = nxt_malloc(sizeof(struct kevent) * mchanges);
- if (ks->changes == NULL) {
+ engine->u.kqueue.changes = nxt_malloc(sizeof(struct kevent) * mchanges);
+ if (engine->u.kqueue.changes == NULL) {
goto fail;
}
- ks->events = nxt_malloc(sizeof(struct kevent) * mevents);
- if (ks->events == NULL) {
+ engine->u.kqueue.events = nxt_malloc(sizeof(struct kevent) * mevents);
+ if (engine->u.kqueue.events == NULL) {
goto fail;
}
- ks->kqueue = kqueue();
- if (ks->kqueue == -1) {
- nxt_main_log_emerg("kqueue() failed %E", nxt_errno);
+ engine->u.kqueue.fd = kqueue();
+ if (engine->u.kqueue.fd == -1) {
+ nxt_log(&engine->task, NXT_LOG_CRIT, "kqueue() failed %E", nxt_errno);
goto fail;
}
- nxt_main_log_debug("kqueue(): %d", ks->kqueue);
+ nxt_debug(&engine->task, "kqueue(): %d", engine->u.kqueue.fd);
- if (signals != NULL) {
- for (sigev = signals->sigev; sigev->signo != 0; sigev++) {
- if (nxt_kqueue_add_signal(ks, sigev) != NXT_OK) {
+ if (engine->signals != NULL) {
+ for (sigev = engine->signals->sigev; sigev->signo != 0; sigev++) {
+ if (nxt_kqueue_add_signal(engine, sigev) != NXT_OK) {
goto fail;
}
}
}
- return event_set;
+ return NXT_OK;
fail:
- nxt_kqueue_free(event_set);
+ nxt_kqueue_free(engine);
- return NULL;
+ return NXT_ERROR;
}
static void
-nxt_kqueue_free(nxt_event_set_t *event_set)
+nxt_kqueue_free(nxt_event_engine_t *engine)
{
- nxt_kqueue_event_set_t *ks;
+ nxt_fd_t fd;
- ks = &event_set->kqueue;
+ fd = engine->u.kqueue.fd;
- nxt_main_log_debug("kqueue %d free", ks->kqueue);
+ nxt_debug(&engine->task, "kqueue %d free", fd);
- if (ks->kqueue != -1 && ks->pid == nxt_pid) {
+ if (fd != -1 && engine->u.kqueue.pid == nxt_pid) {
/* kqueue is not inherited by fork() */
- if (close(ks->kqueue) != 0) {
- nxt_main_log_emerg("kqueue close(%d) failed %E",
- ks->kqueue, nxt_errno);
+ if (close(fd) != 0) {
+ nxt_log(&engine->task, NXT_LOG_CRIT, "kqueue close(%d) failed %E",
+ fd, nxt_errno);
}
}
- nxt_free(ks->events);
- nxt_free(ks->changes);
- nxt_free(ks);
+ nxt_free(engine->u.kqueue.events);
+ nxt_free(engine->u.kqueue.changes);
+
+ nxt_memzero(&engine->u.kqueue, sizeof(nxt_kqueue_engine_t));
}
static void
-nxt_kqueue_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_kqueue_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
- nxt_kqueue_enable_read(event_set, ev);
- nxt_kqueue_enable_write(event_set, ev);
+ nxt_kqueue_enable_read(engine, ev);
+ nxt_kqueue_enable_write(engine, ev);
}
@@ -269,31 +258,31 @@ nxt_kqueue_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
*/
static void
-nxt_kqueue_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_kqueue_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
if (ev->read != NXT_EVENT_INACTIVE) {
ev->read = NXT_EVENT_INACTIVE;
- nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, EV_DISABLE);
+ nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
}
if (ev->write != NXT_EVENT_INACTIVE) {
ev->write = NXT_EVENT_INACTIVE;
- nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, EV_DISABLE);
+ nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
}
}
static void
-nxt_kqueue_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_kqueue_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
if (ev->read != NXT_EVENT_INACTIVE) {
ev->read = NXT_EVENT_INACTIVE;
- nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, EV_DELETE);
+ nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DELETE);
}
if (ev->write != NXT_EVENT_INACTIVE) {
ev->write = NXT_EVENT_INACTIVE;
- nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, EV_DELETE);
+ nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DELETE);
}
}
@@ -303,104 +292,72 @@ nxt_kqueue_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
*
* Calling close() on a file descriptor will remove any kevents that
* reference the descriptor.
+ *
+ * nxt_kqueue_close() always returns true as there are pending events on
+ * closing file descriptor because kevent() passes whole change list at once.
*/
-static void
-nxt_kqueue_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+static nxt_bool_t
+nxt_kqueue_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
ev->read = NXT_EVENT_INACTIVE;
ev->write = NXT_EVENT_INACTIVE;
- nxt_kqueue_drop_changes(event_set, ev->fd);
-}
-
-
-static void
-nxt_kqueue_drop_changes(nxt_event_set_t *event_set, uintptr_t ident)
-{
- struct kevent *dst, *src, *end;
- nxt_kqueue_event_set_t *ks;
-
- ks = &event_set->kqueue;
-
- dst = ks->changes;
- end = dst + ks->nchanges;
-
- for (src = dst; src < end; src++) {
-
- if (src->ident == ident) {
-
- switch (src->filter) {
-
- case EVFILT_READ:
- case EVFILT_WRITE:
- case EVFILT_VNODE:
- continue;
- }
- }
-
- if (dst != src) {
- *dst = *src;
- }
-
- dst++;
- }
-
- ks->nchanges -= end - dst;
+ return 1;
}
/*
- * The kqueue event set uses only three states: inactive, blocked, and
- * default. An active oneshot event is marked as it is in the default
- * state. The event will eventually be converted to the default EV_CLEAR
+ * The kqueue event engine uses only three states: inactive, blocked, and
+ * active. An active oneshot event is marked as it is in the default
+ * state. The event will be converted eventually to the default EV_CLEAR
* mode after it will become inactive after delivery.
*/
static void
-nxt_kqueue_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_kqueue_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
if (ev->read == NXT_EVENT_INACTIVE) {
- nxt_kqueue_fd_set(event_set, ev, EVFILT_READ,
+ nxt_kqueue_fd_set(engine, ev, EVFILT_READ,
EV_ADD | EV_ENABLE | EV_CLEAR);
}
- ev->read = NXT_EVENT_DEFAULT;
+ ev->read = NXT_EVENT_ACTIVE;
}
static void
-nxt_kqueue_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_kqueue_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
if (ev->write == NXT_EVENT_INACTIVE) {
- nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE,
+ nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
EV_ADD | EV_ENABLE | EV_CLEAR);
}
- ev->write = NXT_EVENT_DEFAULT;
+ ev->write = NXT_EVENT_ACTIVE;
}
static void
-nxt_kqueue_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_kqueue_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
ev->read = NXT_EVENT_INACTIVE;
- nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, EV_DISABLE);
+ nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_DISABLE);
}
static void
-nxt_kqueue_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_kqueue_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
ev->write = NXT_EVENT_INACTIVE;
- nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE, EV_DISABLE);
+ nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE, EV_DISABLE);
}
static void
-nxt_kqueue_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_kqueue_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
if (ev->read != NXT_EVENT_INACTIVE) {
ev->read = NXT_EVENT_BLOCKED;
@@ -409,7 +366,7 @@ nxt_kqueue_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
static void
-nxt_kqueue_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_kqueue_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
if (ev->write != NXT_EVENT_INACTIVE) {
ev->write = NXT_EVENT_BLOCKED;
@@ -418,79 +375,75 @@ nxt_kqueue_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
static void
-nxt_kqueue_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_kqueue_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
- ev->write = NXT_EVENT_DEFAULT;
+ ev->write = NXT_EVENT_ACTIVE;
- nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE,
+ nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
}
static void
-nxt_kqueue_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
- ev->write = NXT_EVENT_DEFAULT;
+ ev->write = NXT_EVENT_ACTIVE;
- nxt_kqueue_fd_set(event_set, ev, EVFILT_WRITE,
+ nxt_kqueue_fd_set(engine, ev, EVFILT_WRITE,
EV_ADD | EV_ENABLE | NXT_KEVENT_ONESHOT);
}
static void
-nxt_kqueue_enable_accept(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
- ev->read = NXT_EVENT_DEFAULT;
+ ev->read = NXT_EVENT_ACTIVE;
ev->read_handler = nxt_kqueue_listen_handler;
- nxt_kqueue_fd_set(event_set, ev, EVFILT_READ, EV_ADD | EV_ENABLE);
+ nxt_kqueue_fd_set(engine, ev, EVFILT_READ, EV_ADD | EV_ENABLE);
}
static void
-nxt_kqueue_enable_file(nxt_event_set_t *event_set, nxt_event_file_t *ev)
+nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_event_file_t *ev)
{
- struct kevent *kev;
- nxt_kqueue_event_set_t *ks;
+ struct kevent *kev;
+
+ const nxt_int_t flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
+ const nxt_uint_t fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND
+ | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE;
- ks = &event_set->kqueue;
+ nxt_debug(&engine->task, "kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD",
+ engine->u.kqueue.fd, ev->file->fd, EVFILT_VNODE, flags, fflags);
- kev = nxt_kqueue_get_kevent(ks);
+ kev = nxt_kqueue_get_kevent(engine);
kev->ident = ev->file->fd;
kev->filter = EVFILT_VNODE;
- kev->flags = EV_ADD | EV_ENABLE | EV_ONESHOT;
- kev->fflags = NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND
- | NOTE_ATTRIB | NOTE_RENAME | NOTE_REVOKE;
+ kev->flags = flags;
+ kev->fflags = fflags;
kev->data = 0;
kev->udata = nxt_kevent_set_udata(ev);
-
- nxt_thread_log_debug("kevent(%d) set: id:%d ft:%i fl:%04Xd, ff:%04XuD",
- ks->kqueue, ev->file->fd, EVFILT_VNODE,
- kev->flags, kev->fflags);
}
static void
-nxt_kqueue_close_file(nxt_event_set_t *event_set, nxt_event_file_t *ev)
+nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_event_file_t *ev)
{
- nxt_kqueue_drop_changes(event_set, ev->file->fd);
+ /* TODO: pending event. */
}
static void
-nxt_kqueue_fd_set(nxt_event_set_t *event_set, nxt_event_fd_t *ev,
+nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
nxt_int_t filter, nxt_uint_t flags)
{
- struct kevent *kev;
- nxt_kqueue_event_set_t *ks;
-
- ks = &event_set->kqueue;
+ struct kevent *kev;
- nxt_log_debug(ev->log, "kevent(%d) set event: id:%d ft:%i fl:%04Xui",
- ks->kqueue, ev->fd, filter, flags);
+ nxt_debug(ev->task, "kevent(%d) set event: id:%d ft:%i fl:%04Xui",
+ engine->u.kqueue.fd, ev->fd, filter, flags);
- kev = nxt_kqueue_get_kevent(ks);
+ kev = nxt_kqueue_get_kevent(engine);
kev->ident = ev->fd;
kev->filter = filter;
@@ -502,45 +455,46 @@ nxt_kqueue_fd_set(nxt_event_set_t *event_set, nxt_event_fd_t *ev,
static struct kevent *
-nxt_kqueue_get_kevent(nxt_kqueue_event_set_t *ks)
+nxt_kqueue_get_kevent(nxt_event_engine_t *engine)
{
- if (nxt_slow_path(ks->nchanges >= ks->mchanges)) {
- nxt_kqueue_commit_changes(ks);
- }
+ int ret, nchanges;
- return &ks->changes[ks->nchanges++];
-}
+ nchanges = engine->u.kqueue.nchanges;
+ if (nxt_slow_path(nchanges >= engine->u.kqueue.mchanges)) {
-static void
-nxt_kqueue_commit_changes(nxt_kqueue_event_set_t *ks)
-{
- nxt_thread_log_debug("kevent(%d) changes:%d", ks->kqueue, ks->nchanges);
+ nxt_debug(&engine->task, "kevent(%d) changes:%d",
+ engine->u.kqueue.fd, nchanges);
+
+ ret = kevent(engine->u.kqueue.fd, engine->u.kqueue.changes, nchanges,
+ NULL, 0, NULL);
+
+ if (nxt_slow_path(ret != 0)) {
+ nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E",
+ engine->u.kqueue.fd, nxt_errno);
- if (kevent(ks->kqueue, ks->changes, ks->nchanges, NULL, 0, NULL) != 0) {
- nxt_thread_log_alert("kevent(%d) failed %E", ks->kqueue, nxt_errno);
+ nxt_kqueue_error(engine);
+ }
- nxt_kqueue_error(ks);
+ engine->u.kqueue.nchanges = 0;
}
- ks->nchanges = 0;
+ return &engine->u.kqueue.changes[engine->u.kqueue.nchanges++];
}
static void
-nxt_kqueue_error(nxt_kqueue_event_set_t *ks)
+nxt_kqueue_error(nxt_event_engine_t *engine)
{
struct kevent *kev, *end;
- nxt_thread_t *thread;
- nxt_event_fd_t *ev;
+ nxt_fd_event_t *ev;
nxt_event_file_t *fev;
nxt_work_queue_t *wq;
- thread = nxt_thread();
- wq = &thread->engine->fast_work_queue;
- end = &ks->changes[ks->nchanges];
+ wq = &engine->fast_work_queue;
+ end = &engine->u.kqueue.changes[engine->u.kqueue.nchanges];
- for (kev = ks->changes; kev < end; kev++) {
+ for (kev = engine->u.kqueue.changes; kev < end; kev++) {
switch (kev->filter) {
@@ -564,7 +518,7 @@ nxt_kqueue_error(nxt_kqueue_event_set_t *ks)
static void
nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_fd_t *ev;
+ nxt_fd_event_t *ev;
ev = obj;
@@ -595,7 +549,7 @@ nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data)
static nxt_int_t
-nxt_kqueue_add_signal(nxt_kqueue_event_set_t *ks, const nxt_event_sig_t *sigev)
+nxt_kqueue_add_signal(nxt_event_engine_t *engine, const nxt_sig_event_t *sigev)
{
int signo;
struct kevent kev;
@@ -615,12 +569,14 @@ nxt_kqueue_add_signal(nxt_kqueue_event_set_t *ks, const nxt_event_sig_t *sigev)
sa.sa_handler = (signo == SIGCHLD) ? SIG_DFL : SIG_IGN;
if (sigaction(signo, &sa, NULL) != 0) {
- nxt_main_log_alert("sigaction(%d) failed %E", signo, nxt_errno);
+ nxt_log(&engine->task, NXT_LOG_CRIT, "sigaction(%d) failed %E",
+ signo, nxt_errno);
+
return NXT_ERROR;
}
- nxt_main_log_debug("kevent(%d) signo:%d (%s)",
- ks->kqueue, signo, sigev->name);
+ nxt_debug(&engine->task, "kevent(%d) signo:%d (%s)",
+ engine->u.kqueue.fd, signo, sigev->name);
kev.ident = signo;
kev.filter = EVFILT_SIGNAL;
@@ -629,11 +585,13 @@ nxt_kqueue_add_signal(nxt_kqueue_event_set_t *ks, const nxt_event_sig_t *sigev)
kev.data = 0;
kev.udata = nxt_kevent_set_udata(sigev);
- if (kevent(ks->kqueue, &kev, 1, NULL, 0, NULL) == 0) {
+ if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
return NXT_OK;
}
- nxt_main_log_alert("kevent(%d) failed %E", ks->kqueue, nxt_errno);
+ nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E",
+ kqueue, nxt_errno);
+
return NXT_ERROR;
}
@@ -641,10 +599,9 @@ nxt_kqueue_add_signal(nxt_kqueue_event_set_t *ks, const nxt_event_sig_t *sigev)
#if (NXT_HAVE_EVFILT_USER)
static nxt_int_t
-nxt_kqueue_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler)
+nxt_kqueue_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
{
- struct kevent kev;
- nxt_kqueue_event_set_t *ks;
+ struct kevent kev;
/* EVFILT_USER must be added to a kqueue before it can be triggered. */
@@ -655,23 +612,23 @@ nxt_kqueue_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler)
kev.data = 0;
kev.udata = NULL;
- ks = &event_set->kqueue;
- ks->post_handler = handler;
+ engine->u.kqueue.post_handler = handler;
- if (kevent(ks->kqueue, &kev, 1, NULL, 0, NULL) == 0) {
+ if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) == 0) {
return NXT_OK;
}
- nxt_main_log_alert("kevent(%d) failed %E", ks->kqueue, nxt_errno);
+ nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E",
+ engine->u.kqueue.fd, nxt_errno);
+
return NXT_ERROR;
}
static void
-nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo)
+nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
- struct kevent kev;
- nxt_kqueue_event_set_t *ks;
+ struct kevent kev;
/*
* kqueue has a builtin signal processing support, so the function
@@ -685,10 +642,9 @@ nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo)
kev.data = 0;
kev.udata = NULL;
- ks = &event_set->kqueue;
-
- if (kevent(ks->kqueue, &kev, 1, NULL, 0, NULL) != 0) {
- nxt_thread_log_alert("kevent(%d) failed %E", ks->kqueue, nxt_errno);
+ if (kevent(engine->u.kqueue.fd, &kev, 1, NULL, 0, NULL) != 0) {
+ nxt_log(&engine->task, NXT_LOG_CRIT, "kevent(%d) failed %E",
+ engine->u.kqueue.fd, nxt_errno);
}
}
@@ -696,24 +652,22 @@ nxt_kqueue_signal(nxt_event_set_t *event_set, nxt_uint_t signo)
static void
-nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
- nxt_msec_t timeout)
-{
- int nevents;
- void *obj, *data;
- nxt_int_t i;
- nxt_err_t err;
- nxt_uint_t level;
- nxt_bool_t error, eof;
- nxt_task_t *event_task;
- struct kevent *kev;
- nxt_event_fd_t *ev;
- nxt_event_sig_t *sigev;
- struct timespec ts, *tp;
- nxt_event_file_t *fev;
- nxt_work_queue_t *wq;
- nxt_work_handler_t handler;
- nxt_kqueue_event_set_t *ks;
+nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
+{
+ int nevents;
+ void *obj, *data;
+ nxt_int_t i;
+ nxt_err_t err;
+ nxt_uint_t level;
+ nxt_bool_t error, eof;
+ nxt_task_t *task;
+ struct kevent *kev;
+ nxt_fd_event_t *ev;
+ nxt_sig_event_t *sigev;
+ struct timespec ts, *tp;
+ nxt_event_file_t *fev;
+ nxt_work_queue_t *wq;
+ nxt_work_handler_t handler;
if (timeout == NXT_INFINITE_MSEC) {
tp = NULL;
@@ -724,35 +678,36 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
tp = &ts;
}
- ks = &event_set->kqueue;
+ nxt_debug(&engine->task, "kevent(%d) changes:%d timeout:%M",
+ engine->u.kqueue.fd, engine->u.kqueue.nchanges, timeout);
- nxt_debug(task, "kevent(%d) changes:%d timeout:%M",
- ks->kqueue, ks->nchanges, timeout);
-
- nevents = kevent(ks->kqueue, ks->changes, ks->nchanges,
- ks->events, ks->mevents, tp);
+ nevents = kevent(engine->u.kqueue.fd,
+ engine->u.kqueue.changes, engine->u.kqueue.nchanges,
+ engine->u.kqueue.events, engine->u.kqueue.mevents, tp);
err = (nevents == -1) ? nxt_errno : 0;
- nxt_thread_time_update(task->thread);
+ nxt_thread_time_update(engine->task.thread);
- nxt_debug(task, "kevent(%d): %d", ks->kqueue, nevents);
+ nxt_debug(&engine->task, "kevent(%d): %d", engine->u.kqueue.fd, nevents);
if (nevents == -1) {
- level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
- nxt_log(task, level, "kevent(%d) failed %E", ks->kqueue, err);
+ level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT;
+
+ nxt_log(&engine->task, level, "kevent(%d) failed %E",
+ engine->u.kqueue.fd, err);
- nxt_kqueue_error(ks);
+ nxt_kqueue_error(engine);
return;
}
- ks->nchanges = 0;
+ engine->u.kqueue.nchanges = 0;
for (i = 0; i < nevents; i++) {
- kev = &ks->events[i];
+ kev = &engine->u.kqueue.events[i];
- nxt_debug(task,
+ nxt_debug(&engine->task,
(kev->ident > 0x8000000 && kev->ident != (uintptr_t) -1) ?
"kevent: id:%p ft:%d fl:%04Xd ff:%d d:%d ud:%p":
"kevent: id:%d ft:%d fl:%04Xd ff:%d d:%d ud:%p",
@@ -762,13 +717,13 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
error = (kev->flags & EV_ERROR);
if (nxt_slow_path(error)) {
- nxt_log(task, NXT_LOG_CRIT,
+ nxt_log(&engine->task, NXT_LOG_CRIT,
"kevent(%d) error %E on ident:%d filter:%d",
- ks->kqueue, kev->data, kev->ident, kev->filter);
+ engine->u.kqueue.fd, kev->data, kev->ident, kev->filter);
}
- event_task = task;
- wq = &task->thread->engine->fast_work_queue;
+ task = &engine->task;
+ wq = &engine->fast_work_queue;
handler = nxt_kqueue_fd_error_handler;
obj = nxt_kevent_get_udata(kev->udata);
@@ -801,7 +756,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
wq = ev->read_work_queue;
}
- event_task = ev->task;
+ task = ev->task;
data = ev->data;
break;
@@ -832,7 +787,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
wq = ev->write_work_queue;
}
- event_task = ev->task;
+ task = ev->task;
data = ev->data;
break;
@@ -840,7 +795,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
case EVFILT_VNODE:
fev = obj;
handler = fev->handler;
- event_task = fev->task;
+ task = fev->task;
data = fev->data;
break;
@@ -854,7 +809,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
#if (NXT_HAVE_EVFILT_USER)
case EVFILT_USER:
- handler = ks->post_handler;
+ handler = engine->u.kqueue.post_handler;
data = NULL;
break;
@@ -863,15 +818,15 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
default:
#if (NXT_DEBUG)
- nxt_log(task, NXT_LOG_CRIT,
+ nxt_log(&engine->task, NXT_LOG_CRIT,
"unexpected kevent(%d) filter %d on ident %d",
- ks->kqueue, kev->filter, kev->ident);
+ engine->u.kqueue.fd, kev->filter, kev->ident);
#endif
continue;
}
- nxt_work_queue_add(wq, handler, event_task, obj, data);
+ nxt_work_queue_add(wq, handler, task, obj, data);
}
}
@@ -885,6 +840,7 @@ static void
nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
+ nxt_event_engine_t *engine;
nxt_work_handler_t handler;
const nxt_event_conn_state_t *state;
@@ -903,9 +859,10 @@ nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
c->socket.write_handler = nxt_kqueue_event_conn_connected;
c->socket.error_handler = nxt_event_conn_connect_error;
- nxt_event_conn_timer(task->thread->engine, c, state, &c->write_timer);
+ engine = task->thread->engine;
+ nxt_event_conn_timer(engine, c, state, &c->write_timer);
- nxt_kqueue_enable_write(task->thread->engine->event_set, &c->socket);
+ nxt_kqueue_enable_write(engine, &c->socket);
return;
case NXT_DECLINED:
@@ -1053,8 +1010,8 @@ nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
c->socket.kq_available = 0;
}
- nxt_log_debug(c->socket.log, "kevent fd:%d avail:%D eof:%d",
- c->socket.fd, c->socket.kq_available, c->socket.kq_eof);
+ nxt_debug(c->socket.task, "kevent fd:%d avail:%D eof:%d",
+ c->socket.fd, c->socket.kq_available, c->socket.kq_eof);
c->socket.read_ready = (c->socket.kq_available != 0
|| c->socket.kq_eof);