summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2020-06-23 14:16:45 +0300
committerIgor Sysoev <igor@sysoev.ru>2020-06-23 14:16:45 +0300
commit65799c7252e56d287d967bf3f036a10d5764f82c (patch)
tree70488dd10766b07784732fab1074dde526732fae
parentf671d1bc54d6db164cf4b03a9ef0e1ddcdd39c72 (diff)
downloadunit-65799c7252e56d287d967bf3f036a10d5764f82c.tar.gz
unit-65799c7252e56d287d967bf3f036a10d5764f82c.tar.bz2
Upstream chunked transfer encoding support.
-rw-r--r--auto/sources2
-rw-r--r--src/nxt_h1proto.c101
-rw-r--r--src/nxt_h1proto.h2
-rw-r--r--src/nxt_http.h1
-rw-r--r--src/nxt_http_chunk_parse.c113
-rw-r--r--src/nxt_http_parse.h16
-rw-r--r--src/nxt_http_proxy.c42
7 files changed, 179 insertions, 98 deletions
diff --git a/auto/sources b/auto/sources
index 2075ca0f..0dd2cbd6 100644
--- a/auto/sources
+++ b/auto/sources
@@ -90,6 +90,7 @@ NXT_LIB_SRCS=" \
src/nxt_http_return.c \
src/nxt_http_static.c \
src/nxt_http_proxy.c \
+ src/nxt_http_chunk_parse.c \
src/nxt_application.c \
src/nxt_external.c \
src/nxt_port_hash.c \
@@ -107,7 +108,6 @@ NXT_LIB_SRC0=" \
src/nxt_stream_source.c \
src/nxt_upstream_source.c \
src/nxt_http_source.c \
- src/nxt_http_chunk_parse.c \
src/nxt_fastcgi_source.c \
src/nxt_fastcgi_record_parse.c \
\
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c
index a139f611..859ed02f 100644
--- a/src/nxt_h1proto.c
+++ b/src/nxt_h1proto.c
@@ -99,6 +99,7 @@ static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer,
nxt_buf_mem_t *bm);
static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer);
static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_peer_body_process(nxt_task_t *task, nxt_http_peer_t *peer, nxt_buf_t *out);
static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data);
@@ -106,6 +107,8 @@ static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data);
static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer);
static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data);
+static nxt_int_t nxt_h1p_peer_transfer_encoding(void *ctx,
+ nxt_http_field_t *field, uintptr_t data);
#if (NXT_TLS)
static const nxt_conn_state_t nxt_http_idle_state;
@@ -178,7 +181,7 @@ static nxt_lvlhsh_t nxt_h1p_peer_fields_hash;
static nxt_http_field_proc_t nxt_h1p_peer_fields[] = {
{ nxt_string("Connection"), &nxt_http_proxy_skip, 0 },
- { nxt_string("Transfer-Encoding"), &nxt_http_proxy_skip, 0 },
+ { nxt_string("Transfer-Encoding"), &nxt_h1p_peer_transfer_encoding, 0 },
{ nxt_string("Server"), &nxt_http_proxy_skip, 0 },
{ nxt_string("Date"), &nxt_http_proxy_date, 0 },
{ nxt_string("Content-Length"), &nxt_http_proxy_content_length, 0 },
@@ -2139,9 +2142,6 @@ nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
peer->proto.h1 = h1p;
h1p->request = r;
- c->socket.task = task;
- c->read_timer.task = task;
- c->write_timer.task = task;
c->socket.data = peer;
c->remote = peer->server->sockaddr;
@@ -2238,7 +2238,8 @@ nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
r = peer->request;
size = r->method->length + sizeof(" ") + r->target.length
- + sizeof(" HTTP/1.0\r\n")
+ + sizeof(" HTTP/1.1\r\n")
+ + sizeof("Connection: close\r\n")
+ sizeof("\r\n");
nxt_list_each(field, r->fields) {
@@ -2261,7 +2262,8 @@ nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
p = nxt_cpymem(p, r->method->start, r->method->length);
*p++ = ' ';
p = nxt_cpymem(p, r->target.start, r->target.length);
- p = nxt_cpymem(p, " HTTP/1.0\r\n", 11);
+ p = nxt_cpymem(p, " HTTP/1.1\r\n", 11);
+ p = nxt_cpymem(p, "Connection: close\r\n", 19);
nxt_list_each(field, r->fields) {
@@ -2466,6 +2468,7 @@ nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
nxt_int_t ret;
nxt_buf_t *b;
nxt_conn_t *c;
+ nxt_h1proto_t *h1p;
nxt_http_peer_t *peer;
nxt_http_request_t *r;
nxt_event_engine_t *engine;
@@ -2503,11 +2506,26 @@ nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
c->read = NULL;
- if (nxt_buf_mem_used_size(&b->mem) != 0) {
- peer->body = b;
+ peer->header_received = 1;
+
+ h1p = peer->proto.h1;
+
+ if (h1p->chunked) {
+ if (r->resp.content_length != NULL) {
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+ break;
+ }
+
+ h1p->chunked_parse.mem_pool = c->mem_pool;
+
+ } else if (r->resp.content_length_n > 0) {
+ h1p->remainder = r->resp.content_length_n;
}
- peer->header_received = 1;
+ if (nxt_buf_mem_used_size(&b->mem) != 0) {
+ nxt_h1p_peer_body_process(task, peer, b);
+ return;
+ }
r->state->ready_handler(task, r, peer);
return;
@@ -2613,18 +2631,54 @@ static const nxt_conn_state_t nxt_h1p_peer_read_state
static void
nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data)
{
- nxt_conn_t *c;
- nxt_http_peer_t *peer;
- nxt_http_request_t *r;
+ nxt_buf_t *out;
+ nxt_conn_t *c;
+ nxt_http_peer_t *peer;
c = obj;
peer = data;
nxt_debug(task, "h1p peer read done");
- peer->body = c->read;
+ out = c->read;
c->read = NULL;
+ nxt_h1p_peer_body_process(task, peer, out);
+}
+
+
+static void
+nxt_h1p_peer_body_process(nxt_task_t *task, nxt_http_peer_t *peer,
+ nxt_buf_t *out)
+{
+ size_t length;
+ nxt_h1proto_t *h1p;
+ nxt_http_request_t *r;
+
+ h1p = peer->proto.h1;
+
+ if (h1p->chunked) {
+ out = nxt_http_chunk_parse(task, &h1p->chunked_parse, out);
+
+ if (h1p->chunked_parse.chunk_error || h1p->chunked_parse.error) {
+ peer->status = NXT_HTTP_BAD_GATEWAY;
+ r = peer->request;
+ r->state->error_handler(task, r, peer);
+ return;
+ }
+
+ if (h1p->chunked_parse.last) {
+ nxt_buf_chain_add(&out, nxt_http_buf_last(peer->request));
+ peer->closed = 1;
+ }
+
+ } else if (h1p->remainder > 0) {
+ length = nxt_buf_chain_length(out);
+ h1p->remainder -= length;
+ }
+
+ peer->body = out;
+
r = peer->request;
r->state->ready_handler(task, r, peer);
}
@@ -2644,8 +2698,8 @@ nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data)
if (peer->header_received) {
peer->body = nxt_http_buf_last(r);
-
peer->closed = 1;
+ r->inconsistent = (peer->proto.h1->remainder != 0);
r->state->ready_handler(task, r, peer);
@@ -2777,3 +2831,22 @@ nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data)
nxt_conn_free(task, c);
}
+
+
+static nxt_int_t
+nxt_h1p_peer_transfer_encoding(void *ctx, nxt_http_field_t *field,
+ uintptr_t data)
+{
+ nxt_http_request_t *r;
+
+ r = ctx;
+ field->skip = 1;
+
+ if (field->value_length == 7
+ && nxt_memcmp(field->value, "chunked", 7) == 0)
+ {
+ r->peer->proto.h1->chunked = 1;
+ }
+
+ return NXT_OK;
+}
diff --git a/src/nxt_h1proto.h b/src/nxt_h1proto.h
index 3294713f..f8500963 100644
--- a/src/nxt_h1proto.h
+++ b/src/nxt_h1proto.h
@@ -18,6 +18,8 @@ typedef struct nxt_h1p_websocket_timer_s nxt_h1p_websocket_timer_t;
struct nxt_h1proto_s {
nxt_http_request_parse_t parser;
+ nxt_http_chunk_parse_t chunked_parse;
+ nxt_off_t remainder;
uint8_t nbuffers;
uint8_t header_buffer_slot;
diff --git a/src/nxt_http.h b/src/nxt_http.h
index 68051e69..67ac00d8 100644
--- a/src/nxt_http.h
+++ b/src/nxt_http.h
@@ -119,7 +119,6 @@ typedef struct {
nxt_upstream_server_t *server;
nxt_list_t *fields;
nxt_buf_t *body;
- nxt_off_t remainder;
nxt_http_status_t status:16;
nxt_http_protocol_t protocol:8; /* 2 bits */
diff --git a/src/nxt_http_chunk_parse.c b/src/nxt_http_chunk_parse.c
index 644b9805..2164524b 100644
--- a/src/nxt_http_chunk_parse.c
+++ b/src/nxt_http_chunk_parse.c
@@ -21,13 +21,17 @@ static nxt_int_t nxt_http_chunk_buffer(nxt_http_chunk_parse_t *hcp,
nxt_buf_t ***tail, nxt_buf_t *in);
+static void nxt_http_chunk_buf_completion(nxt_task_t *task, void *obj,
+ void *data);
+
+
nxt_buf_t *
nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
nxt_buf_t *in)
{
u_char c, ch;
nxt_int_t ret;
- nxt_buf_t *b, *out, *nb, **tail;
+ nxt_buf_t *b, *out, *next, **tail;
enum {
sw_start = 0,
sw_chunk_size,
@@ -37,12 +41,13 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
sw_chunk,
} state;
+ next = NULL;
out = NULL;
tail = &out;
state = hcp->state;
- for (b = in; b != NULL; b = b->next) {
+ for (b = in; b != NULL; b = next) {
hcp->pos = b->mem.pos;
@@ -60,7 +65,7 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
if (nxt_slow_path(ret == NXT_ERROR)) {
hcp->error = 1;
- goto done;
+ return out;
}
state = sw_chunk_end_newline;
@@ -152,7 +157,7 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
continue;
}
- goto done;
+ return out;
}
goto chunk_error;
@@ -168,15 +173,15 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
if (b->retain == 0) {
/* No chunk data was found in a buffer. */
- nxt_thread_current_work_queue_add(task->thread,
- b->completion_handler,
- task, b, b->parent);
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ b->completion_handler, task, b, b->parent);
}
next:
- continue;
+ next = b->next;
+ b->next = NULL;
}
hcp->state = state;
@@ -187,20 +192,6 @@ chunk_error:
hcp->chunk_error = 1;
-done:
-
- nb = nxt_buf_sync_alloc(hcp->mem_pool, NXT_BUF_SYNC_LAST);
-
- if (nxt_fast_path(nb != NULL)) {
- *tail = nb;
-
- } else {
- hcp->error = 1;
- }
-
- // STUB: hcp->chunk_error = 1;
- // STUB: hcp->error = 1;
-
return out;
}
@@ -216,43 +207,35 @@ nxt_http_chunk_buffer(nxt_http_chunk_parse_t *hcp, nxt_buf_t ***tail,
p = hcp->pos;
size = in->mem.free - p;
- if (hcp->chunk_size >= size && in->retain == 0) {
- /*
- * Use original buffer if the buffer is lesser than or equal
- * to a chunk size and this is the first chunk in the buffer.
- */
- in->mem.pos = p;
- **tail = in;
- *tail = &in->next;
-
- } else {
- b = nxt_buf_mem_alloc(hcp->mem_pool, 0, 0);
- if (nxt_slow_path(b == NULL)) {
- return NXT_ERROR;
- }
+ b = nxt_buf_mem_alloc(hcp->mem_pool, 0, 0);
+ if (nxt_slow_path(b == NULL)) {
+ return NXT_ERROR;
+ }
- **tail = b;
- *tail = &b->next;
+ **tail = b;
+ *tail = &b->next;
- b->parent = in;
- in->retain++;
- b->mem.pos = p;
- b->mem.start = p;
+ nxt_mp_retain(hcp->mem_pool);
+ b->completion_handler = nxt_http_chunk_buf_completion;
- if (hcp->chunk_size < size) {
- p += hcp->chunk_size;
- hcp->pos = p;
+ b->parent = in;
+ in->retain++;
+ b->mem.pos = p;
+ b->mem.start = p;
- b->mem.free = p;
- b->mem.end = p;
+ if (hcp->chunk_size < size) {
+ p += hcp->chunk_size;
+ hcp->pos = p;
- return NXT_HTTP_CHUNK_END;
- }
+ b->mem.free = p;
+ b->mem.end = p;
- b->mem.free = in->mem.free;
- b->mem.end = in->mem.free;
+ return NXT_HTTP_CHUNK_END;
}
+ b->mem.free = in->mem.free;
+ b->mem.end = in->mem.free;
+
hcp->chunk_size -= size;
if (hcp->chunk_size == 0) {
@@ -261,3 +244,31 @@ nxt_http_chunk_buffer(nxt_http_chunk_parse_t *hcp, nxt_buf_t ***tail,
return NXT_HTTP_CHUNK_MIDDLE;
}
+
+
+static void
+nxt_http_chunk_buf_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_mp_t *mp;
+ nxt_buf_t *b, *next, *parent;
+
+ b = obj;
+ parent = data;
+
+ nxt_debug(task, "buf completion: %p %p", b, b->mem.start);
+
+ nxt_assert(data == b->parent);
+
+ do {
+ next = b->next;
+ parent = b->parent;
+ mp = b->data;
+
+ nxt_mp_free(mp, b);
+ nxt_mp_release(mp);
+
+ nxt_buf_parent_completion(task, parent);
+
+ b = next;
+ } while (b != NULL);
+}
diff --git a/src/nxt_http_parse.h b/src/nxt_http_parse.h
index 0f888949..cbfc8433 100644
--- a/src/nxt_http_parse.h
+++ b/src/nxt_http_parse.h
@@ -90,6 +90,19 @@ struct nxt_http_field_s {
};
+typedef struct {
+ u_char *pos;
+ nxt_mp_t *mem_pool;
+
+ uint64_t chunk_size;
+
+ uint8_t state;
+ uint8_t last; /* 1 bit */
+ uint8_t chunk_error; /* 1 bit */
+ uint8_t error; /* 1 bit */
+} nxt_http_chunk_parse_t;
+
+
#define NXT_HTTP_FIELD_HASH_INIT 159406U
#define nxt_http_field_hash_char(h, c) (((h) << 4) + (h) + (c))
#define nxt_http_field_hash_end(h) (((h) >> 16) ^ (h))
@@ -109,6 +122,9 @@ nxt_uint_t nxt_http_fields_hash_collisions(nxt_lvlhsh_t *hash,
nxt_int_t nxt_http_fields_process(nxt_list_t *fields, nxt_lvlhsh_t *hash,
void *ctx);
+nxt_buf_t *nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
+ nxt_buf_t *in);
+
extern const nxt_lvlhsh_proto_t nxt_http_fields_hash_proto;
diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c
index 893e9303..34d0f36e 100644
--- a/src/nxt_http_proxy.c
+++ b/src/nxt_http_proxy.c
@@ -27,8 +27,6 @@ static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data);
static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data);
static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data);
-static void nxt_http_proxy_request_send(nxt_task_t *task,
- nxt_http_request_t *r, nxt_buf_t *out);
static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data);
static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj,
void *data);
@@ -253,10 +251,6 @@ nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "http proxy status: %d", peer->status);
- if (r->resp.content_length_n > 0) {
- peer->remainder = r->resp.content_length_n;
- }
-
nxt_list_each(field, peer->fields) {
nxt_debug(task, "http proxy header: \"%*s: %*s\"",
@@ -275,6 +269,8 @@ nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
} nxt_list_loop;
+ r->state = &nxt_http_proxy_read_state;
+
nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer);
}
@@ -292,27 +288,13 @@ nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data)
if (out != NULL) {
peer->body = NULL;
- nxt_http_proxy_request_send(task, r, out);
- }
-
- r->state = &nxt_http_proxy_read_state;
-
- nxt_http_proto[peer->protocol].peer_read(task, peer);
-}
-
+ nxt_http_request_send(task, r, out);
-static void
-nxt_http_proxy_request_send(nxt_task_t *task, nxt_http_request_t *r,
- nxt_buf_t *out)
-{
- size_t length;
-
- if (r->peer->remainder > 0) {
- length = nxt_buf_chain_length(out);
- r->peer->remainder -= length;
}
- nxt_http_request_send(task, r, out);
+ if (!peer->closed) {
+ nxt_http_proto[peer->protocol].peer_read(task, peer);
+ }
}
@@ -328,7 +310,6 @@ static void
nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *out;
- nxt_bool_t last;
nxt_http_peer_t *peer;
nxt_http_request_t *r;
@@ -336,16 +317,15 @@ nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data)
peer = data;
out = peer->body;
peer->body = NULL;
- last = nxt_buf_is_last(out);
- nxt_http_proxy_request_send(task, r, out);
+ if (out != NULL) {
+ nxt_http_request_send(task, r, out);
+ }
- if (!last) {
+ if (!peer->closed) {
nxt_http_proto[peer->protocol].peer_read(task, peer);
} else {
- r->inconsistent = (peer->remainder != 0);
-
nxt_http_proto[peer->protocol].peer_close(task, peer);
nxt_mp_release(r->mem_pool);
@@ -422,7 +402,7 @@ nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data)
nxt_mp_release(r->mem_pool);
- nxt_http_request_error(task, r, peer->status);
+ nxt_http_request_error(&r->task, r, peer->status);
}