summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_buf_filter.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
commitde532922d9ab42aa15b40d47c8db53ac2af38500 (patch)
treed6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_buf_filter.c
parent16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff)
downloadunit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz
unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2
Introducing tasks.
Diffstat (limited to 'src/nxt_buf_filter.c')
-rw-r--r--src/nxt_buf_filter.c95
1 files changed, 48 insertions, 47 deletions
diff --git a/src/nxt_buf_filter.c b/src/nxt_buf_filter.c
index 0f040fc9..fff11f96 100644
--- a/src/nxt_buf_filter.c
+++ b/src/nxt_buf_filter.c
@@ -9,28 +9,28 @@
static nxt_int_t nxt_buf_filter_nobuf(nxt_buf_filter_t *f);
nxt_inline void nxt_buf_filter_next(nxt_buf_filter_t *f);
-static void nxt_buf_filter_file_read_start(nxt_thread_t *thr,
+static void nxt_buf_filter_file_read_start(nxt_task_t *task,
nxt_buf_filter_t *f);
-static void nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f);
-static void nxt_buf_filter_file_job_completion(nxt_thread_t *thr,
- void *obj, void *data);
-static void nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj,
+static void nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f);
+static void nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj,
void *data);
-static void nxt_buf_filter_file_read_error(nxt_thread_t *thr, void *obj,
+static void nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj,
void *data);
void
-nxt_buf_filter_add(nxt_thread_t *thr, nxt_buf_filter_t *f, nxt_buf_t *b)
+nxt_buf_filter_add(nxt_task_t *task, nxt_buf_filter_t *f, nxt_buf_t *b)
{
nxt_buf_chain_add(&f->input, b);
- nxt_buf_filter(thr, f, NULL);
+ nxt_buf_filter(task, f, NULL);
}
void
-nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data)
+nxt_buf_filter(nxt_task_t *task, void *obj, void *data)
{
nxt_int_t ret;
nxt_buf_t *b;
@@ -38,7 +38,7 @@ nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data)
f = obj;
- nxt_log_debug(thr->log, "buf filter");
+ nxt_debug(task, "buf filter");
if (f->done) {
return;
@@ -59,7 +59,7 @@ nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data)
b = f->current;
- nxt_log_debug(thr->log, "buf filter current: %p", b);
+ nxt_debug(task, "buf filter current: %p", b);
if (b == NULL) {
@@ -69,7 +69,7 @@ nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data)
b = f->input;
- nxt_log_debug(thr->log, "buf filter input: %p", b);
+ nxt_debug(task, "buf filter input: %p", b);
if (b == NULL) {
/*
@@ -93,7 +93,7 @@ nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data)
nxt_buf_filter_next(f);
}
- nxt_buf_filter_file_read_start(thr, f);
+ nxt_buf_filter_file_read_start(task, f);
return;
}
}
@@ -139,9 +139,9 @@ nxt_buf_filter(nxt_thread_t *thr, void *obj, void *data)
*/
if (b != NULL && b->mem.pos == b->mem.free) {
f->current = b->next;
- nxt_thread_work_queue_add(thr, f->work_queue,
+ nxt_thread_work_queue_add(task->thread, f->work_queue,
b->completion_handler,
- b, b->parent, thr->log);
+ task, b, b->parent);
}
continue;
@@ -165,8 +165,8 @@ nobuf:
fail:
- nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
- f, f->data, thr->log);
+ nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
+ task, f, f->data);
}
@@ -208,20 +208,20 @@ nxt_buf_filter_next(nxt_buf_filter_t *f)
void
-nxt_buf_filter_enqueue(nxt_thread_t *thr, nxt_buf_filter_t *f)
+nxt_buf_filter_enqueue(nxt_task_t *task, nxt_buf_filter_t *f)
{
- nxt_log_debug(thr->log, "buf filter enqueue: %d", f->queued);
+ nxt_debug(task, "buf filter enqueue: %d", f->queued);
if (!f->queued && !f->done) {
f->queued = 1;
- nxt_thread_work_queue_add(thr, f->work_queue, nxt_buf_filter,
- f, NULL, thr->log);
+ nxt_thread_work_queue_add(task->thread, f->work_queue, nxt_buf_filter,
+ task, f, NULL);
}
}
static void
-nxt_buf_filter_file_read_start(nxt_thread_t *thr, nxt_buf_filter_t *f)
+nxt_buf_filter_file_read_start(nxt_task_t *task, nxt_buf_filter_t *f)
{
nxt_job_file_t *jbf;
nxt_buf_filter_file_t *ff;
@@ -229,8 +229,9 @@ nxt_buf_filter_file_read_start(nxt_thread_t *thr, nxt_buf_filter_t *f)
ff = f->run->job_file_create(f);
if (nxt_slow_path(ff == NULL)) {
- nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
- f, f->data, thr->log);
+ nxt_thread_work_queue_add(task->thread, f->work_queue,
+ f->run->filter_error,
+ task, f, f->data);
return;
}
@@ -246,12 +247,12 @@ nxt_buf_filter_file_read_start(nxt_thread_t *thr, nxt_buf_filter_t *f)
f->reading = 1;
- nxt_buf_filter_file_read(thr, f);
+ nxt_buf_filter_file_read(task, f);
}
static void
-nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f)
+nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f)
{
nxt_int_t ret;
nxt_off_t size;
@@ -296,7 +297,7 @@ nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f)
f->run->job_file_retain(f);
- nxt_job_file_read(thr, &ff->job_file.job);
+ nxt_job_file_read(task, &ff->job_file.job);
return;
}
@@ -316,8 +317,8 @@ nxt_buf_filter_file_read(nxt_thread_t *thr, nxt_buf_filter_t *f)
}
}
- nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
- f, f->data, thr->log);
+ nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
+ task, f, f->data);
}
@@ -328,7 +329,7 @@ typedef struct {
static void
-nxt_buf_filter_file_job_completion(nxt_thread_t *thr, void *obj, void *data)
+nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_bool_t done;
@@ -341,8 +342,8 @@ nxt_buf_filter_file_job_completion(nxt_thread_t *thr, void *obj, void *data)
b = jbf->buffer;
jbf->buffer = NULL;
- nxt_log_debug(thr->log, "buf filter file completion: \"%FN\" %O-%O",
- jbf->file.name, b->file_pos, b->file_end);
+ nxt_debug(task, "buf filter file completion: \"%FN\" %O-%O",
+ jbf->file.name, b->file_pos, b->file_end);
f->run->job_file_release(f);
@@ -370,7 +371,7 @@ nxt_buf_filter_file_job_completion(nxt_thread_t *thr, void *obj, void *data)
nxt_buf_chain_add(&f->current, b);
- nxt_buf_filter(thr, f, NULL);
+ nxt_buf_filter(task, f, NULL);
if (b->mem.pos == b->mem.free) {
/*
@@ -383,20 +384,20 @@ nxt_buf_filter_file_job_completion(nxt_thread_t *thr, void *obj, void *data)
if (!done) {
/* Try to allocate another buffer and read the next file part. */
- nxt_buf_filter_file_read(thr, f);
+ nxt_buf_filter_file_read(task, f);
}
return;
fail:
- nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
- f, f->data, thr->log);
+ nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
+ task, f, f->data);
}
static void
-nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, void *data)
+nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *fb, *b;
nxt_buf_filter_t *f;
@@ -406,9 +407,8 @@ nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, void *data)
ctx = data;
f = ctx->filter;
- nxt_log_debug(thr->log, "buf filter completion: %p \"%FN\" %O-%O",
- b, f->filter_file->job_file.file.name,
- b->file_pos, b->file_end);
+ nxt_debug(task, "buf filter completion: %p \"%FN\" %O-%O",
+ b, f->filter_file->job_file.file.name, b->file_pos, b->file_end);
/* nxt_http_send_filter() might clear a buffer's file status. */
b->is_file = 1;
@@ -419,7 +419,7 @@ nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, void *data)
nxt_buf_pool_free(&f->filter_file->buffers, b);
if (fb->file_pos < fb->file_end) {
- nxt_buf_filter_file_read(thr, f);
+ nxt_buf_filter_file_read(task, f);
return;
}
@@ -428,21 +428,22 @@ nxt_buf_filter_buf_completion(nxt_thread_t *thr, void *obj, void *data)
nxt_job_destroy(&f->filter_file->job_file.job);
- nxt_thread_work_queue_add(thr, f->work_queue, fb->completion_handler,
- fb, fb->parent, thr->log);
+ nxt_thread_work_queue_add(task->thread, f->work_queue,
+ fb->completion_handler,
+ task, fb, fb->parent);
}
- nxt_buf_filter(thr, f, NULL);
+ nxt_buf_filter(task, f, NULL);
}
static void
-nxt_buf_filter_file_read_error(nxt_thread_t *thr, void *obj, void *data)
+nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_filter_t *f;
f = data;
- nxt_thread_work_queue_add(thr, f->work_queue, f->run->filter_error,
- f, f->data, thr->log);
+ nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
+ task, f, f->data);
}