summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_thread_pool.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_thread_pool.c308
1 files changed, 308 insertions, 0 deletions
diff --git a/src/nxt_thread_pool.c b/src/nxt_thread_pool.c
new file mode 100644
index 00000000..a9708ed2
--- /dev/null
+++ b/src/nxt_thread_pool.c
@@ -0,0 +1,308 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
+static void nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data);
+static void nxt_thread_pool_start(void *ctx);
+static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);
+
+
+nxt_thread_pool_t *
+nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
+ nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
+ nxt_work_handler_t exit)
+{
+ nxt_thread_pool_t *tp;
+
+ tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
+ if (tp == NULL) {
+ return NULL;
+ }
+
+ tp->max_threads = max_threads;
+ tp->timeout = timeout;
+ tp->engine = engine;
+ tp->init = init;
+ tp->exit = exit;
+
+ return tp;
+}
+
+
+nxt_int_t
+nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
+ void *obj, void *data, nxt_log_t *log)
+{
+ nxt_thread_log_debug("thread pool post");
+
+ if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
+ nxt_locked_work_queue_add(&tp->work_queue, handler, obj, data, log);
+
+ (void) nxt_sem_post(&tp->sem);
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_thread_pool_init(nxt_thread_pool_t *tp)
+{
+ nxt_int_t ret;
+ nxt_thread_link_t *link;
+ nxt_thread_handle_t handle;
+
+ if (nxt_fast_path(tp->ready)) {
+ return NXT_OK;
+ }
+
+ nxt_thread_spin_lock(&tp->work_queue.lock);
+
+ ret = NXT_OK;
+
+ if (!tp->ready) {
+
+ nxt_thread_log_debug("thread pool init");
+
+ (void) nxt_atomic_fetch_add(&tp->threads, 1);
+
+ if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {
+
+ nxt_locked_work_queue_create(&tp->work_queue, 0);
+
+ link = nxt_malloc(sizeof(nxt_thread_link_t));
+
+ if (nxt_fast_path(link != NULL)) {
+ link->start = nxt_thread_pool_start;
+ link->data = tp;
+ link->engine = tp->engine;
+ /*
+ * link->exit is not used. link->engine is used just to
+ * set thr->link by nxt_thread_trampoline() and the link
+ * is a mark of the first thread of pool.
+ */
+ if (nxt_thread_create(&handle, link) == NXT_OK) {
+ tp->ready = 1;
+ goto done;
+ }
+ }
+
+ nxt_sem_destroy(&tp->sem);
+ }
+
+ (void) nxt_atomic_fetch_add(&tp->threads, -1);
+
+ nxt_locked_work_queue_destroy(&tp->work_queue);
+
+ ret = NXT_ERROR;
+ }
+
+done:
+
+ nxt_thread_spin_unlock(&tp->work_queue.lock);
+
+ return ret;
+}
+
+
+static void
+nxt_thread_pool_start(void *ctx)
+{
+ void *obj, *data;
+ nxt_thread_t *thr;
+ nxt_thread_pool_t *tp;
+ nxt_work_handler_t handler;
+
+ tp = ctx;
+ thr = nxt_thread();
+
+ if (thr->link != NULL) {
+ /* Only the first thread has a link. */
+ tp->main = thr->handle;
+ nxt_free(thr->link);
+ thr->link = NULL;
+ }
+
+ thr->thread_pool = tp;
+
+ if (tp->init != NULL) {
+ tp->init();
+ }
+
+ nxt_thread_work_queue_create(thr, 8);
+
+ for ( ;; ) {
+ nxt_thread_pool_wait(tp);
+
+ handler = nxt_locked_work_queue_pop(&tp->work_queue, &obj,
+ &data, &thr->log);
+
+ if (nxt_fast_path(handler != NULL)) {
+ nxt_log_debug(thr->log, "locked work queue");
+ handler(thr, obj, data);
+ }
+
+ for ( ;; ) {
+ thr->log = &nxt_main_log;
+
+ handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log);
+
+ if (handler == NULL) {
+ break;
+ }
+
+ handler(thr, obj, data);
+ }
+
+ thr->log = &nxt_main_log;
+ }
+}
+
+
+static void
+nxt_thread_pool_wait(nxt_thread_pool_t *tp)
+{
+ nxt_err_t err;
+ nxt_thread_t *thr;
+ nxt_atomic_uint_t waiting, threads;
+ nxt_thread_link_t *link;
+ nxt_thread_handle_t handle;
+
+ thr = nxt_thread();
+
+ nxt_log_debug(thr->log, "thread pool wait");
+
+ (void) nxt_atomic_fetch_add(&tp->waiting, 1);
+
+ for ( ;; ) {
+ err = nxt_sem_wait(&tp->sem, tp->timeout);
+
+ if (err == 0) {
+ waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
+ break;
+ }
+
+ if (err == NXT_ETIMEDOUT) {
+ if (nxt_thread_handle_equal(thr->handle, tp->main)) {
+ continue;
+ }
+ }
+
+ (void) nxt_atomic_fetch_add(&tp->waiting, -1);
+ (void) nxt_atomic_fetch_add(&tp->threads, -1);
+
+ nxt_thread_exit(thr);
+ nxt_unreachable();
+ }
+
+ nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);
+
+ if (waiting > 1) {
+ return;
+ }
+
+ do {
+ threads = tp->threads;
+
+ if (threads >= tp->max_threads) {
+ return;
+ }
+
+ } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));
+
+ link = nxt_zalloc(sizeof(nxt_thread_link_t));
+
+ if (nxt_fast_path(link != NULL)) {
+ link->start = nxt_thread_pool_start;
+ link->data = tp;
+
+ if (nxt_thread_create(&handle, link) != NXT_OK) {
+ (void) nxt_atomic_fetch_add(&tp->threads, -1);
+ }
+ }
+}
+
+
+void
+nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
+{
+ nxt_thread_t *thr;
+
+ if (!tp->ready) {
+ thr = nxt_thread();
+
+ nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit,
+ tp, NULL, &nxt_main_log);
+ return;
+ }
+
+ if (tp->max_threads != 0) {
+ /* Disable new threads creation and mark a pool as being destroyed. */
+ tp->max_threads = 0;
+
+ nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, NULL, &nxt_main_log);
+ }
+}
+
+
+/*
+ * A thread handle (pthread_t) is either pointer or integer, so it can be
+ * passed as work handler pointer "data" argument. To convert void pointer
+ * to pthread_t and vice versa the source argument should be cast first to
+ * uintptr_t type and then to the destination type.
+ *
+ * If the handle would be a struct it should be stored in thread pool and
+ * the thread pool must be freed in the thread pool exit procedure after
+ * the last thread of pool will exit.
+ */
+
+static void
+nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_thread_pool_t *tp;
+ nxt_atomic_uint_t threads;
+ nxt_thread_handle_t handle;
+
+ tp = obj;
+
+ nxt_log_debug(thr->log, "thread pool exit");
+
+ if (data != NULL) {
+ handle = (nxt_thread_handle_t) (uintptr_t) data;
+ nxt_thread_wait(handle);
+ }
+
+ threads = nxt_atomic_fetch_add(&tp->threads, -1);
+
+ nxt_log_debug(thr->log, "thread pool threads: %A", threads);
+
+ if (threads > 1) {
+ nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp,
+ (void *) (uintptr_t) thr->handle, &nxt_main_log);
+
+ } else {
+ nxt_main_log_debug("thread pool destroy");
+
+ nxt_event_engine_post(tp->engine, tp->exit, tp,
+ (void *) (uintptr_t) thr->handle, &nxt_main_log);
+
+ nxt_sem_destroy(&tp->sem);
+
+ nxt_locked_work_queue_destroy(&tp->work_queue);
+
+ nxt_free(tp);
+ }
+
+ nxt_thread_work_queue_destroy(thr);
+
+ nxt_thread_exit(thr);
+ nxt_unreachable();
+}