summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_engine.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
commit059a8642898a6bd4b47d13a1c1d599cd44af7e1c (patch)
treee3c8c530a04f1ae44777d5ea4fd6901dc55a8ebf /src/nxt_event_engine.c
parente57b95a92333fa7ff558737b0ba2b76894cc0412 (diff)
downloadunit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.gz
unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.bz2
Event engines refactoring.
Diffstat (limited to 'src/nxt_event_engine.c')
-rw-r--r--src/nxt_event_engine.c71
1 files changed, 33 insertions, 38 deletions
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c
index ea7fbe62..fbcf2384 100644
--- a/src/nxt_event_engine.c
+++ b/src/nxt_event_engine.c
@@ -25,8 +25,9 @@ static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
nxt_event_engine_t *
-nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
- const nxt_event_sig_t *signals, nxt_uint_t flags, nxt_uint_t batch)
+nxt_event_engine_create(nxt_thread_t *thr,
+ const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
+ nxt_uint_t flags, nxt_uint_t batch)
{
nxt_uint_t events;
nxt_event_engine_t *engine;
@@ -64,7 +65,6 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
engine->write_work_queue.cache = &engine->work_queue_cache;
engine->shutdown_work_queue.cache = &engine->work_queue_cache;
engine->close_work_queue.cache = &engine->work_queue_cache;
- engine->final_work_queue.cache = &engine->work_queue_cache;
nxt_work_queue_name(&engine->fast_work_queue, "fast");
nxt_work_queue_name(&engine->accept_work_queue, "accept");
@@ -74,7 +74,6 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
nxt_work_queue_name(&engine->write_work_queue, "write");
nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
nxt_work_queue_name(&engine->close_work_queue, "close");
- nxt_work_queue_name(&engine->final_work_queue, "final");
if (signals != NULL) {
engine->signals = nxt_event_engine_signals(signals);
@@ -84,7 +83,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
engine->signals->handler = nxt_event_engine_signal_handler;
- if (!event_set->signal_support) {
+ if (!interface->signal_support) {
if (nxt_event_engine_signals_start(engine) != NXT_OK) {
goto signals_fail;
}
@@ -98,12 +97,11 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
*/
events = (batch != 0) ? batch : 32;
- engine->event_set = event_set->create(engine->signals, 4 * events, events);
- if (engine->event_set == NULL) {
+ if (interface->create(engine, 4 * events, events) != NXT_OK) {
goto event_set_fail;
}
- engine->event = event_set;
+ engine->event = *interface;
if (nxt_event_engine_post_init(engine) != NXT_OK) {
goto post_fail;
@@ -121,11 +119,9 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
nxt_queue_init(&engine->listen_connections);
nxt_queue_init(&engine->idle_connections);
- engine->thread = thr;
-
#if !(NXT_THREADS)
- if (engine->event->signal_support) {
+ if (interface->signal_support) {
thr->time.signal = -1;
}
@@ -136,7 +132,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
timers_fail:
post_fail:
- event_set->free(engine->event_set);
+ interface->free(engine);
event_set_fail:
signals_fail:
@@ -155,9 +151,8 @@ fibers_fail:
static nxt_int_t
nxt_event_engine_post_init(nxt_event_engine_t *engine)
{
- if (engine->event->enable_post != NULL) {
- return engine->event->enable_post(engine->event_set,
- nxt_event_engine_post_handler);
+ if (engine->event.enable_post != NULL) {
+ return engine->event.enable_post(engine, nxt_event_engine_post_handler);
}
#if !(NXT_THREADS)
@@ -201,13 +196,14 @@ nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
}
pipe->event.fd = pipe->fds[0];
+ pipe->event.task = &engine->task;
pipe->event.read_work_queue = &engine->fast_work_queue;
pipe->event.read_handler = nxt_event_engine_signal_pipe;
pipe->event.write_work_queue = &engine->fast_work_queue;
pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
- pipe->event.log = &nxt_main_log;
+ pipe->event.log = engine->task.log;
- nxt_event_fd_enable_read(engine, &pipe->event);
+ nxt_fd_event_enable_read(engine, &pipe->event);
return NXT_OK;
}
@@ -223,7 +219,7 @@ nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
if (pipe != NULL) {
if (pipe->event.read_work_queue != NULL) {
- nxt_event_fd_close(engine, &pipe->event);
+ nxt_fd_event_close(engine, &pipe->event);
nxt_pipe_close(pipe->fds);
}
@@ -247,7 +243,7 @@ nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
void
nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
{
- nxt_thread_log_debug("event engine post");
+ nxt_debug(&engine->task, "event engine post");
nxt_locked_work_queue_add(&engine->locked_work_queue, work);
@@ -260,15 +256,15 @@ nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
u_char buf;
- nxt_thread_log_debug("event engine signal:%ui", signo);
+ nxt_debug(&engine->task, "event engine signal:%ui", signo);
/*
* A signal number may be sent in a signal context, so the signal
* information cannot be passed via a locked work queue.
*/
- if (engine->event->signal != NULL) {
- engine->event->signal(engine->event_set, signo);
+ if (engine->event.signal != NULL) {
+ engine->event.signal(engine, signo);
return;
}
@@ -283,7 +279,7 @@ nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
int i, n;
u_char signo;
nxt_bool_t post;
- nxt_event_fd_t *ev;
+ nxt_fd_event_t *ev;
u_char buf[128];
ev = obj;
@@ -342,7 +338,7 @@ nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
engine->pipe->fds[0], engine->pipe->fds[1]);
- nxt_event_fd_close(engine, &engine->pipe->event);
+ nxt_fd_event_close(engine, &engine->pipe->event);
nxt_pipe_close(engine->pipe->fds);
}
@@ -351,7 +347,7 @@ static void
nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
{
uintptr_t signo;
- const nxt_event_sig_t *sigev;
+ const nxt_sig_event_t *sigev;
signo = (uintptr_t) obj;
@@ -371,7 +367,7 @@ nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
nxt_int_t
nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
- const nxt_event_set_ops_t *event_set, nxt_uint_t batch)
+ const nxt_event_interface_t *interface, nxt_uint_t batch)
{
nxt_uint_t events;
nxt_event_engine_t *engine;
@@ -379,7 +375,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
engine = thr->engine;
engine->batch = batch;
- if (!engine->event->signal_support && event_set->signal_support) {
+ if (!engine->event.signal_support && interface->signal_support) {
/*
* Block signal processing if the current event
* facility does not support signal processing.
@@ -393,28 +389,27 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL);
}
- if (engine->pipe != NULL && event_set->enable_post != NULL) {
+ if (engine->pipe != NULL && interface->enable_post != NULL) {
/*
* An engine pipe must be closed after all signal events
* added above to engine fast work queue will be processed.
*/
- nxt_work_queue_add(&engine->final_work_queue,
+ nxt_work_queue_add(&engine->fast_work_queue,
nxt_event_engine_signal_pipe_close,
&engine->task, engine->pipe, NULL);
engine->pipe = NULL;
}
- engine->event->free(engine->event_set);
+ engine->event.free(engine);
events = (batch != 0) ? batch : 32;
- engine->event_set = event_set->create(engine->signals, 4 * events, events);
- if (engine->event_set == NULL) {
+ if (interface->create(engine, 4 * events, events) != NXT_OK) {
return NXT_ERROR;
}
- engine->event = event_set;
+ engine->event = *interface;
if (nxt_event_engine_post_init(engine) != NXT_OK) {
return NXT_ERROR;
@@ -422,7 +417,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
if (engine->signals != NULL) {
- if (!engine->event->signal_support) {
+ if (!engine->event.signal_support) {
return nxt_event_engine_signals_start(engine);
}
@@ -447,7 +442,7 @@ nxt_event_engine_free(nxt_event_engine_t *engine)
nxt_work_queue_cache_destroy(&engine->work_queue_cache);
- engine->event->free(engine->event_set);
+ engine->event.free(engine);
/* TODO: free timers */
@@ -473,7 +468,7 @@ nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
engine->current_work_queue++;
wq = engine->current_work_queue;
- if (wq > &engine->final_work_queue) {
+ if (wq > &engine->close_work_queue) {
wq = &engine->fast_work_queue;
engine->current_work_queue = wq;
}
@@ -519,7 +514,7 @@ nxt_event_engine_start(nxt_event_engine_t *engine)
/* A return point from fibers. */
}
- thr->log = &nxt_main_log;
+ thr->log = engine->task.log;
for ( ;; ) {
@@ -537,7 +532,7 @@ nxt_event_engine_start(nxt_event_engine_t *engine)
timeout = nxt_timer_find(engine);
- engine->event->poll(&engine->task, engine->event_set, timeout);
+ engine->event.poll(engine, timeout);
now = nxt_thread_monotonic_time(thr) / 1000000;