diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
commit | de532922d9ab42aa15b40d47c8db53ac2af38500 (patch) | |
tree | d6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_work_queue.c | |
parent | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff) | |
download | unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2 |
Introducing tasks.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_work_queue.c | 51 |
1 files changed, 24 insertions, 27 deletions
diff --git a/src/nxt_work_queue.c b/src/nxt_work_queue.c index 914d6125..ffb9317f 100644 --- a/src/nxt_work_queue.c +++ b/src/nxt_work_queue.c @@ -30,7 +30,7 @@ static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache, static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock); static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr); static nxt_work_handler_t nxt_locked_work_queue_pop_work( - nxt_locked_work_queue_t *lwq, void **obj, void **data, nxt_log_t **log); + nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data); /* It should be adjusted with the "work_queue_bucket_items" directive. */ @@ -130,7 +130,7 @@ nxt_work_queue_sleep(nxt_thread_spinlock_t *lock) void nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq, - nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log) + nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data) { nxt_work_t *work; @@ -144,9 +144,9 @@ nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq, work->next = NULL; work->handler = handler; + work->task = task; work->obj = obj; work->data = data; - work->log = log; if (wq->tail != NULL) { wq->tail->next = work; @@ -169,7 +169,7 @@ nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq, void nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq, - nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log) + nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data) { nxt_work_t *work; @@ -185,7 +185,6 @@ nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq, work->handler = handler; work->obj = obj; work->data = data; - work->log = log; wq->head = work; @@ -223,8 +222,8 @@ nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq) /* Pop a work from a thread work queue head. */ nxt_work_handler_t -nxt_thread_work_queue_pop(nxt_thread_t *thr, void **obj, void **data, - nxt_log_t **log) +nxt_thread_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj, + void **data) { nxt_work_t *work; nxt_work_queue_t *wq; @@ -242,6 +241,7 @@ nxt_thread_work_queue_pop(nxt_thread_t *thr, void **obj, void **data, wq->tail = NULL; } + *task = work->task; *obj = work->obj; nxt_prefetch(*obj); *data = work->data; @@ -250,8 +250,6 @@ nxt_thread_work_queue_pop(nxt_thread_t *thr, void **obj, void **data, work->next = thr->work_queue.cache.next; thr->work_queue.cache.next = work; - *log = work->log; - #if (NXT_DEBUG) if (work->handler == NULL) { @@ -335,7 +333,7 @@ nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data) void nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler, - void *obj, void *data, nxt_log_t *log) + void *obj, void *data) { nxt_work_t *work; @@ -349,7 +347,6 @@ nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler, work->handler = handler; work->obj = obj; work->data = data; - work->log = log; if (thr->work_queue.last.tail != NULL) { thr->work_queue.last.tail->next = work; @@ -371,8 +368,8 @@ nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler, /* Pop a work from the thread last work queue's head. */ nxt_work_handler_t -nxt_thread_last_work_queue_pop(nxt_thread_t *thr, void **obj, void **data, - nxt_log_t **log) +nxt_thread_last_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj, + void **data) { nxt_work_t *work; @@ -387,6 +384,7 @@ nxt_thread_last_work_queue_pop(nxt_thread_t *thr, void **obj, void **data, thr->work_queue.last.tail = NULL; } + *task = work->task; *obj = work->obj; nxt_prefetch(*obj); *data = work->data; @@ -395,8 +393,6 @@ nxt_thread_last_work_queue_pop(nxt_thread_t *thr, void **obj, void **data, work->next = thr->work_queue.cache.next; thr->work_queue.cache.next = work; - *log = work->log; - #if (NXT_DEBUG) if (work->handler == NULL) { @@ -491,7 +487,7 @@ nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq) void nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, - nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log) + nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data) { nxt_work_t *work; @@ -505,9 +501,9 @@ nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, work->next = NULL; work->handler = handler; + work->task = task; work->obj = obj; work->data = data; - work->log = log; if (lwq->tail != NULL) { lwq->tail->next = work; @@ -531,14 +527,14 @@ nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, /* Pop a work from a locked work queue head. */ nxt_work_handler_t -nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, void **obj, - void **data, nxt_log_t **log) +nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task, + void **obj, void **data) { nxt_work_handler_t handler; nxt_thread_spin_lock(&lwq->lock); - handler = nxt_locked_work_queue_pop_work(lwq, obj, data, log); + handler = nxt_locked_work_queue_pop_work(lwq, task, obj, data); nxt_thread_spin_unlock(&lwq->lock); @@ -547,8 +543,8 @@ nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, void **obj, static nxt_work_handler_t -nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, void **obj, - void **data, nxt_log_t **log) +nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, nxt_task_t **task, + void **obj, void **data) { nxt_work_t *work; @@ -558,6 +554,7 @@ nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, void **obj, return NULL; } + *task = work->task; *obj = work->obj; nxt_prefetch(*obj); *data = work->data; @@ -572,8 +569,6 @@ nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, void **obj, work->next = lwq->cache.next; lwq->cache.next = work; - *log = work->log; - return work->handler; } @@ -585,7 +580,7 @@ nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq, nxt_work_queue_t *wq) { void *obj, *data; - nxt_log_t *log; + nxt_task_t *task; nxt_work_handler_t handler; /* Locked work queue head can be tested without a lock. */ @@ -597,13 +592,15 @@ nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq, nxt_thread_spin_lock(&lwq->lock); for ( ;; ) { - handler = nxt_locked_work_queue_pop_work(lwq, &obj, &data, &log); + handler = nxt_locked_work_queue_pop_work(lwq, &task, &obj, &data); if (handler == NULL) { break; } - nxt_thread_work_queue_add(thr, wq, handler, obj, data, log); + task->thread = thr; + + nxt_thread_work_queue_add(thr, wq, handler, task, obj, data); } nxt_thread_spin_unlock(&lwq->lock); |