summaryrefslogtreecommitdiffhomepage
path: root/src/nginext
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-08-11 18:04:04 +0300
committerMax Romanov <max.romanov@nginx.com>2017-08-11 18:04:04 +0300
commit39a6a4c973dd378f1fad9d2514d7857fe491df4b (patch)
tree220ea8605a9396a93fa7a459ff3e558fe1a044aa /src/nginext
parente1e808bd94609c80b4990939285d47f124bb2eef (diff)
downloadunit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.gz
unit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.bz2
Request body read state implemented.
With specific timeout and buffer size settings.
Diffstat (limited to '')
-rw-r--r--src/nginext/nxt_go_lib.c42
-rw-r--r--src/nginext/nxt_go_lib.h7
-rw-r--r--src/nginext/nxt_go_port.c7
-rw-r--r--src/nginext/nxt_go_run_ctx.c185
-rw-r--r--src/nginext/nxt_go_run_ctx.h2
-rw-r--r--src/nginext/request.go6
6 files changed, 180 insertions, 69 deletions
diff --git a/src/nginext/nxt_go_lib.c b/src/nginext/nxt_go_lib.c
index 87f583d7..6d5d2a03 100644
--- a/src/nginext/nxt_go_lib.c
+++ b/src/nginext/nxt_go_lib.c
@@ -17,14 +17,14 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
}
int
-nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, size_t dst_len)
+nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len)
{
return -1;
}
int
-nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst,
- size_t dst_len, void *src, size_t src_len)
+nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
+ void *src, size_t src_len)
{
return -1;
}
@@ -71,7 +71,7 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
return 0;
}
- nxt_go_debug("write: %d %.*s", (int) len, (int) len, (char *) buf);
+ nxt_go_debug("write: %d", (int) len);
ctx = (nxt_go_run_ctx_t *) r;
rc = nxt_go_ctx_write(ctx, buf, len);
@@ -81,44 +81,30 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
int
-nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, size_t dst_len)
+nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len)
{
- nxt_go_msg_t *msg;
- nxt_go_run_ctx_t *ctx;
- nxt_app_request_body_t *b;
- nxt_app_request_header_t *h;
+ size_t res;
+ nxt_go_run_ctx_t *ctx;
if (nxt_slow_path(r == 0)) {
return 0;
}
ctx = (nxt_go_run_ctx_t *) r;
- b = &ctx->r.body;
- h = &ctx->r.header;
- if (off >= h->parsed_content_length) {
- return 0;
- }
+ dst_len = nxt_min(dst_len, ctx->r.body.preread_size);
- if (off < b->preread.length) {
- dst_len = nxt_min(b->preread.length - off, dst_len);
+ res = nxt_go_ctx_read_raw(ctx, dst, dst_len);
- if (dst_len != 0) {
- nxt_memcpy(dst, b->preread.start + off, dst_len);
- }
+ ctx->r.body.preread_size -= res;
- return dst_len;
- }
-
- /* TODO find msg to read */
-
- return NXT_AGAIN;
+ return res;
}
int
-nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst,
- size_t dst_len, void *src, size_t src_len)
+nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
+ void *src, size_t src_len)
{
nxt_go_run_ctx_t *ctx;
@@ -130,7 +116,7 @@ nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst,
nxt_go_ctx_add_msg(ctx, src, src_len);
- return nxt_go_request_read(r, off, dst, dst_len);
+ return nxt_go_request_read(r, dst, dst_len);
}
diff --git a/src/nginext/nxt_go_lib.h b/src/nginext/nxt_go_lib.h
index 220fc9b3..b3a86be9 100644
--- a/src/nginext/nxt_go_lib.h
+++ b/src/nginext/nxt_go_lib.h
@@ -21,11 +21,10 @@ typedef uintptr_t nxt_go_request_t;
int nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len);
-int nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst,
- size_t dst_len);
+int nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len);
-int nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst,
- size_t dst_len, void *src, size_t src_len);
+int nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
+ void *src, size_t src_len);
int nxt_go_request_close(nxt_go_request_t r);
diff --git a/src/nginext/nxt_go_port.c b/src/nginext/nxt_go_port.c
index af50b860..fca3cf9a 100644
--- a/src/nginext/nxt_go_port.c
+++ b/src/nginext/nxt_go_port.c
@@ -83,22 +83,23 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size)
do {
rc = nxt_go_ctx_read_str(ctx, &n);
- rc = nxt_go_ctx_read_str(ctx, &v);
if (n.length == 0) {
break;
}
+ rc = nxt_go_ctx_read_str(ctx, &v);
nxt_go_request_add_header(r, nxt_go_str(&n), nxt_go_str(&v));
} while(1);
- ctx->r.body.preread = v;
+ nxt_go_ctx_read_size(ctx, &s);
+ ctx->r.body.preread_size = s;
if (h->parsed_content_length > 0) {
nxt_go_request_set_content_length(r, h->parsed_content_length);
}
- if (v.length < h->parsed_content_length) {
+ if (ctx->r.body.preread_size < h->parsed_content_length) {
nxt_go_request_create_channel(r);
}
diff --git a/src/nginext/nxt_go_run_ctx.c b/src/nginext/nxt_go_run_ctx.c
index c6481b95..cca8273e 100644
--- a/src/nginext/nxt_go_run_ctx.c
+++ b/src/nginext/nxt_go_run_ctx.c
@@ -186,7 +186,7 @@ nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size)
msg->start_offset = ctx->msg_last->start_offset;
if (ctx->msg_last == &ctx->msg) {
- msg->start_offset += ctx->r.body.preread.length;
+ msg->start_offset += ctx->r.body.preread_size;
} else {
msg->start_offset += ctx->msg_last->data_size;
}
@@ -227,8 +227,8 @@ nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last)
}
-nxt_int_t
-nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
+nxt_buf_t *
+nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size)
{
size_t nchunks;
nxt_buf_t *buf;
@@ -237,30 +237,16 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
nxt_port_mmap_msg_t *mmap_msg;
nxt_port_mmap_header_t *hdr;
- buf = &ctx->wbuf;
-
- if (ctx->nwbuf > 0 && nxt_buf_mem_free_size(&buf->mem) >= len) {
- memcpy(buf->mem.free, data, len);
- buf->mem.free += len;
-
- mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1;
- mmap_msg->size += len;
-
- return NXT_OK;
- }
-
- if (ctx->nwbuf >= 8) {
- nxt_go_ctx_flush(ctx, 0);
- }
-
c = 0;
+ buf = &ctx->wbuf;
+
hdr = nxt_go_port_mmap_get(ctx->process,
ctx->msg.port_msg->reply_port, &c);
if (nxt_slow_path(hdr == NULL)) {
nxt_go_warn("failed to get port_mmap");
- return NXT_ERROR;
+ return NULL;
}
buf->mem.start = nxt_port_mmap_chunk_start(hdr, c);
@@ -268,12 +254,15 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
buf->mem.free = buf->mem.start;
buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE;
+ buf->parent = hdr;
+
mmap_msg = ctx->wmmap_msg + ctx->nwbuf;
mmap_msg->mmap_id = hdr->id;
mmap_msg->chunk_id = c;
+ mmap_msg->size = 0;
- nchunks = len / PORT_MMAP_CHUNK_SIZE;
- if ((len % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
+ nchunks = size / PORT_MMAP_CHUNK_SIZE;
+ if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
nchunks++;
}
@@ -283,27 +272,124 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
/* Try to acquire as much chunks as required. */
while (nchunks > 0) {
- if (nxt_port_mmap_get_chunk_busy(hdr, c)) {
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
break;
}
- nxt_port_mmap_set_chunk_busy(hdr, c);
buf->mem.end += PORT_MMAP_CHUNK_SIZE;
c++;
nchunks--;
}
- if (nxt_buf_mem_free_size(&buf->mem) < len) {
- len = nxt_buf_mem_free_size(&buf->mem);
+ ctx->nwbuf++;
+
+ return buf;
+}
+
+nxt_int_t
+nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size)
+{
+ size_t nchunks, free_size;
+ nxt_chunk_id_t c, start;
+ nxt_port_mmap_header_t *hdr;
+
+ free_size = nxt_buf_mem_free_size(&b->mem);
+
+ if (nxt_slow_path(size <= free_size)) {
+ return NXT_OK;
}
- memcpy(buf->mem.free, data, len);
- buf->mem.free += len;
+ hdr = b->parent;
- mmap_msg->size = len;
+ start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
- ctx->nwbuf++;
+ size -= free_size;
+
+ nchunks = size / PORT_MMAP_CHUNK_SIZE;
+ if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
+ nchunks++;
+ }
+
+ c = start;
+
+ /* Try to acquire as much chunks as required. */
+ while (nchunks > 0) {
+
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
+ break;
+ }
+
+ c++;
+ nchunks--;
+ }
+
+ if (nchunks != 0 &&
+ min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) {
+
+ c--;
+ while (c >= start) {
+ nxt_port_mmap_set_chunk_free(hdr, c);
+ c--;
+ }
+
+ return NXT_ERROR;
+ } else {
+ b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
+
+ return NXT_OK;
+ }
+}
+
+
+nxt_int_t
+nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
+{
+ size_t free_size, copy_size;
+ nxt_buf_t *buf;
+ nxt_port_mmap_msg_t *mmap_msg;
+
+ buf = &ctx->wbuf;
+
+ while (len > 0) {
+ if (ctx->nwbuf == 0) {
+ buf = nxt_go_port_mmap_get_buf(ctx, len);
+
+ if (nxt_slow_path(buf == NULL)) {
+ return NXT_ERROR;
+ }
+ }
+
+ do {
+ free_size = nxt_buf_mem_free_size(&buf->mem);
+
+ if (free_size > 0) {
+ copy_size = nxt_min(free_size, len);
+
+ buf->mem.free = nxt_cpymem(buf->mem.free, data, copy_size);
+
+ mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1;
+ mmap_msg->size += copy_size;
+
+ len -= copy_size;
+ data = nxt_pointer_to(data, copy_size);
+
+ if (len == 0) {
+ return NXT_OK;
+ }
+ }
+ } while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK);
+
+ if (ctx->nwbuf >= 8) {
+ nxt_go_ctx_flush(ctx, 0);
+ }
+
+ buf = nxt_go_port_mmap_get_buf(ctx, len);
+
+ if (nxt_slow_path(buf == NULL)) {
+ return NXT_ERROR;
+ }
+ }
return NXT_OK;
}
@@ -403,4 +489,43 @@ nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str)
}
+size_t
+nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size)
+{
+ size_t res, read_size;
+ nxt_int_t rc;
+ nxt_buf_t *buf;
+
+ res = 0;
+
+ while (size > 0) {
+ buf = &ctx->rbuf;
+
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
+ ctx->nrbuf++;
+ rc = nxt_go_ctx_init_rbuf(ctx);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_go_warn("read raw: init rbuf failed");
+ return res;
+ }
+
+ continue;
+ }
+
+ read_size = nxt_buf_mem_used_size(&buf->mem);
+ read_size = nxt_min(read_size, size);
+
+ dst = nxt_cpymem(dst, buf->mem.pos, read_size);
+
+ size -= read_size;
+ buf->mem.pos += read_size;
+ res += read_size;
+ }
+
+ nxt_go_debug("read_raw: %d", (int) res);
+
+ return res;
+}
+
+
#endif /* NXT_CONFIGURE */
diff --git a/src/nginext/nxt_go_run_ctx.h b/src/nginext/nxt_go_run_ctx.h
index c7c3da15..4ead1df5 100644
--- a/src/nginext/nxt_go_run_ctx.h
+++ b/src/nginext/nxt_go_run_ctx.h
@@ -68,5 +68,7 @@ nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size);
nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str);
+size_t nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size);
+
#endif /* _NXT_GO_RUN_CTX_H_INCLUDED_ */
diff --git a/src/nginext/request.go b/src/nginext/request.go
index 8667a074..1679f1c7 100644
--- a/src/nginext/request.go
+++ b/src/nginext/request.go
@@ -21,7 +21,6 @@ type request struct {
resp *response
c_req C.nxt_go_request_t
id C.uint32_t
- read_pos C.off_t
msgs []*cmsg
ch chan *cmsg
}
@@ -29,18 +28,17 @@ type request struct {
func (r *request) Read(p []byte) (n int, err error) {
c := C.size_t(cap(p))
b := C.malloc(c)
- res := C.nxt_go_request_read(r.c_req, r.read_pos, b, c)
+ res := C.nxt_go_request_read(r.c_req, b, c)
if res == -2 /* NXT_AGAIN */ {
m := <-r.ch
- res = C.nxt_go_request_read_from(r.c_req, r.read_pos, b, c, m.buf.b, m.buf.s)
+ res = C.nxt_go_request_read_from(r.c_req, b, c, m.buf.b, m.buf.s)
r.push(m)
}
if res > 0 {
copy(p, C.GoBytes(b, res))
- r.read_pos += C.off_t(res)
}
C.free(b)