diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
commit | de532922d9ab42aa15b40d47c8db53ac2af38500 (patch) | |
tree | d6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_thread_pool.c | |
parent | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff) | |
download | unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2 |
Introducing tasks.
Diffstat (limited to 'src/nxt_thread_pool.c')
-rw-r--r-- | src/nxt_thread_pool.c | 54 |
1 files changed, 32 insertions, 22 deletions
diff --git a/src/nxt_thread_pool.c b/src/nxt_thread_pool.c index a9708ed2..463bfad4 100644 --- a/src/nxt_thread_pool.c +++ b/src/nxt_thread_pool.c @@ -8,7 +8,7 @@ static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp); -static void nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data); +static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data); static void nxt_thread_pool_start(void *ctx); static void nxt_thread_pool_wait(nxt_thread_pool_t *tp); @@ -28,6 +28,8 @@ nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout, tp->max_threads = max_threads; tp->timeout = timeout; tp->engine = engine; + tp->task.thread = engine->task.thread; + tp->task.log = engine->task.log; tp->init = init; tp->exit = exit; @@ -37,7 +39,7 @@ nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout, nxt_int_t nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler, - void *obj, void *data, nxt_log_t *log) + nxt_task_t *task, void *obj, void *data) { nxt_thread_log_debug("thread pool post"); @@ -45,7 +47,7 @@ nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler, return NXT_ERROR; } - nxt_locked_work_queue_add(&tp->work_queue, handler, obj, data, log); + nxt_locked_work_queue_add(&tp->work_queue, handler, task, obj, data); (void) nxt_sem_post(&tp->sem); @@ -117,6 +119,7 @@ static void nxt_thread_pool_start(void *ctx) { void *obj, *data; + nxt_task_t *task; nxt_thread_t *thr; nxt_thread_pool_t *tp; nxt_work_handler_t handler; @@ -129,6 +132,8 @@ nxt_thread_pool_start(void *ctx) tp->main = thr->handle; nxt_free(thr->link); thr->link = NULL; + + tp->task.thread = thr; } thr->thread_pool = tp; @@ -142,24 +147,25 @@ nxt_thread_pool_start(void *ctx) for ( ;; ) { nxt_thread_pool_wait(tp); - handler = nxt_locked_work_queue_pop(&tp->work_queue, &obj, - &data, &thr->log); + handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj, + &data); if (nxt_fast_path(handler != NULL)) { + task->thread = thr; nxt_log_debug(thr->log, "locked work queue"); - handler(thr, obj, data); + handler(task, obj, data); } for ( ;; ) { thr->log = &nxt_main_log; - handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log); + handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data); if (handler == NULL) { break; } - handler(thr, obj, data); + handler(task, obj, data); } thr->log = &nxt_main_log; @@ -236,11 +242,11 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp) { nxt_thread_t *thr; - if (!tp->ready) { - thr = nxt_thread(); + thr = nxt_thread(); + if (!tp->ready) { nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit, - tp, NULL, &nxt_main_log); + &tp->task, tp, NULL); return; } @@ -248,7 +254,7 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp) /* Disable new threads creation and mark a pool as being destroyed. */ tp->max_threads = 0; - nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, NULL, &nxt_main_log); + nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, NULL); } } @@ -265,15 +271,17 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp) */ static void -nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data) +nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data) { + nxt_thread_t *thread; nxt_thread_pool_t *tp; nxt_atomic_uint_t threads; nxt_thread_handle_t handle; tp = obj; + thread = task->thread; - nxt_log_debug(thr->log, "thread pool exit"); + nxt_debug(task, "thread pool exit"); if (data != NULL) { handle = (nxt_thread_handle_t) (uintptr_t) data; @@ -282,17 +290,18 @@ nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data) threads = nxt_atomic_fetch_add(&tp->threads, -1); - nxt_log_debug(thr->log, "thread pool threads: %A", threads); + nxt_debug(task, "thread pool threads: %A", threads); if (threads > 1) { - nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, - (void *) (uintptr_t) thr->handle, &nxt_main_log); + nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, + (void *) (uintptr_t) thread->handle); } else { - nxt_main_log_debug("thread pool destroy"); + nxt_debug(task, "thread pool destroy"); - nxt_event_engine_post(tp->engine, tp->exit, tp, - (void *) (uintptr_t) thr->handle, &nxt_main_log); + nxt_event_engine_post(tp->engine, tp->exit, &tp->task, tp, + (void *) (uintptr_t) thread->handle, + &nxt_main_log); nxt_sem_destroy(&tp->sem); @@ -301,8 +310,9 @@ nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data) nxt_free(tp); } - nxt_thread_work_queue_destroy(thr); + nxt_thread_work_queue_destroy(thread); + + nxt_thread_exit(thread); - nxt_thread_exit(thr); nxt_unreachable(); } |