summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_select.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_select.c')
-rw-r--r--src/nxt_select.c393
1 files changed, 393 insertions, 0 deletions
diff --git a/src/nxt_select.c b/src/nxt_select.c
new file mode 100644
index 00000000..a9713ac0
--- /dev/null
+++ b/src/nxt_select.c
@@ -0,0 +1,393 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+static nxt_event_set_t *nxt_select_create(nxt_event_signals_t *signals,
+ nxt_uint_t mchanges, nxt_uint_t mevents);
+static void nxt_select_free(nxt_event_set_t *event_set);
+static void nxt_select_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
+static void nxt_select_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
+static void nxt_select_enable_read(nxt_event_set_t *event_set,
+ nxt_event_fd_t *ev);
+static void nxt_select_enable_write(nxt_event_set_t *event_set,
+ nxt_event_fd_t *ev);
+static void nxt_select_error_handler(nxt_thread_t *thr, void *obj,
+ void *data);
+static void nxt_select_disable_read(nxt_event_set_t *event_set,
+ nxt_event_fd_t *ev);
+static void nxt_select_disable_write(nxt_event_set_t *event_set,
+ nxt_event_fd_t *ev);
+static void nxt_select_block_read(nxt_event_set_t *event_set,
+ nxt_event_fd_t *ev);
+static void nxt_select_block_write(nxt_event_set_t *event_set,
+ nxt_event_fd_t *ev);
+static void nxt_select_oneshot_read(nxt_event_set_t *event_set,
+ nxt_event_fd_t *ev);
+static void nxt_select_oneshot_write(nxt_event_set_t *event_set,
+ nxt_event_fd_t *ev);
+static void nxt_select_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
+ nxt_msec_t timeout);
+
+
+const nxt_event_set_ops_t nxt_select_event_set = {
+ "select",
+ nxt_select_create,
+ nxt_select_free,
+ nxt_select_enable,
+ nxt_select_disable,
+ nxt_select_disable,
+ nxt_select_disable,
+ nxt_select_enable_read,
+ nxt_select_enable_write,
+ nxt_select_disable_read,
+ nxt_select_disable_write,
+ nxt_select_block_read,
+ nxt_select_block_write,
+ nxt_select_oneshot_read,
+ nxt_select_oneshot_write,
+ nxt_select_enable_read,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ nxt_select_poll,
+
+ &nxt_unix_event_conn_io,
+
+ NXT_NO_FILE_EVENTS,
+ NXT_NO_SIGNAL_EVENTS,
+};
+
+
+static nxt_event_set_t *
+nxt_select_create(nxt_event_signals_t *signals, nxt_uint_t mchanges,
+ nxt_uint_t mevents)
+{
+ nxt_event_set_t *event_set;
+ nxt_select_event_set_t *ss;
+
+ event_set = nxt_zalloc(sizeof(nxt_select_event_set_t));
+ if (event_set == NULL) {
+ return NULL;
+ }
+
+ ss = &event_set->select;
+
+ ss->nfds = -1;
+ ss->update_nfds = 0;
+
+ ss->events = nxt_zalloc(FD_SETSIZE * sizeof(nxt_event_fd_t *));
+ if (ss->events != NULL) {
+ return event_set;
+ }
+
+ nxt_select_free(event_set);
+
+ return NULL;
+}
+
+
+static void
+nxt_select_free(nxt_event_set_t *event_set)
+{
+ nxt_select_event_set_t *ss;
+
+ nxt_main_log_debug("select free");
+
+ ss = &event_set->select;
+
+ nxt_free(ss->events);
+ nxt_free(ss);
+}
+
+
+static void
+nxt_select_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+{
+ nxt_select_enable_read(event_set, ev);
+ nxt_select_enable_write(event_set, ev);
+}
+
+
+static void
+nxt_select_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+{
+ if (ev->read != NXT_EVENT_INACTIVE) {
+ nxt_select_disable_read(event_set, ev);
+ }
+
+ if (ev->write != NXT_EVENT_INACTIVE) {
+ nxt_select_disable_write(event_set, ev);
+ }
+}
+
+
+static void
+nxt_select_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+{
+ nxt_fd_t fd;
+ nxt_thread_t *thr;
+ nxt_select_event_set_t *ss;
+
+ fd = ev->fd;
+
+ nxt_log_debug(ev->log, "select enable read: fd:%d", fd);
+
+ ss = &event_set->select;
+
+ if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
+ thr = nxt_thread();
+ nxt_thread_work_queue_add(thr, &thr->work_queue.main,
+ nxt_select_error_handler,
+ ev, ev->data, ev->log);
+ return;
+ }
+
+ ev->read = NXT_EVENT_DEFAULT;
+
+ FD_SET(fd, &ss->main_read_fd_set);
+ ss->events[fd] = ev;
+
+ if (ss->nfds < fd) {
+ ss->nfds = fd;
+ ss->update_nfds = 0;
+ }
+}
+
+
+static void
+nxt_select_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+{
+ nxt_fd_t fd;
+ nxt_thread_t *thr;
+ nxt_select_event_set_t *ss;
+
+ fd = ev->fd;
+
+ nxt_log_debug(ev->log, "select enable write: fd:%d", fd);
+
+ ss = &event_set->select;
+
+ if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
+ thr = nxt_thread();
+ nxt_thread_work_queue_add(thr, &thr->work_queue.main,
+ nxt_select_error_handler,
+ ev, ev->data, ev->log);
+ return;
+ }
+
+ ev->write = NXT_EVENT_DEFAULT;
+
+ FD_SET(fd, &ss->main_write_fd_set);
+ ss->events[fd] = ev;
+
+ if (ss->nfds < fd) {
+ ss->nfds = fd;
+ ss->update_nfds = 0;
+ }
+}
+
+
+static void
+nxt_select_error_handler(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_event_fd_t *ev;
+
+ ev = obj;
+
+ ev->read = NXT_EVENT_INACTIVE;
+ ev->write = NXT_EVENT_INACTIVE;
+
+ ev->error_handler(thr, ev, data);
+}
+
+
+static void
+nxt_select_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+{
+ nxt_fd_t fd;
+ nxt_select_event_set_t *ss;
+
+ fd = ev->fd;
+
+ nxt_log_debug(ev->log, "select disable read: fd:%d", fd);
+
+ if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
+ return;
+ }
+
+ ss = &event_set->select;
+ FD_CLR(fd, &ss->main_read_fd_set);
+
+ ev->read = NXT_EVENT_INACTIVE;
+
+ if (ev->write == NXT_EVENT_INACTIVE) {
+ ss->events[fd] = NULL;
+ ss->update_nfds = 1;
+ }
+}
+
+
+static void
+nxt_select_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+{
+ nxt_fd_t fd;
+ nxt_select_event_set_t *ss;
+
+ fd = ev->fd;
+
+ nxt_log_debug(ev->log, "select disable write: fd:%d", fd);
+
+ if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
+ return;
+ }
+
+ ss = &event_set->select;
+ FD_CLR(fd, &ss->main_write_fd_set);
+
+ ev->write = NXT_EVENT_INACTIVE;
+
+ if (ev->read == NXT_EVENT_INACTIVE) {
+ ss->events[fd] = NULL;
+ ss->update_nfds = 1;
+ }
+}
+
+
+static void
+nxt_select_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+{
+ if (ev->read != NXT_EVENT_INACTIVE) {
+ nxt_select_disable_read(event_set, ev);
+ }
+}
+
+
+static void
+nxt_select_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+{
+ if (ev->write != NXT_EVENT_INACTIVE) {
+ nxt_select_disable_write(event_set, ev);
+ }
+}
+
+
+static void
+nxt_select_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+{
+ nxt_select_enable_read(event_set, ev);
+
+ ev->read = NXT_EVENT_ONESHOT;
+}
+
+
+static void
+nxt_select_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+{
+ nxt_select_enable_write(event_set, ev);
+
+ ev->write = NXT_EVENT_ONESHOT;
+}
+
+
+static void
+nxt_select_poll(nxt_thread_t *thr, nxt_event_set_t *event_set,
+ nxt_msec_t timeout)
+{
+ int nevents, nfds, found;
+ nxt_err_t err;
+ nxt_int_t i;
+ nxt_uint_t fd, level;
+ nxt_event_fd_t *ev;
+ struct timeval tv, *tp;
+ nxt_select_event_set_t *ss;
+
+ if (timeout == NXT_INFINITE_MSEC) {
+ tp = NULL;
+
+ } else {
+ tv.tv_sec = (long) (timeout / 1000);
+ tv.tv_usec = (long) ((timeout % 1000) * 1000);
+ tp = &tv;
+ }
+
+ ss = &event_set->select;
+
+ if (ss->update_nfds) {
+ for (i = ss->nfds; i >= 0; i--) {
+ if (ss->events[i] != NULL) {
+ ss->nfds = i;
+ ss->update_nfds = 0;
+ break;
+ }
+ }
+ }
+
+ ss->work_read_fd_set = ss->main_read_fd_set;
+ ss->work_write_fd_set = ss->main_write_fd_set;
+
+ nfds = ss->nfds + 1;
+
+ nxt_log_debug(thr->log, "select() nfds:%d timeout:%M", nfds, timeout);
+
+ nevents = select(nfds, &ss->work_read_fd_set, &ss->work_write_fd_set,
+ NULL, tp);
+
+ err = (nevents == -1) ? nxt_errno : 0;
+
+ nxt_thread_time_update(thr);
+
+ nxt_log_debug(thr->log, "select(): %d", nevents);
+
+ if (nevents == -1) {
+ level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
+ nxt_log_error(level, thr->log, "select() failed %E", err);
+ return;
+ }
+
+ for (fd = 0; fd < (nxt_uint_t) nfds && nevents != 0; fd++) {
+
+ found = 0;
+
+ if (FD_ISSET(fd, &ss->work_read_fd_set)) {
+ ev = ss->events[fd];
+
+ nxt_log_debug(ev->log, "select() fd:%ui read rd:%d wr:%d",
+ fd, ev->read, ev->write);
+
+ ev->read_ready = 1;
+
+ if (ev->read == NXT_EVENT_ONESHOT) {
+ nxt_select_disable_read(event_set, ev);
+ }
+
+ nxt_thread_work_queue_add(thr, ev->read_work_queue,
+ ev->read_handler, ev, ev->data, ev->log);
+ found = 1;
+ }
+
+ if (FD_ISSET(fd, &ss->work_write_fd_set)) {
+ ev = ss->events[fd];
+
+ nxt_log_debug(ev->log, "select() fd:%ui write rd:%d wr:%d",
+ fd, ev->read, ev->write);
+
+ ev->write_ready = 1;
+
+ if (ev->write == NXT_EVENT_ONESHOT) {
+ nxt_select_disable_write(event_set, ev);
+ }
+
+ nxt_thread_work_queue_add(thr, ev->write_work_queue,
+ ev->write_handler, ev, ev->data, ev->log);
+ found = 1;
+ }
+
+ nevents -= found;
+ }
+}