summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_epoll_engine.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
commit059a8642898a6bd4b47d13a1c1d599cd44af7e1c (patch)
treee3c8c530a04f1ae44777d5ea4fd6901dc55a8ebf /src/nxt_epoll_engine.c
parente57b95a92333fa7ff558737b0ba2b76894cc0412 (diff)
downloadunit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.gz
unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.bz2
Event engines refactoring.
Diffstat (limited to '')
-rw-r--r--src/nxt_epoll_engine.c (renamed from src/nxt_epoll.c)541
1 files changed, 264 insertions, 277 deletions
diff --git a/src/nxt_epoll.c b/src/nxt_epoll_engine.c
index 42973e12..19e0389b 100644
--- a/src/nxt_epoll.c
+++ b/src/nxt_epoll_engine.c
@@ -27,63 +27,60 @@
* eventfd2() Linux 2.6.27, glibc 2.9.
* accept4() Linux 2.6.28, glibc 2.10.
* eventfd2(EFD_SEMAPHORE) Linux 2.6.30, glibc 2.10.
+ * EPOLLEXCLUSIVE Linux 4.5.
*/
#if (NXT_HAVE_EPOLL_EDGE)
-static nxt_event_set_t *nxt_epoll_edge_create(nxt_event_signals_t *signals,
+static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
nxt_uint_t mchanges, nxt_uint_t mevents);
#endif
-static nxt_event_set_t *nxt_epoll_level_create(nxt_event_signals_t *signals,
+static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
nxt_uint_t mchanges, nxt_uint_t mevents);
-static nxt_event_set_t *nxt_epoll_create(nxt_event_signals_t *signals,
+static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
nxt_uint_t mchanges, nxt_uint_t mevents, nxt_event_conn_io_t *io,
uint32_t mode);
-static void nxt_epoll_test_accept4(nxt_event_conn_io_t *io);
-static void nxt_epoll_free(nxt_event_set_t *event_set);
-static void nxt_epoll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
-static void nxt_epoll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
-static void nxt_epoll_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
-static void nxt_epoll_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev);
-static void nxt_epoll_enable_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_epoll_enable_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_epoll_disable_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_epoll_disable_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_epoll_block_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_epoll_block_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_epoll_oneshot_read(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_epoll_oneshot_write(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_epoll_enable_accept(nxt_event_set_t *event_set,
- nxt_event_fd_t *ev);
-static void nxt_epoll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev,
+static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
+ nxt_event_conn_io_t *io);
+static void nxt_epoll_free(nxt_event_engine_t *engine);
+static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
+static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
+static void nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
+static nxt_bool_t nxt_epoll_close(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_epoll_enable_read(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_epoll_enable_write(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_epoll_disable_read(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_epoll_disable_write(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_epoll_block_read(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_epoll_block_write(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_epoll_oneshot_read(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_epoll_oneshot_write(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_epoll_enable_accept(nxt_event_engine_t *engine,
+ nxt_fd_event_t *ev);
+static void nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
int op, uint32_t events);
-static nxt_int_t nxt_epoll_commit_changes(nxt_task_t *task,
- nxt_epoll_event_set_t *es);
-static void nxt_epoll_error_handler(nxt_task_t *task, void *obj,
- void *data);
+static nxt_int_t nxt_epoll_commit_changes(nxt_event_engine_t *engine);
+static void nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data);
#if (NXT_HAVE_SIGNALFD)
-static nxt_int_t nxt_epoll_add_signal(nxt_epoll_event_set_t *es,
- nxt_event_signals_t *signals);
-static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj,
- void *data);
+static nxt_int_t nxt_epoll_add_signal(nxt_event_engine_t *engine);
+static void nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data);
#endif
#if (NXT_HAVE_EVENTFD)
-static nxt_int_t nxt_epoll_enable_post(nxt_event_set_t *event_set,
+static nxt_int_t nxt_epoll_enable_post(nxt_event_engine_t *engine,
nxt_work_handler_t handler);
-static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_epoll_signal(nxt_event_set_t *event_set, nxt_uint_t signo);
+static void nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
#endif
-static void nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set,
- nxt_msec_t timeout);
+static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
#if (NXT_HAVE_ACCEPT4)
static void nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj,
@@ -125,7 +122,7 @@ static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = {
};
-const nxt_event_set_ops_t nxt_epoll_edge_event_set = {
+const nxt_event_interface_t nxt_epoll_edge_engine = {
"epoll_edge",
nxt_epoll_edge_create,
nxt_epoll_free,
@@ -171,7 +168,7 @@ const nxt_event_set_ops_t nxt_epoll_edge_event_set = {
#endif
-const nxt_event_set_ops_t nxt_epoll_level_event_set = {
+const nxt_event_interface_t nxt_epoll_level_engine = {
"epoll_level",
nxt_epoll_level_create,
nxt_epoll_free,
@@ -217,11 +214,11 @@ const nxt_event_set_ops_t nxt_epoll_level_event_set = {
#if (NXT_HAVE_EPOLL_EDGE)
-static nxt_event_set_t *
-nxt_epoll_edge_create(nxt_event_signals_t *signals, nxt_uint_t mchanges,
+static nxt_int_t
+nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
nxt_uint_t mevents)
{
- return nxt_epoll_create(signals, mchanges, mevents,
+ return nxt_epoll_create(engine, mchanges, mevents,
&nxt_epoll_edge_event_conn_io,
EPOLLET | EPOLLRDHUP);
}
@@ -229,79 +226,71 @@ nxt_epoll_edge_create(nxt_event_signals_t *signals, nxt_uint_t mchanges,
#endif
-static nxt_event_set_t *
-nxt_epoll_level_create(nxt_event_signals_t *signals, nxt_uint_t mchanges,
+static nxt_int_t
+nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
nxt_uint_t mevents)
{
- return nxt_epoll_create(signals, mchanges, mevents,
+ return nxt_epoll_create(engine, mchanges, mevents,
&nxt_unix_event_conn_io, 0);
}
-static nxt_event_set_t *
-nxt_epoll_create(nxt_event_signals_t *signals, nxt_uint_t mchanges,
+static nxt_int_t
+nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
nxt_uint_t mevents, nxt_event_conn_io_t *io, uint32_t mode)
{
- nxt_event_set_t *event_set;
- nxt_epoll_event_set_t *es;
-
- event_set = nxt_zalloc(sizeof(nxt_epoll_event_set_t));
- if (event_set == NULL) {
- return NULL;
- }
-
- es = &event_set->epoll;
-
- es->epoll = -1;
- es->mode = mode;
- es->mchanges = mchanges;
- es->mevents = mevents;
+ engine->u.epoll.fd = -1;
+ engine->u.epoll.mode = mode;
+ engine->u.epoll.mchanges = mchanges;
+ engine->u.epoll.mevents = mevents;
#if (NXT_HAVE_SIGNALFD)
- es->signalfd.fd = -1;
+ engine->u.epoll.signalfd.fd = -1;
#endif
- es->changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
- if (es->changes == NULL) {
+ engine->u.epoll.changes = nxt_malloc(sizeof(nxt_epoll_change_t) * mchanges);
+ if (engine->u.epoll.changes == NULL) {
goto fail;
}
- es->events = nxt_malloc(sizeof(struct epoll_event) * mevents);
- if (es->events == NULL) {
+ engine->u.epoll.events = nxt_malloc(sizeof(struct epoll_event) * mevents);
+ if (engine->u.epoll.events == NULL) {
goto fail;
}
- es->epoll = epoll_create(1);
- if (es->epoll == -1) {
- nxt_main_log_emerg("epoll_create() failed %E", nxt_errno);
+ engine->u.epoll.fd = epoll_create(1);
+ if (engine->u.epoll.fd == -1) {
+ nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_create() failed %E",
+ nxt_errno);
goto fail;
}
- nxt_main_log_debug("epoll_create(): %d", es->epoll);
+ nxt_debug(&engine->task, "epoll_create(): %d", engine->u.epoll.fd);
+
+ if (engine->signals != NULL) {
#if (NXT_HAVE_SIGNALFD)
- if (signals != NULL) {
- if (nxt_epoll_add_signal(es, signals) != NXT_OK) {
+ if (nxt_epoll_add_signal(engine) != NXT_OK) {
goto fail;
}
- }
#endif
- nxt_epoll_test_accept4(io);
+ nxt_epoll_test_accept4(engine, io);
+ }
- return event_set;
+ return NXT_OK;
fail:
- nxt_epoll_free(event_set);
+ nxt_epoll_free(engine);
- return NULL;
+ return NXT_ERROR;
}
static void
-nxt_epoll_test_accept4(nxt_event_conn_io_t *io)
+nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io)
{
static nxt_work_handler_t handler;
@@ -317,8 +306,8 @@ nxt_epoll_test_accept4(nxt_event_conn_io_t *io)
handler = nxt_epoll_event_conn_io_accept4;
} else {
- nxt_main_log_error(NXT_LOG_NOTICE, "accept4() failed %E",
- NXT_ENOSYS);
+ nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
+ NXT_ENOSYS);
}
#endif
@@ -329,81 +318,80 @@ nxt_epoll_test_accept4(nxt_event_conn_io_t *io)
static void
-nxt_epoll_free(nxt_event_set_t *event_set)
+nxt_epoll_free(nxt_event_engine_t *engine)
{
- nxt_epoll_event_set_t *es;
+ int fd;
- es = &event_set->epoll;
-
- nxt_main_log_debug("epoll %d free", es->epoll);
+ nxt_debug(&engine->task, "epoll %d free", engine->u.epoll.fd);
#if (NXT_HAVE_SIGNALFD)
- if (es->signalfd.fd != -1) {
- if (close(es->signalfd.fd) != 0) {
- nxt_main_log_emerg("signalfd close(%d) failed %E",
- es->signalfd.fd, nxt_errno);
- }
+ fd = engine->u.epoll.signalfd.fd;
+
+ if (fd != -1 && close(fd) != 0) {
+ nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd close(%d) failed %E",
+ fd, nxt_errno);
}
#endif
#if (NXT_HAVE_EVENTFD)
- if (es->eventfd.fd != -1) {
- if (close(es->eventfd.fd) != 0) {
- nxt_main_log_emerg("eventfd close(%d) failed %E",
- es->eventfd.fd, nxt_errno);
- }
+ fd = engine->u.epoll.eventfd.fd;
+
+ if (fd != -1 && close(fd) != 0) {
+ nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd close(%d) failed %E",
+ fd, nxt_errno);
}
#endif
- if (es->epoll != -1) {
- if (close(es->epoll) != 0) {
- nxt_main_log_emerg("epoll close(%d) failed %E",
- es->epoll, nxt_errno);
- }
+ fd = engine->u.epoll.fd;
+
+ if (fd != -1 && close(fd) != 0) {
+ nxt_log(&engine->task, NXT_LOG_CRIT, "epoll close(%d) failed %E",
+ fd, nxt_errno);
}
- nxt_free(es->events);
- nxt_free(es);
+ nxt_free(engine->u.epoll.events);
+
+ nxt_memzero(&engine->u.epoll, sizeof(nxt_epoll_engine_t));
}
static void
-nxt_epoll_enable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
- ev->read = NXT_EVENT_DEFAULT;
- ev->write = NXT_EVENT_DEFAULT;
+ ev->read = NXT_EVENT_ACTIVE;
+ ev->write = NXT_EVENT_ACTIVE;
- nxt_epoll_change(event_set, ev, EPOLL_CTL_ADD,
- EPOLLIN | EPOLLOUT | event_set->epoll.mode);
+ nxt_epoll_change(engine, ev, EPOLL_CTL_ADD,
+ EPOLLIN | EPOLLOUT | engine->u.epoll.mode);
}
static void
-nxt_epoll_disable(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
if (ev->read > NXT_EVENT_DISABLED || ev->write > NXT_EVENT_DISABLED) {
ev->read = NXT_EVENT_INACTIVE;
ev->write = NXT_EVENT_INACTIVE;
- nxt_epoll_change(event_set, ev, EPOLL_CTL_DEL, 0);
+ nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
}
}
static void
-nxt_epoll_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_epoll_delete(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
ev->read = NXT_EVENT_INACTIVE;
ev->write = NXT_EVENT_INACTIVE;
- nxt_epoll_change(event_set, ev, EPOLL_CTL_DEL, 0);
+ nxt_epoll_change(engine, ev, EPOLL_CTL_DEL, 0);
}
}
@@ -417,23 +405,17 @@ nxt_epoll_delete(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
* eliminates possible lock contention.
*/
-static void
-nxt_epoll_close(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+static nxt_bool_t
+nxt_epoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
- nxt_epoll_event_set_t *es;
+ nxt_epoll_delete(engine, ev);
- nxt_epoll_delete(event_set, ev);
-
- es = &event_set->epoll;
-
- if (es->nchanges != 0) {
- (void) nxt_epoll_commit_changes(ev->task, &event_set->epoll);
- }
+ return ev->changing;
}
static void
-nxt_epoll_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_epoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
int op;
uint32_t events;
@@ -441,7 +423,7 @@ nxt_epoll_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
if (ev->read != NXT_EVENT_BLOCKED) {
op = EPOLL_CTL_MOD;
- events = EPOLLIN | event_set->epoll.mode;
+ events = EPOLLIN | engine->u.epoll.mode;
if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
op = EPOLL_CTL_ADD;
@@ -450,15 +432,15 @@ nxt_epoll_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
events |= EPOLLOUT;
}
- nxt_epoll_change(event_set, ev, op, events);
+ nxt_epoll_change(engine, ev, op, events);
}
- ev->read = NXT_EVENT_DEFAULT;
+ ev->read = NXT_EVENT_ACTIVE;
}
static void
-nxt_epoll_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_epoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
int op;
uint32_t events;
@@ -466,7 +448,7 @@ nxt_epoll_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
if (ev->write != NXT_EVENT_BLOCKED) {
op = EPOLL_CTL_MOD;
- events = EPOLLOUT | event_set->epoll.mode;
+ events = EPOLLOUT | engine->u.epoll.mode;
if (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) {
op = EPOLL_CTL_ADD;
@@ -475,15 +457,15 @@ nxt_epoll_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
events |= EPOLLIN;
}
- nxt_epoll_change(event_set, ev, op, events);
+ nxt_epoll_change(engine, ev, op, events);
}
- ev->write = NXT_EVENT_DEFAULT;
+ ev->write = NXT_EVENT_ACTIVE;
}
static void
-nxt_epoll_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_epoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
int op;
uint32_t events;
@@ -497,15 +479,15 @@ nxt_epoll_disable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
} else {
op = EPOLL_CTL_MOD;
- events = EPOLLOUT | event_set->epoll.mode;
+ events = EPOLLOUT | engine->u.epoll.mode;
}
- nxt_epoll_change(event_set, ev, op, events);
+ nxt_epoll_change(engine, ev, op, events);
}
static void
-nxt_epoll_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_epoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
int op;
uint32_t events;
@@ -519,15 +501,15 @@ nxt_epoll_disable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
} else {
op = EPOLL_CTL_MOD;
- events = EPOLLIN | event_set->epoll.mode;
+ events = EPOLLIN | engine->u.epoll.mode;
}
- nxt_epoll_change(event_set, ev, op, events);
+ nxt_epoll_change(engine, ev, op, events);
}
static void
-nxt_epoll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_epoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
if (ev->read != NXT_EVENT_INACTIVE) {
ev->read = NXT_EVENT_BLOCKED;
@@ -536,7 +518,7 @@ nxt_epoll_block_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
static void
-nxt_epoll_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_epoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
if (ev->write != NXT_EVENT_INACTIVE) {
ev->write = NXT_EVENT_BLOCKED;
@@ -558,7 +540,7 @@ nxt_epoll_block_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
*/
static void
-nxt_epoll_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_epoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
int op;
@@ -568,12 +550,12 @@ nxt_epoll_oneshot_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
ev->read = NXT_EVENT_ONESHOT;
ev->write = NXT_EVENT_INACTIVE;
- nxt_epoll_change(event_set, ev, op, EPOLLIN | EPOLLONESHOT);
+ nxt_epoll_change(engine, ev, op, EPOLLIN | EPOLLONESHOT);
}
static void
-nxt_epoll_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_epoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
int op;
@@ -583,16 +565,16 @@ nxt_epoll_oneshot_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
ev->read = NXT_EVENT_INACTIVE;
ev->write = NXT_EVENT_ONESHOT;
- nxt_epoll_change(event_set, ev, op, EPOLLOUT | EPOLLONESHOT);
+ nxt_epoll_change(engine, ev, op, EPOLLOUT | EPOLLONESHOT);
}
static void
-nxt_epoll_enable_accept(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
+nxt_epoll_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
{
- ev->read = NXT_EVENT_DEFAULT;
+ ev->read = NXT_EVENT_ACTIVE;
- nxt_epoll_change(event_set, ev, EPOLL_CTL_ADD, EPOLLIN);
+ nxt_epoll_change(engine, ev, EPOLL_CTL_ADD, EPOLLIN);
}
@@ -602,73 +584,76 @@ nxt_epoll_enable_accept(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
*/
static void
-nxt_epoll_change(nxt_event_set_t *event_set, nxt_event_fd_t *ev, int op,
+nxt_epoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int op,
uint32_t events)
{
- nxt_epoll_change_t *ch;
- nxt_epoll_event_set_t *es;
+ nxt_epoll_change_t *change;
- es = &event_set->epoll;
+ nxt_debug(ev->task, "epoll %d set event: fd:%d op:%d ev:%XD",
+ engine->u.epoll.fd, ev->fd, op, events);
- nxt_log_debug(ev->log, "epoll %d set event: fd:%d op:%d ev:%XD",
- es->epoll, ev->fd, op, events);
-
- if (es->nchanges >= es->mchanges) {
- (void) nxt_epoll_commit_changes(ev->task, es);
+ if (engine->u.epoll.nchanges >= engine->u.epoll.mchanges) {
+ (void) nxt_epoll_commit_changes(engine);
}
- ch = &es->changes[es->nchanges++];
- ch->op = op;
- ch->fd = ev->fd;
- ch->event.events = events;
- ch->event.data.ptr = ev;
+ ev->changing = 1;
+
+ change = &engine->u.epoll.changes[engine->u.epoll.nchanges++];
+ change->op = op;
+ change->event.events = events;
+ change->event.data.ptr = ev;
}
static nxt_int_t
-nxt_epoll_commit_changes(nxt_task_t *task, nxt_epoll_event_set_t *es)
+nxt_epoll_commit_changes(nxt_event_engine_t *engine)
{
- nxt_int_t ret;
- nxt_event_fd_t *ev;
- nxt_epoll_change_t *ch, *end;
+ int ret;
+ nxt_int_t retval;
+ nxt_fd_event_t *ev;
+ nxt_epoll_change_t *change, *end;
- nxt_debug(task, "epoll %d changes:%ui", es->epoll, es->nchanges);
+ nxt_debug(&engine->task, "epoll %d changes:%ui",
+ engine->u.epoll.fd, engine->u.epoll.nchanges);
- ret = NXT_OK;
- ch = es->changes;
- end = ch + es->nchanges;
+ retval = NXT_OK;
+ change = engine->u.epoll.changes;
+ end = change + engine->u.epoll.nchanges;
do {
- ev = ch->event.data.ptr;
+ ev = change->event.data.ptr;
+ ev->changing = 0;
nxt_debug(ev->task, "epoll_ctl(%d): fd:%d op:%d ev:%XD",
- es->epoll, ch->fd, ch->op, ch->event.events);
+ engine->u.epoll.fd, ev->fd, change->op,
+ change->event.events);
- if (epoll_ctl(es->epoll, ch->op, ch->fd, &ch->event) != 0) {
+ ret = epoll_ctl(engine->u.epoll.fd, change->op, ev->fd, &change->event);
+
+ if (nxt_slow_path(ret != 0)) {
nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
- es->epoll, ch->op, ch->fd, nxt_errno);
+ engine->u.epoll.fd, change->op, ev->fd, nxt_errno);
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_epoll_error_handler,
- ev->task, ev, ev->data);
+ nxt_work_queue_add(&engine->fast_work_queue,
+ nxt_epoll_error_handler, ev->task, ev, ev->data);
- ret = NXT_ERROR;
+ retval = NXT_ERROR;
}
- ch++;
+ change++;
- } while (ch < end);
+ } while (change < end);
- es->nchanges = 0;
+ engine->u.epoll.nchanges = 0;
- return ret;
+ return retval;
}
static void
nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_fd_t *ev;
+ nxt_fd_event_t *ev;
ev = obj;
@@ -682,14 +667,14 @@ nxt_epoll_error_handler(nxt_task_t *task, void *obj, void *data)
#if (NXT_HAVE_SIGNALFD)
static nxt_int_t
-nxt_epoll_add_signal(nxt_epoll_event_set_t *es, nxt_event_signals_t *signals)
+nxt_epoll_add_signal(nxt_event_engine_t *engine)
{
int fd;
- nxt_thread_t *thr;
struct epoll_event ee;
- if (sigprocmask(SIG_BLOCK, &signals->sigmask, NULL) != 0) {
- nxt_main_log_alert("sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
+ if (sigprocmask(SIG_BLOCK, &engine->signals->sigmask, NULL) != 0) {
+ nxt_log(&engine->task, NXT_LOG_CRIT,
+ "sigprocmask(SIG_BLOCK) failed %E", nxt_errno);
return NXT_ERROR;
}
@@ -702,36 +687,34 @@ nxt_epoll_add_signal(nxt_epoll_event_set_t *es, nxt_event_signals_t *signals)
* is set separately.
*/
- fd = signalfd(-1, &signals->sigmask, 0);
+ fd = signalfd(-1, &engine->signals->sigmask, 0);
if (fd == -1) {
- nxt_main_log_emerg("signalfd(%d) failed %E",
- es->signalfd.fd, nxt_errno);
+ nxt_log(&engine->task, NXT_LOG_CRIT, "signalfd(%d) failed %E",
+ engine->u.epoll.signalfd.fd, nxt_errno);
return NXT_ERROR;
}
- es->signalfd.fd = fd;
+ engine->u.epoll.signalfd.fd = fd;
if (nxt_fd_nonblocking(fd) != NXT_OK) {
return NXT_ERROR;
}
- nxt_main_log_debug("signalfd(): %d", fd);
-
- thr = nxt_thread();
+ nxt_debug(&engine->task, "signalfd(): %d", fd);
- es->signalfd.data = signals->handler;
- es->signalfd.read_work_queue = &thr->engine->fast_work_queue;
- es->signalfd.read_handler = nxt_epoll_signalfd_handler;
- es->signalfd.log = &nxt_main_log;
- es->signalfd.task = &thr->engine->task;
+ engine->u.epoll.signalfd.data = engine->signals->handler;
+ engine->u.epoll.signalfd.read_work_queue = &engine->fast_work_queue;
+ engine->u.epoll.signalfd.read_handler = nxt_epoll_signalfd_handler;
+ engine->u.epoll.signalfd.log = engine->task.log;
+ engine->u.epoll.signalfd.task = &engine->task;
ee.events = EPOLLIN;
- ee.data.ptr = &es->signalfd;
+ ee.data.ptr = &engine->u.epoll.signalfd;
- if (epoll_ctl(es->epoll, EPOLL_CTL_ADD, fd, &ee) != 0) {
- nxt_main_log_alert("epoll_ctl(%d, %d, %d) failed %E",
- es->epoll, EPOLL_CTL_ADD, fd, nxt_errno);
+ if (epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD, fd, &ee) != 0) {
+ nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
+ engine->u.epoll.fd, EPOLL_CTL_ADD, fd, nxt_errno);
return NXT_ERROR;
}
@@ -744,7 +727,7 @@ static void
nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
{
int n;
- nxt_event_fd_t *ev;
+ nxt_fd_event_t *ev;
nxt_work_handler_t handler;
struct signalfd_siginfo sfd;
@@ -773,14 +756,12 @@ nxt_epoll_signalfd_handler(nxt_task_t *task, void *obj, void *data)
#if (NXT_HAVE_EVENTFD)
static nxt_int_t
-nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler)
+nxt_epoll_enable_post(nxt_event_engine_t *engine, nxt_work_handler_t handler)
{
- nxt_thread_t *thr;
- struct epoll_event ee;
- nxt_epoll_event_set_t *es;
+ int ret;
+ struct epoll_event ee;
- es = &event_set->epoll;
- es->post_handler = handler;
+ engine->u.epoll.post_handler = handler;
/*
* Glibc eventfd() wrapper always has the flags argument. Glibc 2.7
@@ -791,36 +772,38 @@ nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler)
* is set separately.
*/
- es->eventfd.fd = eventfd(0, 0);
+ engine->u.epoll.eventfd.fd = eventfd(0, 0);
- if (es->eventfd.fd == -1) {
- nxt_main_log_emerg("eventfd() failed %E", nxt_errno);
+ if (engine->u.epoll.eventfd.fd == -1) {
+ nxt_log(&engine->task, NXT_LOG_CRIT, "eventfd() failed %E", nxt_errno);
return NXT_ERROR;
}
- if (nxt_fd_nonblocking(es->eventfd.fd) != NXT_OK) {
+ if (nxt_fd_nonblocking(engine->u.epoll.eventfd.fd) != NXT_OK) {
return NXT_ERROR;
}
- nxt_main_log_debug("eventfd(): %d", es->eventfd.fd);
+ nxt_debug(&engine->task, "eventfd(): %d", engine->u.epoll.eventfd.fd);
- thr = nxt_thread();
-
- es->eventfd.read_work_queue = &thr->engine->fast_work_queue;
- es->eventfd.read_handler = nxt_epoll_eventfd_handler;
- es->eventfd.data = es;
- es->eventfd.log = &nxt_main_log;
- es->eventfd.task = &thr->engine->task;
+ engine->u.epoll.eventfd.read_work_queue = &engine->fast_work_queue;
+ engine->u.epoll.eventfd.read_handler = nxt_epoll_eventfd_handler;
+ engine->u.epoll.eventfd.data = engine;
+ engine->u.epoll.eventfd.log = engine->task.log;
+ engine->u.epoll.eventfd.task = &engine->task;
ee.events = EPOLLIN | EPOLLET;
- ee.data.ptr = &es->eventfd;
+ ee.data.ptr = &engine->u.epoll.eventfd;
+
+ ret = epoll_ctl(engine->u.epoll.fd, EPOLL_CTL_ADD,
+ engine->u.epoll.eventfd.fd, &ee);
- if (epoll_ctl(es->epoll, EPOLL_CTL_ADD, es->eventfd.fd, &ee) == 0) {
+ if (nxt_fast_path(ret == 0)) {
return NXT_OK;
}
- nxt_main_log_alert("epoll_ctl(%d, %d, %d) failed %E",
- es->epoll, EPOLL_CTL_ADD, es->eventfd.fd, nxt_errno);
+ nxt_log(&engine->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
+ engine->u.epoll.fd, EPOLL_CTL_ADD, engine->u.epoll.eventfd.fd,
+ nxt_errno);
return NXT_ERROR;
}
@@ -829,13 +812,13 @@ nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler)
static void
nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
{
- int n;
- uint64_t events;
- nxt_epoll_event_set_t *es;
+ int n;
+ uint64_t events;
+ nxt_event_engine_t *engine;
- es = data;
+ engine = data;
- nxt_debug(task, "eventfd handler, times:%ui", es->neventfd);
+ nxt_debug(task, "eventfd handler, times:%ui", engine->u.epoll.neventfd);
/*
* The maximum value after write() to a eventfd() descriptor will
@@ -846,30 +829,29 @@ nxt_epoll_eventfd_handler(nxt_task_t *task, void *obj, void *data)
* only the latest write() to the descriptor.
*/
- if (es->neventfd++ >= 0xfffffffe) {
- es->neventfd = 0;
+ if (engine->u.epoll.neventfd++ >= 0xfffffffe) {
+ engine->u.epoll.neventfd = 0;
- n = read(es->eventfd.fd, &events, sizeof(uint64_t));
+ n = read(engine->u.epoll.eventfd.fd, &events, sizeof(uint64_t));
- nxt_debug(task, "read(%d): %d events:%uL", es->eventfd.fd, n, events);
+ nxt_debug(task, "read(%d): %d events:%uL",
+ engine->u.epoll.eventfd.fd, n, events);
if (n != sizeof(uint64_t)) {
nxt_log(task, NXT_LOG_CRIT, "read eventfd(%d) failed %E",
- es->eventfd.fd, nxt_errno);
+ engine->u.epoll.eventfd.fd, nxt_errno);
}
}
- es->post_handler(task, NULL, NULL);
+ engine->u.epoll.post_handler(task, NULL, NULL);
}
static void
-nxt_epoll_signal(nxt_event_set_t *event_set, nxt_uint_t signo)
+nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
- uint64_t event;
- nxt_epoll_event_set_t *es;
-
- es = &event_set->epoll;
+ size_t ret;
+ uint64_t event;
/*
* eventfd() presents along with signalfd(), so the function
@@ -878,9 +860,11 @@ nxt_epoll_signal(nxt_event_set_t *event_set, nxt_uint_t signo)
event = 1;
- if (write(es->eventfd.fd, &event, sizeof(uint64_t)) != sizeof(uint64_t)) {
- nxt_thread_log_alert("write(%d) to eventfd failed %E",
- es->eventfd.fd, nxt_errno);
+ ret = write(engine->u.epoll.eventfd.fd, &event, sizeof(uint64_t));
+
+ if (nxt_slow_path(ret != sizeof(uint64_t))) {
+ nxt_log(&engine->task, NXT_LOG_CRIT, "write(%d) to eventfd failed %E",
+ engine->u.epoll.eventfd.fd, nxt_errno);
}
}
@@ -888,47 +872,48 @@ nxt_epoll_signal(nxt_event_set_t *event_set, nxt_uint_t signo)
static void
-nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set,
- nxt_msec_t timeout)
+nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
{
- int nevents;
- uint32_t events;
- nxt_int_t i;
- nxt_err_t err;
- nxt_bool_t error;
- nxt_uint_t level;
- nxt_event_fd_t *ev;
- struct epoll_event *event;
- nxt_epoll_event_set_t *es;
-
- es = &event_set->epoll;
-
- if (es->nchanges != 0) {
- if (nxt_epoll_commit_changes(task, es) != NXT_OK) {
+ int nevents;
+ uint32_t events;
+ nxt_int_t i;
+ nxt_err_t err;
+ nxt_bool_t error;
+ nxt_uint_t level;
+ nxt_fd_event_t *ev;
+ struct epoll_event *event;
+
+ if (engine->u.epoll.nchanges != 0) {
+ if (nxt_epoll_commit_changes(engine) != NXT_OK) {
/* Error handlers have been enqueued on failure. */
timeout = 0;
}
}
- nxt_debug(task, "epoll_wait(%d) timeout:%M", es->epoll, timeout);
+ nxt_debug(&engine->task, "epoll_wait(%d) timeout:%M",
+ engine->u.epoll.fd, timeout);
- nevents = epoll_wait(es->epoll, es->events, es->mevents, timeout);
+ nevents = epoll_wait(engine->u.epoll.fd, engine->u.epoll.events,
+ engine->u.epoll.mevents, timeout);
err = (nevents == -1) ? nxt_errno : 0;
- nxt_thread_time_update(task->thread);
+ nxt_thread_time_update(engine->task.thread);
- nxt_debug(task, "epoll_wait(%d): %d", es->epoll, nevents);
+ nxt_debug(&engine->task, "epoll_wait(%d): %d", engine->u.epoll.fd, nevents);
if (nevents == -1) {
- level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
- nxt_log(task, level, "epoll_wait(%d) failed %E", es->epoll, err);
+ level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_CRIT;
+
+ nxt_log(&engine->task, level, "epoll_wait(%d) failed %E",
+ engine->u.epoll.fd, err);
+
return;
}
for (i = 0; i < nevents; i++) {
- event = &es->events[i];
+ event = &engine->u.epoll.events[i];
events = event->events;
ev = event->data.ptr;
@@ -962,9 +947,9 @@ nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set,
nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
ev->task, ev, ev->data);
- } else if (event_set->epoll.mode == 0) {
+ } else if (engine->u.epoll.mode == 0) {
/* Level-triggered mode. */
- nxt_epoll_disable_read(event_set, ev);
+ nxt_epoll_disable_read(engine, ev);
}
}
@@ -982,9 +967,9 @@ nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set,
nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
ev->task, ev, ev->data);
- } else if (event_set->epoll.mode == 0) {
+ } else if (engine->u.epoll.mode == 0) {
/* Level-triggered mode. */
- nxt_epoll_disable_write(event_set, ev);
+ nxt_epoll_disable_write(engine, ev);
}
}
@@ -1056,6 +1041,7 @@ static void
nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
+ nxt_event_engine_t *engine;
nxt_work_handler_t handler;
const nxt_event_conn_state_t *state;
@@ -1074,15 +1060,16 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
c->socket.error_handler = nxt_event_conn_connect_error;
- nxt_event_conn_timer(task->thread->engine, c, state, &c->write_timer);
+ engine = task->thread->engine;
+ nxt_event_conn_timer(engine, c, state, &c->write_timer);
- nxt_epoll_enable(task->thread->engine->event_set, &c->socket);
+ nxt_epoll_enable(engine, &c->socket);
c->socket.read = NXT_EVENT_BLOCKED;
return;
#if 0
case NXT_AGAIN:
- nxt_event_conn_timer(thr->engine, c, state, &c->write_timer);
+ nxt_event_conn_timer(engine, c, state, &c->write_timer);
/* Fall through. */
@@ -1102,7 +1089,7 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
c->socket.error_handler = state->error_handler;
- nxt_epoll_enable(thr->engine->event_set, &c->socket);
+ nxt_epoll_enable(engine, &c->socket);
c->socket.read = NXT_EVENT_BLOCKED;
handler = state->ready_handler;