summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_poll_engine.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
commit059a8642898a6bd4b47d13a1c1d599cd44af7e1c (patch)
treee3c8c530a04f1ae44777d5ea4fd6901dc55a8ebf /src/nxt_poll_engine.c
parente57b95a92333fa7ff558737b0ba2b76894cc0412 (diff)
downloadunit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.gz
unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.bz2
Event engines refactoring.
Diffstat (limited to 'src/nxt_poll_engine.c')
-rw-r--r--src/nxt_poll_engine.c710
1 files changed, 710 insertions, 0 deletions
diff --git a/src/nxt_poll_engine.c b/src/nxt_poll_engine.c
new file mode 100644
index 00000000..90a8176e
--- /dev/null
+++ b/src/nxt_poll_engine.c
@@ -0,0 +1,710 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+#define NXT_POLL_ADD 0
+#define NXT_POLL_CHANGE 1
+#define NXT_POLL_DELETE 2
+
+
+typedef struct {
+ /*
+ * A file descriptor is stored in hash entry to allow
+ * nxt_poll_fd_hash_test() to not dereference a pointer to
+ * nxt_fd_event_t which may be invalid if the file descriptor has
+ * been already closed and the nxt_fd_event_t's memory has been freed.
+ */
+ nxt_socket_t fd;
+
+ uint32_t index;
+ void *event;
+} nxt_poll_hash_entry_t;
+
+
+static nxt_int_t nxt_poll_create(nxt_event_engine_t *engine,
+ nxt_uint_t mchanges, nxt_uint_t mevents);
+static void nxt_poll_free(nxt_event_engine_t *engine);
+static void nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
+static void nxt_poll_disable(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static nxt_bool_t nxt_poll_close(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_poll_enable_read(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_poll_enable_write(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_poll_disable_read(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_poll_disable_write(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
+static void nxt_poll_block_write(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_poll_oneshot_read(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_poll_oneshot_write(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
+ nxt_uint_t op, nxt_uint_t events);
+static nxt_int_t nxt_poll_commit_changes(nxt_event_engine_t *engine);
+static nxt_int_t nxt_poll_set_add(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev, int events);
+static nxt_int_t nxt_poll_set_change(nxt_event_engine_t *engine,
+ nxt_fd_t fd, int events);
+static nxt_int_t nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd);
+static void nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
+static nxt_poll_hash_entry_t *nxt_poll_fd_hash_get(nxt_event_engine_t *engine,
+ nxt_fd_t fd);
+static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
+static void nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine,
+ nxt_lvlhsh_t *lh);
+
+
+const nxt_event_interface_t nxt_poll_engine = {
+ "poll",
+ nxt_poll_create,
+ nxt_poll_free,
+ nxt_poll_enable,
+ nxt_poll_disable,
+ nxt_poll_disable,
+ nxt_poll_close,
+ nxt_poll_enable_read,
+ nxt_poll_enable_write,
+ nxt_poll_disable_read,
+ nxt_poll_disable_write,
+ nxt_poll_block_read,
+ nxt_poll_block_write,
+ nxt_poll_oneshot_read,
+ nxt_poll_oneshot_write,
+ nxt_poll_enable_read,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ nxt_poll,
+
+ &nxt_unix_event_conn_io,
+
+ NXT_NO_FILE_EVENTS,
+ NXT_NO_SIGNAL_EVENTS,
+};
+
+
+static const nxt_lvlhsh_proto_t nxt_poll_fd_hash_proto nxt_aligned(64) =
+{
+ NXT_LVLHSH_LARGE_MEMALIGN,
+ 0,
+ nxt_poll_fd_hash_test,
+ nxt_lvlhsh_alloc,
+ nxt_lvlhsh_free,
+};
+
+
+static nxt_int_t
+nxt_poll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
+ nxt_uint_t mevents)
+{
+ engine->u.poll.mchanges = mchanges;
+
+ engine->u.poll.changes = nxt_malloc(sizeof(nxt_poll_change_t) * mchanges);
+
+ if (engine->u.poll.changes != NULL) {
+ return NXT_OK;
+ }
+
+ return NXT_ERROR;
+}
+
+
+static void
+nxt_poll_free(nxt_event_engine_t *engine)
+{
+ nxt_debug(&engine->task, "poll free");
+
+ nxt_free(engine->u.poll.set);
+ nxt_free(engine->u.poll.changes);
+ nxt_poll_fd_hash_destroy(engine, &engine->u.poll.fd_hash);
+
+ nxt_memzero(&engine->u.poll, sizeof(nxt_poll_engine_t));
+}
+
+
+static void
+nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
+{
+ ev->read = NXT_EVENT_ACTIVE;
+ ev->write = NXT_EVENT_ACTIVE;
+
+ nxt_poll_change(engine, ev, NXT_POLL_ADD, POLLIN | POLLOUT);
+}
+
+
+static void
+nxt_poll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
+{
+ if (ev->read != NXT_EVENT_INACTIVE && ev->write != NXT_EVENT_INACTIVE) {
+ ev->read = NXT_EVENT_INACTIVE;
+ ev->write = NXT_EVENT_INACTIVE;
+
+ nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
+ }
+}
+
+
+static nxt_bool_t
+nxt_poll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
+{
+ nxt_poll_disable(engine, ev);
+
+ return ev->changing;
+}
+
+
+static void
+nxt_poll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
+{
+ nxt_uint_t op, events;
+
+ ev->read = NXT_EVENT_ACTIVE;
+
+ if (ev->write == NXT_EVENT_INACTIVE) {
+ op = NXT_POLL_ADD;
+ events = POLLIN;
+
+ } else {
+ op = NXT_POLL_CHANGE;
+ events = POLLIN | POLLOUT;
+ }
+
+ nxt_poll_change(engine, ev, op, events);
+}
+
+
+static void
+nxt_poll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
+{
+ nxt_uint_t op, events;
+
+ ev->write = NXT_EVENT_ACTIVE;
+
+ if (ev->read == NXT_EVENT_INACTIVE) {
+ op = NXT_POLL_ADD;
+ events = POLLOUT;
+
+ } else {
+ op = NXT_POLL_CHANGE;
+ events = POLLIN | POLLOUT;
+ }
+
+ nxt_poll_change(engine, ev, op, events);
+}
+
+
+static void
+nxt_poll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
+{
+ nxt_uint_t op, events;
+
+ ev->read = NXT_EVENT_INACTIVE;
+
+ if (ev->write == NXT_EVENT_INACTIVE) {
+ op = NXT_POLL_DELETE;
+ events = 0;
+
+ } else {
+ op = NXT_POLL_CHANGE;
+ events = POLLOUT;
+ }
+
+ nxt_poll_change(engine, ev, op, events);
+}
+
+
+static void
+nxt_poll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
+{
+ nxt_uint_t op, events;
+
+ ev->write = NXT_EVENT_INACTIVE;
+
+ if (ev->read == NXT_EVENT_INACTIVE) {
+ op = NXT_POLL_DELETE;
+ events = 0;
+
+ } else {
+ op = NXT_POLL_CHANGE;
+ events = POLLIN;
+ }
+
+ nxt_poll_change(engine, ev, op, events);
+}
+
+
+static void
+nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
+{
+ if (ev->read != NXT_EVENT_INACTIVE) {
+ nxt_poll_disable_read(engine, ev);
+ }
+}
+
+
+static void
+nxt_poll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
+{
+ if (ev->write != NXT_EVENT_INACTIVE) {
+ nxt_poll_disable_write(engine, ev);
+ }
+}
+
+
+static void
+nxt_poll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
+{
+ nxt_uint_t op;
+
+ op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
+ NXT_POLL_ADD : NXT_POLL_CHANGE;
+
+ ev->read = NXT_EVENT_ONESHOT;
+ ev->write = NXT_EVENT_INACTIVE;
+
+ nxt_poll_change(engine, ev, op, POLLIN);
+}
+
+
+static void
+nxt_poll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
+{
+ nxt_uint_t op;
+
+ op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
+ NXT_POLL_ADD : NXT_POLL_CHANGE;
+
+ ev->read = NXT_EVENT_INACTIVE;
+ ev->write = NXT_EVENT_ONESHOT;
+
+ nxt_poll_change(engine, ev, op, POLLOUT);
+}
+
+
+/*
+ * poll changes are batched to improve instruction and data cache
+ * locality of several lvlhsh operations followed by poll() call.
+ */
+
+static void
+nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, nxt_uint_t op,
+ nxt_uint_t events)
+{
+ nxt_poll_change_t *change;
+
+ nxt_debug(ev->task, "poll change: fd:%d op:%d ev:%XD", ev->fd, op, events);
+
+ if (engine->u.poll.nchanges >= engine->u.poll.mchanges) {
+ (void) nxt_poll_commit_changes(engine);
+ }
+
+ ev->changing = 1;
+
+ change = &engine->u.poll.changes[engine->u.poll.nchanges++];
+ change->op = op;
+ change->events = events;
+ change->event = ev;
+}
+
+
+static nxt_int_t
+nxt_poll_commit_changes(nxt_event_engine_t *engine)
+{
+ nxt_int_t ret, retval;
+ nxt_fd_event_t *ev;
+ nxt_poll_change_t *change, *end;
+
+ nxt_debug(&engine->task, "poll changes:%ui", engine->u.poll.nchanges);
+
+ retval = NXT_OK;
+ change = engine->u.poll.changes;
+ end = change + engine->u.poll.nchanges;
+
+ do {
+ ev = change->event;
+ ev->changing = 0;
+
+ switch (change->op) {
+
+ case NXT_POLL_ADD:
+ ret = nxt_poll_set_add(engine, ev, change->events);
+
+ if (nxt_fast_path(ret == NXT_OK)) {
+ goto next;
+ }
+
+ break;
+
+ case NXT_POLL_CHANGE:
+ ret = nxt_poll_set_change(engine, ev->fd, change->events);
+
+ if (nxt_fast_path(ret == NXT_OK)) {
+ goto next;
+ }
+
+ break;
+
+ case NXT_POLL_DELETE:
+ ret = nxt_poll_set_delete(engine, ev->fd);
+
+ if (nxt_fast_path(ret == NXT_OK)) {
+ goto next;
+ }
+
+ break;
+ }
+
+ nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
+ ev->task, ev, ev->data);
+
+ retval = NXT_ERROR;
+
+ next:
+
+ change++;
+
+ } while (change < end);
+
+ engine->u.poll.nchanges = 0;
+
+ return retval;
+}
+
+
+static nxt_int_t
+nxt_poll_set_add(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int events)
+{
+ nxt_int_t ret;
+ nxt_uint_t max_nfds;
+ struct pollfd *pfd;
+ nxt_lvlhsh_query_t lhq;
+ nxt_poll_hash_entry_t *phe;
+
+ nxt_debug(&engine->task, "poll add event: fd:%d ev:%04Xi", ev->fd, events);
+
+ if (engine->u.poll.nfds >= engine->u.poll.max_nfds) {
+ max_nfds = engine->u.poll.max_nfds + 512; /* 4K */
+
+ pfd = nxt_realloc(engine->u.poll.set, sizeof(struct pollfd) * max_nfds);
+ if (nxt_slow_path(pfd == NULL)) {
+ return NXT_ERROR;
+ }
+
+ engine->u.poll.set = pfd;
+ engine->u.poll.max_nfds = max_nfds;
+ }
+
+ phe = nxt_malloc(sizeof(nxt_poll_hash_entry_t));
+ if (nxt_slow_path(phe == NULL)) {
+ return NXT_ERROR;
+ }
+
+ phe->fd = ev->fd;
+ phe->index = engine->u.poll.nfds;
+ phe->event = ev;
+
+ pfd = &engine->u.poll.set[engine->u.poll.nfds++];
+ pfd->fd = ev->fd;
+ pfd->events = events;
+ pfd->revents = 0;
+
+ lhq.key_hash = nxt_murmur_hash2(&ev->fd, sizeof(nxt_fd_t));
+ lhq.replace = 0;
+ lhq.value = phe;
+ lhq.proto = &nxt_poll_fd_hash_proto;
+ lhq.data = engine;
+
+ ret = nxt_lvlhsh_insert(&engine->u.poll.fd_hash, &lhq);
+
+ if (nxt_fast_path(ret == NXT_OK)) {
+ return NXT_OK;
+ }
+
+ nxt_free(phe);
+
+ return NXT_ERROR;
+}
+
+
+static nxt_int_t
+nxt_poll_set_change(nxt_event_engine_t *engine, nxt_fd_t fd, int events)
+{
+ nxt_poll_hash_entry_t *phe;
+
+ nxt_debug(&engine->task, "poll change event: fd:%d ev:%04Xi",
+ fd, events);
+
+ phe = nxt_poll_fd_hash_get(engine, fd);
+
+ if (nxt_fast_path(phe != NULL)) {
+ engine->u.poll.set[phe->index].events = events;
+ return NXT_OK;
+ }
+
+ return NXT_ERROR;
+}
+
+
+static nxt_int_t
+nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd)
+{
+ nxt_int_t ret;
+ nxt_uint_t index, nfds;
+ nxt_lvlhsh_query_t lhq;
+ nxt_poll_hash_entry_t *phe;
+
+ nxt_debug(&engine->task, "poll delete event: fd:%d", fd);
+
+ lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
+ lhq.proto = &nxt_poll_fd_hash_proto;
+ lhq.data = engine;
+
+ ret = nxt_lvlhsh_delete(&engine->u.poll.fd_hash, &lhq);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
+ phe = lhq.value;
+
+ index = phe->index;
+ engine->u.poll.nfds--;
+ nfds = engine->u.poll.nfds;
+
+ if (index != nfds) {
+ engine->u.poll.set[index] = engine->u.poll.set[nfds];
+
+ phe = nxt_poll_fd_hash_get(engine, engine->u.poll.set[nfds].fd);
+
+ phe->index = index;
+ }
+
+ nxt_free(lhq.value);
+
+ return NXT_OK;
+}
+
+
+static void
+nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
+{
+ int nevents;
+ nxt_fd_t fd;
+ nxt_err_t err;
+ nxt_bool_t error;
+ nxt_uint_t i, events, level;
+ struct pollfd *pfd;
+ nxt_fd_event_t *ev;
+ nxt_poll_hash_entry_t *phe;
+
+ if (engine->u.poll.nchanges != 0) {
+ if (nxt_poll_commit_changes(engine) != NXT_OK) {
+ /* Error handlers have been enqueued on failure. */
+ timeout = 0;
+ }
+ }
+
+ nxt_debug(&engine->task, "poll() events:%ui timeout:%M",
+ engine->u.poll.nfds, timeout);
+
+ nevents = poll(engine->u.poll.set, engine->u.poll.nfds, timeout);
+
+ err = (nevents == -1) ? nxt_errno : 0;
+
+ nxt_thread_time_update(engine->task.thread);
+
+ nxt_debug(&engine->task, "poll(): %d", nevents);
+
+ if (nevents == -1) {
+ level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
+ nxt_log(&engine->task, level, "poll() failed %E", err);
+ return;
+ }
+
+ for (i = 0; i < engine->u.poll.nfds && nevents != 0; i++) {
+
+ pfd = &engine->u.poll.set[i];
+ events = pfd->revents;
+
+ if (events == 0) {
+ continue;
+ }
+
+ fd = pfd->fd;
+
+ phe = nxt_poll_fd_hash_get(engine, fd);
+
+ if (nxt_slow_path(phe == NULL)) {
+ nxt_log(&engine->task, NXT_LOG_CRIT,
+ "poll() returned invalid fd:%d ev:%04Xd rev:%04uXi",
+ fd, pfd->events, events);
+
+ /* Mark the poll entry to ignore it by the kernel. */
+ pfd->fd = -1;
+ goto next;
+ }
+
+ ev = phe->event;
+
+ nxt_debug(ev->task, "poll: fd:%d ev:%04uXi rd:%d %wr:%d",
+ fd, events, ev->read, ev->write);
+
+ if (nxt_slow_path((events & POLLNVAL) != 0)) {
+ nxt_log(ev->task, NXT_LOG_CRIT,
+ "poll() error fd:%d ev:%04Xd rev:%04uXi",
+ fd, pfd->events, events);
+
+ /* Mark the poll entry to ignore it by the kernel. */
+ pfd->fd = -1;
+
+ nxt_work_queue_add(&engine->fast_work_queue,
+ ev->error_handler, ev->task, ev, ev->data);
+ goto next;
+ }
+
+ /*
+ * On a socket's remote end close:
+ *
+ * Linux, FreeBSD, and Solaris set POLLIN;
+ * MacOSX sets POLLIN and POLLHUP;
+ * NetBSD sets POLLIN, and poll(2) claims this explicitly:
+ *
+ * If the remote end of a socket is closed, poll()
+ * returns a POLLIN event, rather than a POLLHUP.
+ *
+ * On error:
+ *
+ * Linux sets POLLHUP and POLLERR only;
+ * FreeBSD adds POLLHUP to POLLIN or POLLOUT, although poll(2)
+ * claims the opposite:
+ *
+ * Note that POLLHUP and POLLOUT should never be
+ * present in the revents bitmask at the same time.
+ *
+ * Solaris and NetBSD do not add POLLHUP or POLLERR;
+ * MacOSX sets POLLHUP only.
+ *
+ * If an implementation sets POLLERR or POLLHUP only without POLLIN
+ * or POLLOUT, the "error" variable enqueues only one active handler.
+ */
+
+ error = (((events & (POLLERR | POLLHUP)) != 0)
+ && ((events & (POLLIN | POLLOUT)) == 0));
+
+ if ((events & POLLIN) || (error && ev->read_handler != NULL)) {
+ error = 0;
+ ev->read_ready = 1;
+
+ if (ev->read == NXT_EVENT_ONESHOT) {
+ ev->read = NXT_EVENT_INACTIVE;
+ nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
+ }
+
+ nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
+ ev->task, ev, ev->data);
+ }
+
+ if ((events & POLLOUT) || (error && ev->write_handler != NULL)) {
+ ev->write_ready = 1;
+
+ if (ev->write == NXT_EVENT_ONESHOT) {
+ ev->write = NXT_EVENT_INACTIVE;
+ nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
+ }
+
+ nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
+ ev->task, ev, ev->data);
+ }
+
+ next:
+
+ nevents--;
+ }
+}
+
+
+static nxt_poll_hash_entry_t *
+nxt_poll_fd_hash_get(nxt_event_engine_t *engine, nxt_fd_t fd)
+{
+ nxt_lvlhsh_query_t lhq;
+ nxt_poll_hash_entry_t *phe;
+
+ lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
+ lhq.proto = &nxt_poll_fd_hash_proto;
+ lhq.data = engine;
+
+ if (nxt_lvlhsh_find(&engine->u.poll.fd_hash, &lhq) == NXT_OK) {
+ phe = lhq.value;
+ return phe;
+ }
+
+ nxt_log(&engine->task, NXT_LOG_CRIT, "fd %d not found in hash", fd);
+
+ return NULL;
+}
+
+
+static nxt_int_t
+nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ nxt_event_engine_t *engine;
+ nxt_poll_hash_entry_t *phe;
+
+ phe = data;
+
+ /* nxt_murmur_hash2() is unique for 4 bytes. */
+
+ engine = lhq->data;
+
+ if (nxt_fast_path(phe->fd == engine->u.poll.set[phe->index].fd)) {
+ return NXT_OK;
+ }
+
+ nxt_log(&engine->task, NXT_LOG_CRIT,
+ "fd %d in hash mismatches fd %d in poll set",
+ phe->fd, engine->u.poll.set[phe->index].fd);
+
+ return NXT_DECLINED;
+}
+
+
+static void
+nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine, nxt_lvlhsh_t *lh)
+{
+ nxt_lvlhsh_each_t lhe;
+ nxt_lvlhsh_query_t lhq;
+ nxt_poll_hash_entry_t *phe;
+
+ nxt_memzero(&lhe, sizeof(nxt_lvlhsh_each_t));
+ lhe.proto = &nxt_poll_fd_hash_proto;
+ lhq.proto = &nxt_poll_fd_hash_proto;
+
+ for ( ;; ) {
+ phe = nxt_lvlhsh_each(lh, &lhe);
+
+ if (phe == NULL) {
+ return;
+ }
+
+ lhq.key_hash = nxt_murmur_hash2(&phe->fd, sizeof(nxt_fd_t));
+
+ if (nxt_lvlhsh_delete(lh, &lhq) != NXT_OK) {
+ nxt_log(&engine->task, NXT_LOG_CRIT,
+ "event fd %d not found in hash", phe->fd);
+ }
+
+ nxt_free(phe);
+ }
+}