/* * Copyright (C) Igor Sysoev * Copyright (C) NGINX, Inc. */ #include #define NXT_POLL_ADD 0 #define NXT_POLL_CHANGE 1 #define NXT_POLL_DELETE 2 typedef struct { /* * A file descriptor is stored in hash entry to allow * nxt_poll_fd_hash_test() to not dereference a pointer to * nxt_event_fd_t which may be invalid if the file descriptor has * been already closed and the nxt_event_fd_t's memory has been freed. */ nxt_socket_t fd; uint32_t index; void *event; } nxt_poll_hash_entry_t; static nxt_event_set_t *nxt_poll_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, nxt_uint_t mevents); static void nxt_poll_free(nxt_event_set_t *event_set); static void nxt_poll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev); static void nxt_poll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev); static void nxt_poll_drop_changes(nxt_event_set_t *event_set, nxt_event_fd_t *ev); static void nxt_poll_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev); static void nxt_poll_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev); static void nxt_poll_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev); static void nxt_poll_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev); static void nxt_poll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev); static void nxt_poll_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev); static void nxt_poll_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev); static void nxt_poll_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev); static void nxt_poll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, nxt_uint_t op, nxt_uint_t events); static nxt_int_t nxt_poll_commit_changes(nxt_thread_t *thr, nxt_poll_event_set_t *ps); static nxt_int_t nxt_poll_set_add(nxt_thread_t *thr, nxt_poll_event_set_t *ps, nxt_poll_change_t *ch); static nxt_int_t nxt_poll_set_change(nxt_thread_t *thr, nxt_poll_event_set_t *ps, nxt_poll_change_t *ch); static nxt_int_t nxt_poll_set_delete(nxt_thread_t *thr, nxt_poll_event_set_t *ps, nxt_poll_change_t *ch); static void nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set, nxt_msec_t timeout); static nxt_poll_hash_entry_t *nxt_poll_fd_hash_get(nxt_poll_event_set_t *ps, nxt_fd_t fd); static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data); static void nxt_poll_fd_hash_destroy(nxt_lvlhsh_t *lh); const nxt_event_set_ops_t nxt_poll_event_set = { "poll", nxt_poll_create, nxt_poll_free, nxt_poll_enable, nxt_poll_disable, nxt_poll_disable, nxt_poll_disable, nxt_poll_enable_read, nxt_poll_enable_write, nxt_poll_disable_read, nxt_poll_disable_write, nxt_poll_block_read, nxt_poll_block_write, nxt_poll_oneshot_read, nxt_poll_oneshot_write, nxt_poll_enable_read, NULL, NULL, NULL, NULL, nxt_poll_set_poll, &nxt_unix_event_conn_io, NXT_NO_FILE_EVENTS, NXT_NO_SIGNAL_EVENTS, }; static const nxt_lvlhsh_proto_t nxt_poll_fd_hash_proto nxt_aligned(64) = { NXT_LVLHSH_LARGE_MEMALIGN, 0, nxt_poll_fd_hash_test, nxt_lvlhsh_alloc, nxt_lvlhsh_free, }; static nxt_event_set_t * nxt_poll_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, nxt_uint_t mevents) { nxt_event_set_t *event_set; nxt_poll_event_set_t *ps; event_set = nxt_zalloc(sizeof(nxt_poll_event_set_t)); if (event_set == NULL) { return NULL; } ps = &event_set->poll; ps->mchanges = mchanges; ps->changes = nxt_malloc(sizeof(nxt_poll_change_t) * mchanges); if (ps->changes == NULL) { nxt_free(event_set); return NULL; } return event_set; } static void nxt_poll_free(nxt_event_set_t *event_set) { nxt_poll_event_set_t *ps; ps = &event_set->poll; nxt_main_log_debug("poll free"); nxt_free(ps->poll_set); nxt_free(ps->changes); nxt_poll_fd_hash_destroy(&ps->fd_hash); nxt_free(ps); } static void nxt_poll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) { ev->read = NXT_EVENT_DEFAULT; ev->write = NXT_EVENT_DEFAULT; nxt_poll_change(event_set, ev, NXT_POLL_ADD, POLLIN | POLLOUT); } static void nxt_poll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) { ev->read = NXT_EVENT_INACTIVE; ev->write = NXT_EVENT_INACTIVE; nxt_poll_drop_changes(event_set, ev); /* * A simple non-zero value POLLHUP is a flag to ignore error handling * if the event is not present in poll set, because the event may be * freed at the time when the NXT_POLL_DELETE change will be processed * and correct event error_handler will not be available. */ nxt_poll_change(event_set, ev, NXT_POLL_DELETE, POLLHUP); } static void nxt_poll_drop_changes(nxt_event_set_t *event_set, nxt_event_fd_t *ev) { nxt_poll_change_t *dst, *src, *end; nxt_poll_event_set_t *ps; ps = &event_set->poll; dst = ps->changes; end = dst + ps->nchanges; for (src = dst; src < end; src++) { if (src->event == ev) { continue; } if (dst != src) { *dst = *src; } dst++; } ps->nchanges -= end - dst; } static void nxt_poll_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) { nxt_uint_t op, events; ev->read = NXT_EVENT_DEFAULT; if (ev->write == NXT_EVENT_INACTIVE) { op = NXT_POLL_ADD; events = POLLIN; } else { op = NXT_POLL_CHANGE; events = POLLIN | POLLOUT; } nxt_poll_change(event_set, ev, op, events); } static void nxt_poll_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) { nxt_uint_t op, events; ev->write = NXT_EVENT_DEFAULT; if (ev->read == NXT_EVENT_INACTIVE) { op = NXT_POLL_ADD; events = POLLOUT; } else { op = NXT_POLL_CHANGE; events = POLLIN | POLLOUT; } nxt_poll_change(event_set, ev, op, events); } static void nxt_poll_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) { nxt_uint_t op, events; ev->read = NXT_EVENT_INACTIVE; if (ev->write == NXT_EVENT_INACTIVE) { op = NXT_POLL_DELETE; events = 0; } else { op = NXT_POLL_CHANGE; events = POLLOUT; } nxt_poll_change(event_set, ev, op, events); } static void nxt_poll_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) { nxt_uint_t op, events; ev->write = NXT_EVENT_INACTIVE; if (ev->read == NXT_EVENT_INACTIVE) { op = NXT_POLL_DELETE; events = 0; } else { op = NXT_POLL_CHANGE; events = POLLIN; } nxt_poll_change(event_set, ev, op, events); } static void nxt_poll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) { if (ev->read != NXT_EVENT_INACTIVE) { nxt_poll_disable_read(event_set, ev); } } static void nxt_poll_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) { if (ev->write != NXT_EVENT_INACTIVE) { nxt_poll_disable_write(event_set, ev); } } static void nxt_poll_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) { nxt_uint_t op; op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? NXT_POLL_ADD : NXT_POLL_CHANGE; ev->read = NXT_EVENT_ONESHOT; ev->write = NXT_EVENT_INACTIVE; nxt_poll_change(event_set, ev, op, POLLIN); } static void nxt_poll_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) { nxt_uint_t op; op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ? NXT_POLL_ADD : NXT_POLL_CHANGE; ev->read = NXT_EVENT_INACTIVE; ev->write = NXT_EVENT_ONESHOT; nxt_poll_change(event_set, ev, op, POLLOUT); } /* * poll changes are batched to improve instruction and data cache * locality of several lvlhsh operations followed by poll() call. */ static void nxt_poll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, nxt_uint_t op, nxt_uint_t events) { nxt_poll_change_t *ch; nxt_poll_event_set_t *ps; nxt_log_debug(ev->log, "poll change: fd:%d op:%d ev:%XD", ev->fd, op, events); ps = &event_set->poll; if (ps->nchanges >= ps->mchanges) { (void) nxt_poll_commit_changes(nxt_thread(), ps); } ch = &ps->changes[ps->nchanges++]; ch->op = op; ch->fd = ev->fd; ch->events = events; ch->event = ev; } static nxt_int_t nxt_poll_commit_changes(nxt_thread_t *thr, nxt_poll_event_set_t *ps) { nxt_int_t ret; nxt_event_fd_t *ev; nxt_poll_change_t *ch, *end; nxt_log_debug(thr->log, "poll changes:%ui", ps->nchanges); ret = NXT_OK; ch = ps->changes; end = ch + ps->nchanges; do { ev = ch->event; switch (ch->op) { case NXT_POLL_ADD: if (nxt_fast_path(nxt_poll_set_add(thr, ps, ch) == NXT_OK)) { goto next; } break; case NXT_POLL_CHANGE: if (nxt_fast_path(nxt_poll_set_change(thr, ps, ch) == NXT_OK)) { goto next; } break; case NXT_POLL_DELETE: if (nxt_fast_path(nxt_poll_set_delete(thr, ps, ch) == NXT_OK)) { goto next; } break; } nxt_thread_work_queue_add(thr, &thr->work_queue.main, ev->error_handler, ev->task, ev, ev->data); ret = NXT_ERROR; next: ch++; } while (ch < end); ps->nchanges = 0; return ret; } static nxt_int_t nxt_poll_set_add(nxt_thread_t *thr, nxt_poll_event_set_t *ps, nxt_poll_change_t *ch) { nxt_uint_t max_nfds; struct pollfd *pfd; nxt_lvlhsh_query_t lhq; nxt_poll_hash_entry_t *phe; nxt_log_debug(thr->log, "poll add event: fd:%d ev:%04Xi", ch->fd, ch->events); if (ps->nfds >= ps->max_nfds) { max_nfds = ps->max_nfds + 512; /* 4K */ pfd = nxt_realloc(ps->poll_set, sizeof(struct pollfd) * max_nfds); if (nxt_slow_path(pfd == NULL)) { return NXT_ERROR; } ps->poll_set = pfd; ps->max_nfds = max_nfds; } phe = nxt_malloc(sizeof(nxt_poll_hash_entry_t)); if (nxt_slow_path(phe == NULL)) { return NXT_ERROR; } phe->fd = ch->fd; phe->index = ps->nfds; phe->event = ch->event; pfd = &ps->poll_set[ps->nfds++]; pfd->fd = ch->fd; pfd->events = ch->events; pfd->revents = 0; lhq.key_hash = nxt_murmur_hash2(&ch->fd, sizeof(nxt_fd_t)); lhq.replace = 0; lhq.key.len = sizeof(nxt_fd_t); lhq.key.data = (u_char *) &ch->fd; lhq.value = phe; lhq.proto = &nxt_poll_fd_hash_proto; lhq.data = ps->poll_set; if (nxt_fast_path(nxt_lvlhsh_insert(&ps->fd_hash, &lhq) == NXT_OK)) { return NXT_OK; } nxt_free(phe); return NXT_ERROR; } static nxt_int_t nxt_poll_set_change(nxt_thread_t *thr, nxt_poll_event_set_t *ps, nxt_poll_change_t *ch) { nxt_poll_hash_entry_t *phe; nxt_log_debug(thr->log, "poll change event: fd:%d ev:%04Xi", ch->fd, ch->events); phe = nxt_poll_fd_hash_get(ps, ch->fd); if (nxt_fast_path(phe != NULL)) { ps->poll_set[phe->index].events = ch->events; return NXT_OK; } return NXT_ERROR; } static nxt_int_t nxt_poll_set_delete(nxt_thread_t *thr, nxt_poll_event_set_t *ps, nxt_poll_change_t *ch) { nxt_uint_t index; nxt_lvlhsh_query_t lhq; nxt_poll_hash_entry_t *phe; nxt_log_debug(thr->log, "poll delete event: fd:%d", ch->fd); lhq.key_hash = nxt_murmur_hash2(&ch->fd, sizeof(nxt_fd_t)); lhq.key.len = sizeof(nxt_fd_t); lhq.key.data = (u_char *) &ch->fd; lhq.proto = &nxt_poll_fd_hash_proto; lhq.data = ps->poll_set; if (nxt_slow_path(nxt_lvlhsh_delete(&ps->fd_hash, &lhq) != NXT_OK)) { /* * Ignore NXT_DECLINED error if ch->events * has the special value POLLHUP. */ return (ch->events != 0) ? NXT_OK : NXT_ERROR; } phe = lhq.value; index = phe->index; ps->nfds--; if (index != ps->nfds) { ps->poll_set[index] = ps->poll_set[ps->nfds]; phe = nxt_poll_fd_hash_get(ps, ps->poll_set[ps->nfds].fd); phe->index = index; } nxt_free(lhq.value); return NXT_OK; } static void nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set, nxt_msec_t timeout) { int nevents; nxt_fd_t fd; nxt_err_t err; nxt_bool_t error; nxt_uint_t i, events, level; struct pollfd *pfd; nxt_event_fd_t *ev; nxt_poll_event_set_t *ps; nxt_poll_hash_entry_t *phe; ps = &event_set->poll; if (ps->nchanges != 0) { if (nxt_poll_commit_changes(nxt_thread(), ps) != NXT_OK) { /* Error handlers have been enqueued on failure. */ timeout = 0; } } nxt_debug(task, "poll() events:%ui timeout:%M", ps->nfds, timeout); nevents = poll(ps->poll_set, ps->nfds, timeout); err = (nevents == -1) ? nxt_errno : 0; nxt_thread_time_update(task->thread); nxt_debug(task, "poll(): %d", nevents); if (nevents == -1) { level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; nxt_log(task, level, "poll() failed %E", err); return; } for (i = 0; i < ps->nfds && nevents != 0; i++) { pfd = &ps->poll_set[i]; events = pfd->revents; if (events == 0) { continue; } fd = pfd->fd; phe = nxt_poll_fd_hash_get(ps, fd); if (nxt_slow_path(phe == NULL)) { nxt_log(task, NXT_LOG_CRIT, "poll() returned invalid fd:%d ev:%04Xd rev:%04uXi", fd, pfd->events, events); /* Mark the poll entry to ignore it by the kernel. */ pfd->fd = -1; goto next; } ev = phe->event; nxt_debug(ev->task, "poll: fd:%d ev:%04uXi rd:%d %wr:%d", fd, events, ev->read, ev->write); if (nxt_slow_path((events & POLLNVAL) != 0)) { nxt_log(ev->task, NXT_LOG_CRIT, "poll() error fd:%d ev:%04Xd rev:%04uXi", fd, pfd->events, events); /* Mark the poll entry to ignore it by the kernel. */ pfd->fd = -1; nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main, ev->error_handler, ev->task, ev, ev->data); goto next; } /* * On a socket's remote end close: * * Linux, FreeBSD, and Solaris set POLLIN; * MacOSX sets POLLIN and POLLHUP; * NetBSD sets POLLIN, and poll(2) claims this explicitly: * * If the remote end of a socket is closed, poll() * returns a POLLIN event, rather than a POLLHUP. * * On error: * * Linux sets POLLHUP and POLLERR only; * FreeBSD adds POLLHUP to POLLIN or POLLOUT, although poll(2) * claims the opposite: * * Note that POLLHUP and POLLOUT should never be * present in the revents bitmask at the same time. * * Solaris and NetBSD do not add POLLHUP or POLLERR; * MacOSX sets POLLHUP only. * * If an implementation sets POLLERR or POLLHUP only without POLLIN * or POLLOUT, the "error" variable enqueues only one active handler. */ error = (((events & (POLLERR | POLLHUP)) != 0) && ((events & (POLLIN | POLLOUT)) == 0)); if ((events & POLLIN) || (error && ev->read_handler != NULL)) { error = 0; ev->read_ready = 1; if (ev->read == NXT_EVENT_ONESHOT) { ev->read = NXT_EVENT_INACTIVE; nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0); } nxt_thread_work_queue_add(task->thread, ev->read_work_queue, ev->read_handler, ev->task, ev, ev->data); } if ((events & POLLOUT) || (error && ev->write_handler != NULL)) { ev->write_ready = 1; if (ev->write == NXT_EVENT_ONESHOT) { ev->write = NXT_EVENT_INACTIVE; nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0); } nxt_thread_work_queue_add(task->thread, ev->write_work_queue, ev->write_handler, ev->task, ev, ev->data); } next: nevents--; } } static nxt_poll_hash_entry_t * nxt_poll_fd_hash_get(nxt_poll_event_set_t *ps, nxt_fd_t fd) { nxt_lvlhsh_query_t lhq; nxt_poll_hash_entry_t *phe; lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t)); lhq.key.len = sizeof(nxt_fd_t); lhq.key.data = (u_char *) &fd; lhq.proto = &nxt_poll_fd_hash_proto; lhq.data = ps->poll_set; if (nxt_lvlhsh_find(&ps->fd_hash, &lhq) == NXT_OK) { phe = lhq.value; return phe; } nxt_thread_log_alert("fd %d not found in hash", fd); return NULL; } static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data) { struct pollfd *poll_set; nxt_poll_hash_entry_t *phe; phe = data; if (*(nxt_fd_t *) lhq->key.data == phe->fd) { poll_set = lhq->data; if (nxt_fast_path(phe->fd == poll_set[phe->index].fd)) { return NXT_OK; } nxt_thread_log_alert("fd %d in hash mismatches fd %d in poll set", phe->fd, poll_set[phe->index].fd); } return NXT_DECLINED; } static void nxt_poll_fd_hash_destroy(nxt_lvlhsh_t *lh) { nxt_lvlhsh_each_t lhe; nxt_lvlhsh_query_t lhq; nxt_poll_hash_entry_t *phe; nxt_memzero(&lhe, sizeof(nxt_lvlhsh_each_t)); lhe.proto = &nxt_poll_fd_hash_proto; lhq.proto = &nxt_poll_fd_hash_proto; for ( ;; ) { phe = nxt_lvlhsh_each(lh, &lhe); if (phe == NULL) { return; } lhq.key_hash = nxt_murmur_hash2(&phe->fd, sizeof(nxt_fd_t)); lhq.key.len = sizeof(nxt_fd_t); lhq.key.data = (u_char *) &phe->fd; if (nxt_lvlhsh_delete(lh, &lhq) != NXT_OK) { nxt_thread_log_alert("event fd %d not found in hash", phe->fd); } nxt_free(phe); } }