diff options
Diffstat (limited to 'src/nginext/nxt_go_run_ctx.c')
-rw-r--r-- | src/nginext/nxt_go_run_ctx.c | 185 |
1 files changed, 155 insertions, 30 deletions
diff --git a/src/nginext/nxt_go_run_ctx.c b/src/nginext/nxt_go_run_ctx.c index c6481b95..cca8273e 100644 --- a/src/nginext/nxt_go_run_ctx.c +++ b/src/nginext/nxt_go_run_ctx.c @@ -186,7 +186,7 @@ nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size) msg->start_offset = ctx->msg_last->start_offset; if (ctx->msg_last == &ctx->msg) { - msg->start_offset += ctx->r.body.preread.length; + msg->start_offset += ctx->r.body.preread_size; } else { msg->start_offset += ctx->msg_last->data_size; } @@ -227,8 +227,8 @@ nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last) } -nxt_int_t -nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) +nxt_buf_t * +nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) { size_t nchunks; nxt_buf_t *buf; @@ -237,30 +237,16 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) nxt_port_mmap_msg_t *mmap_msg; nxt_port_mmap_header_t *hdr; - buf = &ctx->wbuf; - - if (ctx->nwbuf > 0 && nxt_buf_mem_free_size(&buf->mem) >= len) { - memcpy(buf->mem.free, data, len); - buf->mem.free += len; - - mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1; - mmap_msg->size += len; - - return NXT_OK; - } - - if (ctx->nwbuf >= 8) { - nxt_go_ctx_flush(ctx, 0); - } - c = 0; + buf = &ctx->wbuf; + hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c); if (nxt_slow_path(hdr == NULL)) { nxt_go_warn("failed to get port_mmap"); - return NXT_ERROR; + return NULL; } buf->mem.start = nxt_port_mmap_chunk_start(hdr, c); @@ -268,12 +254,15 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) buf->mem.free = buf->mem.start; buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE; + buf->parent = hdr; + mmap_msg = ctx->wmmap_msg + ctx->nwbuf; mmap_msg->mmap_id = hdr->id; mmap_msg->chunk_id = c; + mmap_msg->size = 0; - nchunks = len / PORT_MMAP_CHUNK_SIZE; - if ((len % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { + nchunks = size / PORT_MMAP_CHUNK_SIZE; + if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { nchunks++; } @@ -283,27 +272,124 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_get_chunk_busy(hdr, c)) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { break; } - nxt_port_mmap_set_chunk_busy(hdr, c); buf->mem.end += PORT_MMAP_CHUNK_SIZE; c++; nchunks--; } - if (nxt_buf_mem_free_size(&buf->mem) < len) { - len = nxt_buf_mem_free_size(&buf->mem); + ctx->nwbuf++; + + return buf; +} + +nxt_int_t +nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) +{ + size_t nchunks, free_size; + nxt_chunk_id_t c, start; + nxt_port_mmap_header_t *hdr; + + free_size = nxt_buf_mem_free_size(&b->mem); + + if (nxt_slow_path(size <= free_size)) { + return NXT_OK; } - memcpy(buf->mem.free, data, len); - buf->mem.free += len; + hdr = b->parent; - mmap_msg->size = len; + start = nxt_port_mmap_chunk_id(hdr, b->mem.end); - ctx->nwbuf++; + size -= free_size; + + nchunks = size / PORT_MMAP_CHUNK_SIZE; + if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { + nchunks++; + } + + c = start; + + /* Try to acquire as much chunks as required. */ + while (nchunks > 0) { + + if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + break; + } + + c++; + nchunks--; + } + + if (nchunks != 0 && + min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) { + + c--; + while (c >= start) { + nxt_port_mmap_set_chunk_free(hdr, c); + c--; + } + + return NXT_ERROR; + } else { + b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); + + return NXT_OK; + } +} + + +nxt_int_t +nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) +{ + size_t free_size, copy_size; + nxt_buf_t *buf; + nxt_port_mmap_msg_t *mmap_msg; + + buf = &ctx->wbuf; + + while (len > 0) { + if (ctx->nwbuf == 0) { + buf = nxt_go_port_mmap_get_buf(ctx, len); + + if (nxt_slow_path(buf == NULL)) { + return NXT_ERROR; + } + } + + do { + free_size = nxt_buf_mem_free_size(&buf->mem); + + if (free_size > 0) { + copy_size = nxt_min(free_size, len); + + buf->mem.free = nxt_cpymem(buf->mem.free, data, copy_size); + + mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1; + mmap_msg->size += copy_size; + + len -= copy_size; + data = nxt_pointer_to(data, copy_size); + + if (len == 0) { + return NXT_OK; + } + } + } while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK); + + if (ctx->nwbuf >= 8) { + nxt_go_ctx_flush(ctx, 0); + } + + buf = nxt_go_port_mmap_get_buf(ctx, len); + + if (nxt_slow_path(buf == NULL)) { + return NXT_ERROR; + } + } return NXT_OK; } @@ -403,4 +489,43 @@ nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str) } +size_t +nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size) +{ + size_t res, read_size; + nxt_int_t rc; + nxt_buf_t *buf; + + res = 0; + + while (size > 0) { + buf = &ctx->rbuf; + + if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { + ctx->nrbuf++; + rc = nxt_go_ctx_init_rbuf(ctx); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_go_warn("read raw: init rbuf failed"); + return res; + } + + continue; + } + + read_size = nxt_buf_mem_used_size(&buf->mem); + read_size = nxt_min(read_size, size); + + dst = nxt_cpymem(dst, buf->mem.pos, read_size); + + size -= read_size; + buf->mem.pos += read_size; + res += read_size; + } + + nxt_go_debug("read_raw: %d", (int) res); + + return res; +} + + #endif /* NXT_CONFIGURE */ |