diff options
Diffstat (limited to 'src/nxt_poll.c')
-rw-r--r-- | src/nxt_poll.c | 752 |
1 files changed, 0 insertions, 752 deletions
diff --git a/src/nxt_poll.c b/src/nxt_poll.c deleted file mode 100644 index fd195261..00000000 --- a/src/nxt_poll.c +++ /dev/null @@ -1,752 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -#define NXT_POLL_ADD 0 -#define NXT_POLL_CHANGE 1 -#define NXT_POLL_DELETE 2 - - -typedef struct { - /* - * A file descriptor is stored in hash entry to allow - * nxt_poll_fd_hash_test() to not dereference a pointer to - * nxt_event_fd_t which may be invalid if the file descriptor has - * been already closed and the nxt_event_fd_t's memory has been freed. - */ - nxt_socket_t fd; - - uint32_t index; - void *event; -} nxt_poll_hash_entry_t; - - -static nxt_event_set_t *nxt_poll_create(nxt_event_signals_t *signals, - nxt_uint_t mchanges, nxt_uint_t mevents); -static void nxt_poll_free(nxt_event_set_t *event_set); -static void nxt_poll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev); -static void nxt_poll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev); -static void nxt_poll_drop_changes(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_poll_enable_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_poll_enable_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_poll_disable_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_poll_disable_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_poll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev); -static void nxt_poll_block_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_poll_oneshot_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_poll_oneshot_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_poll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, - nxt_uint_t op, nxt_uint_t events); -static nxt_int_t nxt_poll_commit_changes(nxt_thread_t *thr, - nxt_poll_event_set_t *ps); -static nxt_int_t nxt_poll_set_add(nxt_thread_t *thr, nxt_poll_event_set_t *ps, - nxt_poll_change_t *ch); -static nxt_int_t nxt_poll_set_change(nxt_thread_t *thr, - nxt_poll_event_set_t *ps, nxt_poll_change_t *ch); -static nxt_int_t nxt_poll_set_delete(nxt_thread_t *thr, - nxt_poll_event_set_t *ps, nxt_poll_change_t *ch); -static void nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set, - nxt_msec_t timeout); -static nxt_poll_hash_entry_t *nxt_poll_fd_hash_get(nxt_poll_event_set_t *ps, - nxt_fd_t fd); -static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data); -static void nxt_poll_fd_hash_destroy(nxt_lvlhsh_t *lh); - - -const nxt_event_set_ops_t nxt_poll_event_set = { - "poll", - nxt_poll_create, - nxt_poll_free, - nxt_poll_enable, - nxt_poll_disable, - nxt_poll_disable, - nxt_poll_disable, - nxt_poll_enable_read, - nxt_poll_enable_write, - nxt_poll_disable_read, - nxt_poll_disable_write, - nxt_poll_block_read, - nxt_poll_block_write, - nxt_poll_oneshot_read, - nxt_poll_oneshot_write, - nxt_poll_enable_read, - NULL, - NULL, - NULL, - NULL, - nxt_poll_set_poll, - - &nxt_unix_event_conn_io, - - NXT_NO_FILE_EVENTS, - NXT_NO_SIGNAL_EVENTS, -}; - - -static const nxt_lvlhsh_proto_t nxt_poll_fd_hash_proto nxt_aligned(64) = -{ - NXT_LVLHSH_LARGE_MEMALIGN, - 0, - nxt_poll_fd_hash_test, - nxt_lvlhsh_alloc, - nxt_lvlhsh_free, -}; - - -static nxt_event_set_t * -nxt_poll_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, - nxt_uint_t mevents) -{ - nxt_event_set_t *event_set; - nxt_poll_event_set_t *ps; - - event_set = nxt_zalloc(sizeof(nxt_poll_event_set_t)); - if (event_set == NULL) { - return NULL; - } - - ps = &event_set->poll; - - ps->mchanges = mchanges; - - ps->changes = nxt_malloc(sizeof(nxt_poll_change_t) * mchanges); - if (ps->changes == NULL) { - nxt_free(event_set); - return NULL; - } - - return event_set; -} - - -static void -nxt_poll_free(nxt_event_set_t *event_set) -{ - nxt_poll_event_set_t *ps; - - ps = &event_set->poll; - - nxt_main_log_debug("poll free"); - - nxt_free(ps->poll_set); - nxt_free(ps->changes); - nxt_poll_fd_hash_destroy(&ps->fd_hash); - nxt_free(ps); -} - - -static void -nxt_poll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - ev->read = NXT_EVENT_DEFAULT; - ev->write = NXT_EVENT_DEFAULT; - - nxt_poll_change(event_set, ev, NXT_POLL_ADD, POLLIN | POLLOUT); -} - - -static void -nxt_poll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - ev->read = NXT_EVENT_INACTIVE; - ev->write = NXT_EVENT_INACTIVE; - - nxt_poll_drop_changes(event_set, ev); - /* - * A simple non-zero value POLLHUP is a flag to ignore error handling - * if the event is not present in poll set, because the event may be - * freed at the time when the NXT_POLL_DELETE change will be processed - * and correct event error_handler will not be available. - */ - nxt_poll_change(event_set, ev, NXT_POLL_DELETE, POLLHUP); -} - - -static void -nxt_poll_drop_changes(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - nxt_poll_change_t *dst, *src, *end; - nxt_poll_event_set_t *ps; - - ps = &event_set->poll; - - dst = ps->changes; - end = dst + ps->nchanges; - - for (src = dst; src < end; src++) { - - if (src->event == ev) { - continue; - } - - if (dst != src) { - *dst = *src; - } - - dst++; - } - - ps->nchanges -= end - dst; -} - - -static void -nxt_poll_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - nxt_uint_t op, events; - - ev->read = NXT_EVENT_DEFAULT; - - if (ev->write == NXT_EVENT_INACTIVE) { - op = NXT_POLL_ADD; - events = POLLIN; - - } else { - op = NXT_POLL_CHANGE; - events = POLLIN | POLLOUT; - } - - nxt_poll_change(event_set, ev, op, events); -} - - -static void -nxt_poll_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - nxt_uint_t op, events; - - ev->write = NXT_EVENT_DEFAULT; - - if (ev->read == NXT_EVENT_INACTIVE) { - op = NXT_POLL_ADD; - events = POLLOUT; - - } else { - op = NXT_POLL_CHANGE; - events = POLLIN | POLLOUT; - } - - nxt_poll_change(event_set, ev, op, events); -} - - -static void -nxt_poll_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - nxt_uint_t op, events; - - ev->read = NXT_EVENT_INACTIVE; - - if (ev->write == NXT_EVENT_INACTIVE) { - op = NXT_POLL_DELETE; - events = 0; - - } else { - op = NXT_POLL_CHANGE; - events = POLLOUT; - } - - nxt_poll_change(event_set, ev, op, events); -} - - -static void -nxt_poll_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - nxt_uint_t op, events; - - ev->write = NXT_EVENT_INACTIVE; - - if (ev->read == NXT_EVENT_INACTIVE) { - op = NXT_POLL_DELETE; - events = 0; - - } else { - op = NXT_POLL_CHANGE; - events = POLLIN; - } - - nxt_poll_change(event_set, ev, op, events); -} - - -static void -nxt_poll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - if (ev->read != NXT_EVENT_INACTIVE) { - nxt_poll_disable_read(event_set, ev); - } -} - - -static void -nxt_poll_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - if (ev->write != NXT_EVENT_INACTIVE) { - nxt_poll_disable_write(event_set, ev); - } -} - - -static void -nxt_poll_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - nxt_uint_t op; - - op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? - NXT_POLL_ADD : NXT_POLL_CHANGE; - - ev->read = NXT_EVENT_ONESHOT; - ev->write = NXT_EVENT_INACTIVE; - - nxt_poll_change(event_set, ev, op, POLLIN); -} - - -static void -nxt_poll_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - nxt_uint_t op; - - op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? - NXT_POLL_ADD : NXT_POLL_CHANGE; - - ev->read = NXT_EVENT_INACTIVE; - ev->write = NXT_EVENT_ONESHOT; - - nxt_poll_change(event_set, ev, op, POLLOUT); -} - - -/* - * poll changes are batched to improve instruction and data cache - * locality of several lvlhsh operations followed by poll() call. - */ - -static void -nxt_poll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, nxt_uint_t op, - nxt_uint_t events) -{ - nxt_poll_change_t *ch; - nxt_poll_event_set_t *ps; - - nxt_log_debug(ev->log, "poll change: fd:%d op:%d ev:%XD", - ev->fd, op, events); - - ps = &event_set->poll; - - if (ps->nchanges >= ps->mchanges) { - (void) nxt_poll_commit_changes(nxt_thread(), ps); - } - - ch = &ps->changes[ps->nchanges++]; - ch->op = op; - ch->fd = ev->fd; - ch->events = events; - ch->event = ev; -} - - -static nxt_int_t -nxt_poll_commit_changes(nxt_thread_t *thr, nxt_poll_event_set_t *ps) -{ - nxt_int_t ret; - nxt_event_fd_t *ev; - nxt_poll_change_t *ch, *end; - - nxt_log_debug(thr->log, "poll changes:%ui", ps->nchanges); - - ret = NXT_OK; - ch = ps->changes; - end = ch + ps->nchanges; - - do { - ev = ch->event; - - switch (ch->op) { - - case NXT_POLL_ADD: - if (nxt_fast_path(nxt_poll_set_add(thr, ps, ch) == NXT_OK)) { - goto next; - } - break; - - case NXT_POLL_CHANGE: - if (nxt_fast_path(nxt_poll_set_change(thr, ps, ch) == NXT_OK)) { - goto next; - } - break; - - case NXT_POLL_DELETE: - if (nxt_fast_path(nxt_poll_set_delete(thr, ps, ch) == NXT_OK)) { - goto next; - } - break; - } - - nxt_work_queue_add(&thr->engine->fast_work_queue, ev->error_handler, - ev->task, ev, ev->data); - - ret = NXT_ERROR; - - next: - - ch++; - - } while (ch < end); - - ps->nchanges = 0; - - return ret; -} - - -static nxt_int_t -nxt_poll_set_add(nxt_thread_t *thr, nxt_poll_event_set_t *ps, - nxt_poll_change_t *ch) -{ - nxt_uint_t max_nfds; - struct pollfd *pfd; - nxt_lvlhsh_query_t lhq; - nxt_poll_hash_entry_t *phe; - - nxt_log_debug(thr->log, "poll add event: fd:%d ev:%04Xi", - ch->fd, ch->events); - - if (ps->nfds >= ps->max_nfds) { - max_nfds = ps->max_nfds + 512; /* 4K */ - - pfd = nxt_realloc(ps->poll_set, sizeof(struct pollfd) * max_nfds); - if (nxt_slow_path(pfd == NULL)) { - return NXT_ERROR; - } - - ps->poll_set = pfd; - ps->max_nfds = max_nfds; - } - - phe = nxt_malloc(sizeof(nxt_poll_hash_entry_t)); - if (nxt_slow_path(phe == NULL)) { - return NXT_ERROR; - } - - phe->fd = ch->fd; - phe->index = ps->nfds; - phe->event = ch->event; - - pfd = &ps->poll_set[ps->nfds++]; - pfd->fd = ch->fd; - pfd->events = ch->events; - pfd->revents = 0; - - lhq.key_hash = nxt_murmur_hash2(&ch->fd, sizeof(nxt_fd_t)); - lhq.replace = 0; - lhq.key.length = sizeof(nxt_fd_t); - lhq.key.start = (u_char *) &ch->fd; - lhq.value = phe; - lhq.proto = &nxt_poll_fd_hash_proto; - lhq.data = ps->poll_set; - - if (nxt_fast_path(nxt_lvlhsh_insert(&ps->fd_hash, &lhq) == NXT_OK)) { - return NXT_OK; - } - - nxt_free(phe); - - return NXT_ERROR; -} - - -static nxt_int_t -nxt_poll_set_change(nxt_thread_t *thr, nxt_poll_event_set_t *ps, - nxt_poll_change_t *ch) -{ - nxt_poll_hash_entry_t *phe; - - nxt_log_debug(thr->log, "poll change event: fd:%d ev:%04Xi", - ch->fd, ch->events); - - phe = nxt_poll_fd_hash_get(ps, ch->fd); - - if (nxt_fast_path(phe != NULL)) { - ps->poll_set[phe->index].events = ch->events; - return NXT_OK; - } - - return NXT_ERROR; -} - - -static nxt_int_t -nxt_poll_set_delete(nxt_thread_t *thr, nxt_poll_event_set_t *ps, - nxt_poll_change_t *ch) -{ - nxt_uint_t index; - nxt_lvlhsh_query_t lhq; - nxt_poll_hash_entry_t *phe; - - nxt_log_debug(thr->log, "poll delete event: fd:%d", ch->fd); - - lhq.key_hash = nxt_murmur_hash2(&ch->fd, sizeof(nxt_fd_t)); - lhq.key.length = sizeof(nxt_fd_t); - lhq.key.start = (u_char *) &ch->fd; - lhq.proto = &nxt_poll_fd_hash_proto; - lhq.data = ps->poll_set; - - if (nxt_slow_path(nxt_lvlhsh_delete(&ps->fd_hash, &lhq) != NXT_OK)) { - /* - * Ignore NXT_DECLINED error if ch->events - * has the special value POLLHUP. - */ - return (ch->events != 0) ? NXT_OK : NXT_ERROR; - } - - phe = lhq.value; - - index = phe->index; - ps->nfds--; - - if (index != ps->nfds) { - ps->poll_set[index] = ps->poll_set[ps->nfds]; - - phe = nxt_poll_fd_hash_get(ps, ps->poll_set[ps->nfds].fd); - - phe->index = index; - } - - nxt_free(lhq.value); - - return NXT_OK; -} - - -static void -nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set, - nxt_msec_t timeout) -{ - int nevents; - nxt_fd_t fd; - nxt_err_t err; - nxt_bool_t error; - nxt_uint_t i, events, level; - struct pollfd *pfd; - nxt_event_fd_t *ev; - nxt_poll_event_set_t *ps; - nxt_poll_hash_entry_t *phe; - - ps = &event_set->poll; - - if (ps->nchanges != 0) { - if (nxt_poll_commit_changes(nxt_thread(), ps) != NXT_OK) { - /* Error handlers have been enqueued on failure. */ - timeout = 0; - } - } - - nxt_debug(task, "poll() events:%ui timeout:%M", ps->nfds, timeout); - - nevents = poll(ps->poll_set, ps->nfds, timeout); - - err = (nevents == -1) ? nxt_errno : 0; - - nxt_thread_time_update(task->thread); - - nxt_debug(task, "poll(): %d", nevents); - - if (nevents == -1) { - level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; - nxt_log(task, level, "poll() failed %E", err); - return; - } - - for (i = 0; i < ps->nfds && nevents != 0; i++) { - - pfd = &ps->poll_set[i]; - events = pfd->revents; - - if (events == 0) { - continue; - } - - fd = pfd->fd; - - phe = nxt_poll_fd_hash_get(ps, fd); - - if (nxt_slow_path(phe == NULL)) { - nxt_log(task, NXT_LOG_CRIT, - "poll() returned invalid fd:%d ev:%04Xd rev:%04uXi", - fd, pfd->events, events); - - /* Mark the poll entry to ignore it by the kernel. */ - pfd->fd = -1; - goto next; - } - - ev = phe->event; - - nxt_debug(ev->task, "poll: fd:%d ev:%04uXi rd:%d %wr:%d", - fd, events, ev->read, ev->write); - - if (nxt_slow_path((events & POLLNVAL) != 0)) { - nxt_log(ev->task, NXT_LOG_CRIT, - "poll() error fd:%d ev:%04Xd rev:%04uXi", - fd, pfd->events, events); - - /* Mark the poll entry to ignore it by the kernel. */ - pfd->fd = -1; - - nxt_work_queue_add(&ev->task->thread->engine->fast_work_queue, - ev->error_handler, - ev->task, ev, ev->data); - goto next; - } - - /* - * On a socket's remote end close: - * - * Linux, FreeBSD, and Solaris set POLLIN; - * MacOSX sets POLLIN and POLLHUP; - * NetBSD sets POLLIN, and poll(2) claims this explicitly: - * - * If the remote end of a socket is closed, poll() - * returns a POLLIN event, rather than a POLLHUP. - * - * On error: - * - * Linux sets POLLHUP and POLLERR only; - * FreeBSD adds POLLHUP to POLLIN or POLLOUT, although poll(2) - * claims the opposite: - * - * Note that POLLHUP and POLLOUT should never be - * present in the revents bitmask at the same time. - * - * Solaris and NetBSD do not add POLLHUP or POLLERR; - * MacOSX sets POLLHUP only. - * - * If an implementation sets POLLERR or POLLHUP only without POLLIN - * or POLLOUT, the "error" variable enqueues only one active handler. - */ - - error = (((events & (POLLERR | POLLHUP)) != 0) - && ((events & (POLLIN | POLLOUT)) == 0)); - - if ((events & POLLIN) || (error && ev->read_handler != NULL)) { - error = 0; - ev->read_ready = 1; - - if (ev->read == NXT_EVENT_ONESHOT) { - ev->read = NXT_EVENT_INACTIVE; - nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0); - } - - nxt_work_queue_add(ev->read_work_queue, ev->read_handler, - ev->task, ev, ev->data); - } - - if ((events & POLLOUT) || (error && ev->write_handler != NULL)) { - ev->write_ready = 1; - - if (ev->write == NXT_EVENT_ONESHOT) { - ev->write = NXT_EVENT_INACTIVE; - nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0); - } - - nxt_work_queue_add(ev->write_work_queue, ev->write_handler, - ev->task, ev, ev->data); - } - - next: - - nevents--; - } -} - - -static nxt_poll_hash_entry_t * -nxt_poll_fd_hash_get(nxt_poll_event_set_t *ps, nxt_fd_t fd) -{ - nxt_lvlhsh_query_t lhq; - nxt_poll_hash_entry_t *phe; - - lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t)); - lhq.key.length = sizeof(nxt_fd_t); - lhq.key.start = (u_char *) &fd; - lhq.proto = &nxt_poll_fd_hash_proto; - lhq.data = ps->poll_set; - - if (nxt_lvlhsh_find(&ps->fd_hash, &lhq) == NXT_OK) { - phe = lhq.value; - return phe; - } - - nxt_thread_log_alert("fd %d not found in hash", fd); - - return NULL; -} - - -static nxt_int_t -nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data) -{ - struct pollfd *poll_set; - nxt_poll_hash_entry_t *phe; - - phe = data; - - if (*(nxt_fd_t *) lhq->key.start == phe->fd) { - poll_set = lhq->data; - - if (nxt_fast_path(phe->fd == poll_set[phe->index].fd)) { - return NXT_OK; - } - - nxt_thread_log_alert("fd %d in hash mismatches fd %d in poll set", - phe->fd, poll_set[phe->index].fd); - } - - return NXT_DECLINED; -} - - -static void -nxt_poll_fd_hash_destroy(nxt_lvlhsh_t *lh) -{ - nxt_lvlhsh_each_t lhe; - nxt_lvlhsh_query_t lhq; - nxt_poll_hash_entry_t *phe; - - nxt_memzero(&lhe, sizeof(nxt_lvlhsh_each_t)); - lhe.proto = &nxt_poll_fd_hash_proto; - lhq.proto = &nxt_poll_fd_hash_proto; - - for ( ;; ) { - phe = nxt_lvlhsh_each(lh, &lhe); - - if (phe == NULL) { - return; - } - - lhq.key_hash = nxt_murmur_hash2(&phe->fd, sizeof(nxt_fd_t)); - lhq.key.length = sizeof(nxt_fd_t); - lhq.key.start = (u_char *) &phe->fd; - - if (nxt_lvlhsh_delete(lh, &lhq) != NXT_OK) { - nxt_thread_log_alert("event fd %d not found in hash", phe->fd); - } - - nxt_free(phe); - } -} |