diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_thread_pool.c | 308 |
1 files changed, 308 insertions, 0 deletions
diff --git a/src/nxt_thread_pool.c b/src/nxt_thread_pool.c new file mode 100644 index 00000000..a9708ed2 --- /dev/null +++ b/src/nxt_thread_pool.c @@ -0,0 +1,308 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp); +static void nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data); +static void nxt_thread_pool_start(void *ctx); +static void nxt_thread_pool_wait(nxt_thread_pool_t *tp); + + +nxt_thread_pool_t * +nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout, + nxt_thread_pool_init_t init, nxt_event_engine_t *engine, + nxt_work_handler_t exit) +{ + nxt_thread_pool_t *tp; + + tp = nxt_zalloc(sizeof(nxt_thread_pool_t)); + if (tp == NULL) { + return NULL; + } + + tp->max_threads = max_threads; + tp->timeout = timeout; + tp->engine = engine; + tp->init = init; + tp->exit = exit; + + return tp; +} + + +nxt_int_t +nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler, + void *obj, void *data, nxt_log_t *log) +{ + nxt_thread_log_debug("thread pool post"); + + if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) { + return NXT_ERROR; + } + + nxt_locked_work_queue_add(&tp->work_queue, handler, obj, data, log); + + (void) nxt_sem_post(&tp->sem); + + return NXT_OK; +} + + +static nxt_int_t +nxt_thread_pool_init(nxt_thread_pool_t *tp) +{ + nxt_int_t ret; + nxt_thread_link_t *link; + nxt_thread_handle_t handle; + + if (nxt_fast_path(tp->ready)) { + return NXT_OK; + } + + nxt_thread_spin_lock(&tp->work_queue.lock); + + ret = NXT_OK; + + if (!tp->ready) { + + nxt_thread_log_debug("thread pool init"); + + (void) nxt_atomic_fetch_add(&tp->threads, 1); + + if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) { + + nxt_locked_work_queue_create(&tp->work_queue, 0); + + link = nxt_malloc(sizeof(nxt_thread_link_t)); + + if (nxt_fast_path(link != NULL)) { + link->start = nxt_thread_pool_start; + link->data = tp; + link->engine = tp->engine; + /* + * link->exit is not used. link->engine is used just to + * set thr->link by nxt_thread_trampoline() and the link + * is a mark of the first thread of pool. + */ + if (nxt_thread_create(&handle, link) == NXT_OK) { + tp->ready = 1; + goto done; + } + } + + nxt_sem_destroy(&tp->sem); + } + + (void) nxt_atomic_fetch_add(&tp->threads, -1); + + nxt_locked_work_queue_destroy(&tp->work_queue); + + ret = NXT_ERROR; + } + +done: + + nxt_thread_spin_unlock(&tp->work_queue.lock); + + return ret; +} + + +static void +nxt_thread_pool_start(void *ctx) +{ + void *obj, *data; + nxt_thread_t *thr; + nxt_thread_pool_t *tp; + nxt_work_handler_t handler; + + tp = ctx; + thr = nxt_thread(); + + if (thr->link != NULL) { + /* Only the first thread has a link. */ + tp->main = thr->handle; + nxt_free(thr->link); + thr->link = NULL; + } + + thr->thread_pool = tp; + + if (tp->init != NULL) { + tp->init(); + } + + nxt_thread_work_queue_create(thr, 8); + + for ( ;; ) { + nxt_thread_pool_wait(tp); + + handler = nxt_locked_work_queue_pop(&tp->work_queue, &obj, + &data, &thr->log); + + if (nxt_fast_path(handler != NULL)) { + nxt_log_debug(thr->log, "locked work queue"); + handler(thr, obj, data); + } + + for ( ;; ) { + thr->log = &nxt_main_log; + + handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log); + + if (handler == NULL) { + break; + } + + handler(thr, obj, data); + } + + thr->log = &nxt_main_log; + } +} + + +static void +nxt_thread_pool_wait(nxt_thread_pool_t *tp) +{ + nxt_err_t err; + nxt_thread_t *thr; + nxt_atomic_uint_t waiting, threads; + nxt_thread_link_t *link; + nxt_thread_handle_t handle; + + thr = nxt_thread(); + + nxt_log_debug(thr->log, "thread pool wait"); + + (void) nxt_atomic_fetch_add(&tp->waiting, 1); + + for ( ;; ) { + err = nxt_sem_wait(&tp->sem, tp->timeout); + + if (err == 0) { + waiting = nxt_atomic_fetch_add(&tp->waiting, -1); + break; + } + + if (err == NXT_ETIMEDOUT) { + if (nxt_thread_handle_equal(thr->handle, tp->main)) { + continue; + } + } + + (void) nxt_atomic_fetch_add(&tp->waiting, -1); + (void) nxt_atomic_fetch_add(&tp->threads, -1); + + nxt_thread_exit(thr); + nxt_unreachable(); + } + + nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting); + + if (waiting > 1) { + return; + } + + do { + threads = tp->threads; + + if (threads >= tp->max_threads) { + return; + } + + } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1)); + + link = nxt_zalloc(sizeof(nxt_thread_link_t)); + + if (nxt_fast_path(link != NULL)) { + link->start = nxt_thread_pool_start; + link->data = tp; + + if (nxt_thread_create(&handle, link) != NXT_OK) { + (void) nxt_atomic_fetch_add(&tp->threads, -1); + } + } +} + + +void +nxt_thread_pool_destroy(nxt_thread_pool_t *tp) +{ + nxt_thread_t *thr; + + if (!tp->ready) { + thr = nxt_thread(); + + nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit, + tp, NULL, &nxt_main_log); + return; + } + + if (tp->max_threads != 0) { + /* Disable new threads creation and mark a pool as being destroyed. */ + tp->max_threads = 0; + + nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, NULL, &nxt_main_log); + } +} + + +/* + * A thread handle (pthread_t) is either pointer or integer, so it can be + * passed as work handler pointer "data" argument. To convert void pointer + * to pthread_t and vice versa the source argument should be cast first to + * uintptr_t type and then to the destination type. + * + * If the handle would be a struct it should be stored in thread pool and + * the thread pool must be freed in the thread pool exit procedure after + * the last thread of pool will exit. + */ + +static void +nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_thread_pool_t *tp; + nxt_atomic_uint_t threads; + nxt_thread_handle_t handle; + + tp = obj; + + nxt_log_debug(thr->log, "thread pool exit"); + + if (data != NULL) { + handle = (nxt_thread_handle_t) (uintptr_t) data; + nxt_thread_wait(handle); + } + + threads = nxt_atomic_fetch_add(&tp->threads, -1); + + nxt_log_debug(thr->log, "thread pool threads: %A", threads); + + if (threads > 1) { + nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, + (void *) (uintptr_t) thr->handle, &nxt_main_log); + + } else { + nxt_main_log_debug("thread pool destroy"); + + nxt_event_engine_post(tp->engine, tp->exit, tp, + (void *) (uintptr_t) thr->handle, &nxt_main_log); + + nxt_sem_destroy(&tp->sem); + + nxt_locked_work_queue_destroy(&tp->work_queue); + + nxt_free(tp); + } + + nxt_thread_work_queue_destroy(thr); + + nxt_thread_exit(thr); + nxt_unreachable(); +} |