diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-02-07 20:04:56 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-02-07 20:04:56 +0300 |
commit | 059a8642898a6bd4b47d13a1c1d599cd44af7e1c (patch) | |
tree | e3c8c530a04f1ae44777d5ea4fd6901dc55a8ebf /src/nxt_pollset_engine.c | |
parent | e57b95a92333fa7ff558737b0ba2b76894cc0412 (diff) | |
download | unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.gz unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.bz2 |
Event engines refactoring.
Diffstat (limited to 'src/nxt_pollset_engine.c')
-rw-r--r-- | src/nxt_pollset_engine.c | 647 |
1 files changed, 647 insertions, 0 deletions
diff --git a/src/nxt_pollset_engine.c b/src/nxt_pollset_engine.c new file mode 100644 index 00000000..571ad794 --- /dev/null +++ b/src/nxt_pollset_engine.c @@ -0,0 +1,647 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +/* + * pollset has been introduced in AIX 5L 5.3. + * + * pollset_create() returns a pollset_t descriptor which is not + * a file descriptor, so it cannot be added to another pollset. + * The first pollset_create() call returns 0. + */ + + +#define NXT_POLLSET_ADD 0 +#define NXT_POLLSET_UPDATE 1 +#define NXT_POLLSET_CHANGE 2 +#define NXT_POLLSET_DELETE 3 + + +static nxt_int_t nxt_pollset_create(nxt_event_engine_t *engine, + nxt_uint_t mchanges, nxt_uint_t mevents); +static void nxt_pollset_free(nxt_event_engine_t *engine); +static void nxt_pollset_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static void nxt_pollset_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static nxt_bool_t nxt_pollset_close(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_pollset_enable_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_pollset_enable_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_pollset_disable_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_pollset_disable_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_pollset_block_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_pollset_block_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_pollset_oneshot_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_pollset_oneshot_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_pollset_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, + nxt_uint_t op, nxt_uint_t events); +static nxt_int_t nxt_pollset_commit_changes(nxt_event_engine_t *engine); +static void nxt_pollset_change_error(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_pollset_remove(nxt_event_engine_t *engine, nxt_fd_t fd); +static nxt_int_t nxt_pollset_write(nxt_event_engine_t *engine, + struct poll_ctl *ctl, int n); +static void nxt_pollset_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); + + +const nxt_event_interface_t nxt_pollset_engine = { + "pollset", + nxt_pollset_create, + nxt_pollset_free, + nxt_pollset_enable, + nxt_pollset_disable, + nxt_pollset_disable, + nxt_pollset_close, + nxt_pollset_enable_read, + nxt_pollset_enable_write, + nxt_pollset_disable_read, + nxt_pollset_disable_write, + nxt_pollset_block_read, + nxt_pollset_block_write, + nxt_pollset_oneshot_read, + nxt_pollset_oneshot_write, + nxt_pollset_enable_read, + NULL, + NULL, + NULL, + NULL, + nxt_pollset_poll, + + &nxt_unix_event_conn_io, + + NXT_NO_FILE_EVENTS, + NXT_NO_SIGNAL_EVENTS, +}; + + +static nxt_int_t +nxt_pollset_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, + nxt_uint_t mevents) +{ + void *changes; + + engine->u.pollset.ps = -1; + engine->u.pollset.mchanges = mchanges; + engine->u.pollset.mevents = mevents; + + changes = nxt_malloc(sizeof(nxt_pollset_change_t) * mchanges); + if (changes == NULL) { + goto fail; + } + + engine->u.pollset.changes = changes; + + /* + * NXT_POLLSET_CHANGE requires two struct poll_ctl's + * for PS_DELETE and subsequent PS_ADD. + */ + changes = nxt_malloc(2 * sizeof(struct poll_ctl) * mchanges); + if (changes == NULL) { + goto fail; + } + + engine->u.pollset.write_changes = changes; + + engine->u.pollset.events = nxt_malloc(sizeof(struct pollfd) * mevents); + if (engine->u.pollset.events == NULL) { + goto fail; + } + + engine->u.pollset.ps = pollset_create(-1); + + if (engine->u.pollset.ps == -1) { + nxt_log(&engine->task, NXT_LOG_CRIT, "pollset_create() failed %E", + nxt_errno); + goto fail; + } + + nxt_debug(&engine->task, "pollset_create(): %d", engine->u.pollset.ps); + + return NXT_OK; + +fail: + + nxt_pollset_free(engine); + + return NXT_ERROR; +} + + +static void +nxt_pollset_free(nxt_event_engine_t *engine) +{ + pollset_t ps; + + ps = engine->u.pollset.ps; + + nxt_debug(&engine->task, "pollset %d free", ps); + + if (ps != -1 && pollset_destroy(ps) != 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, "pollset_destroy(%d) failed %E", + ps, nxt_errno); + } + + nxt_free(engine->u.pollset.events); + nxt_free(engine->u.pollset.write_changes); + nxt_free(engine->u.pollset.changes); + nxt_fd_event_hash_destroy(&engine->u.pollset.fd_hash); + + nxt_memzero(&engine->u.pollset, sizeof(nxt_pollset_engine_t)); +} + + +static void +nxt_pollset_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + ev->read = NXT_EVENT_ACTIVE; + ev->write = NXT_EVENT_ACTIVE; + + nxt_pollset_change(engine, ev, NXT_POLLSET_ADD, POLLIN | POLLOUT); +} + + +static void +nxt_pollset_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { + + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_INACTIVE; + + nxt_pollset_change(engine, ev, NXT_POLLSET_DELETE, 0); + } +} + + +/* + * A closed descriptor must be deleted from a pollset, otherwise next + * pollset_poll() will return POLLNVAL on it. However, pollset_ctl() + * allows to delete the already closed file descriptor from the pollset + * using PS_DELETE, so the removal can be batched, pollset_ctl(2): + * + * After a file descriptor is added to a pollset, the file descriptor will + * not be removed until a pollset_ctl call with the cmd of PS_DELETE is + * executed. The file descriptor remains in the pollset even if the file + * descriptor is closed. A pollset_poll operation on a pollset containing + * a closed file descriptor returns a POLLNVAL event for that file + * descriptor. If the file descriptor is later allocated to a new object, + * the new object will be polled on future pollset_poll calls. + */ + +static nxt_bool_t +nxt_pollset_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_pollset_disable(engine, ev); + + return ev->changing; +} + + +static void +nxt_pollset_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_uint_t op, events; + + if (ev->read != NXT_EVENT_BLOCKED) { + + events = POLLIN; + + if (ev->write == NXT_EVENT_INACTIVE) { + op = NXT_POLLSET_ADD; + + } else if (ev->write == NXT_EVENT_BLOCKED) { + ev->write = NXT_EVENT_INACTIVE; + op = NXT_POLLSET_CHANGE; + + } else { + op = NXT_POLLSET_UPDATE; + events = POLLIN | POLLOUT; + } + + nxt_pollset_change(engine, ev, op, events); + } + + ev->read = NXT_EVENT_ACTIVE; +} + + +static void +nxt_pollset_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_uint_t op, events; + + if (ev->write != NXT_EVENT_BLOCKED) { + + events = POLLOUT; + + if (ev->read == NXT_EVENT_INACTIVE) { + op = NXT_POLLSET_ADD; + + } else if (ev->read == NXT_EVENT_BLOCKED) { + ev->read = NXT_EVENT_INACTIVE; + op = NXT_POLLSET_CHANGE; + + } else { + op = NXT_POLLSET_UPDATE; + events = POLLIN | POLLOUT; + } + + nxt_pollset_change(engine, ev, op, events); + } + + ev->write = NXT_EVENT_ACTIVE; +} + + +static void +nxt_pollset_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_uint_t op, events; + + ev->read = NXT_EVENT_INACTIVE; + + if (ev->write <= NXT_EVENT_BLOCKED) { + ev->write = NXT_EVENT_INACTIVE; + op = NXT_POLLSET_DELETE; + events = POLLREMOVE; + + } else { + op = NXT_POLLSET_CHANGE; + events = POLLOUT; + } + + nxt_pollset_change(engine, ev, op, events); +} + + +static void +nxt_pollset_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_uint_t op, events; + + ev->write = NXT_EVENT_INACTIVE; + + if (ev->read <= NXT_EVENT_BLOCKED) { + ev->read = NXT_EVENT_INACTIVE; + op = NXT_POLLSET_DELETE; + events = POLLREMOVE; + + } else { + op = NXT_POLLSET_CHANGE; + events = POLLIN; + } + + nxt_pollset_change(engine, ev, op, events); +} + + +static void +nxt_pollset_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + if (ev->read != NXT_EVENT_INACTIVE) { + ev->read = NXT_EVENT_BLOCKED; + } +} + + +static void +nxt_pollset_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + if (ev->write != NXT_EVENT_INACTIVE) { + ev->write = NXT_EVENT_BLOCKED; + } +} + + +static void +nxt_pollset_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_pollset_enable_read(engine, ev); + + ev->read = NXT_EVENT_ONESHOT; +} + + +static void +nxt_pollset_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + nxt_pollset_enable_write(engine, ev); + + ev->write = NXT_EVENT_ONESHOT; +} + + +/* + * PS_ADD adds only a new file descriptor to a pollset. + * PS_DELETE removes a file descriptor from a pollset. + * + * PS_MOD can add a new file descriptor or modify events for a file + * descriptor which is already in a pollset. However, modified events + * are always ORed, so to delete an event for a file descriptor, + * the file descriptor must be removed using PS_DELETE and then + * added again without the event. + */ + +static void +nxt_pollset_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, + nxt_uint_t op, nxt_uint_t events) +{ + nxt_pollset_change_t *change; + + nxt_debug(ev->task, "pollset %d change fd:%d op:%ui ev:%04Xi", + engine->u.pollset.ps, ev->fd, op, events); + + if (engine->u.pollset.nchanges >= engine->u.pollset.mchanges) { + (void) nxt_pollset_commit_changes(engine); + } + + ev->changing = 1; + + change = &engine->u.pollset.changes[engine->u.pollset.nchanges++]; + change->op = op; + change->cmd = (op == NXT_POLLSET_DELETE) ? PS_DELETE : PS_MOD; + change->events = events; + change->event = ev; +} + + +static nxt_int_t +nxt_pollset_commit_changes(nxt_event_engine_t *engine) +{ + size_t n; + nxt_int_t ret, retval; + nxt_fd_event_t *ev; + struct poll_ctl *ctl, *write_changes; + nxt_pollset_change_t *change, *end; + + nxt_debug(&engine->task, "pollset %d changes:%ui", + engine->u.pollset.ps, engine->u.pollset.nchanges); + + retval = NXT_OK; + n = 0; + write_changes = engine->u.pollset.write_changes; + change = engine->u.pollset.changes; + end = change + engine->u.pollset.nchanges; + + do { + ev = change->event; + ev->changing = 0; + + nxt_debug(&engine->task, "pollset fd:%d op:%d ev:%04Xd", + ev->fd, change->op, change->events); + + if (change->op == NXT_POLLSET_CHANGE) { + ctl = &write_changes[n++]; + ctl->cmd = PS_DELETE; + ctl->events = 0; + ctl->fd = ev->fd; + } + + ctl = &write_changes[n++]; + ctl->cmd = change->cmd; + ctl->events = change->events; + ctl->fd = ev->fd; + + change++; + + } while (change < end); + + change = engine->u.pollset.changes; + end = change + engine->u.pollset.nchanges; + + ret = nxt_pollset_write(engine, write_changes, n); + + if (nxt_slow_path(ret != NXT_OK)) { + + do { + nxt_pollset_change_error(engine, change->event); + change++; + } while (change < end); + + engine->u.pollset.nchanges = 0; + + return NXT_ERROR; + } + + do { + ev = change->event; + + if (change->op == NXT_POLLSET_ADD) { + ret = nxt_fd_event_hash_add(&engine->u.pollset.fd_hash, ev->fd, ev); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_pollset_change_error(engine, ev); + retval = NXT_ERROR; + } + + } else if (change->op == NXT_POLLSET_DELETE) { + nxt_fd_event_hash_delete(&engine->task, &engine->u.pollset.fd_hash, + ev->fd, 0); + } + + /* Nothing to do for NXT_POLLSET_UPDATE and NXT_POLLSET_CHANGE. */ + + change++; + + } while (change < end); + + engine->u.pollset.nchanges = 0; + + return retval; +} + + +static void +nxt_pollset_change_error(nxt_event_engine_t *engine, nxt_fd_event_t *ev) +{ + ev->read = NXT_EVENT_INACTIVE; + ev->write = NXT_EVENT_INACTIVE; + + nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler, + ev->task, ev, ev->data); + + nxt_fd_event_hash_delete(&engine->task, &engine->u.pollset.fd_hash, + ev->fd, 1); + + nxt_pollset_remove(engine, ev->fd); +} + + +static void +nxt_pollset_remove(nxt_event_engine_t *engine, nxt_fd_t fd) +{ + int n; + struct pollfd pfd; + struct poll_ctl ctl; + + pfd.fd = fd; + pfd.events = 0; + pfd.revents = 0; + + n = pollset_query(engine->u.pollset.ps, &pfd); + + nxt_debug(&engine->task, "pollset_query(%d, %d): %d", + engine->u.pollset.ps, fd, n); + + if (n == 0) { + /* The file descriptor is not in the pollset. */ + return; + } + + if (n == -1) { + nxt_log(&engine->task, NXT_LOG_CRIT, "pollset_query(%d, %d) failed %E", + engine->u.pollset.ps, fd, nxt_errno); + /* Fall through. */ + } + + /* n == 1: The file descriptor is in the pollset. */ + + nxt_debug(&engine->task, "pollset %d remove fd:%d", + engine->u.pollset.ps, fd); + + ctl.cmd = PS_DELETE; + ctl.events = 0; + ctl.fd = fd; + + nxt_pollset_write(engine, &ctl, 1); +} + + +static nxt_int_t +nxt_pollset_write(nxt_event_engine_t *engine, struct poll_ctl *ctl, int n) +{ + pollset_t ps; + + ps = engine->u.pollset.ps; + + nxt_debug(&engine->task, "pollset_ctl(%d) changes:%d", ps, n); + + nxt_set_errno(0); + + n = pollset_ctl(ps, ctl, n); + + if (nxt_fast_path(n == 0)) { + return NXT_OK; + } + + nxt_log(&engine->task, NXT_LOG_CRIT, "pollset_ctl(%d) failed: %d %E", + ps, n, nxt_errno); + + return NXT_ERROR; +} + + +static void +nxt_pollset_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) +{ + int nevents; + nxt_fd_t fd; + nxt_int_t i; + nxt_err_t err; + nxt_uint_t events, level; + struct pollfd *pfd; + nxt_fd_event_t *ev; + + if (engine->u.pollset.nchanges != 0) { + if (nxt_pollset_commit_changes(engine) != NXT_OK) { + /* Error handlers have been enqueued on failure. */ + timeout = 0; + } + } + + nxt_debug(&engine->task, "pollset_poll(%d) timeout:%M", + engine->u.pollset.ps, timeout); + + nevents = pollset_poll(engine->u.pollset.ps, engine->u.pollset.events, + engine->u.pollset.mevents, timeout); + + err = (nevents == -1) ? nxt_errno : 0; + + nxt_thread_time_update(engine->task.thread); + + nxt_debug(&engine->task, "pollset_poll(%d): %d", + engine->u.pollset.ps, nevents); + + if (nevents == -1) { + level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT; + + nxt_log(&engine->task, level, "pollset_poll(%d) failed %E", + engine->u.pollset.ps, err); + + return; + } + + for (i = 0; i < nevents; i++) { + + pfd = &engine->u.pollset.events[i]; + fd = pfd->fd; + events = pfd->revents; + + ev = nxt_fd_event_hash_get(&engine->task, &engine->u.pollset.fd_hash, + fd); + + if (nxt_slow_path(ev == NULL)) { + nxt_log(&engine->task, NXT_LOG_CRIT, + "pollset_poll(%d) returned invalid " + "fd:%d ev:%04Xd rev:%04uXi", + engine->u.pollset.ps, fd, pfd->events, events); + + nxt_pollset_remove(engine, fd); + continue; + } + + nxt_debug(ev->task, "pollset: fd:%d ev:%04uXi", fd, events); + + if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) { + nxt_log(ev->task, NXT_LOG_CRIT, + "pollset_poll(%d) error fd:%d ev:%04Xd rev:%04uXi", + engine->u.pollset.ps, fd, pfd->events, events); + + nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler, + ev->task, ev, ev->data); + continue; + } + + if (events & POLLIN) { + ev->read_ready = 1; + + if (ev->read != NXT_EVENT_BLOCKED) { + nxt_work_queue_add(ev->read_work_queue, ev->read_handler, + ev->task, ev, ev->data); + } + + if (ev->read == NXT_EVENT_BLOCKED + || ev->read == NXT_EVENT_ONESHOT) + { + nxt_pollset_disable_read(engine, ev); + } + } + + if (events & POLLOUT) { + ev->write_ready = 1; + + if (ev->write != NXT_EVENT_BLOCKED) { + nxt_work_queue_add(ev->write_work_queue, ev->write_handler, + ev->task, ev, ev->data); + } + + if (ev->write == NXT_EVENT_BLOCKED + || ev->write == NXT_EVENT_ONESHOT) + { + nxt_pollset_disable_write(engine, ev); + } + } + } +} |