diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-02-07 20:04:56 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-02-07 20:04:56 +0300 |
commit | 059a8642898a6bd4b47d13a1c1d599cd44af7e1c (patch) | |
tree | e3c8c530a04f1ae44777d5ea4fd6901dc55a8ebf /src/nxt_epoll.c | |
parent | e57b95a92333fa7ff558737b0ba2b76894cc0412 (diff) | |
download | unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.gz unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.bz2 |
Event engines refactoring.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_epoll_engine.c (renamed from src/nxt_epoll.c) | 541 |
1 files changed, 264 insertions, 277 deletions
diff --git a/src/nxt_epoll.c b/src/nxt_epoll_engine.c index 42973e12..19e0389b 100644 --- a/src/nxt_epoll.c +++ b/src/nxt_epoll_engine.c @@ -27,63 +27,60 @@ * eventfd2() Linux 2.6.27, glibc 2.9. * accept4() Linux 2.6.28, glibc 2.10. * eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10. + * EPOLLEXCLUSIVE Linux 4.5. */ #if (NXT_HAVE_EPOLL_EDGE) -static nxt_event_set_t *nxt_epoll_edge_create(nxt_event_signals_t *signals, +static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents); #endif -static nxt_event_set_t *nxt_epoll_level_create(nxt_event_signals_t *signals, +static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents); -static nxt_event_set_t *nxt_epoll_create(nxt_event_signals_t *signals, +static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents, nxt_event_conn_io_t *io, uint32_t mode); -static void nxt_epoll_test_accept4(nxt_event_conn_io_t *io); -static void nxt_epoll_free(nxt_event_set_t *event_set); -static void nxt_epoll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev); -static void nxt_epoll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev); -static void nxt_epoll_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev); -static void nxt_epoll_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev); -static void nxt_epoll_enable_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_epoll_enable_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_epoll_disable_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_epoll_disable_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_epoll_block_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_epoll_block_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_epoll_oneshot_read(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_epoll_oneshot_write(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_epoll_enable_accept(nxt_event_set_t *event_set, - nxt_event_fd_t *ev); -static void nxt_epoll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, +static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, + nxt_event_conn_io_t *io); +static void nxt_epoll_free(nxt_event_engine_t *engine); +static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev); +static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_enable_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_enable_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_disable_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_disable_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_block_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_block_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_enable_accept(nxt_event_engine_t *engine, + nxt_fd_event_t *ev); +static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, uint32_t events); -static nxt_int_t nxt_epoll_commit_changes(nxt_task_t *task, - nxt_epoll_event_set_t *es); -static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, - void *data); +static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine); +static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data); #if (NXT_HAVE_SIGNALFD) -static nxt_int_t nxt_epoll_add_signal(nxt_epoll_event_set_t *es, - nxt_event_signals_t *signals); -static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, - void *data); +static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine); +static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data); #endif #if (NXT_HAVE_EVENTFD) -static nxt_int_t nxt_epoll_enable_post(nxt_event_set_t *event_set, +static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler); -static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, - void *data); -static void nxt_epoll_signal(nxt_event_set_t *event_set, nxt_uint_t signo); +static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); #endif -static void nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set, - nxt_msec_t timeout); +static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); #if (NXT_HAVE_ACCEPT4) static void nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, @@ -125,7 +122,7 @@ static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = { }; -const nxt_event_set_ops_t nxt_epoll_edge_event_set = { +const nxt_event_interface_t nxt_epoll_edge_engine = { "epoll_edge", nxt_epoll_edge_create, nxt_epoll_free, @@ -171,7 +168,7 @@ const nxt_event_set_ops_t nxt_epoll_edge_event_set = { #endif -const nxt_event_set_ops_t nxt_epoll_level_event_set = { +const nxt_event_interface_t nxt_epoll_level_engine = { "epoll_level", nxt_epoll_level_create, nxt_epoll_free, @@ -217,11 +214,11 @@ const nxt_event_set_ops_t nxt_epoll_level_event_set = { #if (NXT_HAVE_EPOLL_EDGE) -static nxt_event_set_t * -nxt_epoll_edge_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, +static nxt_int_t +nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents) { - return nxt_epoll_create(signals, mchanges, mevents, + return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_event_conn_io, EPOLLET | EPOLLRDHUP); } @@ -229,79 +226,71 @@ nxt_epoll_edge_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, #endif -static nxt_event_set_t * -nxt_epoll_level_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, +static nxt_int_t +nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents) { - return nxt_epoll_create(signals, mchanges, mevents, + return nxt_epoll_create(engine, mchanges, mevents, &nxt_unix_event_conn_io, 0); } -static nxt_event_set_t * -nxt_epoll_create(nxt_event_signals_t *signals, nxt_uint_t mchanges, +static nxt_int_t +nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents, nxt_event_conn_io_t *io, uint32_t mode) { - nxt_event_set_t *event_set; - nxt_epoll_event_set_t *es; - - event_set = nxt_zalloc(sizeof(nxt_epoll_event_set_t)); - if (event_set == NULL) { - return NULL; - } - - es = &event_set->epoll; - - es->epoll = -1; - es->mode = mode; - es->mchanges = mchanges; - es->mevents = mevents; + engine->u.epoll.fd = -1; + engine->u.epoll.mode = mode; + engine->u.epoll.mchanges = mchanges; + engine->u.epoll.mevents = mevents; #if (NXT_HAVE_SIGNALFD) - es->signalfd.fd = -1; + engine->u.epoll.signalfd.fd = -1; #endif - es->changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges); - if (es->changes == NULL) { + engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges); + if (engine->u.epoll.changes == NULL) { goto fail; } - es->events = nxt_malloc(sizeof(struct epoll_event) * mevents); - if (es->events == NULL) { + engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents); + if (engine->u.epoll.events == NULL) { goto fail; } - es->epoll = epoll_create(1); - if (es->epoll == -1) { - nxt_main_log_emerg("epoll_create() failed %E", nxt_errno); + engine->u.epoll.fd = epoll_create(1); + if (engine->u.epoll.fd == -1) { + nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_create() failed %E", + nxt_errno); goto fail; } - nxt_main_log_debug("epoll_create(): %d", es->epoll); + nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd); + + if (engine->signals != NULL) { #if (NXT_HAVE_SIGNALFD) - if (signals != NULL) { - if (nxt_epoll_add_signal(es, signals) != NXT_OK) { + if (nxt_epoll_add_signal(engine) != NXT_OK) { goto fail; } - } #endif - nxt_epoll_test_accept4(io); + nxt_epoll_test_accept4(engine, io); + } - return event_set; + return NXT_OK; fail: - nxt_epoll_free(event_set); + nxt_epoll_free(engine); - return NULL; + return NXT_ERROR; } static void -nxt_epoll_test_accept4(nxt_event_conn_io_t *io) +nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io) { static nxt_work_handler_t handler; @@ -317,8 +306,8 @@ nxt_epoll_test_accept4(nxt_event_conn_io_t *io) handler = nxt_epoll_event_conn_io_accept4; } else { - nxt_main_log_error(NXT_LOG_NOTICE, "accept4() failed %E", - NXT_ENOSYS); + nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", + NXT_ENOSYS); } #endif @@ -329,81 +318,80 @@ nxt_epoll_test_accept4(nxt_event_conn_io_t *io) static void -nxt_epoll_free(nxt_event_set_t *event_set) +nxt_epoll_free(nxt_event_engine_t *engine) { - nxt_epoll_event_set_t *es; + int fd; - es = &event_set->epoll; - - nxt_main_log_debug("epoll %d free", es->epoll); + nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd); #if (NXT_HAVE_SIGNALFD) - if (es->signalfd.fd != -1) { - if (close(es->signalfd.fd) != 0) { - nxt_main_log_emerg("signalfd close(%d) failed %E", - es->signalfd.fd, nxt_errno); - } + fd = engine->u.epoll.signalfd.fd; + + if (fd != -1 && close(fd) != 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd close(%d) failed %E", + fd, nxt_errno); } #endif #if (NXT_HAVE_EVENTFD) - if (es->eventfd.fd != -1) { - if (close(es->eventfd.fd) != 0) { - nxt_main_log_emerg("eventfd close(%d) failed %E", - es->eventfd.fd, nxt_errno); - } + fd = engine->u.epoll.eventfd.fd; + + if (fd != -1 && close(fd) != 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd close(%d) failed %E", + fd, nxt_errno); } #endif - if (es->epoll != -1) { - if (close(es->epoll) != 0) { - nxt_main_log_emerg("epoll close(%d) failed %E", - es->epoll, nxt_errno); - } + fd = engine->u.epoll.fd; + + if (fd != -1 && close(fd) != 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, "epoll close(%d) failed %E", + fd, nxt_errno); } - nxt_free(es->events); - nxt_free(es); + nxt_free(engine->u.epoll.events); + + nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t)); } static void -nxt_epoll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { - ev->read = NXT_EVENT_DEFAULT; - ev->write = NXT_EVENT_DEFAULT; + ev->read = NXT_EVENT_ACTIVE; + ev->write = NXT_EVENT_ACTIVE; - nxt_epoll_change(event_set, ev, EPOLL_CTL_ADD, - EPOLLIN | EPOLLOUT | event_set->epoll.mode); + nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, + EPOLLIN | EPOLLOUT | engine->u.epoll.mode); } static void -nxt_epoll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) { ev->read = NXT_EVENT_INACTIVE; ev->write = NXT_EVENT_INACTIVE; - nxt_epoll_change(event_set, ev, EPOLL_CTL_DEL, 0); + nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); } } static void -nxt_epoll_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) { ev->read = NXT_EVENT_INACTIVE; ev->write = NXT_EVENT_INACTIVE; - nxt_epoll_change(event_set, ev, EPOLL_CTL_DEL, 0); + nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0); } } @@ -417,23 +405,17 @@ nxt_epoll_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev) * eliminates possible lock contention. */ -static void -nxt_epoll_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +static nxt_bool_t +nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { - nxt_epoll_event_set_t *es; + nxt_epoll_delete(engine, ev); - nxt_epoll_delete(event_set, ev); - - es = &event_set->epoll; - - if (es->nchanges != 0) { - (void) nxt_epoll_commit_changes(ev->task, &event_set->epoll); - } + return ev->changing; } static void -nxt_epoll_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { int op; uint32_t events; @@ -441,7 +423,7 @@ nxt_epoll_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) if (ev->read != NXT_EVENT_BLOCKED) { op = EPOLL_CTL_MOD; - events = EPOLLIN | event_set->epoll.mode; + events = EPOLLIN | engine->u.epoll.mode; if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { op = EPOLL_CTL_ADD; @@ -450,15 +432,15 @@ nxt_epoll_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) events |= EPOLLOUT; } - nxt_epoll_change(event_set, ev, op, events); + nxt_epoll_change(engine, ev, op, events); } - ev->read = NXT_EVENT_DEFAULT; + ev->read = NXT_EVENT_ACTIVE; } static void -nxt_epoll_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { int op; uint32_t events; @@ -466,7 +448,7 @@ nxt_epoll_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) if (ev->write != NXT_EVENT_BLOCKED) { op = EPOLL_CTL_MOD; - events = EPOLLOUT | event_set->epoll.mode; + events = EPOLLOUT | engine->u.epoll.mode; if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) { op = EPOLL_CTL_ADD; @@ -475,15 +457,15 @@ nxt_epoll_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) events |= EPOLLIN; } - nxt_epoll_change(event_set, ev, op, events); + nxt_epoll_change(engine, ev, op, events); } - ev->write = NXT_EVENT_DEFAULT; + ev->write = NXT_EVENT_ACTIVE; } static void -nxt_epoll_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { int op; uint32_t events; @@ -497,15 +479,15 @@ nxt_epoll_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) } else { op = EPOLL_CTL_MOD; - events = EPOLLOUT | event_set->epoll.mode; + events = EPOLLOUT | engine->u.epoll.mode; } - nxt_epoll_change(event_set, ev, op, events); + nxt_epoll_change(engine, ev, op, events); } static void -nxt_epoll_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { int op; uint32_t events; @@ -519,15 +501,15 @@ nxt_epoll_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) } else { op = EPOLL_CTL_MOD; - events = EPOLLIN | event_set->epoll.mode; + events = EPOLLIN | engine->u.epoll.mode; } - nxt_epoll_change(event_set, ev, op, events); + nxt_epoll_change(engine, ev, op, events); } static void -nxt_epoll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->read != NXT_EVENT_INACTIVE) { ev->read = NXT_EVENT_BLOCKED; @@ -536,7 +518,7 @@ nxt_epoll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) static void -nxt_epoll_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { if (ev->write != NXT_EVENT_INACTIVE) { ev->write = NXT_EVENT_BLOCKED; @@ -558,7 +540,7 @@ nxt_epoll_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) */ static void -nxt_epoll_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { int op; @@ -568,12 +550,12 @@ nxt_epoll_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev) ev->read = NXT_EVENT_ONESHOT; ev->write = NXT_EVENT_INACTIVE; - nxt_epoll_change(event_set, ev, op, EPOLLIN | EPOLLONESHOT); + nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT); } static void -nxt_epoll_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { int op; @@ -583,16 +565,16 @@ nxt_epoll_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev) ev->read = NXT_EVENT_INACTIVE; ev->write = NXT_EVENT_ONESHOT; - nxt_epoll_change(event_set, ev, op, EPOLLOUT | EPOLLONESHOT); + nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT); } static void -nxt_epoll_enable_accept(nxt_event_set_t *event_set, nxt_event_fd_t *ev) +nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) { - ev->read = NXT_EVENT_DEFAULT; + ev->read = NXT_EVENT_ACTIVE; - nxt_epoll_change(event_set, ev, EPOLL_CTL_ADD, EPOLLIN); + nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, EPOLLIN); } @@ -602,73 +584,76 @@ nxt_epoll_enable_accept(nxt_event_set_t *event_set, nxt_event_fd_t *ev) */ static void -nxt_epoll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, int op, +nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op, uint32_t events) { - nxt_epoll_change_t *ch; - nxt_epoll_event_set_t *es; + nxt_epoll_change_t *change; - es = &event_set->epoll; + nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD", + engine->u.epoll.fd, ev->fd, op, events); - nxt_log_debug(ev->log, "epoll %d set event: fd:%d op:%d ev:%XD", - es->epoll, ev->fd, op, events); - - if (es->nchanges >= es->mchanges) { - (void) nxt_epoll_commit_changes(ev->task, es); + if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) { + (void) nxt_epoll_commit_changes(engine); } - ch = &es->changes[es->nchanges++]; - ch->op = op; - ch->fd = ev->fd; - ch->event.events = events; - ch->event.data.ptr = ev; + ev->changing = 1; + + change = &engine->u.epoll.changes[engine->u.epoll.nchanges++]; + change->op = op; + change->event.events = events; + change->event.data.ptr = ev; } static nxt_int_t -nxt_epoll_commit_changes(nxt_task_t *task, nxt_epoll_event_set_t *es) +nxt_epoll_commit_changes(nxt_event_engine_t *engine) { - nxt_int_t ret; - nxt_event_fd_t *ev; - nxt_epoll_change_t *ch, *end; + int ret; + nxt_int_t retval; + nxt_fd_event_t *ev; + nxt_epoll_change_t *change, *end; - nxt_debug(task, "epoll %d changes:%ui", es->epoll, es->nchanges); + nxt_debug(&engine->task, "epoll %d changes:%ui", + engine->u.epoll.fd, engine->u.epoll.nchanges); - ret = NXT_OK; - ch = es->changes; - end = ch + es->nchanges; + retval = NXT_OK; + change = engine->u.epoll.changes; + end = change + engine->u.epoll.nchanges; do { - ev = ch->event.data.ptr; + ev = change->event.data.ptr; + ev->changing = 0; nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD", - es->epoll, ch->fd, ch->op, ch->event.events); + engine->u.epoll.fd, ev->fd, change->op, + change->event.events); - if (epoll_ctl(es->epoll, ch->op, ch->fd, &ch->event) != 0) { + ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event); + + if (nxt_slow_path(ret != 0)) { nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", - es->epoll, ch->op, ch->fd, nxt_errno); + engine->u.epoll.fd, change->op, ev->fd, nxt_errno); - nxt_work_queue_add(&task->thread->engine->fast_work_queue, - nxt_epoll_error_handler, - ev->task, ev, ev->data); + nxt_work_queue_add(&engine->fast_work_queue, + nxt_epoll_error_handler, ev->task, ev, ev->data); - ret = NXT_ERROR; + retval = NXT_ERROR; } - ch++; + change++; - } while (ch < end); + } while (change < end); - es->nchanges = 0; + engine->u.epoll.nchanges = 0; - return ret; + return retval; } static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) { - nxt_event_fd_t *ev; + nxt_fd_event_t *ev; ev = obj; @@ -682,14 +667,14 @@ nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data) #if (NXT_HAVE_SIGNALFD) static nxt_int_t -nxt_epoll_add_signal(nxt_epoll_event_set_t *es, nxt_event_signals_t *signals) +nxt_epoll_add_signal(nxt_event_engine_t *engine) { int fd; - nxt_thread_t *thr; struct epoll_event ee; - if (sigprocmask(SIG_BLOCK, &signals->sigmask, NULL) != 0) { - nxt_main_log_alert("sigprocmask(SIG_BLOCK) failed %E", nxt_errno); + if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, + "sigprocmask(SIG_BLOCK) failed %E", nxt_errno); return NXT_ERROR; } @@ -702,36 +687,34 @@ nxt_epoll_add_signal(nxt_epoll_event_set_t *es, nxt_event_signals_t *signals) * is set separately. */ - fd = signalfd(-1, &signals->sigmask, 0); + fd = signalfd(-1, &engine->signals->sigmask, 0); if (fd == -1) { - nxt_main_log_emerg("signalfd(%d) failed %E", - es->signalfd.fd, nxt_errno); + nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd(%d) failed %E", + engine->u.epoll.signalfd.fd, nxt_errno); return NXT_ERROR; } - es->signalfd.fd = fd; + engine->u.epoll.signalfd.fd = fd; if (nxt_fd_nonblocking(fd) != NXT_OK) { return NXT_ERROR; } - nxt_main_log_debug("signalfd(): %d", fd); - - thr = nxt_thread(); + nxt_debug(&engine->task, "signalfd(): %d", fd); - es->signalfd.data = signals->handler; - es->signalfd.read_work_queue = &thr->engine->fast_work_queue; - es->signalfd.read_handler = nxt_epoll_signalfd_handler; - es->signalfd.log = &nxt_main_log; - es->signalfd.task = &thr->engine->task; + engine->u.epoll.signalfd.data = engine->signals->handler; + engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue; + engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler; + engine->u.epoll.signalfd.log = engine->task.log; + engine->u.epoll.signalfd.task = &engine->task; ee.events = EPOLLIN; - ee.data.ptr = &es->signalfd; + ee.data.ptr = &engine->u.epoll.signalfd; - if (epoll_ctl(es->epoll, EPOLL_CTL_ADD, fd, &ee) != 0) { - nxt_main_log_alert("epoll_ctl(%d, %d, %d) failed %E", - es->epoll, EPOLL_CTL_ADD, fd, nxt_errno); + if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) { + nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", + engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno); return NXT_ERROR; } @@ -744,7 +727,7 @@ static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data) { int n; - nxt_event_fd_t *ev; + nxt_fd_event_t *ev; nxt_work_handler_t handler; struct signalfd_siginfo sfd; @@ -773,14 +756,12 @@ nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data) #if (NXT_HAVE_EVENTFD) static nxt_int_t -nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler) +nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler) { - nxt_thread_t *thr; - struct epoll_event ee; - nxt_epoll_event_set_t *es; + int ret; + struct epoll_event ee; - es = &event_set->epoll; - es->post_handler = handler; + engine->u.epoll.post_handler = handler; /* * Glibc eventfd() wrapper always has the flags argument. Glibc 2.7 @@ -791,36 +772,38 @@ nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler) * is set separately. */ - es->eventfd.fd = eventfd(0, 0); + engine->u.epoll.eventfd.fd = eventfd(0, 0); - if (es->eventfd.fd == -1) { - nxt_main_log_emerg("eventfd() failed %E", nxt_errno); + if (engine->u.epoll.eventfd.fd == -1) { + nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd() failed %E", nxt_errno); return NXT_ERROR; } - if (nxt_fd_nonblocking(es->eventfd.fd) != NXT_OK) { + if (nxt_fd_nonblocking(engine->u.epoll.eventfd.fd) != NXT_OK) { return NXT_ERROR; } - nxt_main_log_debug("eventfd(): %d", es->eventfd.fd); + nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd); - thr = nxt_thread(); - - es->eventfd.read_work_queue = &thr->engine->fast_work_queue; - es->eventfd.read_handler = nxt_epoll_eventfd_handler; - es->eventfd.data = es; - es->eventfd.log = &nxt_main_log; - es->eventfd.task = &thr->engine->task; + engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue; + engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler; + engine->u.epoll.eventfd.data = engine; + engine->u.epoll.eventfd.log = engine->task.log; + engine->u.epoll.eventfd.task = &engine->task; ee.events = EPOLLIN | EPOLLET; - ee.data.ptr = &es->eventfd; + ee.data.ptr = &engine->u.epoll.eventfd; + + ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, + engine->u.epoll.eventfd.fd, &ee); - if (epoll_ctl(es->epoll, EPOLL_CTL_ADD, es->eventfd.fd, &ee) == 0) { + if (nxt_fast_path(ret == 0)) { return NXT_OK; } - nxt_main_log_alert("epoll_ctl(%d, %d, %d) failed %E", - es->epoll, EPOLL_CTL_ADD, es->eventfd.fd, nxt_errno); + nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E", + engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd, + nxt_errno); return NXT_ERROR; } @@ -829,13 +812,13 @@ nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler) static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data) { - int n; - uint64_t events; - nxt_epoll_event_set_t *es; + int n; + uint64_t events; + nxt_event_engine_t *engine; - es = data; + engine = data; - nxt_debug(task, "eventfd handler, times:%ui", es->neventfd); + nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd); /* * The maximum value after write() to a eventfd() descriptor will @@ -846,30 +829,29 @@ nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data) * only the latest write() to the descriptor. */ - if (es->neventfd++ >= 0xfffffffe) { - es->neventfd = 0; + if (engine->u.epoll.neventfd++ >= 0xfffffffe) { + engine->u.epoll.neventfd = 0; - n = read(es->eventfd.fd, &events, sizeof(uint64_t)); + n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t)); - nxt_debug(task, "read(%d): %d events:%uL", es->eventfd.fd, n, events); + nxt_debug(task, "read(%d): %d events:%uL", + engine->u.epoll.eventfd.fd, n, events); if (n != sizeof(uint64_t)) { nxt_log(task, NXT_LOG_CRIT, "read eventfd(%d) failed %E", - es->eventfd.fd, nxt_errno); + engine->u.epoll.eventfd.fd, nxt_errno); } } - es->post_handler(task, NULL, NULL); + engine->u.epoll.post_handler(task, NULL, NULL); } static void -nxt_epoll_signal(nxt_event_set_t *event_set, nxt_uint_t signo) +nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo) { - uint64_t event; - nxt_epoll_event_set_t *es; - - es = &event_set->epoll; + size_t ret; + uint64_t event; /* * eventfd() presents along with signalfd(), so the function @@ -878,9 +860,11 @@ nxt_epoll_signal(nxt_event_set_t *event_set, nxt_uint_t signo) event = 1; - if (write(es->eventfd.fd, &event, sizeof(uint64_t)) != sizeof(uint64_t)) { - nxt_thread_log_alert("write(%d) to eventfd failed %E", - es->eventfd.fd, nxt_errno); + ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t)); + + if (nxt_slow_path(ret != sizeof(uint64_t))) { + nxt_log(&engine->task, NXT_LOG_CRIT, "write(%d) to eventfd failed %E", + engine->u.epoll.eventfd.fd, nxt_errno); } } @@ -888,47 +872,48 @@ nxt_epoll_signal(nxt_event_set_t *event_set, nxt_uint_t signo) static void -nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set, - nxt_msec_t timeout) +nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) { - int nevents; - uint32_t events; - nxt_int_t i; - nxt_err_t err; - nxt_bool_t error; - nxt_uint_t level; - nxt_event_fd_t *ev; - struct epoll_event *event; - nxt_epoll_event_set_t *es; - - es = &event_set->epoll; - - if (es->nchanges != 0) { - if (nxt_epoll_commit_changes(task, es) != NXT_OK) { + int nevents; + uint32_t events; + nxt_int_t i; + nxt_err_t err; + nxt_bool_t error; + nxt_uint_t level; + nxt_fd_event_t *ev; + struct epoll_event *event; + + if (engine->u.epoll.nchanges != 0) { + if (nxt_epoll_commit_changes(engine) != NXT_OK) { /* Error handlers have been enqueued on failure. */ timeout = 0; } } - nxt_debug(task, "epoll_wait(%d) timeout:%M", es->epoll, timeout); + nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M", + engine->u.epoll.fd, timeout); - nevents = epoll_wait(es->epoll, es->events, es->mevents, timeout); + nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events, + engine->u.epoll.mevents, timeout); err = (nevents == -1) ? nxt_errno : 0; - nxt_thread_time_update(task->thread); + nxt_thread_time_update(engine->task.thread); - nxt_debug(task, "epoll_wait(%d): %d", es->epoll, nevents); + nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents); if (nevents == -1) { - level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT; - nxt_log(task, level, "epoll_wait(%d) failed %E", es->epoll, err); + level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT; + + nxt_log(&engine->task, level, "epoll_wait(%d) failed %E", + engine->u.epoll.fd, err); + return; } for (i = 0; i < nevents; i++) { - event = &es->events[i]; + event = &engine->u.epoll.events[i]; events = event->events; ev = event->data.ptr; @@ -962,9 +947,9 @@ nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set, nxt_work_queue_add(ev->read_work_queue, ev->read_handler, ev->task, ev, ev->data); - } else if (event_set->epoll.mode == 0) { + } else if (engine->u.epoll.mode == 0) { /* Level-triggered mode. */ - nxt_epoll_disable_read(event_set, ev); + nxt_epoll_disable_read(engine, ev); } } @@ -982,9 +967,9 @@ nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set, nxt_work_queue_add(ev->write_work_queue, ev->write_handler, ev->task, ev, ev->data); - } else if (event_set->epoll.mode == 0) { + } else if (engine->u.epoll.mode == 0) { /* Level-triggered mode. */ - nxt_epoll_disable_write(event_set, ev); + nxt_epoll_disable_write(engine, ev); } } @@ -1056,6 +1041,7 @@ static void nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; + nxt_event_engine_t *engine; nxt_work_handler_t handler; const nxt_event_conn_state_t *state; @@ -1074,15 +1060,16 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) c->socket.write_handler = nxt_epoll_edge_event_conn_connected; c->socket.error_handler = nxt_event_conn_connect_error; - nxt_event_conn_timer(task->thread->engine, c, state, &c->write_timer); + engine = task->thread->engine; + nxt_event_conn_timer(engine, c, state, &c->write_timer); - nxt_epoll_enable(task->thread->engine->event_set, &c->socket); + nxt_epoll_enable(engine, &c->socket); c->socket.read = NXT_EVENT_BLOCKED; return; #if 0 case NXT_AGAIN: - nxt_event_conn_timer(thr->engine, c, state, &c->write_timer); + nxt_event_conn_timer(engine, c, state, &c->write_timer); /* Fall through. */ @@ -1102,7 +1089,7 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) c->socket.write_handler = nxt_epoll_edge_event_conn_connected; c->socket.error_handler = state->error_handler; - nxt_epoll_enable(thr->engine->event_set, &c->socket); + nxt_epoll_enable(engine, &c->socket); c->socket.read = NXT_EVENT_BLOCKED; handler = state->ready_handler; |