diff options
author | Igor Sysoev <igor@sysoev.ru> | 2020-06-23 14:16:45 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2020-06-23 14:16:45 +0300 |
commit | 65799c7252e56d287d967bf3f036a10d5764f82c (patch) | |
tree | 70488dd10766b07784732fab1074dde526732fae | |
parent | f671d1bc54d6db164cf4b03a9ef0e1ddcdd39c72 (diff) | |
download | unit-65799c7252e56d287d967bf3f036a10d5764f82c.tar.gz unit-65799c7252e56d287d967bf3f036a10d5764f82c.tar.bz2 |
Upstream chunked transfer encoding support.
-rw-r--r-- | auto/sources | 2 | ||||
-rw-r--r-- | src/nxt_h1proto.c | 101 | ||||
-rw-r--r-- | src/nxt_h1proto.h | 2 | ||||
-rw-r--r-- | src/nxt_http.h | 1 | ||||
-rw-r--r-- | src/nxt_http_chunk_parse.c | 113 | ||||
-rw-r--r-- | src/nxt_http_parse.h | 16 | ||||
-rw-r--r-- | src/nxt_http_proxy.c | 42 |
7 files changed, 179 insertions, 98 deletions
diff --git a/auto/sources b/auto/sources index 2075ca0f..0dd2cbd6 100644 --- a/auto/sources +++ b/auto/sources @@ -90,6 +90,7 @@ NXT_LIB_SRCS=" \ src/nxt_http_return.c \ src/nxt_http_static.c \ src/nxt_http_proxy.c \ + src/nxt_http_chunk_parse.c \ src/nxt_application.c \ src/nxt_external.c \ src/nxt_port_hash.c \ @@ -107,7 +108,6 @@ NXT_LIB_SRC0=" \ src/nxt_stream_source.c \ src/nxt_upstream_source.c \ src/nxt_http_source.c \ - src/nxt_http_chunk_parse.c \ src/nxt_fastcgi_source.c \ src/nxt_fastcgi_record_parse.c \ \ diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index a139f611..859ed02f 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -99,6 +99,7 @@ static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm); static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer); static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data); +static void nxt_h1p_peer_body_process(nxt_task_t *task, nxt_http_peer_t *peer, nxt_buf_t *out); static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data); @@ -106,6 +107,8 @@ static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data); static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer); static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data); +static nxt_int_t nxt_h1p_peer_transfer_encoding(void *ctx, + nxt_http_field_t *field, uintptr_t data); #if (NXT_TLS) static const nxt_conn_state_t nxt_http_idle_state; @@ -178,7 +181,7 @@ static nxt_lvlhsh_t nxt_h1p_peer_fields_hash; static nxt_http_field_proc_t nxt_h1p_peer_fields[] = { { nxt_string("Connection"), &nxt_http_proxy_skip, 0 }, - { nxt_string("Transfer-Encoding"), &nxt_http_proxy_skip, 0 }, + { nxt_string("Transfer-Encoding"), &nxt_h1p_peer_transfer_encoding, 0 }, { nxt_string("Server"), &nxt_http_proxy_skip, 0 }, { nxt_string("Date"), &nxt_http_proxy_date, 0 }, { nxt_string("Content-Length"), &nxt_http_proxy_content_length, 0 }, @@ -2139,9 +2142,6 @@ nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer) peer->proto.h1 = h1p; h1p->request = r; - c->socket.task = task; - c->read_timer.task = task; - c->write_timer.task = task; c->socket.data = peer; c->remote = peer->server->sockaddr; @@ -2238,7 +2238,8 @@ nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer) r = peer->request; size = r->method->length + sizeof(" ") + r->target.length - + sizeof(" HTTP/1.0\r\n") + + sizeof(" HTTP/1.1\r\n") + + sizeof("Connection: close\r\n") + sizeof("\r\n"); nxt_list_each(field, r->fields) { @@ -2261,7 +2262,8 @@ nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer) p = nxt_cpymem(p, r->method->start, r->method->length); *p++ = ' '; p = nxt_cpymem(p, r->target.start, r->target.length); - p = nxt_cpymem(p, " HTTP/1.0\r\n", 11); + p = nxt_cpymem(p, " HTTP/1.1\r\n", 11); + p = nxt_cpymem(p, "Connection: close\r\n", 19); nxt_list_each(field, r->fields) { @@ -2466,6 +2468,7 @@ nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data) nxt_int_t ret; nxt_buf_t *b; nxt_conn_t *c; + nxt_h1proto_t *h1p; nxt_http_peer_t *peer; nxt_http_request_t *r; nxt_event_engine_t *engine; @@ -2503,11 +2506,26 @@ nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data) c->read = NULL; - if (nxt_buf_mem_used_size(&b->mem) != 0) { - peer->body = b; + peer->header_received = 1; + + h1p = peer->proto.h1; + + if (h1p->chunked) { + if (r->resp.content_length != NULL) { + peer->status = NXT_HTTP_BAD_GATEWAY; + break; + } + + h1p->chunked_parse.mem_pool = c->mem_pool; + + } else if (r->resp.content_length_n > 0) { + h1p->remainder = r->resp.content_length_n; } - peer->header_received = 1; + if (nxt_buf_mem_used_size(&b->mem) != 0) { + nxt_h1p_peer_body_process(task, peer, b); + return; + } r->state->ready_handler(task, r, peer); return; @@ -2613,18 +2631,54 @@ static const nxt_conn_state_t nxt_h1p_peer_read_state static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data) { - nxt_conn_t *c; - nxt_http_peer_t *peer; - nxt_http_request_t *r; + nxt_buf_t *out; + nxt_conn_t *c; + nxt_http_peer_t *peer; c = obj; peer = data; nxt_debug(task, "h1p peer read done"); - peer->body = c->read; + out = c->read; c->read = NULL; + nxt_h1p_peer_body_process(task, peer, out); +} + + +static void +nxt_h1p_peer_body_process(nxt_task_t *task, nxt_http_peer_t *peer, + nxt_buf_t *out) +{ + size_t length; + nxt_h1proto_t *h1p; + nxt_http_request_t *r; + + h1p = peer->proto.h1; + + if (h1p->chunked) { + out = nxt_http_chunk_parse(task, &h1p->chunked_parse, out); + + if (h1p->chunked_parse.chunk_error || h1p->chunked_parse.error) { + peer->status = NXT_HTTP_BAD_GATEWAY; + r = peer->request; + r->state->error_handler(task, r, peer); + return; + } + + if (h1p->chunked_parse.last) { + nxt_buf_chain_add(&out, nxt_http_buf_last(peer->request)); + peer->closed = 1; + } + + } else if (h1p->remainder > 0) { + length = nxt_buf_chain_length(out); + h1p->remainder -= length; + } + + peer->body = out; + r = peer->request; r->state->ready_handler(task, r, peer); } @@ -2644,8 +2698,8 @@ nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data) if (peer->header_received) { peer->body = nxt_http_buf_last(r); - peer->closed = 1; + r->inconsistent = (peer->proto.h1->remainder != 0); r->state->ready_handler(task, r, peer); @@ -2777,3 +2831,22 @@ nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data) nxt_conn_free(task, c); } + + +static nxt_int_t +nxt_h1p_peer_transfer_encoding(void *ctx, nxt_http_field_t *field, + uintptr_t data) +{ + nxt_http_request_t *r; + + r = ctx; + field->skip = 1; + + if (field->value_length == 7 + && nxt_memcmp(field->value, "chunked", 7) == 0) + { + r->peer->proto.h1->chunked = 1; + } + + return NXT_OK; +} diff --git a/src/nxt_h1proto.h b/src/nxt_h1proto.h index 3294713f..f8500963 100644 --- a/src/nxt_h1proto.h +++ b/src/nxt_h1proto.h @@ -18,6 +18,8 @@ typedef struct nxt_h1p_websocket_timer_s nxt_h1p_websocket_timer_t; struct nxt_h1proto_s { nxt_http_request_parse_t parser; + nxt_http_chunk_parse_t chunked_parse; + nxt_off_t remainder; uint8_t nbuffers; uint8_t header_buffer_slot; diff --git a/src/nxt_http.h b/src/nxt_http.h index 68051e69..67ac00d8 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -119,7 +119,6 @@ typedef struct { nxt_upstream_server_t *server; nxt_list_t *fields; nxt_buf_t *body; - nxt_off_t remainder; nxt_http_status_t status:16; nxt_http_protocol_t protocol:8; /* 2 bits */ diff --git a/src/nxt_http_chunk_parse.c b/src/nxt_http_chunk_parse.c index 644b9805..2164524b 100644 --- a/src/nxt_http_chunk_parse.c +++ b/src/nxt_http_chunk_parse.c @@ -21,13 +21,17 @@ static nxt_int_t nxt_http_chunk_buffer(nxt_http_chunk_parse_t *hcp, nxt_buf_t ***tail, nxt_buf_t *in); +static void nxt_http_chunk_buf_completion(nxt_task_t *task, void *obj, + void *data); + + nxt_buf_t * nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp, nxt_buf_t *in) { u_char c, ch; nxt_int_t ret; - nxt_buf_t *b, *out, *nb, **tail; + nxt_buf_t *b, *out, *next, **tail; enum { sw_start = 0, sw_chunk_size, @@ -37,12 +41,13 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp, sw_chunk, } state; + next = NULL; out = NULL; tail = &out; state = hcp->state; - for (b = in; b != NULL; b = b->next) { + for (b = in; b != NULL; b = next) { hcp->pos = b->mem.pos; @@ -60,7 +65,7 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp, if (nxt_slow_path(ret == NXT_ERROR)) { hcp->error = 1; - goto done; + return out; } state = sw_chunk_end_newline; @@ -152,7 +157,7 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp, continue; } - goto done; + return out; } goto chunk_error; @@ -168,15 +173,15 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp, if (b->retain == 0) { /* No chunk data was found in a buffer. */ - nxt_thread_current_work_queue_add(task->thread, - b->completion_handler, - task, b, b->parent); + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + b->completion_handler, task, b, b->parent); } next: - continue; + next = b->next; + b->next = NULL; } hcp->state = state; @@ -187,20 +192,6 @@ chunk_error: hcp->chunk_error = 1; -done: - - nb = nxt_buf_sync_alloc(hcp->mem_pool, NXT_BUF_SYNC_LAST); - - if (nxt_fast_path(nb != NULL)) { - *tail = nb; - - } else { - hcp->error = 1; - } - - // STUB: hcp->chunk_error = 1; - // STUB: hcp->error = 1; - return out; } @@ -216,43 +207,35 @@ nxt_http_chunk_buffer(nxt_http_chunk_parse_t *hcp, nxt_buf_t ***tail, p = hcp->pos; size = in->mem.free - p; - if (hcp->chunk_size >= size && in->retain == 0) { - /* - * Use original buffer if the buffer is lesser than or equal - * to a chunk size and this is the first chunk in the buffer. - */ - in->mem.pos = p; - **tail = in; - *tail = &in->next; - - } else { - b = nxt_buf_mem_alloc(hcp->mem_pool, 0, 0); - if (nxt_slow_path(b == NULL)) { - return NXT_ERROR; - } + b = nxt_buf_mem_alloc(hcp->mem_pool, 0, 0); + if (nxt_slow_path(b == NULL)) { + return NXT_ERROR; + } - **tail = b; - *tail = &b->next; + **tail = b; + *tail = &b->next; - b->parent = in; - in->retain++; - b->mem.pos = p; - b->mem.start = p; + nxt_mp_retain(hcp->mem_pool); + b->completion_handler = nxt_http_chunk_buf_completion; - if (hcp->chunk_size < size) { - p += hcp->chunk_size; - hcp->pos = p; + b->parent = in; + in->retain++; + b->mem.pos = p; + b->mem.start = p; - b->mem.free = p; - b->mem.end = p; + if (hcp->chunk_size < size) { + p += hcp->chunk_size; + hcp->pos = p; - return NXT_HTTP_CHUNK_END; - } + b->mem.free = p; + b->mem.end = p; - b->mem.free = in->mem.free; - b->mem.end = in->mem.free; + return NXT_HTTP_CHUNK_END; } + b->mem.free = in->mem.free; + b->mem.end = in->mem.free; + hcp->chunk_size -= size; if (hcp->chunk_size == 0) { @@ -261,3 +244,31 @@ nxt_http_chunk_buffer(nxt_http_chunk_parse_t *hcp, nxt_buf_t ***tail, return NXT_HTTP_CHUNK_MIDDLE; } + + +static void +nxt_http_chunk_buf_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_mp_t *mp; + nxt_buf_t *b, *next, *parent; + + b = obj; + parent = data; + + nxt_debug(task, "buf completion: %p %p", b, b->mem.start); + + nxt_assert(data == b->parent); + + do { + next = b->next; + parent = b->parent; + mp = b->data; + + nxt_mp_free(mp, b); + nxt_mp_release(mp); + + nxt_buf_parent_completion(task, parent); + + b = next; + } while (b != NULL); +} diff --git a/src/nxt_http_parse.h b/src/nxt_http_parse.h index 0f888949..cbfc8433 100644 --- a/src/nxt_http_parse.h +++ b/src/nxt_http_parse.h @@ -90,6 +90,19 @@ struct nxt_http_field_s { }; +typedef struct { + u_char *pos; + nxt_mp_t *mem_pool; + + uint64_t chunk_size; + + uint8_t state; + uint8_t last; /* 1 bit */ + uint8_t chunk_error; /* 1 bit */ + uint8_t error; /* 1 bit */ +} nxt_http_chunk_parse_t; + + #define NXT_HTTP_FIELD_HASH_INIT 159406U #define nxt_http_field_hash_char(h, c) (((h) << 4) + (h) + (c)) #define nxt_http_field_hash_end(h) (((h) >> 16) ^ (h)) @@ -109,6 +122,9 @@ nxt_uint_t nxt_http_fields_hash_collisions(nxt_lvlhsh_t *hash, nxt_int_t nxt_http_fields_process(nxt_list_t *fields, nxt_lvlhsh_t *hash, void *ctx); +nxt_buf_t *nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp, + nxt_buf_t *in); + extern const nxt_lvlhsh_proto_t nxt_http_fields_hash_proto; diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c index 893e9303..34d0f36e 100644 --- a/src/nxt_http_proxy.c +++ b/src/nxt_http_proxy.c @@ -27,8 +27,6 @@ static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data); static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data); static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data); static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data); -static void nxt_http_proxy_request_send(nxt_task_t *task, - nxt_http_request_t *r, nxt_buf_t *out); static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data); static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj, void *data); @@ -253,10 +251,6 @@ nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "http proxy status: %d", peer->status); - if (r->resp.content_length_n > 0) { - peer->remainder = r->resp.content_length_n; - } - nxt_list_each(field, peer->fields) { nxt_debug(task, "http proxy header: \"%*s: %*s\"", @@ -275,6 +269,8 @@ nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data) } nxt_list_loop; + r->state = &nxt_http_proxy_read_state; + nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer); } @@ -292,27 +288,13 @@ nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data) if (out != NULL) { peer->body = NULL; - nxt_http_proxy_request_send(task, r, out); - } - - r->state = &nxt_http_proxy_read_state; - - nxt_http_proto[peer->protocol].peer_read(task, peer); -} - + nxt_http_request_send(task, r, out); -static void -nxt_http_proxy_request_send(nxt_task_t *task, nxt_http_request_t *r, - nxt_buf_t *out) -{ - size_t length; - - if (r->peer->remainder > 0) { - length = nxt_buf_chain_length(out); - r->peer->remainder -= length; } - nxt_http_request_send(task, r, out); + if (!peer->closed) { + nxt_http_proto[peer->protocol].peer_read(task, peer); + } } @@ -328,7 +310,6 @@ static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *out; - nxt_bool_t last; nxt_http_peer_t *peer; nxt_http_request_t *r; @@ -336,16 +317,15 @@ nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data) peer = data; out = peer->body; peer->body = NULL; - last = nxt_buf_is_last(out); - nxt_http_proxy_request_send(task, r, out); + if (out != NULL) { + nxt_http_request_send(task, r, out); + } - if (!last) { + if (!peer->closed) { nxt_http_proto[peer->protocol].peer_read(task, peer); } else { - r->inconsistent = (peer->remainder != 0); - nxt_http_proto[peer->protocol].peer_close(task, peer); nxt_mp_release(r->mem_pool); @@ -422,7 +402,7 @@ nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data) nxt_mp_release(r->mem_pool); - nxt_http_request_error(task, r, peer->status); + nxt_http_request_error(&r->task, r, peer->status); } |