diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-08-11 18:04:04 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-08-11 18:04:04 +0300 |
commit | 39a6a4c973dd378f1fad9d2514d7857fe491df4b (patch) | |
tree | 220ea8605a9396a93fa7a459ff3e558fe1a044aa /src | |
parent | e1e808bd94609c80b4990939285d47f124bb2eef (diff) | |
download | unit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.gz unit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.bz2 |
Request body read state implemented.
With specific timeout and buffer size settings.
Diffstat (limited to 'src')
-rw-r--r-- | src/nginext/nxt_go_lib.c | 42 | ||||
-rw-r--r-- | src/nginext/nxt_go_lib.h | 7 | ||||
-rw-r--r-- | src/nginext/nxt_go_port.c | 7 | ||||
-rw-r--r-- | src/nginext/nxt_go_run_ctx.c | 185 | ||||
-rw-r--r-- | src/nginext/nxt_go_run_ctx.h | 2 | ||||
-rw-r--r-- | src/nginext/request.go | 6 | ||||
-rw-r--r-- | src/nxt_application.c | 170 | ||||
-rw-r--r-- | src/nxt_application.h | 18 | ||||
-rw-r--r-- | src/nxt_go.c | 9 | ||||
-rw-r--r-- | src/nxt_php_sapi.c | 67 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 34 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 2 | ||||
-rw-r--r-- | src/nxt_python_wsgi.c | 69 | ||||
-rw-r--r-- | src/nxt_router.c | 242 | ||||
-rw-r--r-- | src/nxt_router.h | 4 |
15 files changed, 610 insertions, 254 deletions
diff --git a/src/nginext/nxt_go_lib.c b/src/nginext/nxt_go_lib.c index 87f583d7..6d5d2a03 100644 --- a/src/nginext/nxt_go_lib.c +++ b/src/nginext/nxt_go_lib.c @@ -17,14 +17,14 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len) } int -nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, size_t dst_len) +nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len) { return -1; } int -nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst, - size_t dst_len, void *src, size_t src_len) +nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, + void *src, size_t src_len) { return -1; } @@ -71,7 +71,7 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len) return 0; } - nxt_go_debug("write: %d %.*s", (int) len, (int) len, (char *) buf); + nxt_go_debug("write: %d", (int) len); ctx = (nxt_go_run_ctx_t *) r; rc = nxt_go_ctx_write(ctx, buf, len); @@ -81,44 +81,30 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len) int -nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, size_t dst_len) +nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len) { - nxt_go_msg_t *msg; - nxt_go_run_ctx_t *ctx; - nxt_app_request_body_t *b; - nxt_app_request_header_t *h; + size_t res; + nxt_go_run_ctx_t *ctx; if (nxt_slow_path(r == 0)) { return 0; } ctx = (nxt_go_run_ctx_t *) r; - b = &ctx->r.body; - h = &ctx->r.header; - if (off >= h->parsed_content_length) { - return 0; - } + dst_len = nxt_min(dst_len, ctx->r.body.preread_size); - if (off < b->preread.length) { - dst_len = nxt_min(b->preread.length - off, dst_len); + res = nxt_go_ctx_read_raw(ctx, dst, dst_len); - if (dst_len != 0) { - nxt_memcpy(dst, b->preread.start + off, dst_len); - } + ctx->r.body.preread_size -= res; - return dst_len; - } - - /* TODO find msg to read */ - - return NXT_AGAIN; + return res; } int -nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst, - size_t dst_len, void *src, size_t src_len) +nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, + void *src, size_t src_len) { nxt_go_run_ctx_t *ctx; @@ -130,7 +116,7 @@ nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst, nxt_go_ctx_add_msg(ctx, src, src_len); - return nxt_go_request_read(r, off, dst, dst_len); + return nxt_go_request_read(r, dst, dst_len); } diff --git a/src/nginext/nxt_go_lib.h b/src/nginext/nxt_go_lib.h index 220fc9b3..b3a86be9 100644 --- a/src/nginext/nxt_go_lib.h +++ b/src/nginext/nxt_go_lib.h @@ -21,11 +21,10 @@ typedef uintptr_t nxt_go_request_t; int nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len); -int nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, - size_t dst_len); +int nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len); -int nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst, - size_t dst_len, void *src, size_t src_len); +int nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, + void *src, size_t src_len); int nxt_go_request_close(nxt_go_request_t r); diff --git a/src/nginext/nxt_go_port.c b/src/nginext/nxt_go_port.c index af50b860..fca3cf9a 100644 --- a/src/nginext/nxt_go_port.c +++ b/src/nginext/nxt_go_port.c @@ -83,22 +83,23 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size) do { rc = nxt_go_ctx_read_str(ctx, &n); - rc = nxt_go_ctx_read_str(ctx, &v); if (n.length == 0) { break; } + rc = nxt_go_ctx_read_str(ctx, &v); nxt_go_request_add_header(r, nxt_go_str(&n), nxt_go_str(&v)); } while(1); - ctx->r.body.preread = v; + nxt_go_ctx_read_size(ctx, &s); + ctx->r.body.preread_size = s; if (h->parsed_content_length > 0) { nxt_go_request_set_content_length(r, h->parsed_content_length); } - if (v.length < h->parsed_content_length) { + if (ctx->r.body.preread_size < h->parsed_content_length) { nxt_go_request_create_channel(r); } diff --git a/src/nginext/nxt_go_run_ctx.c b/src/nginext/nxt_go_run_ctx.c index c6481b95..cca8273e 100644 --- a/src/nginext/nxt_go_run_ctx.c +++ b/src/nginext/nxt_go_run_ctx.c @@ -186,7 +186,7 @@ nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size) msg->start_offset = ctx->msg_last->start_offset; if (ctx->msg_last == &ctx->msg) { - msg->start_offset += ctx->r.body.preread.length; + msg->start_offset += ctx->r.body.preread_size; } else { msg->start_offset += ctx->msg_last->data_size; } @@ -227,8 +227,8 @@ nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last) } -nxt_int_t -nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) +nxt_buf_t * +nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) { size_t nchunks; nxt_buf_t *buf; @@ -237,30 +237,16 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) nxt_port_mmap_msg_t *mmap_msg; nxt_port_mmap_header_t *hdr; - buf = &ctx->wbuf; - - if (ctx->nwbuf > 0 && nxt_buf_mem_free_size(&buf->mem) >= len) { - memcpy(buf->mem.free, data, len); - buf->mem.free += len; - - mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1; - mmap_msg->size += len; - - return NXT_OK; - } - - if (ctx->nwbuf >= 8) { - nxt_go_ctx_flush(ctx, 0); - } - c = 0; + buf = &ctx->wbuf; + hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c); if (nxt_slow_path(hdr == NULL)) { nxt_go_warn("failed to get port_mmap"); - return NXT_ERROR; + return NULL; } buf->mem.start = nxt_port_mmap_chunk_start(hdr, c); @@ -268,12 +254,15 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) buf->mem.free = buf->mem.start; buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE; + buf->parent = hdr; + mmap_msg = ctx->wmmap_msg + ctx->nwbuf; mmap_msg->mmap_id = hdr->id; mmap_msg->chunk_id = c; + mmap_msg->size = 0; - nchunks = len / PORT_MMAP_CHUNK_SIZE; - if ((len % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { + nchunks = size / PORT_MMAP_CHUNK_SIZE; + if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { nchunks++; } @@ -283,27 +272,124 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_get_chunk_busy(hdr, c)) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { break; } - nxt_port_mmap_set_chunk_busy(hdr, c); buf->mem.end += PORT_MMAP_CHUNK_SIZE; c++; nchunks--; } - if (nxt_buf_mem_free_size(&buf->mem) < len) { - len = nxt_buf_mem_free_size(&buf->mem); + ctx->nwbuf++; + + return buf; +} + +nxt_int_t +nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) +{ + size_t nchunks, free_size; + nxt_chunk_id_t c, start; + nxt_port_mmap_header_t *hdr; + + free_size = nxt_buf_mem_free_size(&b->mem); + + if (nxt_slow_path(size <= free_size)) { + return NXT_OK; } - memcpy(buf->mem.free, data, len); - buf->mem.free += len; + hdr = b->parent; - mmap_msg->size = len; + start = nxt_port_mmap_chunk_id(hdr, b->mem.end); - ctx->nwbuf++; + size -= free_size; + + nchunks = size / PORT_MMAP_CHUNK_SIZE; + if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { + nchunks++; + } + + c = start; + + /* Try to acquire as much chunks as required. */ + while (nchunks > 0) { + + if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + break; + } + + c++; + nchunks--; + } + + if (nchunks != 0 && + min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) { + + c--; + while (c >= start) { + nxt_port_mmap_set_chunk_free(hdr, c); + c--; + } + + return NXT_ERROR; + } else { + b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); + + return NXT_OK; + } +} + + +nxt_int_t +nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) +{ + size_t free_size, copy_size; + nxt_buf_t *buf; + nxt_port_mmap_msg_t *mmap_msg; + + buf = &ctx->wbuf; + + while (len > 0) { + if (ctx->nwbuf == 0) { + buf = nxt_go_port_mmap_get_buf(ctx, len); + + if (nxt_slow_path(buf == NULL)) { + return NXT_ERROR; + } + } + + do { + free_size = nxt_buf_mem_free_size(&buf->mem); + + if (free_size > 0) { + copy_size = nxt_min(free_size, len); + + buf->mem.free = nxt_cpymem(buf->mem.free, data, copy_size); + + mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1; + mmap_msg->size += copy_size; + + len -= copy_size; + data = nxt_pointer_to(data, copy_size); + + if (len == 0) { + return NXT_OK; + } + } + } while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK); + + if (ctx->nwbuf >= 8) { + nxt_go_ctx_flush(ctx, 0); + } + + buf = nxt_go_port_mmap_get_buf(ctx, len); + + if (nxt_slow_path(buf == NULL)) { + return NXT_ERROR; + } + } return NXT_OK; } @@ -403,4 +489,43 @@ nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str) } +size_t +nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size) +{ + size_t res, read_size; + nxt_int_t rc; + nxt_buf_t *buf; + + res = 0; + + while (size > 0) { + buf = &ctx->rbuf; + + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { + ctx->nrbuf++; + rc = nxt_go_ctx_init_rbuf(ctx); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_go_warn("read raw: init rbuf failed"); + return res; + } + + continue; + } + + read_size = nxt_buf_mem_used_size(&buf->mem); + read_size = nxt_min(read_size, size); + + dst = nxt_cpymem(dst, buf->mem.pos, read_size); + + size -= read_size; + buf->mem.pos += read_size; + res += read_size; + } + + nxt_go_debug("read_raw: %d", (int) res); + + return res; +} + + #endif /* NXT_CONFIGURE */ diff --git a/src/nginext/nxt_go_run_ctx.h b/src/nginext/nxt_go_run_ctx.h index c7c3da15..4ead1df5 100644 --- a/src/nginext/nxt_go_run_ctx.h +++ b/src/nginext/nxt_go_run_ctx.h @@ -68,5 +68,7 @@ nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size); nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str); +size_t nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size); + #endif /* _NXT_GO_RUN_CTX_H_INCLUDED_ */ diff --git a/src/nginext/request.go b/src/nginext/request.go index 8667a074..1679f1c7 100644 --- a/src/nginext/request.go +++ b/src/nginext/request.go @@ -21,7 +21,6 @@ type request struct { resp *response c_req C.nxt_go_request_t id C.uint32_t - read_pos C.off_t msgs []*cmsg ch chan *cmsg } @@ -29,18 +28,17 @@ type request struct { func (r *request) Read(p []byte) (n int, err error) { c := C.size_t(cap(p)) b := C.malloc(c) - res := C.nxt_go_request_read(r.c_req, r.read_pos, b, c) + res := C.nxt_go_request_read(r.c_req, b, c) if res == -2 /* NXT_AGAIN */ { m := <-r.ch - res = C.nxt_go_request_read_from(r.c_req, r.read_pos, b, c, m.buf.b, m.buf.s) + res = C.nxt_go_request_read_from(r.c_req, b, c, m.buf.b, m.buf.s) r.push(m) } if res > 0 { copy(p, C.GoBytes(b, res)) - r.read_pos += C.off_t(res) } C.free(b) diff --git a/src/nxt_application.c b/src/nxt_application.c index 50feac2d..1b23eca3 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -154,7 +154,7 @@ nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size) return res; } - if (nxt_port_mmap_increase_buf(task, b, size) == NXT_OK) { + if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) { res = b->mem.free; b->mem.free += size; @@ -306,6 +306,43 @@ nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str) } +size_t +nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst, + size_t size) +{ + size_t res, read_size; + nxt_buf_t *buf; + + res = 0; + + while (size > 0) { + buf = msg->buf; + + if (nxt_slow_path(buf == NULL)) { + break; + } + + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { + msg->buf = buf->next; + continue; + } + + read_size = nxt_buf_mem_used_size(&buf->mem); + read_size = nxt_min(read_size, size); + + dst = nxt_cpymem(dst, buf->mem.pos, read_size); + + size -= read_size; + buf->mem.pos += read_size; + res += read_size; + } + + nxt_debug(task, "nxt_read_raw: %uz", res); + + return res; +} + + nxt_int_t nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n, nxt_str_t *v) @@ -459,7 +496,7 @@ nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx) nxt_int_t -nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, +nxt_app_http_req_header_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, nxt_buf_t *buf) { nxt_int_t rc; @@ -471,50 +508,76 @@ nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, b = &ctx->r.body; h = &ctx->r.header; - if (h->done == 0) { - rc = nxt_http_parse_request(p, &buf->mem); + nxt_assert(h->done == 0); - if (nxt_slow_path(rc != NXT_DONE)) { - return rc; - } + rc = nxt_http_parse_request(p, &buf->mem); - rc = nxt_http_fields_process(p->fields, ctx, task->log); + if (nxt_slow_path(rc != NXT_DONE)) { + return rc; + } - if (nxt_slow_path(rc != NXT_OK)) { - return rc; - } + rc = nxt_http_fields_process(p->fields, ctx, task->log); - h->fields = p->fields; - h->done = 1; + if (nxt_slow_path(rc != NXT_OK)) { + return rc; + } - h->version.start = p->version.str; - h->version.length = nxt_strlen(p->version.str); + h->fields = p->fields; + h->done = 1; - h->method = p->method; + h->version.start = p->version.str; + h->version.length = nxt_strlen(p->version.str); - h->target.start = p->target_start; - h->target.length = p->target_end - p->target_start; + h->method = p->method; - h->path = p->path; - h->query = p->args; + h->target.start = p->target_start; + h->target.length = p->target_end - p->target_start; - if (h->parsed_content_length == 0) { - b->done = 1; - } - } + h->path = p->path; + h->query = p->args; - if (b->done == 0) { - b->preread.length = buf->mem.free - buf->mem.pos; - b->preread.start = buf->mem.pos; + if (h->parsed_content_length == 0) { + b->done = 1; - b->done = b->preread.length >= (size_t) h->parsed_content_length; } - if (h->done == 1 && b->done == 1) { + if (buf->mem.free == buf->mem.pos) { return NXT_DONE; } - return NXT_AGAIN; + b->buf = buf; + b->done = nxt_buf_mem_used_size(&buf->mem) >= + h->parsed_content_length; + + if (b->done == 1) { + b->preread_size = nxt_buf_mem_used_size(&buf->mem); + } + + return NXT_DONE; +} + + +nxt_int_t +nxt_app_http_req_body_read(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, + nxt_buf_t *buf) +{ + nxt_app_request_body_t *b; + nxt_app_request_header_t *h; + + b = &ctx->r.body; + h = &ctx->r.header; + + nxt_assert(h->done == 1); + nxt_assert(b->done == 0); + + b->done = nxt_buf_mem_used_size(&buf->mem) + b->preread_size >= + (size_t) h->parsed_content_length; + + if (b->done == 1) { + b->preread_size += nxt_buf_mem_used_size(&buf->mem); + } + + return b->done == 1 ? NXT_DONE : NXT_AGAIN; } @@ -571,17 +634,48 @@ nxt_int_t nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, size_t size) { - u_char *dst; + size_t free_size, copy_size; + nxt_buf_t *b; + nxt_port_t *port; - dst = nxt_app_msg_write_get_buf(task, msg, size); - if (nxt_slow_path(dst == NULL)) { - return NXT_ERROR; - } + nxt_debug(task, "nxt_app_msg_write_raw: %uz", size); + + while (size > 0) { + b = *msg->buf; + + if (b == NULL) { + port = nxt_app_msg_get_port(task, msg); + if (nxt_slow_path(port == NULL)) { + return NXT_ERROR; + } + + b = nxt_port_mmap_get_buf(task, port, size); + if (nxt_slow_path(b == NULL)) { + return NXT_ERROR; + } + + *msg->buf = b; + } + + do { + free_size = nxt_buf_mem_free_size(&b->mem); + + if (free_size > 0) { + copy_size = nxt_min(free_size, size); - nxt_memcpy(dst, c, size); + b->mem.free = nxt_cpymem(b->mem.free, c, copy_size); - nxt_debug(task, "nxt_app_msg_write_raw: %d %*s", (int)size, - (int)size, c); + size -= copy_size; + c += copy_size; + + if (size == 0) { + return NXT_OK; + } + } + } while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK); + + msg->buf = &b->next; + } return NXT_OK; } diff --git a/src/nxt_application.h b/src/nxt_application.h index f27f90cd..9efb5008 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -84,12 +84,17 @@ typedef struct { off_t parsed_content_length; nxt_bool_t done; + + size_t bufs; + nxt_buf_t *buf; } nxt_app_request_header_t; typedef struct { - nxt_str_t preread; + size_t preread_size; nxt_bool_t done; + + nxt_buf_t *buf; } nxt_app_request_body_t; @@ -112,8 +117,12 @@ struct nxt_app_parse_ctx_s { nxt_int_t nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx); -nxt_int_t nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, - nxt_buf_t *buf); +nxt_int_t nxt_app_http_req_header_parse(nxt_task_t *task, + nxt_app_parse_ctx_t *ctx, nxt_buf_t *buf); + +nxt_int_t nxt_app_http_req_body_read(nxt_task_t *task, + nxt_app_parse_ctx_t *ctx, nxt_buf_t *buf); + nxt_int_t nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx); @@ -178,6 +187,9 @@ nxt_int_t nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_int_t nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str); +size_t nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *buf, + size_t size); + nxt_int_t nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n, nxt_str_t *v); diff --git a/src/nxt_go.c b/src/nxt_go.c index 17240804..b821cced 100644 --- a/src/nxt_go.c +++ b/src/nxt_go.c @@ -116,6 +116,7 @@ static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg) { nxt_int_t rc; + nxt_buf_t *b; nxt_http_field_t *field; nxt_app_request_header_t *h; @@ -168,7 +169,13 @@ nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg) /* end-of-headers mark */ NXT_WRITE(&eof); - NXT_WRITE(&r->body.preread); + + RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); + + for(b = r->body.buf; b != NULL; b = b->next) { + RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, + nxt_buf_mem_used_size(&b->mem))); + } #undef NXT_WRITE #undef RC diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c index 0f638201..4f8f4696 100644 --- a/src/nxt_php_sapi.c +++ b/src/nxt_php_sapi.c @@ -128,6 +128,8 @@ typedef struct { nxt_str_t script; nxt_app_wmsg_t *wmsg; nxt_mp_t *mem_pool; + + size_t body_preread_size; } nxt_php_run_ctx_t; nxt_inline nxt_int_t nxt_php_write(nxt_php_run_ctx_t *ctx, @@ -342,8 +344,6 @@ nxt_php_read_request(nxt_task_t *task, nxt_app_rmsg_t *rmsg, RC(nxt_app_msg_read_size(task, rmsg, &s)); h->parsed_content_length = s; - NXT_READ(&ctx->r.body.preread); - #undef NXT_READ #undef RC @@ -361,6 +361,7 @@ nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg) { nxt_int_t rc; + nxt_buf_t *b; nxt_http_field_t *field; nxt_app_request_header_t *h; @@ -413,8 +414,6 @@ nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); - NXT_WRITE(&r->body.preread); - nxt_list_each(field, h->fields) { RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, &prefix, &field->name)); @@ -425,6 +424,13 @@ nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, /* end-of-headers mark */ NXT_WRITE(&eof); + RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); + + for(b = r->body.buf; b != NULL; b = b->next) { + RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, + nxt_buf_mem_used_size(&b->mem))); + } + #undef NXT_WRITE #undef RC @@ -673,23 +679,14 @@ static int nxt_php_read_post(char *buffer, uint count_bytes TSRMLS_DC) #endif { - off_t rest; - size_t size; -/* - ssize_t n; - nxt_err_t err; - nxt_php_ctx_t *ctx; - nxt_app_request_t *r; -*/ + size_t size, rest; nxt_php_run_ctx_t *ctx; - nxt_app_request_body_t *b; nxt_app_request_header_t *h; ctx = SG(server_context); h = &ctx->r.header; - b = &ctx->r.body; - rest = h->parsed_content_length - SG(read_post_bytes); + rest = (size_t) h->parsed_content_length - SG(read_post_bytes); nxt_debug(ctx->task, "nxt_php_read_post %O", rest); @@ -697,43 +694,11 @@ nxt_php_read_post(char *buffer, uint count_bytes TSRMLS_DC) return 0; } - size = 0; -#ifdef NXT_PHP7 - count_bytes = (size_t) nxt_min(rest, (off_t) count_bytes); -#else - count_bytes = (uint) nxt_min(rest, (off_t) count_bytes); -#endif - - if (b->preread.length != 0) { - size = nxt_min(b->preread.length, count_bytes); - - nxt_memcpy(buffer, b->preread.start, size); + rest = nxt_min(ctx->body_preread_size, (size_t) count_bytes); + size = nxt_app_msg_read_raw(ctx->task, ctx->rmsg, buffer, rest); - b->preread.length -= size; - b->preread.start += size; + ctx->body_preread_size -= size; - if (size == count_bytes) { - return size; - } - } - -#if 0 - nxt_debug(ctx->task, "recv %z", (size_t) count_bytes - size); - - n = recv(r->event_conn->socket.fd, buffer + size, count_bytes - size, 0); - - if (nxt_slow_path(n <= 0)) { - err = (n == 0) ? 0 : nxt_socket_errno; - - nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E", - r->event_conn->socket.fd, (size_t) count_bytes - size, - err); - - return size; - } - - return size + n; -#endif return size; } @@ -868,6 +833,8 @@ nxt_php_register_variables(zval *track_vars_array TSRMLS_DC) NXT_PHP_SET(n.start, v); } + nxt_app_msg_read_size(task, ctx->rmsg, &ctx->body_preread_size); + #undef NXT_PHP_SET } diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index e714b654..afb2f4a4 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -95,8 +95,6 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) b = obj; - nxt_debug(task, "mmap buf completion: %p %p", b, b->mem.start); - mp = b->data; #if (NXT_DEBUG) @@ -125,6 +123,10 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_port_mmap_free_junk(p, b->mem.end - p); + nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), %PI,%d,%d", b, + b->mem.start, b->mem.end - b->mem.start, b->is_port_mmap_sent, + hdr->pid, hdr->id, c); + while (p < b->mem.end) { nxt_port_mmap_set_chunk_free(hdr, c); @@ -414,11 +416,6 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) nxt_debug(task, "request %z bytes shm buffer", size); - if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) { - nxt_debug(task, "requested size (%z bytes) too big", size); - return NULL; - } - b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); if (nxt_slow_path(b == NULL)) { return NULL; @@ -445,6 +442,10 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) nchunks++; } + nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI,%d,%d", b, + b->mem.start, b->mem.end - b->mem.start, + hdr->pid, hdr->id, c); + c++; nchunks--; @@ -465,9 +466,10 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) nxt_int_t -nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size) +nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, + size_t min_size) { - size_t nchunks; + size_t nchunks, free_size; nxt_chunk_id_t c, start; nxt_port_mmap_header_t *hdr; @@ -479,7 +481,9 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size) return NXT_ERROR; } - if (nxt_slow_path(size <= (size_t) nxt_buf_mem_free_size(&b->mem))) { + free_size = nxt_buf_mem_free_size(&b->mem); + + if (nxt_slow_path(size <= free_size)) { return NXT_OK; } @@ -487,7 +491,7 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size) start = nxt_port_mmap_chunk_id(hdr, b->mem.end); - size -= nxt_buf_mem_free_size(&b->mem); + size -= free_size; nchunks = size / PORT_MMAP_CHUNK_SIZE; if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { @@ -507,7 +511,9 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size) nchunks--; } - if (nchunks != 0) { + if (nchunks != 0 && + min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) { + c--; while (c >= start) { nxt_port_mmap_set_chunk_free(hdr, c); @@ -559,6 +565,10 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, b->parent = hdr; + nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI,%d,%d", b, + b->mem.start, b->mem.end - b->mem.start, + hdr->pid, hdr->id, mmap_msg->chunk_id); + return b; } diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index 0b64fa89..ea51d001 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -28,7 +28,7 @@ nxt_buf_t * nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size); nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, - size_t size); + size_t size, size_t min_size); nxt_port_mmap_header_t * nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c index 1dcf8bf7..f1bd170f 100644 --- a/src/nxt_python_wsgi.c +++ b/src/nxt_python_wsgi.c @@ -57,6 +57,7 @@ typedef struct { //nxt_app_request_t *request; } nxt_py_error_t; +typedef struct nxt_python_run_ctx_s nxt_python_run_ctx_t; static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf); @@ -68,7 +69,7 @@ static nxt_int_t nxt_python_run(nxt_task_t *task, static PyObject *nxt_python_create_environ(nxt_task_t *task); static PyObject *nxt_python_get_environ(nxt_task_t *task, - nxt_app_rmsg_t *rmsg); + nxt_app_rmsg_t *rmsg, nxt_python_run_ctx_t *ctx); static PyObject *nxt_py_start_resp(PyObject *self, PyObject *args); @@ -77,11 +78,13 @@ static PyObject *nxt_py_input_read(nxt_py_input_t *self, PyObject *args); static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args); static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args); -typedef struct { +struct nxt_python_run_ctx_s { nxt_task_t *task; nxt_app_rmsg_t *rmsg; nxt_app_wmsg_t *wmsg; -} nxt_python_run_ctx_t; + + size_t body_preread_size; +}; nxt_inline nxt_int_t nxt_python_write(nxt_python_run_ctx_t *ctx, const u_char *data, size_t len, @@ -171,8 +174,6 @@ static PyObject *nxt_py_application; static PyObject *nxt_py_start_resp_obj; static PyObject *nxt_py_environ_ptyp; -static nxt_str_t nxt_python_request_body; - static nxt_python_run_ctx_t *nxt_python_run_ctx; @@ -323,6 +324,7 @@ nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg) { nxt_int_t rc; + nxt_buf_t *b; nxt_http_field_t *field; nxt_app_request_header_t *h; @@ -369,14 +371,20 @@ nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_list_each(field, h->fields) { RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, - &prefix, &field->name)); + &prefix, &field->name)); NXT_WRITE(&field->value); } nxt_list_loop; /* end-of-headers mark */ NXT_WRITE(&eof); - NXT_WRITE(&r->body.preread); + + RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size)); + + for(b = r->body.buf; b != NULL; b = b->next) { + RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos, + nxt_buf_mem_used_size(&b->mem))); + } #undef NXT_WRITE #undef RC @@ -395,9 +403,9 @@ nxt_python_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg) u_char *buf; size_t size; PyObject *result, *iterator, *item, *args, *environ; - nxt_python_run_ctx_t run_ctx = {task, rmsg, wmsg}; + nxt_python_run_ctx_t run_ctx = {task, rmsg, wmsg, 0}; - environ = nxt_python_get_environ(task, rmsg); + environ = nxt_python_get_environ(task, rmsg, &run_ctx); if (nxt_slow_path(environ == NULL)) { return NXT_ERROR; @@ -465,8 +473,8 @@ nxt_python_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg) size = PyBytes_GET_SIZE(item); buf = (u_char *) PyBytes_AS_STRING(item); - nxt_debug(task, "nxt_app_write(fake): %d %*s", (int)size, (int)size, - buf); + nxt_debug(task, "nxt_app_write(fake): %uz", size); + nxt_python_write(&run_ctx, buf, size, 1, 0); Py_DECREF(item); @@ -688,7 +696,8 @@ nxt_python_read_add_env(nxt_task_t *task, nxt_app_rmsg_t *rmsg, static PyObject * -nxt_python_get_environ(nxt_task_t *task, nxt_app_rmsg_t *rmsg) +nxt_python_get_environ(nxt_task_t *task, nxt_app_rmsg_t *rmsg, + nxt_python_run_ctx_t *ctx) { size_t s; u_char *colon; @@ -774,22 +783,24 @@ nxt_python_get_environ(nxt_task_t *task, nxt_app_rmsg_t *rmsg) NXT_READ("CONTENT_TYPE"); NXT_READ("CONTENT_LENGTH"); - while ( (rc = nxt_app_msg_read_nvp(task, rmsg, &n, &v)) == NXT_OK) { + while (nxt_app_msg_read_str(task, rmsg, &n) == NXT_OK) { if (nxt_slow_path(n.length == 0)) { - rc = NXT_DONE; + break; + } + + rc = nxt_app_msg_read_str(task, rmsg, &v); + if (nxt_slow_path(rc != NXT_OK)) { break; } RC(nxt_python_add_env(task, environ, (char *) n.start, &v)); } + RC(nxt_app_msg_read_size(task, rmsg, &ctx->body_preread_size)); + #undef NXT_READ #undef RC - if (rc == NXT_DONE && v.length > 0) { - nxt_python_request_body = v; - } - return environ; fail: @@ -900,11 +911,15 @@ static PyObject * nxt_py_input_read(nxt_py_input_t *self, PyObject *args) { u_char *buf; + size_t copy_size; PyObject *body, *obj; Py_ssize_t size; nxt_uint_t n; + nxt_python_run_ctx_t *ctx; + + ctx = nxt_python_run_ctx; - size = nxt_python_request_body.length; + size = ctx->body_preread_size; n = PyTuple_GET_SIZE(args); @@ -926,8 +941,8 @@ nxt_py_input_read(nxt_py_input_t *self, PyObject *args) "the read body size cannot be zero or less"); } - if (size == 0 || size > (Py_ssize_t) nxt_python_request_body.length) { - size = nxt_python_request_body.length; + if (size == 0 || size > (Py_ssize_t) ctx->body_preread_size) { + size = ctx->body_preread_size; } } @@ -937,16 +952,12 @@ nxt_py_input_read(nxt_py_input_t *self, PyObject *args) return NULL; } - if (size > 0) { - buf = (u_char *) PyBytes_AS_STRING(body); + buf = (u_char *) PyBytes_AS_STRING(body); - nxt_memcpy(buf, nxt_python_request_body.start, size); + copy_size = nxt_min((size_t) size, ctx->body_preread_size); + copy_size = nxt_app_msg_read_raw(ctx->task, ctx->rmsg, buf, copy_size); - nxt_python_request_body.start += size; - nxt_python_request_body.length -= size; - - /* TODO wait body */ - } + ctx->body_preread_size -= copy_size; return body; } diff --git a/src/nxt_router.c b/src/nxt_router.c index 5beceb38..fa2e957d 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -128,6 +128,8 @@ static void nxt_router_app_release_port(nxt_task_t *task, void *obj, static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data); +static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, + void *data); static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap); static void nxt_router_process_http_request_mp(nxt_task_t *task, @@ -604,10 +606,34 @@ static nxt_conf_map_t nxt_router_http_conf[] = { }, { + nxt_string("large_header_buffers"), + NXT_CONF_MAP_SIZE, + offsetof(nxt_socket_conf_t, large_header_buffers), + }, + + { + nxt_string("body_buffer_size"), + NXT_CONF_MAP_SIZE, + offsetof(nxt_socket_conf_t, body_buffer_size), + }, + + { + nxt_string("max_body_size"), + NXT_CONF_MAP_SIZE, + offsetof(nxt_socket_conf_t, max_body_size), + }, + + { nxt_string("header_read_timeout"), NXT_CONF_MAP_MSEC, offsetof(nxt_socket_conf_t, header_read_timeout), }, + + { + nxt_string("body_read_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_socket_conf_t, body_read_timeout), + }, }; @@ -792,7 +818,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, // STUB, default values if http block is not defined. skcf->header_buffer_size = 2048; skcf->large_header_buffer_size = 8192; + skcf->large_header_buffers = 4; + skcf->body_buffer_size = 16 * 1024; + skcf->max_body_size = 2 * 1024 * 1024; skcf->header_read_timeout = 5000; + skcf->body_read_timeout = 5000; if (http != NULL) { ret = nxt_conf_map_object(http, nxt_router_http_conf, @@ -1807,7 +1837,7 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) } -static const nxt_conn_state_t nxt_router_conn_read_state +static const nxt_conn_state_t nxt_router_conn_read_header_state nxt_aligned(64) = { .ready_handler = nxt_router_conn_http_header_parse, @@ -1820,6 +1850,20 @@ static const nxt_conn_state_t nxt_router_conn_read_state }; +static const nxt_conn_state_t nxt_router_conn_read_body_state + nxt_aligned(64) = +{ + .ready_handler = nxt_router_conn_http_body_read, + .close_handler = nxt_router_conn_close, + .error_handler = nxt_router_conn_error, + + .timer_handler = nxt_router_conn_timeout, + .timer_value = nxt_router_conn_timeout_value, + .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout), + .timer_autoreset = 1, +}; + + static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) { @@ -1844,7 +1888,7 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) c->read_work_queue = &engine->fast_work_queue; c->write_work_queue = &engine->fast_work_queue; - c->read_state = &nxt_router_conn_read_state; + c->read_state = &nxt_router_conn_read_header_state; nxt_conn_read(engine, c); } @@ -1873,7 +1917,6 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rc = nxt_event_engine_request_find(engine, msg->port_msg.stream); if (nxt_slow_path(rc == NULL)) { - nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream); return; @@ -1910,14 +1953,18 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rc->app_port = NULL; } + + rc->conn = NULL; } if (b == NULL) { return; } - /* Disable instant buffer completion/re-using by port. */ - msg->buf = NULL; + if (msg->buf == b) { + /* Disable instant buffer completion/re-using by port. */ + msg->buf = NULL; + } if (c->write == NULL) { c->write = b; @@ -1938,6 +1985,9 @@ nxt_router_text_by_code(int code) case 400: return "Bad request"; case 404: return "Not found"; case 403: return "Forbidden"; + case 408: return "Request Timeout"; + case 411: return "Length Required"; + case 413: return "Request Entity Too Large"; case 500: default: return "Internal server error"; } @@ -1965,6 +2015,7 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, msg = (const char *) b->mem.free; b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args); + b->mem.free[0] = '\0'; nxt_log_alert(task->log, "error %d: %s", code, msg); @@ -1996,6 +2047,11 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args); va_end(args); + if (c->socket.data != NULL) { + nxt_mp_free(c->mem_pool, c->socket.data); + c->socket.data = NULL; + } + if (c->write == NULL) { c->write = b; c->write_state = &nxt_router_conn_write_state; @@ -2345,22 +2401,24 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) { - size_t size, preread; + size_t size; nxt_int_t ret; - nxt_buf_t *b; + nxt_buf_t *buf; nxt_conn_t *c; nxt_app_parse_ctx_t *ap; + nxt_app_request_body_t *b; nxt_socket_conf_joint_t *joint; nxt_app_request_header_t *h; c = obj; ap = data; - b = c->read; + buf = c->read; + joint = c->listen->socket.data; nxt_debug(task, "router conn http header parse"); if (ap == NULL) { - ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t)); + ap = nxt_mp_zalloc(c->mem_pool, sizeof(nxt_app_parse_ctx_t)); if (nxt_slow_path(ap == NULL)) { nxt_router_conn_close(task, c, data); return; @@ -2376,78 +2434,157 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) ap->r.remote.start = nxt_sockaddr_address(c->remote); ap->r.remote.length = c->remote->address_length; + + ap->r.header.buf = buf; } h = &ap->r.header; + b = &ap->r.body; - ret = nxt_app_http_req_parse(task, ap, b); + ret = nxt_app_http_req_header_parse(task, ap, buf); - nxt_debug(task, "http parse request: %d", ret); + nxt_debug(task, "http parse request header: %d", ret); switch (nxt_expect(NXT_DONE, ret)) { case NXT_DONE: - preread = nxt_buf_mem_used_size(&b->mem); - nxt_debug(task, "router request header parsing complete, " "content length: %O, preread: %uz", - h->parsed_content_length, preread); + h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem)); - nxt_router_process_http_request(task, c, ap); - return; + if (b->done) { + nxt_router_process_http_request(task, c, ap); + + return; + } + + if (joint->socket_conf->max_body_size > 0 && + (size_t) h->parsed_content_length > + joint->socket_conf->max_body_size) { + + nxt_router_gen_error(task, c, 413, "Content-Length too big"); + return; + } + + if (nxt_buf_mem_free_size(&buf->mem) == 0) { + size = nxt_min(joint->socket_conf->body_buffer_size, + (size_t) h->parsed_content_length); + + buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(buf->next == NULL)) { + nxt_router_gen_error(task, c, 500, "Failed to allocate " + "buffer for request body"); + return; + } + + c->read = buf->next; + + b->preread_size += nxt_buf_mem_used_size(&buf->mem); + } + + if (b->buf == NULL) { + b->buf = c->read; + } + + c->read_state = &nxt_router_conn_read_body_state; + break; case NXT_ERROR: - nxt_router_conn_close(task, c, data); + nxt_router_gen_error(task, c, 400, "Request header parse error"); return; default: /* NXT_AGAIN */ - if (h->done == 0) { + if (c->read->mem.free == c->read->mem.end) { + size = joint->socket_conf->large_header_buffer_size; - if (c->read->mem.free == c->read->mem.end) { - joint = c->listen->socket.data; - size = joint->socket_conf->large_header_buffer_size; + if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem) || + ap->r.header.bufs >= joint->socket_conf->large_header_buffers) { + nxt_router_gen_error(task, c, 413, + "Too long request headers"); + return; + } - if (size > (size_t) nxt_buf_mem_size(&b->mem)) { - b = nxt_buf_mem_alloc(c->mem_pool, size, 0); - if (nxt_slow_path(b == NULL)) { - nxt_router_conn_close(task, c, data); - return; - } + buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(buf->next == NULL)) { + nxt_router_gen_error(task, c, 500, + "Failed to allocate large header " + "buffer"); + return; + } - size = c->read->mem.free - c->read->mem.pos; + ap->r.header.bufs++; - c->read = nxt_buf_cpy(b, c->read->mem.pos, size); - } else { - nxt_router_gen_error(task, c, 400, - "Too long request headers"); - return; - } - } + size = c->read->mem.free - c->read->mem.pos; + + c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size); } - if (ap->r.body.done == 0) { + } + + nxt_conn_read(task->thread->engine, c); +} - preread = nxt_buf_mem_used_size(&b->mem); - if (h->parsed_content_length - preread > - (size_t) nxt_buf_mem_free_size(&b->mem)) { +static void +nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data) +{ + size_t size; + nxt_int_t ret; + nxt_buf_t *buf; + nxt_conn_t *c; + nxt_app_parse_ctx_t *ap; + nxt_app_request_body_t *b; + nxt_socket_conf_joint_t *joint; + nxt_app_request_header_t *h; - b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0); - if (nxt_slow_path(b == NULL)) { - nxt_router_gen_error(task, c, 500, "Failed to allocate " - "buffer for request body"); - return; - } + c = obj; + ap = data; + buf = c->read; - c->read = nxt_buf_cpy(b, c->read->mem.pos, preread); - } + nxt_debug(task, "router conn http body read"); - nxt_debug(task, "router request body read again, rest: %uz", - h->parsed_content_length - preread); + nxt_assert(ap != NULL); + b = &ap->r.body; + h = &ap->r.header; + + ret = nxt_app_http_req_body_read(task, ap, buf); + + nxt_debug(task, "http read request body: %d", ret); + + switch (nxt_expect(NXT_DONE, ret)) { + + case NXT_DONE: + nxt_router_process_http_request(task, c, ap); + return; + + case NXT_ERROR: + nxt_router_gen_error(task, c, 500, "Read body error"); + return; + + default: /* NXT_AGAIN */ + + if (nxt_buf_mem_free_size(&buf->mem) == 0) { + joint = c->listen->socket.data; + + b->preread_size += nxt_buf_mem_used_size(&buf->mem); + + size = nxt_min(joint->socket_conf->body_buffer_size, + (size_t) h->parsed_content_length - b->preread_size); + + buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(buf->next == NULL)) { + nxt_router_gen_error(task, c, 500, "Failed to allocate " + "buffer for request body"); + return; + } + + c->read = buf->next; } + nxt_debug(task, "router request body read again, rest: %uz", + h->parsed_content_length - b->preread_size); } nxt_conn_read(task->thread->engine, c); @@ -2725,9 +2862,12 @@ nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) c = nxt_read_timer_conn(timer); - c->write_state = &nxt_router_conn_close_state; + if (c->read_state == &nxt_router_conn_read_header_state) { + nxt_router_gen_error(task, c, 408, "Read header timeout"); - nxt_conn_close(task->thread->engine, c); + } else { + nxt_router_gen_error(task, c, 408, "Read body timeout"); + } } diff --git a/src/nxt_router.h b/src/nxt_router.h index 201b786c..2a8a30e1 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -106,7 +106,11 @@ typedef struct { size_t header_buffer_size; size_t large_header_buffer_size; + size_t large_header_buffers; + size_t body_buffer_size; + size_t max_body_size; nxt_msec_t header_read_timeout; + nxt_msec_t body_read_timeout; } nxt_socket_conf_t; |