diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_select_engine.c | 370 |
1 files changed, 370 insertions, 0 deletions
diff --git a/src/nxt_select_engine.c b/src/nxt_select_engine.c new file mode 100644 index 00000000..8a6f0710 --- /dev/null +++ b/src/nxt_select_engine.c @@ -0,0 +1,370 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +static nxt_int_t nxt_select_create(nxt_event_engine_t *engine, + nxt_uint_t mchanges, nxt_uint_t mevents); +static void nxt_select_free(nxt_event_engine_t *engine); +static void nxt_select_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static void nxt_select_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static nxt_bool_t nxt_select_close(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_select_enable_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_select_enable_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_select_error_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_select_disable_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_select_disable_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_select_block_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_select_block_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_select_oneshot_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_select_oneshot_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_select_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); + + +const nxt_event_interface_t nxt_select_engine = { + "select", + nxt_select_create, + nxt_select_free, + nxt_select_enable, + nxt_select_disable, + nxt_select_disable, + nxt_select_close, + nxt_select_enable_read, + nxt_select_enable_write, + nxt_select_disable_read, + nxt_select_disable_write, + nxt_select_block_read, + nxt_select_block_write, + nxt_select_oneshot_read, + nxt_select_oneshot_write, + nxt_select_enable_read, + NULL, + NULL, + NULL, + NULL, + nxt_select_poll, + + &nxt_unix_event_conn_io, + + NXT_NO_FILE_EVENTS, + NXT_NO_SIGNAL_EVENTS, +}; + + +static nxt_int_t +nxt_select_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, + nxt_uint_t mevents) +{ + engine->u.select.nfds = -1; + engine->u.select.update_nfds = 0; + + engine->u.select.events = nxt_zalloc(FD_SETSIZE * sizeof(nxt_fd_event_t *)); + + if (engine->u.select.events != NULL) { + return NXT_OK; + } + + nxt_select_free(engine); + + return NXT_ERROR; +} + + +static void +nxt_select_free(nxt_event_engine_t *engine) +{ + nxt_debug(&engine->task, "select free"); + + nxt_free(engine->u.select.events); + + nxt_memzero(&engine->u.select, sizeof(nxt_select_engine_t)); +} + + +static void +nxt_select_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_select_enable_read(engine, ev); + nxt_select_enable_write(engine, ev); +} + + +static void +nxt_select_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + if (ev->read != NXT_EVENT_INACTIVE) { + nxt_select_disable_read(engine, ev); + } + + if (ev->write != NXT_EVENT_INACTIVE) { + nxt_select_disable_write(engine, ev); + } +} + + +static nxt_bool_t +nxt_select_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_select_disable(engine, ev); + + return 0; +} + + +static void +nxt_select_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_fd_t fd; + + fd = ev->fd; + + nxt_debug(ev->task, "select enable read: fd:%d", fd); + + if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) { + nxt_work_queue_add(&engine->fast_work_queue, nxt_select_error_handler, + ev->task, ev, ev->data); + return; + } + + ev->read = NXT_EVENT_ACTIVE; + + FD_SET(fd, &engine->u.select.main_read_fd_set); + engine->u.select.events[fd] = ev; + + if (engine->u.select.nfds < fd) { + engine->u.select.nfds = fd; + engine->u.select.update_nfds = 0; + } +} + + +static void +nxt_select_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_fd_t fd; + + fd = ev->fd; + + nxt_debug(ev->task, "select enable write: fd:%d", fd); + + if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) { + nxt_work_queue_add(&engine->fast_work_queue, nxt_select_error_handler, + ev->task, ev, ev->data); + return; + } + + ev->write = NXT_EVENT_ACTIVE; + + FD_SET(fd, &engine->u.select.main_write_fd_set); + engine->u.select.events[fd] = ev; + + if (engine->u.select.nfds < fd) { + engine->u.select.nfds = fd; + engine->u.select.update_nfds = 0; + } +} + + +static void +nxt_select_error_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_fd_event_t *ev; + + ev = obj; + + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_INACTIVE; + + ev->error_handler(task, ev, data); +} + + +static void +nxt_select_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_fd_t fd; + + fd = ev->fd; + + nxt_debug(ev->task, "select disable read: fd:%d", fd); + + if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) { + return; + } + + FD_CLR(fd, &engine->u.select.main_read_fd_set); + + ev->read = NXT_EVENT_INACTIVE; + + if (ev->write == NXT_EVENT_INACTIVE) { + engine->u.select.events[fd] = NULL; + engine->u.select.update_nfds = 1; + } +} + + +static void +nxt_select_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_fd_t fd; + + fd = ev->fd; + + nxt_debug(ev->task, "select disable write: fd:%d", fd); + + if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) { + return; + } + + FD_CLR(fd, &engine->u.select.main_write_fd_set); + + ev->write = NXT_EVENT_INACTIVE; + + if (ev->read == NXT_EVENT_INACTIVE) { + engine->u.select.events[fd] = NULL; + engine->u.select.update_nfds = 1; + } +} + + +static void +nxt_select_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + if (ev->read != NXT_EVENT_INACTIVE) { + nxt_select_disable_read(engine, ev); + } +} + + +static void +nxt_select_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + if (ev->write != NXT_EVENT_INACTIVE) { + nxt_select_disable_write(engine, ev); + } +} + + +static void +nxt_select_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_select_enable_read(engine, ev); + + ev->read = NXT_EVENT_ONESHOT; +} + + +static void +nxt_select_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_select_enable_write(engine, ev); + + ev->write = NXT_EVENT_ONESHOT; +} + + +static void +nxt_select_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) +{ + int nevents, nfds, found; + nxt_err_t err; + nxt_int_t i; + nxt_uint_t fd, level; + nxt_fd_event_t *ev; + struct timeval tv, *tp; + + if (timeout == NXT_INFINITE_MSEC) { + tp = NULL; + + } else { + tv.tv_sec = (long) (timeout / 1000); + tv.tv_usec = (long) ((timeout % 1000) * 1000); + tp = &tv; + } + + if (engine->u.select.update_nfds) { + for (i = engine->u.select.nfds; i >= 0; i--) { + if (engine->u.select.events[i] != NULL) { + engine->u.select.nfds = i; + engine->u.select.update_nfds = 0; + break; + } + } + } + + engine->u.select.work_read_fd_set = engine->u.select.main_read_fd_set; + engine->u.select.work_write_fd_set = engine->u.select.main_write_fd_set; + + nfds = engine->u.select.nfds + 1; + + nxt_debug(&engine->task, "select() nfds:%d timeout:%M", nfds, timeout); + + nevents = select(nfds, &engine->u.select.work_read_fd_set, + &engine->u.select.work_write_fd_set, NULL, tp); + + err = (nevents == -1) ? nxt_errno : 0; + + nxt_thread_time_update(engine->task.thread); + + nxt_debug(&engine->task, "select(): %d", nevents); + + if (nevents == -1) { + level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; + nxt_log(&engine->task, level, "select() failed %E", err); + return; + } + + for (fd = 0; fd < (nxt_uint_t) nfds && nevents != 0; fd++) { + + found = 0; + + if (FD_ISSET(fd, &engine->u.select.work_read_fd_set)) { + ev = engine->u.select.events[fd]; + + nxt_debug(ev->task, "select() fd:%ui read rd:%d wr:%d", + fd, ev->read, ev->write); + + ev->read_ready = 1; + + if (ev->read == NXT_EVENT_ONESHOT) { + nxt_select_disable_read(engine, ev); + } + + nxt_work_queue_add(ev->read_work_queue, ev->read_handler, + ev->task, ev, ev->data); + found = 1; + } + + if (FD_ISSET(fd, &engine->u.select.work_write_fd_set)) { + ev = engine->u.select.events[fd]; + + nxt_debug(ev->task, "select() fd:%ui write rd:%d wr:%d", + fd, ev->read, ev->write); + + ev->write_ready = 1; + + if (ev->write == NXT_EVENT_ONESHOT) { + nxt_select_disable_write(engine, ev); + } + + nxt_work_queue_add(ev->write_work_queue, ev->write_handler, + ev->task, ev, ev->data); + found = 1; + } + + nevents -= found; + } +} |