summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_thread_pool.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
commitde532922d9ab42aa15b40d47c8db53ac2af38500 (patch)
treed6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_thread_pool.c
parent16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff)
downloadunit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz
unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2
Introducing tasks.
Diffstat (limited to 'src/nxt_thread_pool.c')
-rw-r--r--src/nxt_thread_pool.c54
1 files changed, 32 insertions, 22 deletions
diff --git a/src/nxt_thread_pool.c b/src/nxt_thread_pool.c
index a9708ed2..463bfad4 100644
--- a/src/nxt_thread_pool.c
+++ b/src/nxt_thread_pool.c
@@ -8,7 +8,7 @@
static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
-static void nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data);
+static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data);
static void nxt_thread_pool_start(void *ctx);
static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);
@@ -28,6 +28,8 @@ nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
tp->max_threads = max_threads;
tp->timeout = timeout;
tp->engine = engine;
+ tp->task.thread = engine->task.thread;
+ tp->task.log = engine->task.log;
tp->init = init;
tp->exit = exit;
@@ -37,7 +39,7 @@ nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
nxt_int_t
nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
- void *obj, void *data, nxt_log_t *log)
+ nxt_task_t *task, void *obj, void *data)
{
nxt_thread_log_debug("thread pool post");
@@ -45,7 +47,7 @@ nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
return NXT_ERROR;
}
- nxt_locked_work_queue_add(&tp->work_queue, handler, obj, data, log);
+ nxt_locked_work_queue_add(&tp->work_queue, handler, task, obj, data);
(void) nxt_sem_post(&tp->sem);
@@ -117,6 +119,7 @@ static void
nxt_thread_pool_start(void *ctx)
{
void *obj, *data;
+ nxt_task_t *task;
nxt_thread_t *thr;
nxt_thread_pool_t *tp;
nxt_work_handler_t handler;
@@ -129,6 +132,8 @@ nxt_thread_pool_start(void *ctx)
tp->main = thr->handle;
nxt_free(thr->link);
thr->link = NULL;
+
+ tp->task.thread = thr;
}
thr->thread_pool = tp;
@@ -142,24 +147,25 @@ nxt_thread_pool_start(void *ctx)
for ( ;; ) {
nxt_thread_pool_wait(tp);
- handler = nxt_locked_work_queue_pop(&tp->work_queue, &obj,
- &data, &thr->log);
+ handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj,
+ &data);
if (nxt_fast_path(handler != NULL)) {
+ task->thread = thr;
nxt_log_debug(thr->log, "locked work queue");
- handler(thr, obj, data);
+ handler(task, obj, data);
}
for ( ;; ) {
thr->log = &nxt_main_log;
- handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log);
+ handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data);
if (handler == NULL) {
break;
}
- handler(thr, obj, data);
+ handler(task, obj, data);
}
thr->log = &nxt_main_log;
@@ -236,11 +242,11 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
{
nxt_thread_t *thr;
- if (!tp->ready) {
- thr = nxt_thread();
+ thr = nxt_thread();
+ if (!tp->ready) {
nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit,
- tp, NULL, &nxt_main_log);
+ &tp->task, tp, NULL);
return;
}
@@ -248,7 +254,7 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
/* Disable new threads creation and mark a pool as being destroyed. */
tp->max_threads = 0;
- nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, NULL, &nxt_main_log);
+ nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, NULL);
}
}
@@ -265,15 +271,17 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
*/
static void
-nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data)
+nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
{
+ nxt_thread_t *thread;
nxt_thread_pool_t *tp;
nxt_atomic_uint_t threads;
nxt_thread_handle_t handle;
tp = obj;
+ thread = task->thread;
- nxt_log_debug(thr->log, "thread pool exit");
+ nxt_debug(task, "thread pool exit");
if (data != NULL) {
handle = (nxt_thread_handle_t) (uintptr_t) data;
@@ -282,17 +290,18 @@ nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data)
threads = nxt_atomic_fetch_add(&tp->threads, -1);
- nxt_log_debug(thr->log, "thread pool threads: %A", threads);
+ nxt_debug(task, "thread pool threads: %A", threads);
if (threads > 1) {
- nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp,
- (void *) (uintptr_t) thr->handle, &nxt_main_log);
+ nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp,
+ (void *) (uintptr_t) thread->handle);
} else {
- nxt_main_log_debug("thread pool destroy");
+ nxt_debug(task, "thread pool destroy");
- nxt_event_engine_post(tp->engine, tp->exit, tp,
- (void *) (uintptr_t) thr->handle, &nxt_main_log);
+ nxt_event_engine_post(tp->engine, tp->exit, &tp->task, tp,
+ (void *) (uintptr_t) thread->handle,
+ &nxt_main_log);
nxt_sem_destroy(&tp->sem);
@@ -301,8 +310,9 @@ nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data)
nxt_free(tp);
}
- nxt_thread_work_queue_destroy(thr);
+ nxt_thread_work_queue_destroy(thread);
+
+ nxt_thread_exit(thread);
- nxt_thread_exit(thr);
nxt_unreachable();
}