diff options
Diffstat (limited to '')
-rw-r--r-- | auto/sources | 13 | ||||
-rw-r--r-- | src/nxt_buf_filter.c | 449 | ||||
-rw-r--r-- | src/nxt_buf_filter.h | 115 | ||||
-rw-r--r-- | src/nxt_cache.c | 642 | ||||
-rw-r--r-- | src/nxt_cache.h | 122 | ||||
-rw-r--r-- | src/nxt_fastcgi_record_parse.c | 307 | ||||
-rw-r--r-- | src/nxt_fastcgi_source.c | 750 | ||||
-rw-r--r-- | src/nxt_fastcgi_source.h | 93 | ||||
-rw-r--r-- | src/nxt_file_cache.c | 508 | ||||
-rw-r--r-- | src/nxt_http_source.c | 629 | ||||
-rw-r--r-- | src/nxt_http_source.h | 47 | ||||
-rw-r--r-- | src/nxt_job_file.c | 302 | ||||
-rw-r--r-- | src/nxt_job_file.h | 74 | ||||
-rw-r--r-- | src/nxt_job_file_cache.c | 42 | ||||
-rw-r--r-- | src/nxt_main.h | 4 | ||||
-rw-r--r-- | src/nxt_mem_pool_cleanup.c | 39 | ||||
-rw-r--r-- | src/nxt_mem_pool_cleanup.h | 15 | ||||
-rw-r--r-- | src/nxt_stream_module.c | 131 | ||||
-rw-r--r-- | src/nxt_stream_source.c | 480 | ||||
-rw-r--r-- | src/nxt_stream_source.h | 32 | ||||
-rw-r--r-- | src/nxt_upstream_source.c | 71 | ||||
-rw-r--r-- | src/nxt_upstream_source.h | 83 |
22 files changed, 0 insertions, 4948 deletions
diff --git a/auto/sources b/auto/sources index 2ca78844..f74985b9 100644 --- a/auto/sources +++ b/auto/sources @@ -108,19 +108,6 @@ NXT_LIB_SRCS=" \ src/nxt_fs.c \ " -NXT_LIB_SRC0=" \ - src/nxt_buf_filter.c \ - src/nxt_job_file.c \ - src/nxt_stream_module.c \ - src/nxt_stream_source.c \ - src/nxt_upstream_source.c \ - src/nxt_http_source.c \ - src/nxt_fastcgi_source.c \ - src/nxt_fastcgi_record_parse.c \ -\ - src/nxt_mem_pool_cleanup.h \ - src/nxt_mem_pool_cleanup.c \ -" NXT_LIB_UNIT_SRCS="src/nxt_unit.c" diff --git a/src/nxt_buf_filter.c b/src/nxt_buf_filter.c deleted file mode 100644 index 83e5baa9..00000000 --- a/src/nxt_buf_filter.c +++ /dev/null @@ -1,449 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -static nxt_int_t nxt_buf_filter_nobuf(nxt_buf_filter_t *f); -nxt_inline void nxt_buf_filter_next(nxt_buf_filter_t *f); -static void nxt_buf_filter_file_read_start(nxt_task_t *task, - nxt_buf_filter_t *f); -static void nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f); -static void nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj, - void *data); -static void nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj, - void *data); -static void nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj, - void *data); - - -void -nxt_buf_filter_add(nxt_task_t *task, nxt_buf_filter_t *f, nxt_buf_t *b) -{ - nxt_buf_chain_add(&f->input, b); - - nxt_buf_filter(task, f, NULL); -} - - -void -nxt_buf_filter(nxt_task_t *task, void *obj, void *data) -{ - nxt_int_t ret; - nxt_buf_t *b; - nxt_buf_filter_t *f; - - f = obj; - - nxt_debug(task, "buf filter"); - - if (f->done) { - return; - } - - f->queued = 0; - - for ( ;; ) { - /* - * f->input is a chain of original incoming buffers: memory, - * mapped, file, and sync buffers; - * f->current is a currently processed memory buffer or a chain - * of memory/file or mapped/file buffers which are read of - * or populated from file; - * f->output is a chain of output buffers; - * f->last is the last output buffer in the chain. - */ - - b = f->current; - - nxt_debug(task, "buf filter current: %p", b); - - if (b == NULL) { - - if (f->reading) { - return; - } - - b = f->input; - - nxt_debug(task, "buf filter input: %p", b); - - if (b == NULL) { - /* - * The end of the input chain, pass - * the output chain to the next filter. - */ - nxt_buf_filter_next(f); - - return; - } - - if (nxt_buf_is_mem(b)) { - - f->current = b; - f->input = b->next; - b->next = NULL; - - } else if (nxt_buf_is_file(b)) { - - if (f->run->filter_ready(f) != NXT_OK) { - nxt_buf_filter_next(f); - } - - nxt_buf_filter_file_read_start(task, f); - return; - } - } - - if (nxt_buf_is_sync(b)) { - - ret = NXT_OK; - f->current = b; - f->input = b->next; - b->next = NULL; - - if (nxt_buf_is_nobuf(b)) { - ret = f->run->filter_sync_nobuf(f); - - } else if (nxt_buf_is_flush(b)) { - ret = f->run->filter_sync_flush(f); - - } else if (nxt_buf_is_last(b)) { - ret = f->run->filter_sync_last(f); - - f->done = (ret == NXT_OK); - } - - if (nxt_fast_path(ret == NXT_OK)) { - continue; - } - - if (nxt_slow_path(ret == NXT_ERROR)) { - goto fail; - } - - /* ret == NXT_AGAIN: No filter internal buffers available. */ - goto nobuf; - } - - ret = f->run->filter_process(f); - - if (nxt_fast_path(ret == NXT_OK)) { - b = f->current; - /* - * A filter may just move f->current to f->output - * and then set f->current to NULL. - */ - if (b != NULL && b->mem.pos == b->mem.free) { - f->current = b->next; - nxt_thread_work_queue_add(task->thread, f->work_queue, - b->completion_handler, - task, b, b->parent); - } - - continue; - } - - if (nxt_slow_path(ret == NXT_ERROR)) { - goto fail; - } - - /* ret == NXT_AGAIN: No filter internal buffers available. */ - goto nobuf; - } - -nobuf: - - /* ret == NXT_AGAIN: No filter internal buffers available. */ - - if (nxt_buf_filter_nobuf(f) == NXT_OK) { - return; - } - -fail: - - nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error, - task, f, f->data); -} - - -static nxt_int_t -nxt_buf_filter_nobuf(nxt_buf_filter_t *f) -{ - nxt_buf_t *b; - - nxt_thread_log_debug("buf filter nobuf"); - - b = nxt_buf_sync_alloc(f->mem_pool, NXT_BUF_SYNC_NOBUF); - - if (nxt_fast_path(b != NULL)) { - - nxt_buf_chain_add(&f->output, b); - f->last = NULL; - - f->run->filter_next(f); - - f->output = NULL; - - return NXT_OK; - } - - return NXT_ERROR; -} - - -nxt_inline void -nxt_buf_filter_next(nxt_buf_filter_t *f) -{ - if (f->output != NULL) { - f->last = NULL; - - f->run->filter_next(f); - f->output = NULL; - } -} - - -void -nxt_buf_filter_enqueue(nxt_task_t *task, nxt_buf_filter_t *f) -{ - nxt_debug(task, "buf filter enqueue: %d", f->queued); - - if (!f->queued && !f->done) { - f->queued = 1; - nxt_thread_work_queue_add(task->thread, f->work_queue, nxt_buf_filter, - task, f, NULL); - } -} - - -static void -nxt_buf_filter_file_read_start(nxt_task_t *task, nxt_buf_filter_t *f) -{ - nxt_job_file_t *jbf; - nxt_buf_filter_file_t *ff; - - ff = f->run->job_file_create(f); - - if (nxt_slow_path(ff == NULL)) { - nxt_thread_work_queue_add(task->thread, f->work_queue, - f->run->filter_error, - task, f, f->data); - return; - } - - f->filter_file = ff; - - jbf = &ff->job_file; - jbf->file = *f->input->file; - - jbf->ready_handler = nxt_buf_filter_file_job_completion; - jbf->error_handler = nxt_buf_filter_file_read_error; - - nxt_job_set_name(&jbf->job, "buf filter job file"); - - f->reading = 1; - - nxt_buf_filter_file_read(task, f); -} - - -static void -nxt_buf_filter_file_read(nxt_task_t *task, nxt_buf_filter_t *f) -{ - nxt_int_t ret; - nxt_off_t size; - nxt_buf_t *b; - nxt_buf_filter_file_t *ff; - - ff = f->filter_file; - - if (ff->job_file.buffer != NULL) { - /* File is now being read. */ - return; - } - - size = f->input->file_end - f->input->file_pos; - - if (size > (nxt_off_t) NXT_SIZE_T_MAX) { - /* - * Small size value is a hint for buffer pool allocation - * size, but if size of the size_t type is lesser than size - * of the nxt_off_t type, the large size value may be truncated, - * so use a default buffer pool allocation size. - */ - size = 0; - } - - if (f->mmap) { - ret = nxt_buf_pool_mmap_alloc(&ff->buffers, (size_t) size); - - } else { - ret = nxt_buf_pool_file_alloc(&ff->buffers, (size_t) size); - } - - if (nxt_fast_path(ret == NXT_OK)) { - b = ff->buffers.current; - - b->file_pos = f->input->file_pos; - b->file_end = f->input->file_pos; - b->file = f->input->file; - - ff->job_file.buffer = b; - ff->job_file.offset = f->input->file_pos; - - f->run->job_file_retain(f); - - nxt_job_file_read(task, &ff->job_file.job); - return; - } - - if (nxt_fast_path(ret != NXT_ERROR)) { - - /* ret == NXT_AGAIN: No buffers available. */ - - if (f->buffering) { - f->buffering = 0; - - if (nxt_fast_path(f->run->filter_flush(f) != NXT_ERROR)) { - return; - } - - } else if (nxt_fast_path(nxt_buf_filter_nobuf(f) == NXT_OK)) { - return; - } - } - - nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error, - task, f, f->data); -} - - -typedef struct { - nxt_buf_filter_t *filter; - nxt_buf_t *buf; -} nxt_buf_filter_ctx_t; - - -static void -nxt_buf_filter_file_job_completion(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b; - nxt_bool_t done; - nxt_job_file_t *jbf; - nxt_buf_filter_t *f; - nxt_buf_filter_ctx_t *ctx; - - jbf = obj; - f = data; - b = jbf->buffer; - jbf->buffer = NULL; - - nxt_debug(task, "buf filter file completion: \"%FN\" %O-%O", - jbf->file.name, b->file_pos, b->file_end); - - f->run->job_file_release(f); - - ctx = nxt_mem_cache_alloc0(f->mem_pool, sizeof(nxt_buf_filter_ctx_t)); - if (nxt_slow_path(ctx == NULL)) { - goto fail; - } - - ctx->filter = f; - ctx->buf = f->input; - - f->input->file_pos = b->file_end; - - done = (f->input->file_pos == f->input->file_end); - - if (done) { - f->input = f->input->next; - f->reading = 0; - } - - b->data = f->data; - b->completion_handler = nxt_buf_filter_buf_completion; - b->parent = (nxt_buf_t *) ctx; - b->next = NULL; - - nxt_buf_chain_add(&f->current, b); - - nxt_buf_filter(task, f, NULL); - - if (b->mem.pos == b->mem.free) { - /* - * The buffer has been completely processed by nxt_buf_filter(), - * its completion handler has been placed in workqueue and - * nxt_buf_filter_buf_completion() should be eventually called. - */ - return; - } - - if (!done) { - /* Try to allocate another buffer and read the next file part. */ - nxt_buf_filter_file_read(task, f); - } - - return; - -fail: - - nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error, - task, f, f->data); -} - - -static void -nxt_buf_filter_buf_completion(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *fb, *b; - nxt_buf_filter_t *f; - nxt_buf_filter_ctx_t *ctx; - - b = obj; - ctx = data; - f = ctx->filter; - - nxt_debug(task, "buf filter completion: %p \"%FN\" %O-%O", - b, f->filter_file->job_file.file.name, b->file_pos, b->file_end); - - /* nxt_http_send_filter() might clear a buffer's file status. */ - b->is_file = 1; - - fb = ctx->buf; - - nxt_mp_free(f->mem_pool, ctx); - nxt_buf_pool_free(&f->filter_file->buffers, b); - - if (fb->file_pos < fb->file_end) { - nxt_buf_filter_file_read(task, f); - return; - } - - if (b->file_end == fb->file_end) { - nxt_buf_pool_destroy(&f->filter_file->buffers); - - nxt_job_destroy(&f->filter_file->job_file.job); - - nxt_thread_work_queue_add(task->thread, f->work_queue, - fb->completion_handler, - task, fb, fb->parent); - } - - nxt_buf_filter(task, f, NULL); -} - - -static void -nxt_buf_filter_file_read_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_filter_t *f; - - f = data; - - nxt_thread_work_queue_add(task->thread, f->work_queue, f->run->filter_error, - task, f, f->data); -} diff --git a/src/nxt_buf_filter.h b/src/nxt_buf_filter.h deleted file mode 100644 index 27487baa..00000000 --- a/src/nxt_buf_filter.h +++ /dev/null @@ -1,115 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_BUF_FILTER_H_INCLUDED_ -#define _NXT_BUF_FILTER_H_INCLUDED_ - - -/* - * nxt_buf_filter is a framework intended to simplify processing file - * buffers content by a filter. The filter should set callbacks and - * call nxt_buf_filter_add() to start processing. - * - * At first buf_filter calls filter_ready() and the filter ensures - * it may allocate or reuse its internal buffer. No real allocation - * is performed at this step. - * - * TODO prevent unneeded allocaiton if no input data. - * - * - * TODO: The filter can flush data buffered - * previously, if all internal buffers are full. - * - * Then buf_filter looks buffer chains. There are two buffer chains: - * the input chain is a chain of original incoming memory, file, and sync - * buffers; and the current chain is a chain of memory/file buffers read - * from a file-only buffer. The current chain is processed first. Since - * buffers in this chain always contains a memory part, they can be passed - * one by one to the filter using filter_process(). If there is an output - * buffer after the buffer processing, it is added to output chain. The - * output buffers are not filter internal buffers. They just point to these - * internal buffers and one internal buffer can correspond to several output - * buffers which point to adjoining parts of the internal buffer. Further - * processing depends on filter_process() result code: if it returns NXT_OK, - * then the filter internal buffer is not full and buf_filter looks the next - * current or input buffer. If result code is NXT_AGAIN, then the filter - * internal buffer is full and buf_filter calls filter_flush() and then - * schedules to run nxt_buf_filter_repeat(). nxt_buf_filter_repeat() will - * run after all ready output buffer completion handlers and will call - * buf_filter again if no one completion handler will do it already using - * nxt_buf_filter_enqueue(). So in any case buf_filter will run again only - * once. - * - * TODO: - * in ideal just one the filter internal buffer. - * This allows to minimize number of the filter internal buffers if they - * flush fast. - * - * If the current chain is empty, the buf_filter processes the input chain. - * Memory buffers are passed to the filter using filter_process(). If an - * input buffer is a file buffer, then buf_filter calls filter_flush() - * and starts a file job to read the buffer in memory. The file job reads - * file parts into memory/file buffers and adds them to the current chain. - * - * Sync buffers are passed to the filter using filter_sync(). Its - * post-processing is similar to the filter_process() post-processing, - * except sync buffers are always added unmodified to the output chain. - */ - -typedef struct { - nxt_job_file_t job_file; - nxt_buf_pool_t buffers; -} nxt_buf_filter_file_t; - - -typedef struct nxt_buf_filter_s nxt_buf_filter_t; - -typedef struct { - nxt_int_t (*filter_ready)(nxt_buf_filter_t *f); - nxt_int_t (*filter_process)(nxt_buf_filter_t *f); - nxt_int_t (*filter_flush)(nxt_buf_filter_t *f); - - nxt_int_t (*filter_sync_nobuf)(nxt_buf_filter_t *f); - nxt_int_t (*filter_sync_flush)(nxt_buf_filter_t *f); - nxt_int_t (*filter_sync_last)(nxt_buf_filter_t *f); - - void (*filter_next)(nxt_buf_filter_t *f); - nxt_work_handler_t filter_error; - - nxt_buf_filter_file_t *(*job_file_create)(nxt_buf_filter_t *f); - void (*job_file_retain)(nxt_buf_filter_t *f); - void (*job_file_release)(nxt_buf_filter_t *f); -} nxt_buf_filter_ops_t; - - -struct nxt_buf_filter_s { - nxt_buf_t *current; - nxt_buf_t *input; - nxt_buf_t *output; - nxt_buf_t *last; - - nxt_work_queue_t *work_queue; - nxt_buf_filter_file_t *filter_file; - void *data; - nxt_mp_t *mem_pool; - - const nxt_buf_filter_ops_t *run; - - uint8_t mmap; /* 1 bit */ - uint8_t done; /* 1 bit */ - uint8_t queued; /* 1 bit */ - uint8_t reading; /* 1 bit */ - uint8_t buffering; /* 1 bit */ -}; - - -NXT_EXPORT void nxt_buf_filter_add(nxt_task_t *task, nxt_buf_filter_t *f, - nxt_buf_t *b); -NXT_EXPORT void nxt_buf_filter(nxt_task_t *task, void *obj, void *data); -NXT_EXPORT void nxt_buf_filter_enqueue(nxt_task_t *task, nxt_buf_filter_t *f); - - -#endif /* _NXT_BUF_FILTER_H_INCLUDED_ */ diff --git a/src/nxt_cache.c b/src/nxt_cache.c deleted file mode 100644 index e81d63dc..00000000 --- a/src/nxt_cache.c +++ /dev/null @@ -1,642 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -/* A cache time resolution is 10ms. */ -#define nxt_cache_time(thr) \ - (uint64_t) (nxt_thread_time(thr) * 100) - - -static nxt_int_t nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data); -static nxt_work_handler_t nxt_cache_query_locked(nxt_cache_t *cache, - nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq); -static nxt_work_handler_t nxt_cache_node_hold(nxt_cache_t *cache, - nxt_cache_query_t *q, nxt_lvlhsh_query_t *lhq); -static nxt_work_handler_t nxt_cache_node_test(nxt_cache_t *cache, - nxt_cache_query_t *q); - -static void nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data); -static void nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data); -static void nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data); -static ssize_t nxt_cache_release_locked(nxt_cache_t *cache, - nxt_cache_query_t *q, u_char *buf, size_t size); - -static nxt_cache_node_t *nxt_cache_node_alloc(nxt_cache_t *cache); -static void nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node, - nxt_bool_t fast); -static nxt_cache_query_wait_t *nxt_cache_query_wait_alloc(nxt_cache_t *cache, - nxt_bool_t *slow); -static void nxt_cache_query_wait_free(nxt_cache_t *cache, - nxt_cache_query_wait_t *qw); - - -/* STUB */ -nxt_int_t nxt_cache_shm_create(nxt_mem_zone_t *pool); -static void *nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc); -/**/ - - -nxt_int_t -nxt_cache_shm_create(nxt_mem_zone_t *mz) -{ - nxt_cache_t *cache; - - static const nxt_lvlhsh_proto_t proto nxt_aligned(64) = { - NXT_LVLHSH_LARGE_SLAB, - 0, - nxt_cache_lvlhsh_test, - (nxt_lvlhsh_alloc_t) nxt_cache_shm_alloc, - (nxt_lvlhsh_free_t) nxt_mem_zone_free, - }; - - cache = nxt_mem_zone_zalloc(mz, sizeof(nxt_cache_t)); - - if (cache == NULL) { - return NXT_ERROR; - } - - cache->proto = &proto; - cache->pool = mz; - - cache->start_time = nxt_cache_time(nxt_thread()); - - return NXT_OK; -} - - -static void * -nxt_cache_shm_alloc(void *data, size_t size, nxt_uint_t nalloc) -{ - return nxt_mem_zone_align(data, size, size); -} - - -void -nxt_cache_init(nxt_cache_t *cache) -{ - static const nxt_lvlhsh_proto_t proto nxt_aligned(64) = { - NXT_LVLHSH_LARGE_MEMALIGN, - 0, - nxt_cache_lvlhsh_test, - nxt_lvlhsh_alloc, - nxt_lvlhsh_free, - }; - - cache->proto = &proto; - - cache->start_time = nxt_cache_time(nxt_thread()); -} - - -static nxt_int_t -nxt_cache_lvlhsh_test(nxt_lvlhsh_query_t *lhq, void *data) -{ - nxt_cache_node_t *node; - - node = data; - - if (nxt_str_eq(&lhq->key, node->key_data, node->key_len)) { - return NXT_OK; - } - - return NXT_DECLINED; -} - - -nxt_inline void -nxt_cache_lock(nxt_cache_t *cache) -{ - if (cache->shared) { - nxt_thread_spin_lock(&cache->lock); - } -} - - -nxt_inline void -nxt_cache_unlock(nxt_cache_t *cache) -{ - if (cache->shared) { - nxt_thread_spin_unlock(&cache->lock); - } -} - - -void -nxt_cache_query(nxt_cache_t *cache, nxt_cache_query_t *q) -{ - nxt_thread_t *thr; - nxt_lvlhsh_query_t lhq; - nxt_work_handler_t handler; - - thr = nxt_thread(); - - if (cache != NULL) { - lhq.key_hash = nxt_murmur_hash2(q->key_data, q->key_len); - lhq.replace = 0; - lhq.key.len = q->key_len; - lhq.key.data = q->key_data; - lhq.proto = cache->proto; - lhq.pool = cache->pool; - - q->now = nxt_cache_time(thr); - - nxt_cache_lock(cache); - - handler = nxt_cache_query_locked(cache, q, &lhq); - - nxt_cache_unlock(cache); - - } else { - handler = q->state->nocache_handler; - } - - handler(thr, q, NULL); -} - - -static nxt_work_handler_t -nxt_cache_query_locked(nxt_cache_t *cache, nxt_cache_query_t *q, - nxt_lvlhsh_query_t *lhq) -{ - nxt_int_t ret; - nxt_time_t expiry; - nxt_cache_node_t *node; - nxt_cache_query_state_t *state; - - if (q->hold) { - return nxt_cache_node_hold(cache, q, lhq); - } - - ret = nxt_lvlhsh_find(&cache->lvlhsh, lhq); - - state = q->state; - - if (ret != NXT_OK) { - /* NXT_DECLINED */ - return state->nocache_handler; - } - - node = lhq->value; - node->count++; - q->node = node; - - expiry = cache->start_time + node->expiry; - - if (q->now < expiry) { - return state->ready_handler; - } - - q->stale = 1; - - return state->stale_handler; -} - - -static nxt_work_handler_t -nxt_cache_node_hold(nxt_cache_t *cache, nxt_cache_query_t *q, - nxt_lvlhsh_query_t *lhq) -{ - nxt_int_t ret; - nxt_bool_t slow; - nxt_cache_node_t *node, *sentinel; - nxt_work_handler_t handler; - nxt_cache_query_wait_t *qw; - nxt_cache_query_state_t *state; - - state = q->state; - sentinel = nxt_cache_node_alloc(cache); - - if (nxt_slow_path(sentinel == NULL)) { - return state->error_handler; - } - - sentinel->key_data = q->key_data; - sentinel->key_len = q->key_len; - lhq->value = sentinel; - - /* - * Try to insert an empty sentinel node to hold updating - * process if there is no existent cache node in cache. - */ - ret = nxt_lvlhsh_insert(&cache->lvlhsh, lhq); - - if (ret == NXT_OK) { - /* The sentinel node was successully added. */ - - q->node = sentinel; - sentinel->updating = 1; - return state->update_handler; - } - - nxt_cache_node_free(cache, sentinel, 1); - - if (ret == NXT_ERROR) { - return state->error_handler; - } - - /* NXT_DECLINED: a cache node exists. */ - - node = lhq->value; - node->count++; - q->node = node; - - handler = nxt_cache_node_test(cache, q); - if (handler != NULL) { - return handler; - } - - /* Add the node to a wait queue. */ - - qw = nxt_cache_query_wait_alloc(cache, &slow); - if (nxt_slow_path(qw == NULL)) { - return state->error_handler; - } - - if (slow) { - /* The node state may have been changed during slow allocation. */ - - handler = nxt_cache_node_test(cache, q); - if (handler != NULL) { - nxt_cache_query_wait_free(cache, qw); - return handler; - } - } - - qw->query = q; - qw->next = node->waiting; - qw->busy = 0; - qw->deleted = 0; - qw->pid = nxt_pid; - qw->engine = nxt_thread_event_engine(); - qw->handler = nxt_cache_wake_handler; - qw->cache = cache; - - node->waiting = qw; - - return nxt_cache_wait_handler; -} - - -static nxt_work_handler_t -nxt_cache_node_test(nxt_cache_t *cache, nxt_cache_query_t *q) -{ - nxt_time_t expiry; - nxt_cache_node_t *node; - nxt_cache_query_state_t *state; - - q->stale = 0; - state = q->state; - node = q->node; - - expiry = cache->start_time + node->expiry; - - if (q->now < expiry) { - return state->ready_handler; - } - - /* - * A valid stale or empty sentinel cache node. - * The sentinel node can be only in updating state. - */ - - if (node->updating) { - - if (node->expiry != 0) { - /* A valid stale cache node. */ - - q->stale = 1; - - if (q->use_stale) { - return state->stale_handler; - } - } - - /* A sentinel node. */ - return NULL; - } - - /* A valid stale cache node is not being updated now. */ - - q->stale = 1; - - if (q->use_stale) { - - if (q->update_stale) { - node->updating = 1; - return state->update_stale_handler; - } - - return state->stale_handler; - } - - node->updating = 1; - return state->update_handler; -} - - -static void -nxt_cache_wait_handler(nxt_thread_t *thr, void *obj, void *data) -{ - nxt_event_timer_t *ev; - nxt_cache_query_t *cq; - - cq = obj; - - if (cq->timeout != 0) { - - ev = &cq->timer; - - if (ev->state == NXT_EVENT_TIMER_DISABLED) { - ev->handler = nxt_cache_timeout_handler; - nxt_event_timer_ident(ev, -1); - - nxt_event_timer_add(thr->engine, ev, cq->timeout); - } - } -} - - -static void -nxt_cache_timeout_handler(nxt_thread_t *thr, void *obj, void *data) -{ - nxt_cache_query_t *cq; - nxt_event_timer_t *ev; - - ev = obj; - - cq = nxt_event_timer_data(ev, nxt_cache_query_t, timer); - - cq->state->timeout_handler(thr, cq, NULL); -} - - -static void -nxt_cache_wake_handler(nxt_thread_t *thr, void *obj, void *data) -{ - nxt_cache_t *cache; - nxt_work_handler_t handler; - nxt_cache_query_t *q; - nxt_cache_query_wait_t *qw; - - qw = obj; - q = qw->query; - cache = qw->cache; - - nxt_cache_lock(cache); - - handler = nxt_cache_node_test(cache, q); - - if (handler != NULL) { - nxt_cache_query_wait_free(cache, qw); - - } else { - /* Wait again. */ - qw->next = q->node->waiting; - q->node->waiting = qw; - } - - nxt_cache_unlock(cache); - - handler(thr, q, NULL); -} - - -nxt_int_t -nxt_cache_update(nxt_cache_t *cache, nxt_cache_query_t *q) -{ - nxt_int_t ret; - nxt_cache_node_t *node; - nxt_lvlhsh_query_t lhq; - - node = q->node; - - node->accessed = nxt_cache_time(nxt_thread()) - cache->start_time; - - node->updating = 0; - node->count = 1; - - lhq.key_hash = nxt_murmur_hash2(node->key_data, node->key_len); - lhq.replace = 1; - lhq.key.len = node->key_len; - lhq.key.data = node->key_data; - lhq.value = node; - lhq.proto = cache->proto; - lhq.pool = cache->pool; - - nxt_cache_lock(cache); - - ret = nxt_lvlhsh_insert(&cache->lvlhsh, &lhq); - - if (nxt_fast_path(ret != NXT_OK)) { - - nxt_queue_insert_head(&cache->expiry_queue, &node->link); - - node = lhq.value; - - if (node != NULL) { - /* A replaced node. */ - - nxt_queue_remove(&node->link); - - if (node->count != 0) { - node->deleted = 1; - - } else { - // delete cache node - } - } - } - - nxt_cache_unlock(cache); - - return ret; -} - - -void -nxt_cache_release(nxt_cache_t *cache, nxt_cache_query_t *q) -{ - u_char *p, *data; - size_t size; - ssize_t ret; - nxt_thread_t *thr; - u_char buf[1024]; - - thr = nxt_thread(); - q->now = nxt_cache_time(thr); - - p = buf; - size = sizeof(buf); - - for ( ;; ) { - nxt_cache_lock(cache); - - ret = nxt_cache_release_locked(cache, q, p, size); - - nxt_cache_unlock(cache); - - if (ret == 0) { - return; - } - - size = nxt_abs(ret); - - data = nxt_malloc(size); - - if (data == NULL) { - /* TODO: retry */ - return; - } - - if (ret < 0) { - p = data; - continue; - } - - if (p != data) { - nxt_memcpy(data, p, size); - } - - nxt_thread_work_queue_add(thr, &thr->work_queue.main, - cache->delete_handler, data, NULL, thr->log); - } -} - - -static ssize_t -nxt_cache_release_locked(nxt_cache_t *cache, nxt_cache_query_t *q, - u_char *buf, size_t size) -{ - ssize_t ret; - nxt_cache_node_t *node; - - node = q->node; - node->count--; - - if (node->count != 0) { - return 0; - } - - if (!node->deleted) { - /* - * A cache node is locked whilst its count is non zero. - * To minimize number of operations the node's place in expiry - * queue can be updated only if the node is not currently used. - */ - node->accessed = q->now - cache->start_time; - - nxt_queue_remove(&node->link); - nxt_queue_insert_head(&cache->expiry_queue, &node->link); - - return 0; - } - - ret = 0; -#if 0 - - ret = cache->delete_copy(cache, node, buf, size); - - if (ret < 0) { - return ret; - } - -#endif - - nxt_cache_node_free(cache, node, 0); - - return ret; -} - - -static nxt_cache_node_t * -nxt_cache_node_alloc(nxt_cache_t *cache) -{ - nxt_queue_link_t *link; - nxt_cache_node_t *node; - - link = nxt_queue_first(&cache->free_nodes); - - if (nxt_fast_path(link != nxt_queue_tail(&cache->free_nodes))) { - cache->nfree_nodes--; - nxt_queue_remove(link); - - node = nxt_queue_link_data(link, nxt_cache_node_t, link); - nxt_memzero(node, sizeof(nxt_cache_node_t)); - - return node; - } - - nxt_cache_unlock(cache); - - node = cache->alloc(cache->data, sizeof(nxt_cache_node_t)); - - nxt_cache_lock(cache); - - return node; -} - - -static void -nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node, nxt_bool_t fast) -{ - if (fast || cache->nfree_nodes < 32) { - nxt_queue_insert_head(&cache->free_nodes, &node->link); - cache->nfree_nodes++; - return; - } - - nxt_cache_unlock(cache); - - cache->free(cache->data, node); - - nxt_cache_lock(cache); -} - - -static nxt_cache_query_wait_t * -nxt_cache_query_wait_alloc(nxt_cache_t *cache, nxt_bool_t *slow) -{ - nxt_cache_query_wait_t *qw; - - qw = cache->free_query_wait; - - if (nxt_fast_path(qw != NULL)) { - cache->free_query_wait = qw->next; - cache->nfree_query_wait--; - - *slow = 0; - return qw; - } - - nxt_cache_unlock(cache); - - qw = cache->alloc(cache->data, sizeof(nxt_cache_query_wait_t)); - *slow = 1; - - nxt_cache_lock(cache); - - return qw; -} - - -static void -nxt_cache_query_wait_free(nxt_cache_t *cache, nxt_cache_query_wait_t *qw) -{ - if (cache->nfree_query_wait < 32) { - qw->next = cache->free_query_wait; - cache->free_query_wait = qw; - cache->nfree_query_wait++; - return; - } - - nxt_cache_unlock(cache); - - cache->free(cache->data, qw); - - nxt_cache_lock(cache); -} diff --git a/src/nxt_cache.h b/src/nxt_cache.h deleted file mode 100644 index 567b5581..00000000 --- a/src/nxt_cache.h +++ /dev/null @@ -1,122 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_CACHE_INCLUDED_ -#define _NXT_CACHE_INCLUDED_ - - -typedef struct nxt_cache_query_s nxt_cache_query_t; -typedef struct nxt_cache_query_wait_s nxt_cache_query_wait_t; - - -typedef struct { - uint32_t shared; /* 1 bit */ - nxt_thread_spinlock_t lock; - - nxt_lvlhsh_t lvlhsh; - const nxt_lvlhsh_proto_t *proto; - void *pool; - - nxt_queue_t expiry_queue; - - nxt_queue_t free_nodes; - uint32_t nfree_nodes; - - uint32_t nfree_query_wait; - nxt_cache_query_wait_t *free_query_wait; - - uint64_t start_time; - - /* STUB: use nxt_lvlhsh_proto_t */ - void *(*alloc)(void *data, size_t size); - void (*free)(void *data, void *p); - void *data; - - nxt_work_handler_t delete_handler; -} nxt_cache_t; - - -typedef struct { - u_char *key_data; - - uint16_t key_len; /* 16 bits */ - uint8_t uses; /* 8 bits */ - uint8_t updating:1; - uint8_t deleted:1; - - uint32_t count; - - /* Times relative to the cache->start_time. */ - uint32_t expiry; - uint32_t accessed; - - nxt_off_t size; - - nxt_queue_link_t link; - - nxt_cache_query_wait_t *waiting; -} nxt_cache_node_t; - - -struct nxt_cache_query_wait_s { - nxt_cache_query_t *query; - nxt_cache_query_wait_t *next; - - uint8_t busy; /* 1 bit */ - uint8_t deleted; /* 1 bit */ - - nxt_pid_t pid; - nxt_event_engine_t *engine; - nxt_work_handler_t handler; - nxt_cache_t *cache; -}; - - -typedef struct { - nxt_work_handler_t nocache_handler; - nxt_work_handler_t ready_handler; - nxt_work_handler_t stale_handler; - nxt_work_handler_t update_stale_handler; - nxt_work_handler_t update_handler; - nxt_work_handler_t timeout_handler; - nxt_work_handler_t error_handler; -} nxt_cache_query_state_t; - - -struct nxt_cache_query_s { - u_char *key_data; - - uint16_t key_len; /* 16 bits */ -#if (NXT_64_BIT) - uint8_t hold; /* 1 bit */ - uint8_t use_stale; /* 1 bit */ - uint8_t update_stale; /* 1 bit */ - uint8_t stale; /* 1 bit */ -#else - uint8_t hold:1; - uint8_t use_stale:1; - uint8_t update_stale:1; - uint8_t stale:1; -#endif - - nxt_cache_node_t *node; - nxt_cache_query_t *next; - nxt_cache_query_state_t *state; - - nxt_time_t now; - - nxt_msec_t timeout; - nxt_timer_t timer; -}; - - -NXT_EXPORT void nxt_cache_init(nxt_cache_t *cache); -NXT_EXPORT void nxt_cache_query(nxt_cache_t *cache, nxt_cache_query_t *q); -NXT_EXPORT void nxt_cache_release(nxt_cache_t *cache, nxt_cache_query_t *q); -NXT_EXPORT nxt_int_t nxt_cache_update(nxt_cache_t *cache, nxt_cache_query_t *q); - - -#endif /* _NXT_CACHE_INCLUDED_ */ diff --git a/src/nxt_fastcgi_record_parse.c b/src/nxt_fastcgi_record_parse.c deleted file mode 100644 index 7d2ce32e..00000000 --- a/src/nxt_fastcgi_record_parse.c +++ /dev/null @@ -1,307 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -#define NXT_FASTCGI_DATA_MIDDLE 0 -#define NXT_FASTCGI_DATA_END_ON_BORDER 1 -#define NXT_FASTCGI_DATA_END 2 - - -static nxt_int_t nxt_fastcgi_buffer(nxt_fastcgi_parse_t *fp, nxt_buf_t ***tail, - nxt_buf_t *in); - - -void -nxt_fastcgi_record_parse(nxt_task_t *task, nxt_fastcgi_parse_t *fp, - nxt_buf_t *in) -{ - u_char ch; - nxt_int_t ret, stream; - nxt_buf_t *b, *nb, **tail[2]; - const char *msg; - enum { - sw_fastcgi_version = 0, - sw_fastcgi_type, - sw_fastcgi_request_id_high, - sw_fastcgi_request_id_low, - sw_fastcgi_content_length_high, - sw_fastcgi_content_length_low, - sw_fastcgi_padding_length, - sw_fastcgi_reserved, - sw_fastcgi_data, - sw_fastcgi_padding, - sw_fastcgi_end_request, - } state; - - fp->out[0] = NULL; - fp->out[1] = NULL; - - tail[0] = &fp->out[0]; - tail[1] = &fp->out[1]; - - state = fp->state; - - for (b = in; b != NULL; b = b->next) { - - if (nxt_buf_is_sync(b)) { - **tail = b; - *tail = &b->next; - continue; - } - - fp->pos = b->mem.pos; - - while (fp->pos < b->mem.free) { - /* - * The sw_fastcgi_data state is tested outside the - * switch to preserve fp->pos and to not touch memory. - */ - if (state == sw_fastcgi_data) { - - /* - * fp->type here can be only NXT_FASTCGI_STDOUT - * or NXT_FASTCGI_STDERR. NXT_FASTCGI_END_REQUEST - * is tested in sw_fastcgi_reserved. - */ - stream = fp->type - NXT_FASTCGI_STDOUT; - - ret = nxt_fastcgi_buffer(fp, &tail[stream], b); - - if (ret == NXT_FASTCGI_DATA_MIDDLE) { - goto next; - } - - if (nxt_slow_path(ret == NXT_ERROR)) { - fp->error = 1; - goto done; - } - - if (fp->padding == 0) { - state = sw_fastcgi_version; - - } else { - state = sw_fastcgi_padding; - } - - if (ret == NXT_FASTCGI_DATA_END_ON_BORDER) { - goto next; - } - - /* ret == NXT_FASTCGI_DATA_END */ - } - - ch = *fp->pos++; - - nxt_thread_log_debug("fastcgi record byte: %02Xd", ch); - - switch (state) { - - case sw_fastcgi_version: - if (nxt_fast_path(ch == 1)) { - state = sw_fastcgi_type; - continue; - } - - msg = "unsupported FastCGI protocol version"; - goto fastcgi_error; - - case sw_fastcgi_type: - switch (ch) { - case NXT_FASTCGI_STDOUT: - case NXT_FASTCGI_STDERR: - case NXT_FASTCGI_END_REQUEST: - fp->type = ch; - state = sw_fastcgi_request_id_high; - continue; - default: - msg = "invalid FastCGI record type"; - goto fastcgi_error; - } - - case sw_fastcgi_request_id_high: - /* FastCGI multiplexing is not supported. */ - if (nxt_fast_path(ch == 0)) { - state = sw_fastcgi_request_id_low; - continue; - } - - msg = "unexpected FastCGI request ID high byte"; - goto fastcgi_error; - - case sw_fastcgi_request_id_low: - if (nxt_fast_path(ch == 1)) { - state = sw_fastcgi_content_length_high; - continue; - } - - msg = "unexpected FastCGI request ID low byte"; - goto fastcgi_error; - - case sw_fastcgi_content_length_high: - fp->length = ch << 8; - state = sw_fastcgi_content_length_low; - continue; - - case sw_fastcgi_content_length_low: - fp->length |= ch; - state = sw_fastcgi_padding_length; - continue; - - case sw_fastcgi_padding_length: - fp->padding = ch; - state = sw_fastcgi_reserved; - continue; - - case sw_fastcgi_reserved: - nxt_thread_log_debug("fastcgi record type:%d " - "length:%uz padding:%d", - fp->type, fp->length, fp->padding); - - if (nxt_fast_path(fp->type != NXT_FASTCGI_END_REQUEST)) { - state = sw_fastcgi_data; - continue; - } - - state = sw_fastcgi_end_request; - continue; - - case sw_fastcgi_data: - /* - * This state is processed before the switch. - * It added here just to suppress a warning. - */ - continue; - - case sw_fastcgi_padding: - /* - * No special fast processing of padding - * because it usually takes just 1-7 bytes. - */ - fp->padding--; - - if (fp->padding == 0) { - nxt_thread_log_debug("fastcgi record end"); - state = sw_fastcgi_version; - } - continue; - - case sw_fastcgi_end_request: - /* Just skip 8 bytes of END_REQUEST. */ - fp->length--; - - if (fp->length != 0) { - continue; - } - - fp->done = 1; - - nxt_thread_log_debug("fastcgi end request"); - - goto done; - } - } - - if (b->retain == 0) { - /* No record data was found in a buffer. */ - nxt_thread_current_work_queue_add(task->thread, - b->completion_handler, - task, b, b->parent); - } - - next: - - continue; - } - - fp->state = state; - - return; - -fastcgi_error: - - nxt_thread_log_error(NXT_LOG_ERR, "upstream sent %s: %d", msg, ch); - - fp->fastcgi_error = 1; - -done: - - nb = fp->last_buf(fp); - - if (nxt_fast_path(nb != NULL)) { - *tail[0] = nb; - - } else { - fp->error = 1; - } - - // STUB: fp->fastcgi_error = 1; - // STUB: fp->error = 1; - - return; -} - - -static nxt_int_t -nxt_fastcgi_buffer(nxt_fastcgi_parse_t *fp, nxt_buf_t ***tail, nxt_buf_t *in) -{ - u_char *p; - size_t size; - nxt_buf_t *b; - - if (fp->length == 0) { - return NXT_FASTCGI_DATA_END; - } - - p = fp->pos; - size = in->mem.free - p; - - if (fp->length >= size && in->retain == 0) { - /* - * Use original buffer if the buffer is lesser than or equal to - * FastCGI record size and this is the first record in the buffer. - */ - in->mem.pos = p; - **tail = in; - *tail = &in->next; - - } else { - b = nxt_buf_mem_alloc(fp->mem_pool, 0, 0); - if (nxt_slow_path(b == NULL)) { - return NXT_ERROR; - } - - **tail = b; - *tail = &b->next; - - b->parent = in; - in->retain++; - b->mem.pos = p; - b->mem.start = p; - - if (fp->length < size) { - p += fp->length; - fp->pos = p; - - b->mem.free = p; - b->mem.end = p; - - return NXT_FASTCGI_DATA_END; - } - - b->mem.free = in->mem.free; - b->mem.end = in->mem.free; - } - - fp->length -= size; - - if (fp->length == 0) { - return NXT_FASTCGI_DATA_END_ON_BORDER; - } - - return NXT_FASTCGI_DATA_MIDDLE; -} diff --git a/src/nxt_fastcgi_source.c b/src/nxt_fastcgi_source.c deleted file mode 100644 index b2424292..00000000 --- a/src/nxt_fastcgi_source.c +++ /dev/null @@ -1,750 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -#define NXT_FASTCGI_RESPONDER 1 -#define NXT_FASTCGI_KEEP_CONN 1 - - -typedef struct { - u_char *buf; - uint32_t len; - u_char length[4]; -} nxt_fastcgi_param_t; - - -#define nxt_fastcgi_set_record_length(p, length) \ - do { \ - uint32_t len = length; \ - \ - p[1] = (u_char) len; len >>= 8; \ - p[0] = (u_char) len; \ - } while (0) - - -nxt_inline size_t -nxt_fastcgi_param_length(u_char *p, uint32_t length) -{ - if (nxt_fast_path(length < 128)) { - *p = (u_char) length; - return 1; - } - - p[3] = (u_char) length; length >>= 8; - p[2] = (u_char) length; length >>= 8; - p[1] = (u_char) length; length >>= 8; - p[0] = (u_char) (length | 0x80); - - return 4; -} - - -static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs); -static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, - nxt_fastcgi_param_t *param); - -static void nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, - void *data); -static void nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, - void *data); -static void nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, - void *data); -static void nxt_fastcgi_source_sync_buffer(nxt_task_t *task, - nxt_fastcgi_source_t *fs, nxt_buf_t *b); - -static nxt_int_t nxt_fastcgi_source_header_process(nxt_task_t *task, - nxt_fastcgi_source_t *fs); -static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us, - nxt_name_value_t *nv); -static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us, - nxt_name_value_t *nv); - -static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, - nxt_buf_t *b); -static void nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, - void *data); -static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp); -static void nxt_fastcgi_source_error(nxt_task_t *task, - nxt_stream_source_t *stream); -static void nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs); - - -/* - * A FastCGI request: - * FCGI_BEGIN_REQUEST record; - * Several FCGI_PARAMS records, the last FCGI_PARAMS record must have - * zero content length, - * Several FCGI_STDIN records, the last FCGI_STDIN record must have - * zero content length. - */ - -static const uint8_t nxt_fastcgi_begin_request[] = { - 1, /* FastCGI version. */ - NXT_FASTCGI_BEGIN_REQUEST, /* The BEGIN_REQUEST record type. */ - 0, 1, /* Request ID. */ - 0, 8, /* Content length of the Role record. */ - 0, /* Padding length. */ - 0, /* Reserved. */ - - 0, NXT_FASTCGI_RESPONDER, /* The Responder Role. */ - 0, /* Flags. */ - 0, 0, 0, 0, 0, /* Reserved. */ -}; - - -static const uint8_t nxt_fastcgi_params_record[] = { - 1, /* FastCGI version. */ - NXT_FASTCGI_PARAMS, /* The PARAMS record type. */ - 0, 1, /* Request ID. */ - 0, 0, /* Content length. */ - 0, /* Padding length. */ - 0, /* Reserved. */ -}; - - -static const uint8_t nxt_fastcgi_stdin_record[] = { - 1, /* FastCGI version. */ - NXT_FASTCGI_STDIN, /* The STDIN record type. */ - 0, 1, /* Request ID. */ - 0, 0, /* Content length. */ - 0, /* Padding length. */ - 0, /* Reserved. */ -}; - - -void -nxt_fastcgi_source_handler(nxt_task_t *task, nxt_upstream_source_t *us, - nxt_fastcgi_source_request_create_t request_create) -{ - nxt_stream_source_t *stream; - nxt_fastcgi_source_t *fs; - - fs = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_fastcgi_source_t)); - if (nxt_slow_path(fs == NULL)) { - goto fail; - } - - us->protocol_source = fs; - - fs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8, - sizeof(nxt_name_value_t)); - if (nxt_slow_path(fs->header_in.list == NULL)) { - goto fail; - } - - fs->header_in.hash = us->header_hash; - fs->upstream = us; - fs->request_create = request_create; - - stream = us->stream; - - if (stream == NULL) { - stream = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_stream_source_t)); - if (nxt_slow_path(stream == NULL)) { - goto fail; - } - - us->stream = stream; - stream->upstream = us; - - } else { - nxt_memzero(stream, sizeof(nxt_stream_source_t)); - } - - /* - * Create the FastCGI source filter chain: - * stream source | FastCGI record filter | FastCGI HTTP header filter - */ - stream->next = &fs->query; - stream->error_handler = nxt_fastcgi_source_error; - - fs->record.next.context = fs; - fs->record.next.filter = nxt_fastcgi_source_header_filter; - - fs->record.parse.last_buf = nxt_fastcgi_source_last_buf; - fs->record.parse.data = fs; - fs->record.parse.mem_pool = us->buffers.mem_pool; - - fs->query.context = &fs->record.parse; - fs->query.filter = nxt_fastcgi_source_record_filter; - - fs->header_in.content_length = -1; - - stream->out = nxt_fastcgi_request_create(fs); - - if (nxt_fast_path(stream->out != NULL)) { - nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t)); - fs->u.header.mem_pool = fs->upstream->buffers.mem_pool; - - nxt_stream_source_connect(task, stream); - return; - } - -fail: - - nxt_fastcgi_source_fail(task, fs); -} - - -static nxt_buf_t * -nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs) -{ - u_char *p, *record_length; - size_t len, size, max_record_size; - nxt_int_t ret; - nxt_buf_t *b, *req, **prev; - nxt_bool_t begin_request; - nxt_fastcgi_param_t param; - - nxt_thread_log_debug("fastcgi request"); - - begin_request = 1; - param.len = 0; - prev = &req; - -new_buffer: - - ret = nxt_buf_pool_mem_alloc(&fs->upstream->buffers, 0); - if (nxt_slow_path(ret != NXT_OK)) { - return NULL; - } - - b = fs->upstream->buffers.current; - fs->upstream->buffers.current = NULL; - - *prev = b; - prev = &b->next; - -new_record: - - size = b->mem.end - b->mem.free; - size = nxt_align_size(size, 8) - 8; - /* The maximal FastCGI record content size is 65535. 65528 is 64K - 8. */ - max_record_size = nxt_min(65528, size); - - p = b->mem.free; - - if (begin_request) { - /* TODO: fastcgi keep conn in flags. */ - p = nxt_cpymem(p, nxt_fastcgi_begin_request, 16); - max_record_size -= 16; - begin_request = 0; - } - - b->mem.free = nxt_cpymem(p, nxt_fastcgi_params_record, 8); - record_length = &p[4]; - size = 0; - - for ( ;; ) { - if (param.len == 0) { - ret = nxt_fastcgi_next_param(fs, ¶m); - - if (nxt_slow_path(ret != NXT_OK)) { - - if (nxt_slow_path(ret == NXT_ERROR)) { - return NULL; - } - - /* ret == NXT_DONE */ - break; - } - } - - len = max_record_size; - - if (nxt_fast_path(len >= param.len)) { - len = param.len; - param.len = 0; - - } else { - param.len -= len; - } - - nxt_thread_log_debug("fastcgi copy len:%uz", len); - - b->mem.free = nxt_cpymem(b->mem.free, param.buf, len); - - size += len; - max_record_size -= len; - - if (nxt_slow_path(param.len != 0)) { - /* The record is full. */ - - param.buf += len; - - nxt_thread_log_debug("fastcgi content size:%uz", size); - - nxt_fastcgi_set_record_length(record_length, size); - - /* The minimal size of aligned record with content is 16 bytes. */ - if (b->mem.end - b->mem.free >= 16) { - goto new_record; - } - - nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, - b->mem.pos); - goto new_buffer; - } - } - - nxt_thread_log_debug("fastcgi content size:%uz", size); - - nxt_fastcgi_set_record_length(record_length, size); - - /* A padding length. */ - size = 8 - size % 8; - record_length[2] = (u_char) size; - nxt_memzero(b->mem.free, size); - b->mem.free += size; - - nxt_thread_log_debug("fastcgi padding:%uz", size); - - if (b->mem.end - b->mem.free < 16) { - nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); - - b = nxt_buf_mem_alloc(fs->upstream->buffers.mem_pool, 16, 0); - if (nxt_slow_path(b == NULL)) { - return NULL; - } - - *prev = b; - prev = &b->next; - } - - /* The end of FastCGI params. */ - p = nxt_cpymem(b->mem.free, nxt_fastcgi_params_record, 8); - - /* The end of FastCGI stdin. */ - b->mem.free = nxt_cpymem(p, nxt_fastcgi_stdin_record, 8); - - nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); - - return req; -} - - -static nxt_int_t -nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, nxt_fastcgi_param_t *param) -{ - nxt_int_t ret; - - enum { - sw_name_length = 0, - sw_value_length, - sw_name, - sw_value, - }; - - switch (fs->state) { - - case sw_name_length: - ret = fs->request_create(fs); - - if (nxt_slow_path(ret != NXT_OK)) { - return ret; - } - - nxt_thread_log_debug("fastcgi param \"%V: %V\"", - &fs->u.request.name, &fs->u.request.value); - - fs->state = sw_value_length; - param->buf = param->length; - param->len = nxt_fastcgi_param_length(param->length, - fs->u.request.name.len); - break; - - case sw_value_length: - fs->state = sw_name; - param->buf = param->length; - param->len = nxt_fastcgi_param_length(param->length, - fs->u.request.value.len); - break; - - case sw_name: - fs->state = sw_value; - param->buf = fs->u.request.name.data; - param->len = fs->u.request.name.len; - break; - - case sw_value: - fs->state = sw_name_length; - param->buf = fs->u.request.value.data; - param->len = fs->u.request.value.len; - break; - } - - return NXT_OK; -} - - -static void -nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, void *data) -{ - size_t size; - u_char *p; - nxt_buf_t *b, *in; - nxt_fastcgi_source_t *fs; - nxt_fastcgi_source_record_t *fsr; - - fsr = obj; - in = data; - - nxt_debug(task, "fastcgi source record filter"); - - if (nxt_slow_path(fsr->parse.done)) { - return; - } - - nxt_fastcgi_record_parse(task, &fsr->parse, in); - - fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record); - - if (fsr->parse.error) { - nxt_fastcgi_source_fail(task, fs); - return; - } - - if (fsr->parse.fastcgi_error) { - /* - * Output all parsed before a FastCGI record error and close upstream. - */ - nxt_thread_current_work_queue_add(task->thread, - nxt_fastcgi_source_record_error, - task, fs, NULL); - } - - /* Log FastCGI stderr output. */ - - for (b = fsr->parse.out[1]; b != NULL; b = b->next) { - - for (p = b->mem.free - 1; p >= b->mem.pos; p--) { - if (*p != '\r' && *p != '\n') { - break; - } - } - - size = (p + 1) - b->mem.pos; - - if (size != 0) { - nxt_log(task, NXT_LOG_ERR, - "upstream sent in FastCGI stderr: \"%*s\"", - size, b->mem.pos); - } - - b->completion_handler(task, b, b->parent); - } - - /* Process FastCGI stdout output. */ - - if (fsr->parse.out[0] != NULL) { - nxt_source_filter(task->thread, fs->upstream->work_queue, task, - &fsr->next, fsr->parse.out[0]); - } -} - - -static void -nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_fastcgi_source_t *fs; - - fs = obj; - - nxt_fastcgi_source_fail(task, fs); -} - - -static void -nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, void *data) -{ - nxt_int_t ret; - nxt_buf_t *b; - nxt_fastcgi_source_t *fs; - - fs = obj; - b = data; - - do { - nxt_debug(task, "fastcgi source header filter"); - - if (nxt_slow_path(nxt_buf_is_sync(b))) { - nxt_fastcgi_source_sync_buffer(task, fs, b); - return; - } - - for ( ;; ) { - ret = nxt_http_split_header_parse(&fs->u.header, &b->mem); - - if (nxt_slow_path(ret != NXT_OK)) { - break; - } - - ret = nxt_fastcgi_source_header_process(task, fs); - - if (nxt_slow_path(ret != NXT_OK)) { - break; - } - } - - if (nxt_fast_path(ret == NXT_DONE)) { - nxt_debug(task, "fastcgi source header done"); - nxt_fastcgi_source_header_ready(fs, b); - return; - } - - if (nxt_fast_path(ret != NXT_AGAIN)) { - - if (ret != NXT_ERROR) { - /* n == NXT_DECLINED: "\r" is not followed by "\n" */ - nxt_log(task, NXT_LOG_ERR, - "upstream sent invalid header line: \"%*s\\r...\"", - fs->u.header.parse.header_end - - fs->u.header.parse.header_name_start, - fs->u.header.parse.header_name_start); - } - - /* ret == NXT_ERROR */ - - nxt_fastcgi_source_fail(task, fs); - return; - } - - b = b->next; - - } while (b != NULL); -} - - -static void -nxt_fastcgi_source_sync_buffer(nxt_task_t *task, nxt_fastcgi_source_t *fs, - nxt_buf_t *b) -{ - if (nxt_buf_is_last(b)) { - nxt_log(task, NXT_LOG_ERR, "upstream closed prematurely connection"); - - } else { - nxt_log(task, NXT_LOG_ERR, "%ui buffers %uz each are not " - "enough to process upstream response header", - fs->upstream->buffers.max, fs->upstream->buffers.size); - } - - /* The stream source sends only the last and the nobuf sync buffer. */ - - nxt_fastcgi_source_fail(task, fs); -} - - -static nxt_int_t -nxt_fastcgi_source_header_process(nxt_task_t *task, nxt_fastcgi_source_t *fs) -{ - size_t len; - nxt_name_value_t *nv; - nxt_lvlhsh_query_t lhq; - nxt_http_header_parse_t *hp; - nxt_upstream_name_value_t *unv; - - hp = &fs->u.header.parse; - - len = hp->header_name_end - hp->header_name_start; - - if (len > 255) { - nxt_log(task, NXT_LOG_INFO, - "upstream sent too long header field name: \"%*s\"", - len, hp->header_name_start); - return NXT_ERROR; - } - - nv = nxt_list_add(fs->header_in.list); - if (nxt_slow_path(nv == NULL)) { - return NXT_ERROR; - } - - nv->hash = hp->header_hash; - nv->skip = 0; - nv->name_len = len; - nv->name_start = hp->header_name_start; - nv->value_len = hp->header_end - hp->header_start; - nv->value_start = hp->header_start; - - nxt_debug(task, "http header: \"%*s: %*s\"", - nv->name_len, nv->name_start, nv->value_len, nv->value_start); - - lhq.key_hash = nv->hash; - lhq.key.len = nv->name_len; - lhq.key.data = nv->name_start; - lhq.proto = &nxt_upstream_header_hash_proto; - - if (nxt_lvlhsh_find(&fs->header_in.hash, &lhq) == NXT_OK) { - unv = lhq.value; - - if (unv->handler(fs->upstream, nv) == NXT_OK) { - return NXT_ERROR; - } - } - - return NXT_OK; -} - - -static const nxt_upstream_name_value_t nxt_fastcgi_source_headers[] - nxt_aligned(32) = -{ - { nxt_fastcgi_source_status, - nxt_upstream_name_value("status") }, - - { nxt_fastcgi_source_content_length, - nxt_upstream_name_value("content-length") }, -}; - - -nxt_int_t -nxt_fastcgi_source_hash_create(nxt_mp_t *mp, nxt_lvlhsh_t *lh) -{ - return nxt_upstream_header_hash_add(mp, lh, nxt_fastcgi_source_headers, - nxt_nitems(nxt_fastcgi_source_headers)); -} - - -static nxt_int_t -nxt_fastcgi_source_status(nxt_upstream_source_t *us, nxt_name_value_t *nv) -{ - nxt_int_t n; - nxt_str_t s; - nxt_fastcgi_source_t *fs; - - s.len = nv->value_len; - s.data = nv->value_start; - - n = nxt_str_int_parse(&s); - - if (nxt_fast_path(n > 0)) { - fs = us->protocol_source; - fs->header_in.status = n; - return NXT_OK; - } - - return NXT_ERROR; -} - - -static nxt_int_t -nxt_fastcgi_source_content_length(nxt_upstream_source_t *us, - nxt_name_value_t *nv) -{ - nxt_off_t length; - nxt_fastcgi_source_t *fs; - - length = nxt_off_t_parse(nv->value_start, nv->value_len); - - if (nxt_fast_path(length > 0)) { - fs = us->protocol_source; - fs->header_in.content_length = length; - return NXT_OK; - } - - return NXT_ERROR; -} - - -static void -nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, nxt_buf_t *b) -{ - /* - * Change the FastCGI source filter chain: - * stream source | FastCGI record filter | FastCGI body filter - */ - fs->record.next.filter = nxt_fastcgi_source_body_filter; - - if (nxt_buf_mem_used_size(&b->mem) != 0) { - fs->rest = b; - } - - if (fs->header_in.status == 0) { - /* The "200 OK" status by default. */ - fs->header_in.status = 200; - } - - fs->upstream->state->ready_handler(fs); -} - - -/* - * The FastCGI source body filter accumulates first body buffers before the next - * filter will be established and sets completion handler for the last buffer. - */ - -static void -nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b, *in; - nxt_fastcgi_source_t *fs; - - fs = obj; - in = data; - - nxt_debug(task, "fastcgi source body filter"); - - for (b = in; b != NULL; b = b->next) { - - if (nxt_buf_is_last(b)) { - b->data = fs->upstream->data; - b->completion_handler = fs->upstream->state->completion_handler; - } - } - - if (fs->next != NULL) { - nxt_source_filter(task->thread, fs->upstream->work_queue, task, - fs->next, in); - return; - } - - nxt_buf_chain_add(&fs->rest, in); -} - - -static nxt_buf_t * -nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp) -{ - nxt_buf_t *b; - nxt_fastcgi_source_t *fs; - - fs = fp->data; - - b = nxt_buf_sync_alloc(fp->mem_pool, NXT_BUF_SYNC_LAST); - - if (nxt_fast_path(b != NULL)) { - b->data = fs->upstream->data; - b->completion_handler = fs->upstream->state->completion_handler; - } - - return b; -} - - -static void -nxt_fastcgi_source_error(nxt_task_t *task, nxt_stream_source_t *stream) -{ - nxt_fastcgi_source_t *fs; - - nxt_thread_log_debug("fastcgi source error"); - - fs = stream->upstream->protocol_source; - - nxt_fastcgi_source_fail(task, fs); -} - - -static void -nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs) -{ - nxt_debug(task, "fastcgi source fail"); - - /* TODO: fail, next upstream, or bad gateway */ - - fs->upstream->state->error_handler(task, fs, NULL); -} diff --git a/src/nxt_fastcgi_source.h b/src/nxt_fastcgi_source.h deleted file mode 100644 index 979e962b..00000000 --- a/src/nxt_fastcgi_source.h +++ /dev/null @@ -1,93 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_FASTCGI_SOURCE_H_INCLUDED_ -#define _NXT_FASTCGI_SOURCE_H_INCLUDED_ - - -#define NXT_FASTCGI_BEGIN_REQUEST 1 -#define NXT_FASTCGI_ABORT_REQUEST 2 -#define NXT_FASTCGI_END_REQUEST 3 -#define NXT_FASTCGI_PARAMS 4 -#define NXT_FASTCGI_STDIN 5 -#define NXT_FASTCGI_STDOUT 6 -#define NXT_FASTCGI_STDERR 7 -#define NXT_FASTCGI_DATA 8 - - -typedef struct nxt_fastcgi_parse_s nxt_fastcgi_parse_t; - -struct nxt_fastcgi_parse_s { - u_char *pos; - - uint16_t length; /* 16 bits */ - uint8_t padding; - uint8_t type; - - uint8_t state; - uint8_t fastcgi_error; /* 1 bit */ - uint8_t error; /* 1 bit */ - uint8_t done; /* 1 bit */ - - /* FastCGI stdout and stderr buffer chains. */ - nxt_buf_t *out[2]; - - nxt_buf_t *(*last_buf)(nxt_fastcgi_parse_t *fp); - void *data; - nxt_mp_t *mem_pool; -}; - - -typedef struct { - nxt_fastcgi_parse_t parse; - nxt_source_hook_t next; -} nxt_fastcgi_source_record_t; - - -typedef struct { - nxt_str_t name; - nxt_str_t value; - uintptr_t data[3]; -} nxt_fastcgi_source_request_t; - - -typedef struct nxt_fastcgi_source_s nxt_fastcgi_source_t; -typedef nxt_int_t (*nxt_fastcgi_source_request_create_t)( - nxt_fastcgi_source_t *fs); - - -struct nxt_fastcgi_source_s { - nxt_source_hook_t query; - nxt_source_hook_t *next; - - nxt_upstream_source_t *upstream; - - nxt_fastcgi_source_request_create_t request_create; - - nxt_upstream_header_in_t header_in; - - nxt_buf_t *rest; - - uint32_t state; /* 2 bits */ - - nxt_fastcgi_source_record_t record; - - union { - nxt_fastcgi_source_request_t request; - } u; -}; - - -NXT_EXPORT void nxt_fastcgi_source_handler(nxt_task_t *task, - nxt_upstream_source_t *us, - nxt_fastcgi_source_request_create_t request_create); -NXT_EXPORT nxt_int_t nxt_fastcgi_source_hash_create(nxt_mp_t *mp, - nxt_lvlhsh_t *lh); -void nxt_fastcgi_record_parse(nxt_task_t *task, nxt_fastcgi_parse_t *fp, - nxt_buf_t *in); - - -#endif /* _NXT_FASTCGI_SOURCE_H_INCLUDED_ */ diff --git a/src/nxt_file_cache.c b/src/nxt_file_cache.c deleted file mode 100644 index 3af3c0c5..00000000 --- a/src/nxt_file_cache.c +++ /dev/null @@ -1,508 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -static nxt_int_t nxt_file_cache_lvlhsh_test(nxt_lvlhsh_key_t *hkey, void *data); -static nxt_work_handler_t nxt_file_cache_query_locked(nxt_file_cache_t *cache, - nxt_file_cache_query_t *q, nxt_lvlhsh_key_t *hkey); -static nxt_work_handler_t nxt_file_cache_node_hold(nxt_file_cache_t *cache, - nxt_file_cache_query_t *q, nxt_lvlhsh_key_t *hkey); -static nxt_work_handler_t nxt_file_cache_node_test(nxt_file_cache_t *cache, - nxt_file_cache_query_t *q); - -static void nxt_file_cache_wait_handler(void *data); -static void nxt_file_cache_timeout_handler(nxt_event_timer_t *ev); -static void nxt_file_cache_wake_handler(void *data); - -static nxt_file_cache_node_t *nxt_file_cache_node_alloc(nxt_cache_t *cache); -static void nxt_file_cache_node_free(nxt_file_cache_t *cache, - nxt_file_cache_node_t *node, nxt_bool_t fast); -static nxt_file_cache_query_wait_t *nxt_file_cache_query_wait_alloc( - nxt_file_cache_t *cache, nxt_bool_t *fast); -static void nxt_file_cache_query_wait_free(nxt_file_cache_t *cache, - nxt_file_cache_query_wait_t *qw); -static void nxt_file_cache_lock(nxt_file_cache_t *cache); -static void nxt_file_cache_unlock(nxt_file_cache_t *cache); - - -void -nxt_file_cache_init(nxt_cache_t *cache) -{ - static const nxt_lvlhsh_ctx_t ctx = { - nxt_file_cache_lvlhsh_test, - nxt_lvlhsh_alloc, - nxt_lvlhsh_free, - 0, - }; - - /* lvlhsh with large first level. */ - cache->lvlhsh.shift[1] = 10; - - cache->lvlhsh.ctx = &ctx; - - cache->start_time = nxt_thread_time(); -} - - -static nxt_int_t -nxt_file_cache_lvlhsh_test(nxt_lvlhsh_key_t *hkey, void *data) -{ - nxt_file_cache_node_t *node; - - node = data; - - if (nxt_strmem_eq(&hkey->key, node->key_data, node->key_len)) { - return NXT_OK; - } - - return NXT_DECLINED; -} - - -void -nxt_file_cache_query(nxt_file_cache_t *cache, nxt_file_cache_query_t *q) -{ - nxt_lvlhsh_key_t hkey; - nxt_work_handler_t handler; - - if (cache != NULL) { - hkey.key.len = q->key_len; - hkey.key.data = q->key_data; - hkey.key_hash = nxt_murmur_hash2(q->key_data, q->key_len); - hkey.replace = 0; - - nxt_file_cache_lock(cache); - - handler = nxt_file_cache_query_locked(cache, q, &hkey); - - nxt_file_cache_unlock(cache); - - } else { - handler = q->state->nocache_handler; - } - - handler(q); -} - - -static nxt_work_handler_t -nxt_file_cache_query_locked(nxt_file_cache_t *cache, nxt_file_cache_query_t *q, - nxt_lvlhsh_key_t *hkey) -{ - nxt_int_t ret; - nxt_bool_t fast; - nxt_work_handler_t handler; - nxt_file_cache_node_t *node, *sentinel; - nxt_file_cache_query_wait_t *qw; - nxt_file_cache_query_state_t *state; - - state = q->state; - sentinel = nxt_file_cache_node_alloc(cache); - - if (nxt_slow_path(sentinel == NULL)) { - return state->error_handler; - } - - sentinel->key_data = q->key_data; - sentinel->key_len = q->key_len; - hkey->value = sentinel; - - /* - * Try to insert an empty sentinel node to hold updating - * process if there is no existent cache node in cache. - */ - - ret = nxt_lvlhsh_insert(&cache->lvlhsh, hkey); - - if (ret == NXT_OK) { - /* The sentinel node was successully added. */ - - q->node = sentinel; - sentinel->updating = 1; - return state->update_handler; - } - - nxt_cache_node_free(cache, sentinel, 1); - - if (ret == NXT_ERROR) { - return state->error_handler; - } - - /* NXT_DECLINED: a cache node exists. */ - - node = hkey->value; - node->count++; - q->node = node; - - handler = nxt_cache_node_test(cache, q); - - if (handler == NULL) { - /* Add the node to a wait queue. */ - - qw = nxt_cache_query_wait_alloc(cache, &fast); - if (nxt_slow_path(qw == NULL)) { - return state->error_handler; - } - - if (!fast) { - /* The node state may be changed during slow allocation. */ - handler = nxt_cache_node_test(cache, q); - - if (handler != NULL) { - nxt_cache_query_wait_free(cache, qw); - return handler; - } - } - - qw->query = q; - qw->next = node->waiting; - qw->busy = 0; - qw->deleted = 0; - qw->pid = nxt_pid; - qw->engine = nxt_thread_event_engine(); - qw->handler = nxt_cache_wake_handler; - qw->cache = cache; - - node->waiting = qw; - - return nxt_cache_wait_handler; - } - - return handler; -} - - -static nxt_work_handler_t -nxt_cache_node_test(nxt_cache_t *cache, nxt_cache_query_t *q) -{ - nxt_time_t expiry; - nxt_cache_node_t *node; - nxt_cache_query_state_t *state; - - q->stale = 0; - state = q->state; - node = q->node; - - expiry = cache->start_time + node->expiry; - - if (nxt_thread_time() < expiry) { - return state->ready_handler; - } - - /* - * A valid stale or empty sentinel cache node. - * The sentinel node can be only in updating state. - */ - - if (node->updating) { - - if (node->expiry != 0) { - /* A valid stale cache node. */ - - q->stale = 1; - - if (q->use_stale) { - return state->stale_handler; - } - } - - return NULL; - } - - /* A valid stale cache node is not being updated now. */ - - q->stale = 1; - - if (q->use_stale) { - - if (q->update_stale) { - node->updating = 1; - return state->update_stale_handler; - } - - return state->stale_handler; - } - - node->updating = 1; - return state->update_handler; -} - - -static void -nxt_cache_wait_handler(void *data) -{ - nxt_thread_t *thr; - nxt_event_timer_t *ev; - nxt_cache_query_t *q; - - q = data; - - if (&q->timeout == 0) { - return; - } - - ev = &q->timer; - - if (!nxt_event_timer_is_set(ev)) { - thr = nxt_thread(); - ev->log = thr->log; - ev->handler = nxt_cache_timeout_handler; - ev->data = q; - nxt_event_timer_ident(ev, -1); - - nxt_event_timer_add(thr->engine, ev, q->timeout); - } -} - - -static void -nxt_cache_timeout_handler(nxt_event_timer_t *ev) -{ - nxt_cache_query_t *q; - - q = ev->data; - - q->state->timeout_handler(q); -} - - -static void -nxt_cache_wake_handler(void *data) -{ - nxt_cache_t *cache; - nxt_work_handler_t handler; - nxt_cache_query_t *q; - nxt_cache_query_wait_t *qw; - - qw = data; - q = qw->query; - cache = qw->cache; - - nxt_cache_lock(cache); - - handler = nxt_cache_node_test(cache, q); - - if (handler == NULL) { - /* Wait again. */ - qw->next = q->node->waiting; - q->node->waiting = qw; - } - - nxt_cache_unlock(cache); - - if (handler != NULL) { - nxt_cache_query_wait_free(cache, qw); - } - - handler(q); -} - - -static nxt_cache_node_t * -nxt_cache_node_alloc(nxt_cache_t *cache) -{ - nxt_queue_node_t *qn; - nxt_cache_node_t *node; - - qn = nxt_queue_first(&cache->free_nodes); - - if (nxt_fast_path(qn != nxt_queue_tail(&cache->free_nodes))) { - cache->nfree_nodes--; - nxt_queue_remove(qn); - - node = nxt_queue_node_data(qn, nxt_cache_node_t, queue); - nxt_memzero(node, sizeof(nxt_cache_node_t)); - - return node; - } - - nxt_cache_unlock(cache); - - node = cache->alloc(cache->data, sizeof(nxt_cache_node_t)); - - nxt_cache_lock(cache); - - return node; -} - - -static void -nxt_cache_node_free(nxt_cache_t *cache, nxt_cache_node_t *node, nxt_bool_t fast) -{ - if (fast || cache->nfree_nodes < 32) { - nxt_queue_insert_head(&cache->free_nodes, &node->queue); - cache->nfree_nodes++; - return; - } - - nxt_cache_unlock(cache); - - cache->free(cache->data, node); - - nxt_cache_lock(cache); -} - - -static nxt_cache_query_wait_t * -nxt_cache_query_wait_alloc(nxt_cache_t *cache, nxt_bool_t *fast) -{ - nxt_cache_query_wait_t *qw; - - qw = cache->free_query_wait; - - if (nxt_fast_path(qw != NULL)) { - cache->free_query_wait = qw->next; - cache->nfree_query_wait--; - - *fast = 1; - return qw; - } - - nxt_cache_unlock(cache); - - qw = cache->alloc(cache->data, sizeof(nxt_cache_query_wait_t)); - *fast = 0; - - nxt_cache_lock(cache); - - return qw; -} - - -static void -nxt_cache_query_wait_free(nxt_cache_t *cache, nxt_cache_query_wait_t *qw) -{ - if (cache->nfree_query_wait < 32) { - qw->next = cache->free_query_wait; - cache->free_query_wait = qw; - cache->nfree_query_wait++; - return; - } - - nxt_cache_unlock(cache); - - cache->free(cache->data, qw); - - nxt_cache_lock(cache); -} - - -#if 0 - -nxt_int_t -nxt_cache_update(nxt_cache_t *cache, nxt_cache_node_t *node) -{ - nxt_lvlhsh_key_t hkey; - - if (node->expiry == 0) { - /* An empty sentinel node. */ - nxt_cache_release(cache, node); - return; - } - - hkey.key.len = node->key_len; - hkey.key.data = node->key_data; - hkey.key_hash = nxt_murmur_hash2(node->key_data, node->key_len); - hkey.replace = 1; - hkey.value = node; - - node->count = 1; - - if (nxt_lvlhsh_insert(&cache->lvlhsh, &hkey) != NXT_OK) { - return NXT_ERROR; - } - - node = hkey.value; - - if (node != NULL) { - if (node->count != 0) { - node->delete = 1; - - } else { - // delete cache node - } - } - - return NXT_OK; -} - -#endif - - -void -nxt_cache_node_release(nxt_cache_t *cache, nxt_cache_node_t *node) -{ - nxt_bool_t delete; - - nxt_cache_lock(cache); - - delete = nxt_cache_node_release_locked(cache, node); - - nxt_cache_unlock(cache); - - if (delete) { - nxt_thread_work_queue_add(cache->delete_handler, node); - } -} - - -nxt_bool_t -nxt_cache_node_release_locked(nxt_cache_t *cache, nxt_cache_node_t *node) -{ -#if 0 - nxt_lvlhsh_key_t hkey; -#endif - - node->count--; - - if (node->count != 0) { - return 0; - } - - if (!node->deleted) { - /* - * A cache node is locked whilst its count is non zero. - * To minimize number of operations the node's place in expiry - * queue can be updated only if the node is not currently used. - */ - node->accessed = nxt_thread_time() - cache->start_time; - - nxt_queue_remove(&node->queue); - nxt_queue_insert_head(&cache->expiry_queue, &node->queue); - - return 0; - } - -#if 0 - hkey.key.len = node->key_len; - hkey.key.data = node->key_data; - hkey.key_hash = nxt_murmur_hash2(node->key_data, node->key_len); - - nxt_lvlhsh_delete(&cache->lvlhsh, &hkey); -#endif - - return 1; -} - - -static void -nxt_file_cache_lock(nxt_file_cache_t *cache) -{ - if (cache->shared) { - nxt_thread_spin_lock(&cache->lock); - } -} - - -static void -nxt_file_cache_unlock(nxt_file_cache_t *cache) -{ - if (cache->shared) { - nxt_thread_spin_unlock(&cache->lock); - } -} diff --git a/src/nxt_http_source.c b/src/nxt_http_source.c deleted file mode 100644 index 889dcd08..00000000 --- a/src/nxt_http_source.c +++ /dev/null @@ -1,629 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -typedef struct { - nxt_http_chunk_parse_t parse; - nxt_source_hook_t next; -} nxt_http_source_chunk_t; - - -static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs); - -static void nxt_http_source_status_filter(nxt_task_t *task, void *obj, - void *data); -static void nxt_http_source_header_filter(nxt_task_t *task, void *obj, - void *data); - -static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs); -static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us, - nxt_name_value_t *nv); -static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us, - nxt_name_value_t *nv); - -static void nxt_http_source_header_ready(nxt_task_t *task, - nxt_http_source_t *hs, nxt_buf_t *rest); -static void nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, - void *data); -static void nxt_http_source_chunk_error(nxt_task_t *task, void *obj, - void *data); -static void nxt_http_source_body_filter(nxt_task_t *task, void *obj, - void *data); - -static void nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs, - nxt_buf_t *b); -static void nxt_http_source_error(nxt_task_t *task, - nxt_stream_source_t *stream); -static void nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs); -static void nxt_http_source_message(const char *msg, size_t len, u_char *p); - - -void -nxt_http_source_handler(nxt_task_t *task, nxt_upstream_source_t *us, - nxt_http_source_request_create_t request_create) -{ - nxt_http_source_t *hs; - nxt_stream_source_t *stream; - - hs = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_http_source_t)); - if (nxt_slow_path(hs == NULL)) { - goto fail; - } - - us->protocol_source = hs; - - hs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8, - sizeof(nxt_name_value_t)); - if (nxt_slow_path(hs->header_in.list == NULL)) { - goto fail; - } - - hs->header_in.hash = us->header_hash; - hs->upstream = us; - hs->request_create = request_create; - - stream = us->stream; - - if (stream == NULL) { - stream = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_stream_source_t)); - if (nxt_slow_path(stream == NULL)) { - goto fail; - } - - us->stream = stream; - stream->upstream = us; - - } else { - nxt_memzero(stream, sizeof(nxt_stream_source_t)); - } - - /* - * Create the HTTP source filter chain: - * stream source | HTTP status line filter - */ - stream->next = &hs->query; - stream->error_handler = nxt_http_source_error; - - hs->query.context = hs; - hs->query.filter = nxt_http_source_status_filter; - - hs->header_in.content_length = -1; - - stream->out = nxt_http_source_request_create(hs); - - if (nxt_fast_path(stream->out != NULL)) { - nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t)); - - nxt_stream_source_connect(task, stream); - return; - } - -fail: - - nxt_http_source_fail(task, hs); -} - - -nxt_inline u_char * -nxt_http_source_copy(u_char *p, nxt_str_t *src, size_t len) -{ - u_char *s; - - if (nxt_fast_path(len >= src->len)) { - len = src->len; - src->len = 0; - - } else { - src->len -= len; - } - - s = src->data; - src->data += len; - - return nxt_cpymem(p, s, len); -} - - -static nxt_buf_t * -nxt_http_source_request_create(nxt_http_source_t *hs) -{ - nxt_int_t ret; - nxt_buf_t *b, *req, **prev; - - nxt_thread_log_debug("http source create request"); - - prev = &req; - -new_buffer: - - ret = nxt_buf_pool_mem_alloc(&hs->upstream->buffers, 0); - if (nxt_slow_path(ret != NXT_OK)) { - return NULL; - } - - b = hs->upstream->buffers.current; - hs->upstream->buffers.current = NULL; - - *prev = b; - prev = &b->next; - - for ( ;; ) { - ret = hs->request_create(hs); - - if (nxt_fast_path(ret == NXT_OK)) { - b->mem.free = nxt_http_source_copy(b->mem.free, &hs->u.request.copy, - b->mem.end - b->mem.free); - - if (nxt_fast_path(hs->u.request.copy.len == 0)) { - continue; - } - - nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, - b->mem.pos); - - goto new_buffer; - } - - if (nxt_slow_path(ret == NXT_ERROR)) { - return NULL; - } - - /* ret == NXT_DONE */ - break; - } - - nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); - - return req; -} - - -static void -nxt_http_source_status_filter(nxt_task_t *task, void *obj, void *data) -{ - nxt_int_t ret; - nxt_buf_t *b; - nxt_http_source_t *hs; - - hs = obj; - b = data; - - /* - * No cycle over buffer chain is required since at - * start the stream source passes buffers one at a time. - */ - - nxt_debug(task, "http source status filter"); - - if (nxt_slow_path(nxt_buf_is_sync(b))) { - nxt_http_source_sync_buffer(task, hs, b); - return; - } - - ret = nxt_http_status_parse(&hs->u.status_parse, &b->mem); - - if (nxt_fast_path(ret == NXT_OK)) { - /* - * Change the HTTP source filter chain: - * stream source | HTTP header filter - */ - hs->query.filter = nxt_http_source_header_filter; - - nxt_debug(task, "upstream status: \"%*s\"", - hs->u.status_parse.end - b->mem.start, b->mem.start); - - hs->header_in.status = hs->u.status_parse.code; - - nxt_debug(task, "upstream version:%d status:%uD \"%*s\"", - hs->u.status_parse.http_version, - hs->u.status_parse.code, - hs->u.status_parse.end - hs->u.status_parse.start, - hs->u.status_parse.start); - - nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t)); - hs->u.header.mem_pool = hs->upstream->buffers.mem_pool; - - nxt_http_source_header_filter(task, hs, b); - return; - } - - if (nxt_slow_path(ret == NXT_ERROR)) { - /* HTTP/0.9 response. */ - hs->header_in.status = 200; - nxt_http_source_header_ready(task, hs, b); - return; - } - - /* ret == NXT_AGAIN */ - - /* - * b->mem.pos is always equal to b->mem.end because b is a buffer - * which points to a response part read by the stream source. - * However, since the stream source is an immediate source of the - * status filter, b->parent is a buffer the stream source reads in. - */ - if (b->parent->mem.pos == b->parent->mem.end) { - nxt_http_source_message("upstream sent too long status line: \"%*s\"", - b->mem.pos - b->mem.start, b->mem.start); - - nxt_http_source_fail(task, hs); - } -} - - -static void -nxt_http_source_header_filter(nxt_task_t *task, void *obj, void *data) -{ - nxt_int_t ret; - nxt_buf_t *b; - nxt_http_source_t *hs; - - hs = obj; - b = data; - - /* - * No cycle over buffer chain is required since at - * start the stream source passes buffers one at a time. - */ - - nxt_debug(task, "http source header filter"); - - if (nxt_slow_path(nxt_buf_is_sync(b))) { - nxt_http_source_sync_buffer(task, hs, b); - return; - } - - for ( ;; ) { - ret = nxt_http_split_header_parse(&hs->u.header, &b->mem); - - if (nxt_slow_path(ret != NXT_OK)) { - break; - } - - ret = nxt_http_source_header_line_process(hs); - - if (nxt_slow_path(ret != NXT_OK)) { - break; - } - } - - if (nxt_fast_path(ret == NXT_DONE)) { - nxt_debug(task, "http source header done"); - nxt_http_source_header_ready(task, hs, b); - return; - } - - if (nxt_fast_path(ret == NXT_AGAIN)) { - return; - } - - if (ret != NXT_ERROR) { - /* ret == NXT_DECLINED: "\r" is not followed by "\n" */ - nxt_log(task, NXT_LOG_ERR, - "upstream sent invalid header line: \"%*s\\r...\"", - hs->u.header.parse.header_end - - hs->u.header.parse.header_name_start, - hs->u.header.parse.header_name_start); - } - - /* ret == NXT_ERROR */ - - nxt_http_source_fail(task, hs); -} - - -static nxt_int_t -nxt_http_source_header_line_process(nxt_http_source_t *hs) -{ - size_t name_len; - nxt_name_value_t *nv; - nxt_lvlhsh_query_t lhq; - nxt_http_header_parse_t *hp; - nxt_upstream_name_value_t *unv; - - hp = &hs->u.header.parse; - - name_len = hp->header_name_end - hp->header_name_start; - - if (name_len > 255) { - nxt_http_source_message("upstream sent too long header field name: " - "\"%*s\"", name_len, hp->header_name_start); - return NXT_ERROR; - } - - nv = nxt_list_add(hs->header_in.list); - if (nxt_slow_path(nv == NULL)) { - return NXT_ERROR; - } - - nv->hash = hp->header_hash; - nv->skip = 0; - nv->name_len = name_len; - nv->name_start = hp->header_name_start; - nv->value_len = hp->header_end - hp->header_start; - nv->value_start = hp->header_start; - - nxt_thread_log_debug("upstream header: \"%*s: %*s\"", - nv->name_len, nv->name_start, - nv->value_len, nv->value_start); - - lhq.key_hash = nv->hash; - lhq.key.len = nv->name_len; - lhq.key.data = nv->name_start; - lhq.proto = &nxt_upstream_header_hash_proto; - - if (nxt_lvlhsh_find(&hs->header_in.hash, &lhq) == NXT_OK) { - unv = lhq.value; - - if (unv->handler(hs->upstream, nv) != NXT_OK) { - return NXT_ERROR; - } - } - - return NXT_OK; -} - - -static const nxt_upstream_name_value_t nxt_http_source_headers[] - nxt_aligned(32) = -{ - { nxt_http_source_content_length, - nxt_upstream_name_value("content-length") }, - - { nxt_http_source_transfer_encoding, - nxt_upstream_name_value("transfer-encoding") }, -}; - - -nxt_int_t -nxt_http_source_hash_create(nxt_mp_t *mp, nxt_lvlhsh_t *lh) -{ - return nxt_upstream_header_hash_add(mp, lh, nxt_http_source_headers, - nxt_nitems(nxt_http_source_headers)); -} - - -static nxt_int_t -nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv) -{ - nxt_off_t length; - nxt_http_source_t *hs; - - length = nxt_off_t_parse(nv->value_start, nv->value_len); - - if (nxt_fast_path(length > 0)) { - hs = us->protocol_source; - hs->header_in.content_length = length; - return NXT_OK; - } - - return NXT_ERROR; -} - - -static nxt_int_t -nxt_http_source_transfer_encoding(nxt_upstream_source_t *us, - nxt_name_value_t *nv) -{ - u_char *end; - nxt_http_source_t *hs; - - end = nv->value_start + nv->value_len; - - if (nxt_memcasestrn(nv->value_start, end, "chunked", 7) != NULL) { - hs = us->protocol_source; - hs->chunked = 1; - } - - return NXT_OK; -} - - -static void -nxt_http_source_header_ready(nxt_task_t *task, nxt_http_source_t *hs, - nxt_buf_t *rest) -{ - nxt_buf_t *b; - nxt_upstream_source_t *us; - nxt_http_source_chunk_t *hsc; - - us = hs->upstream; - - /* Free buffers used for request header. */ - - for (b = us->stream->out; b != NULL; b = b->next) { - nxt_buf_pool_free(&us->buffers, b); - } - - if (nxt_fast_path(nxt_buf_pool_available(&us->buffers))) { - - if (hs->chunked) { - hsc = nxt_mp_zalloc(hs->upstream->buffers.mem_pool, - sizeof(nxt_http_source_chunk_t)); - if (nxt_slow_path(hsc == NULL)) { - goto fail; - } - - /* - * Change the HTTP source filter chain: - * stream source | chunk filter | HTTP body filter - */ - hs->query.context = hsc; - hs->query.filter = nxt_http_source_chunk_filter; - - hsc->next.context = hs; - hsc->next.filter = nxt_http_source_body_filter; - - hsc->parse.mem_pool = hs->upstream->buffers.mem_pool; - - if (nxt_buf_mem_used_size(&rest->mem) != 0) { - hs->rest = nxt_http_chunk_parse(task, &hsc->parse, rest); - - if (nxt_slow_path(hs->rest == NULL)) { - goto fail; - } - } - - } else { - /* - * Change the HTTP source filter chain: - * stream source | HTTP body filter - */ - hs->query.filter = nxt_http_source_body_filter; - - if (nxt_buf_mem_used_size(&rest->mem) != 0) { - hs->rest = rest; - } - } - - hs->upstream->state->ready_handler(hs); - return; - } - - nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each " - "are not enough to read upstream response", - us->buffers.max, us->buffers.size / 1024); -fail: - - nxt_http_source_fail(task, hs); -} - - -static void -nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b; - nxt_http_source_t *hs; - nxt_http_source_chunk_t *hsc; - - hsc = obj; - b = data; - - nxt_debug(task, "http source chunk filter"); - - b = nxt_http_chunk_parse(task, &hsc->parse, b); - - hs = hsc->next.context; - - if (hsc->parse.error) { - nxt_http_source_fail(task, hs); - return; - } - - if (hsc->parse.chunk_error) { - /* Output all parsed before a chunk error and close upstream. */ - nxt_thread_current_work_queue_add(task->thread, - nxt_http_source_chunk_error, - task, hs, NULL); - } - - if (b != NULL) { - nxt_source_filter(task->thread, hs->upstream->work_queue, task, - &hsc->next, b); - } -} - - -static void -nxt_http_source_chunk_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_http_source_t *hs; - - hs = obj; - - nxt_http_source_fail(task, hs); -} - - -/* - * The HTTP source body filter accumulates first body buffers before the next - * filter will be established and sets completion handler for the last buffer. - */ - -static void -nxt_http_source_body_filter(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b, *in; - nxt_http_source_t *hs; - - hs = obj; - in = data; - - nxt_debug(task, "http source body filter"); - - for (b = in; b != NULL; b = b->next) { - - if (nxt_buf_is_last(b)) { - b->data = hs->upstream->data; - b->completion_handler = hs->upstream->state->completion_handler; - } - } - - if (hs->next != NULL) { - nxt_source_filter(task->thread, hs->upstream->work_queue, task, - hs->next, in); - return; - } - - nxt_buf_chain_add(&hs->rest, in); -} - - -static void -nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs, - nxt_buf_t *b) -{ - if (nxt_buf_is_last(b)) { - nxt_log(task, NXT_LOG_ERR, - "upstream closed prematurely connection"); - - } else { - nxt_log(task, NXT_LOG_ERR,"%ui buffers %uz each are not " - "enough to process upstream response header", - hs->upstream->buffers.max, hs->upstream->buffers.size); - } - - /* The stream source sends only the last and the nobuf sync buffer. */ - - nxt_http_source_fail(task, hs); -} - - -static void -nxt_http_source_error(nxt_task_t *task, nxt_stream_source_t *stream) -{ - nxt_http_source_t *hs; - - nxt_thread_log_debug("http source error"); - - hs = stream->next->context; - nxt_http_source_fail(task, hs); -} - - -static void -nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs) -{ - nxt_debug(task, "http source fail"); - - /* TODO: fail, next upstream, or bad gateway */ - - hs->upstream->state->error_handler(task, hs, NULL); -} - - -static void -nxt_http_source_message(const char *msg, size_t len, u_char *p) -{ - if (len > NXT_MAX_ERROR_STR - 300) { - len = NXT_MAX_ERROR_STR - 300; - p[len++] = '.'; p[len++] = '.'; p[len++] = '.'; - } - - nxt_thread_log_error(NXT_LOG_ERR, msg, len, p); -} diff --git a/src/nxt_http_source.h b/src/nxt_http_source.h deleted file mode 100644 index 7cf2876b..00000000 --- a/src/nxt_http_source.h +++ /dev/null @@ -1,47 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_HTTP_SOURCE_H_INCLUDED_ -#define _NXT_HTTP_SOURCE_H_INCLUDED_ - - -typedef struct { - nxt_str_t copy; - uintptr_t data[3]; -} nxt_http_source_request_t; - - -typedef struct nxt_http_source_s nxt_http_source_t; -typedef nxt_int_t (*nxt_http_source_request_create_t)(nxt_http_source_t *hs); - - -struct nxt_http_source_s { - nxt_source_hook_t query; - nxt_source_hook_t *next; - - nxt_upstream_source_t *upstream; - - nxt_http_source_request_create_t request_create; - - nxt_upstream_header_in_t header_in; - - nxt_buf_t *rest; - - uint32_t chunked; /* 1 bit */ - - union { - nxt_http_source_request_t request; - } u; -}; - - -NXT_EXPORT void nxt_http_source_handler(nxt_task_t *task, - nxt_upstream_source_t *us, nxt_http_source_request_create_t request_create); -NXT_EXPORT nxt_int_t nxt_http_source_hash_create(nxt_mp_t *mp, - nxt_lvlhsh_t *lh); - - -#endif /* _NXT_HTTP_SOURCE_H_INCLUDED_ */ diff --git a/src/nxt_job_file.c b/src/nxt_job_file.c deleted file mode 100644 index 675bed2f..00000000 --- a/src/nxt_job_file.c +++ /dev/null @@ -1,302 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - - -#include <nxt_main.h> - - -static void nxt_job_file_open_and_read(nxt_task_t *task, void *obj, void *data); -static nxt_int_t nxt_job_file_open(nxt_job_file_t *jbf); -static nxt_int_t nxt_job_file_info(nxt_job_file_t *jbf); -static nxt_int_t nxt_job_file_mmap(nxt_job_file_t *jbf, size_t size); -static nxt_int_t nxt_job_file_read_data(nxt_job_file_t *jbf, size_t size); -static nxt_int_t nxt_job_file_read_required(nxt_job_file_t *jbf); - - -nxt_job_file_t * -nxt_job_file_create(nxt_mp_t *mp) -{ - nxt_job_file_t *jbf; - - jbf = nxt_job_create(mp, sizeof(nxt_job_file_t)); - - if (nxt_fast_path(jbf != NULL)) { - jbf->file.fd = NXT_FILE_INVALID; - jbf->file.accessed = NXT_FILE_ACCESSED_LONG_AGO; - jbf->read_required = nxt_job_file_read_required; - } - - return jbf; -} - - -void -nxt_job_file_init(nxt_job_file_t *jbf) -{ - nxt_job_init(&jbf->job, sizeof(nxt_job_file_t)); - - jbf->file.fd = NXT_FILE_INVALID; - jbf->file.accessed = NXT_FILE_ACCESSED_LONG_AGO; - jbf->read_required = nxt_job_file_read_required; -} - - -/* - * Must be a function but not a macro, because - * it can be used as function pointer. - */ - -void -nxt_job_file_read(nxt_task_t *task, nxt_job_t *job) -{ - nxt_job_start(task, job, nxt_job_file_open_and_read); -} - - -static void -nxt_job_file_open_and_read(nxt_task_t *task, void *obj, void *data) -{ - size_t size; - nxt_int_t n; - nxt_bool_t read_ahead; - nxt_file_t *file; - nxt_job_file_t *jbf; - nxt_work_handler_t handler; - - jbf = obj; - file = &jbf->file; - - nxt_debug(task, "file job read: \"%FN\"", file->name); - - if (file->fd != NXT_FILE_INVALID && jbf->close_before_open) { - nxt_file_close(file); - file->fd = NXT_FILE_INVALID; - } - - if (file->fd == NXT_FILE_INVALID) { - - switch (nxt_job_file_open(jbf)) { - - case NXT_OK: - break; - - case NXT_DECLINED: - handler = jbf->ready_handler; - goto done; - - default: /* NXT_ERROR */ - handler = jbf->error_handler; - goto done; - } - } - - if (file->size > 0) { - - if (jbf->buffer != NULL) { - size = nxt_buf_mem_size(&jbf->buffer->mem); - size = nxt_min(file->size, (nxt_off_t) size); - read_ahead = nxt_buf_is_mmap(jbf->buffer); - - } else { - size = nxt_min(file->size, 1024 * 1024); - read_ahead = jbf->read_ahead; - } - - if (read_ahead) { - nxt_file_read_ahead(&jbf->file, jbf->offset, size); - } - - if (jbf->buffer != NULL) { - - if (nxt_buf_is_mmap(jbf->buffer)) { - n = nxt_job_file_mmap(jbf, size); - - } else { - n = nxt_job_file_read_data(jbf, size); - } - - if (nxt_slow_path(n != NXT_OK)) { - handler = jbf->error_handler; - goto done; - } - } - } - - if (jbf->offset == file->size) { - jbf->complete = 1; - - if (jbf->close) { - nxt_file_close(file); - file->fd = NXT_FILE_INVALID; - } - } - - nxt_job_return(task, &jbf->job, jbf->ready_handler); - return; - -done: - - if (file->fd != NXT_FILE_INVALID) { - nxt_file_close(file); - file->fd = NXT_FILE_INVALID; - } - - nxt_job_return(task, &jbf->job, handler); -} - - -static nxt_int_t -nxt_job_file_open(nxt_job_file_t *jbf) -{ - nxt_int_t n; - - if (jbf->test_before_open) { - n = nxt_job_file_info(jbf); - - if (n != NXT_OK) { - goto test_directory; - } - - if (jbf->file.type == NXT_FILE_DIRECTORY) { - return NXT_DECLINED; - } - - if (jbf->read_required(jbf) != NXT_OK) { - return NXT_DECLINED; - } - } - - n = nxt_file_open(&jbf->file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0); - - if (n == NXT_OK) { - n = nxt_job_file_info(jbf); - - if (nxt_fast_path(n == NXT_OK)) { - - if (jbf->file.type == NXT_FILE_DIRECTORY) { - return NXT_DECLINED; - } - - return jbf->read_required(jbf); - } - - return n; - } - -test_directory: - - if (jbf->directory_end != 0 - && jbf->file.error != NXT_ENOTDIR - && jbf->file.error != NXT_ENAMETOOLONG - && jbf->file.error != NXT_EACCES) - { - jbf->file.name[jbf->directory_end] = '\0'; - - return nxt_job_file_info(jbf); - } - - return n; -} - - -static nxt_int_t -nxt_job_file_info(nxt_job_file_t *jbf) -{ - nxt_int_t n; - nxt_file_t *file; - nxt_file_info_t fi; - - file = &jbf->file; - - n = nxt_file_info(file, &fi); - - if (n != NXT_OK) { - return NXT_ERROR; - } - - if (nxt_is_file(&fi)) { - file->type = NXT_FILE_REGULAR; - file->size = nxt_file_size(&fi); - file->mtime = nxt_file_mtime(&fi); - - } else if (nxt_is_dir(&fi)) { - file->type = NXT_FILE_DIRECTORY; - file->size = nxt_file_size(&fi); - file->mtime = nxt_file_mtime(&fi); - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_job_file_mmap(nxt_job_file_t *jbf, size_t size) -{ - u_char *p, *end; - static nxt_uint_t n; - - p = nxt_mem_map(NULL, &jbf->buffer->mmap, size, NXT_MEM_MAP_READ, - (NXT_MEM_MAP_FILE | NXT_MEM_MAP_PREFAULT), - jbf->file.fd, jbf->offset); - - if (nxt_fast_path(p != NXT_MEM_MAP_FAILED)) { - - end = p + size; - - jbf->buffer->mem.pos = p; - jbf->buffer->mem.free = end; - jbf->buffer->mem.start = p; - jbf->buffer->mem.end = end; - jbf->buffer->file_end += size; - jbf->offset += size; - - /* - * The mapped pages should be already preloaded in the kernel page - * cache by nxt_file_read_ahead(). Touching them should wire the pages - * in user land memory if mmap() did not do this. Adding to the static - * variable "n" disables the loop elimination during optimization. - */ - n += *p; - - for (p = nxt_align_ptr(p, nxt_pagesize); p < end; p += nxt_pagesize) { - n += *p; - } - - return NXT_OK; - } - - return NXT_ERROR; -} - - -static nxt_int_t -nxt_job_file_read_data(nxt_job_file_t *jbf, size_t size) -{ - ssize_t n; - - n = nxt_file_read(&jbf->file, jbf->buffer->mem.free, size, jbf->offset); - - if (nxt_fast_path(n > 0)) { - - jbf->buffer->mem.free += n; - jbf->offset += n; - - if (nxt_buf_is_file(jbf->buffer)) { - jbf->buffer->file_end += n; - } - - return NXT_OK; - } - - return NXT_ERROR; -} - - -static nxt_int_t -nxt_job_file_read_required(nxt_job_file_t *jbf) -{ - return NXT_OK; -} diff --git a/src/nxt_job_file.h b/src/nxt_job_file.h deleted file mode 100644 index 93c6393c..00000000 --- a/src/nxt_job_file.h +++ /dev/null @@ -1,74 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_JOB_FILE_H_INCLUDED_ -#define _NXT_JOB_FILE_H_INCLUDED_ - - -/* - * nxt_job_file_read() allows to open a file, to get its type, size, and - * modification time, to read or map file content to memory, and to close - * the file. It can be done as one operation for small file or as several - * operations for large file. On each operation completion ready_handler - * or error_handler completion handlers are called. Since they are job - * operations, they can be run by a thread pool. - * - * If a file is not opened then it is opened and its type, size, and - * modification time are got. Then file content starting from given offset - * is read or mapped in memory if there is a buffer supplied. The offset - * field is correspondingly updated. - * - * If there is no buffer but the read_ahead flag is set then the first - * byte is read to initiate read ahead operation. - * - * If the close flag is set then file descriptor is closed when the file - * is completely read. - * - * The complete flag is set by nxt_job_file_read() when the file is - * completely read. - * - * The test_before_open flag allows to save syscalls in some case, for - * example, not to open and then not to close a directory. It calls - * nxt_file_info() to get file type, size, and modification time before - * opening the file. A custom read_required() callback combined with this - * flag can also omit opening and reading on some conditions. However, - * if the callback forces opening then additional nxt_file_info() is - * called after opening. The default read_required() callback always - * forces opening and reading. - */ - - -typedef struct nxt_job_file_s nxt_job_file_t; - -struct nxt_job_file_s { - nxt_job_t job; - - nxt_file_t file; - - nxt_off_t offset; - nxt_buf_t *buffer; - - nxt_work_handler_t ready_handler; - nxt_work_handler_t error_handler; - - nxt_int_t (*read_required)(nxt_job_file_t *jbf); - - uint16_t directory_end; - - uint16_t close_before_open:1; - uint16_t test_before_open:1; - uint16_t read_ahead:1; - uint16_t close:1; - uint16_t complete:1; -}; - - -NXT_EXPORT nxt_job_file_t *nxt_job_file_create(nxt_mp_t *mp); -NXT_EXPORT void nxt_job_file_init(nxt_job_file_t *jbf); -NXT_EXPORT void nxt_job_file_read(nxt_task_t *task, nxt_job_t *job); - - -#endif /* _NXT_JOB_FILE_H_INCLUDED_ */ diff --git a/src/nxt_job_file_cache.c b/src/nxt_job_file_cache.c deleted file mode 100644 index 680d0665..00000000 --- a/src/nxt_job_file_cache.c +++ /dev/null @@ -1,42 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - - -#include <nxt_main.h> - - -typedef struct { - nxt_cache_node_t node; - nxt_file_t file; -} nxt_file_cache_t; - - -void -nxt_job_file_cache_read(nxt_cache_t *cache, nxt_job_file_t *jbf) -{ - nxt_file_cache_node_t *node; - - node = nxt_cache_find(cache); - - if (node != NULL) { - - if (node->fd != -1) { - nxt_job_return(&jbf->job, jbf->ready_handler); - return; - } - - if (node->error != 0) { - nxt_job_return(&jbf->job, jbf->error_handler); - return; - } - - if (node->accessed + 60 > nxt_thread_time()) { - jbf->job.thread_pool = NULL; - } - } - - nxt_job_file_read(jbf); -} diff --git a/src/nxt_main.h b/src/nxt_main.h index b0cdc2d3..75e1724e 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -145,13 +145,9 @@ typedef void (*nxt_event_conn_handler_t)(nxt_thread_t *thr, nxt_conn_t *c); #include <nxt_event_engine.h> #include <nxt_job.h> -#include <nxt_job_file.h> -#include <nxt_buf_filter.h> #include <nxt_sockaddr.h> -#include <nxt_cache.h> - #include <nxt_http_parse.h> #include <nxt_runtime.h> #include <nxt_port_hash.h> diff --git a/src/nxt_mem_pool_cleanup.c b/src/nxt_mem_pool_cleanup.c deleted file mode 100644 index ceafc9c8..00000000 --- a/src/nxt_mem_pool_cleanup.c +++ /dev/null @@ -1,39 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -static void nxt_mem_pool_file_cleanup_handler(nxt_task_t *task, void *data); - - -nxt_mem_pool_cleanup_t * -nxt_mem_pool_file_cleanup(nxt_mem_pool_t *mp, nxt_file_t *file) -{ - nxt_mem_pool_cleanup_t *mpcl; - - mpcl = nxt_mem_pool_cleanup(mp, 0); - - if (nxt_fast_path(mpcl != NULL)) { - mpcl->handler = nxt_mem_pool_file_cleanup_handler; - mpcl->data = file; - } - - return mpcl; -} - - -static void -nxt_mem_pool_file_cleanup_handler(nxt_task_t *task, void *data) -{ - nxt_file_t *file; - - file = data; - - if (file->fd != NXT_FILE_INVALID) { - nxt_file_close(task, file); - } -} diff --git a/src/nxt_mem_pool_cleanup.h b/src/nxt_mem_pool_cleanup.h deleted file mode 100644 index f84395d0..00000000 --- a/src/nxt_mem_pool_cleanup.h +++ /dev/null @@ -1,15 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_MEM_POOL_CLEANUP_H_INCLUDED_ -#define _NXT_MEM_POOL_CLEANUP_H_INCLUDED_ - - -NXT_EXPORT nxt_mem_pool_cleanup_t *nxt_mem_pool_file_cleanup(nxt_mem_pool_t *mp, - nxt_file_t *file); - - -#endif /* _NXT_MEM_POOL_CLEANUP_H_INCLUDED_ */ diff --git a/src/nxt_stream_module.c b/src/nxt_stream_module.c deleted file mode 100644 index 25aaec57..00000000 --- a/src/nxt_stream_module.c +++ /dev/null @@ -1,131 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> -#include <nxt_runtime.h> - - -static void nxt_stream_connection_peer(nxt_task_t *task, - nxt_upstream_peer_t *up); -static void nxt_stream_connection_close(nxt_task_t *task, void *obj, - void *data); - - -void -nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data) -{ - nxt_conn_t *c; - nxt_runtime_t *rt; - nxt_upstream_peer_t *up; - - c = obj; - - nxt_debug(task, "stream connection init"); - - up = nxt_mp_zget(c->mem_pool, sizeof(nxt_upstream_peer_t)); - if (nxt_slow_path(up == NULL)) { - goto fail; - } - - up->data = c; - - rt = task->thread->runtime; - - if (rt->upstream.length != 0) { - up->addr = rt->upstream; - - } else { - nxt_str_set(&up->addr, "127.0.0.1:8080"); - } - - up->ready_handler = nxt_stream_connection_peer; - up->mem_pool = c->mem_pool; - - nxt_upstream_round_robin_peer(task, up); - return; - -fail: - - /* TODO: close connection */ - return; -} - - -static void -nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up) -{ - nxt_conn_t *c; - nxt_conn_proxy_t *p; - - c = up->data; - - up->sockaddr->type = SOCK_STREAM; - - nxt_log_debug(c->socket.log, "stream connection peer %*s", - (size_t) up->sockaddr->length, - nxt_sockaddr_start(up->sockaddr)); - - p = nxt_conn_proxy_create(c); - if (nxt_slow_path(p == NULL)) { - goto fail; - } - - p->client->socket.data = p; - p->peer->socket.data = p; - - p->client_buffer_size = 1024; - p->peer_buffer_size = 4096; - //p->client_wait_timeout = 9000; - p->connect_timeout = 7000; - p->reconnect_timeout = 500; - //p->peer_wait_timeout = 5000; - p->client_write_timeout = 3000; - p->peer_write_timeout = 3000; - p->completion_handler = nxt_stream_connection_close; - //p->retries = 10; - p->peer->remote = up->sockaddr; - - if (0) { - nxt_event_engine_t *engine; - nxt_event_write_rate_t *rate; - - rate = nxt_mp_get(c->mem_pool, sizeof(nxt_event_write_rate_t)); - - if (nxt_slow_path(rate == NULL)) { - goto fail; - } - - c->rate = rate; - - rate->limit = 1024; - rate->limit_after = 0; - rate->average = rate->limit; - - engine = nxt_thread_event_engine(); - rate->last = engine->timers.now; - } - - nxt_conn_proxy(task, p); - return; - -fail: - - /* TODO: close connection */ - return; -} - - -static void -nxt_stream_connection_close(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_proxy_t *p; - - p = obj; - - nxt_log_debug(p->client->socket.log, "stream connection close"); - - nxt_mp_destroy(p->client->mem_pool); -} diff --git a/src/nxt_stream_source.c b/src/nxt_stream_source.c deleted file mode 100644 index 66ec1640..00000000 --- a/src/nxt_stream_source.c +++ /dev/null @@ -1,480 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -static void nxt_stream_source_connected(nxt_task_t *task, void *obj, - void *data); -static void nxt_stream_source_write_ready(nxt_task_t *task, void *obj, - void *data); -static void nxt_stream_source_read_ready(nxt_task_t *task, void *obj, - void *data); -static nxt_buf_t *nxt_stream_source_process_buffers(nxt_stream_source_t *stream, - nxt_event_conn_t *c); -static void nxt_stream_source_buf_completion(nxt_task_t *task, void *obj, - void *data); -static void nxt_stream_source_read_done(nxt_task_t *task, void *obj, - void *data); -static void nxt_stream_source_refused(nxt_task_t *task, void *obj, void *data); -static void nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data); -static void nxt_stream_source_error(nxt_task_t *task, void *obj, void *data); -static void nxt_stream_source_close(nxt_task_t *task, - nxt_stream_source_t *stream); - - -static const nxt_event_conn_state_t nxt_stream_source_connect_state; -static const nxt_event_conn_state_t nxt_stream_source_request_write_state; -static const nxt_event_conn_state_t nxt_stream_source_response_ready_state; -static const nxt_event_conn_state_t nxt_stream_source_response_read_state; - - -void -nxt_stream_source_connect(nxt_task_t *task, nxt_stream_source_t *stream) -{ - nxt_thread_t *thr; - nxt_event_conn_t *c; - nxt_upstream_source_t *us; - - thr = nxt_thread(); - - us = stream->upstream; - - if (nxt_slow_path(!nxt_buf_pool_obtainable(&us->buffers))) { - nxt_log(task, NXT_LOG_ERR, - "%d buffers %uDK each are not enough to read upstream response", - us->buffers.max, us->buffers.size / 1024); - goto fail; - } - - c = nxt_event_conn_create(us->buffers.mem_pool, thr->log); - if (nxt_slow_path(c == NULL)) { - goto fail; - } - - stream->conn = c; - c->socket.data = stream; - - nxt_conn_work_queue_set(c, us->work_queue); - - c->remote = us->peer->sockaddr; - c->write_state = &nxt_stream_source_connect_state; - - nxt_event_conn_connect(task, c); - return; - -fail: - - stream->error_handler(task, stream); -} - - -static const nxt_event_conn_state_t nxt_stream_source_connect_state - nxt_aligned(64) = -{ - NXT_EVENT_NO_BUF_PROCESS, - NXT_EVENT_TIMER_AUTORESET, - - nxt_stream_source_connected, - nxt_stream_source_refused, - nxt_stream_source_error, - - NULL, /* timeout */ - NULL, /* timeout value */ - 0, /* connect_timeout */ -}; - - -static void -nxt_stream_source_connected(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *c; - nxt_stream_source_t *stream; - - c = obj; - stream = data; - - nxt_debug(task, "stream source connected fd:%d", c->socket.fd); - - c->read_state = &nxt_stream_source_response_ready_state; - c->write = stream->out; - c->write_state = &nxt_stream_source_request_write_state; - - if (task->thread->engine->batch != 0) { - nxt_event_conn_write(task, c); - - } else { - stream->read_queued = 1; - nxt_thread_work_queue_add(task->thread, - &task->thread->engine->read_work_queue, - c->io->read, task, c, stream); - - c->io->write(task, c, stream); - } -} - - -static const nxt_event_conn_state_t nxt_stream_source_request_write_state - nxt_aligned(64) = -{ - NXT_EVENT_NO_BUF_PROCESS, - NXT_EVENT_TIMER_AUTORESET, - - nxt_stream_source_write_ready, - NULL, - nxt_stream_source_error, - - NULL, /* timeout */ - NULL, /* timeout value */ - 0, /* connect_timeout */ -}; - - -static const nxt_event_conn_state_t nxt_stream_source_response_ready_state - nxt_aligned(64) = -{ - NXT_EVENT_NO_BUF_PROCESS, - NXT_EVENT_TIMER_AUTORESET, - - nxt_stream_source_read_ready, - nxt_stream_source_closed, - nxt_stream_source_error, - - NULL, /* timeout */ - NULL, /* timeout value */ - 0, /* connect_timeout */ -}; - - -static void -nxt_stream_source_write_ready(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *c; - - c = obj; - - nxt_debug(task, "stream source write ready fd:%d", c->socket.fd); - - nxt_conn_read(task, c); -} - - -static void -nxt_stream_source_read_ready(nxt_task_t *task, void *obj, void *data) -{ - nxt_int_t ret; - nxt_buf_t *b; - nxt_buf_pool_t *buffers; - nxt_event_conn_t *c; - nxt_stream_source_t *stream; - - c = obj; - stream = data; - stream->read_queued = 0; - - nxt_debug(task, "stream source read ready fd:%d", c->socket.fd); - - if (c->read == NULL) { - - buffers = &stream->upstream->buffers; - - ret = nxt_buf_pool_mem_alloc(buffers, 0); - - if (nxt_slow_path(ret != NXT_OK)) { - - if (nxt_slow_path(ret == NXT_ERROR)) { - goto fail; - } - - /* ret == NXT_AGAIN */ - - nxt_debug(task, "stream source flush"); - - b = nxt_buf_sync_alloc(buffers->mem_pool, NXT_BUF_SYNC_NOBUF); - - if (nxt_slow_path(b == NULL)) { - goto fail; - } - - nxt_event_fd_block_read(task->thread->engine, &c->socket); - - nxt_source_filter(task->thread, c->write_work_queue, task, - stream->next, b); - return; - } - - c->read = buffers->current; - buffers->current = NULL; - } - - c->read_state = &nxt_stream_source_response_read_state; - - nxt_conn_read(task, c); - return; - -fail: - - nxt_stream_source_close(task, stream); -} - - -static const nxt_event_conn_state_t nxt_stream_source_response_read_state - nxt_aligned(64) = -{ - NXT_EVENT_NO_BUF_PROCESS, - NXT_EVENT_TIMER_AUTORESET, - - nxt_stream_source_read_done, - nxt_stream_source_closed, - nxt_stream_source_error, - - NULL, /* timeout */ - NULL, /* timeout value */ - 0, /* connect_timeout */ -}; - - -static void -nxt_stream_source_read_done(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b; - nxt_bool_t batch; - nxt_event_conn_t *c; - nxt_stream_source_t *stream; - - c = obj; - stream = data; - - nxt_debug(task, "stream source read done fd:%d", c->socket.fd); - - if (c->read != NULL) { - b = nxt_stream_source_process_buffers(stream, c); - - if (nxt_slow_path(b == NULL)) { - nxt_stream_source_close(task, stream); - return; - } - - batch = (task->thread->engine->batch != 0); - - if (batch) { - nxt_thread_work_queue_add(task->thread, - stream->upstream->work_queue, - nxt_source_filter_handler, - task, stream->next, b); - } - - if (!stream->read_queued) { - stream->read_queued = 1; - nxt_thread_work_queue_add(task->thread, - stream->upstream->work_queue, - nxt_stream_source_read_ready, - task, c, stream); - } - - if (!batch) { - stream->next->filter(task, stream->next->context, b); - } - } -} - - -static nxt_buf_t * -nxt_stream_source_process_buffers(nxt_stream_source_t *stream, - nxt_event_conn_t *c) -{ - size_t size, nbytes; - nxt_buf_t *b, *in, *head, **prev; - - nbytes = c->nbytes; - prev = &head; - - do { - b = nxt_buf_mem_alloc(stream->upstream->buffers.mem_pool, 0, 0); - - if (nxt_slow_path(b == NULL)) { - return NULL; - } - - *prev = b; - - b->data = stream; - b->completion_handler = nxt_stream_source_buf_completion; - - in = c->read; - in->retain++; - b->parent = in; - - b->mem.pos = in->mem.free; - b->mem.start = in->mem.free; - - size = nxt_buf_mem_free_size(&in->mem); - - if (nbytes < size) { - in->mem.free += nbytes; - - b->mem.free = in->mem.free; - b->mem.end = in->mem.free; - - break; - } - - in->mem.free = in->mem.end; - - b->mem.free = in->mem.free; - b->mem.end = in->mem.free; - nbytes -= size; - - prev = &b->next; - c->read = in->next; - in->next = NULL; - - } while (c->read != NULL); - - return head; -} - - -static void -nxt_stream_source_buf_completion(nxt_task_t *task, void *obj, void *data) -{ - size_t size; - nxt_buf_t *b, *parent; - nxt_stream_source_t *stream; - - b = obj; - parent = data; - -#if 0 - nxt_debug(thr->log, - "stream source buf completion: %p parent:%p retain:%uD", - b, parent, parent->retain); -#endif - - stream = b->data; - - /* A parent is a buffer where stream reads data. */ - - parent->mem.pos = b->mem.pos; - parent->retain--; - - if (parent->retain == 0 && !stream->conn->socket.closed) { - size = nxt_buf_mem_size(&parent->mem); - - parent->mem.pos = parent->mem.start; - parent->mem.free = parent->mem.start; - - /* - * A buffer's original size can be changed by filters - * so reuse the buffer only if it is still large enough. - */ - if (size >= 256 || size >= stream->upstream->buffers.size) { - - if (stream->conn->read != parent) { - nxt_buf_chain_add(&stream->conn->read, parent); - } - - if (!stream->read_queued) { - stream->read_queued = 1; - nxt_thread_work_queue_add(task->thread, - stream->upstream->work_queue, - nxt_stream_source_read_ready, - task, stream->conn, - stream->conn->socket.data); - } - } - } - - nxt_buf_free(stream->upstream->buffers.mem_pool, b); -} - - -static void -nxt_stream_source_refused(nxt_task_t *task, void *obj, void *data) -{ - nxt_stream_source_t *stream; - - stream = data; - -#if (NXT_DEBUG) - { - nxt_event_conn_t *c; - - c = obj; - - nxt_debug(task, "stream source refused fd:%d", c->socket.fd); - } -#endif - - nxt_stream_source_close(task, stream); -} - - -static void -nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b; - nxt_event_conn_t *c; - nxt_stream_source_t *stream; - - c = obj; - stream = data; - - nxt_debug(task, "stream source closed fd:%d", c->socket.fd); - - nxt_conn_close(task, c); - - b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool, - NXT_BUF_SYNC_LAST); - - if (nxt_slow_path(b == NULL)) { - stream->error_handler(task, stream); - return; - } - - nxt_source_filter(task->thread, c->write_work_queue, task, stream->next, b); -} - - -static void -nxt_stream_source_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_stream_source_t *stream; - - stream = data; - -#if (NXT_DEBUG) - { - nxt_event_fd_t *ev; - - ev = obj; - - nxt_debug(task, "stream source error fd:%d", ev->fd); - } -#endif - - nxt_stream_source_close(task, stream); -} - - -static void -nxt_stream_source_close(nxt_task_t *task, nxt_stream_source_t *stream) -{ - nxt_conn_close(task, stream->conn); - - stream->error_handler(task, stream); -} - - -void -nxt_source_filter_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_source_hook_t *next; - - next = obj; - - next->filter(task, next->context, data); -} diff --git a/src/nxt_stream_source.h b/src/nxt_stream_source.h deleted file mode 100644 index 2d57073f..00000000 --- a/src/nxt_stream_source.h +++ /dev/null @@ -1,32 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_STREAM_SOURCE_H_INCLUDED_ -#define _NXT_STREAM_SOURCE_H_INCLUDED_ - - -typedef struct nxt_stream_source_s nxt_stream_source_t; - -typedef void (*nxt_stream_source_handler_t)(nxt_task_t *task, - nxt_stream_source_t *s); - -struct nxt_stream_source_s { - nxt_conn_t *conn; - nxt_source_hook_t *next; - nxt_upstream_source_t *upstream; - - nxt_buf_t *out; - - uint32_t read_queued; /* 1 bit */ - - nxt_stream_source_handler_t error_handler; -}; - - -void nxt_stream_source_connect(nxt_task_t *task, nxt_stream_source_t *stream); - - -#endif /* _NXT_STREAM_SOURCE_H_INCLUDED_ */ diff --git a/src/nxt_upstream_source.c b/src/nxt_upstream_source.c deleted file mode 100644 index ee3fc21e..00000000 --- a/src/nxt_upstream_source.c +++ /dev/null @@ -1,71 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -static nxt_int_t nxt_upstream_header_hash_test(nxt_lvlhsh_query_t *lhq, - void *data); - - -const nxt_lvlhsh_proto_t nxt_upstream_header_hash_proto nxt_aligned(64) = { - NXT_LVLHSH_DEFAULT, - 0, - nxt_upstream_header_hash_test, - nxt_mem_lvlhsh_alloc, - nxt_mem_lvlhsh_free, -}; - - -nxt_int_t -nxt_upstream_header_hash_add(nxt_mp_t *mp, nxt_lvlhsh_t *lh, - const nxt_upstream_name_value_t *unv, nxt_uint_t n) -{ - nxt_lvlhsh_query_t lhq; - - while (n != 0) { - lhq.key_hash = nxt_djb_hash(unv->name, unv->len); - lhq.replace = 1; - lhq.key.len = unv->len; - lhq.key.data = (u_char *) unv->name; - lhq.value = (void *) unv; - lhq.proto = &nxt_upstream_header_hash_proto; - lhq.pool = mp; - - if (nxt_lvlhsh_insert(lh, &lhq) != NXT_OK) { - return NXT_ERROR; - } - - unv++; - n--; - } - - return NXT_OK; -} - - -static nxt_int_t -nxt_upstream_header_hash_test(nxt_lvlhsh_query_t *lhq, void *data) -{ - nxt_upstream_name_value_t *unv; - - unv = data; - - if (lhq->key.len == unv->len - && nxt_memcasecmp(lhq->key.data, unv->name, unv->len) == 0) - { - return NXT_OK; - } - - return NXT_DECLINED; -} - - -nxt_int_t -nxt_upstream_name_value_ignore(nxt_upstream_source_t *us, nxt_name_value_t *nv) -{ - return NXT_OK; -} diff --git a/src/nxt_upstream_source.h b/src/nxt_upstream_source.h deleted file mode 100644 index 143b8d0c..00000000 --- a/src/nxt_upstream_source.h +++ /dev/null @@ -1,83 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#ifndef _NXT_UPSTREAM_SOURCE_H_INCLUDED_ -#define _NXT_UPSTREAM_SOURCE_H_INCLUDED_ - - -typedef struct { - uint32_t hash; - - unsigned value_len:23; - unsigned skip:1; - unsigned name_len:8; - - u_char *value_start; - u_char *name_start; -} nxt_name_value_t; - - -typedef struct { - nxt_list_t *list; - nxt_lvlhsh_t hash; - - uint16_t status; /* 16 bits */ - - nxt_off_t content_length; -} nxt_upstream_header_in_t; - - -typedef nxt_int_t (*nxt_upstream_name_value_handler_t)( - nxt_upstream_source_t *us, nxt_name_value_t *nv); - - -typedef struct { - nxt_upstream_name_value_handler_t handler; - - uint8_t len; - /* - * A name is inlined to test it with one memory access. - * The struct size is aligned to 32 bytes. - */ -#if (NXT_64BIT) - u_char name[23]; -#else - u_char name[27]; -#endif -} nxt_upstream_name_value_t; - - -struct nxt_upstream_source_s { - nxt_upstream_peer_t *peer; - - const nxt_upstream_state_t *state; - - void *protocol_source; - void *data; - nxt_work_queue_t *work_queue; - - nxt_buf_pool_t buffers; - - nxt_lvlhsh_t header_hash; - nxt_stream_source_t *stream; -}; - - -#define NXT_UPSTREAM_NAME_VALUE_MIN_SIZE \ - offsetof(nxt_http_upstream_header_t, name) - -#define nxt_upstream_name_value(s) nxt_length(s), s - - -NXT_EXPORT nxt_int_t nxt_upstream_header_hash_add(nxt_mp_t *mp, - nxt_lvlhsh_t *lh, const nxt_upstream_name_value_t *unv, nxt_uint_t n); -NXT_EXPORT nxt_int_t nxt_upstream_name_value_ignore(nxt_upstream_source_t *us, - nxt_name_value_t *nv); - -NXT_EXPORT extern const nxt_lvlhsh_proto_t nxt_upstream_header_hash_proto; - - -#endif /* _NXT_UPSTREAM_SOURCE_H_INCLUDED_ */ |