diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
commit | de532922d9ab42aa15b40d47c8db53ac2af38500 (patch) | |
tree | d6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_buf_filter.c | |
parent | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff) | |
download | unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2 |
Introducing tasks.
Diffstat (limited to 'src/nxt_buf_filter.c')
-rw-r--r-- | src/nxt_buf_filter.c | 95 |
1 files changed, 48 insertions, 47 deletions
diff --git a/src/nxt_buf_filter.c b/src/nxt_buf_filter.c index 0f040fc9..fff11f96 100644 --- a/src/nxt_buf_filter.c +++ b/src/nxt_buf_filter.c @@ -9,28 +9,28 @@ static nxt_int_t nxt_buf_filter_nobuf(nxt_buf_filter_t *f); nxt_inline void nxt_buf_filter_next(nxt_buf_filter_t *f); -static void nxt_buf_filter_file_read_start(nxt_thread_t *thr, +static void nxt_buf_filter_file_read_start(nxt_task_t *task, nxt_buf_filter_t *f); -static void nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f); -static void nxt_buf_filter_file_job_completion(nxt_thread_t *thr, - void *obj, void *data); -static void nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, +static void nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f); +static void nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj, void *data); -static void nxt_buf_filter_file_read_error(nxt_thread_t *thr, void *obj, +static void nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj, + void *data); +static void nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj, void *data); void -nxt_buf_filter_add(nxt_thread_t *thr, nxt_buf_filter_t *f, nxt_buf_t *b) +nxt_buf_filter_add(nxt_task_t *task, nxt_buf_filter_t *f, nxt_buf_t *b) { nxt_buf_chain_add(&f->input, b); - nxt_buf_filter(thr, f, NULL); + nxt_buf_filter(task, f, NULL); } void -nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data) +nxt_buf_filter(nxt_task_t *task, void *obj, void *data) { nxt_int_t ret; nxt_buf_t *b; @@ -38,7 +38,7 @@ nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data) f = obj; - nxt_log_debug(thr->log, "buf filter"); + nxt_debug(task, "buf filter"); if (f->done) { return; @@ -59,7 +59,7 @@ nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data) b = f->current; - nxt_log_debug(thr->log, "buf filter current: %p", b); + nxt_debug(task, "buf filter current: %p", b); if (b == NULL) { @@ -69,7 +69,7 @@ nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data) b = f->input; - nxt_log_debug(thr->log, "buf filter input: %p", b); + nxt_debug(task, "buf filter input: %p", b); if (b == NULL) { /* @@ -93,7 +93,7 @@ nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data) nxt_buf_filter_next(f); } - nxt_buf_filter_file_read_start(thr, f); + nxt_buf_filter_file_read_start(task, f); return; } } @@ -139,9 +139,9 @@ nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data) */ if (b != NULL && b->mem.pos == b->mem.free) { f->current = b->next; - nxt_thread_work_queue_add(thr, f->work_queue, + nxt_thread_work_queue_add(task->thread, f->work_queue, b->completion_handler, - b, b->parent, thr->log); + task, b, b->parent); } continue; @@ -165,8 +165,8 @@ nobuf: fail: - nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error, - f, f->data, thr->log); + nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error, + task, f, f->data); } @@ -208,20 +208,20 @@ nxt_buf_filter_next(nxt_buf_filter_t *f) void -nxt_buf_filter_enqueue(nxt_thread_t *thr, nxt_buf_filter_t *f) +nxt_buf_filter_enqueue(nxt_task_t *task, nxt_buf_filter_t *f) { - nxt_log_debug(thr->log, "buf filter enqueue: %d", f->queued); + nxt_debug(task, "buf filter enqueue: %d", f->queued); if (!f->queued && !f->done) { f->queued = 1; - nxt_thread_work_queue_add(thr, f->work_queue, nxt_buf_filter, - f, NULL, thr->log); + nxt_thread_work_queue_add(task->thread, f->work_queue, nxt_buf_filter, + task, f, NULL); } } static void -nxt_buf_filter_file_read_start(nxt_thread_t *thr, nxt_buf_filter_t *f) +nxt_buf_filter_file_read_start(nxt_task_t *task, nxt_buf_filter_t *f) { nxt_job_file_t *jbf; nxt_buf_filter_file_t *ff; @@ -229,8 +229,9 @@ nxt_buf_filter_file_read_start(nxt_thread_t *thr, nxt_buf_filter_t *f) ff = f->run->job_file_create(f); if (nxt_slow_path(ff == NULL)) { - nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error, - f, f->data, thr->log); + nxt_thread_work_queue_add(task->thread, f->work_queue, + f->run->filter_error, + task, f, f->data); return; } @@ -246,12 +247,12 @@ nxt_buf_filter_file_read_start(nxt_thread_t *thr, nxt_buf_filter_t *f) f->reading = 1; - nxt_buf_filter_file_read(thr, f); + nxt_buf_filter_file_read(task, f); } static void -nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f) +nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f) { nxt_int_t ret; nxt_off_t size; @@ -296,7 +297,7 @@ nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f) f->run->job_file_retain(f); - nxt_job_file_read(thr, &ff->job_file.job); + nxt_job_file_read(task, &ff->job_file.job); return; } @@ -316,8 +317,8 @@ nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f) } } - nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error, - f, f->data, thr->log); + nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error, + task, f, f->data); } @@ -328,7 +329,7 @@ typedef struct { static void -nxt_buf_filter_file_job_completion(nxt_thread_t *thr, void *obj, void *data) +nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; nxt_bool_t done; @@ -341,8 +342,8 @@ nxt_buf_filter_file_job_completion(nxt_thread_t *thr, void *obj, void *data) b = jbf->buffer; jbf->buffer = NULL; - nxt_log_debug(thr->log, "buf filter file completion: \"%FN\" %O-%O", - jbf->file.name, b->file_pos, b->file_end); + nxt_debug(task, "buf filter file completion: \"%FN\" %O-%O", + jbf->file.name, b->file_pos, b->file_end); f->run->job_file_release(f); @@ -370,7 +371,7 @@ nxt_buf_filter_file_job_completion(nxt_thread_t *thr, void *obj, void *data) nxt_buf_chain_add(&f->current, b); - nxt_buf_filter(thr, f, NULL); + nxt_buf_filter(task, f, NULL); if (b->mem.pos == b->mem.free) { /* @@ -383,20 +384,20 @@ nxt_buf_filter_file_job_completion(nxt_thread_t *thr, void *obj, void *data) if (!done) { /* Try to allocate another buffer and read the next file part. */ - nxt_buf_filter_file_read(thr, f); + nxt_buf_filter_file_read(task, f); } return; fail: - nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error, - f, f->data, thr->log); + nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error, + task, f, f->data); } static void -nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, void *data) +nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *fb, *b; nxt_buf_filter_t *f; @@ -406,9 +407,8 @@ nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, void *data) ctx = data; f = ctx->filter; - nxt_log_debug(thr->log, "buf filter completion: %p \"%FN\" %O-%O", - b, f->filter_file->job_file.file.name, - b->file_pos, b->file_end); + nxt_debug(task, "buf filter completion: %p \"%FN\" %O-%O", + b, f->filter_file->job_file.file.name, b->file_pos, b->file_end); /* nxt_http_send_filter() might clear a buffer's file status. */ b->is_file = 1; @@ -419,7 +419,7 @@ nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, void *data) nxt_buf_pool_free(&f->filter_file->buffers, b); if (fb->file_pos < fb->file_end) { - nxt_buf_filter_file_read(thr, f); + nxt_buf_filter_file_read(task, f); return; } @@ -428,21 +428,22 @@ nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, void *data) nxt_job_destroy(&f->filter_file->job_file.job); - nxt_thread_work_queue_add(thr, f->work_queue, fb->completion_handler, - fb, fb->parent, thr->log); + nxt_thread_work_queue_add(task->thread, f->work_queue, + fb->completion_handler, + task, fb, fb->parent); } - nxt_buf_filter(thr, f, NULL); + nxt_buf_filter(task, f, NULL); } static void -nxt_buf_filter_file_read_error(nxt_thread_t *thr, void *obj, void *data) +nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj, void *data) { nxt_buf_filter_t *f; f = data; - nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error, - f, f->data, thr->log); + nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error, + task, f, f->data); } |