summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_poll.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_poll.c')
-rw-r--r--src/nxt_poll.c752
1 files changed, 0 insertions, 752 deletions
diff --git a/src/nxt_poll.c b/src/nxt_poll.c
deleted file mode 100644
index fd195261..00000000
--- a/src/nxt_poll.c
+++ /dev/null
@@ -1,752 +0,0 @@
-
-/*
- * Copyright (C) Igor Sysoev
- * Copyright (C) NGINX, Inc.
- */
-
-#include <nxt_main.h>
-
-
-#define NXT_POLL_ADD 0
-#define NXT_POLL_CHANGE 1
-#define NXT_POLL_DELETE 2
-
-
-typedef struct {
- /*
- * A file descriptor is stored in hash entry to allow
- * nxt_poll_fd_hash_test() to not dereference a pointer to
- * nxt_event_fd_t which may be invalid if the file descriptor has
- * been already closed and the nxt_event_fd_t's memory has been freed.
- */
- nxt_socket_t fd;
-
- uint32_t index;
- void *event;
-} nxt_poll_hash_entry_t;
-
-
-static nxt_event_set_t *nxt_poll_create(nxt_event_signals_t *signals,
- nxt_uint_t mchanges, nxt_uint_t mevents);
-static void nxt_poll_free(nxt_event_set_t *event_set);
-static void nxt_poll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
-static void nxt_poll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
-static void nxt_poll_drop_changes(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_poll_enable_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_poll_enable_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_poll_disable_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_poll_disable_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_poll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
-static void nxt_poll_block_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_poll_oneshot_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_poll_oneshot_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_poll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev,
- nxt_uint_t op, nxt_uint_t events);
-static nxt_int_t nxt_poll_commit_changes(nxt_thread_t *thr,
- nxt_poll_event_set_t *ps);
-static nxt_int_t nxt_poll_set_add(nxt_thread_t *thr, nxt_poll_event_set_t *ps,
- nxt_poll_change_t *ch);
-static nxt_int_t nxt_poll_set_change(nxt_thread_t *thr,
- nxt_poll_event_set_t *ps, nxt_poll_change_t *ch);
-static nxt_int_t nxt_poll_set_delete(nxt_thread_t *thr,
- nxt_poll_event_set_t *ps, nxt_poll_change_t *ch);
-static void nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set,
- nxt_msec_t timeout);
-static nxt_poll_hash_entry_t *nxt_poll_fd_hash_get(nxt_poll_event_set_t *ps,
- nxt_fd_t fd);
-static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
-static void nxt_poll_fd_hash_destroy(nxt_lvlhsh_t *lh);
-
-
-const nxt_event_set_ops_t nxt_poll_event_set = {
- "poll",
- nxt_poll_create,
- nxt_poll_free,
- nxt_poll_enable,
- nxt_poll_disable,
- nxt_poll_disable,
- nxt_poll_disable,
- nxt_poll_enable_read,
- nxt_poll_enable_write,
- nxt_poll_disable_read,
- nxt_poll_disable_write,
- nxt_poll_block_read,
- nxt_poll_block_write,
- nxt_poll_oneshot_read,
- nxt_poll_oneshot_write,
- nxt_poll_enable_read,
- NULL,
- NULL,
- NULL,
- NULL,
- nxt_poll_set_poll,
-
- &nxt_unix_event_conn_io,
-
- NXT_NO_FILE_EVENTS,
- NXT_NO_SIGNAL_EVENTS,
-};
-
-
-static const nxt_lvlhsh_proto_t nxt_poll_fd_hash_proto nxt_aligned(64) =
-{
- NXT_LVLHSH_LARGE_MEMALIGN,
- 0,
- nxt_poll_fd_hash_test,
- nxt_lvlhsh_alloc,
- nxt_lvlhsh_free,
-};
-
-
-static nxt_event_set_t *
-nxt_poll_create(nxt_event_signals_t *signals, nxt_uint_t mchanges,
- nxt_uint_t mevents)
-{
- nxt_event_set_t *event_set;
- nxt_poll_event_set_t *ps;
-
- event_set = nxt_zalloc(sizeof(nxt_poll_event_set_t));
- if (event_set == NULL) {
- return NULL;
- }
-
- ps = &event_set->poll;
-
- ps->mchanges = mchanges;
-
- ps->changes = nxt_malloc(sizeof(nxt_poll_change_t) * mchanges);
- if (ps->changes == NULL) {
- nxt_free(event_set);
- return NULL;
- }
-
- return event_set;
-}
-
-
-static void
-nxt_poll_free(nxt_event_set_t *event_set)
-{
- nxt_poll_event_set_t *ps;
-
- ps = &event_set->poll;
-
- nxt_main_log_debug("poll free");
-
- nxt_free(ps->poll_set);
- nxt_free(ps->changes);
- nxt_poll_fd_hash_destroy(&ps->fd_hash);
- nxt_free(ps);
-}
-
-
-static void
-nxt_poll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- ev->read = NXT_EVENT_DEFAULT;
- ev->write = NXT_EVENT_DEFAULT;
-
- nxt_poll_change(event_set, ev, NXT_POLL_ADD, POLLIN | POLLOUT);
-}
-
-
-static void
-nxt_poll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- ev->read = NXT_EVENT_INACTIVE;
- ev->write = NXT_EVENT_INACTIVE;
-
- nxt_poll_drop_changes(event_set, ev);
- /*
- * A simple non-zero value POLLHUP is a flag to ignore error handling
- * if the event is not present in poll set, because the event may be
- * freed at the time when the NXT_POLL_DELETE change will be processed
- * and correct event error_handler will not be available.
- */
- nxt_poll_change(event_set, ev, NXT_POLL_DELETE, POLLHUP);
-}
-
-
-static void
-nxt_poll_drop_changes(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- nxt_poll_change_t *dst, *src, *end;
- nxt_poll_event_set_t *ps;
-
- ps = &event_set->poll;
-
- dst = ps->changes;
- end = dst + ps->nchanges;
-
- for (src = dst; src < end; src++) {
-
- if (src->event == ev) {
- continue;
- }
-
- if (dst != src) {
- *dst = *src;
- }
-
- dst++;
- }
-
- ps->nchanges -= end - dst;
-}
-
-
-static void
-nxt_poll_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- nxt_uint_t op, events;
-
- ev->read = NXT_EVENT_DEFAULT;
-
- if (ev->write == NXT_EVENT_INACTIVE) {
- op = NXT_POLL_ADD;
- events = POLLIN;
-
- } else {
- op = NXT_POLL_CHANGE;
- events = POLLIN | POLLOUT;
- }
-
- nxt_poll_change(event_set, ev, op, events);
-}
-
-
-static void
-nxt_poll_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- nxt_uint_t op, events;
-
- ev->write = NXT_EVENT_DEFAULT;
-
- if (ev->read == NXT_EVENT_INACTIVE) {
- op = NXT_POLL_ADD;
- events = POLLOUT;
-
- } else {
- op = NXT_POLL_CHANGE;
- events = POLLIN | POLLOUT;
- }
-
- nxt_poll_change(event_set, ev, op, events);
-}
-
-
-static void
-nxt_poll_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- nxt_uint_t op, events;
-
- ev->read = NXT_EVENT_INACTIVE;
-
- if (ev->write == NXT_EVENT_INACTIVE) {
- op = NXT_POLL_DELETE;
- events = 0;
-
- } else {
- op = NXT_POLL_CHANGE;
- events = POLLOUT;
- }
-
- nxt_poll_change(event_set, ev, op, events);
-}
-
-
-static void
-nxt_poll_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- nxt_uint_t op, events;
-
- ev->write = NXT_EVENT_INACTIVE;
-
- if (ev->read == NXT_EVENT_INACTIVE) {
- op = NXT_POLL_DELETE;
- events = 0;
-
- } else {
- op = NXT_POLL_CHANGE;
- events = POLLIN;
- }
-
- nxt_poll_change(event_set, ev, op, events);
-}
-
-
-static void
-nxt_poll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- if (ev->read != NXT_EVENT_INACTIVE) {
- nxt_poll_disable_read(event_set, ev);
- }
-}
-
-
-static void
-nxt_poll_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- if (ev->write != NXT_EVENT_INACTIVE) {
- nxt_poll_disable_write(event_set, ev);
- }
-}
-
-
-static void
-nxt_poll_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- nxt_uint_t op;
-
- op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
- NXT_POLL_ADD : NXT_POLL_CHANGE;
-
- ev->read = NXT_EVENT_ONESHOT;
- ev->write = NXT_EVENT_INACTIVE;
-
- nxt_poll_change(event_set, ev, op, POLLIN);
-}
-
-
-static void
-nxt_poll_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- nxt_uint_t op;
-
- op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
- NXT_POLL_ADD : NXT_POLL_CHANGE;
-
- ev->read = NXT_EVENT_INACTIVE;
- ev->write = NXT_EVENT_ONESHOT;
-
- nxt_poll_change(event_set, ev, op, POLLOUT);
-}
-
-
-/*
- * poll changes are batched to improve instruction and data cache
- * locality of several lvlhsh operations followed by poll() call.
- */
-
-static void
-nxt_poll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, nxt_uint_t op,
- nxt_uint_t events)
-{
- nxt_poll_change_t *ch;
- nxt_poll_event_set_t *ps;
-
- nxt_log_debug(ev->log, "poll change: fd:%d op:%d ev:%XD",
- ev->fd, op, events);
-
- ps = &event_set->poll;
-
- if (ps->nchanges >= ps->mchanges) {
- (void) nxt_poll_commit_changes(nxt_thread(), ps);
- }
-
- ch = &ps->changes[ps->nchanges++];
- ch->op = op;
- ch->fd = ev->fd;
- ch->events = events;
- ch->event = ev;
-}
-
-
-static nxt_int_t
-nxt_poll_commit_changes(nxt_thread_t *thr, nxt_poll_event_set_t *ps)
-{
- nxt_int_t ret;
- nxt_event_fd_t *ev;
- nxt_poll_change_t *ch, *end;
-
- nxt_log_debug(thr->log, "poll changes:%ui", ps->nchanges);
-
- ret = NXT_OK;
- ch = ps->changes;
- end = ch + ps->nchanges;
-
- do {
- ev = ch->event;
-
- switch (ch->op) {
-
- case NXT_POLL_ADD:
- if (nxt_fast_path(nxt_poll_set_add(thr, ps, ch) == NXT_OK)) {
- goto next;
- }
- break;
-
- case NXT_POLL_CHANGE:
- if (nxt_fast_path(nxt_poll_set_change(thr, ps, ch) == NXT_OK)) {
- goto next;
- }
- break;
-
- case NXT_POLL_DELETE:
- if (nxt_fast_path(nxt_poll_set_delete(thr, ps, ch) == NXT_OK)) {
- goto next;
- }
- break;
- }
-
- nxt_work_queue_add(&thr->engine->fast_work_queue, ev->error_handler,
- ev->task, ev, ev->data);
-
- ret = NXT_ERROR;
-
- next:
-
- ch++;
-
- } while (ch < end);
-
- ps->nchanges = 0;
-
- return ret;
-}
-
-
-static nxt_int_t
-nxt_poll_set_add(nxt_thread_t *thr, nxt_poll_event_set_t *ps,
- nxt_poll_change_t *ch)
-{
- nxt_uint_t max_nfds;
- struct pollfd *pfd;
- nxt_lvlhsh_query_t lhq;
- nxt_poll_hash_entry_t *phe;
-
- nxt_log_debug(thr->log, "poll add event: fd:%d ev:%04Xi",
- ch->fd, ch->events);
-
- if (ps->nfds >= ps->max_nfds) {
- max_nfds = ps->max_nfds + 512; /* 4K */
-
- pfd = nxt_realloc(ps->poll_set, sizeof(struct pollfd) * max_nfds);
- if (nxt_slow_path(pfd == NULL)) {
- return NXT_ERROR;
- }
-
- ps->poll_set = pfd;
- ps->max_nfds = max_nfds;
- }
-
- phe = nxt_malloc(sizeof(nxt_poll_hash_entry_t));
- if (nxt_slow_path(phe == NULL)) {
- return NXT_ERROR;
- }
-
- phe->fd = ch->fd;
- phe->index = ps->nfds;
- phe->event = ch->event;
-
- pfd = &ps->poll_set[ps->nfds++];
- pfd->fd = ch->fd;
- pfd->events = ch->events;
- pfd->revents = 0;
-
- lhq.key_hash = nxt_murmur_hash2(&ch->fd, sizeof(nxt_fd_t));
- lhq.replace = 0;
- lhq.key.length = sizeof(nxt_fd_t);
- lhq.key.start = (u_char *) &ch->fd;
- lhq.value = phe;
- lhq.proto = &nxt_poll_fd_hash_proto;
- lhq.data = ps->poll_set;
-
- if (nxt_fast_path(nxt_lvlhsh_insert(&ps->fd_hash, &lhq) == NXT_OK)) {
- return NXT_OK;
- }
-
- nxt_free(phe);
-
- return NXT_ERROR;
-}
-
-
-static nxt_int_t
-nxt_poll_set_change(nxt_thread_t *thr, nxt_poll_event_set_t *ps,
- nxt_poll_change_t *ch)
-{
- nxt_poll_hash_entry_t *phe;
-
- nxt_log_debug(thr->log, "poll change event: fd:%d ev:%04Xi",
- ch->fd, ch->events);
-
- phe = nxt_poll_fd_hash_get(ps, ch->fd);
-
- if (nxt_fast_path(phe != NULL)) {
- ps->poll_set[phe->index].events = ch->events;
- return NXT_OK;
- }
-
- return NXT_ERROR;
-}
-
-
-static nxt_int_t
-nxt_poll_set_delete(nxt_thread_t *thr, nxt_poll_event_set_t *ps,
- nxt_poll_change_t *ch)
-{
- nxt_uint_t index;
- nxt_lvlhsh_query_t lhq;
- nxt_poll_hash_entry_t *phe;
-
- nxt_log_debug(thr->log, "poll delete event: fd:%d", ch->fd);
-
- lhq.key_hash = nxt_murmur_hash2(&ch->fd, sizeof(nxt_fd_t));
- lhq.key.length = sizeof(nxt_fd_t);
- lhq.key.start = (u_char *) &ch->fd;
- lhq.proto = &nxt_poll_fd_hash_proto;
- lhq.data = ps->poll_set;
-
- if (nxt_slow_path(nxt_lvlhsh_delete(&ps->fd_hash, &lhq) != NXT_OK)) {
- /*
- * Ignore NXT_DECLINED error if ch->events
- * has the special value POLLHUP.
- */
- return (ch->events != 0) ? NXT_OK : NXT_ERROR;
- }
-
- phe = lhq.value;
-
- index = phe->index;
- ps->nfds--;
-
- if (index != ps->nfds) {
- ps->poll_set[index] = ps->poll_set[ps->nfds];
-
- phe = nxt_poll_fd_hash_get(ps, ps->poll_set[ps->nfds].fd);
-
- phe->index = index;
- }
-
- nxt_free(lhq.value);
-
- return NXT_OK;
-}
-
-
-static void
-nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set,
- nxt_msec_t timeout)
-{
- int nevents;
- nxt_fd_t fd;
- nxt_err_t err;
- nxt_bool_t error;
- nxt_uint_t i, events, level;
- struct pollfd *pfd;
- nxt_event_fd_t *ev;
- nxt_poll_event_set_t *ps;
- nxt_poll_hash_entry_t *phe;
-
- ps = &event_set->poll;
-
- if (ps->nchanges != 0) {
- if (nxt_poll_commit_changes(nxt_thread(), ps) != NXT_OK) {
- /* Error handlers have been enqueued on failure. */
- timeout = 0;
- }
- }
-
- nxt_debug(task, "poll() events:%ui timeout:%M", ps->nfds, timeout);
-
- nevents = poll(ps->poll_set, ps->nfds, timeout);
-
- err = (nevents == -1) ? nxt_errno : 0;
-
- nxt_thread_time_update(task->thread);
-
- nxt_debug(task, "poll(): %d", nevents);
-
- if (nevents == -1) {
- level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
- nxt_log(task, level, "poll() failed %E", err);
- return;
- }
-
- for (i = 0; i < ps->nfds && nevents != 0; i++) {
-
- pfd = &ps->poll_set[i];
- events = pfd->revents;
-
- if (events == 0) {
- continue;
- }
-
- fd = pfd->fd;
-
- phe = nxt_poll_fd_hash_get(ps, fd);
-
- if (nxt_slow_path(phe == NULL)) {
- nxt_log(task, NXT_LOG_CRIT,
- "poll() returned invalid fd:%d ev:%04Xd rev:%04uXi",
- fd, pfd->events, events);
-
- /* Mark the poll entry to ignore it by the kernel. */
- pfd->fd = -1;
- goto next;
- }
-
- ev = phe->event;
-
- nxt_debug(ev->task, "poll: fd:%d ev:%04uXi rd:%d %wr:%d",
- fd, events, ev->read, ev->write);
-
- if (nxt_slow_path((events & POLLNVAL) != 0)) {
- nxt_log(ev->task, NXT_LOG_CRIT,
- "poll() error fd:%d ev:%04Xd rev:%04uXi",
- fd, pfd->events, events);
-
- /* Mark the poll entry to ignore it by the kernel. */
- pfd->fd = -1;
-
- nxt_work_queue_add(&ev->task->thread->engine->fast_work_queue,
- ev->error_handler,
- ev->task, ev, ev->data);
- goto next;
- }
-
- /*
- * On a socket's remote end close:
- *
- * Linux, FreeBSD, and Solaris set POLLIN;
- * MacOSX sets POLLIN and POLLHUP;
- * NetBSD sets POLLIN, and poll(2) claims this explicitly:
- *
- * If the remote end of a socket is closed, poll()
- * returns a POLLIN event, rather than a POLLHUP.
- *
- * On error:
- *
- * Linux sets POLLHUP and POLLERR only;
- * FreeBSD adds POLLHUP to POLLIN or POLLOUT, although poll(2)
- * claims the opposite:
- *
- * Note that POLLHUP and POLLOUT should never be
- * present in the revents bitmask at the same time.
- *
- * Solaris and NetBSD do not add POLLHUP or POLLERR;
- * MacOSX sets POLLHUP only.
- *
- * If an implementation sets POLLERR or POLLHUP only without POLLIN
- * or POLLOUT, the "error" variable enqueues only one active handler.
- */
-
- error = (((events & (POLLERR | POLLHUP)) != 0)
- && ((events & (POLLIN | POLLOUT)) == 0));
-
- if ((events & POLLIN) || (error && ev->read_handler != NULL)) {
- error = 0;
- ev->read_ready = 1;
-
- if (ev->read == NXT_EVENT_ONESHOT) {
- ev->read = NXT_EVENT_INACTIVE;
- nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0);
- }
-
- nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
- ev->task, ev, ev->data);
- }
-
- if ((events & POLLOUT) || (error && ev->write_handler != NULL)) {
- ev->write_ready = 1;
-
- if (ev->write == NXT_EVENT_ONESHOT) {
- ev->write = NXT_EVENT_INACTIVE;
- nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0);
- }
-
- nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
- ev->task, ev, ev->data);
- }
-
- next:
-
- nevents--;
- }
-}
-
-
-static nxt_poll_hash_entry_t *
-nxt_poll_fd_hash_get(nxt_poll_event_set_t *ps, nxt_fd_t fd)
-{
- nxt_lvlhsh_query_t lhq;
- nxt_poll_hash_entry_t *phe;
-
- lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
- lhq.key.length = sizeof(nxt_fd_t);
- lhq.key.start = (u_char *) &fd;
- lhq.proto = &nxt_poll_fd_hash_proto;
- lhq.data = ps->poll_set;
-
- if (nxt_lvlhsh_find(&ps->fd_hash, &lhq) == NXT_OK) {
- phe = lhq.value;
- return phe;
- }
-
- nxt_thread_log_alert("fd %d not found in hash", fd);
-
- return NULL;
-}
-
-
-static nxt_int_t
-nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
-{
- struct pollfd *poll_set;
- nxt_poll_hash_entry_t *phe;
-
- phe = data;
-
- if (*(nxt_fd_t *) lhq->key.start == phe->fd) {
- poll_set = lhq->data;
-
- if (nxt_fast_path(phe->fd == poll_set[phe->index].fd)) {
- return NXT_OK;
- }
-
- nxt_thread_log_alert("fd %d in hash mismatches fd %d in poll set",
- phe->fd, poll_set[phe->index].fd);
- }
-
- return NXT_DECLINED;
-}
-
-
-static void
-nxt_poll_fd_hash_destroy(nxt_lvlhsh_t *lh)
-{
- nxt_lvlhsh_each_t lhe;
- nxt_lvlhsh_query_t lhq;
- nxt_poll_hash_entry_t *phe;
-
- nxt_memzero(&lhe, sizeof(nxt_lvlhsh_each_t));
- lhe.proto = &nxt_poll_fd_hash_proto;
- lhq.proto = &nxt_poll_fd_hash_proto;
-
- for ( ;; ) {
- phe = nxt_lvlhsh_each(lh, &lhe);
-
- if (phe == NULL) {
- return;
- }
-
- lhq.key_hash = nxt_murmur_hash2(&phe->fd, sizeof(nxt_fd_t));
- lhq.key.length = sizeof(nxt_fd_t);
- lhq.key.start = (u_char *) &phe->fd;
-
- if (nxt_lvlhsh_delete(lh, &lhq) != NXT_OK) {
- nxt_thread_log_alert("event fd %d not found in hash", phe->fd);
- }
-
- nxt_free(phe);
- }
-}