/* * Copyright (C) Igor Sysoev * Copyright (C) NGINX, Inc. */ #include static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp); static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data); static void nxt_thread_pool_start(void *ctx); static void nxt_thread_pool_wait(nxt_thread_pool_t *tp); nxt_thread_pool_t * nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout, nxt_thread_pool_init_t init, nxt_event_engine_t *engine, nxt_work_handler_t exit) { nxt_thread_pool_t *tp; tp = nxt_zalloc(sizeof(nxt_thread_pool_t)); if (tp == NULL) { return NULL; } tp->max_threads = max_threads; tp->timeout = timeout; tp->engine = engine; tp->task.thread = engine->task.thread; tp->task.log = engine->task.log; tp->init = init; tp->exit = exit; return tp; } nxt_int_t nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data) { nxt_thread_log_debug("thread pool post"); if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) { return NXT_ERROR; } nxt_locked_work_queue_add(&tp->work_queue, handler, task, obj, data); (void) nxt_sem_post(&tp->sem); return NXT_OK; } static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp) { nxt_int_t ret; nxt_thread_link_t *link; nxt_thread_handle_t handle; if (nxt_fast_path(tp->ready)) { return NXT_OK; } nxt_thread_spin_lock(&tp->work_queue.lock); ret = NXT_OK; if (!tp->ready) { nxt_thread_log_debug("thread pool init"); (void) nxt_atomic_fetch_add(&tp->threads, 1); if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) { nxt_locked_work_queue_create(&tp->work_queue, 0); link = nxt_malloc(sizeof(nxt_thread_link_t)); if (nxt_fast_path(link != NULL)) { link->start = nxt_thread_pool_start; link->data = tp; link->engine = tp->engine; /* * link->exit is not used. link->engine is used just to * set thr->link by nxt_thread_trampoline() and the link * is a mark of the first thread of pool. */ if (nxt_thread_create(&handle, link) == NXT_OK) { tp->ready = 1; goto done; } } nxt_sem_destroy(&tp->sem); } (void) nxt_atomic_fetch_add(&tp->threads, -1); nxt_locked_work_queue_destroy(&tp->work_queue); ret = NXT_ERROR; } done: nxt_thread_spin_unlock(&tp->work_queue.lock); return ret; } static void nxt_thread_pool_start(void *ctx) { void *obj, *data; nxt_task_t *task; nxt_thread_t *thr; nxt_thread_pool_t *tp; nxt_work_handler_t handler; tp = ctx; thr = nxt_thread(); if (thr->link != NULL) { /* Only the first thread has a link. */ tp->main = thr->handle; nxt_free(thr->link); thr->link = NULL; tp->task.thread = thr; } thr->thread_pool = tp; if (tp->init != NULL) { tp->init(); } nxt_thread_work_queue_create(thr, 8); for ( ;; ) { nxt_thread_pool_wait(tp); handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj, &data); if (nxt_fast_path(handler != NULL)) { task->thread = thr; nxt_log_debug(thr->log, "locked work queue"); handler(task, obj, data); } for ( ;; ) { thr->log = &nxt_main_log; handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data); if (handler == NULL) { break; } handler(task, obj, data); } thr->log = &nxt_main_log; } } static void nxt_thread_pool_wait(nxt_thread_pool_t *tp) { nxt_err_t err; nxt_thread_t *thr; nxt_atomic_uint_t waiting, threads; nxt_thread_link_t *link; nxt_thread_handle_t handle; thr = nxt_thread(); nxt_log_debug(thr->log, "thread pool wait"); (void) nxt_atomic_fetch_add(&tp->waiting, 1); for ( ;; ) { err = nxt_sem_wait(&tp->sem, tp->timeout); if (err == 0) { waiting = nxt_atomic_fetch_add(&tp->waiting, -1); break; } if (err == NXT_ETIMEDOUT) { if (nxt_thread_handle_equal(thr->handle, tp->main)) { continue; } } (void) nxt_atomic_fetch_add(&tp->waiting, -1); (void) nxt_atomic_fetch_add(&tp->threads, -1); nxt_thread_exit(thr); nxt_unreachable(); } nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting); if (waiting > 1) { return; } do { threads = tp->threads; if (threads >= tp->max_threads) { return; } } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1)); link = nxt_zalloc(sizeof(nxt_thread_link_t)); if (nxt_fast_path(link != NULL)) { link->start = nxt_thread_pool_start; link->data = tp; if (nxt_thread_create(&handle, link) != NXT_OK) { (void) nxt_atomic_fetch_add(&tp->threads, -1); } } } void nxt_thread_pool_destroy(nxt_thread_pool_t *tp) { nxt_thread_t *thr; thr = nxt_thread(); if (!tp->ready) { nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit, &tp->task, tp, NULL); return; } if (tp->max_threads != 0) { /* Disable new threads creation and mark a pool as being destroyed. */ tp->max_threads = 0; nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, NULL); } } /* * A thread handle (pthread_t) is either pointer or integer, so it can be * passed as work handler pointer "data" argument. To convert void pointer * to pthread_t and vice versa the source argument should be cast first to * uintptr_t type and then to the destination type. * * If the handle would be a struct it should be stored in thread pool and * the thread pool must be freed in the thread pool exit procedure after * the last thread of pool will exit. */ static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data) { nxt_thread_t *thread; nxt_thread_pool_t *tp; nxt_atomic_uint_t threads; nxt_thread_handle_t handle; tp = obj; thread = task->thread; nxt_debug(task, "thread pool exit"); if (data != NULL) { handle = (nxt_thread_handle_t) (uintptr_t) data; nxt_thread_wait(handle); } threads = nxt_atomic_fetch_add(&tp->threads, -1); nxt_debug(task, "thread pool threads: %A", threads); if (threads > 1) { nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, (void *) (uintptr_t) thread->handle); } else { nxt_debug(task, "thread pool destroy"); nxt_event_engine_post(tp->engine, tp->exit, &tp->task, tp, (void *) (uintptr_t) thread->handle, &nxt_main_log); nxt_sem_destroy(&tp->sem); nxt_locked_work_queue_destroy(&tp->work_queue); nxt_free(tp); } nxt_thread_work_queue_destroy(thread); nxt_thread_exit(thread); nxt_unreachable(); }