diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_http_source.c | 140 |
1 files changed, 70 insertions, 70 deletions
diff --git a/src/nxt_http_source.c b/src/nxt_http_source.c index 045d585b..10c5527b 100644 --- a/src/nxt_http_source.c +++ b/src/nxt_http_source.c @@ -15,9 +15,9 @@ typedef struct { static nxt_buf_t *nxt_http_source_request_create(nxt_http_source_t *hs); -static void nxt_http_source_status_filter(nxt_thread_t *thr, void *obj, +static void nxt_http_source_status_filter(nxt_task_t *task, void *obj, void *data); -static void nxt_http_source_header_filter(nxt_thread_t *thr, void *obj, +static void nxt_http_source_header_filter(nxt_task_t *task, void *obj, void *data); static nxt_int_t nxt_http_source_header_line_process(nxt_http_source_t *hs); @@ -26,24 +26,25 @@ static nxt_int_t nxt_http_source_content_length(nxt_upstream_source_t *us, static nxt_int_t nxt_http_source_transfer_encoding(nxt_upstream_source_t *us, nxt_name_value_t *nv); -static void nxt_http_source_header_ready(nxt_http_source_t *hs, - nxt_buf_t *rest); -static void nxt_http_source_chunk_filter(nxt_thread_t *thr, void *obj, +static void nxt_http_source_header_ready(nxt_task_t *task, + nxt_http_source_t *hs, nxt_buf_t *rest); +static void nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, void *data); -static void nxt_http_source_chunk_error(nxt_thread_t *thr, void *obj, +static void nxt_http_source_chunk_error(nxt_task_t *task, void *obj, void *data); -static void nxt_http_source_body_filter(nxt_thread_t *thr, void *obj, +static void nxt_http_source_body_filter(nxt_task_t *task, void *obj, void *data); -static void nxt_http_source_sync_buffer(nxt_thread_t *thr, - nxt_http_source_t *hs, nxt_buf_t *b); -static void nxt_http_source_error(nxt_stream_source_t *stream); -static void nxt_http_source_fail(nxt_http_source_t *hs); +static void nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs, + nxt_buf_t *b); +static void nxt_http_source_error(nxt_task_t *task, + nxt_stream_source_t *stream); +static void nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs); static void nxt_http_source_message(const char *msg, size_t len, u_char *p); void -nxt_http_source_handler(nxt_upstream_source_t *us, +nxt_http_source_handler(nxt_task_t *task, nxt_upstream_source_t *us, nxt_http_source_request_create_t request_create) { nxt_http_source_t *hs; @@ -99,13 +100,13 @@ nxt_http_source_handler(nxt_upstream_source_t *us, if (nxt_fast_path(stream->out != NULL)) { nxt_memzero(&hs->u.status_parse, sizeof(nxt_http_status_parse_t)); - nxt_stream_source_connect(stream); + nxt_stream_source_connect(task, stream); return; } fail: - nxt_http_source_fail(hs); + nxt_http_source_fail(task, hs); } @@ -184,7 +185,7 @@ new_buffer: static void -nxt_http_source_status_filter(nxt_thread_t *thr, void *obj, void *data) +nxt_http_source_status_filter(nxt_task_t *task, void *obj, void *data) { nxt_int_t ret; nxt_buf_t *b; @@ -198,10 +199,10 @@ nxt_http_source_status_filter(nxt_thread_t *thr, void *obj, void *data) * start the stream source passes buffers one at a time. */ - nxt_log_debug(thr->log, "http source status filter"); + nxt_debug(task, "http source status filter"); if (nxt_slow_path(nxt_buf_is_sync(b))) { - nxt_http_source_sync_buffer(thr, hs, b); + nxt_http_source_sync_buffer(task, hs, b); return; } @@ -214,28 +215,28 @@ nxt_http_source_status_filter(nxt_thread_t *thr, void *obj, void *data) */ hs->query.filter = nxt_http_source_header_filter; - nxt_log_debug(thr->log, "upstream status: \"%*s\"", - hs->u.status_parse.end - b->mem.start, b->mem.start); + nxt_debug(task, "upstream status: \"%*s\"", + hs->u.status_parse.end - b->mem.start, b->mem.start); hs->header_in.status = hs->u.status_parse.code; - nxt_log_debug(thr->log, "upstream version:%d status:%uD \"%*s\"", - hs->u.status_parse.http_version, - hs->u.status_parse.code, - hs->u.status_parse.end - hs->u.status_parse.start, - hs->u.status_parse.start); + nxt_debug(task, "upstream version:%d status:%uD \"%*s\"", + hs->u.status_parse.http_version, + hs->u.status_parse.code, + hs->u.status_parse.end - hs->u.status_parse.start, + hs->u.status_parse.start); nxt_memzero(&hs->u.header, sizeof(nxt_http_split_header_parse_t)); hs->u.header.mem_pool = hs->upstream->buffers.mem_pool; - nxt_http_source_header_filter(thr, hs, b); + nxt_http_source_header_filter(task, hs, b); return; } if (nxt_slow_path(ret == NXT_ERROR)) { /* HTTP/0.9 response. */ hs->header_in.status = 200; - nxt_http_source_header_ready(hs, b); + nxt_http_source_header_ready(task, hs, b); return; } @@ -251,13 +252,13 @@ nxt_http_source_status_filter(nxt_thread_t *thr, void *obj, void *data) nxt_http_source_message("upstream sent too long status line: \"%*s\"", b->mem.pos - b->mem.start, b->mem.start); - nxt_http_source_fail(hs); + nxt_http_source_fail(task, hs); } } static void -nxt_http_source_header_filter(nxt_thread_t *thr, void *obj, void *data) +nxt_http_source_header_filter(nxt_task_t *task, void *obj, void *data) { nxt_int_t ret; nxt_buf_t *b; @@ -271,10 +272,10 @@ nxt_http_source_header_filter(nxt_thread_t *thr, void *obj, void *data) * start the stream source passes buffers one at a time. */ - nxt_log_debug(thr->log, "http source header filter"); + nxt_debug(task, "http source header filter"); if (nxt_slow_path(nxt_buf_is_sync(b))) { - nxt_http_source_sync_buffer(thr, hs, b); + nxt_http_source_sync_buffer(task, hs, b); return; } @@ -293,8 +294,8 @@ nxt_http_source_header_filter(nxt_thread_t *thr, void *obj, void *data) } if (nxt_fast_path(ret == NXT_DONE)) { - nxt_log_debug(thr->log, "http source header done"); - nxt_http_source_header_ready(hs, b); + nxt_debug(task, "http source header done"); + nxt_http_source_header_ready(task, hs, b); return; } @@ -304,16 +305,16 @@ nxt_http_source_header_filter(nxt_thread_t *thr, void *obj, void *data) if (ret != NXT_ERROR) { /* ret == NXT_DECLINED: "\r" is not followed by "\n" */ - nxt_log_error(NXT_LOG_ERR, thr->log, - "upstream sent invalid header line: \"%*s\\r...\"", - hs->u.header.parse.header_end - - hs->u.header.parse.header_name_start, - hs->u.header.parse.header_name_start); + nxt_log(task, NXT_LOG_ERR, + "upstream sent invalid header line: \"%*s\\r...\"", + hs->u.header.parse.header_end + - hs->u.header.parse.header_name_start, + hs->u.header.parse.header_name_start); } /* ret == NXT_ERROR */ - nxt_http_source_fail(hs); + nxt_http_source_fail(task, hs); } @@ -425,7 +426,8 @@ nxt_http_source_transfer_encoding(nxt_upstream_source_t *us, static void -nxt_http_source_header_ready(nxt_http_source_t *hs, nxt_buf_t *rest) +nxt_http_source_header_ready(nxt_task_t *task, nxt_http_source_t *hs, + nxt_buf_t *rest) { nxt_buf_t *b; nxt_upstream_source_t *us; @@ -461,7 +463,7 @@ nxt_http_source_header_ready(nxt_http_source_t *hs, nxt_buf_t *rest) hsc->parse.mem_pool = hs->upstream->buffers.mem_pool; if (nxt_buf_mem_used_size(&rest->mem) != 0) { - hs->rest = nxt_http_chunk_parse(&hsc->parse, rest); + hs->rest = nxt_http_chunk_parse(task, &hsc->parse, rest); if (nxt_slow_path(hs->rest == NULL)) { goto fail; @@ -489,12 +491,12 @@ nxt_http_source_header_ready(nxt_http_source_t *hs, nxt_buf_t *rest) us->buffers.max, us->buffers.size / 1024); fail: - nxt_http_source_fail(hs); + nxt_http_source_fail(task, hs); } static void -nxt_http_source_chunk_filter(nxt_thread_t *thr, void *obj, void *data) +nxt_http_source_chunk_filter(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; nxt_http_source_t *hs; @@ -503,37 +505,39 @@ nxt_http_source_chunk_filter(nxt_thread_t *thr, void *obj, void *data) hsc = obj; b = data; - nxt_log_debug(thr->log, "http source chunk filter"); + nxt_debug(task, "http source chunk filter"); - b = nxt_http_chunk_parse(&hsc->parse, b); + b = nxt_http_chunk_parse(task, &hsc->parse, b); hs = hsc->next.context; if (hsc->parse.error) { - nxt_http_source_fail(hs); + nxt_http_source_fail(task, hs); return; } if (hsc->parse.chunk_error) { /* Output all parsed before a chunk error and close upstream. */ - nxt_thread_current_work_queue_add(thr, nxt_http_source_chunk_error, - hs, NULL, thr->log); + nxt_thread_current_work_queue_add(task->thread, + nxt_http_source_chunk_error, + task, hs, NULL); } if (b != NULL) { - nxt_source_filter(thr, hs->upstream->work_queue, &hsc->next, b); + nxt_source_filter(task->thread, hs->upstream->work_queue, task, + &hsc->next, b); } } static void -nxt_http_source_chunk_error(nxt_thread_t *thr, void *obj, void *data) +nxt_http_source_chunk_error(nxt_task_t *task, void *obj, void *data) { nxt_http_source_t *hs; hs = obj; - nxt_http_source_fail(hs); + nxt_http_source_fail(task, hs); } @@ -543,7 +547,7 @@ nxt_http_source_chunk_error(nxt_thread_t *thr, void *obj, void *data) */ static void -nxt_http_source_body_filter(nxt_thread_t *thr, void *obj, void *data) +nxt_http_source_body_filter(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b, *in; nxt_http_source_t *hs; @@ -551,7 +555,7 @@ nxt_http_source_body_filter(nxt_thread_t *thr, void *obj, void *data) hs = obj; in = data; - nxt_log_debug(thr->log, "http source body filter"); + nxt_debug(task, "http source body filter"); for (b = in; b != NULL; b = b->next) { @@ -562,7 +566,8 @@ nxt_http_source_body_filter(nxt_thread_t *thr, void *obj, void *data) } if (hs->next != NULL) { - nxt_source_filter(thr, hs->upstream->work_queue, hs->next, in); + nxt_source_filter(task->thread, hs->upstream->work_queue, task, + hs->next, in); return; } @@ -571,50 +576,45 @@ nxt_http_source_body_filter(nxt_thread_t *thr, void *obj, void *data) static void -nxt_http_source_sync_buffer(nxt_thread_t *thr, nxt_http_source_t *hs, +nxt_http_source_sync_buffer(nxt_task_t *task, nxt_http_source_t *hs, nxt_buf_t *b) { if (nxt_buf_is_last(b)) { - nxt_log_error(NXT_LOG_ERR, thr->log, - "upstream closed prematurely connection"); + nxt_log(task, NXT_LOG_ERR, + "upstream closed prematurely connection"); } else { - nxt_log_error(NXT_LOG_ERR, thr->log, "%ui buffers %uz each are not " - "enough to process upstream response header", - hs->upstream->buffers.max, - hs->upstream->buffers.size); + nxt_log(task, NXT_LOG_ERR,"%ui buffers %uz each are not " + "enough to process upstream response header", + hs->upstream->buffers.max, hs->upstream->buffers.size); } /* The stream source sends only the last and the nobuf sync buffer. */ - nxt_http_source_fail(hs); + nxt_http_source_fail(task, hs); } static void -nxt_http_source_error(nxt_stream_source_t *stream) +nxt_http_source_error(nxt_task_t *task, nxt_stream_source_t *stream) { nxt_http_source_t *hs; nxt_thread_log_debug("http source error"); hs = stream->next->context; - nxt_http_source_fail(hs); + nxt_http_source_fail(task, hs); } static void -nxt_http_source_fail(nxt_http_source_t *hs) +nxt_http_source_fail(nxt_task_t *task, nxt_http_source_t *hs) { - nxt_thread_t *thr; - - thr = nxt_thread(); - - nxt_log_debug(thr->log, "http source fail"); + nxt_debug(task, "http source fail"); /* TODO: fail, next upstream, or bad gateway */ - hs->upstream->state->error_handler(thr, hs, NULL); + hs->upstream->state->error_handler(task, hs, NULL); } |