summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_fastcgi_source.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
commitde532922d9ab42aa15b40d47c8db53ac2af38500 (patch)
treed6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_fastcgi_source.c
parent16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff)
downloadunit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz
unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2
Introducing tasks.
Diffstat (limited to 'src/nxt_fastcgi_source.c')
-rw-r--r--src/nxt_fastcgi_source.c124
1 files changed, 60 insertions, 64 deletions
diff --git a/src/nxt_fastcgi_source.c b/src/nxt_fastcgi_source.c
index 14c51d6f..d655fca8 100644
--- a/src/nxt_fastcgi_source.c
+++ b/src/nxt_fastcgi_source.c
@@ -49,16 +49,17 @@ static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs);
static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs,
nxt_fastcgi_param_t *param);
-static void nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj,
+static void nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj,
void *data);
-static void nxt_fastcgi_source_record_error(nxt_thread_t *thr, void *obj,
+static void nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj,
void *data);
-static void nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj,
+static void nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj,
void *data);
-static void nxt_fastcgi_source_sync_buffer(nxt_thread_t *thr,
+static void nxt_fastcgi_source_sync_buffer(nxt_task_t *task,
nxt_fastcgi_source_t *fs, nxt_buf_t *b);
-static nxt_int_t nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs);
+static nxt_int_t nxt_fastcgi_source_header_process(nxt_task_t *task,
+ nxt_fastcgi_source_t *fs);
static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us,
nxt_name_value_t *nv);
static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
@@ -66,11 +67,12 @@ static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs,
nxt_buf_t *b);
-static void nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj,
+static void nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj,
void *data);
static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp);
-static void nxt_fastcgi_source_error(nxt_stream_source_t *stream);
-static void nxt_fastcgi_source_fail(nxt_fastcgi_source_t *fs);
+static void nxt_fastcgi_source_error(nxt_task_t *task,
+ nxt_stream_source_t *stream);
+static void nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs);
/*
@@ -117,7 +119,7 @@ static const uint8_t nxt_fastcgi_stdin_record[] = {
void
-nxt_fastcgi_source_handler(nxt_upstream_source_t *us,
+nxt_fastcgi_source_handler(nxt_task_t *task, nxt_upstream_source_t *us,
nxt_fastcgi_source_request_create_t request_create)
{
nxt_stream_source_t *stream;
@@ -181,13 +183,13 @@ nxt_fastcgi_source_handler(nxt_upstream_source_t *us,
nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t));
fs->u.header.mem_pool = fs->upstream->buffers.mem_pool;
- nxt_stream_source_connect(stream);
+ nxt_stream_source_connect(task, stream);
return;
}
fail:
- nxt_fastcgi_source_fail(fs);
+ nxt_fastcgi_source_fail(task, fs);
}
@@ -383,7 +385,7 @@ nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, nxt_fastcgi_param_t *param)
static void
-nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data)
+nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, void *data)
{
size_t size;
u_char *p;
@@ -394,18 +396,18 @@ nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data)
fsr = obj;
in = data;
- nxt_log_debug(thr->log, "fastcgi source record filter");
+ nxt_debug(task, "fastcgi source record filter");
if (nxt_slow_path(fsr->parse.done)) {
return;
}
- nxt_fastcgi_record_parse(&fsr->parse, in);
+ nxt_fastcgi_record_parse(task, &fsr->parse, in);
fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record);
if (fsr->parse.error) {
- nxt_fastcgi_source_fail(fs);
+ nxt_fastcgi_source_fail(task, fs);
return;
}
@@ -413,8 +415,9 @@ nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data)
/*
* Output all parsed before a FastCGI record error and close upstream.
*/
- nxt_thread_current_work_queue_add(thr, nxt_fastcgi_source_record_error,
- fs, NULL, thr->log);
+ nxt_thread_current_work_queue_add(task->thread,
+ nxt_fastcgi_source_record_error,
+ task, fs, NULL);
}
/* Log FastCGI stderr output. */
@@ -430,36 +433,36 @@ nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data)
size = (p + 1) - b->mem.pos;
if (size != 0) {
- nxt_log_error(NXT_LOG_ERR, thr->log,
- "upstream sent in FastCGI stderr: \"%*s\"",
- size, b->mem.pos);
+ nxt_log(task, NXT_LOG_ERR,
+ "upstream sent in FastCGI stderr: \"%*s\"",
+ size, b->mem.pos);
}
- b->completion_handler(thr, b, b->parent);
+ b->completion_handler(task, b, b->parent);
}
/* Process FastCGI stdout output. */
if (fsr->parse.out[0] != NULL) {
- nxt_source_filter(thr, fs->upstream->work_queue, &fsr->next,
- fsr->parse.out[0]);
+ nxt_source_filter(task->thread, fs->upstream->work_queue, task,
+ &fsr->next, fsr->parse.out[0]);
}
}
static void
-nxt_fastcgi_source_record_error(nxt_thread_t *thr, void *obj, void *data)
+nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, void *data)
{
nxt_fastcgi_source_t *fs;
fs = obj;
- nxt_fastcgi_source_fail(fs);
+ nxt_fastcgi_source_fail(task, fs);
}
static void
-nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
+nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, void *data)
{
nxt_int_t ret;
nxt_buf_t *b;
@@ -469,10 +472,10 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
b = data;
do {
- nxt_log_debug(thr->log, "fastcgi source header filter");
+ nxt_debug(task, "fastcgi source header filter");
if (nxt_slow_path(nxt_buf_is_sync(b))) {
- nxt_fastcgi_source_sync_buffer(thr, fs, b);
+ nxt_fastcgi_source_sync_buffer(task, fs, b);
return;
}
@@ -483,7 +486,7 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
break;
}
- ret = nxt_fastcgi_source_header_process(fs);
+ ret = nxt_fastcgi_source_header_process(task, fs);
if (nxt_slow_path(ret != NXT_OK)) {
break;
@@ -491,7 +494,7 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
}
if (nxt_fast_path(ret == NXT_DONE)) {
- nxt_log_debug(thr->log, "fastcgi source header done");
+ nxt_debug(task, "fastcgi source header done");
nxt_fastcgi_source_header_ready(fs, b);
return;
}
@@ -500,16 +503,16 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
if (ret != NXT_ERROR) {
/* n == NXT_DECLINED: "\r" is not followed by "\n" */
- nxt_log_error(NXT_LOG_ERR, thr->log,
- "upstream sent invalid header line: \"%*s\\r...\"",
- fs->u.header.parse.header_end
- - fs->u.header.parse.header_name_start,
- fs->u.header.parse.header_name_start);
+ nxt_log(task, NXT_LOG_ERR,
+ "upstream sent invalid header line: \"%*s\\r...\"",
+ fs->u.header.parse.header_end
+ - fs->u.header.parse.header_name_start,
+ fs->u.header.parse.header_name_start);
}
/* ret == NXT_ERROR */
- nxt_fastcgi_source_fail(fs);
+ nxt_fastcgi_source_fail(task, fs);
return;
}
@@ -520,45 +523,41 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
static void
-nxt_fastcgi_source_sync_buffer(nxt_thread_t *thr, nxt_fastcgi_source_t *fs,
+nxt_fastcgi_source_sync_buffer(nxt_task_t *task, nxt_fastcgi_source_t *fs,
nxt_buf_t *b)
{
if (nxt_buf_is_last(b)) {
- nxt_log_error(NXT_LOG_ERR, thr->log,
- "upstream closed prematurely connection");
+ nxt_log(task, NXT_LOG_ERR, "upstream closed prematurely connection");
} else {
- nxt_log_error(NXT_LOG_ERR, thr->log, "%ui buffers %uz each are not "
- "enough to process upstream response header",
- fs->upstream->buffers.max,
- fs->upstream->buffers.size);
+ nxt_log(task, NXT_LOG_ERR, "%ui buffers %uz each are not "
+ "enough to process upstream response header",
+ fs->upstream->buffers.max, fs->upstream->buffers.size);
}
/* The stream source sends only the last and the nobuf sync buffer. */
- nxt_fastcgi_source_fail(fs);
+ nxt_fastcgi_source_fail(task, fs);
}
static nxt_int_t
-nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs)
+nxt_fastcgi_source_header_process(nxt_task_t *task, nxt_fastcgi_source_t *fs)
{
size_t len;
- nxt_thread_t *thr;
nxt_name_value_t *nv;
nxt_lvlhsh_query_t lhq;
nxt_http_header_parse_t *hp;
nxt_upstream_name_value_t *unv;
- thr = nxt_thread();
hp = &fs->u.header.parse;
len = hp->header_name_end - hp->header_name_start;
if (len > 255) {
- nxt_log_error(NXT_LOG_INFO, thr->log,
- "upstream sent too long header field name: \"%*s\"",
- len, hp->header_name_start);
+ nxt_log(task, NXT_LOG_INFO,
+ "upstream sent too long header field name: \"%*s\"",
+ len, hp->header_name_start);
return NXT_ERROR;
}
@@ -574,8 +573,8 @@ nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs)
nv->value_len = hp->header_end - hp->header_start;
nv->value_start = hp->header_start;
- nxt_log_debug(thr->log, "http header: \"%*s: %*s\"",
- nv->name_len, nv->name_start, nv->value_len, nv->value_start);
+ nxt_debug(task, "http header: \"%*s: %*s\"",
+ nv->name_len, nv->name_start, nv->value_len, nv->value_start);
lhq.key_hash = nv->hash;
lhq.key.len = nv->name_len;
@@ -682,7 +681,7 @@ nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, nxt_buf_t *b)
*/
static void
-nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, void *data)
+nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b, *in;
nxt_fastcgi_source_t *fs;
@@ -690,7 +689,7 @@ nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, void *data)
fs = obj;
in = data;
- nxt_log_debug(thr->log, "fastcgi source body filter");
+ nxt_debug(task, "fastcgi source body filter");
for (b = in; b != NULL; b = b->next) {
@@ -701,7 +700,8 @@ nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, void *data)
}
if (fs->next != NULL) {
- nxt_source_filter(thr, fs->upstream->work_queue, fs->next, in);
+ nxt_source_filter(task->thread, fs->upstream->work_queue, task,
+ fs->next, in);
return;
}
@@ -729,7 +729,7 @@ nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp)
static void
-nxt_fastcgi_source_error(nxt_stream_source_t *stream)
+nxt_fastcgi_source_error(nxt_task_t *task, nxt_stream_source_t *stream)
{
nxt_fastcgi_source_t *fs;
@@ -737,20 +737,16 @@ nxt_fastcgi_source_error(nxt_stream_source_t *stream)
fs = stream->upstream->protocol_source;
- nxt_fastcgi_source_fail(fs);
+ nxt_fastcgi_source_fail(task, fs);
}
static void
-nxt_fastcgi_source_fail(nxt_fastcgi_source_t *fs)
+nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs)
{
- nxt_thread_t *thr;
-
- thr = nxt_thread();
-
- nxt_log_debug(thr->log, "fastcgi source fail");
+ nxt_debug(task, "fastcgi source fail");
/* TODO: fail, next upstream, or bad gateway */
- fs->upstream->state->error_handler(thr, fs, NULL);
+ fs->upstream->state->error_handler(task, fs, NULL);
}