diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-27 11:35:11 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-27 11:35:11 +0300 |
commit | ba0391577b06446307fa073f856f57748557e0dd (patch) | |
tree | b2b871a041edee242662c95197bed292531c3a9a /src/nxt_event_engine.c | |
parent | 6886b83c1f3bfdc514d58ad6e9ab40873cafcb54 (diff) | |
download | unit-ba0391577b06446307fa073f856f57748557e0dd.tar.gz unit-ba0391577b06446307fa073f856f57748557e0dd.tar.bz2 |
Work queues refactoring.
Diffstat (limited to 'src/nxt_event_engine.c')
-rw-r--r-- | src/nxt_event_engine.c | 147 |
1 files changed, 75 insertions, 72 deletions
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index ead63b72..921de23c 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -20,8 +20,8 @@ static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data); static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data); -static const nxt_event_sig_t *nxt_event_engine_signal_find(nxt_task_t *task, - nxt_uint_t signo); +static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine, + nxt_task_t **task, void **obj, void **data); nxt_event_engine_t * @@ -52,8 +52,21 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, } } - nxt_thread_work_queue_create(thr, 0); + engine->current_work_queue = &engine->fast_work_queue; + nxt_work_queue_cache_create(&engine->work_queue_cache, 0); + + engine->fast_work_queue.cache = &engine->work_queue_cache; + engine->accept_work_queue.cache = &engine->work_queue_cache; + engine->read_work_queue.cache = &engine->work_queue_cache; + engine->socket_work_queue.cache = &engine->work_queue_cache; + engine->connect_work_queue.cache = &engine->work_queue_cache; + engine->write_work_queue.cache = &engine->work_queue_cache; + engine->shutdown_work_queue.cache = &engine->work_queue_cache; + engine->close_work_queue.cache = &engine->work_queue_cache; + engine->final_work_queue.cache = &engine->work_queue_cache; + + nxt_work_queue_name(&engine->fast_work_queue, "fast"); nxt_work_queue_name(&engine->accept_work_queue, "accept"); nxt_work_queue_name(&engine->read_work_queue, "read"); nxt_work_queue_name(&engine->socket_work_queue, "socket"); @@ -61,12 +74,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, nxt_work_queue_name(&engine->write_work_queue, "write"); nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown"); nxt_work_queue_name(&engine->close_work_queue, "close"); - -#if (NXT_THREADS) - - nxt_locked_work_queue_create(&engine->work_queue, 7); - -#endif + nxt_work_queue_name(&engine->final_work_queue, "final"); if (signals != NULL) { engine->signals = nxt_event_engine_signals(signals); @@ -134,7 +142,7 @@ event_set_fail: signals_fail: nxt_free(engine->signals); - nxt_thread_work_queue_destroy(thr); + nxt_work_queue_cache_destroy(&engine->work_queue_cache); nxt_free(engine->fibers); fibers_fail: @@ -193,9 +201,9 @@ nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) } pipe->event.fd = pipe->fds[0]; - pipe->event.read_work_queue = &engine->task.thread->work_queue.main; + pipe->event.read_work_queue = &engine->fast_work_queue; pipe->event.read_handler = nxt_event_engine_signal_pipe; - pipe->event.write_work_queue = &engine->task.thread->work_queue.main; + pipe->event.write_work_queue = &engine->fast_work_queue; pipe->event.error_handler = nxt_event_engine_signal_pipe_error; pipe->event.log = &nxt_main_log; @@ -237,12 +245,11 @@ nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) void -nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_handler_t handler, - nxt_task_t *task, void *obj, void *data, nxt_log_t *log) +nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work) { nxt_thread_log_debug("event engine post"); - nxt_locked_work_queue_add(&engine->work_queue, handler, task, obj, data); + nxt_locked_work_queue_add(&engine->locked_work_queue, work); nxt_event_engine_signal(engine, 0); } @@ -273,12 +280,11 @@ nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) { - int i, n; - u_char signo; - nxt_bool_t post; - nxt_event_fd_t *ev; - const nxt_event_sig_t *sigev; - u_char buf[128]; + int i, n; + u_char signo; + nxt_bool_t post; + nxt_event_fd_t *ev; + u_char buf[128]; ev = obj; @@ -299,12 +305,8 @@ nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) post = 1; } else { - sigev = nxt_event_engine_signal_find(task, signo); - - if (nxt_fast_path(sigev != NULL)) { - sigev->handler(task, (void *) (uintptr_t) signo, - (void *) sigev->name); - } + nxt_event_engine_signal_handler(task, + (void *) (uintptr_t) signo, NULL); } } @@ -319,12 +321,14 @@ nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data) { - nxt_thread_t *thread; + nxt_thread_t *thread; + nxt_event_engine_t *engine; thread = task->thread; + engine = thread->engine; - nxt_locked_work_queue_move(thread, &thread->engine->work_queue, - &thread->work_queue.main); + nxt_locked_work_queue_move(thread, &engine->locked_work_queue, + &engine->fast_work_queue); } @@ -351,31 +355,17 @@ nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) signo = (uintptr_t) obj; - sigev = nxt_event_engine_signal_find(task, signo); - - if (nxt_fast_path(sigev != NULL)) { - sigev->handler(task, (void *) (uintptr_t) signo, (void *) sigev->name); - } -} - - -static const nxt_event_sig_t * -nxt_event_engine_signal_find(nxt_task_t *task, nxt_uint_t signo) -{ - const nxt_event_sig_t *sigev; - for (sigev = task->thread->engine->signals->sigev; sigev->signo != 0; sigev++) { if (signo == (nxt_uint_t) sigev->signo) { - return sigev; + sigev->handler(task, (void *) signo, (void *) sigev->name); + return; } } nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo); - - return NULL; } @@ -397,7 +387,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, nxt_event_engine_signals_stop(engine); /* - * Add to thread main work queue the signal events possibly + * Add to engine fast work queue the signal events possibly * received before the blocking signal processing. */ nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL); @@ -406,11 +396,11 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, if (engine->pipe != NULL && event_set->enable_post != NULL) { /* * An engine pipe must be closed after all signal events - * added above to thread main work queue will be processed. + * added above to engine fast work queue will be processed. */ - nxt_thread_work_queue_add(thr, &thr->work_queue.main, - nxt_event_engine_signal_pipe_close, - &engine->task, engine->pipe, NULL); + nxt_work_queue_add(&engine->final_work_queue, + nxt_event_engine_signal_pipe_close, + &engine->task, engine->pipe, NULL); engine->pipe = NULL; } @@ -455,8 +445,7 @@ nxt_event_engine_free(nxt_event_engine_t *engine) nxt_event_engine_signal_pipe_free(engine); nxt_free(engine->signals); - nxt_locked_work_queue_destroy(&engine->work_queue); - nxt_thread_work_queue_destroy(nxt_thread()); + nxt_work_queue_cache_destroy(&engine->work_queue_cache); engine->event->free(engine->event_set); @@ -466,6 +455,35 @@ nxt_event_engine_free(nxt_event_engine_t *engine) } +static nxt_work_handler_t +nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task, + void **obj, void **data) +{ + nxt_work_queue_t *wq; + + wq = engine->current_work_queue; + + if (wq->head == NULL) { + wq = &engine->fast_work_queue; + + while (wq->head == NULL) { + engine->current_work_queue++; + wq = engine->current_work_queue; + + if (wq > &engine->final_work_queue) { + engine->current_work_queue = &engine->fast_work_queue; + + return NULL; + } + } + } + + nxt_debug(&engine->task, "work queue: %s", wq->name); + + return nxt_work_queue_pop(wq, task, obj, data); +} + + void nxt_event_engine_start(nxt_event_engine_t *engine) { @@ -487,40 +505,25 @@ nxt_event_engine_start(nxt_event_engine_t *engine) /* A return point from fibers. */ } - for ( ;; ) { + thr->log = &nxt_main_log; - for ( ;; ) { - handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data); - - if (handler == NULL) { - break; - } - - handler(task, obj, data); - - thr->log = &nxt_main_log; - } + for ( ;; ) { for ( ;; ) { - handler = nxt_thread_last_work_queue_pop(thr, &task, &obj, &data); + handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data); if (handler == NULL) { break; } handler(task, obj, data); - - thr->log = &nxt_main_log; } /* Attach some event engine work queues in preferred order. */ - nxt_work_queue_attach(thr, &engine->accept_work_queue); - nxt_work_queue_attach(thr, &engine->read_work_queue); - timeout = nxt_event_timer_find(engine); - engine->event->poll(task, engine->event_set, timeout); + engine->event->poll(&engine->task, engine->event_set, timeout); /* * Look up expired timers only if a new zero timer has been |