summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_engine.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-27 11:35:11 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-27 11:35:11 +0300
commitba0391577b06446307fa073f856f57748557e0dd (patch)
treeb2b871a041edee242662c95197bed292531c3a9a /src/nxt_event_engine.c
parent6886b83c1f3bfdc514d58ad6e9ab40873cafcb54 (diff)
downloadunit-ba0391577b06446307fa073f856f57748557e0dd.tar.gz
unit-ba0391577b06446307fa073f856f57748557e0dd.tar.bz2
Work queues refactoring.
Diffstat (limited to 'src/nxt_event_engine.c')
-rw-r--r--src/nxt_event_engine.c147
1 files changed, 75 insertions, 72 deletions
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c
index ead63b72..921de23c 100644
--- a/src/nxt_event_engine.c
+++ b/src/nxt_event_engine.c
@@ -20,8 +20,8 @@ static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
void *data);
static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
void *data);
-static const nxt_event_sig_t *nxt_event_engine_signal_find(nxt_task_t *task,
- nxt_uint_t signo);
+static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
+ nxt_task_t **task, void **obj, void **data);
nxt_event_engine_t *
@@ -52,8 +52,21 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
}
}
- nxt_thread_work_queue_create(thr, 0);
+ engine->current_work_queue = &engine->fast_work_queue;
+ nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
+
+ engine->fast_work_queue.cache = &engine->work_queue_cache;
+ engine->accept_work_queue.cache = &engine->work_queue_cache;
+ engine->read_work_queue.cache = &engine->work_queue_cache;
+ engine->socket_work_queue.cache = &engine->work_queue_cache;
+ engine->connect_work_queue.cache = &engine->work_queue_cache;
+ engine->write_work_queue.cache = &engine->work_queue_cache;
+ engine->shutdown_work_queue.cache = &engine->work_queue_cache;
+ engine->close_work_queue.cache = &engine->work_queue_cache;
+ engine->final_work_queue.cache = &engine->work_queue_cache;
+
+ nxt_work_queue_name(&engine->fast_work_queue, "fast");
nxt_work_queue_name(&engine->accept_work_queue, "accept");
nxt_work_queue_name(&engine->read_work_queue, "read");
nxt_work_queue_name(&engine->socket_work_queue, "socket");
@@ -61,12 +74,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
nxt_work_queue_name(&engine->write_work_queue, "write");
nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
nxt_work_queue_name(&engine->close_work_queue, "close");
-
-#if (NXT_THREADS)
-
- nxt_locked_work_queue_create(&engine->work_queue, 7);
-
-#endif
+ nxt_work_queue_name(&engine->final_work_queue, "final");
if (signals != NULL) {
engine->signals = nxt_event_engine_signals(signals);
@@ -134,7 +142,7 @@ event_set_fail:
signals_fail:
nxt_free(engine->signals);
- nxt_thread_work_queue_destroy(thr);
+ nxt_work_queue_cache_destroy(&engine->work_queue_cache);
nxt_free(engine->fibers);
fibers_fail:
@@ -193,9 +201,9 @@ nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
}
pipe->event.fd = pipe->fds[0];
- pipe->event.read_work_queue = &engine->task.thread->work_queue.main;
+ pipe->event.read_work_queue = &engine->fast_work_queue;
pipe->event.read_handler = nxt_event_engine_signal_pipe;
- pipe->event.write_work_queue = &engine->task.thread->work_queue.main;
+ pipe->event.write_work_queue = &engine->fast_work_queue;
pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
pipe->event.log = &nxt_main_log;
@@ -237,12 +245,11 @@ nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
void
-nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_handler_t handler,
- nxt_task_t *task, void *obj, void *data, nxt_log_t *log)
+nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
{
nxt_thread_log_debug("event engine post");
- nxt_locked_work_queue_add(&engine->work_queue, handler, task, obj, data);
+ nxt_locked_work_queue_add(&engine->locked_work_queue, work);
nxt_event_engine_signal(engine, 0);
}
@@ -273,12 +280,11 @@ nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
static void
nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
{
- int i, n;
- u_char signo;
- nxt_bool_t post;
- nxt_event_fd_t *ev;
- const nxt_event_sig_t *sigev;
- u_char buf[128];
+ int i, n;
+ u_char signo;
+ nxt_bool_t post;
+ nxt_event_fd_t *ev;
+ u_char buf[128];
ev = obj;
@@ -299,12 +305,8 @@ nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
post = 1;
} else {
- sigev = nxt_event_engine_signal_find(task, signo);
-
- if (nxt_fast_path(sigev != NULL)) {
- sigev->handler(task, (void *) (uintptr_t) signo,
- (void *) sigev->name);
- }
+ nxt_event_engine_signal_handler(task,
+ (void *) (uintptr_t) signo, NULL);
}
}
@@ -319,12 +321,14 @@ nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
static void
nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_thread_t *thread;
+ nxt_thread_t *thread;
+ nxt_event_engine_t *engine;
thread = task->thread;
+ engine = thread->engine;
- nxt_locked_work_queue_move(thread, &thread->engine->work_queue,
- &thread->work_queue.main);
+ nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
+ &engine->fast_work_queue);
}
@@ -351,31 +355,17 @@ nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
signo = (uintptr_t) obj;
- sigev = nxt_event_engine_signal_find(task, signo);
-
- if (nxt_fast_path(sigev != NULL)) {
- sigev->handler(task, (void *) (uintptr_t) signo, (void *) sigev->name);
- }
-}
-
-
-static const nxt_event_sig_t *
-nxt_event_engine_signal_find(nxt_task_t *task, nxt_uint_t signo)
-{
- const nxt_event_sig_t *sigev;
-
for (sigev = task->thread->engine->signals->sigev;
sigev->signo != 0;
sigev++)
{
if (signo == (nxt_uint_t) sigev->signo) {
- return sigev;
+ sigev->handler(task, (void *) signo, (void *) sigev->name);
+ return;
}
}
nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
-
- return NULL;
}
@@ -397,7 +387,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
nxt_event_engine_signals_stop(engine);
/*
- * Add to thread main work queue the signal events possibly
+ * Add to engine fast work queue the signal events possibly
* received before the blocking signal processing.
*/
nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL);
@@ -406,11 +396,11 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
if (engine->pipe != NULL && event_set->enable_post != NULL) {
/*
* An engine pipe must be closed after all signal events
- * added above to thread main work queue will be processed.
+ * added above to engine fast work queue will be processed.
*/
- nxt_thread_work_queue_add(thr, &thr->work_queue.main,
- nxt_event_engine_signal_pipe_close,
- &engine->task, engine->pipe, NULL);
+ nxt_work_queue_add(&engine->final_work_queue,
+ nxt_event_engine_signal_pipe_close,
+ &engine->task, engine->pipe, NULL);
engine->pipe = NULL;
}
@@ -455,8 +445,7 @@ nxt_event_engine_free(nxt_event_engine_t *engine)
nxt_event_engine_signal_pipe_free(engine);
nxt_free(engine->signals);
- nxt_locked_work_queue_destroy(&engine->work_queue);
- nxt_thread_work_queue_destroy(nxt_thread());
+ nxt_work_queue_cache_destroy(&engine->work_queue_cache);
engine->event->free(engine->event_set);
@@ -466,6 +455,35 @@ nxt_event_engine_free(nxt_event_engine_t *engine)
}
+static nxt_work_handler_t
+nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
+ void **obj, void **data)
+{
+ nxt_work_queue_t *wq;
+
+ wq = engine->current_work_queue;
+
+ if (wq->head == NULL) {
+ wq = &engine->fast_work_queue;
+
+ while (wq->head == NULL) {
+ engine->current_work_queue++;
+ wq = engine->current_work_queue;
+
+ if (wq > &engine->final_work_queue) {
+ engine->current_work_queue = &engine->fast_work_queue;
+
+ return NULL;
+ }
+ }
+ }
+
+ nxt_debug(&engine->task, "work queue: %s", wq->name);
+
+ return nxt_work_queue_pop(wq, task, obj, data);
+}
+
+
void
nxt_event_engine_start(nxt_event_engine_t *engine)
{
@@ -487,40 +505,25 @@ nxt_event_engine_start(nxt_event_engine_t *engine)
/* A return point from fibers. */
}
- for ( ;; ) {
+ thr->log = &nxt_main_log;
- for ( ;; ) {
- handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data);
-
- if (handler == NULL) {
- break;
- }
-
- handler(task, obj, data);
-
- thr->log = &nxt_main_log;
- }
+ for ( ;; ) {
for ( ;; ) {
- handler = nxt_thread_last_work_queue_pop(thr, &task, &obj, &data);
+ handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
if (handler == NULL) {
break;
}
handler(task, obj, data);
-
- thr->log = &nxt_main_log;
}
/* Attach some event engine work queues in preferred order. */
- nxt_work_queue_attach(thr, &engine->accept_work_queue);
- nxt_work_queue_attach(thr, &engine->read_work_queue);
-
timeout = nxt_event_timer_find(engine);
- engine->event->poll(task, engine->event_set, timeout);
+ engine->event->poll(&engine->task, engine->event_set, timeout);
/*
* Look up expired timers only if a new zero timer has been