diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-02-07 20:04:56 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-02-07 20:04:56 +0300 |
commit | 059a8642898a6bd4b47d13a1c1d599cd44af7e1c (patch) | |
tree | e3c8c530a04f1ae44777d5ea4fd6901dc55a8ebf /src/nxt_eventport.c | |
parent | e57b95a92333fa7ff558737b0ba2b76894cc0412 (diff) | |
download | unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.gz unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.bz2 |
Event engines refactoring.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_eventport.c | 646 |
1 files changed, 0 insertions, 646 deletions
diff --git a/src/nxt_eventport.c b/src/nxt_eventport.c deleted file mode 100644 index 02984573..00000000 --- a/src/nxt_eventport.c +++ /dev/null @@ -1,646 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -/* - * The event ports have been introduced in Solaris 10. - * The PORT_SOURCE_MQ and PORT_SOURCE_FILE sources have - * been added in OpenSolaris. - */ - - -static nxt_event_set_t *nxt_eventport_create(nxt_event_signals_t *signals, - nxt_uint_t mchanges, nxt_uint_t mevents); -static void nxt_eventport_free(nxt_event_set_t *event_set); -static void nxt_eventport_enable(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_eventport_disable(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_eventport_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev); -static void nxt_eventport_drop_changes(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_eventport_enable_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_eventport_enable_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_eventport_enable_event(nxt_event_set_t *event_set, - nxt_event_fd_t *ev, nxt_uint_t events); -static void nxt_eventport_disable_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_eventport_disable_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_eventport_disable_event(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static nxt_int_t nxt_eventport_commit_changes(nxt_thread_t *thr, - nxt_eventport_event_set_t *es); -static void nxt_eventport_error_handler(nxt_thread_t *thr, void *obj, - void *data); -static void nxt_eventport_block_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_eventport_block_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_eventport_oneshot_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_eventport_oneshot_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_eventport_enable_accept(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static nxt_int_t nxt_eventport_enable_post(nxt_event_set_t *event_set, - nxt_work_handler_t handler); -static void nxt_eventport_signal(nxt_event_set_t *event_set, nxt_uint_t signo); -static void nxt_eventport_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, - nxt_msec_t timeout); - - -const nxt_event_set_ops_t nxt_eventport_event_set = { - "eventport", - nxt_eventport_create, - nxt_eventport_free, - nxt_eventport_enable, - nxt_eventport_disable, - nxt_eventport_disable, - nxt_eventport_close, - nxt_eventport_enable_read, - nxt_eventport_enable_write, - nxt_eventport_disable_read, - nxt_eventport_disable_write, - nxt_eventport_block_read, - nxt_eventport_block_write, - nxt_eventport_oneshot_read, - nxt_eventport_oneshot_write, - nxt_eventport_enable_accept, - NULL, - NULL, - nxt_eventport_enable_post, - nxt_eventport_signal, - nxt_eventport_poll, - - &nxt_unix_event_conn_io, - - NXT_NO_FILE_EVENTS, - NXT_NO_SIGNAL_EVENTS, -}; - - -static nxt_event_set_t * -nxt_eventport_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, - nxt_uint_t mevents) -{ - nxt_event_set_t *event_set; - nxt_eventport_event_set_t *es; - - event_set = nxt_zalloc(sizeof(nxt_eventport_event_set_t)); - if (event_set == NULL) { - return NULL; - } - - es = &event_set->eventport; - - es->port = -1; - es->mchanges = mchanges; - es->mevents = mevents; - - es->changes = nxt_malloc(sizeof(nxt_eventport_change_t) * mchanges); - if (es->changes == NULL) { - goto fail; - } - - es->events = nxt_malloc(sizeof(port_event_t) * mevents); - if (es->events == NULL) { - goto fail; - } - - es->port = port_create(); - if (es->port == -1) { - nxt_main_log_emerg("port_create() failed %E", nxt_errno); - goto fail; - } - - nxt_main_log_debug("port_create(): %d", es->port); - - if (signals != NULL) { - es->signal_handler = signals->handler; - } - - return event_set; - -fail: - - nxt_eventport_free(event_set); - - return NULL; -} - - -static void -nxt_eventport_free(nxt_event_set_t *event_set) -{ - nxt_eventport_event_set_t *es; - - es = &event_set->eventport; - - nxt_main_log_debug("eventport %d free", es->port); - - if (es->port != -1) { - if (close(es->port) != 0) { - nxt_main_log_emerg("eventport close(%d) failed %E", - es->port, nxt_errno); - } - } - - nxt_free(es->events); - nxt_free(es); -} - - -static void -nxt_eventport_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - ev->read = NXT_EVENT_DEFAULT; - ev->write = NXT_EVENT_DEFAULT; - - nxt_eventport_enable_event(event_set, ev, POLLIN | POLLOUT); -} - - -static void -nxt_eventport_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { - - ev->read = NXT_EVENT_INACTIVE; - ev->write = NXT_EVENT_INACTIVE; - - nxt_eventport_disable_event(event_set, ev); - } -} - - -static void -nxt_eventport_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - ev->read = NXT_EVENT_INACTIVE; - ev->write = NXT_EVENT_INACTIVE; - - nxt_eventport_drop_changes(event_set, ev); -} - - -static void -nxt_eventport_drop_changes(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - nxt_eventport_change_t *dst, *src, *end; - nxt_eventport_event_set_t *es; - - es = &event_set->eventport; - - dst = es->changes; - end = dst + es->nchanges; - - for (src = dst; src < end; src++) { - - if (src->event == ev) { - continue; - } - - if (dst != src) { - *dst = *src; - } - - dst++; - } - - es->nchanges -= end - dst; -} - - -static void -nxt_eventport_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - nxt_uint_t events; - - if (ev->read != NXT_EVENT_BLOCKED) { - events = (ev->write == NXT_EVENT_INACTIVE) ? POLLIN: - (POLLIN | POLLOUT); - nxt_eventport_enable_event(event_set, ev, events); - } - - ev->read = NXT_EVENT_DEFAULT; -} - - -static void -nxt_eventport_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - nxt_uint_t events; - - if (ev->write != NXT_EVENT_BLOCKED) { - events = (ev->read == NXT_EVENT_INACTIVE) ? POLLOUT: - (POLLIN | POLLOUT); - nxt_eventport_enable_event(event_set, ev, events); - } - - ev->write = NXT_EVENT_DEFAULT; -} - - -/* - * eventport changes are batched to improve instruction and data - * cache locality of several port_associate() and port_dissociate() - * calls followed by port_getn() call. - */ - -static void -nxt_eventport_enable_event(nxt_event_set_t *event_set, nxt_event_fd_t *ev, - nxt_uint_t events) -{ - nxt_eventport_change_t *ch; - nxt_eventport_event_set_t *es; - - es = &event_set->eventport; - - nxt_log_debug(ev->log, "port %d set event: fd:%d ev:%04XD u:%p", - es->port, ev->fd, events, ev); - - if (es->nchanges >= es->mchanges) { - (void) nxt_eventport_commit_changes(nxt_thread(), es); - } - - ch = &es->changes[es->nchanges++]; - ch->fd = ev->fd; - ch->events = events; - ch->event = ev; -} - - -static void -nxt_eventport_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - ev->read = NXT_EVENT_INACTIVE; - - if (ev->write == NXT_EVENT_INACTIVE) { - nxt_eventport_disable_event(event_set, ev); - - } else { - nxt_eventport_enable_event(event_set, ev, POLLOUT); - } -} - - -static void -nxt_eventport_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - ev->write = NXT_EVENT_INACTIVE; - - if (ev->read == NXT_EVENT_INACTIVE) { - nxt_eventport_disable_event(event_set, ev); - - } else { - nxt_eventport_enable_event(event_set, ev, POLLIN); - } -} - - -static void -nxt_eventport_disable_event(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - nxt_eventport_change_t *ch; - nxt_eventport_event_set_t *es; - - es = &event_set->eventport; - - nxt_log_debug(ev->log, "port %d disable event : fd:%d", es->port, ev->fd); - - if (es->nchanges >= es->mchanges) { - (void) nxt_eventport_commit_changes(nxt_thread(), es); - } - - ch = &es->changes[es->nchanges++]; - ch->fd = ev->fd; - ch->events = 0; - ch->event = ev; -} - - -static nxt_int_t -nxt_eventport_commit_changes(nxt_thread_t *thr, nxt_eventport_event_set_t *es) -{ - int ret; - nxt_int_t retval; - nxt_event_fd_t *ev; - nxt_eventport_change_t *ch, *end; - - nxt_log_debug(thr->log, "eventport %d changes:%ui", es->port, es->nchanges); - - retval = NXT_OK; - ch = es->changes; - end = ch + es->nchanges; - - do { - ev = ch->event; - - if (ch->events != 0) { - nxt_log_debug(ev->log, "port_associate(%d): fd:%d ev:%04XD u:%p", - es->port, ch->fd, ch->events, ev); - - ret = port_associate(es->port, PORT_SOURCE_FD, ch->fd, - ch->events, ev); - if (ret == 0) { - goto next; - } - - nxt_log_alert(ev->log, - "port_associate(%d, %d, %d, %04XD) failed %E", - es->port, PORT_SOURCE_FD, ch->fd, ch->events, - nxt_errno); - - } else { - nxt_log_debug(ev->log, "port_dissociate(%d): fd:%d", - es->port, ch->fd); - - if (port_dissociate(es->port, PORT_SOURCE_FD, ch->fd) == 0) { - goto next; - } - - nxt_log_alert(ev->log, "port_dissociate(%d, %d, %d) failed %E", - es->port, PORT_SOURCE_FD, ch->fd, nxt_errno); - } - - nxt_thread_work_queue_add(thr, &thr->work_queue.main, - nxt_eventport_error_handler, - ev, ev->data, ev->log); - - retval = NXT_ERROR; - - next: - - ch++; - - } while (ch < end); - - es->nchanges = 0; - - return retval; -} - - -static void -nxt_eventport_error_handler(nxt_thread_t *thr, void *obj, void *data) -{ - nxt_event_fd_t *ev; - - ev = obj; - - ev->read = NXT_EVENT_INACTIVE; - ev->write = NXT_EVENT_INACTIVE; - - ev->error_handler(thr, ev, data); -} - - -static void -nxt_eventport_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - if (ev->read != NXT_EVENT_INACTIVE) { - ev->read = NXT_EVENT_BLOCKED; - } -} - - -static void -nxt_eventport_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - if (ev->write != NXT_EVENT_INACTIVE) { - ev->write = NXT_EVENT_BLOCKED; - } -} - - -static void -nxt_eventport_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - if (ev->read == NXT_EVENT_INACTIVE) { - ev->read = NXT_EVENT_DEFAULT; - - nxt_eventport_enable_event(event_set, ev, POLLIN); - } -} - - -static void -nxt_eventport_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - if (ev->write == NXT_EVENT_INACTIVE) { - ev->write = NXT_EVENT_DEFAULT; - - nxt_eventport_enable_event(event_set, ev, POLLOUT); - } -} - - -static void -nxt_eventport_enable_accept(nxt_event_set_t *event_set, nxt_event_fd_t *ev) -{ - ev->read = NXT_EVENT_LEVEL; - - nxt_eventport_enable_event(event_set, ev, POLLIN); -} - - -static nxt_int_t -nxt_eventport_enable_post(nxt_event_set_t *event_set, - nxt_work_handler_t handler) -{ - event_set->eventport.post_handler = handler; - - return NXT_OK; -} - - -static void -nxt_eventport_signal(nxt_event_set_t *event_set, nxt_uint_t signo) -{ - nxt_eventport_event_set_t *es; - - es = &event_set->eventport; - - nxt_thread_log_debug("port_send(%d, %ui)", es->port, signo); - - if (port_send(es->port, signo, NULL) != 0) { - nxt_thread_log_alert("port_send(%d) failed %E", es->port, nxt_errno); - } -} - - -static void -nxt_eventport_poll(nxt_thread_t *thr, nxt_event_set_t *event_set, - nxt_msec_t timeout) -{ - int n, events, signo; - uint_t nevents; - nxt_err_t err; - nxt_uint_t i, level; - timespec_t ts, *tp; - port_event_t *event; - nxt_event_fd_t *ev; - nxt_work_handler_t handler; - nxt_eventport_event_set_t *es; - - es = &event_set->eventport; - - if (es->nchanges != 0) { - if (nxt_eventport_commit_changes(thr, es) != NXT_OK) { - /* Error handlers have been enqueued on failure. */ - timeout = 0; - } - } - - if (timeout == NXT_INFINITE_MSEC) { - tp = NULL; - - } else { - ts.tv_sec = timeout / 1000; - ts.tv_nsec = (timeout % 1000) * 1000000; - tp = &ts; - } - - nxt_log_debug(thr->log, "port_getn(%d) timeout: %M", es->port, timeout); - - /* - * A trap for possible error when Solaris does not update nevents - * if ETIME or EINTR is returned. This issue will be logged as - * "unexpected port_getn() event". - * - * The details are in OpenSolaris mailing list thread "port_getn() - * and timeouts - is this a bug or an undocumented feature?" - */ - event = &es->events[0]; - event->portev_events = -1; /* invalid port events */ - event->portev_source = -1; /* invalid port source */ - event->portev_object = -1; - event->portev_user = (void *) -1; - - nevents = 1; - n = port_getn(es->port, es->events, es->mevents, &nevents, tp); - - /* - * 32-bit port_getn() on Solaris 10 x86 returns large negative - * values instead of 0 when returning immediately. - */ - err = (n < 0) ? nxt_errno : 0; - - nxt_thread_time_update(thr); - - if (n == -1) { - if (err == NXT_ETIME || err == NXT_EINTR) { - if (nevents != 0) { - nxt_log_alert(thr->log, "port_getn(%d) failed %E, events:%ud", - es->port, err, nevents); - } - } - - if (err != NXT_ETIME) { - level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; - nxt_log_error(level, thr->log, "port_getn(%d) failed %E", - es->port, err); - - if (err != NXT_EINTR) { - return; - } - } - } - - nxt_log_debug(thr->log, "port_getn(%d) events: %d", es->port, nevents); - - for (i = 0; i < nevents; i++) { - event = &es->events[i]; - - switch (event->portev_source) { - - case PORT_SOURCE_FD: - ev = event->portev_user; - events = event->portev_events; - - nxt_log_debug(ev->log, "eventport: fd:%d ev:%04Xd u:%p rd:%d wr:%d", - event->portev_object, events, ev, - ev->read, ev->write); - - if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) { - nxt_log_alert(ev->log, - "port_getn(%d) error fd:%d events:%04Xud", - es->port, ev->fd, events); - - nxt_thread_work_queue_add(thr, &thr->work_queue.main, - nxt_eventport_error_handler, - ev, ev->data, ev->log); - continue; - } - - if (events & POLLIN) { - ev->read_ready = 1; - - if (ev->read != NXT_EVENT_BLOCKED) { - nxt_thread_work_queue_add(thr, ev->read_work_queue, - ev->read_handler, - ev, ev->data, ev->log); - - } - - if (ev->read != NXT_EVENT_LEVEL) { - ev->read = NXT_EVENT_INACTIVE; - } - } - - if (events & POLLOUT) { - ev->write_ready = 1; - - if (ev->write != NXT_EVENT_BLOCKED) { - nxt_thread_work_queue_add(thr, ev->write_work_queue, - ev->write_handler, - ev, ev->data, ev->log); - } - - ev->write = NXT_EVENT_INACTIVE; - } - - /* - * Reactivate counterpart direction, because the - * eventport is oneshot notification facility. - */ - events = (ev->read == NXT_EVENT_INACTIVE) ? 0 : POLLIN; - events |= (ev->write == NXT_EVENT_INACTIVE) ? 0 : POLLOUT; - - if (events != 0) { - nxt_eventport_enable_event(event_set, ev, events); - } - - break; - - case PORT_SOURCE_USER: - nxt_log_debug(thr->log, "eventport: user ev:%d u:%p", - event->portev_events, event->portev_user); - - signo = event->portev_events; - - handler = (signo == 0) ? es->post_handler : es->signal_handler; - - nxt_thread_work_queue_add(thr, &thr->work_queue.main, handler, - (void *) (uintptr_t) signo, NULL, - thr->log); - - break; - - default: - nxt_log_alert(thr->log, "unexpected port_getn(%d) event: " - "ev:%d src:%d obj:%p u:%p", - es->port, event->portev_events, - event->portev_source, event->portev_object, - event->portev_user); - } - } -} |