summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_work_queue.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-27 11:35:11 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-27 11:35:11 +0300
commitba0391577b06446307fa073f856f57748557e0dd (patch)
treeb2b871a041edee242662c95197bed292531c3a9a /src/nxt_work_queue.c
parent6886b83c1f3bfdc514d58ad6e9ab40873cafcb54 (diff)
downloadunit-ba0391577b06446307fa073f856f57748557e0dd.tar.gz
unit-ba0391577b06446307fa073f856f57748557e0dd.tar.bz2
Work queues refactoring.
Diffstat (limited to 'src/nxt_work_queue.c')
-rw-r--r--src/nxt_work_queue.c477
1 files changed, 62 insertions, 415 deletions
diff --git a/src/nxt_work_queue.c b/src/nxt_work_queue.c
index ffb9317f..ecdc65ff 100644
--- a/src/nxt_work_queue.c
+++ b/src/nxt_work_queue.c
@@ -25,12 +25,7 @@
* a new spare chunk is allocated again.
*/
-static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
- nxt_thread_spinlock_t *lock);
-static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock);
-static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr);
-static nxt_work_handler_t nxt_locked_work_queue_pop_work(
- nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data);
+static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache);
/* It should be adjusted with the "work_queue_bucket_items" directive. */
@@ -38,32 +33,29 @@ static nxt_uint_t nxt_work_queue_bucket_items = 409;
void
-nxt_thread_work_queue_create(nxt_thread_t *thr, size_t chunk_size)
+nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size)
{
- nxt_memzero(&thr->work_queue, sizeof(nxt_thread_work_queue_t));
-
- nxt_work_queue_name(&thr->work_queue.main, "main");
- nxt_work_queue_name(&thr->work_queue.last, "last");
+ nxt_memzero(cache, sizeof(nxt_work_queue_cache_t));
if (chunk_size == 0) {
chunk_size = nxt_work_queue_bucket_items;
}
/* nxt_work_queue_chunk_t already has one work item. */
- thr->work_queue.cache.chunk_size = chunk_size - 1;
+ cache->chunk_size = chunk_size - 1;
- while (thr->work_queue.cache.next == NULL) {
- nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
+ while (cache->next == NULL) {
+ nxt_work_queue_allocate(cache);
}
}
void
-nxt_thread_work_queue_destroy(nxt_thread_t *thr)
+nxt_work_queue_cache_destroy(nxt_work_queue_cache_t *cache)
{
nxt_work_queue_chunk_t *chunk, *next;
- for (chunk = thr->work_queue.cache.chunk; chunk; chunk = next) {
+ for (chunk = cache->chunk; chunk; chunk = next) {
next = chunk->next;
nxt_free(chunk);
}
@@ -71,8 +63,7 @@ nxt_thread_work_queue_destroy(nxt_thread_t *thr)
static void
-nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
- nxt_thread_spinlock_t *lock)
+nxt_work_queue_allocate(nxt_work_queue_cache_t *cache)
{
size_t size;
nxt_uint_t i, n;
@@ -102,7 +93,6 @@ nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
work = NULL;
} else {
- nxt_work_queue_sleep(lock);
return;
}
@@ -111,36 +101,19 @@ nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
}
-static void
-nxt_work_queue_sleep(nxt_thread_spinlock_t *lock)
-{
- if (lock != NULL) {
- nxt_thread_spin_unlock(lock);
- }
-
- nxt_nanosleep(100 * 1000000); /* 100ms */
-
- if (lock != NULL) {
- nxt_thread_spin_lock(lock);
- }
-}
-
-
/* Add a work to a work queue tail. */
void
-nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
- nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
+nxt_work_queue_add(nxt_work_queue_t *wq, nxt_work_handler_t handler,
+ nxt_task_t *task, void *obj, void *data)
{
nxt_work_t *work;
- nxt_work_queue_attach(thr, wq);
-
for ( ;; ) {
- work = thr->work_queue.cache.next;
+ work = wq->cache->next;
if (nxt_fast_path(work != NULL)) {
- thr->work_queue.cache.next = work->next;
+ wq->cache->next = work->next;
work->next = NULL;
work->handler = handler;
@@ -160,366 +133,56 @@ nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
return;
}
- nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
+ nxt_work_queue_allocate(wq->cache);
}
}
-/* Push a work to a work queue head. */
-
-void
-nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq,
- nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
-{
- nxt_work_t *work;
-
- nxt_work_queue_attach(thr, wq);
-
- for ( ;; ) {
- work = thr->work_queue.cache.next;
-
- if (nxt_fast_path(work != NULL)) {
- thr->work_queue.cache.next = work->next;
- work->next = wq->head;
-
- work->handler = handler;
- work->obj = obj;
- work->data = data;
-
- wq->head = work;
-
- if (wq->tail == NULL) {
- wq->tail = work;
- }
-
- return;
- }
-
- nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
- }
-}
-
-
-/* Attach a work queue to a thread work queue. */
-
-void
-nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq)
-{
- if (wq->next == NULL && wq != thr->work_queue.tail) {
-
- if (thr->work_queue.tail != NULL) {
- thr->work_queue.tail->next = wq;
-
- } else {
- thr->work_queue.head = wq;
- }
-
- thr->work_queue.tail = wq;
- }
-}
-
-
-/* Pop a work from a thread work queue head. */
-
nxt_work_handler_t
-nxt_thread_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj,
+nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj,
void **data)
{
- nxt_work_t *work;
- nxt_work_queue_t *wq;
-
- wq = nxt_thread_current_work_queue(thr);
-
- if (wq != NULL) {
-
- work = wq->head;
-
- if (work != NULL) {
- wq->head = work->next;
-
- if (work->next == NULL) {
- wq->tail = NULL;
- }
-
- *task = work->task;
- *obj = work->obj;
- nxt_prefetch(*obj);
- *data = work->data;
- nxt_prefetch(*data);
-
- work->next = thr->work_queue.cache.next;
- thr->work_queue.cache.next = work;
-
-#if (NXT_DEBUG)
-
- if (work->handler == NULL) {
- nxt_log_alert(thr->log, "null work handler");
- nxt_abort();
- }
-
-#endif
-
- return work->handler;
- }
- }
-
- return NULL;
-}
-
-
-static nxt_work_queue_t *
-nxt_thread_current_work_queue(nxt_thread_t *thr)
-{
- nxt_work_queue_t *wq, *next;
-
- for (wq = thr->work_queue.head; wq != NULL; wq = next) {
-
- if (wq->head != NULL) {
- nxt_log_debug(thr->log, "work queue: %s", wq->name);
- return wq;
- }
-
- /* Detach empty work queue. */
- next = wq->next;
- wq->next = NULL;
- thr->work_queue.head = next;
- }
-
- thr->work_queue.tail = NULL;
-
- return NULL;
-}
-
-
-/* Drop a work with specified data from a thread work queue. */
-
-void
-nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data)
-{
- nxt_work_t *work, *prev, *next, **link;
- nxt_work_queue_t *wq;
-
- for (wq = thr->work_queue.head; wq != NULL; wq = wq->next) {
-
- prev = NULL;
- link = &wq->head;
-
- for (work = wq->head; work != NULL; work = next) {
-
- next = work->next;
-
- if (data != work->obj) {
- prev = work;
- link = &work->next;
-
- } else {
- if (next == NULL) {
- wq->tail = prev;
- }
-
- nxt_log_debug(thr->log, "work queue drop");
-
- *link = next;
-
- work->next = thr->work_queue.cache.next;
- thr->work_queue.cache.next = work;
- }
- }
- }
-}
-
-
-/* Add a work to the thread last work queue's tail. */
-
-void
-nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler,
- void *obj, void *data)
-{
nxt_work_t *work;
- for ( ;; ) {
- work = thr->work_queue.cache.next;
-
- if (nxt_fast_path(work != NULL)) {
- thr->work_queue.cache.next = work->next;
- work->next = NULL;
-
- work->handler = handler;
- work->obj = obj;
- work->data = data;
-
- if (thr->work_queue.last.tail != NULL) {
- thr->work_queue.last.tail->next = work;
-
- } else {
- thr->work_queue.last.head = work;
- }
+ work = wq->head;
- thr->work_queue.last.tail = work;
+ wq->head = work->next;
- return;
- }
-
- nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
- }
-}
-
-
-/* Pop a work from the thread last work queue's head. */
-
-nxt_work_handler_t
-nxt_thread_last_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj,
- void **data)
-{
- nxt_work_t *work;
-
- work = thr->work_queue.last.head;
-
- if (work != NULL) {
- nxt_log_debug(thr->log, "work queue: %s", thr->work_queue.last.name);
-
- thr->work_queue.last.head = work->next;
-
- if (work->next == NULL) {
- thr->work_queue.last.tail = NULL;
- }
-
- *task = work->task;
- *obj = work->obj;
- nxt_prefetch(*obj);
- *data = work->data;
- nxt_prefetch(*data);
-
- work->next = thr->work_queue.cache.next;
- thr->work_queue.cache.next = work;
-
-#if (NXT_DEBUG)
-
- if (work->handler == NULL) {
- nxt_log_alert(thr->log, "null work handler");
- nxt_abort();
- }
-
-#endif
-
- return work->handler;
- }
-
- return NULL;
-}
-
-
-void
-nxt_work_queue_destroy(nxt_work_queue_t *wq)
-{
- nxt_thread_t *thr;
- nxt_work_queue_t *q;
-
- thr = nxt_thread();
-
- /* Detach from a thread work queue. */
-
- if (thr->work_queue.head == wq) {
- thr->work_queue.head = wq->next;
- q = NULL;
- goto found;
- }
-
- for (q = thr->work_queue.head; q != NULL; q = q->next) {
- if (q->next == wq) {
- q->next = wq->next;
- goto found;
- }
- }
-
- return;
-
-found:
-
- if (thr->work_queue.tail == wq) {
- thr->work_queue.tail = q;
- }
-
- /* Move all queue's works to a thread work queue cache. */
-
- if (wq->tail != NULL) {
- wq->tail->next = thr->work_queue.cache.next;
- }
-
- if (wq->head != NULL) {
- thr->work_queue.cache.next = wq->head;
- }
-}
-
-
-/* Locked work queue operations. */
-
-void
-nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq, size_t chunk_size)
-{
- nxt_memzero(lwq, sizeof(nxt_locked_work_queue_t));
-
- if (chunk_size == 0) {
- chunk_size = nxt_work_queue_bucket_items;
+ if (work->next == NULL) {
+ wq->tail = NULL;
}
- lwq->cache.chunk_size = chunk_size;
+ *task = work->task;
- while (lwq->cache.next == NULL) {
- nxt_work_queue_allocate(&lwq->cache, NULL);
- }
-}
+ *obj = work->obj;
+ nxt_prefetch(*obj);
+ *data = work->data;
+ nxt_prefetch(*data);
-void
-nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq)
-{
- nxt_work_queue_chunk_t *chunk, *next;
+ work->next = wq->cache->next;
+ wq->cache->next = work;
- for (chunk = lwq->cache.chunk; chunk; chunk = next) {
- next = chunk->next;
- nxt_free(chunk);
- }
+ return work->handler;
}
/* Add a work to a locked work queue tail. */
void
-nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
- nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
+nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, nxt_work_t *work)
{
- nxt_work_t *work;
-
nxt_thread_spin_lock(&lwq->lock);
- for ( ;; ) {
- work = lwq->cache.next;
-
- if (nxt_fast_path(work != NULL)) {
- lwq->cache.next = work->next;
-
- work->next = NULL;
- work->handler = handler;
- work->task = task;
- work->obj = obj;
- work->data = data;
-
- if (lwq->tail != NULL) {
- lwq->tail->next = work;
-
- } else {
- lwq->head = work;
- }
-
- lwq->tail = work;
+ if (lwq->tail != NULL) {
+ lwq->tail->next = work;
- break;
- }
-
- nxt_work_queue_allocate(&lwq->cache, &lwq->lock);
+ } else {
+ lwq->head = work;
}
+ lwq->tail = work;
+
nxt_thread_spin_unlock(&lwq->lock);
}
@@ -530,46 +193,36 @@ nxt_work_handler_t
nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
void **obj, void **data)
{
+ nxt_work_t *work;
nxt_work_handler_t handler;
- nxt_thread_spin_lock(&lwq->lock);
-
- handler = nxt_locked_work_queue_pop_work(lwq, task, obj, data);
-
- nxt_thread_spin_unlock(&lwq->lock);
+ handler = NULL;
- return handler;
-}
+ nxt_thread_spin_lock(&lwq->lock);
+ work = lwq->head;
-static nxt_work_handler_t
-nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
- void **obj, void **data)
-{
- nxt_work_t *work;
+ if (work != NULL) {
+ *task = work->task;
- work = lwq->head;
+ *obj = work->obj;
+ nxt_prefetch(*obj);
- if (work == NULL) {
- return NULL;
- }
+ *data = work->data;
+ nxt_prefetch(*data);
- *task = work->task;
- *obj = work->obj;
- nxt_prefetch(*obj);
- *data = work->data;
- nxt_prefetch(*data);
+ lwq->head = work->next;
- lwq->head = work->next;
+ if (work->next == NULL) {
+ lwq->tail = NULL;
+ }
- if (work->next == NULL) {
- lwq->tail = NULL;
+ handler = work->handler;
}
- work->next = lwq->cache.next;
- lwq->cache.next = work;
+ nxt_thread_spin_unlock(&lwq->lock);
- return work->handler;
+ return handler;
}
@@ -579,29 +232,23 @@ void
nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
nxt_work_queue_t *wq)
{
- void *obj, *data;
- nxt_task_t *task;
- nxt_work_handler_t handler;
+ nxt_work_t *work;
- /* Locked work queue head can be tested without a lock. */
+ nxt_thread_spin_lock(&lwq->lock);
- if (nxt_fast_path(lwq->head == NULL)) {
- return;
- }
+ work = lwq->head;
- nxt_thread_spin_lock(&lwq->lock);
+ lwq->head = NULL;
+ lwq->tail = NULL;
- for ( ;; ) {
- handler = nxt_locked_work_queue_pop_work(lwq, &task, &obj, &data);
+ nxt_thread_spin_unlock(&lwq->lock);
- if (handler == NULL) {
- break;
- }
+ while (work != NULL) {
+ work->task->thread = thr;
- task->thread = thr;
+ nxt_work_queue_add(wq, work->handler, work->task,
+ work->obj, work->data);
- nxt_thread_work_queue_add(thr, wq, handler, task, obj, data);
+ work = work->next;
}
-
- nxt_thread_spin_unlock(&lwq->lock);
}