/*
* Copyright (C) Igor Sysoev
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
static nxt_int_t nxt_buf_filter_nobuf(nxt_buf_filter_t *f);
nxt_inline void nxt_buf_filter_next(nxt_buf_filter_t *f);
static void nxt_buf_filter_file_read_start(nxt_task_t *task,
nxt_buf_filter_t *f);
static void nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f);
static void nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj,
void *data);
static void nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj,
void *data);
static void nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj,
void *data);
void
nxt_buf_filter_add(nxt_task_t *task, nxt_buf_filter_t *f, nxt_buf_t *b)
{
nxt_buf_chain_add(&f->input, b);
nxt_buf_filter(task, f, NULL);
}
void
nxt_buf_filter(nxt_task_t *task, void *obj, void *data)
{
nxt_int_t ret;
nxt_buf_t *b;
nxt_buf_filter_t *f;
f = obj;
nxt_debug(task, "buf filter");
if (f->done) {
return;
}
f->queued = 0;
for ( ;; ) {
/*
* f->input is a chain of original incoming buffers: memory,
* mapped, file, and sync buffers;
* f->current is a currently processed memory buffer or a chain
* of memory/file or mapped/file buffers which are read of
* or populated from file;
* f->output is a chain of output buffers;
* f->last is the last output buffer in the chain.
*/
b = f->current;
nxt_debug(task, "buf filter current: %p", b);
if (b == NULL) {
if (f->reading) {
return;
}
b = f->input;
nxt_debug(task, "buf filter input: %p", b);
if (b == NULL) {
/*
* The end of the input chain, pass
* the output chain to the next filter.
*/
nxt_buf_filter_next(f);
return;
}
if (nxt_buf_is_mem(b)) {
f->current = b;
f->input = b->next;
b->next = NULL;
} else if (nxt_buf_is_file(b)) {
if (f->run->filter_ready(f) != NXT_OK) {
nxt_buf_filter_next(f);
}
nxt_buf_filter_file_read_start(task, f);
return;
}
}
if (nxt_buf_is_sync(b)) {
ret = NXT_OK;
f->current = b;
f->input = b->next;
b->next = NULL;
if (nxt_buf_is_nobuf(b)) {
ret = f->run->filter_sync_nobuf(f);
} else if (nxt_buf_is_flush(b)) {
ret = f->run->filter_sync_flush(f);
} else if (nxt_buf_is_last(b)) {
ret = f->run->filter_sync_last(f);
f->done = (ret == NXT_OK);
}
if (nxt_fast_path(ret == NXT_OK)) {
continue;
}
if (nxt_slow_path(ret == NXT_ERROR)) {
goto fail;
}
/* ret == NXT_AGAIN: No filter internal buffers available. */
goto nobuf;
}
ret = f->run->filter_process(f);
if (nxt_fast_path(ret == NXT_OK)) {
b = f->current;
/*
* A filter may just move f->current to f->output
* and then set f->current to NULL.
*/
if (b != NULL && b->mem.pos == b->mem.free) {
f->current = b->next;
nxt_thread_work_queue_add(task->thread, f->work_queue,
b->completion_handler,
task, b, b->parent);
}
continue;
}
if (nxt_slow_path(ret == NXT_ERROR)) {
goto fail;
}
/* ret == NXT_AGAIN: No filter internal buffers available. */
goto nobuf;
}
nobuf:
/* ret == NXT_AGAIN: No filter internal buffers available. */
if (nxt_buf_filter_nobuf(f) == NXT_OK) {
return;
}
fail:
nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
task, f, f->data);
}
static nxt_int_t
nxt_buf_filter_nobuf(nxt_buf_filter_t *f)
{
nxt_buf_t *b;
nxt_thread_log_debug("buf filter nobuf");
b = nxt_buf_sync_alloc(f->mem_pool, NXT_BUF_SYNC_NOBUF);
if (nxt_fast_path(b != NULL)) {
nxt_buf_chain_add(&f->output, b);
f->last = NULL;
f->run->filter_next(f);
f->output = NULL;
return NXT_OK;
}
return NXT_ERROR;
}
nxt_inline void
nxt_buf_filter_next(nxt_buf_filter_t *f)
{
if (f->output != NULL) {
f->last = NULL;
f->run->filter_next(f);
f->output = NULL;
}
}
void
nxt_buf_filter_enqueue(nxt_task_t *task, nxt_buf_filter_t *f)
{
nxt_debug(task, "buf filter enqueue: %d", f->queued);
if (!f->queued && !f->done) {
f->queued = 1;
nxt_thread_work_queue_add(task->thread, f->work_queue, nxt_buf_filter,
task, f, NULL);
}
}
static void
nxt_buf_filter_file_read_start(nxt_task_t *task, nxt_buf_filter_t *f)
{
nxt_job_file_t *jbf;
nxt_buf_filter_file_t *ff;
ff = f->run->job_file_create(f);
if (nxt_slow_path(ff == NULL)) {
nxt_thread_work_queue_add(task->thread, f->work_queue,
f->run->filter_error,
task, f, f->data);
return;
}
f->filter_file = ff;
jbf = &ff->job_file;
jbf->file = *f->input->file;
jbf->ready_handler = nxt_buf_filter_file_job_completion;
jbf->error_handler = nxt_buf_filter_file_read_error;
nxt_job_set_name(&jbf->job, "buf filter job file");
f->reading = 1;
nxt_buf_filter_file_read(task, f);
}
static void
nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f)
{
nxt_int_t ret;
nxt_off_t size;
nxt_buf_t *b;
nxt_buf_filter_file_t *ff;
ff = f->filter_file;
if (ff->job_file.buffer != NULL) {
/* File is now being read. */
return;
}
size = f->input->file_end - f->input->file_pos;
if (size > (nxt_off_t) NXT_SIZE_T_MAX) {
/*
* Small size value is a hint for buffer pool allocation
* size, but if size of the size_t type is lesser than size
* of the nxt_off_t type, the large size value may be truncated,
* so use a default buffer pool allocation size.
*/
size = 0;
}
if (f->mmap) {
ret = nxt_buf_pool_mmap_alloc(&ff->buffers, (size_t) size);
} else {
ret = nxt_buf_pool_file_alloc(&ff->buffers, (size_t) size);
}
if (nxt_fast_path(ret == NXT_OK)) {
b = ff->buffers.current;
b->file_pos = f->input->file_pos;
b->file_end = f->input->file_pos;
b->file = f->input->file;
ff->job_file.buffer = b;
ff->job_file.offset = f->input->file_pos;
f->run->job_file_retain(f);
nxt_job_file_read(task, &ff->job_file.job);
return;
}
if (nxt_fast_path(ret != NXT_ERROR)) {
/* ret == NXT_AGAIN: No buffers available. */
if (f->buffering) {
f->buffering = 0;
if (nxt_fast_path(f->run->filter_flush(f) != NXT_ERROR)) {
return;
}
} else if (nxt_fast_path(nxt_buf_filter_nobuf(f) == NXT_OK)) {
return;
}
}
nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
task, f, f->data);
}
typedef struct {
nxt_buf_filter_t *filter;
nxt_buf_t *buf;
} nxt_buf_filter_ctx_t;
static void
nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_bool_t done;
nxt_job_file_t *jbf;
nxt_buf_filter_t *f;
nxt_buf_filter_ctx_t *ctx;
jbf = obj;
f = data;
b = jbf->buffer;
jbf->buffer = NULL;
nxt_debug(task, "buf filter file completion: \"%FN\" %O-%O",
jbf->file.name, b->file_pos, b->file_end);
f->run->job_file_release(f);
ctx = nxt_mem_cache_alloc0(f->mem_pool, sizeof(nxt_buf_filter_ctx_t));
if (nxt_slow_path(ctx == NULL)) {
goto fail;
}
ctx->filter = f;
ctx->buf = f->input;
f->input->file_pos = b->file_end;
done = (f->input->file_pos == f->input->file_end);
if (done) {
f->input = f->input->next;
f->reading = 0;
}
b->data = f->data;
b->completion_handler = nxt_buf_filter_buf_completion;
b->parent = (nxt_buf_t *) ctx;
b->next = NULL;
nxt_buf_chain_add(&f->current, b);
nxt_buf_filter(task, f, NULL);
if (b->mem.pos == b->mem.free) {
/*
* The buffer has been completely processed by nxt_buf_filter(),
* its completion handler has been placed in workqueue and
* nxt_buf_filter_buf_completion() should be eventually called.
*/
return;
}
if (!done) {
/* Try to allocate another buffer and read the next file part. */
nxt_buf_filter_file_read(task, f);
}
return;
fail:
nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
task, f, f->data);
}
static void
nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *fb, *b;
nxt_buf_filter_t *f;
nxt_buf_filter_ctx_t *ctx;
b = obj;
ctx = data;
f = ctx->filter;
nxt_debug(task, "buf filter completion: %p \"%FN\" %O-%O",
b, f->filter_file->job_file.file.name, b->file_pos, b->file_end);
/* nxt_http_send_filter() might clear a buffer's file status. */
b->is_file = 1;
fb = ctx->buf;
nxt_mp_free(f->mem_pool, ctx);
nxt_buf_pool_free(&f->filter_file->buffers, b);
if (fb->file_pos < fb->file_end) {
nxt_buf_filter_file_read(task, f);
return;
}
if (b->file_end == fb->file_end) {
nxt_buf_pool_destroy(&f->filter_file->buffers);
nxt_job_destroy(&f->filter_file->job_file.job);
nxt_thread_work_queue_add(task->thread, f->work_queue,
fb->completion_handler,
task, fb, fb->parent);
}
nxt_buf_filter(task, f, NULL);
}
static void
nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_filter_t *f;
f = data;
nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error,
task, f, f->data);
}