summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_application.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-08-11 18:04:04 +0300
committerMax Romanov <max.romanov@nginx.com>2017-08-11 18:04:04 +0300
commit39a6a4c973dd378f1fad9d2514d7857fe491df4b (patch)
tree220ea8605a9396a93fa7a459ff3e558fe1a044aa /src/nxt_application.c
parente1e808bd94609c80b4990939285d47f124bb2eef (diff)
downloadunit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.gz
unit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.bz2
Request body read state implemented.
With specific timeout and buffer size settings.
Diffstat (limited to 'src/nxt_application.c')
-rw-r--r--src/nxt_application.c170
1 files changed, 132 insertions, 38 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c
index 50feac2d..1b23eca3 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -154,7 +154,7 @@ nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
return res;
}
- if (nxt_port_mmap_increase_buf(task, b, size) == NXT_OK) {
+ if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) {
res = b->mem.free;
b->mem.free += size;
@@ -306,6 +306,43 @@ nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
}
+size_t
+nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst,
+ size_t size)
+{
+ size_t res, read_size;
+ nxt_buf_t *buf;
+
+ res = 0;
+
+ while (size > 0) {
+ buf = msg->buf;
+
+ if (nxt_slow_path(buf == NULL)) {
+ break;
+ }
+
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
+ msg->buf = buf->next;
+ continue;
+ }
+
+ read_size = nxt_buf_mem_used_size(&buf->mem);
+ read_size = nxt_min(read_size, size);
+
+ dst = nxt_cpymem(dst, buf->mem.pos, read_size);
+
+ size -= read_size;
+ buf->mem.pos += read_size;
+ res += read_size;
+ }
+
+ nxt_debug(task, "nxt_read_raw: %uz", res);
+
+ return res;
+}
+
+
nxt_int_t
nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n,
nxt_str_t *v)
@@ -459,7 +496,7 @@ nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx)
nxt_int_t
-nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
+nxt_app_http_req_header_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
nxt_buf_t *buf)
{
nxt_int_t rc;
@@ -471,50 +508,76 @@ nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
b = &ctx->r.body;
h = &ctx->r.header;
- if (h->done == 0) {
- rc = nxt_http_parse_request(p, &buf->mem);
+ nxt_assert(h->done == 0);
- if (nxt_slow_path(rc != NXT_DONE)) {
- return rc;
- }
+ rc = nxt_http_parse_request(p, &buf->mem);
- rc = nxt_http_fields_process(p->fields, ctx, task->log);
+ if (nxt_slow_path(rc != NXT_DONE)) {
+ return rc;
+ }
- if (nxt_slow_path(rc != NXT_OK)) {
- return rc;
- }
+ rc = nxt_http_fields_process(p->fields, ctx, task->log);
- h->fields = p->fields;
- h->done = 1;
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return rc;
+ }
- h->version.start = p->version.str;
- h->version.length = nxt_strlen(p->version.str);
+ h->fields = p->fields;
+ h->done = 1;
- h->method = p->method;
+ h->version.start = p->version.str;
+ h->version.length = nxt_strlen(p->version.str);
- h->target.start = p->target_start;
- h->target.length = p->target_end - p->target_start;
+ h->method = p->method;
- h->path = p->path;
- h->query = p->args;
+ h->target.start = p->target_start;
+ h->target.length = p->target_end - p->target_start;
- if (h->parsed_content_length == 0) {
- b->done = 1;
- }
- }
+ h->path = p->path;
+ h->query = p->args;
- if (b->done == 0) {
- b->preread.length = buf->mem.free - buf->mem.pos;
- b->preread.start = buf->mem.pos;
+ if (h->parsed_content_length == 0) {
+ b->done = 1;
- b->done = b->preread.length >= (size_t) h->parsed_content_length;
}
- if (h->done == 1 && b->done == 1) {
+ if (buf->mem.free == buf->mem.pos) {
return NXT_DONE;
}
- return NXT_AGAIN;
+ b->buf = buf;
+ b->done = nxt_buf_mem_used_size(&buf->mem) >=
+ h->parsed_content_length;
+
+ if (b->done == 1) {
+ b->preread_size = nxt_buf_mem_used_size(&buf->mem);
+ }
+
+ return NXT_DONE;
+}
+
+
+nxt_int_t
+nxt_app_http_req_body_read(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
+ nxt_buf_t *buf)
+{
+ nxt_app_request_body_t *b;
+ nxt_app_request_header_t *h;
+
+ b = &ctx->r.body;
+ h = &ctx->r.header;
+
+ nxt_assert(h->done == 1);
+ nxt_assert(b->done == 0);
+
+ b->done = nxt_buf_mem_used_size(&buf->mem) + b->preread_size >=
+ (size_t) h->parsed_content_length;
+
+ if (b->done == 1) {
+ b->preread_size += nxt_buf_mem_used_size(&buf->mem);
+ }
+
+ return b->done == 1 ? NXT_DONE : NXT_AGAIN;
}
@@ -571,17 +634,48 @@ nxt_int_t
nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
size_t size)
{
- u_char *dst;
+ size_t free_size, copy_size;
+ nxt_buf_t *b;
+ nxt_port_t *port;
- dst = nxt_app_msg_write_get_buf(task, msg, size);
- if (nxt_slow_path(dst == NULL)) {
- return NXT_ERROR;
- }
+ nxt_debug(task, "nxt_app_msg_write_raw: %uz", size);
+
+ while (size > 0) {
+ b = *msg->buf;
+
+ if (b == NULL) {
+ port = nxt_app_msg_get_port(task, msg);
+ if (nxt_slow_path(port == NULL)) {
+ return NXT_ERROR;
+ }
+
+ b = nxt_port_mmap_get_buf(task, port, size);
+ if (nxt_slow_path(b == NULL)) {
+ return NXT_ERROR;
+ }
+
+ *msg->buf = b;
+ }
+
+ do {
+ free_size = nxt_buf_mem_free_size(&b->mem);
+
+ if (free_size > 0) {
+ copy_size = nxt_min(free_size, size);
- nxt_memcpy(dst, c, size);
+ b->mem.free = nxt_cpymem(b->mem.free, c, copy_size);
- nxt_debug(task, "nxt_app_msg_write_raw: %d %*s", (int)size,
- (int)size, c);
+ size -= copy_size;
+ c += copy_size;
+
+ if (size == 0) {
+ return NXT_OK;
+ }
+ }
+ } while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK);
+
+ msg->buf = &b->next;
+ }
return NXT_OK;
}