diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
commit | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch) | |
tree | e6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_http_source.c | |
download | unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2 |
Initial version.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_http_source.c | 630 |
1 files changed, 630 insertions, 0 deletions
diff --git a/src/nxt_http_source.c b/src/nxt_http_source.c new file mode 100644 index 00000000..045d585b --- /dev/null +++ b/src/nxt_http_source.c @@ -0,0 +1,630 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +typedef struct { + nxt_http_chunk_parse_t parse; + nxt_source_hook_t next; +} nxt_http_source_chunk_t; + + +static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs); + +static void nxt_http_source_status_filter(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_http_source_header_filter(nxt_thread_t *thr, void *obj, + void *data); + +static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs); +static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us, + nxt_name_value_t *nv); +static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us, + nxt_name_value_t *nv); + +static void nxt_http_source_header_ready(nxt_http_source_t *hs, + nxt_buf_t *rest); +static void nxt_http_source_chunk_filter(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_http_source_chunk_error(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_http_source_body_filter(nxt_thread_t *thr, void *obj, + void *data); + +static void nxt_http_source_sync_buffer(nxt_thread_t *thr, + nxt_http_source_t *hs, nxt_buf_t *b); +static void nxt_http_source_error(nxt_stream_source_t *stream); +static void nxt_http_source_fail(nxt_http_source_t *hs); +static void nxt_http_source_message(const char *msg, size_t len, u_char *p); + + +void +nxt_http_source_handler(nxt_upstream_source_t *us, + nxt_http_source_request_create_t request_create) +{ + nxt_http_source_t *hs; + nxt_stream_source_t *stream; + + hs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_http_source_t)); + if (nxt_slow_path(hs == NULL)) { + goto fail; + } + + us->protocol_source = hs; + + hs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8, + sizeof(nxt_name_value_t)); + if (nxt_slow_path(hs->header_in.list == NULL)) { + goto fail; + } + + hs->header_in.hash = us->header_hash; + hs->upstream = us; + hs->request_create = request_create; + + stream = us->stream; + + if (stream == NULL) { + stream = nxt_mem_zalloc(us->buffers.mem_pool, + sizeof(nxt_stream_source_t)); + if (nxt_slow_path(stream == NULL)) { + goto fail; + } + + us->stream = stream; + stream->upstream = us; + + } else { + nxt_memzero(stream, sizeof(nxt_stream_source_t)); + } + + /* + * Create the HTTP source filter chain: + * stream source | HTTP status line filter + */ + stream->next = &hs->query; + stream->error_handler = nxt_http_source_error; + + hs->query.context = hs; + hs->query.filter = nxt_http_source_status_filter; + + hs->header_in.content_length = -1; + + stream->out = nxt_http_source_request_create(hs); + + if (nxt_fast_path(stream->out != NULL)) { + nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t)); + + nxt_stream_source_connect(stream); + return; + } + +fail: + + nxt_http_source_fail(hs); +} + + +nxt_inline u_char * +nxt_http_source_copy(u_char *p, nxt_str_t *src, size_t len) +{ + u_char *s; + + if (nxt_fast_path(len >= src->len)) { + len = src->len; + src->len = 0; + + } else { + src->len -= len; + } + + s = src->data; + src->data += len; + + return nxt_cpymem(p, s, len); +} + + +static nxt_buf_t * +nxt_http_source_request_create(nxt_http_source_t *hs) +{ + nxt_int_t ret; + nxt_buf_t *b, *req, **prev; + + nxt_thread_log_debug("http source create request"); + + prev = &req; + +new_buffer: + + ret = nxt_buf_pool_mem_alloc(&hs->upstream->buffers, 0); + if (nxt_slow_path(ret != NXT_OK)) { + return NULL; + } + + b = hs->upstream->buffers.current; + hs->upstream->buffers.current = NULL; + + *prev = b; + prev = &b->next; + + for ( ;; ) { + ret = hs->request_create(hs); + + if (nxt_fast_path(ret == NXT_OK)) { + b->mem.free = nxt_http_source_copy(b->mem.free, &hs->u.request.copy, + b->mem.end - b->mem.free); + + if (nxt_fast_path(hs->u.request.copy.len == 0)) { + continue; + } + + nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, + b->mem.pos); + + goto new_buffer; + } + + if (nxt_slow_path(ret == NXT_ERROR)) { + return NULL; + } + + /* ret == NXT_DONE */ + break; + } + + nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); + + return req; +} + + +static void +nxt_http_source_status_filter(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_int_t ret; + nxt_buf_t *b; + nxt_http_source_t *hs; + + hs = obj; + b = data; + + /* + * No cycle over buffer chain is required since at + * start the stream source passes buffers one at a time. + */ + + nxt_log_debug(thr->log, "http source status filter"); + + if (nxt_slow_path(nxt_buf_is_sync(b))) { + nxt_http_source_sync_buffer(thr, hs, b); + return; + } + + ret = nxt_http_status_parse(&hs->u.status_parse, &b->mem); + + if (nxt_fast_path(ret == NXT_OK)) { + /* + * Change the HTTP source filter chain: + * stream source | HTTP header filter + */ + hs->query.filter = nxt_http_source_header_filter; + + nxt_log_debug(thr->log, "upstream status: \"%*s\"", + hs->u.status_parse.end - b->mem.start, b->mem.start); + + hs->header_in.status = hs->u.status_parse.code; + + nxt_log_debug(thr->log, "upstream version:%d status:%uD \"%*s\"", + hs->u.status_parse.http_version, + hs->u.status_parse.code, + hs->u.status_parse.end - hs->u.status_parse.start, + hs->u.status_parse.start); + + nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t)); + hs->u.header.mem_pool = hs->upstream->buffers.mem_pool; + + nxt_http_source_header_filter(thr, hs, b); + return; + } + + if (nxt_slow_path(ret == NXT_ERROR)) { + /* HTTP/0.9 response. */ + hs->header_in.status = 200; + nxt_http_source_header_ready(hs, b); + return; + } + + /* ret == NXT_AGAIN */ + + /* + * b->mem.pos is always equal to b->mem.end because b is a buffer + * which points to a response part read by the stream source. + * However, since the stream source is an immediate source of the + * status filter, b->parent is a buffer the stream source reads in. + */ + if (b->parent->mem.pos == b->parent->mem.end) { + nxt_http_source_message("upstream sent too long status line: \"%*s\"", + b->mem.pos - b->mem.start, b->mem.start); + + nxt_http_source_fail(hs); + } +} + + +static void +nxt_http_source_header_filter(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_int_t ret; + nxt_buf_t *b; + nxt_http_source_t *hs; + + hs = obj; + b = data; + + /* + * No cycle over buffer chain is required since at + * start the stream source passes buffers one at a time. + */ + + nxt_log_debug(thr->log, "http source header filter"); + + if (nxt_slow_path(nxt_buf_is_sync(b))) { + nxt_http_source_sync_buffer(thr, hs, b); + return; + } + + for ( ;; ) { + ret = nxt_http_split_header_parse(&hs->u.header, &b->mem); + + if (nxt_slow_path(ret != NXT_OK)) { + break; + } + + ret = nxt_http_source_header_line_process(hs); + + if (nxt_slow_path(ret != NXT_OK)) { + break; + } + } + + if (nxt_fast_path(ret == NXT_DONE)) { + nxt_log_debug(thr->log, "http source header done"); + nxt_http_source_header_ready(hs, b); + return; + } + + if (nxt_fast_path(ret == NXT_AGAIN)) { + return; + } + + if (ret != NXT_ERROR) { + /* ret == NXT_DECLINED: "\r" is not followed by "\n" */ + nxt_log_error(NXT_LOG_ERR, thr->log, + "upstream sent invalid header line: \"%*s\\r...\"", + hs->u.header.parse.header_end + - hs->u.header.parse.header_name_start, + hs->u.header.parse.header_name_start); + } + + /* ret == NXT_ERROR */ + + nxt_http_source_fail(hs); +} + + +static nxt_int_t +nxt_http_source_header_line_process(nxt_http_source_t *hs) +{ + size_t name_len; + nxt_name_value_t *nv; + nxt_lvlhsh_query_t lhq; + nxt_http_header_parse_t *hp; + nxt_upstream_name_value_t *unv; + + hp = &hs->u.header.parse; + + name_len = hp->header_name_end - hp->header_name_start; + + if (name_len > 255) { + nxt_http_source_message("upstream sent too long header field name: " + "\"%*s\"", name_len, hp->header_name_start); + return NXT_ERROR; + } + + nv = nxt_list_add(hs->header_in.list); + if (nxt_slow_path(nv == NULL)) { + return NXT_ERROR; + } + + nv->hash = hp->header_hash; + nv->skip = 0; + nv->name_len = name_len; + nv->name_start = hp->header_name_start; + nv->value_len = hp->header_end - hp->header_start; + nv->value_start = hp->header_start; + + nxt_thread_log_debug("upstream header: \"%*s: %*s\"", + nv->name_len, nv->name_start, + nv->value_len, nv->value_start); + + lhq.key_hash = nv->hash; + lhq.key.len = nv->name_len; + lhq.key.data = nv->name_start; + lhq.proto = &nxt_upstream_header_hash_proto; + + if (nxt_lvlhsh_find(&hs->header_in.hash, &lhq) == NXT_OK) { + unv = lhq.value; + + if (unv->handler(hs->upstream, nv) != NXT_OK) { + return NXT_ERROR; + } + } + + return NXT_OK; +} + + +static const nxt_upstream_name_value_t nxt_http_source_headers[] + nxt_aligned(32) = +{ + { nxt_http_source_content_length, + nxt_upstream_name_value("content-length") }, + + { nxt_http_source_transfer_encoding, + nxt_upstream_name_value("transfer-encoding") }, +}; + + +nxt_int_t +nxt_http_source_hash_create(nxt_mem_pool_t *mp, nxt_lvlhsh_t *lh) +{ + return nxt_upstream_header_hash_add(mp, lh, nxt_http_source_headers, + nxt_nitems(nxt_http_source_headers)); +} + + +static nxt_int_t +nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv) +{ + nxt_off_t length; + nxt_http_source_t *hs; + + length = nxt_off_t_parse(nv->value_start, nv->value_len); + + if (nxt_fast_path(length > 0)) { + hs = us->protocol_source; + hs->header_in.content_length = length; + return NXT_OK; + } + + return NXT_ERROR; +} + + +static nxt_int_t +nxt_http_source_transfer_encoding(nxt_upstream_source_t *us, + nxt_name_value_t *nv) +{ + u_char *end; + nxt_http_source_t *hs; + + end = nv->value_start + nv->value_len; + + if (nxt_memcasestrn(nv->value_start, end, "chunked", 7) != NULL) { + hs = us->protocol_source; + hs->chunked = 1; + } + + return NXT_OK; +} + + +static void +nxt_http_source_header_ready(nxt_http_source_t *hs, nxt_buf_t *rest) +{ + nxt_buf_t *b; + nxt_upstream_source_t *us; + nxt_http_source_chunk_t *hsc; + + us = hs->upstream; + + /* Free buffers used for request header. */ + + for (b = us->stream->out; b != NULL; b = b->next) { + nxt_buf_pool_free(&us->buffers, b); + } + + if (nxt_fast_path(nxt_buf_pool_available(&us->buffers))) { + + if (hs->chunked) { + hsc = nxt_mem_zalloc(hs->upstream->buffers.mem_pool, + sizeof(nxt_http_source_chunk_t)); + if (nxt_slow_path(hsc == NULL)) { + goto fail; + } + + /* + * Change the HTTP source filter chain: + * stream source | chunk filter | HTTP body filter + */ + hs->query.context = hsc; + hs->query.filter = nxt_http_source_chunk_filter; + + hsc->next.context = hs; + hsc->next.filter = nxt_http_source_body_filter; + + hsc->parse.mem_pool = hs->upstream->buffers.mem_pool; + + if (nxt_buf_mem_used_size(&rest->mem) != 0) { + hs->rest = nxt_http_chunk_parse(&hsc->parse, rest); + + if (nxt_slow_path(hs->rest == NULL)) { + goto fail; + } + } + + } else { + /* + * Change the HTTP source filter chain: + * stream source | HTTP body filter + */ + hs->query.filter = nxt_http_source_body_filter; + + if (nxt_buf_mem_used_size(&rest->mem) != 0) { + hs->rest = rest; + } + } + + hs->upstream->state->ready_handler(hs); + return; + } + + nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each " + "are not enough to read upstream response", + us->buffers.max, us->buffers.size / 1024); +fail: + + nxt_http_source_fail(hs); +} + + +static void +nxt_http_source_chunk_filter(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_http_source_t *hs; + nxt_http_source_chunk_t *hsc; + + hsc = obj; + b = data; + + nxt_log_debug(thr->log, "http source chunk filter"); + + b = nxt_http_chunk_parse(&hsc->parse, b); + + hs = hsc->next.context; + + if (hsc->parse.error) { + nxt_http_source_fail(hs); + return; + } + + if (hsc->parse.chunk_error) { + /* Output all parsed before a chunk error and close upstream. */ + nxt_thread_current_work_queue_add(thr, nxt_http_source_chunk_error, + hs, NULL, thr->log); + } + + if (b != NULL) { + nxt_source_filter(thr, hs->upstream->work_queue, &hsc->next, b); + } +} + + +static void +nxt_http_source_chunk_error(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_http_source_t *hs; + + hs = obj; + + nxt_http_source_fail(hs); +} + + +/* + * The HTTP source body filter accumulates first body buffers before the next + * filter will be established and sets completion handler for the last buffer. + */ + +static void +nxt_http_source_body_filter(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_buf_t *b, *in; + nxt_http_source_t *hs; + + hs = obj; + in = data; + + nxt_log_debug(thr->log, "http source body filter"); + + for (b = in; b != NULL; b = b->next) { + + if (nxt_buf_is_last(b)) { + b->data = hs->upstream->data; + b->completion_handler = hs->upstream->state->completion_handler; + } + } + + if (hs->next != NULL) { + nxt_source_filter(thr, hs->upstream->work_queue, hs->next, in); + return; + } + + nxt_buf_chain_add(&hs->rest, in); +} + + +static void +nxt_http_source_sync_buffer(nxt_thread_t *thr, nxt_http_source_t *hs, + nxt_buf_t *b) +{ + if (nxt_buf_is_last(b)) { + nxt_log_error(NXT_LOG_ERR, thr->log, + "upstream closed prematurely connection"); + + } else { + nxt_log_error(NXT_LOG_ERR, thr->log, "%ui buffers %uz each are not " + "enough to process upstream response header", + hs->upstream->buffers.max, + hs->upstream->buffers.size); + } + + /* The stream source sends only the last and the nobuf sync buffer. */ + + nxt_http_source_fail(hs); +} + + +static void +nxt_http_source_error(nxt_stream_source_t *stream) +{ + nxt_http_source_t *hs; + + nxt_thread_log_debug("http source error"); + + hs = stream->next->context; + nxt_http_source_fail(hs); +} + + +static void +nxt_http_source_fail(nxt_http_source_t *hs) +{ + nxt_thread_t *thr; + + thr = nxt_thread(); + + nxt_log_debug(thr->log, "http source fail"); + + /* TODO: fail, next upstream, or bad gateway */ + + hs->upstream->state->error_handler(thr, hs, NULL); +} + + +static void +nxt_http_source_message(const char *msg, size_t len, u_char *p) +{ + if (len > NXT_MAX_ERROR_STR - 300) { + len = NXT_MAX_ERROR_STR - 300; + p[len++] = '.'; p[len++] = '.'; p[len++] = '.'; + } + + nxt_thread_log_error(NXT_LOG_ERR, msg, len, p); +} |