diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
commit | de532922d9ab42aa15b40d47c8db53ac2af38500 (patch) | |
tree | d6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_job.c | |
parent | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff) | |
download | unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2 |
Introducing tasks.
Diffstat (limited to 'src/nxt_job.c')
-rw-r--r-- | src/nxt_job.c | 49 |
1 files changed, 29 insertions, 20 deletions
diff --git a/src/nxt_job.c b/src/nxt_job.c index 2b7d8818..e1256d61 100644 --- a/src/nxt_job.c +++ b/src/nxt_job.c @@ -8,8 +8,8 @@ #if (NXT_THREADS) -static void nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data); -static void nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj, +static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data); +static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data); #endif @@ -44,6 +44,8 @@ nxt_job_create(nxt_mem_pool_t *mp, size_t size) /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */ nxt_queue_self(&job->link); + job->task.ident = nxt_task_next_ident(); + return job; } @@ -56,6 +58,8 @@ nxt_job_init(nxt_job_t *job, size_t size) nxt_job_set_name(job, "job"); nxt_queue_self(&job->link); + + job->task.ident = nxt_task_next_ident(); } @@ -103,18 +107,19 @@ nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job) */ void -nxt_job_start(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler) +nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler) { - nxt_log_debug(thr->log, "%s start", job->name); + nxt_debug(task, "%s start", job->name); #if (NXT_THREADS) if (job->thread_pool != NULL) { nxt_int_t ret; - job->engine = thr->engine; + job->engine = task->thread->engine; + ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline, - job, (void *) handler, job->log); + &job->task, job, (void *) handler); if (ret == NXT_OK) { return; } @@ -124,7 +129,7 @@ nxt_job_start(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler) #endif - handler(thr, job, job->data); + handler(&job->task, job, job->data); } @@ -133,7 +138,7 @@ nxt_job_start(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler) /* A trampoline function is called by a thread pool thread. */ static void -nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data) +nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data) { nxt_job_t *job; nxt_work_handler_t handler; @@ -141,13 +146,15 @@ nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data) job = obj; handler = (nxt_work_handler_t) data; - nxt_log_debug(thr->log, "%s thread", job->name); + job->task.log = job->log; + + nxt_debug(task, "%s thread", job->name); if (nxt_slow_path(job->cancel)) { - nxt_job_return(thr, job, job->abort_handler); + nxt_job_return(task, job, job->abort_handler); } else { - handler(thr, job, job->data); + handler(&job->task, job, job->data); } } @@ -155,35 +162,35 @@ nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data) void -nxt_job_return(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler) +nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler) { - nxt_log_debug(thr->log, "%s return", job->name); + nxt_debug(task, "%s return", job->name); #if (NXT_THREADS) if (job->engine != NULL) { /* A return function is called in thread pool thread context. */ nxt_event_engine_post(job->engine, nxt_job_thread_return_handler, - job, (void *) handler, job->log); + &job->task, job, (void *) handler, job->log); return; } #endif if (nxt_slow_path(job->cancel)) { - nxt_log_debug(thr->log, "%s cancellation", job->name); + nxt_debug(task, "%s cancellation", job->name); handler = job->abort_handler; } - nxt_thread_work_queue_push(thr, &thr->work_queue.main, - handler, job, job->data, thr->log); + nxt_thread_work_queue_push(task->thread, &task->thread->work_queue.main, + handler, &job->task, job, job->data); } #if (NXT_THREADS) static void -nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data) { nxt_job_t *job; nxt_work_handler_t handler; @@ -191,12 +198,14 @@ nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj, void *data) job = obj; handler = (nxt_work_handler_t) data; + job->task.thread = task->thread; + if (nxt_slow_path(job->cancel)) { - nxt_log_debug(thr->log, "%s cancellation", job->name); + nxt_debug(task, "%s cancellation", job->name); handler = job->abort_handler; } - handler(thr, job, job->data); + handler(&job->task, job, job->data); } #endif |