diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-08-11 18:04:04 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-08-11 18:04:04 +0300 |
commit | 39a6a4c973dd378f1fad9d2514d7857fe491df4b (patch) | |
tree | 220ea8605a9396a93fa7a459ff3e558fe1a044aa /src/nginext | |
parent | e1e808bd94609c80b4990939285d47f124bb2eef (diff) | |
download | unit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.gz unit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.bz2 |
Request body read state implemented.
With specific timeout and buffer size settings.
Diffstat (limited to 'src/nginext')
-rw-r--r-- | src/nginext/nxt_go_lib.c | 42 | ||||
-rw-r--r-- | src/nginext/nxt_go_lib.h | 7 | ||||
-rw-r--r-- | src/nginext/nxt_go_port.c | 7 | ||||
-rw-r--r-- | src/nginext/nxt_go_run_ctx.c | 185 | ||||
-rw-r--r-- | src/nginext/nxt_go_run_ctx.h | 2 | ||||
-rw-r--r-- | src/nginext/request.go | 6 |
6 files changed, 180 insertions, 69 deletions
diff --git a/src/nginext/nxt_go_lib.c b/src/nginext/nxt_go_lib.c index 87f583d7..6d5d2a03 100644 --- a/src/nginext/nxt_go_lib.c +++ b/src/nginext/nxt_go_lib.c @@ -17,14 +17,14 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len) } int -nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, size_t dst_len) +nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len) { return -1; } int -nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst, - size_t dst_len, void *src, size_t src_len) +nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, + void *src, size_t src_len) { return -1; } @@ -71,7 +71,7 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len) return 0; } - nxt_go_debug("write: %d %.*s", (int) len, (int) len, (char *) buf); + nxt_go_debug("write: %d", (int) len); ctx = (nxt_go_run_ctx_t *) r; rc = nxt_go_ctx_write(ctx, buf, len); @@ -81,44 +81,30 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len) int -nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, size_t dst_len) +nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len) { - nxt_go_msg_t *msg; - nxt_go_run_ctx_t *ctx; - nxt_app_request_body_t *b; - nxt_app_request_header_t *h; + size_t res; + nxt_go_run_ctx_t *ctx; if (nxt_slow_path(r == 0)) { return 0; } ctx = (nxt_go_run_ctx_t *) r; - b = &ctx->r.body; - h = &ctx->r.header; - if (off >= h->parsed_content_length) { - return 0; - } + dst_len = nxt_min(dst_len, ctx->r.body.preread_size); - if (off < b->preread.length) { - dst_len = nxt_min(b->preread.length - off, dst_len); + res = nxt_go_ctx_read_raw(ctx, dst, dst_len); - if (dst_len != 0) { - nxt_memcpy(dst, b->preread.start + off, dst_len); - } + ctx->r.body.preread_size -= res; - return dst_len; - } - - /* TODO find msg to read */ - - return NXT_AGAIN; + return res; } int -nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst, - size_t dst_len, void *src, size_t src_len) +nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, + void *src, size_t src_len) { nxt_go_run_ctx_t *ctx; @@ -130,7 +116,7 @@ nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst, nxt_go_ctx_add_msg(ctx, src, src_len); - return nxt_go_request_read(r, off, dst, dst_len); + return nxt_go_request_read(r, dst, dst_len); } diff --git a/src/nginext/nxt_go_lib.h b/src/nginext/nxt_go_lib.h index 220fc9b3..b3a86be9 100644 --- a/src/nginext/nxt_go_lib.h +++ b/src/nginext/nxt_go_lib.h @@ -21,11 +21,10 @@ typedef uintptr_t nxt_go_request_t; int nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len); -int nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, - size_t dst_len); +int nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len); -int nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst, - size_t dst_len, void *src, size_t src_len); +int nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len, + void *src, size_t src_len); int nxt_go_request_close(nxt_go_request_t r); diff --git a/src/nginext/nxt_go_port.c b/src/nginext/nxt_go_port.c index af50b860..fca3cf9a 100644 --- a/src/nginext/nxt_go_port.c +++ b/src/nginext/nxt_go_port.c @@ -83,22 +83,23 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size) do { rc = nxt_go_ctx_read_str(ctx, &n); - rc = nxt_go_ctx_read_str(ctx, &v); if (n.length == 0) { break; } + rc = nxt_go_ctx_read_str(ctx, &v); nxt_go_request_add_header(r, nxt_go_str(&n), nxt_go_str(&v)); } while(1); - ctx->r.body.preread = v; + nxt_go_ctx_read_size(ctx, &s); + ctx->r.body.preread_size = s; if (h->parsed_content_length > 0) { nxt_go_request_set_content_length(r, h->parsed_content_length); } - if (v.length < h->parsed_content_length) { + if (ctx->r.body.preread_size < h->parsed_content_length) { nxt_go_request_create_channel(r); } diff --git a/src/nginext/nxt_go_run_ctx.c b/src/nginext/nxt_go_run_ctx.c index c6481b95..cca8273e 100644 --- a/src/nginext/nxt_go_run_ctx.c +++ b/src/nginext/nxt_go_run_ctx.c @@ -186,7 +186,7 @@ nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size) msg->start_offset = ctx->msg_last->start_offset; if (ctx->msg_last == &ctx->msg) { - msg->start_offset += ctx->r.body.preread.length; + msg->start_offset += ctx->r.body.preread_size; } else { msg->start_offset += ctx->msg_last->data_size; } @@ -227,8 +227,8 @@ nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last) } -nxt_int_t -nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) +nxt_buf_t * +nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) { size_t nchunks; nxt_buf_t *buf; @@ -237,30 +237,16 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) nxt_port_mmap_msg_t *mmap_msg; nxt_port_mmap_header_t *hdr; - buf = &ctx->wbuf; - - if (ctx->nwbuf > 0 && nxt_buf_mem_free_size(&buf->mem) >= len) { - memcpy(buf->mem.free, data, len); - buf->mem.free += len; - - mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1; - mmap_msg->size += len; - - return NXT_OK; - } - - if (ctx->nwbuf >= 8) { - nxt_go_ctx_flush(ctx, 0); - } - c = 0; + buf = &ctx->wbuf; + hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c); if (nxt_slow_path(hdr == NULL)) { nxt_go_warn("failed to get port_mmap"); - return NXT_ERROR; + return NULL; } buf->mem.start = nxt_port_mmap_chunk_start(hdr, c); @@ -268,12 +254,15 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) buf->mem.free = buf->mem.start; buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE; + buf->parent = hdr; + mmap_msg = ctx->wmmap_msg + ctx->nwbuf; mmap_msg->mmap_id = hdr->id; mmap_msg->chunk_id = c; + mmap_msg->size = 0; - nchunks = len / PORT_MMAP_CHUNK_SIZE; - if ((len % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { + nchunks = size / PORT_MMAP_CHUNK_SIZE; + if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { nchunks++; } @@ -283,27 +272,124 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_get_chunk_busy(hdr, c)) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { break; } - nxt_port_mmap_set_chunk_busy(hdr, c); buf->mem.end += PORT_MMAP_CHUNK_SIZE; c++; nchunks--; } - if (nxt_buf_mem_free_size(&buf->mem) < len) { - len = nxt_buf_mem_free_size(&buf->mem); + ctx->nwbuf++; + + return buf; +} + +nxt_int_t +nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) +{ + size_t nchunks, free_size; + nxt_chunk_id_t c, start; + nxt_port_mmap_header_t *hdr; + + free_size = nxt_buf_mem_free_size(&b->mem); + + if (nxt_slow_path(size <= free_size)) { + return NXT_OK; } - memcpy(buf->mem.free, data, len); - buf->mem.free += len; + hdr = b->parent; - mmap_msg->size = len; + start = nxt_port_mmap_chunk_id(hdr, b->mem.end); - ctx->nwbuf++; + size -= free_size; + + nchunks = size / PORT_MMAP_CHUNK_SIZE; + if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { + nchunks++; + } + + c = start; + + /* Try to acquire as much chunks as required. */ + while (nchunks > 0) { + + if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + break; + } + + c++; + nchunks--; + } + + if (nchunks != 0 && + min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) { + + c--; + while (c >= start) { + nxt_port_mmap_set_chunk_free(hdr, c); + c--; + } + + return NXT_ERROR; + } else { + b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); + + return NXT_OK; + } +} + + +nxt_int_t +nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) +{ + size_t free_size, copy_size; + nxt_buf_t *buf; + nxt_port_mmap_msg_t *mmap_msg; + + buf = &ctx->wbuf; + + while (len > 0) { + if (ctx->nwbuf == 0) { + buf = nxt_go_port_mmap_get_buf(ctx, len); + + if (nxt_slow_path(buf == NULL)) { + return NXT_ERROR; + } + } + + do { + free_size = nxt_buf_mem_free_size(&buf->mem); + + if (free_size > 0) { + copy_size = nxt_min(free_size, len); + + buf->mem.free = nxt_cpymem(buf->mem.free, data, copy_size); + + mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1; + mmap_msg->size += copy_size; + + len -= copy_size; + data = nxt_pointer_to(data, copy_size); + + if (len == 0) { + return NXT_OK; + } + } + } while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK); + + if (ctx->nwbuf >= 8) { + nxt_go_ctx_flush(ctx, 0); + } + + buf = nxt_go_port_mmap_get_buf(ctx, len); + + if (nxt_slow_path(buf == NULL)) { + return NXT_ERROR; + } + } return NXT_OK; } @@ -403,4 +489,43 @@ nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str) } +size_t +nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size) +{ + size_t res, read_size; + nxt_int_t rc; + nxt_buf_t *buf; + + res = 0; + + while (size > 0) { + buf = &ctx->rbuf; + + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { + ctx->nrbuf++; + rc = nxt_go_ctx_init_rbuf(ctx); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_go_warn("read raw: init rbuf failed"); + return res; + } + + continue; + } + + read_size = nxt_buf_mem_used_size(&buf->mem); + read_size = nxt_min(read_size, size); + + dst = nxt_cpymem(dst, buf->mem.pos, read_size); + + size -= read_size; + buf->mem.pos += read_size; + res += read_size; + } + + nxt_go_debug("read_raw: %d", (int) res); + + return res; +} + + #endif /* NXT_CONFIGURE */ diff --git a/src/nginext/nxt_go_run_ctx.h b/src/nginext/nxt_go_run_ctx.h index c7c3da15..4ead1df5 100644 --- a/src/nginext/nxt_go_run_ctx.h +++ b/src/nginext/nxt_go_run_ctx.h @@ -68,5 +68,7 @@ nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size); nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str); +size_t nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size); + #endif /* _NXT_GO_RUN_CTX_H_INCLUDED_ */ diff --git a/src/nginext/request.go b/src/nginext/request.go index 8667a074..1679f1c7 100644 --- a/src/nginext/request.go +++ b/src/nginext/request.go @@ -21,7 +21,6 @@ type request struct { resp *response c_req C.nxt_go_request_t id C.uint32_t - read_pos C.off_t msgs []*cmsg ch chan *cmsg } @@ -29,18 +28,17 @@ type request struct { func (r *request) Read(p []byte) (n int, err error) { c := C.size_t(cap(p)) b := C.malloc(c) - res := C.nxt_go_request_read(r.c_req, r.read_pos, b, c) + res := C.nxt_go_request_read(r.c_req, b, c) if res == -2 /* NXT_AGAIN */ { m := <-r.ch - res = C.nxt_go_request_read_from(r.c_req, r.read_pos, b, c, m.buf.b, m.buf.s) + res = C.nxt_go_request_read_from(r.c_req, b, c, m.buf.b, m.buf.s) r.push(m) } if res > 0 { copy(p, C.GoBytes(b, res)) - r.read_pos += C.off_t(res) } C.free(b) |