summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_job.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
commitde532922d9ab42aa15b40d47c8db53ac2af38500 (patch)
treed6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_job.c
parent16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff)
downloadunit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz
unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2
Introducing tasks.
Diffstat (limited to 'src/nxt_job.c')
-rw-r--r--src/nxt_job.c49
1 files changed, 29 insertions, 20 deletions
diff --git a/src/nxt_job.c b/src/nxt_job.c
index 2b7d8818..e1256d61 100644
--- a/src/nxt_job.c
+++ b/src/nxt_job.c
@@ -8,8 +8,8 @@
#if (NXT_THREADS)
-static void nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data);
-static void nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj,
+static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data);
+static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj,
void *data);
#endif
@@ -44,6 +44,8 @@ nxt_job_create(nxt_mem_pool_t *mp, size_t size)
/* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
nxt_queue_self(&job->link);
+ job->task.ident = nxt_task_next_ident();
+
return job;
}
@@ -56,6 +58,8 @@ nxt_job_init(nxt_job_t *job, size_t size)
nxt_job_set_name(job, "job");
nxt_queue_self(&job->link);
+
+ job->task.ident = nxt_task_next_ident();
}
@@ -103,18 +107,19 @@ nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job)
*/
void
-nxt_job_start(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler)
+nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
{
- nxt_log_debug(thr->log, "%s start", job->name);
+ nxt_debug(task, "%s start", job->name);
#if (NXT_THREADS)
if (job->thread_pool != NULL) {
nxt_int_t ret;
- job->engine = thr->engine;
+ job->engine = task->thread->engine;
+
ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline,
- job, (void *) handler, job->log);
+ &job->task, job, (void *) handler);
if (ret == NXT_OK) {
return;
}
@@ -124,7 +129,7 @@ nxt_job_start(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler)
#endif
- handler(thr, job, job->data);
+ handler(&job->task, job, job->data);
}
@@ -133,7 +138,7 @@ nxt_job_start(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler)
/* A trampoline function is called by a thread pool thread. */
static void
-nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data)
+nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
{
nxt_job_t *job;
nxt_work_handler_t handler;
@@ -141,13 +146,15 @@ nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data)
job = obj;
handler = (nxt_work_handler_t) data;
- nxt_log_debug(thr->log, "%s thread", job->name);
+ job->task.log = job->log;
+
+ nxt_debug(task, "%s thread", job->name);
if (nxt_slow_path(job->cancel)) {
- nxt_job_return(thr, job, job->abort_handler);
+ nxt_job_return(task, job, job->abort_handler);
} else {
- handler(thr, job, job->data);
+ handler(&job->task, job, job->data);
}
}
@@ -155,35 +162,35 @@ nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data)
void
-nxt_job_return(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler)
+nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
{
- nxt_log_debug(thr->log, "%s return", job->name);
+ nxt_debug(task, "%s return", job->name);
#if (NXT_THREADS)
if (job->engine != NULL) {
/* A return function is called in thread pool thread context. */
nxt_event_engine_post(job->engine, nxt_job_thread_return_handler,
- job, (void *) handler, job->log);
+ &job->task, job, (void *) handler, job->log);
return;
}
#endif
if (nxt_slow_path(job->cancel)) {
- nxt_log_debug(thr->log, "%s cancellation", job->name);
+ nxt_debug(task, "%s cancellation", job->name);
handler = job->abort_handler;
}
- nxt_thread_work_queue_push(thr, &thr->work_queue.main,
- handler, job, job->data, thr->log);
+ nxt_thread_work_queue_push(task->thread, &task->thread->work_queue.main,
+ handler, &job->task, job, job->data);
}
#if (NXT_THREADS)
static void
-nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj, void *data)
+nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_job_t *job;
nxt_work_handler_t handler;
@@ -191,12 +198,14 @@ nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj, void *data)
job = obj;
handler = (nxt_work_handler_t) data;
+ job->task.thread = task->thread;
+
if (nxt_slow_path(job->cancel)) {
- nxt_log_debug(thr->log, "%s cancellation", job->name);
+ nxt_debug(task, "%s cancellation", job->name);
handler = job->abort_handler;
}
- handler(thr, job, job->data);
+ handler(&job->task, job, job->data);
}
#endif