summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_work_queue.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-17 20:00:00 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-17 20:00:00 +0300
commit16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch)
treee6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_work_queue.c
downloadunit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz
unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2
Initial version.
Diffstat (limited to 'src/nxt_work_queue.c')
-rw-r--r--src/nxt_work_queue.c610
1 files changed, 610 insertions, 0 deletions
diff --git a/src/nxt_work_queue.c b/src/nxt_work_queue.c
new file mode 100644
index 00000000..914d6125
--- /dev/null
+++ b/src/nxt_work_queue.c
@@ -0,0 +1,610 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+/*
+ * Available work items are crucial for overall engine operation, so
+ * the items are preallocated in two chunks: cache and spare chunks.
+ * By default each chunk preallocates 409 work items on two or four
+ * CPU pages depending on platform. If all items in a cache chunk are
+ * exhausted then a spare chunk becomes a cache chunk, and a new spare
+ * chunk is allocated. This two-step allocation mitigates low memory
+ * condition impact on work queue operation. However, if both chunks
+ * are exhausted then a thread will sleep in reliance on another thread
+ * frees some memory. However, this may lead to deadlock and probably
+ * a process should be aborted. This behaviour should be considered as
+ * abort on program stack exhaustion.
+ *
+ * The cache and spare chunks initially are also allocated in two steps:
+ * a spare chunk is allocated first, then it becomes the cache chunk and
+ * a new spare chunk is allocated again.
+ */
+
+static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
+ nxt_thread_spinlock_t *lock);
+static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock);
+static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr);
+static nxt_work_handler_t nxt_locked_work_queue_pop_work(
+ nxt_locked_work_queue_t *lwq, void **obj, void **data, nxt_log_t **log);
+
+
+/* It should be adjusted with the "work_queue_bucket_items" directive. */
+static nxt_uint_t nxt_work_queue_bucket_items = 409;
+
+
+void
+nxt_thread_work_queue_create(nxt_thread_t *thr, size_t chunk_size)
+{
+ nxt_memzero(&thr->work_queue, sizeof(nxt_thread_work_queue_t));
+
+ nxt_work_queue_name(&thr->work_queue.main, "main");
+ nxt_work_queue_name(&thr->work_queue.last, "last");
+
+ if (chunk_size == 0) {
+ chunk_size = nxt_work_queue_bucket_items;
+ }
+
+ /* nxt_work_queue_chunk_t already has one work item. */
+ thr->work_queue.cache.chunk_size = chunk_size - 1;
+
+ while (thr->work_queue.cache.next == NULL) {
+ nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
+ }
+}
+
+
+void
+nxt_thread_work_queue_destroy(nxt_thread_t *thr)
+{
+ nxt_work_queue_chunk_t *chunk, *next;
+
+ for (chunk = thr->work_queue.cache.chunk; chunk; chunk = next) {
+ next = chunk->next;
+ nxt_free(chunk);
+ }
+}
+
+
+static void
+nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
+ nxt_thread_spinlock_t *lock)
+{
+ size_t size;
+ nxt_uint_t i, n;
+ nxt_work_t *work;
+ nxt_work_queue_chunk_t *chunk;
+
+ n = cache->chunk_size;
+ size = sizeof(nxt_work_queue_chunk_t) + n * sizeof(nxt_work_t);
+
+ chunk = nxt_malloc(size);
+
+ if (nxt_fast_path(chunk != NULL)) {
+
+ chunk->next = cache->chunk;
+ cache->chunk = chunk;
+ work = &chunk->work;
+
+ for (i = 0; i < n; i++) {
+ work[i].next = &work[i + 1];
+ }
+
+ work[i].next = NULL;
+ work++;
+
+ } else if (cache->spare != NULL) {
+
+ work = NULL;
+
+ } else {
+ nxt_work_queue_sleep(lock);
+ return;
+ }
+
+ cache->next = cache->spare;
+ cache->spare = work;
+}
+
+
+static void
+nxt_work_queue_sleep(nxt_thread_spinlock_t *lock)
+{
+ if (lock != NULL) {
+ nxt_thread_spin_unlock(lock);
+ }
+
+ nxt_nanosleep(100 * 1000000); /* 100ms */
+
+ if (lock != NULL) {
+ nxt_thread_spin_lock(lock);
+ }
+}
+
+
+/* Add a work to a work queue tail. */
+
+void
+nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
+ nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
+{
+ nxt_work_t *work;
+
+ nxt_work_queue_attach(thr, wq);
+
+ for ( ;; ) {
+ work = thr->work_queue.cache.next;
+
+ if (nxt_fast_path(work != NULL)) {
+ thr->work_queue.cache.next = work->next;
+ work->next = NULL;
+
+ work->handler = handler;
+ work->obj = obj;
+ work->data = data;
+ work->log = log;
+
+ if (wq->tail != NULL) {
+ wq->tail->next = work;
+
+ } else {
+ wq->head = work;
+ }
+
+ wq->tail = work;
+
+ return;
+ }
+
+ nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
+ }
+}
+
+
+/* Push a work to a work queue head. */
+
+void
+nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq,
+ nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
+{
+ nxt_work_t *work;
+
+ nxt_work_queue_attach(thr, wq);
+
+ for ( ;; ) {
+ work = thr->work_queue.cache.next;
+
+ if (nxt_fast_path(work != NULL)) {
+ thr->work_queue.cache.next = work->next;
+ work->next = wq->head;
+
+ work->handler = handler;
+ work->obj = obj;
+ work->data = data;
+ work->log = log;
+
+ wq->head = work;
+
+ if (wq->tail == NULL) {
+ wq->tail = work;
+ }
+
+ return;
+ }
+
+ nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
+ }
+}
+
+
+/* Attach a work queue to a thread work queue. */
+
+void
+nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq)
+{
+ if (wq->next == NULL && wq != thr->work_queue.tail) {
+
+ if (thr->work_queue.tail != NULL) {
+ thr->work_queue.tail->next = wq;
+
+ } else {
+ thr->work_queue.head = wq;
+ }
+
+ thr->work_queue.tail = wq;
+ }
+}
+
+
+/* Pop a work from a thread work queue head. */
+
+nxt_work_handler_t
+nxt_thread_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
+ nxt_log_t **log)
+{
+ nxt_work_t *work;
+ nxt_work_queue_t *wq;
+
+ wq = nxt_thread_current_work_queue(thr);
+
+ if (wq != NULL) {
+
+ work = wq->head;
+
+ if (work != NULL) {
+ wq->head = work->next;
+
+ if (work->next == NULL) {
+ wq->tail = NULL;
+ }
+
+ *obj = work->obj;
+ nxt_prefetch(*obj);
+ *data = work->data;
+ nxt_prefetch(*data);
+
+ work->next = thr->work_queue.cache.next;
+ thr->work_queue.cache.next = work;
+
+ *log = work->log;
+
+#if (NXT_DEBUG)
+
+ if (work->handler == NULL) {
+ nxt_log_alert(thr->log, "null work handler");
+ nxt_abort();
+ }
+
+#endif
+
+ return work->handler;
+ }
+ }
+
+ return NULL;
+}
+
+
+static nxt_work_queue_t *
+nxt_thread_current_work_queue(nxt_thread_t *thr)
+{
+ nxt_work_queue_t *wq, *next;
+
+ for (wq = thr->work_queue.head; wq != NULL; wq = next) {
+
+ if (wq->head != NULL) {
+ nxt_log_debug(thr->log, "work queue: %s", wq->name);
+ return wq;
+ }
+
+ /* Detach empty work queue. */
+ next = wq->next;
+ wq->next = NULL;
+ thr->work_queue.head = next;
+ }
+
+ thr->work_queue.tail = NULL;
+
+ return NULL;
+}
+
+
+/* Drop a work with specified data from a thread work queue. */
+
+void
+nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data)
+{
+ nxt_work_t *work, *prev, *next, **link;
+ nxt_work_queue_t *wq;
+
+ for (wq = thr->work_queue.head; wq != NULL; wq = wq->next) {
+
+ prev = NULL;
+ link = &wq->head;
+
+ for (work = wq->head; work != NULL; work = next) {
+
+ next = work->next;
+
+ if (data != work->obj) {
+ prev = work;
+ link = &work->next;
+
+ } else {
+ if (next == NULL) {
+ wq->tail = prev;
+ }
+
+ nxt_log_debug(thr->log, "work queue drop");
+
+ *link = next;
+
+ work->next = thr->work_queue.cache.next;
+ thr->work_queue.cache.next = work;
+ }
+ }
+ }
+}
+
+
+/* Add a work to the thread last work queue's tail. */
+
+void
+nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler,
+ void *obj, void *data, nxt_log_t *log)
+{
+ nxt_work_t *work;
+
+ for ( ;; ) {
+ work = thr->work_queue.cache.next;
+
+ if (nxt_fast_path(work != NULL)) {
+ thr->work_queue.cache.next = work->next;
+ work->next = NULL;
+
+ work->handler = handler;
+ work->obj = obj;
+ work->data = data;
+ work->log = log;
+
+ if (thr->work_queue.last.tail != NULL) {
+ thr->work_queue.last.tail->next = work;
+
+ } else {
+ thr->work_queue.last.head = work;
+ }
+
+ thr->work_queue.last.tail = work;
+
+ return;
+ }
+
+ nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
+ }
+}
+
+
+/* Pop a work from the thread last work queue's head. */
+
+nxt_work_handler_t
+nxt_thread_last_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
+ nxt_log_t **log)
+{
+ nxt_work_t *work;
+
+ work = thr->work_queue.last.head;
+
+ if (work != NULL) {
+ nxt_log_debug(thr->log, "work queue: %s", thr->work_queue.last.name);
+
+ thr->work_queue.last.head = work->next;
+
+ if (work->next == NULL) {
+ thr->work_queue.last.tail = NULL;
+ }
+
+ *obj = work->obj;
+ nxt_prefetch(*obj);
+ *data = work->data;
+ nxt_prefetch(*data);
+
+ work->next = thr->work_queue.cache.next;
+ thr->work_queue.cache.next = work;
+
+ *log = work->log;
+
+#if (NXT_DEBUG)
+
+ if (work->handler == NULL) {
+ nxt_log_alert(thr->log, "null work handler");
+ nxt_abort();
+ }
+
+#endif
+
+ return work->handler;
+ }
+
+ return NULL;
+}
+
+
+void
+nxt_work_queue_destroy(nxt_work_queue_t *wq)
+{
+ nxt_thread_t *thr;
+ nxt_work_queue_t *q;
+
+ thr = nxt_thread();
+
+ /* Detach from a thread work queue. */
+
+ if (thr->work_queue.head == wq) {
+ thr->work_queue.head = wq->next;
+ q = NULL;
+ goto found;
+ }
+
+ for (q = thr->work_queue.head; q != NULL; q = q->next) {
+ if (q->next == wq) {
+ q->next = wq->next;
+ goto found;
+ }
+ }
+
+ return;
+
+found:
+
+ if (thr->work_queue.tail == wq) {
+ thr->work_queue.tail = q;
+ }
+
+ /* Move all queue's works to a thread work queue cache. */
+
+ if (wq->tail != NULL) {
+ wq->tail->next = thr->work_queue.cache.next;
+ }
+
+ if (wq->head != NULL) {
+ thr->work_queue.cache.next = wq->head;
+ }
+}
+
+
+/* Locked work queue operations. */
+
+void
+nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq, size_t chunk_size)
+{
+ nxt_memzero(lwq, sizeof(nxt_locked_work_queue_t));
+
+ if (chunk_size == 0) {
+ chunk_size = nxt_work_queue_bucket_items;
+ }
+
+ lwq->cache.chunk_size = chunk_size;
+
+ while (lwq->cache.next == NULL) {
+ nxt_work_queue_allocate(&lwq->cache, NULL);
+ }
+}
+
+
+void
+nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq)
+{
+ nxt_work_queue_chunk_t *chunk, *next;
+
+ for (chunk = lwq->cache.chunk; chunk; chunk = next) {
+ next = chunk->next;
+ nxt_free(chunk);
+ }
+}
+
+
+/* Add a work to a locked work queue tail. */
+
+void
+nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
+ nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
+{
+ nxt_work_t *work;
+
+ nxt_thread_spin_lock(&lwq->lock);
+
+ for ( ;; ) {
+ work = lwq->cache.next;
+
+ if (nxt_fast_path(work != NULL)) {
+ lwq->cache.next = work->next;
+
+ work->next = NULL;
+ work->handler = handler;
+ work->obj = obj;
+ work->data = data;
+ work->log = log;
+
+ if (lwq->tail != NULL) {
+ lwq->tail->next = work;
+
+ } else {
+ lwq->head = work;
+ }
+
+ lwq->tail = work;
+
+ break;
+ }
+
+ nxt_work_queue_allocate(&lwq->cache, &lwq->lock);
+ }
+
+ nxt_thread_spin_unlock(&lwq->lock);
+}
+
+
+/* Pop a work from a locked work queue head. */
+
+nxt_work_handler_t
+nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, void **obj,
+ void **data, nxt_log_t **log)
+{
+ nxt_work_handler_t handler;
+
+ nxt_thread_spin_lock(&lwq->lock);
+
+ handler = nxt_locked_work_queue_pop_work(lwq, obj, data, log);
+
+ nxt_thread_spin_unlock(&lwq->lock);
+
+ return handler;
+}
+
+
+static nxt_work_handler_t
+nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, void **obj,
+ void **data, nxt_log_t **log)
+{
+ nxt_work_t *work;
+
+ work = lwq->head;
+
+ if (work == NULL) {
+ return NULL;
+ }
+
+ *obj = work->obj;
+ nxt_prefetch(*obj);
+ *data = work->data;
+ nxt_prefetch(*data);
+
+ lwq->head = work->next;
+
+ if (work->next == NULL) {
+ lwq->tail = NULL;
+ }
+
+ work->next = lwq->cache.next;
+ lwq->cache.next = work;
+
+ *log = work->log;
+
+ return work->handler;
+}
+
+
+/* Move all works from a locked work queue to a usual work queue. */
+
+void
+nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
+ nxt_work_queue_t *wq)
+{
+ void *obj, *data;
+ nxt_log_t *log;
+ nxt_work_handler_t handler;
+
+ /* Locked work queue head can be tested without a lock. */
+
+ if (nxt_fast_path(lwq->head == NULL)) {
+ return;
+ }
+
+ nxt_thread_spin_lock(&lwq->lock);
+
+ for ( ;; ) {
+ handler = nxt_locked_work_queue_pop_work(lwq, &obj, &data, &log);
+
+ if (handler == NULL) {
+ break;
+ }
+
+ nxt_thread_work_queue_add(thr, wq, handler, obj, data, log);
+ }
+
+ nxt_thread_spin_unlock(&lwq->lock);
+}