summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-08-11 18:04:04 +0300
committerMax Romanov <max.romanov@nginx.com>2017-08-11 18:04:04 +0300
commit39a6a4c973dd378f1fad9d2514d7857fe491df4b (patch)
tree220ea8605a9396a93fa7a459ff3e558fe1a044aa
parente1e808bd94609c80b4990939285d47f124bb2eef (diff)
downloadunit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.gz
unit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.bz2
Request body read state implemented.
With specific timeout and buffer size settings.
Diffstat (limited to '')
-rw-r--r--src/nginext/nxt_go_lib.c42
-rw-r--r--src/nginext/nxt_go_lib.h7
-rw-r--r--src/nginext/nxt_go_port.c7
-rw-r--r--src/nginext/nxt_go_run_ctx.c185
-rw-r--r--src/nginext/nxt_go_run_ctx.h2
-rw-r--r--src/nginext/request.go6
-rw-r--r--src/nxt_application.c170
-rw-r--r--src/nxt_application.h18
-rw-r--r--src/nxt_go.c9
-rw-r--r--src/nxt_php_sapi.c67
-rw-r--r--src/nxt_port_memory.c34
-rw-r--r--src/nxt_port_memory.h2
-rw-r--r--src/nxt_python_wsgi.c69
-rw-r--r--src/nxt_router.c242
-rw-r--r--src/nxt_router.h4
15 files changed, 610 insertions, 254 deletions
diff --git a/src/nginext/nxt_go_lib.c b/src/nginext/nxt_go_lib.c
index 87f583d7..6d5d2a03 100644
--- a/src/nginext/nxt_go_lib.c
+++ b/src/nginext/nxt_go_lib.c
@@ -17,14 +17,14 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
}
int
-nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, size_t dst_len)
+nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len)
{
return -1;
}
int
-nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst,
- size_t dst_len, void *src, size_t src_len)
+nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
+ void *src, size_t src_len)
{
return -1;
}
@@ -71,7 +71,7 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
return 0;
}
- nxt_go_debug("write: %d %.*s", (int) len, (int) len, (char *) buf);
+ nxt_go_debug("write: %d", (int) len);
ctx = (nxt_go_run_ctx_t *) r;
rc = nxt_go_ctx_write(ctx, buf, len);
@@ -81,44 +81,30 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
int
-nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, size_t dst_len)
+nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len)
{
- nxt_go_msg_t *msg;
- nxt_go_run_ctx_t *ctx;
- nxt_app_request_body_t *b;
- nxt_app_request_header_t *h;
+ size_t res;
+ nxt_go_run_ctx_t *ctx;
if (nxt_slow_path(r == 0)) {
return 0;
}
ctx = (nxt_go_run_ctx_t *) r;
- b = &ctx->r.body;
- h = &ctx->r.header;
- if (off >= h->parsed_content_length) {
- return 0;
- }
+ dst_len = nxt_min(dst_len, ctx->r.body.preread_size);
- if (off < b->preread.length) {
- dst_len = nxt_min(b->preread.length - off, dst_len);
+ res = nxt_go_ctx_read_raw(ctx, dst, dst_len);
- if (dst_len != 0) {
- nxt_memcpy(dst, b->preread.start + off, dst_len);
- }
+ ctx->r.body.preread_size -= res;
- return dst_len;
- }
-
- /* TODO find msg to read */
-
- return NXT_AGAIN;
+ return res;
}
int
-nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst,
- size_t dst_len, void *src, size_t src_len)
+nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
+ void *src, size_t src_len)
{
nxt_go_run_ctx_t *ctx;
@@ -130,7 +116,7 @@ nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst,
nxt_go_ctx_add_msg(ctx, src, src_len);
- return nxt_go_request_read(r, off, dst, dst_len);
+ return nxt_go_request_read(r, dst, dst_len);
}
diff --git a/src/nginext/nxt_go_lib.h b/src/nginext/nxt_go_lib.h
index 220fc9b3..b3a86be9 100644
--- a/src/nginext/nxt_go_lib.h
+++ b/src/nginext/nxt_go_lib.h
@@ -21,11 +21,10 @@ typedef uintptr_t nxt_go_request_t;
int nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len);
-int nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst,
- size_t dst_len);
+int nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len);
-int nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst,
- size_t dst_len, void *src, size_t src_len);
+int nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
+ void *src, size_t src_len);
int nxt_go_request_close(nxt_go_request_t r);
diff --git a/src/nginext/nxt_go_port.c b/src/nginext/nxt_go_port.c
index af50b860..fca3cf9a 100644
--- a/src/nginext/nxt_go_port.c
+++ b/src/nginext/nxt_go_port.c
@@ -83,22 +83,23 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size)
do {
rc = nxt_go_ctx_read_str(ctx, &n);
- rc = nxt_go_ctx_read_str(ctx, &v);
if (n.length == 0) {
break;
}
+ rc = nxt_go_ctx_read_str(ctx, &v);
nxt_go_request_add_header(r, nxt_go_str(&n), nxt_go_str(&v));
} while(1);
- ctx->r.body.preread = v;
+ nxt_go_ctx_read_size(ctx, &s);
+ ctx->r.body.preread_size = s;
if (h->parsed_content_length > 0) {
nxt_go_request_set_content_length(r, h->parsed_content_length);
}
- if (v.length < h->parsed_content_length) {
+ if (ctx->r.body.preread_size < h->parsed_content_length) {
nxt_go_request_create_channel(r);
}
diff --git a/src/nginext/nxt_go_run_ctx.c b/src/nginext/nxt_go_run_ctx.c
index c6481b95..cca8273e 100644
--- a/src/nginext/nxt_go_run_ctx.c
+++ b/src/nginext/nxt_go_run_ctx.c
@@ -186,7 +186,7 @@ nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size)
msg->start_offset = ctx->msg_last->start_offset;
if (ctx->msg_last == &ctx->msg) {
- msg->start_offset += ctx->r.body.preread.length;
+ msg->start_offset += ctx->r.body.preread_size;
} else {
msg->start_offset += ctx->msg_last->data_size;
}
@@ -227,8 +227,8 @@ nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last)
}
-nxt_int_t
-nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
+nxt_buf_t *
+nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size)
{
size_t nchunks;
nxt_buf_t *buf;
@@ -237,30 +237,16 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
nxt_port_mmap_msg_t *mmap_msg;
nxt_port_mmap_header_t *hdr;
- buf = &ctx->wbuf;
-
- if (ctx->nwbuf > 0 && nxt_buf_mem_free_size(&buf->mem) >= len) {
- memcpy(buf->mem.free, data, len);
- buf->mem.free += len;
-
- mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1;
- mmap_msg->size += len;
-
- return NXT_OK;
- }
-
- if (ctx->nwbuf >= 8) {
- nxt_go_ctx_flush(ctx, 0);
- }
-
c = 0;
+ buf = &ctx->wbuf;
+
hdr = nxt_go_port_mmap_get(ctx->process,
ctx->msg.port_msg->reply_port, &c);
if (nxt_slow_path(hdr == NULL)) {
nxt_go_warn("failed to get port_mmap");
- return NXT_ERROR;
+ return NULL;
}
buf->mem.start = nxt_port_mmap_chunk_start(hdr, c);
@@ -268,12 +254,15 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
buf->mem.free = buf->mem.start;
buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE;
+ buf->parent = hdr;
+
mmap_msg = ctx->wmmap_msg + ctx->nwbuf;
mmap_msg->mmap_id = hdr->id;
mmap_msg->chunk_id = c;
+ mmap_msg->size = 0;
- nchunks = len / PORT_MMAP_CHUNK_SIZE;
- if ((len % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
+ nchunks = size / PORT_MMAP_CHUNK_SIZE;
+ if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
nchunks++;
}
@@ -283,27 +272,124 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
/* Try to acquire as much chunks as required. */
while (nchunks > 0) {
- if (nxt_port_mmap_get_chunk_busy(hdr, c)) {
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
break;
}
- nxt_port_mmap_set_chunk_busy(hdr, c);
buf->mem.end += PORT_MMAP_CHUNK_SIZE;
c++;
nchunks--;
}
- if (nxt_buf_mem_free_size(&buf->mem) < len) {
- len = nxt_buf_mem_free_size(&buf->mem);
+ ctx->nwbuf++;
+
+ return buf;
+}
+
+nxt_int_t
+nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size)
+{
+ size_t nchunks, free_size;
+ nxt_chunk_id_t c, start;
+ nxt_port_mmap_header_t *hdr;
+
+ free_size = nxt_buf_mem_free_size(&b->mem);
+
+ if (nxt_slow_path(size <= free_size)) {
+ return NXT_OK;
}
- memcpy(buf->mem.free, data, len);
- buf->mem.free += len;
+ hdr = b->parent;
- mmap_msg->size = len;
+ start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
- ctx->nwbuf++;
+ size -= free_size;
+
+ nchunks = size / PORT_MMAP_CHUNK_SIZE;
+ if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
+ nchunks++;
+ }
+
+ c = start;
+
+ /* Try to acquire as much chunks as required. */
+ while (nchunks > 0) {
+
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
+ break;
+ }
+
+ c++;
+ nchunks--;
+ }
+
+ if (nchunks != 0 &&
+ min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) {
+
+ c--;
+ while (c >= start) {
+ nxt_port_mmap_set_chunk_free(hdr, c);
+ c--;
+ }
+
+ return NXT_ERROR;
+ } else {
+ b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
+
+ return NXT_OK;
+ }
+}
+
+
+nxt_int_t
+nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
+{
+ size_t free_size, copy_size;
+ nxt_buf_t *buf;
+ nxt_port_mmap_msg_t *mmap_msg;
+
+ buf = &ctx->wbuf;
+
+ while (len > 0) {
+ if (ctx->nwbuf == 0) {
+ buf = nxt_go_port_mmap_get_buf(ctx, len);
+
+ if (nxt_slow_path(buf == NULL)) {
+ return NXT_ERROR;
+ }
+ }
+
+ do {
+ free_size = nxt_buf_mem_free_size(&buf->mem);
+
+ if (free_size > 0) {
+ copy_size = nxt_min(free_size, len);
+
+ buf->mem.free = nxt_cpymem(buf->mem.free, data, copy_size);
+
+ mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1;
+ mmap_msg->size += copy_size;
+
+ len -= copy_size;
+ data = nxt_pointer_to(data, copy_size);
+
+ if (len == 0) {
+ return NXT_OK;
+ }
+ }
+ } while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK);
+
+ if (ctx->nwbuf >= 8) {
+ nxt_go_ctx_flush(ctx, 0);
+ }
+
+ buf = nxt_go_port_mmap_get_buf(ctx, len);
+
+ if (nxt_slow_path(buf == NULL)) {
+ return NXT_ERROR;
+ }
+ }
return NXT_OK;
}
@@ -403,4 +489,43 @@ nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str)
}
+size_t
+nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size)
+{
+ size_t res, read_size;
+ nxt_int_t rc;
+ nxt_buf_t *buf;
+
+ res = 0;
+
+ while (size > 0) {
+ buf = &ctx->rbuf;
+
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
+ ctx->nrbuf++;
+ rc = nxt_go_ctx_init_rbuf(ctx);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_go_warn("read raw: init rbuf failed");
+ return res;
+ }
+
+ continue;
+ }
+
+ read_size = nxt_buf_mem_used_size(&buf->mem);
+ read_size = nxt_min(read_size, size);
+
+ dst = nxt_cpymem(dst, buf->mem.pos, read_size);
+
+ size -= read_size;
+ buf->mem.pos += read_size;
+ res += read_size;
+ }
+
+ nxt_go_debug("read_raw: %d", (int) res);
+
+ return res;
+}
+
+
#endif /* NXT_CONFIGURE */
diff --git a/src/nginext/nxt_go_run_ctx.h b/src/nginext/nxt_go_run_ctx.h
index c7c3da15..4ead1df5 100644
--- a/src/nginext/nxt_go_run_ctx.h
+++ b/src/nginext/nxt_go_run_ctx.h
@@ -68,5 +68,7 @@ nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size);
nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str);
+size_t nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size);
+
#endif /* _NXT_GO_RUN_CTX_H_INCLUDED_ */
diff --git a/src/nginext/request.go b/src/nginext/request.go
index 8667a074..1679f1c7 100644
--- a/src/nginext/request.go
+++ b/src/nginext/request.go
@@ -21,7 +21,6 @@ type request struct {
resp *response
c_req C.nxt_go_request_t
id C.uint32_t
- read_pos C.off_t
msgs []*cmsg
ch chan *cmsg
}
@@ -29,18 +28,17 @@ type request struct {
func (r *request) Read(p []byte) (n int, err error) {
c := C.size_t(cap(p))
b := C.malloc(c)
- res := C.nxt_go_request_read(r.c_req, r.read_pos, b, c)
+ res := C.nxt_go_request_read(r.c_req, b, c)
if res == -2 /* NXT_AGAIN */ {
m := <-r.ch
- res = C.nxt_go_request_read_from(r.c_req, r.read_pos, b, c, m.buf.b, m.buf.s)
+ res = C.nxt_go_request_read_from(r.c_req, b, c, m.buf.b, m.buf.s)
r.push(m)
}
if res > 0 {
copy(p, C.GoBytes(b, res))
- r.read_pos += C.off_t(res)
}
C.free(b)
diff --git a/src/nxt_application.c b/src/nxt_application.c
index 50feac2d..1b23eca3 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -154,7 +154,7 @@ nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
return res;
}
- if (nxt_port_mmap_increase_buf(task, b, size) == NXT_OK) {
+ if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) {
res = b->mem.free;
b->mem.free += size;
@@ -306,6 +306,43 @@ nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
}
+size_t
+nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst,
+ size_t size)
+{
+ size_t res, read_size;
+ nxt_buf_t *buf;
+
+ res = 0;
+
+ while (size > 0) {
+ buf = msg->buf;
+
+ if (nxt_slow_path(buf == NULL)) {
+ break;
+ }
+
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
+ msg->buf = buf->next;
+ continue;
+ }
+
+ read_size = nxt_buf_mem_used_size(&buf->mem);
+ read_size = nxt_min(read_size, size);
+
+ dst = nxt_cpymem(dst, buf->mem.pos, read_size);
+
+ size -= read_size;
+ buf->mem.pos += read_size;
+ res += read_size;
+ }
+
+ nxt_debug(task, "nxt_read_raw: %uz", res);
+
+ return res;
+}
+
+
nxt_int_t
nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n,
nxt_str_t *v)
@@ -459,7 +496,7 @@ nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx)
nxt_int_t
-nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
+nxt_app_http_req_header_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
nxt_buf_t *buf)
{
nxt_int_t rc;
@@ -471,50 +508,76 @@ nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
b = &ctx->r.body;
h = &ctx->r.header;
- if (h->done == 0) {
- rc = nxt_http_parse_request(p, &buf->mem);
+ nxt_assert(h->done == 0);
- if (nxt_slow_path(rc != NXT_DONE)) {
- return rc;
- }
+ rc = nxt_http_parse_request(p, &buf->mem);
- rc = nxt_http_fields_process(p->fields, ctx, task->log);
+ if (nxt_slow_path(rc != NXT_DONE)) {
+ return rc;
+ }
- if (nxt_slow_path(rc != NXT_OK)) {
- return rc;
- }
+ rc = nxt_http_fields_process(p->fields, ctx, task->log);
- h->fields = p->fields;
- h->done = 1;
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return rc;
+ }
- h->version.start = p->version.str;
- h->version.length = nxt_strlen(p->version.str);
+ h->fields = p->fields;
+ h->done = 1;
- h->method = p->method;
+ h->version.start = p->version.str;
+ h->version.length = nxt_strlen(p->version.str);
- h->target.start = p->target_start;
- h->target.length = p->target_end - p->target_start;
+ h->method = p->method;
- h->path = p->path;
- h->query = p->args;
+ h->target.start = p->target_start;
+ h->target.length = p->target_end - p->target_start;
- if (h->parsed_content_length == 0) {
- b->done = 1;
- }
- }
+ h->path = p->path;
+ h->query = p->args;
- if (b->done == 0) {
- b->preread.length = buf->mem.free - buf->mem.pos;
- b->preread.start = buf->mem.pos;
+ if (h->parsed_content_length == 0) {
+ b->done = 1;
- b->done = b->preread.length >= (size_t) h->parsed_content_length;
}
- if (h->done == 1 && b->done == 1) {
+ if (buf->mem.free == buf->mem.pos) {
return NXT_DONE;
}
- return NXT_AGAIN;
+ b->buf = buf;
+ b->done = nxt_buf_mem_used_size(&buf->mem) >=
+ h->parsed_content_length;
+
+ if (b->done == 1) {
+ b->preread_size = nxt_buf_mem_used_size(&buf->mem);
+ }
+
+ return NXT_DONE;
+}
+
+
+nxt_int_t
+nxt_app_http_req_body_read(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
+ nxt_buf_t *buf)
+{
+ nxt_app_request_body_t *b;
+ nxt_app_request_header_t *h;
+
+ b = &ctx->r.body;
+ h = &ctx->r.header;
+
+ nxt_assert(h->done == 1);
+ nxt_assert(b->done == 0);
+
+ b->done = nxt_buf_mem_used_size(&buf->mem) + b->preread_size >=
+ (size_t) h->parsed_content_length;
+
+ if (b->done == 1) {
+ b->preread_size += nxt_buf_mem_used_size(&buf->mem);
+ }
+
+ return b->done == 1 ? NXT_DONE : NXT_AGAIN;
}
@@ -571,17 +634,48 @@ nxt_int_t
nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
size_t size)
{
- u_char *dst;
+ size_t free_size, copy_size;
+ nxt_buf_t *b;
+ nxt_port_t *port;
- dst = nxt_app_msg_write_get_buf(task, msg, size);
- if (nxt_slow_path(dst == NULL)) {
- return NXT_ERROR;
- }
+ nxt_debug(task, "nxt_app_msg_write_raw: %uz", size);
+
+ while (size > 0) {
+ b = *msg->buf;
+
+ if (b == NULL) {
+ port = nxt_app_msg_get_port(task, msg);
+ if (nxt_slow_path(port == NULL)) {
+ return NXT_ERROR;
+ }
+
+ b = nxt_port_mmap_get_buf(task, port, size);
+ if (nxt_slow_path(b == NULL)) {
+ return NXT_ERROR;
+ }
+
+ *msg->buf = b;
+ }
+
+ do {
+ free_size = nxt_buf_mem_free_size(&b->mem);
+
+ if (free_size > 0) {
+ copy_size = nxt_min(free_size, size);
- nxt_memcpy(dst, c, size);
+ b->mem.free = nxt_cpymem(b->mem.free, c, copy_size);
- nxt_debug(task, "nxt_app_msg_write_raw: %d %*s", (int)size,
- (int)size, c);
+ size -= copy_size;
+ c += copy_size;
+
+ if (size == 0) {
+ return NXT_OK;
+ }
+ }
+ } while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK);
+
+ msg->buf = &b->next;
+ }
return NXT_OK;
}
diff --git a/src/nxt_application.h b/src/nxt_application.h
index f27f90cd..9efb5008 100644
--- a/src/nxt_application.h
+++ b/src/nxt_application.h
@@ -84,12 +84,17 @@ typedef struct {
off_t parsed_content_length;
nxt_bool_t done;
+
+ size_t bufs;
+ nxt_buf_t *buf;
} nxt_app_request_header_t;
typedef struct {
- nxt_str_t preread;
+ size_t preread_size;
nxt_bool_t done;
+
+ nxt_buf_t *buf;
} nxt_app_request_body_t;
@@ -112,8 +117,12 @@ struct nxt_app_parse_ctx_s {
nxt_int_t nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx);
-nxt_int_t nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
- nxt_buf_t *buf);
+nxt_int_t nxt_app_http_req_header_parse(nxt_task_t *task,
+ nxt_app_parse_ctx_t *ctx, nxt_buf_t *buf);
+
+nxt_int_t nxt_app_http_req_body_read(nxt_task_t *task,
+ nxt_app_parse_ctx_t *ctx, nxt_buf_t *buf);
+
nxt_int_t nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx);
@@ -178,6 +187,9 @@ nxt_int_t nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg,
nxt_int_t nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg,
nxt_str_t *str);
+size_t nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *buf,
+ size_t size);
+
nxt_int_t nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
nxt_str_t *n, nxt_str_t *v);
diff --git a/src/nxt_go.c b/src/nxt_go.c
index 17240804..b821cced 100644
--- a/src/nxt_go.c
+++ b/src/nxt_go.c
@@ -116,6 +116,7 @@ static nxt_int_t
nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
{
nxt_int_t rc;
+ nxt_buf_t *b;
nxt_http_field_t *field;
nxt_app_request_header_t *h;
@@ -168,7 +169,13 @@ nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
/* end-of-headers mark */
NXT_WRITE(&eof);
- NXT_WRITE(&r->body.preread);
+
+ RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
+
+ for(b = r->body.buf; b != NULL; b = b->next) {
+ RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
+ nxt_buf_mem_used_size(&b->mem)));
+ }
#undef NXT_WRITE
#undef RC
diff --git a/src/nxt_php_sapi.c b/src/nxt_php_sapi.c
index 0f638201..4f8f4696 100644
--- a/src/nxt_php_sapi.c
+++ b/src/nxt_php_sapi.c
@@ -128,6 +128,8 @@ typedef struct {
nxt_str_t script;
nxt_app_wmsg_t *wmsg;
nxt_mp_t *mem_pool;
+
+ size_t body_preread_size;
} nxt_php_run_ctx_t;
nxt_inline nxt_int_t nxt_php_write(nxt_php_run_ctx_t *ctx,
@@ -342,8 +344,6 @@ nxt_php_read_request(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
RC(nxt_app_msg_read_size(task, rmsg, &s));
h->parsed_content_length = s;
- NXT_READ(&ctx->r.body.preread);
-
#undef NXT_READ
#undef RC
@@ -361,6 +361,7 @@ nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg)
{
nxt_int_t rc;
+ nxt_buf_t *b;
nxt_http_field_t *field;
nxt_app_request_header_t *h;
@@ -413,8 +414,6 @@ nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
- NXT_WRITE(&r->body.preread);
-
nxt_list_each(field, h->fields) {
RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
&prefix, &field->name));
@@ -425,6 +424,13 @@ nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
/* end-of-headers mark */
NXT_WRITE(&eof);
+ RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
+
+ for(b = r->body.buf; b != NULL; b = b->next) {
+ RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
+ nxt_buf_mem_used_size(&b->mem)));
+ }
+
#undef NXT_WRITE
#undef RC
@@ -673,23 +679,14 @@ static int
nxt_php_read_post(char *buffer, uint count_bytes TSRMLS_DC)
#endif
{
- off_t rest;
- size_t size;
-/*
- ssize_t n;
- nxt_err_t err;
- nxt_php_ctx_t *ctx;
- nxt_app_request_t *r;
-*/
+ size_t size, rest;
nxt_php_run_ctx_t *ctx;
- nxt_app_request_body_t *b;
nxt_app_request_header_t *h;
ctx = SG(server_context);
h = &ctx->r.header;
- b = &ctx->r.body;
- rest = h->parsed_content_length - SG(read_post_bytes);
+ rest = (size_t) h->parsed_content_length - SG(read_post_bytes);
nxt_debug(ctx->task, "nxt_php_read_post %O", rest);
@@ -697,43 +694,11 @@ nxt_php_read_post(char *buffer, uint count_bytes TSRMLS_DC)
return 0;
}
- size = 0;
-#ifdef NXT_PHP7
- count_bytes = (size_t) nxt_min(rest, (off_t) count_bytes);
-#else
- count_bytes = (uint) nxt_min(rest, (off_t) count_bytes);
-#endif
-
- if (b->preread.length != 0) {
- size = nxt_min(b->preread.length, count_bytes);
-
- nxt_memcpy(buffer, b->preread.start, size);
+ rest = nxt_min(ctx->body_preread_size, (size_t) count_bytes);
+ size = nxt_app_msg_read_raw(ctx->task, ctx->rmsg, buffer, rest);
- b->preread.length -= size;
- b->preread.start += size;
+ ctx->body_preread_size -= size;
- if (size == count_bytes) {
- return size;
- }
- }
-
-#if 0
- nxt_debug(ctx->task, "recv %z", (size_t) count_bytes - size);
-
- n = recv(r->event_conn->socket.fd, buffer + size, count_bytes - size, 0);
-
- if (nxt_slow_path(n <= 0)) {
- err = (n == 0) ? 0 : nxt_socket_errno;
-
- nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E",
- r->event_conn->socket.fd, (size_t) count_bytes - size,
- err);
-
- return size;
- }
-
- return size + n;
-#endif
return size;
}
@@ -868,6 +833,8 @@ nxt_php_register_variables(zval *track_vars_array TSRMLS_DC)
NXT_PHP_SET(n.start, v);
}
+ nxt_app_msg_read_size(task, ctx->rmsg, &ctx->body_preread_size);
+
#undef NXT_PHP_SET
}
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index e714b654..afb2f4a4 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -95,8 +95,6 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
b = obj;
- nxt_debug(task, "mmap buf completion: %p %p", b, b->mem.start);
-
mp = b->data;
#if (NXT_DEBUG)
@@ -125,6 +123,10 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
nxt_port_mmap_free_junk(p, b->mem.end - p);
+ nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), %PI,%d,%d", b,
+ b->mem.start, b->mem.end - b->mem.start, b->is_port_mmap_sent,
+ hdr->pid, hdr->id, c);
+
while (p < b->mem.end) {
nxt_port_mmap_set_chunk_free(hdr, c);
@@ -414,11 +416,6 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
nxt_debug(task, "request %z bytes shm buffer", size);
- if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
- nxt_debug(task, "requested size (%z bytes) too big", size);
- return NULL;
- }
-
b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
if (nxt_slow_path(b == NULL)) {
return NULL;
@@ -445,6 +442,10 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
nchunks++;
}
+ nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI,%d,%d", b,
+ b->mem.start, b->mem.end - b->mem.start,
+ hdr->pid, hdr->id, c);
+
c++;
nchunks--;
@@ -465,9 +466,10 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
nxt_int_t
-nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size)
+nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
+ size_t min_size)
{
- size_t nchunks;
+ size_t nchunks, free_size;
nxt_chunk_id_t c, start;
nxt_port_mmap_header_t *hdr;
@@ -479,7 +481,9 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size)
return NXT_ERROR;
}
- if (nxt_slow_path(size <= (size_t) nxt_buf_mem_free_size(&b->mem))) {
+ free_size = nxt_buf_mem_free_size(&b->mem);
+
+ if (nxt_slow_path(size <= free_size)) {
return NXT_OK;
}
@@ -487,7 +491,7 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size)
start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
- size -= nxt_buf_mem_free_size(&b->mem);
+ size -= free_size;
nchunks = size / PORT_MMAP_CHUNK_SIZE;
if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
@@ -507,7 +511,9 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size)
nchunks--;
}
- if (nchunks != 0) {
+ if (nchunks != 0 &&
+ min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) {
+
c--;
while (c >= start) {
nxt_port_mmap_set_chunk_free(hdr, c);
@@ -559,6 +565,10 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
b->parent = hdr;
+ nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI,%d,%d", b,
+ b->mem.start, b->mem.end - b->mem.start,
+ hdr->pid, hdr->id, mmap_msg->chunk_id);
+
return b;
}
diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h
index 0b64fa89..ea51d001 100644
--- a/src/nxt_port_memory.h
+++ b/src/nxt_port_memory.h
@@ -28,7 +28,7 @@ nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size);
nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b,
- size_t size);
+ size_t size, size_t min_size);
nxt_port_mmap_header_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
diff --git a/src/nxt_python_wsgi.c b/src/nxt_python_wsgi.c
index 1dcf8bf7..f1bd170f 100644
--- a/src/nxt_python_wsgi.c
+++ b/src/nxt_python_wsgi.c
@@ -57,6 +57,7 @@ typedef struct {
//nxt_app_request_t *request;
} nxt_py_error_t;
+typedef struct nxt_python_run_ctx_s nxt_python_run_ctx_t;
static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf);
@@ -68,7 +69,7 @@ static nxt_int_t nxt_python_run(nxt_task_t *task,
static PyObject *nxt_python_create_environ(nxt_task_t *task);
static PyObject *nxt_python_get_environ(nxt_task_t *task,
- nxt_app_rmsg_t *rmsg);
+ nxt_app_rmsg_t *rmsg, nxt_python_run_ctx_t *ctx);
static PyObject *nxt_py_start_resp(PyObject *self, PyObject *args);
@@ -77,11 +78,13 @@ static PyObject *nxt_py_input_read(nxt_py_input_t *self, PyObject *args);
static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args);
static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args);
-typedef struct {
+struct nxt_python_run_ctx_s {
nxt_task_t *task;
nxt_app_rmsg_t *rmsg;
nxt_app_wmsg_t *wmsg;
-} nxt_python_run_ctx_t;
+
+ size_t body_preread_size;
+};
nxt_inline nxt_int_t nxt_python_write(nxt_python_run_ctx_t *ctx,
const u_char *data, size_t len,
@@ -171,8 +174,6 @@ static PyObject *nxt_py_application;
static PyObject *nxt_py_start_resp_obj;
static PyObject *nxt_py_environ_ptyp;
-static nxt_str_t nxt_python_request_body;
-
static nxt_python_run_ctx_t *nxt_python_run_ctx;
@@ -323,6 +324,7 @@ nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg)
{
nxt_int_t rc;
+ nxt_buf_t *b;
nxt_http_field_t *field;
nxt_app_request_header_t *h;
@@ -369,14 +371,20 @@ nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_list_each(field, h->fields) {
RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
- &prefix, &field->name));
+ &prefix, &field->name));
NXT_WRITE(&field->value);
} nxt_list_loop;
/* end-of-headers mark */
NXT_WRITE(&eof);
- NXT_WRITE(&r->body.preread);
+
+ RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
+
+ for(b = r->body.buf; b != NULL; b = b->next) {
+ RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
+ nxt_buf_mem_used_size(&b->mem)));
+ }
#undef NXT_WRITE
#undef RC
@@ -395,9 +403,9 @@ nxt_python_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg)
u_char *buf;
size_t size;
PyObject *result, *iterator, *item, *args, *environ;
- nxt_python_run_ctx_t run_ctx = {task, rmsg, wmsg};
+ nxt_python_run_ctx_t run_ctx = {task, rmsg, wmsg, 0};
- environ = nxt_python_get_environ(task, rmsg);
+ environ = nxt_python_get_environ(task, rmsg, &run_ctx);
if (nxt_slow_path(environ == NULL)) {
return NXT_ERROR;
@@ -465,8 +473,8 @@ nxt_python_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg)
size = PyBytes_GET_SIZE(item);
buf = (u_char *) PyBytes_AS_STRING(item);
- nxt_debug(task, "nxt_app_write(fake): %d %*s", (int)size, (int)size,
- buf);
+ nxt_debug(task, "nxt_app_write(fake): %uz", size);
+
nxt_python_write(&run_ctx, buf, size, 1, 0);
Py_DECREF(item);
@@ -688,7 +696,8 @@ nxt_python_read_add_env(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
static PyObject *
-nxt_python_get_environ(nxt_task_t *task, nxt_app_rmsg_t *rmsg)
+nxt_python_get_environ(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
+ nxt_python_run_ctx_t *ctx)
{
size_t s;
u_char *colon;
@@ -774,22 +783,24 @@ nxt_python_get_environ(nxt_task_t *task, nxt_app_rmsg_t *rmsg)
NXT_READ("CONTENT_TYPE");
NXT_READ("CONTENT_LENGTH");
- while ( (rc = nxt_app_msg_read_nvp(task, rmsg, &n, &v)) == NXT_OK) {
+ while (nxt_app_msg_read_str(task, rmsg, &n) == NXT_OK) {
if (nxt_slow_path(n.length == 0)) {
- rc = NXT_DONE;
+ break;
+ }
+
+ rc = nxt_app_msg_read_str(task, rmsg, &v);
+ if (nxt_slow_path(rc != NXT_OK)) {
break;
}
RC(nxt_python_add_env(task, environ, (char *) n.start, &v));
}
+ RC(nxt_app_msg_read_size(task, rmsg, &ctx->body_preread_size));
+
#undef NXT_READ
#undef RC
- if (rc == NXT_DONE && v.length > 0) {
- nxt_python_request_body = v;
- }
-
return environ;
fail:
@@ -900,11 +911,15 @@ static PyObject *
nxt_py_input_read(nxt_py_input_t *self, PyObject *args)
{
u_char *buf;
+ size_t copy_size;
PyObject *body, *obj;
Py_ssize_t size;
nxt_uint_t n;
+ nxt_python_run_ctx_t *ctx;
+
+ ctx = nxt_python_run_ctx;
- size = nxt_python_request_body.length;
+ size = ctx->body_preread_size;
n = PyTuple_GET_SIZE(args);
@@ -926,8 +941,8 @@ nxt_py_input_read(nxt_py_input_t *self, PyObject *args)
"the read body size cannot be zero or less");
}
- if (size == 0 || size > (Py_ssize_t) nxt_python_request_body.length) {
- size = nxt_python_request_body.length;
+ if (size == 0 || size > (Py_ssize_t) ctx->body_preread_size) {
+ size = ctx->body_preread_size;
}
}
@@ -937,16 +952,12 @@ nxt_py_input_read(nxt_py_input_t *self, PyObject *args)
return NULL;
}
- if (size > 0) {
- buf = (u_char *) PyBytes_AS_STRING(body);
+ buf = (u_char *) PyBytes_AS_STRING(body);
- nxt_memcpy(buf, nxt_python_request_body.start, size);
+ copy_size = nxt_min((size_t) size, ctx->body_preread_size);
+ copy_size = nxt_app_msg_read_raw(ctx->task, ctx->rmsg, buf, copy_size);
- nxt_python_request_body.start += size;
- nxt_python_request_body.length -= size;
-
- /* TODO wait body */
- }
+ ctx->body_preread_size -= copy_size;
return body;
}
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 5beceb38..fa2e957d 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -128,6 +128,8 @@ static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
void *data);
+static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_router_process_http_request(nxt_task_t *task,
nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
static void nxt_router_process_http_request_mp(nxt_task_t *task,
@@ -604,10 +606,34 @@ static nxt_conf_map_t nxt_router_http_conf[] = {
},
{
+ nxt_string("large_header_buffers"),
+ NXT_CONF_MAP_SIZE,
+ offsetof(nxt_socket_conf_t, large_header_buffers),
+ },
+
+ {
+ nxt_string("body_buffer_size"),
+ NXT_CONF_MAP_SIZE,
+ offsetof(nxt_socket_conf_t, body_buffer_size),
+ },
+
+ {
+ nxt_string("max_body_size"),
+ NXT_CONF_MAP_SIZE,
+ offsetof(nxt_socket_conf_t, max_body_size),
+ },
+
+ {
nxt_string("header_read_timeout"),
NXT_CONF_MAP_MSEC,
offsetof(nxt_socket_conf_t, header_read_timeout),
},
+
+ {
+ nxt_string("body_read_timeout"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_socket_conf_t, body_read_timeout),
+ },
};
@@ -792,7 +818,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
// STUB, default values if http block is not defined.
skcf->header_buffer_size = 2048;
skcf->large_header_buffer_size = 8192;
+ skcf->large_header_buffers = 4;
+ skcf->body_buffer_size = 16 * 1024;
+ skcf->max_body_size = 2 * 1024 * 1024;
skcf->header_read_timeout = 5000;
+ skcf->body_read_timeout = 5000;
if (http != NULL) {
ret = nxt_conf_map_object(http, nxt_router_http_conf,
@@ -1807,7 +1837,7 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
}
-static const nxt_conn_state_t nxt_router_conn_read_state
+static const nxt_conn_state_t nxt_router_conn_read_header_state
nxt_aligned(64) =
{
.ready_handler = nxt_router_conn_http_header_parse,
@@ -1820,6 +1850,20 @@ static const nxt_conn_state_t nxt_router_conn_read_state
};
+static const nxt_conn_state_t nxt_router_conn_read_body_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_router_conn_http_body_read,
+ .close_handler = nxt_router_conn_close,
+ .error_handler = nxt_router_conn_error,
+
+ .timer_handler = nxt_router_conn_timeout,
+ .timer_value = nxt_router_conn_timeout_value,
+ .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
+ .timer_autoreset = 1,
+};
+
+
static void
nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
{
@@ -1844,7 +1888,7 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
c->read_work_queue = &engine->fast_work_queue;
c->write_work_queue = &engine->fast_work_queue;
- c->read_state = &nxt_router_conn_read_state;
+ c->read_state = &nxt_router_conn_read_header_state;
nxt_conn_read(engine, c);
}
@@ -1873,7 +1917,6 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
if (nxt_slow_path(rc == NULL)) {
-
nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
return;
@@ -1910,14 +1953,18 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
rc->app_port = NULL;
}
+
+ rc->conn = NULL;
}
if (b == NULL) {
return;
}
- /* Disable instant buffer completion/re-using by port. */
- msg->buf = NULL;
+ if (msg->buf == b) {
+ /* Disable instant buffer completion/re-using by port. */
+ msg->buf = NULL;
+ }
if (c->write == NULL) {
c->write = b;
@@ -1938,6 +1985,9 @@ nxt_router_text_by_code(int code)
case 400: return "Bad request";
case 404: return "Not found";
case 403: return "Forbidden";
+ case 408: return "Request Timeout";
+ case 411: return "Length Required";
+ case 413: return "Request Entity Too Large";
case 500:
default: return "Internal server error";
}
@@ -1965,6 +2015,7 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
msg = (const char *) b->mem.free;
b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
+ b->mem.free[0] = '\0';
nxt_log_alert(task->log, "error %d: %s", code, msg);
@@ -1996,6 +2047,11 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args);
va_end(args);
+ if (c->socket.data != NULL) {
+ nxt_mp_free(c->mem_pool, c->socket.data);
+ c->socket.data = NULL;
+ }
+
if (c->write == NULL) {
c->write = b;
c->write_state = &nxt_router_conn_write_state;
@@ -2345,22 +2401,24 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
static void
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
{
- size_t size, preread;
+ size_t size;
nxt_int_t ret;
- nxt_buf_t *b;
+ nxt_buf_t *buf;
nxt_conn_t *c;
nxt_app_parse_ctx_t *ap;
+ nxt_app_request_body_t *b;
nxt_socket_conf_joint_t *joint;
nxt_app_request_header_t *h;
c = obj;
ap = data;
- b = c->read;
+ buf = c->read;
+ joint = c->listen->socket.data;
nxt_debug(task, "router conn http header parse");
if (ap == NULL) {
- ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
+ ap = nxt_mp_zalloc(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
if (nxt_slow_path(ap == NULL)) {
nxt_router_conn_close(task, c, data);
return;
@@ -2376,78 +2434,157 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
ap->r.remote.start = nxt_sockaddr_address(c->remote);
ap->r.remote.length = c->remote->address_length;
+
+ ap->r.header.buf = buf;
}
h = &ap->r.header;
+ b = &ap->r.body;
- ret = nxt_app_http_req_parse(task, ap, b);
+ ret = nxt_app_http_req_header_parse(task, ap, buf);
- nxt_debug(task, "http parse request: %d", ret);
+ nxt_debug(task, "http parse request header: %d", ret);
switch (nxt_expect(NXT_DONE, ret)) {
case NXT_DONE:
- preread = nxt_buf_mem_used_size(&b->mem);
-
nxt_debug(task, "router request header parsing complete, "
"content length: %O, preread: %uz",
- h->parsed_content_length, preread);
+ h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem));
- nxt_router_process_http_request(task, c, ap);
- return;
+ if (b->done) {
+ nxt_router_process_http_request(task, c, ap);
+
+ return;
+ }
+
+ if (joint->socket_conf->max_body_size > 0 &&
+ (size_t) h->parsed_content_length >
+ joint->socket_conf->max_body_size) {
+
+ nxt_router_gen_error(task, c, 413, "Content-Length too big");
+ return;
+ }
+
+ if (nxt_buf_mem_free_size(&buf->mem) == 0) {
+ size = nxt_min(joint->socket_conf->body_buffer_size,
+ (size_t) h->parsed_content_length);
+
+ buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ if (nxt_slow_path(buf->next == NULL)) {
+ nxt_router_gen_error(task, c, 500, "Failed to allocate "
+ "buffer for request body");
+ return;
+ }
+
+ c->read = buf->next;
+
+ b->preread_size += nxt_buf_mem_used_size(&buf->mem);
+ }
+
+ if (b->buf == NULL) {
+ b->buf = c->read;
+ }
+
+ c->read_state = &nxt_router_conn_read_body_state;
+ break;
case NXT_ERROR:
- nxt_router_conn_close(task, c, data);
+ nxt_router_gen_error(task, c, 400, "Request header parse error");
return;
default: /* NXT_AGAIN */
- if (h->done == 0) {
+ if (c->read->mem.free == c->read->mem.end) {
+ size = joint->socket_conf->large_header_buffer_size;
- if (c->read->mem.free == c->read->mem.end) {
- joint = c->listen->socket.data;
- size = joint->socket_conf->large_header_buffer_size;
+ if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem) ||
+ ap->r.header.bufs >= joint->socket_conf->large_header_buffers) {
+ nxt_router_gen_error(task, c, 413,
+ "Too long request headers");
+ return;
+ }
- if (size > (size_t) nxt_buf_mem_size(&b->mem)) {
- b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
- if (nxt_slow_path(b == NULL)) {
- nxt_router_conn_close(task, c, data);
- return;
- }
+ buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ if (nxt_slow_path(buf->next == NULL)) {
+ nxt_router_gen_error(task, c, 500,
+ "Failed to allocate large header "
+ "buffer");
+ return;
+ }
- size = c->read->mem.free - c->read->mem.pos;
+ ap->r.header.bufs++;
- c->read = nxt_buf_cpy(b, c->read->mem.pos, size);
- } else {
- nxt_router_gen_error(task, c, 400,
- "Too long request headers");
- return;
- }
- }
+ size = c->read->mem.free - c->read->mem.pos;
+
+ c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size);
}
- if (ap->r.body.done == 0) {
+ }
+
+ nxt_conn_read(task->thread->engine, c);
+}
- preread = nxt_buf_mem_used_size(&b->mem);
- if (h->parsed_content_length - preread >
- (size_t) nxt_buf_mem_free_size(&b->mem)) {
+static void
+nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
+{
+ size_t size;
+ nxt_int_t ret;
+ nxt_buf_t *buf;
+ nxt_conn_t *c;
+ nxt_app_parse_ctx_t *ap;
+ nxt_app_request_body_t *b;
+ nxt_socket_conf_joint_t *joint;
+ nxt_app_request_header_t *h;
- b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0);
- if (nxt_slow_path(b == NULL)) {
- nxt_router_gen_error(task, c, 500, "Failed to allocate "
- "buffer for request body");
- return;
- }
+ c = obj;
+ ap = data;
+ buf = c->read;
- c->read = nxt_buf_cpy(b, c->read->mem.pos, preread);
- }
+ nxt_debug(task, "router conn http body read");
- nxt_debug(task, "router request body read again, rest: %uz",
- h->parsed_content_length - preread);
+ nxt_assert(ap != NULL);
+ b = &ap->r.body;
+ h = &ap->r.header;
+
+ ret = nxt_app_http_req_body_read(task, ap, buf);
+
+ nxt_debug(task, "http read request body: %d", ret);
+
+ switch (nxt_expect(NXT_DONE, ret)) {
+
+ case NXT_DONE:
+ nxt_router_process_http_request(task, c, ap);
+ return;
+
+ case NXT_ERROR:
+ nxt_router_gen_error(task, c, 500, "Read body error");
+ return;
+
+ default: /* NXT_AGAIN */
+
+ if (nxt_buf_mem_free_size(&buf->mem) == 0) {
+ joint = c->listen->socket.data;
+
+ b->preread_size += nxt_buf_mem_used_size(&buf->mem);
+
+ size = nxt_min(joint->socket_conf->body_buffer_size,
+ (size_t) h->parsed_content_length - b->preread_size);
+
+ buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ if (nxt_slow_path(buf->next == NULL)) {
+ nxt_router_gen_error(task, c, 500, "Failed to allocate "
+ "buffer for request body");
+ return;
+ }
+
+ c->read = buf->next;
}
+ nxt_debug(task, "router request body read again, rest: %uz",
+ h->parsed_content_length - b->preread_size);
}
nxt_conn_read(task->thread->engine, c);
@@ -2725,9 +2862,12 @@ nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
c = nxt_read_timer_conn(timer);
- c->write_state = &nxt_router_conn_close_state;
+ if (c->read_state == &nxt_router_conn_read_header_state) {
+ nxt_router_gen_error(task, c, 408, "Read header timeout");
- nxt_conn_close(task->thread->engine, c);
+ } else {
+ nxt_router_gen_error(task, c, 408, "Read body timeout");
+ }
}
diff --git a/src/nxt_router.h b/src/nxt_router.h
index 201b786c..2a8a30e1 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -106,7 +106,11 @@ typedef struct {
size_t header_buffer_size;
size_t large_header_buffer_size;
+ size_t large_header_buffers;
+ size_t body_buffer_size;
+ size_t max_body_size;
nxt_msec_t header_read_timeout;
+ nxt_msec_t body_read_timeout;
} nxt_socket_conf_t;