/* * Copyright (C) Igor Sysoev * Copyright (C) NGINX, Inc. */ #include typedef struct { nxt_http_chunk_parse_t parse; nxt_source_hook_t next; } nxt_http_source_chunk_t; static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs); static void nxt_http_source_status_filter(nxt_task_t *task, void *obj, void *data); static void nxt_http_source_header_filter(nxt_task_t *task, void *obj, void *data); static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs); static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv); static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us, nxt_name_value_t *nv); static void nxt_http_source_header_ready(nxt_task_t *task, nxt_http_source_t *hs, nxt_buf_t *rest); static void nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, void *data); static void nxt_http_source_chunk_error(nxt_task_t *task, void *obj, void *data); static void nxt_http_source_body_filter(nxt_task_t *task, void *obj, void *data); static void nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs, nxt_buf_t *b); static void nxt_http_source_error(nxt_task_t *task, nxt_stream_source_t *stream); static void nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs); static void nxt_http_source_message(const char *msg, size_t len, u_char *p); void nxt_http_source_handler(nxt_task_t *task, nxt_upstream_source_t *us, nxt_http_source_request_create_t request_create) { nxt_http_source_t *hs; nxt_stream_source_t *stream; hs = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_http_source_t)); if (nxt_slow_path(hs == NULL)) { goto fail; } us->protocol_source = hs; hs->header_in.list = nxt_list_create(us->buffers.mem_pool, 8, sizeof(nxt_name_value_t)); if (nxt_slow_path(hs->header_in.list == NULL)) { goto fail; } hs->header_in.hash = us->header_hash; hs->upstream = us; hs->request_create = request_create; stream = us->stream; if (stream == NULL) { stream = nxt_mp_zget(us->buffers.mem_pool, sizeof(nxt_stream_source_t)); if (nxt_slow_path(stream == NULL)) { goto fail; } us->stream = stream; stream->upstream = us; } else { nxt_memzero(stream, sizeof(nxt_stream_source_t)); } /* * Create the HTTP source filter chain: * stream source | HTTP status line filter */ stream->next = &hs->query; stream->error_handler = nxt_http_source_error; hs->query.context = hs; hs->query.filter = nxt_http_source_status_filter; hs->header_in.content_length = -1; stream->out = nxt_http_source_request_create(hs); if (nxt_fast_path(stream->out != NULL)) { nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t)); nxt_stream_source_connect(task, stream); return; } fail: nxt_http_source_fail(task, hs); } nxt_inline u_char * nxt_http_source_copy(u_char *p, nxt_str_t *src, size_t len) { u_char *s; if (nxt_fast_path(len >= src->len)) { len = src->len; src->len = 0; } else { src->len -= len; } s = src->data; src->data += len; return nxt_cpymem(p, s, len); } static nxt_buf_t * nxt_http_source_request_create(nxt_http_source_t *hs) { nxt_int_t ret; nxt_buf_t *b, *req, **prev; nxt_thread_log_debug("http source create request"); prev = &req; new_buffer: ret = nxt_buf_pool_mem_alloc(&hs->upstream->buffers, 0); if (nxt_slow_path(ret != NXT_OK)) { return NULL; } b = hs->upstream->buffers.current; hs->upstream->buffers.current = NULL; *prev = b; prev = &b->next; for ( ;; ) { ret = hs->request_create(hs); if (nxt_fast_path(ret == NXT_OK)) { b->mem.free = nxt_http_source_copy(b->mem.free, &hs->u.request.copy, b->mem.end - b->mem.free); if (nxt_fast_path(hs->u.request.copy.len == 0)) { continue; } nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); goto new_buffer; } if (nxt_slow_path(ret == NXT_ERROR)) { return NULL; } /* ret == NXT_DONE */ break; } nxt_thread_log_debug("\"%*s\"", b->mem.free - b->mem.pos, b->mem.pos); return req; } static void nxt_http_source_status_filter(nxt_task_t *task, void *obj, void *data) { nxt_int_t ret; nxt_buf_t *b; nxt_http_source_t *hs; hs = obj; b = data; /* * No cycle over buffer chain is required since at * start the stream source passes buffers one at a time. */ nxt_debug(task, "http source status filter"); if (nxt_slow_path(nxt_buf_is_sync(b))) { nxt_http_source_sync_buffer(task, hs, b); return; } ret = nxt_http_status_parse(&hs->u.status_parse, &b->mem); if (nxt_fast_path(ret == NXT_OK)) { /* * Change the HTTP source filter chain: * stream source | HTTP header filter */ hs->query.filter = nxt_http_source_header_filter; nxt_debug(task, "upstream status: \"%*s\"", hs->u.status_parse.end - b->mem.start, b->mem.start); hs->header_in.status = hs->u.status_parse.code; nxt_debug(task, "upstream version:%d status:%uD \"%*s\"", hs->u.status_parse.http_version, hs->u.status_parse.code, hs->u.status_parse.end - hs->u.status_parse.start, hs->u.status_parse.start); nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t)); hs->u.header.mem_pool = hs->upstream->buffers.mem_pool; nxt_http_source_header_filter(task, hs, b); return; } if (nxt_slow_path(ret == NXT_ERROR)) { /* HTTP/0.9 response. */ hs->header_in.status = 200; nxt_http_source_header_ready(task, hs, b); return; } /* ret == NXT_AGAIN */ /* * b->mem.pos is always equal to b->mem.end because b is a buffer * which points to a response part read by the stream source. * However, since the stream source is an immediate source of the * status filter, b->parent is a buffer the stream source reads in. */ if (b->parent->mem.pos == b->parent->mem.end) { nxt_http_source_message("upstream sent too long status line: \"%*s\"", b->mem.pos - b->mem.start, b->mem.start); nxt_http_source_fail(task, hs); } } static void nxt_http_source_header_filter(nxt_task_t *task, void *obj, void *data) { nxt_int_t ret; nxt_buf_t *b; nxt_http_source_t *hs; hs = obj; b = data; /* * No cycle over buffer chain is required since at * start the stream source passes buffers one at a time. */ nxt_debug(task, "http source header filter"); if (nxt_slow_path(nxt_buf_is_sync(b))) { nxt_http_source_sync_buffer(task, hs, b); return; } for ( ;; ) { ret = nxt_http_split_header_parse(&hs->u.header, &b->mem); if (nxt_slow_path(ret != NXT_OK)) { break; } ret = nxt_http_source_header_line_process(hs); if (nxt_slow_path(ret != NXT_OK)) { break; } } if (nxt_fast_path(ret == NXT_DONE)) { nxt_debug(task, "http source header done"); nxt_http_source_header_ready(task, hs, b); return; } if (nxt_fast_path(ret == NXT_AGAIN)) { return; } if (ret != NXT_ERROR) { /* ret == NXT_DECLINED: "\r" is not followed by "\n" */ nxt_log(task, NXT_LOG_ERR, "upstream sent invalid header line: \"%*s\\r...\"", hs->u.header.parse.header_end - hs->u.header.parse.header_name_start, hs->u.header.parse.header_name_start); } /* ret == NXT_ERROR */ nxt_http_source_fail(task, hs); } static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs) { size_t name_len; nxt_name_value_t *nv; nxt_lvlhsh_query_t lhq; nxt_http_header_parse_t *hp; nxt_upstream_name_value_t *unv; hp = &hs->u.header.parse; name_len = hp->header_name_end - hp->header_name_start; if (name_len > 255) { nxt_http_source_message("upstream sent too long header field name: " "\"%*s\"", name_len, hp->header_name_start); return NXT_ERROR; } nv = nxt_list_add(hs->header_in.list); if (nxt_slow_path(nv == NULL)) { return NXT_ERROR; } nv->hash = hp->header_hash; nv->skip = 0; nv->name_len = name_len; nv->name_start = hp->header_name_start; nv->value_len = hp->header_end - hp->header_start; nv->value_start = hp->header_start; nxt_thread_log_debug("upstream header: \"%*s: %*s\"", nv->name_len, nv->name_start, nv->value_len, nv->value_start); lhq.key_hash = nv->hash; lhq.key.len = nv->name_len; lhq.key.data = nv->name_start; lhq.proto = &nxt_upstream_header_hash_proto; if (nxt_lvlhsh_find(&hs->header_in.hash, &lhq) == NXT_OK) { unv = lhq.value; if (unv->handler(hs->upstream, nv) != NXT_OK) { return NXT_ERROR; } } return NXT_OK; } static const nxt_upstream_name_value_t nxt_http_source_headers[] nxt_aligned(32) = { { nxt_http_source_content_length, nxt_upstream_name_value("content-length") }, { nxt_http_source_transfer_encoding, nxt_upstream_name_value("transfer-encoding") }, }; nxt_int_t nxt_http_source_hash_create(nxt_mp_t *mp, nxt_lvlhsh_t *lh) { return nxt_upstream_header_hash_add(mp, lh, nxt_http_source_headers, nxt_nitems(nxt_http_source_headers)); } static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us, nxt_name_value_t *nv) { nxt_off_t length; nxt_http_source_t *hs; length = nxt_off_t_parse(nv->value_start, nv->value_len); if (nxt_fast_path(length > 0)) { hs = us->protocol_source; hs->header_in.content_length = length; return NXT_OK; } return NXT_ERROR; } static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us, nxt_name_value_t *nv) { u_char *end; nxt_http_source_t *hs; end = nv->value_start + nv->value_len; if (nxt_memcasestrn(nv->value_start, end, "chunked", 7) != NULL) { hs = us->protocol_source; hs->chunked = 1; } return NXT_OK; } static void nxt_http_source_header_ready(nxt_task_t *task, nxt_http_source_t *hs, nxt_buf_t *rest) { nxt_buf_t *b; nxt_upstream_source_t *us; nxt_http_source_chunk_t *hsc; us = hs->upstream; /* Free buffers used for request header. */ for (b = us->stream->out; b != NULL; b = b->next) { nxt_buf_pool_free(&us->buffers, b); } if (nxt_fast_path(nxt_buf_pool_available(&us->buffers))) { if (hs->chunked) { hsc = nxt_mp_zalloc(hs->upstream->buffers.mem_pool, sizeof(nxt_http_source_chunk_t)); if (nxt_slow_path(hsc == NULL)) { goto fail; } /* * Change the HTTP source filter chain: * stream source | chunk filter | HTTP body filter */ hs->query.context = hsc; hs->query.filter = nxt_http_source_chunk_filter; hsc->next.context = hs; hsc->next.filter = nxt_http_source_body_filter; hsc->parse.mem_pool = hs->upstream->buffers.mem_pool; if (nxt_buf_mem_used_size(&rest->mem) != 0) { hs->rest = nxt_http_chunk_parse(task, &hsc->parse, rest); if (nxt_slow_path(hs->rest == NULL)) { goto fail; } } } else { /* * Change the HTTP source filter chain: * stream source | HTTP body filter */ hs->query.filter = nxt_http_source_body_filter; if (nxt_buf_mem_used_size(&rest->mem) != 0) { hs->rest = rest; } } hs->upstream->state->ready_handler(hs); return; } nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each " "are not enough to read upstream response", us->buffers.max, us->buffers.size / 1024); fail: nxt_http_source_fail(task, hs); } static void nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; nxt_http_source_t *hs; nxt_http_source_chunk_t *hsc; hsc = obj; b = data; nxt_debug(task, "http source chunk filter"); b = nxt_http_chunk_parse(task, &hsc->parse, b); hs = hsc->next.context; if (hsc->parse.error) { nxt_http_source_fail(task, hs); return; } if (hsc->parse.chunk_error) { /* Output all parsed before a chunk error and close upstream. */ nxt_thread_current_work_queue_add(task->thread, nxt_http_source_chunk_error, task, hs, NULL); } if (b != NULL) { nxt_source_filter(task->thread, hs->upstream->work_queue, task, &hsc->next, b); } } static void nxt_http_source_chunk_error(nxt_task_t *task, void *obj, void *data) { nxt_http_source_t *hs; hs = obj; nxt_http_source_fail(task, hs); } /* * The HTTP source body filter accumulates first body buffers before the next * filter will be established and sets completion handler for the last buffer. */ static void nxt_http_source_body_filter(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b, *in; nxt_http_source_t *hs; hs = obj; in = data; nxt_debug(task, "http source body filter"); for (b = in; b != NULL; b = b->next) { if (nxt_buf_is_last(b)) { b->data = hs->upstream->data; b->completion_handler = hs->upstream->state->completion_handler; } } if (hs->next != NULL) { nxt_source_filter(task->thread, hs->upstream->work_queue, task, hs->next, in); return; } nxt_buf_chain_add(&hs->rest, in); } static void nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs, nxt_buf_t *b) { if (nxt_buf_is_last(b)) { nxt_log(task, NXT_LOG_ERR, "upstream closed prematurely connection"); } else { nxt_log(task, NXT_LOG_ERR,"%ui buffers %uz each are not " "enough to process upstream response header", hs->upstream->buffers.max, hs->upstream->buffers.size); } /* The stream source sends only the last and the nobuf sync buffer. */ nxt_http_source_fail(task, hs); } static void nxt_http_source_error(nxt_task_t *task, nxt_stream_source_t *stream) { nxt_http_source_t *hs; nxt_thread_log_debug("http source error"); hs = stream->next->context; nxt_http_source_fail(task, hs); } static void nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs) { nxt_debug(task, "http source fail"); /* TODO: fail, next upstream, or bad gateway */ hs->upstream->state->error_handler(task, hs, NULL); } static void nxt_http_source_message(const char *msg, size_t len, u_char *p) { if (len > NXT_MAX_ERROR_STR - 300) { len = NXT_MAX_ERROR_STR - 300; p[len++] = '.'; p[len++] = '.'; p[len++] = '.'; } nxt_thread_log_error(NXT_LOG_ERR, msg, len, p); }