summaryrefslogtreecommitdiffhomepage
path: root/src/go/unit/nxt_go_run_ctx.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/go/unit/nxt_go_run_ctx.c')
-rw-r--r--src/go/unit/nxt_go_run_ctx.c80
1 files changed, 51 insertions, 29 deletions
diff --git a/src/go/unit/nxt_go_run_ctx.c b/src/go/unit/nxt_go_run_ctx.c
index c5e41ee8..6aa5db77 100644
--- a/src/go/unit/nxt_go_run_ctx.c
+++ b/src/go/unit/nxt_go_run_ctx.c
@@ -85,6 +85,7 @@ static void
nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg,
size_t payload_size)
{
+ void *data, *end;
nxt_port_mmap_msg_t *mmap_msg;
memset(msg, 0, sizeof(nxt_go_msg_t));
@@ -92,9 +93,17 @@ nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg,
msg->port_msg = port_msg;
msg->raw_size = payload_size;
+ data = port_msg + 1;
+ end = nxt_pointer_to(data, payload_size);
+
+ if (port_msg->tracking) {
+ msg->tracking = data;
+ data = msg->tracking + 1;
+ }
+
if (nxt_fast_path(port_msg->mmap != 0)) {
- msg->mmap_msg = (nxt_port_mmap_msg_t *) (port_msg + 1);
- msg->end = nxt_pointer_to(msg->mmap_msg, payload_size);
+ msg->mmap_msg = data;
+ msg->end = end;
mmap_msg = msg->mmap_msg;
while(mmap_msg < msg->end) {
@@ -134,7 +143,7 @@ nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg)
e = b + mmap_msg->size;
while (b < e) {
- nxt_port_mmap_set_chunk_free(port_mmap->hdr, c);
+ nxt_port_mmap_set_chunk_free(port_mmap->hdr->free_map, c);
b += PORT_MMAP_CHUNK_SIZE;
c++;
@@ -149,6 +158,10 @@ nxt_int_t
nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg,
size_t payload_size)
{
+ nxt_atomic_t *val;
+ nxt_go_port_mmap_t *port_mmap;
+ nxt_port_mmap_tracking_msg_t *tracking;
+
memset(ctx, 0, sizeof(nxt_go_run_ctx_t));
ctx->process = nxt_go_get_process(port_msg->pid);
@@ -160,39 +173,45 @@ nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg,
nxt_go_ctx_init_msg(&ctx->msg, port_msg, payload_size);
- ctx->msg_last = &ctx->msg;
+ if (ctx->msg.tracking != NULL) {
+ tracking = ctx->msg.tracking;
- ctx->wport_msg.stream = port_msg->stream;
- ctx->wport_msg.pid = getpid();
- ctx->wport_msg.type = _NXT_PORT_MSG_DATA;
- ctx->wport_msg.mmap = 1;
+ if (nxt_slow_path(tracking->mmap_id >= ctx->process->incoming.nelts)) {
+ nxt_go_warn("incoming shared memory segment #%d not found "
+ "for process %d", (int) tracking->mmap_id,
+ (int) port_msg->pid);
- ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 );
+ return NXT_ERROR;
+ }
- return nxt_go_ctx_init_rbuf(ctx);
-}
+ nxt_go_mutex_lock(&ctx->process->incoming_mutex);
+ port_mmap = nxt_go_array_at(&ctx->process->incoming, tracking->mmap_id);
-void
-nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size)
-{
- nxt_go_msg_t *msg;
+ nxt_go_mutex_unlock(&ctx->process->incoming_mutex);
- msg = malloc(sizeof(nxt_go_msg_t));
+ val = port_mmap->hdr->tracking + tracking->tracking_id;
- nxt_go_ctx_init_msg(msg, port_msg, size - sizeof(nxt_port_msg_t));
+ ctx->cancelled = nxt_atomic_cmp_set(val, port_msg->stream, 0) == 0;
- msg->start_offset = ctx->msg_last->start_offset;
+ if (ctx->cancelled) {
+ nxt_port_mmap_set_chunk_free(port_mmap->hdr->free_tracking_map,
+ tracking->tracking_id);
- if (ctx->msg_last == &ctx->msg) {
- msg->start_offset += ctx->request.body.preread_size;
-
- } else {
- msg->start_offset += ctx->msg_last->data_size;
+ return NXT_OK;
+ }
}
- ctx->msg_last->next = msg;
- ctx->msg_last = msg;
+ ctx->msg_last = &ctx->msg;
+
+ ctx->wport_msg.stream = port_msg->stream;
+ ctx->wport_msg.pid = getpid();
+ ctx->wport_msg.type = _NXT_PORT_MSG_DATA;
+ ctx->wport_msg.mmap = 1;
+
+ ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 );
+
+ return nxt_go_ctx_init_rbuf(ctx);
}
@@ -243,7 +262,8 @@ nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size)
buf = &ctx->wbuf;
- hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c);
+ hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c,
+ 0);
if (nxt_slow_path(hdr == NULL)) {
nxt_go_warn("failed to get port_mmap");
@@ -273,7 +293,7 @@ nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size)
/* Try to acquire as much chunks as required. */
while (nchunks > 0) {
- if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
break;
}
@@ -317,7 +337,7 @@ nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size)
/* Try to acquire as much chunks as required. */
while (nchunks > 0) {
- if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
break;
}
@@ -330,7 +350,7 @@ nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size)
{
c--;
while (c >= start) {
- nxt_port_mmap_set_chunk_free(hdr, c);
+ nxt_port_mmap_set_chunk_free(hdr->free_map, c);
c--;
}
@@ -438,6 +458,7 @@ nxt_go_ctx_read_size_(nxt_go_run_ctx_t *ctx, size_t *size)
return NXT_OK;
}
+
nxt_int_t
nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size)
{
@@ -452,6 +473,7 @@ nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size)
return rc;
}
+
nxt_int_t
nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str)
{