summaryrefslogtreecommitdiffhomepage
path: root/src/nginext/nxt_go_run_ctx.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-08-11 18:04:04 +0300
committerMax Romanov <max.romanov@nginx.com>2017-08-11 18:04:04 +0300
commit39a6a4c973dd378f1fad9d2514d7857fe491df4b (patch)
tree220ea8605a9396a93fa7a459ff3e558fe1a044aa /src/nginext/nxt_go_run_ctx.c
parente1e808bd94609c80b4990939285d47f124bb2eef (diff)
downloadunit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.gz
unit-39a6a4c973dd378f1fad9d2514d7857fe491df4b.tar.bz2
Request body read state implemented.
With specific timeout and buffer size settings.
Diffstat (limited to 'src/nginext/nxt_go_run_ctx.c')
-rw-r--r--src/nginext/nxt_go_run_ctx.c185
1 files changed, 155 insertions, 30 deletions
diff --git a/src/nginext/nxt_go_run_ctx.c b/src/nginext/nxt_go_run_ctx.c
index c6481b95..cca8273e 100644
--- a/src/nginext/nxt_go_run_ctx.c
+++ b/src/nginext/nxt_go_run_ctx.c
@@ -186,7 +186,7 @@ nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size)
msg->start_offset = ctx->msg_last->start_offset;
if (ctx->msg_last == &ctx->msg) {
- msg->start_offset += ctx->r.body.preread.length;
+ msg->start_offset += ctx->r.body.preread_size;
} else {
msg->start_offset += ctx->msg_last->data_size;
}
@@ -227,8 +227,8 @@ nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last)
}
-nxt_int_t
-nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
+nxt_buf_t *
+nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size)
{
size_t nchunks;
nxt_buf_t *buf;
@@ -237,30 +237,16 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
nxt_port_mmap_msg_t *mmap_msg;
nxt_port_mmap_header_t *hdr;
- buf = &ctx->wbuf;
-
- if (ctx->nwbuf > 0 && nxt_buf_mem_free_size(&buf->mem) >= len) {
- memcpy(buf->mem.free, data, len);
- buf->mem.free += len;
-
- mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1;
- mmap_msg->size += len;
-
- return NXT_OK;
- }
-
- if (ctx->nwbuf >= 8) {
- nxt_go_ctx_flush(ctx, 0);
- }
-
c = 0;
+ buf = &ctx->wbuf;
+
hdr = nxt_go_port_mmap_get(ctx->process,
ctx->msg.port_msg->reply_port, &c);
if (nxt_slow_path(hdr == NULL)) {
nxt_go_warn("failed to get port_mmap");
- return NXT_ERROR;
+ return NULL;
}
buf->mem.start = nxt_port_mmap_chunk_start(hdr, c);
@@ -268,12 +254,15 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
buf->mem.free = buf->mem.start;
buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE;
+ buf->parent = hdr;
+
mmap_msg = ctx->wmmap_msg + ctx->nwbuf;
mmap_msg->mmap_id = hdr->id;
mmap_msg->chunk_id = c;
+ mmap_msg->size = 0;
- nchunks = len / PORT_MMAP_CHUNK_SIZE;
- if ((len % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
+ nchunks = size / PORT_MMAP_CHUNK_SIZE;
+ if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
nchunks++;
}
@@ -283,27 +272,124 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
/* Try to acquire as much chunks as required. */
while (nchunks > 0) {
- if (nxt_port_mmap_get_chunk_busy(hdr, c)) {
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
break;
}
- nxt_port_mmap_set_chunk_busy(hdr, c);
buf->mem.end += PORT_MMAP_CHUNK_SIZE;
c++;
nchunks--;
}
- if (nxt_buf_mem_free_size(&buf->mem) < len) {
- len = nxt_buf_mem_free_size(&buf->mem);
+ ctx->nwbuf++;
+
+ return buf;
+}
+
+nxt_int_t
+nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size)
+{
+ size_t nchunks, free_size;
+ nxt_chunk_id_t c, start;
+ nxt_port_mmap_header_t *hdr;
+
+ free_size = nxt_buf_mem_free_size(&b->mem);
+
+ if (nxt_slow_path(size <= free_size)) {
+ return NXT_OK;
}
- memcpy(buf->mem.free, data, len);
- buf->mem.free += len;
+ hdr = b->parent;
- mmap_msg->size = len;
+ start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
- ctx->nwbuf++;
+ size -= free_size;
+
+ nchunks = size / PORT_MMAP_CHUNK_SIZE;
+ if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
+ nchunks++;
+ }
+
+ c = start;
+
+ /* Try to acquire as much chunks as required. */
+ while (nchunks > 0) {
+
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
+ break;
+ }
+
+ c++;
+ nchunks--;
+ }
+
+ if (nchunks != 0 &&
+ min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) {
+
+ c--;
+ while (c >= start) {
+ nxt_port_mmap_set_chunk_free(hdr, c);
+ c--;
+ }
+
+ return NXT_ERROR;
+ } else {
+ b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
+
+ return NXT_OK;
+ }
+}
+
+
+nxt_int_t
+nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
+{
+ size_t free_size, copy_size;
+ nxt_buf_t *buf;
+ nxt_port_mmap_msg_t *mmap_msg;
+
+ buf = &ctx->wbuf;
+
+ while (len > 0) {
+ if (ctx->nwbuf == 0) {
+ buf = nxt_go_port_mmap_get_buf(ctx, len);
+
+ if (nxt_slow_path(buf == NULL)) {
+ return NXT_ERROR;
+ }
+ }
+
+ do {
+ free_size = nxt_buf_mem_free_size(&buf->mem);
+
+ if (free_size > 0) {
+ copy_size = nxt_min(free_size, len);
+
+ buf->mem.free = nxt_cpymem(buf->mem.free, data, copy_size);
+
+ mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1;
+ mmap_msg->size += copy_size;
+
+ len -= copy_size;
+ data = nxt_pointer_to(data, copy_size);
+
+ if (len == 0) {
+ return NXT_OK;
+ }
+ }
+ } while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK);
+
+ if (ctx->nwbuf >= 8) {
+ nxt_go_ctx_flush(ctx, 0);
+ }
+
+ buf = nxt_go_port_mmap_get_buf(ctx, len);
+
+ if (nxt_slow_path(buf == NULL)) {
+ return NXT_ERROR;
+ }
+ }
return NXT_OK;
}
@@ -403,4 +489,43 @@ nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str)
}
+size_t
+nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size)
+{
+ size_t res, read_size;
+ nxt_int_t rc;
+ nxt_buf_t *buf;
+
+ res = 0;
+
+ while (size > 0) {
+ buf = &ctx->rbuf;
+
+ if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
+ ctx->nrbuf++;
+ rc = nxt_go_ctx_init_rbuf(ctx);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_go_warn("read raw: init rbuf failed");
+ return res;
+ }
+
+ continue;
+ }
+
+ read_size = nxt_buf_mem_used_size(&buf->mem);
+ read_size = nxt_min(read_size, size);
+
+ dst = nxt_cpymem(dst, buf->mem.pos, read_size);
+
+ size -= read_size;
+ buf->mem.pos += read_size;
+ res += read_size;
+ }
+
+ nxt_go_debug("read_raw: %d", (int) res);
+
+ return res;
+}
+
+
#endif /* NXT_CONFIGURE */