summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_eventport.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
commit059a8642898a6bd4b47d13a1c1d599cd44af7e1c (patch)
treee3c8c530a04f1ae44777d5ea4fd6901dc55a8ebf /src/nxt_eventport.c
parente57b95a92333fa7ff558737b0ba2b76894cc0412 (diff)
downloadunit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.gz
unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.bz2
Event engines refactoring.
Diffstat (limited to '')
-rw-r--r--src/nxt_eventport.c646
1 files changed, 0 insertions, 646 deletions
diff --git a/src/nxt_eventport.c b/src/nxt_eventport.c
deleted file mode 100644
index 02984573..00000000
--- a/src/nxt_eventport.c
+++ /dev/null
@@ -1,646 +0,0 @@
-
-/*
- * Copyright (C) Igor Sysoev
- * Copyright (C) NGINX, Inc.
- */
-
-#include <nxt_main.h>
-
-
-/*
- * The event ports have been introduced in Solaris 10.
- * The PORT_SOURCE_MQ and PORT_SOURCE_FILE sources have
- * been added in OpenSolaris.
- */
-
-
-static nxt_event_set_t *nxt_eventport_create(nxt_event_signals_t *signals,
- nxt_uint_t mchanges, nxt_uint_t mevents);
-static void nxt_eventport_free(nxt_event_set_t *event_set);
-static void nxt_eventport_enable(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_eventport_disable(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_eventport_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
-static void nxt_eventport_drop_changes(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_eventport_enable_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_eventport_enable_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_eventport_enable_event(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev, nxt_uint_t events);
-static void nxt_eventport_disable_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_eventport_disable_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_eventport_disable_event(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static nxt_int_t nxt_eventport_commit_changes(nxt_thread_t *thr,
- nxt_eventport_event_set_t *es);
-static void nxt_eventport_error_handler(nxt_thread_t *thr, void *obj,
- void *data);
-static void nxt_eventport_block_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_eventport_block_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_eventport_oneshot_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_eventport_oneshot_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_eventport_enable_accept(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static nxt_int_t nxt_eventport_enable_post(nxt_event_set_t *event_set,
- nxt_work_handler_t handler);
-static void nxt_eventport_signal(nxt_event_set_t *event_set, nxt_uint_t signo);
-static void nxt_eventport_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
- nxt_msec_t timeout);
-
-
-const nxt_event_set_ops_t nxt_eventport_event_set = {
- "eventport",
- nxt_eventport_create,
- nxt_eventport_free,
- nxt_eventport_enable,
- nxt_eventport_disable,
- nxt_eventport_disable,
- nxt_eventport_close,
- nxt_eventport_enable_read,
- nxt_eventport_enable_write,
- nxt_eventport_disable_read,
- nxt_eventport_disable_write,
- nxt_eventport_block_read,
- nxt_eventport_block_write,
- nxt_eventport_oneshot_read,
- nxt_eventport_oneshot_write,
- nxt_eventport_enable_accept,
- NULL,
- NULL,
- nxt_eventport_enable_post,
- nxt_eventport_signal,
- nxt_eventport_poll,
-
- &nxt_unix_event_conn_io,
-
- NXT_NO_FILE_EVENTS,
- NXT_NO_SIGNAL_EVENTS,
-};
-
-
-static nxt_event_set_t *
-nxt_eventport_create(nxt_event_signals_t *signals, nxt_uint_t mchanges,
- nxt_uint_t mevents)
-{
- nxt_event_set_t *event_set;
- nxt_eventport_event_set_t *es;
-
- event_set = nxt_zalloc(sizeof(nxt_eventport_event_set_t));
- if (event_set == NULL) {
- return NULL;
- }
-
- es = &event_set->eventport;
-
- es->port = -1;
- es->mchanges = mchanges;
- es->mevents = mevents;
-
- es->changes = nxt_malloc(sizeof(nxt_eventport_change_t) * mchanges);
- if (es->changes == NULL) {
- goto fail;
- }
-
- es->events = nxt_malloc(sizeof(port_event_t) * mevents);
- if (es->events == NULL) {
- goto fail;
- }
-
- es->port = port_create();
- if (es->port == -1) {
- nxt_main_log_emerg("port_create() failed %E", nxt_errno);
- goto fail;
- }
-
- nxt_main_log_debug("port_create(): %d", es->port);
-
- if (signals != NULL) {
- es->signal_handler = signals->handler;
- }
-
- return event_set;
-
-fail:
-
- nxt_eventport_free(event_set);
-
- return NULL;
-}
-
-
-static void
-nxt_eventport_free(nxt_event_set_t *event_set)
-{
- nxt_eventport_event_set_t *es;
-
- es = &event_set->eventport;
-
- nxt_main_log_debug("eventport %d free", es->port);
-
- if (es->port != -1) {
- if (close(es->port) != 0) {
- nxt_main_log_emerg("eventport close(%d) failed %E",
- es->port, nxt_errno);
- }
- }
-
- nxt_free(es->events);
- nxt_free(es);
-}
-
-
-static void
-nxt_eventport_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- ev->read = NXT_EVENT_DEFAULT;
- ev->write = NXT_EVENT_DEFAULT;
-
- nxt_eventport_enable_event(event_set, ev, POLLIN | POLLOUT);
-}
-
-
-static void
-nxt_eventport_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
-
- ev->read = NXT_EVENT_INACTIVE;
- ev->write = NXT_EVENT_INACTIVE;
-
- nxt_eventport_disable_event(event_set, ev);
- }
-}
-
-
-static void
-nxt_eventport_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- ev->read = NXT_EVENT_INACTIVE;
- ev->write = NXT_EVENT_INACTIVE;
-
- nxt_eventport_drop_changes(event_set, ev);
-}
-
-
-static void
-nxt_eventport_drop_changes(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- nxt_eventport_change_t *dst, *src, *end;
- nxt_eventport_event_set_t *es;
-
- es = &event_set->eventport;
-
- dst = es->changes;
- end = dst + es->nchanges;
-
- for (src = dst; src < end; src++) {
-
- if (src->event == ev) {
- continue;
- }
-
- if (dst != src) {
- *dst = *src;
- }
-
- dst++;
- }
-
- es->nchanges -= end - dst;
-}
-
-
-static void
-nxt_eventport_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- nxt_uint_t events;
-
- if (ev->read != NXT_EVENT_BLOCKED) {
- events = (ev->write == NXT_EVENT_INACTIVE) ? POLLIN:
- (POLLIN | POLLOUT);
- nxt_eventport_enable_event(event_set, ev, events);
- }
-
- ev->read = NXT_EVENT_DEFAULT;
-}
-
-
-static void
-nxt_eventport_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- nxt_uint_t events;
-
- if (ev->write != NXT_EVENT_BLOCKED) {
- events = (ev->read == NXT_EVENT_INACTIVE) ? POLLOUT:
- (POLLIN | POLLOUT);
- nxt_eventport_enable_event(event_set, ev, events);
- }
-
- ev->write = NXT_EVENT_DEFAULT;
-}
-
-
-/*
- * eventport changes are batched to improve instruction and data
- * cache locality of several port_associate() and port_dissociate()
- * calls followed by port_getn() call.
- */
-
-static void
-nxt_eventport_enable_event(nxt_event_set_t *event_set, nxt_event_fd_t *ev,
- nxt_uint_t events)
-{
- nxt_eventport_change_t *ch;
- nxt_eventport_event_set_t *es;
-
- es = &event_set->eventport;
-
- nxt_log_debug(ev->log, "port %d set event: fd:%d ev:%04XD u:%p",
- es->port, ev->fd, events, ev);
-
- if (es->nchanges >= es->mchanges) {
- (void) nxt_eventport_commit_changes(nxt_thread(), es);
- }
-
- ch = &es->changes[es->nchanges++];
- ch->fd = ev->fd;
- ch->events = events;
- ch->event = ev;
-}
-
-
-static void
-nxt_eventport_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- ev->read = NXT_EVENT_INACTIVE;
-
- if (ev->write == NXT_EVENT_INACTIVE) {
- nxt_eventport_disable_event(event_set, ev);
-
- } else {
- nxt_eventport_enable_event(event_set, ev, POLLOUT);
- }
-}
-
-
-static void
-nxt_eventport_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- ev->write = NXT_EVENT_INACTIVE;
-
- if (ev->read == NXT_EVENT_INACTIVE) {
- nxt_eventport_disable_event(event_set, ev);
-
- } else {
- nxt_eventport_enable_event(event_set, ev, POLLIN);
- }
-}
-
-
-static void
-nxt_eventport_disable_event(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- nxt_eventport_change_t *ch;
- nxt_eventport_event_set_t *es;
-
- es = &event_set->eventport;
-
- nxt_log_debug(ev->log, "port %d disable event : fd:%d", es->port, ev->fd);
-
- if (es->nchanges >= es->mchanges) {
- (void) nxt_eventport_commit_changes(nxt_thread(), es);
- }
-
- ch = &es->changes[es->nchanges++];
- ch->fd = ev->fd;
- ch->events = 0;
- ch->event = ev;
-}
-
-
-static nxt_int_t
-nxt_eventport_commit_changes(nxt_thread_t *thr, nxt_eventport_event_set_t *es)
-{
- int ret;
- nxt_int_t retval;
- nxt_event_fd_t *ev;
- nxt_eventport_change_t *ch, *end;
-
- nxt_log_debug(thr->log, "eventport %d changes:%ui", es->port, es->nchanges);
-
- retval = NXT_OK;
- ch = es->changes;
- end = ch + es->nchanges;
-
- do {
- ev = ch->event;
-
- if (ch->events != 0) {
- nxt_log_debug(ev->log, "port_associate(%d): fd:%d ev:%04XD u:%p",
- es->port, ch->fd, ch->events, ev);
-
- ret = port_associate(es->port, PORT_SOURCE_FD, ch->fd,
- ch->events, ev);
- if (ret == 0) {
- goto next;
- }
-
- nxt_log_alert(ev->log,
- "port_associate(%d, %d, %d, %04XD) failed %E",
- es->port, PORT_SOURCE_FD, ch->fd, ch->events,
- nxt_errno);
-
- } else {
- nxt_log_debug(ev->log, "port_dissociate(%d): fd:%d",
- es->port, ch->fd);
-
- if (port_dissociate(es->port, PORT_SOURCE_FD, ch->fd) == 0) {
- goto next;
- }
-
- nxt_log_alert(ev->log, "port_dissociate(%d, %d, %d) failed %E",
- es->port, PORT_SOURCE_FD, ch->fd, nxt_errno);
- }
-
- nxt_thread_work_queue_add(thr, &thr->work_queue.main,
- nxt_eventport_error_handler,
- ev, ev->data, ev->log);
-
- retval = NXT_ERROR;
-
- next:
-
- ch++;
-
- } while (ch < end);
-
- es->nchanges = 0;
-
- return retval;
-}
-
-
-static void
-nxt_eventport_error_handler(nxt_thread_t *thr, void *obj, void *data)
-{
- nxt_event_fd_t *ev;
-
- ev = obj;
-
- ev->read = NXT_EVENT_INACTIVE;
- ev->write = NXT_EVENT_INACTIVE;
-
- ev->error_handler(thr, ev, data);
-}
-
-
-static void
-nxt_eventport_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- if (ev->read != NXT_EVENT_INACTIVE) {
- ev->read = NXT_EVENT_BLOCKED;
- }
-}
-
-
-static void
-nxt_eventport_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- if (ev->write != NXT_EVENT_INACTIVE) {
- ev->write = NXT_EVENT_BLOCKED;
- }
-}
-
-
-static void
-nxt_eventport_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- if (ev->read == NXT_EVENT_INACTIVE) {
- ev->read = NXT_EVENT_DEFAULT;
-
- nxt_eventport_enable_event(event_set, ev, POLLIN);
- }
-}
-
-
-static void
-nxt_eventport_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- if (ev->write == NXT_EVENT_INACTIVE) {
- ev->write = NXT_EVENT_DEFAULT;
-
- nxt_eventport_enable_event(event_set, ev, POLLOUT);
- }
-}
-
-
-static void
-nxt_eventport_enable_accept(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
-{
- ev->read = NXT_EVENT_LEVEL;
-
- nxt_eventport_enable_event(event_set, ev, POLLIN);
-}
-
-
-static nxt_int_t
-nxt_eventport_enable_post(nxt_event_set_t *event_set,
- nxt_work_handler_t handler)
-{
- event_set->eventport.post_handler = handler;
-
- return NXT_OK;
-}
-
-
-static void
-nxt_eventport_signal(nxt_event_set_t *event_set, nxt_uint_t signo)
-{
- nxt_eventport_event_set_t *es;
-
- es = &event_set->eventport;
-
- nxt_thread_log_debug("port_send(%d, %ui)", es->port, signo);
-
- if (port_send(es->port, signo, NULL) != 0) {
- nxt_thread_log_alert("port_send(%d) failed %E", es->port, nxt_errno);
- }
-}
-
-
-static void
-nxt_eventport_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
- nxt_msec_t timeout)
-{
- int n, events, signo;
- uint_t nevents;
- nxt_err_t err;
- nxt_uint_t i, level;
- timespec_t ts, *tp;
- port_event_t *event;
- nxt_event_fd_t *ev;
- nxt_work_handler_t handler;
- nxt_eventport_event_set_t *es;
-
- es = &event_set->eventport;
-
- if (es->nchanges != 0) {
- if (nxt_eventport_commit_changes(thr, es) != NXT_OK) {
- /* Error handlers have been enqueued on failure. */
- timeout = 0;
- }
- }
-
- if (timeout == NXT_INFINITE_MSEC) {
- tp = NULL;
-
- } else {
- ts.tv_sec = timeout / 1000;
- ts.tv_nsec = (timeout % 1000) * 1000000;
- tp = &ts;
- }
-
- nxt_log_debug(thr->log, "port_getn(%d) timeout: %M", es->port, timeout);
-
- /*
- * A trap for possible error when Solaris does not update nevents
- * if ETIME or EINTR is returned. This issue will be logged as
- * "unexpected port_getn() event".
- *
- * The details are in OpenSolaris mailing list thread "port_getn()
- * and timeouts - is this a bug or an undocumented feature?"
- */
- event = &es->events[0];
- event->portev_events = -1; /* invalid port events */
- event->portev_source = -1; /* invalid port source */
- event->portev_object = -1;
- event->portev_user = (void *) -1;
-
- nevents = 1;
- n = port_getn(es->port, es->events, es->mevents, &nevents, tp);
-
- /*
- * 32-bit port_getn() on Solaris 10 x86 returns large negative
- * values instead of 0 when returning immediately.
- */
- err = (n < 0) ? nxt_errno : 0;
-
- nxt_thread_time_update(thr);
-
- if (n == -1) {
- if (err == NXT_ETIME || err == NXT_EINTR) {
- if (nevents != 0) {
- nxt_log_alert(thr->log, "port_getn(%d) failed %E, events:%ud",
- es->port, err, nevents);
- }
- }
-
- if (err != NXT_ETIME) {
- level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
- nxt_log_error(level, thr->log, "port_getn(%d) failed %E",
- es->port, err);
-
- if (err != NXT_EINTR) {
- return;
- }
- }
- }
-
- nxt_log_debug(thr->log, "port_getn(%d) events: %d", es->port, nevents);
-
- for (i = 0; i < nevents; i++) {
- event = &es->events[i];
-
- switch (event->portev_source) {
-
- case PORT_SOURCE_FD:
- ev = event->portev_user;
- events = event->portev_events;
-
- nxt_log_debug(ev->log, "eventport: fd:%d ev:%04Xd u:%p rd:%d wr:%d",
- event->portev_object, events, ev,
- ev->read, ev->write);
-
- if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) {
- nxt_log_alert(ev->log,
- "port_getn(%d) error fd:%d events:%04Xud",
- es->port, ev->fd, events);
-
- nxt_thread_work_queue_add(thr, &thr->work_queue.main,
- nxt_eventport_error_handler,
- ev, ev->data, ev->log);
- continue;
- }
-
- if (events & POLLIN) {
- ev->read_ready = 1;
-
- if (ev->read != NXT_EVENT_BLOCKED) {
- nxt_thread_work_queue_add(thr, ev->read_work_queue,
- ev->read_handler,
- ev, ev->data, ev->log);
-
- }
-
- if (ev->read != NXT_EVENT_LEVEL) {
- ev->read = NXT_EVENT_INACTIVE;
- }
- }
-
- if (events & POLLOUT) {
- ev->write_ready = 1;
-
- if (ev->write != NXT_EVENT_BLOCKED) {
- nxt_thread_work_queue_add(thr, ev->write_work_queue,
- ev->write_handler,
- ev, ev->data, ev->log);
- }
-
- ev->write = NXT_EVENT_INACTIVE;
- }
-
- /*
- * Reactivate counterpart direction, because the
- * eventport is oneshot notification facility.
- */
- events = (ev->read == NXT_EVENT_INACTIVE) ? 0 : POLLIN;
- events |= (ev->write == NXT_EVENT_INACTIVE) ? 0 : POLLOUT;
-
- if (events != 0) {
- nxt_eventport_enable_event(event_set, ev, events);
- }
-
- break;
-
- case PORT_SOURCE_USER:
- nxt_log_debug(thr->log, "eventport: user ev:%d u:%p",
- event->portev_events, event->portev_user);
-
- signo = event->portev_events;
-
- handler = (signo == 0) ? es->post_handler : es->signal_handler;
-
- nxt_thread_work_queue_add(thr, &thr->work_queue.main, handler,
- (void *) (uintptr_t) signo, NULL,
- thr->log);
-
- break;
-
- default:
- nxt_log_alert(thr->log, "unexpected port_getn(%d) event: "
- "ev:%d src:%d obj:%p u:%p",
- es->port, event->portev_events,
- event->portev_source, event->portev_object,
- event->portev_user);
- }
- }
-}