diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
commit | de532922d9ab42aa15b40d47c8db53ac2af38500 (patch) | |
tree | d6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_event_engine.c | |
parent | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff) | |
download | unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2 |
Introducing tasks.
Diffstat (limited to 'src/nxt_event_engine.c')
-rw-r--r-- | src/nxt_event_engine.c | 120 |
1 files changed, 66 insertions, 54 deletions
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index dd0f5fe3..ead63b72 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -7,21 +7,20 @@ #include <nxt_main.h> -static nxt_int_t nxt_event_engine_post_init(nxt_thread_t *thr, +static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine); +static nxt_int_t nxt_event_engine_signal_pipe_create( nxt_event_engine_t *engine); -static nxt_int_t nxt_event_engine_signal_pipe_create(nxt_thread_t *thr, - nxt_event_engine_t *engine); -static void nxt_event_engine_signal_pipe_close(nxt_thread_t *thr, void *obj, +static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data); -static void nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, +static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data); -static void nxt_event_engine_post_handler(nxt_thread_t *thr, void *obj, +static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_event_engine_signal_pipe_error(nxt_thread_t *thr, void *obj, +static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data); -static void nxt_event_engine_signal_handler(nxt_thread_t *thr, void *obj, +static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data); -static const nxt_event_sig_t *nxt_event_engine_signal_find(nxt_thread_t *thr, +static const nxt_event_sig_t *nxt_event_engine_signal_find(nxt_task_t *task, nxt_uint_t signo); @@ -37,6 +36,13 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, return NULL; } + engine->task.thread = thr; + engine->task.log = thr->log; + engine->task.ident = nxt_task_next_ident(); + + thr->engine = engine; + thr->fiber = &engine->fibers->fiber; + engine->batch = batch; if (flags & NXT_ENGINE_FIBERS) { @@ -91,7 +97,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, engine->event = event_set; - if (nxt_event_engine_post_init(thr, engine) != NXT_OK) { + if (nxt_event_engine_post_init(engine) != NXT_OK) { goto post_fail; } @@ -107,8 +113,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, nxt_queue_init(&engine->listen_connections); nxt_queue_init(&engine->idle_connections); - thr->engine = engine; - thr->fiber = &engine->fibers->fiber; + engine->thread = thr; #if !(NXT_THREADS) @@ -140,7 +145,7 @@ fibers_fail: static nxt_int_t -nxt_event_engine_post_init(nxt_thread_t *thr, nxt_event_engine_t *engine) +nxt_event_engine_post_init(nxt_event_engine_t *engine) { if (engine->event->enable_post != NULL) { return engine->event->enable_post(engine->event_set, @@ -157,7 +162,7 @@ nxt_event_engine_post_init(nxt_thread_t *thr, nxt_event_engine_t *engine) #endif - if (nxt_event_engine_signal_pipe_create(thr, engine) != NXT_OK) { + if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) { return NXT_ERROR; } @@ -166,8 +171,7 @@ nxt_event_engine_post_init(nxt_thread_t *thr, nxt_event_engine_t *engine) static nxt_int_t -nxt_event_engine_signal_pipe_create(nxt_thread_t *thr, - nxt_event_engine_t *engine) +nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine) { nxt_event_engine_pipe_t *pipe; @@ -189,9 +193,9 @@ nxt_event_engine_signal_pipe_create(nxt_thread_t *thr, } pipe->event.fd = pipe->fds[0]; - pipe->event.read_work_queue = &thr->work_queue.main; + pipe->event.read_work_queue = &engine->task.thread->work_queue.main; pipe->event.read_handler = nxt_event_engine_signal_pipe; - pipe->event.write_work_queue = &thr->work_queue.main; + pipe->event.write_work_queue = &engine->task.thread->work_queue.main; pipe->event.error_handler = nxt_event_engine_signal_pipe_error; pipe->event.log = &nxt_main_log; @@ -221,7 +225,7 @@ nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine) static void -nxt_event_engine_signal_pipe_close(nxt_thread_t *thr, void *obj, void *data) +nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data) { nxt_event_engine_pipe_t *pipe; @@ -234,11 +238,11 @@ nxt_event_engine_signal_pipe_close(nxt_thread_t *thr, void *obj, void *data) void nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_handler_t handler, - void *obj, void *data, nxt_log_t *log) + nxt_task_t *task, void *obj, void *data, nxt_log_t *log) { nxt_thread_log_debug("event engine post"); - nxt_locked_work_queue_add(&engine->work_queue, handler, obj, data, log); + nxt_locked_work_queue_add(&engine->work_queue, handler, task, obj, data); nxt_event_engine_signal(engine, 0); } @@ -267,7 +271,7 @@ nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo) static void -nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, void *data) +nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data) { int i, n; u_char signo; @@ -278,7 +282,7 @@ nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, void *data) ev = obj; - nxt_log_debug(thr->log, "engine signal pipe"); + nxt_debug(task, "engine signal pipe"); post = 0; @@ -288,17 +292,17 @@ nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, void *data) for (i = 0; i < n; i++) { signo = buf[i]; - nxt_log_debug(thr->log, "engine pipe signo:%d", signo); + nxt_debug(task, "engine pipe signo:%d", signo); if (signo == 0) { /* A post should be processed only once. */ post = 1; } else { - sigev = nxt_event_engine_signal_find(thr, signo); + sigev = nxt_event_engine_signal_find(task, signo); if (nxt_fast_path(sigev != NULL)) { - sigev->handler(thr, (void *) (uintptr_t) signo, + sigev->handler(task, (void *) (uintptr_t) signo, (void *) sigev->name); } } @@ -307,70 +311,77 @@ nxt_event_engine_signal_pipe(nxt_thread_t *thr, void *obj, void *data) } while (n == sizeof(buf)); if (post) { - nxt_event_engine_post_handler(thr, NULL, NULL); + nxt_event_engine_post_handler(task, NULL, NULL); } } static void -nxt_event_engine_post_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data) { - nxt_locked_work_queue_move(thr, &thr->engine->work_queue, - &thr->work_queue.main); + nxt_thread_t *thread; + + thread = task->thread; + + nxt_locked_work_queue_move(thread, &thread->engine->work_queue, + &thread->work_queue.main); } static void -nxt_event_engine_signal_pipe_error(nxt_thread_t *thr, void *obj, void *data) +nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data) { - nxt_event_fd_t *ev; + nxt_event_engine_t *engine; - ev = obj; + engine = task->thread->engine; - nxt_log_alert(ev->log, "engine pipe(%FD:%FD) event error", - thr->engine->pipe->fds[0], thr->engine->pipe->fds[1]); + nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error", + engine->pipe->fds[0], engine->pipe->fds[1]); - nxt_event_fd_close(thr->engine, &thr->engine->pipe->event); - nxt_pipe_close(thr->engine->pipe->fds); + nxt_event_fd_close(engine, &engine->pipe->event); + nxt_pipe_close(engine->pipe->fds); } static void -nxt_event_engine_signal_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data) { uintptr_t signo; const nxt_event_sig_t *sigev; signo = (uintptr_t) obj; - sigev = nxt_event_engine_signal_find(thr, signo); + sigev = nxt_event_engine_signal_find(task, signo); if (nxt_fast_path(sigev != NULL)) { - sigev->handler(thr, (void *) (uintptr_t) signo, (void *) sigev->name); + sigev->handler(task, (void *) (uintptr_t) signo, (void *) sigev->name); } } static const nxt_event_sig_t * -nxt_event_engine_signal_find(nxt_thread_t *thr, nxt_uint_t signo) +nxt_event_engine_signal_find(nxt_task_t *task, nxt_uint_t signo) { const nxt_event_sig_t *sigev; - for (sigev = thr->engine->signals->sigev; sigev->signo != 0; sigev++) { + for (sigev = task->thread->engine->signals->sigev; + sigev->signo != 0; + sigev++) + { if (signo == (nxt_uint_t) sigev->signo) { return sigev; } } - nxt_log_alert(thr->log, "signal %ui handler not found", signo); + nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo); return NULL; } nxt_int_t -nxt_event_engine_change(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, - nxt_uint_t batch) +nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task, + const nxt_event_set_ops_t *event_set, nxt_uint_t batch) { nxt_uint_t events; nxt_event_engine_t *engine; @@ -389,7 +400,7 @@ nxt_event_engine_change(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, * Add to thread main work queue the signal events possibly * received before the blocking signal processing. */ - nxt_event_engine_signal_pipe(thr, &engine->pipe->event, NULL); + nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL); } if (engine->pipe != NULL && event_set->enable_post != NULL) { @@ -399,7 +410,7 @@ nxt_event_engine_change(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, */ nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_event_engine_signal_pipe_close, - engine->pipe, NULL, &nxt_main_log); + &engine->task, engine->pipe, NULL); engine->pipe = NULL; } @@ -415,7 +426,7 @@ nxt_event_engine_change(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set, engine->event = event_set; - if (nxt_event_engine_post_init(thr, engine) != NXT_OK) { + if (nxt_event_engine_post_init(engine) != NXT_OK) { return NXT_ERROR; } @@ -459,6 +470,7 @@ void nxt_event_engine_start(nxt_event_engine_t *engine) { void *obj, *data; + nxt_task_t *task; nxt_msec_t timeout, now; nxt_thread_t *thr; nxt_work_handler_t handler; @@ -478,25 +490,25 @@ nxt_event_engine_start(nxt_event_engine_t *engine) for ( ;; ) { for ( ;; ) { - handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log); + handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data); if (handler == NULL) { break; } - handler(thr, obj, data); + handler(task, obj, data); thr->log = &nxt_main_log; } for ( ;; ) { - handler = nxt_thread_last_work_queue_pop(thr, &obj, &data, - &thr->log); + handler = nxt_thread_last_work_queue_pop(thr, &task, &obj, &data); + if (handler == NULL) { break; } - handler(thr, obj, data); + handler(task, obj, data); thr->log = &nxt_main_log; } @@ -508,7 +520,7 @@ nxt_event_engine_start(nxt_event_engine_t *engine) timeout = nxt_event_timer_find(engine); - engine->event->poll(thr, engine->event_set, timeout); + engine->event->poll(task, engine->event_set, timeout); /* * Look up expired timers only if a new zero timer has been |