summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_work_queue.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_work_queue.c51
1 files changed, 24 insertions, 27 deletions
diff --git a/src/nxt_work_queue.c b/src/nxt_work_queue.c
index 914d6125..ffb9317f 100644
--- a/src/nxt_work_queue.c
+++ b/src/nxt_work_queue.c
@@ -30,7 +30,7 @@ static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock);
static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr);
static nxt_work_handler_t nxt_locked_work_queue_pop_work(
- nxt_locked_work_queue_t *lwq, void **obj, void **data, nxt_log_t **log);
+ nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data);
/* It should be adjusted with the "work_queue_bucket_items" directive. */
@@ -130,7 +130,7 @@ nxt_work_queue_sleep(nxt_thread_spinlock_t *lock)
void
nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
- nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
+ nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
{
nxt_work_t *work;
@@ -144,9 +144,9 @@ nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
work->next = NULL;
work->handler = handler;
+ work->task = task;
work->obj = obj;
work->data = data;
- work->log = log;
if (wq->tail != NULL) {
wq->tail->next = work;
@@ -169,7 +169,7 @@ nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
void
nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq,
- nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
+ nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
{
nxt_work_t *work;
@@ -185,7 +185,6 @@ nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq,
work->handler = handler;
work->obj = obj;
work->data = data;
- work->log = log;
wq->head = work;
@@ -223,8 +222,8 @@ nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq)
/* Pop a work from a thread work queue head. */
nxt_work_handler_t
-nxt_thread_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
- nxt_log_t **log)
+nxt_thread_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj,
+ void **data)
{
nxt_work_t *work;
nxt_work_queue_t *wq;
@@ -242,6 +241,7 @@ nxt_thread_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
wq->tail = NULL;
}
+ *task = work->task;
*obj = work->obj;
nxt_prefetch(*obj);
*data = work->data;
@@ -250,8 +250,6 @@ nxt_thread_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
work->next = thr->work_queue.cache.next;
thr->work_queue.cache.next = work;
- *log = work->log;
-
#if (NXT_DEBUG)
if (work->handler == NULL) {
@@ -335,7 +333,7 @@ nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data)
void
nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler,
- void *obj, void *data, nxt_log_t *log)
+ void *obj, void *data)
{
nxt_work_t *work;
@@ -349,7 +347,6 @@ nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler,
work->handler = handler;
work->obj = obj;
work->data = data;
- work->log = log;
if (thr->work_queue.last.tail != NULL) {
thr->work_queue.last.tail->next = work;
@@ -371,8 +368,8 @@ nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler,
/* Pop a work from the thread last work queue's head. */
nxt_work_handler_t
-nxt_thread_last_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
- nxt_log_t **log)
+nxt_thread_last_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj,
+ void **data)
{
nxt_work_t *work;
@@ -387,6 +384,7 @@ nxt_thread_last_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
thr->work_queue.last.tail = NULL;
}
+ *task = work->task;
*obj = work->obj;
nxt_prefetch(*obj);
*data = work->data;
@@ -395,8 +393,6 @@ nxt_thread_last_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
work->next = thr->work_queue.cache.next;
thr->work_queue.cache.next = work;
- *log = work->log;
-
#if (NXT_DEBUG)
if (work->handler == NULL) {
@@ -491,7 +487,7 @@ nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq)
void
nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
- nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
+ nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
{
nxt_work_t *work;
@@ -505,9 +501,9 @@ nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
work->next = NULL;
work->handler = handler;
+ work->task = task;
work->obj = obj;
work->data = data;
- work->log = log;
if (lwq->tail != NULL) {
lwq->tail->next = work;
@@ -531,14 +527,14 @@ nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
/* Pop a work from a locked work queue head. */
nxt_work_handler_t
-nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, void **obj,
- void **data, nxt_log_t **log)
+nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
+ void **obj, void **data)
{
nxt_work_handler_t handler;
nxt_thread_spin_lock(&lwq->lock);
- handler = nxt_locked_work_queue_pop_work(lwq, obj, data, log);
+ handler = nxt_locked_work_queue_pop_work(lwq, task, obj, data);
nxt_thread_spin_unlock(&lwq->lock);
@@ -547,8 +543,8 @@ nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, void **obj,
static nxt_work_handler_t
-nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, void **obj,
- void **data, nxt_log_t **log)
+nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
+ void **obj, void **data)
{
nxt_work_t *work;
@@ -558,6 +554,7 @@ nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, void **obj,
return NULL;
}
+ *task = work->task;
*obj = work->obj;
nxt_prefetch(*obj);
*data = work->data;
@@ -572,8 +569,6 @@ nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, void **obj,
work->next = lwq->cache.next;
lwq->cache.next = work;
- *log = work->log;
-
return work->handler;
}
@@ -585,7 +580,7 @@ nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
nxt_work_queue_t *wq)
{
void *obj, *data;
- nxt_log_t *log;
+ nxt_task_t *task;
nxt_work_handler_t handler;
/* Locked work queue head can be tested without a lock. */
@@ -597,13 +592,15 @@ nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
nxt_thread_spin_lock(&lwq->lock);
for ( ;; ) {
- handler = nxt_locked_work_queue_pop_work(lwq, &obj, &data, &log);
+ handler = nxt_locked_work_queue_pop_work(lwq, &task, &obj, &data);
if (handler == NULL) {
break;
}
- nxt_thread_work_queue_add(thr, wq, handler, obj, data, log);
+ task->thread = thr;
+
+ nxt_thread_work_queue_add(thr, wq, handler, task, obj, data);
}
nxt_thread_spin_unlock(&lwq->lock);