summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_http_source.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-17 20:00:00 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-17 20:00:00 +0300
commit16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch)
treee6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_http_source.c
downloadunit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz
unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2
Initial version.
Diffstat (limited to '')
-rw-r--r--src/nxt_http_source.c630
1 files changed, 630 insertions, 0 deletions
diff --git a/src/nxt_http_source.c b/src/nxt_http_source.c
new file mode 100644
index 00000000..045d585b
--- /dev/null
+++ b/src/nxt_http_source.c
@@ -0,0 +1,630 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+typedef struct {
+ nxt_http_chunk_parse_t parse;
+ nxt_source_hook_t next;
+} nxt_http_source_chunk_t;
+
+
+static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs);
+
+static void nxt_http_source_status_filter(nxt_thread_t *thr, void *obj,
+ void *data);
+static void nxt_http_source_header_filter(nxt_thread_t *thr, void *obj,
+ void *data);
+
+static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs);
+static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us,
+ nxt_name_value_t *nv);
+static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
+ nxt_name_value_t *nv);
+
+static void nxt_http_source_header_ready(nxt_http_source_t *hs,
+ nxt_buf_t *rest);
+static void nxt_http_source_chunk_filter(nxt_thread_t *thr, void *obj,
+ void *data);
+static void nxt_http_source_chunk_error(nxt_thread_t *thr, void *obj,
+ void *data);
+static void nxt_http_source_body_filter(nxt_thread_t *thr, void *obj,
+ void *data);
+
+static void nxt_http_source_sync_buffer(nxt_thread_t *thr,
+ nxt_http_source_t *hs, nxt_buf_t *b);
+static void nxt_http_source_error(nxt_stream_source_t *stream);
+static void nxt_http_source_fail(nxt_http_source_t *hs);
+static void nxt_http_source_message(const char *msg, size_t len, u_char *p);
+
+
+void
+nxt_http_source_handler(nxt_upstream_source_t *us,
+ nxt_http_source_request_create_t request_create)
+{
+ nxt_http_source_t *hs;
+ nxt_stream_source_t *stream;
+
+ hs = nxt_mem_zalloc(us->buffers.mem_pool, sizeof(nxt_http_source_t));
+ if (nxt_slow_path(hs == NULL)) {
+ goto fail;
+ }
+
+ us->protocol_source = hs;
+
+ hs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8,
+ sizeof(nxt_name_value_t));
+ if (nxt_slow_path(hs->header_in.list == NULL)) {
+ goto fail;
+ }
+
+ hs->header_in.hash = us->header_hash;
+ hs->upstream = us;
+ hs->request_create = request_create;
+
+ stream = us->stream;
+
+ if (stream == NULL) {
+ stream = nxt_mem_zalloc(us->buffers.mem_pool,
+ sizeof(nxt_stream_source_t));
+ if (nxt_slow_path(stream == NULL)) {
+ goto fail;
+ }
+
+ us->stream = stream;
+ stream->upstream = us;
+
+ } else {
+ nxt_memzero(stream, sizeof(nxt_stream_source_t));
+ }
+
+ /*
+ * Create the HTTP source filter chain:
+ * stream source | HTTP status line filter
+ */
+ stream->next = &hs->query;
+ stream->error_handler = nxt_http_source_error;
+
+ hs->query.context = hs;
+ hs->query.filter = nxt_http_source_status_filter;
+
+ hs->header_in.content_length = -1;
+
+ stream->out = nxt_http_source_request_create(hs);
+
+ if (nxt_fast_path(stream->out != NULL)) {
+ nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t));
+
+ nxt_stream_source_connect(stream);
+ return;
+ }
+
+fail:
+
+ nxt_http_source_fail(hs);
+}
+
+
+nxt_inline u_char *
+nxt_http_source_copy(u_char *p, nxt_str_t *src, size_t len)
+{
+ u_char *s;
+
+ if (nxt_fast_path(len >= src->len)) {
+ len = src->len;
+ src->len = 0;
+
+ } else {
+ src->len -= len;
+ }
+
+ s = src->data;
+ src->data += len;
+
+ return nxt_cpymem(p, s, len);
+}
+
+
+static nxt_buf_t *
+nxt_http_source_request_create(nxt_http_source_t *hs)
+{
+ nxt_int_t ret;
+ nxt_buf_t *b, *req, **prev;
+
+ nxt_thread_log_debug("http source create request");
+
+ prev = &req;
+
+new_buffer:
+
+ ret = nxt_buf_pool_mem_alloc(&hs->upstream->buffers, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NULL;
+ }
+
+ b = hs->upstream->buffers.current;
+ hs->upstream->buffers.current = NULL;
+
+ *prev = b;
+ prev = &b->next;
+
+ for ( ;; ) {
+ ret = hs->request_create(hs);
+
+ if (nxt_fast_path(ret == NXT_OK)) {
+ b->mem.free = nxt_http_source_copy(b->mem.free, &hs->u.request.copy,
+ b->mem.end - b->mem.free);
+
+ if (nxt_fast_path(hs->u.request.copy.len == 0)) {
+ continue;
+ }
+
+ nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos,
+ b->mem.pos);
+
+ goto new_buffer;
+ }
+
+ if (nxt_slow_path(ret == NXT_ERROR)) {
+ return NULL;
+ }
+
+ /* ret == NXT_DONE */
+ break;
+ }
+
+ nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos);
+
+ return req;
+}
+
+
+static void
+nxt_http_source_status_filter(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_int_t ret;
+ nxt_buf_t *b;
+ nxt_http_source_t *hs;
+
+ hs = obj;
+ b = data;
+
+ /*
+ * No cycle over buffer chain is required since at
+ * start the stream source passes buffers one at a time.
+ */
+
+ nxt_log_debug(thr->log, "http source status filter");
+
+ if (nxt_slow_path(nxt_buf_is_sync(b))) {
+ nxt_http_source_sync_buffer(thr, hs, b);
+ return;
+ }
+
+ ret = nxt_http_status_parse(&hs->u.status_parse, &b->mem);
+
+ if (nxt_fast_path(ret == NXT_OK)) {
+ /*
+ * Change the HTTP source filter chain:
+ * stream source | HTTP header filter
+ */
+ hs->query.filter = nxt_http_source_header_filter;
+
+ nxt_log_debug(thr->log, "upstream status: \"%*s\"",
+ hs->u.status_parse.end - b->mem.start, b->mem.start);
+
+ hs->header_in.status = hs->u.status_parse.code;
+
+ nxt_log_debug(thr->log, "upstream version:%d status:%uD \"%*s\"",
+ hs->u.status_parse.http_version,
+ hs->u.status_parse.code,
+ hs->u.status_parse.end - hs->u.status_parse.start,
+ hs->u.status_parse.start);
+
+ nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t));
+ hs->u.header.mem_pool = hs->upstream->buffers.mem_pool;
+
+ nxt_http_source_header_filter(thr, hs, b);
+ return;
+ }
+
+ if (nxt_slow_path(ret == NXT_ERROR)) {
+ /* HTTP/0.9 response. */
+ hs->header_in.status = 200;
+ nxt_http_source_header_ready(hs, b);
+ return;
+ }
+
+ /* ret == NXT_AGAIN */
+
+ /*
+ * b->mem.pos is always equal to b->mem.end because b is a buffer
+ * which points to a response part read by the stream source.
+ * However, since the stream source is an immediate source of the
+ * status filter, b->parent is a buffer the stream source reads in.
+ */
+ if (b->parent->mem.pos == b->parent->mem.end) {
+ nxt_http_source_message("upstream sent too long status line: \"%*s\"",
+ b->mem.pos - b->mem.start, b->mem.start);
+
+ nxt_http_source_fail(hs);
+ }
+}
+
+
+static void
+nxt_http_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_int_t ret;
+ nxt_buf_t *b;
+ nxt_http_source_t *hs;
+
+ hs = obj;
+ b = data;
+
+ /*
+ * No cycle over buffer chain is required since at
+ * start the stream source passes buffers one at a time.
+ */
+
+ nxt_log_debug(thr->log, "http source header filter");
+
+ if (nxt_slow_path(nxt_buf_is_sync(b))) {
+ nxt_http_source_sync_buffer(thr, hs, b);
+ return;
+ }
+
+ for ( ;; ) {
+ ret = nxt_http_split_header_parse(&hs->u.header, &b->mem);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ break;
+ }
+
+ ret = nxt_http_source_header_line_process(hs);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ break;
+ }
+ }
+
+ if (nxt_fast_path(ret == NXT_DONE)) {
+ nxt_log_debug(thr->log, "http source header done");
+ nxt_http_source_header_ready(hs, b);
+ return;
+ }
+
+ if (nxt_fast_path(ret == NXT_AGAIN)) {
+ return;
+ }
+
+ if (ret != NXT_ERROR) {
+ /* ret == NXT_DECLINED: "\r" is not followed by "\n" */
+ nxt_log_error(NXT_LOG_ERR, thr->log,
+ "upstream sent invalid header line: \"%*s\\r...\"",
+ hs->u.header.parse.header_end
+ - hs->u.header.parse.header_name_start,
+ hs->u.header.parse.header_name_start);
+ }
+
+ /* ret == NXT_ERROR */
+
+ nxt_http_source_fail(hs);
+}
+
+
+static nxt_int_t
+nxt_http_source_header_line_process(nxt_http_source_t *hs)
+{
+ size_t name_len;
+ nxt_name_value_t *nv;
+ nxt_lvlhsh_query_t lhq;
+ nxt_http_header_parse_t *hp;
+ nxt_upstream_name_value_t *unv;
+
+ hp = &hs->u.header.parse;
+
+ name_len = hp->header_name_end - hp->header_name_start;
+
+ if (name_len > 255) {
+ nxt_http_source_message("upstream sent too long header field name: "
+ "\"%*s\"", name_len, hp->header_name_start);
+ return NXT_ERROR;
+ }
+
+ nv = nxt_list_add(hs->header_in.list);
+ if (nxt_slow_path(nv == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nv->hash = hp->header_hash;
+ nv->skip = 0;
+ nv->name_len = name_len;
+ nv->name_start = hp->header_name_start;
+ nv->value_len = hp->header_end - hp->header_start;
+ nv->value_start = hp->header_start;
+
+ nxt_thread_log_debug("upstream header: \"%*s: %*s\"",
+ nv->name_len, nv->name_start,
+ nv->value_len, nv->value_start);
+
+ lhq.key_hash = nv->hash;
+ lhq.key.len = nv->name_len;
+ lhq.key.data = nv->name_start;
+ lhq.proto = &nxt_upstream_header_hash_proto;
+
+ if (nxt_lvlhsh_find(&hs->header_in.hash, &lhq) == NXT_OK) {
+ unv = lhq.value;
+
+ if (unv->handler(hs->upstream, nv) != NXT_OK) {
+ return NXT_ERROR;
+ }
+ }
+
+ return NXT_OK;
+}
+
+
+static const nxt_upstream_name_value_t nxt_http_source_headers[]
+ nxt_aligned(32) =
+{
+ { nxt_http_source_content_length,
+ nxt_upstream_name_value("content-length") },
+
+ { nxt_http_source_transfer_encoding,
+ nxt_upstream_name_value("transfer-encoding") },
+};
+
+
+nxt_int_t
+nxt_http_source_hash_create(nxt_mem_pool_t *mp, nxt_lvlhsh_t *lh)
+{
+ return nxt_upstream_header_hash_add(mp, lh, nxt_http_source_headers,
+ nxt_nitems(nxt_http_source_headers));
+}
+
+
+static nxt_int_t
+nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv)
+{
+ nxt_off_t length;
+ nxt_http_source_t *hs;
+
+ length = nxt_off_t_parse(nv->value_start, nv->value_len);
+
+ if (nxt_fast_path(length > 0)) {
+ hs = us->protocol_source;
+ hs->header_in.content_length = length;
+ return NXT_OK;
+ }
+
+ return NXT_ERROR;
+}
+
+
+static nxt_int_t
+nxt_http_source_transfer_encoding(nxt_upstream_source_t *us,
+ nxt_name_value_t *nv)
+{
+ u_char *end;
+ nxt_http_source_t *hs;
+
+ end = nv->value_start + nv->value_len;
+
+ if (nxt_memcasestrn(nv->value_start, end, "chunked", 7) != NULL) {
+ hs = us->protocol_source;
+ hs->chunked = 1;
+ }
+
+ return NXT_OK;
+}
+
+
+static void
+nxt_http_source_header_ready(nxt_http_source_t *hs, nxt_buf_t *rest)
+{
+ nxt_buf_t *b;
+ nxt_upstream_source_t *us;
+ nxt_http_source_chunk_t *hsc;
+
+ us = hs->upstream;
+
+ /* Free buffers used for request header. */
+
+ for (b = us->stream->out; b != NULL; b = b->next) {
+ nxt_buf_pool_free(&us->buffers, b);
+ }
+
+ if (nxt_fast_path(nxt_buf_pool_available(&us->buffers))) {
+
+ if (hs->chunked) {
+ hsc = nxt_mem_zalloc(hs->upstream->buffers.mem_pool,
+ sizeof(nxt_http_source_chunk_t));
+ if (nxt_slow_path(hsc == NULL)) {
+ goto fail;
+ }
+
+ /*
+ * Change the HTTP source filter chain:
+ * stream source | chunk filter | HTTP body filter
+ */
+ hs->query.context = hsc;
+ hs->query.filter = nxt_http_source_chunk_filter;
+
+ hsc->next.context = hs;
+ hsc->next.filter = nxt_http_source_body_filter;
+
+ hsc->parse.mem_pool = hs->upstream->buffers.mem_pool;
+
+ if (nxt_buf_mem_used_size(&rest->mem) != 0) {
+ hs->rest = nxt_http_chunk_parse(&hsc->parse, rest);
+
+ if (nxt_slow_path(hs->rest == NULL)) {
+ goto fail;
+ }
+ }
+
+ } else {
+ /*
+ * Change the HTTP source filter chain:
+ * stream source | HTTP body filter
+ */
+ hs->query.filter = nxt_http_source_body_filter;
+
+ if (nxt_buf_mem_used_size(&rest->mem) != 0) {
+ hs->rest = rest;
+ }
+ }
+
+ hs->upstream->state->ready_handler(hs);
+ return;
+ }
+
+ nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each "
+ "are not enough to read upstream response",
+ us->buffers.max, us->buffers.size / 1024);
+fail:
+
+ nxt_http_source_fail(hs);
+}
+
+
+static void
+nxt_http_source_chunk_filter(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_http_source_t *hs;
+ nxt_http_source_chunk_t *hsc;
+
+ hsc = obj;
+ b = data;
+
+ nxt_log_debug(thr->log, "http source chunk filter");
+
+ b = nxt_http_chunk_parse(&hsc->parse, b);
+
+ hs = hsc->next.context;
+
+ if (hsc->parse.error) {
+ nxt_http_source_fail(hs);
+ return;
+ }
+
+ if (hsc->parse.chunk_error) {
+ /* Output all parsed before a chunk error and close upstream. */
+ nxt_thread_current_work_queue_add(thr, nxt_http_source_chunk_error,
+ hs, NULL, thr->log);
+ }
+
+ if (b != NULL) {
+ nxt_source_filter(thr, hs->upstream->work_queue, &hsc->next, b);
+ }
+}
+
+
+static void
+nxt_http_source_chunk_error(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_http_source_t *hs;
+
+ hs = obj;
+
+ nxt_http_source_fail(hs);
+}
+
+
+/*
+ * The HTTP source body filter accumulates first body buffers before the next
+ * filter will be established and sets completion handler for the last buffer.
+ */
+
+static void
+nxt_http_source_body_filter(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_buf_t *b, *in;
+ nxt_http_source_t *hs;
+
+ hs = obj;
+ in = data;
+
+ nxt_log_debug(thr->log, "http source body filter");
+
+ for (b = in; b != NULL; b = b->next) {
+
+ if (nxt_buf_is_last(b)) {
+ b->data = hs->upstream->data;
+ b->completion_handler = hs->upstream->state->completion_handler;
+ }
+ }
+
+ if (hs->next != NULL) {
+ nxt_source_filter(thr, hs->upstream->work_queue, hs->next, in);
+ return;
+ }
+
+ nxt_buf_chain_add(&hs->rest, in);
+}
+
+
+static void
+nxt_http_source_sync_buffer(nxt_thread_t *thr, nxt_http_source_t *hs,
+ nxt_buf_t *b)
+{
+ if (nxt_buf_is_last(b)) {
+ nxt_log_error(NXT_LOG_ERR, thr->log,
+ "upstream closed prematurely connection");
+
+ } else {
+ nxt_log_error(NXT_LOG_ERR, thr->log, "%ui buffers %uz each are not "
+ "enough to process upstream response header",
+ hs->upstream->buffers.max,
+ hs->upstream->buffers.size);
+ }
+
+ /* The stream source sends only the last and the nobuf sync buffer. */
+
+ nxt_http_source_fail(hs);
+}
+
+
+static void
+nxt_http_source_error(nxt_stream_source_t *stream)
+{
+ nxt_http_source_t *hs;
+
+ nxt_thread_log_debug("http source error");
+
+ hs = stream->next->context;
+ nxt_http_source_fail(hs);
+}
+
+
+static void
+nxt_http_source_fail(nxt_http_source_t *hs)
+{
+ nxt_thread_t *thr;
+
+ thr = nxt_thread();
+
+ nxt_log_debug(thr->log, "http source fail");
+
+ /* TODO: fail, next upstream, or bad gateway */
+
+ hs->upstream->state->error_handler(thr, hs, NULL);
+}
+
+
+static void
+nxt_http_source_message(const char *msg, size_t len, u_char *p)
+{
+ if (len > NXT_MAX_ERROR_STR - 300) {
+ len = NXT_MAX_ERROR_STR - 300;
+ p[len++] = '.'; p[len++] = '.'; p[len++] = '.';
+ }
+
+ nxt_thread_log_error(NXT_LOG_ERR, msg, len, p);
+}