diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
commit | de532922d9ab42aa15b40d47c8db53ac2af38500 (patch) | |
tree | d6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_fastcgi_source.c | |
parent | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff) | |
download | unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2 |
Introducing tasks.
Diffstat (limited to 'src/nxt_fastcgi_source.c')
-rw-r--r-- | src/nxt_fastcgi_source.c | 124 |
1 files changed, 60 insertions, 64 deletions
diff --git a/src/nxt_fastcgi_source.c b/src/nxt_fastcgi_source.c index 14c51d6f..d655fca8 100644 --- a/src/nxt_fastcgi_source.c +++ b/src/nxt_fastcgi_source.c @@ -49,16 +49,17 @@ static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs); static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, nxt_fastcgi_param_t *param); -static void nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, +static void nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, void *data); -static void nxt_fastcgi_source_record_error(nxt_thread_t *thr, void *obj, +static void nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, void *data); -static void nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, +static void nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, void *data); -static void nxt_fastcgi_source_sync_buffer(nxt_thread_t *thr, +static void nxt_fastcgi_source_sync_buffer(nxt_task_t *task, nxt_fastcgi_source_t *fs, nxt_buf_t *b); -static nxt_int_t nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs); +static nxt_int_t nxt_fastcgi_source_header_process(nxt_task_t *task, + nxt_fastcgi_source_t *fs); static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us, nxt_name_value_t *nv); static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us, @@ -66,11 +67,12 @@ static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us, static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, nxt_buf_t *b); -static void nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, +static void nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, void *data); static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp); -static void nxt_fastcgi_source_error(nxt_stream_source_t *stream); -static void nxt_fastcgi_source_fail(nxt_fastcgi_source_t *fs); +static void nxt_fastcgi_source_error(nxt_task_t *task, + nxt_stream_source_t *stream); +static void nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs); /* @@ -117,7 +119,7 @@ static const uint8_t nxt_fastcgi_stdin_record[] = { void -nxt_fastcgi_source_handler(nxt_upstream_source_t *us, +nxt_fastcgi_source_handler(nxt_task_t *task, nxt_upstream_source_t *us, nxt_fastcgi_source_request_create_t request_create) { nxt_stream_source_t *stream; @@ -181,13 +183,13 @@ nxt_fastcgi_source_handler(nxt_upstream_source_t *us, nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t)); fs->u.header.mem_pool = fs->upstream->buffers.mem_pool; - nxt_stream_source_connect(stream); + nxt_stream_source_connect(task, stream); return; } fail: - nxt_fastcgi_source_fail(fs); + nxt_fastcgi_source_fail(task, fs); } @@ -383,7 +385,7 @@ nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, nxt_fastcgi_param_t *param) static void -nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data) +nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, void *data) { size_t size; u_char *p; @@ -394,18 +396,18 @@ nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data) fsr = obj; in = data; - nxt_log_debug(thr->log, "fastcgi source record filter"); + nxt_debug(task, "fastcgi source record filter"); if (nxt_slow_path(fsr->parse.done)) { return; } - nxt_fastcgi_record_parse(&fsr->parse, in); + nxt_fastcgi_record_parse(task, &fsr->parse, in); fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record); if (fsr->parse.error) { - nxt_fastcgi_source_fail(fs); + nxt_fastcgi_source_fail(task, fs); return; } @@ -413,8 +415,9 @@ nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data) /* * Output all parsed before a FastCGI record error and close upstream. */ - nxt_thread_current_work_queue_add(thr, nxt_fastcgi_source_record_error, - fs, NULL, thr->log); + nxt_thread_current_work_queue_add(task->thread, + nxt_fastcgi_source_record_error, + task, fs, NULL); } /* Log FastCGI stderr output. */ @@ -430,36 +433,36 @@ nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data) size = (p + 1) - b->mem.pos; if (size != 0) { - nxt_log_error(NXT_LOG_ERR, thr->log, - "upstream sent in FastCGI stderr: \"%*s\"", - size, b->mem.pos); + nxt_log(task, NXT_LOG_ERR, + "upstream sent in FastCGI stderr: \"%*s\"", + size, b->mem.pos); } - b->completion_handler(thr, b, b->parent); + b->completion_handler(task, b, b->parent); } /* Process FastCGI stdout output. */ if (fsr->parse.out[0] != NULL) { - nxt_source_filter(thr, fs->upstream->work_queue, &fsr->next, - fsr->parse.out[0]); + nxt_source_filter(task->thread, fs->upstream->work_queue, task, + &fsr->next, fsr->parse.out[0]); } } static void -nxt_fastcgi_source_record_error(nxt_thread_t *thr, void *obj, void *data) +nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, void *data) { nxt_fastcgi_source_t *fs; fs = obj; - nxt_fastcgi_source_fail(fs); + nxt_fastcgi_source_fail(task, fs); } static void -nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data) +nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, void *data) { nxt_int_t ret; nxt_buf_t *b; @@ -469,10 +472,10 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data) b = data; do { - nxt_log_debug(thr->log, "fastcgi source header filter"); + nxt_debug(task, "fastcgi source header filter"); if (nxt_slow_path(nxt_buf_is_sync(b))) { - nxt_fastcgi_source_sync_buffer(thr, fs, b); + nxt_fastcgi_source_sync_buffer(task, fs, b); return; } @@ -483,7 +486,7 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data) break; } - ret = nxt_fastcgi_source_header_process(fs); + ret = nxt_fastcgi_source_header_process(task, fs); if (nxt_slow_path(ret != NXT_OK)) { break; @@ -491,7 +494,7 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data) } if (nxt_fast_path(ret == NXT_DONE)) { - nxt_log_debug(thr->log, "fastcgi source header done"); + nxt_debug(task, "fastcgi source header done"); nxt_fastcgi_source_header_ready(fs, b); return; } @@ -500,16 +503,16 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data) if (ret != NXT_ERROR) { /* n == NXT_DECLINED: "\r" is not followed by "\n" */ - nxt_log_error(NXT_LOG_ERR, thr->log, - "upstream sent invalid header line: \"%*s\\r...\"", - fs->u.header.parse.header_end - - fs->u.header.parse.header_name_start, - fs->u.header.parse.header_name_start); + nxt_log(task, NXT_LOG_ERR, + "upstream sent invalid header line: \"%*s\\r...\"", + fs->u.header.parse.header_end + - fs->u.header.parse.header_name_start, + fs->u.header.parse.header_name_start); } /* ret == NXT_ERROR */ - nxt_fastcgi_source_fail(fs); + nxt_fastcgi_source_fail(task, fs); return; } @@ -520,45 +523,41 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data) static void -nxt_fastcgi_source_sync_buffer(nxt_thread_t *thr, nxt_fastcgi_source_t *fs, +nxt_fastcgi_source_sync_buffer(nxt_task_t *task, nxt_fastcgi_source_t *fs, nxt_buf_t *b) { if (nxt_buf_is_last(b)) { - nxt_log_error(NXT_LOG_ERR, thr->log, - "upstream closed prematurely connection"); + nxt_log(task, NXT_LOG_ERR, "upstream closed prematurely connection"); } else { - nxt_log_error(NXT_LOG_ERR, thr->log, "%ui buffers %uz each are not " - "enough to process upstream response header", - fs->upstream->buffers.max, - fs->upstream->buffers.size); + nxt_log(task, NXT_LOG_ERR, "%ui buffers %uz each are not " + "enough to process upstream response header", + fs->upstream->buffers.max, fs->upstream->buffers.size); } /* The stream source sends only the last and the nobuf sync buffer. */ - nxt_fastcgi_source_fail(fs); + nxt_fastcgi_source_fail(task, fs); } static nxt_int_t -nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs) +nxt_fastcgi_source_header_process(nxt_task_t *task, nxt_fastcgi_source_t *fs) { size_t len; - nxt_thread_t *thr; nxt_name_value_t *nv; nxt_lvlhsh_query_t lhq; nxt_http_header_parse_t *hp; nxt_upstream_name_value_t *unv; - thr = nxt_thread(); hp = &fs->u.header.parse; len = hp->header_name_end - hp->header_name_start; if (len > 255) { - nxt_log_error(NXT_LOG_INFO, thr->log, - "upstream sent too long header field name: \"%*s\"", - len, hp->header_name_start); + nxt_log(task, NXT_LOG_INFO, + "upstream sent too long header field name: \"%*s\"", + len, hp->header_name_start); return NXT_ERROR; } @@ -574,8 +573,8 @@ nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs) nv->value_len = hp->header_end - hp->header_start; nv->value_start = hp->header_start; - nxt_log_debug(thr->log, "http header: \"%*s: %*s\"", - nv->name_len, nv->name_start, nv->value_len, nv->value_start); + nxt_debug(task, "http header: \"%*s: %*s\"", + nv->name_len, nv->name_start, nv->value_len, nv->value_start); lhq.key_hash = nv->hash; lhq.key.len = nv->name_len; @@ -682,7 +681,7 @@ nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, nxt_buf_t *b) */ static void -nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, void *data) +nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b, *in; nxt_fastcgi_source_t *fs; @@ -690,7 +689,7 @@ nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, void *data) fs = obj; in = data; - nxt_log_debug(thr->log, "fastcgi source body filter"); + nxt_debug(task, "fastcgi source body filter"); for (b = in; b != NULL; b = b->next) { @@ -701,7 +700,8 @@ nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, void *data) } if (fs->next != NULL) { - nxt_source_filter(thr, fs->upstream->work_queue, fs->next, in); + nxt_source_filter(task->thread, fs->upstream->work_queue, task, + fs->next, in); return; } @@ -729,7 +729,7 @@ nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp) static void -nxt_fastcgi_source_error(nxt_stream_source_t *stream) +nxt_fastcgi_source_error(nxt_task_t *task, nxt_stream_source_t *stream) { nxt_fastcgi_source_t *fs; @@ -737,20 +737,16 @@ nxt_fastcgi_source_error(nxt_stream_source_t *stream) fs = stream->upstream->protocol_source; - nxt_fastcgi_source_fail(fs); + nxt_fastcgi_source_fail(task, fs); } static void -nxt_fastcgi_source_fail(nxt_fastcgi_source_t *fs) +nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs) { - nxt_thread_t *thr; - - thr = nxt_thread(); - - nxt_log_debug(thr->log, "fastcgi source fail"); + nxt_debug(task, "fastcgi source fail"); /* TODO: fail, next upstream, or bad gateway */ - fs->upstream->state->error_handler(thr, fs, NULL); + fs->upstream->state->error_handler(task, fs, NULL); } |