diff options
Diffstat (limited to 'src/nxt_thread_pool.c')
-rw-r--r-- | src/nxt_thread_pool.c | 55 |
1 files changed, 22 insertions, 33 deletions
diff --git a/src/nxt_thread_pool.c b/src/nxt_thread_pool.c index 463bfad4..fd7246c8 100644 --- a/src/nxt_thread_pool.c +++ b/src/nxt_thread_pool.c @@ -38,8 +38,7 @@ nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout, nxt_int_t -nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler, - nxt_task_t *task, void *obj, void *data) +nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work) { nxt_thread_log_debug("thread pool post"); @@ -47,7 +46,7 @@ nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler, return NXT_ERROR; } - nxt_locked_work_queue_add(&tp->work_queue, handler, task, obj, data); + nxt_locked_work_queue_add(&tp->work_queue, work); (void) nxt_sem_post(&tp->sem); @@ -66,6 +65,11 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp) return NXT_OK; } + if (tp->max_threads == 0) { + /* The pool is being destroyed. */ + return NXT_ERROR; + } + nxt_thread_spin_lock(&tp->work_queue.lock); ret = NXT_OK; @@ -78,8 +82,6 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp) if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) { - nxt_locked_work_queue_create(&tp->work_queue, 0); - link = nxt_malloc(sizeof(nxt_thread_link_t)); if (nxt_fast_path(link != NULL)) { @@ -102,8 +104,6 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp) (void) nxt_atomic_fetch_add(&tp->threads, -1); - nxt_locked_work_queue_destroy(&tp->work_queue); - ret = NXT_ERROR; } @@ -142,8 +142,6 @@ nxt_thread_pool_start(void *ctx) tp->init(); } - nxt_thread_work_queue_create(thr, 8); - for ( ;; ) { nxt_thread_pool_wait(tp); @@ -152,18 +150,8 @@ nxt_thread_pool_start(void *ctx) if (nxt_fast_path(handler != NULL)) { task->thread = thr; - nxt_log_debug(thr->log, "locked work queue"); - handler(task, obj, data); - } - for ( ;; ) { - thr->log = &nxt_main_log; - - handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data); - - if (handler == NULL) { - break; - } + nxt_log_debug(thr->log, "locked work queue"); handler(task, obj, data); } @@ -245,8 +233,8 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp) thr = nxt_thread(); if (!tp->ready) { - nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit, - &tp->task, tp, NULL); + nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit, + &tp->task, tp, NULL); return; } @@ -254,7 +242,9 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp) /* Disable new threads creation and mark a pool as being destroyed. */ tp->max_threads = 0; - nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, NULL); + nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL); + + nxt_thread_pool_post(tp, &tp->work); } } @@ -293,24 +283,23 @@ nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "thread pool threads: %A", threads); if (threads > 1) { - nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, - (void *) (uintptr_t) thread->handle); + nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, + (void *) (uintptr_t) thread->handle); + + nxt_thread_pool_post(tp, &tp->work); } else { nxt_debug(task, "thread pool destroy"); - nxt_event_engine_post(tp->engine, tp->exit, &tp->task, tp, - (void *) (uintptr_t) thread->handle, - &nxt_main_log); - nxt_sem_destroy(&tp->sem); - nxt_locked_work_queue_destroy(&tp->work_queue); + nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, + (void *) (uintptr_t) thread->handle); - nxt_free(tp); - } + nxt_event_engine_post(tp->engine, &tp->work); - nxt_thread_work_queue_destroy(thread); + /* The "tp" memory should be freed by tp->exit handler. */ + } nxt_thread_exit(thread); |