diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:46:17 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:46:17 +0300 |
commit | 89c0f7c5db5003b8fd8df3e1babb0c802004bf4c (patch) | |
tree | 7ac164d6fe41fd76beb3d328e6fde4020f744b63 /src/go/unit/nxt_go_run_ctx.c | |
parent | 45d08d5145c63fd788f85d9e789314dcf093c99e (diff) | |
download | unit-89c0f7c5db5003b8fd8df3e1babb0c802004bf4c.tar.gz unit-89c0f7c5db5003b8fd8df3e1babb0c802004bf4c.tar.bz2 |
Implementing the ability to cancel request before worker starts processing it.
Diffstat (limited to '')
-rw-r--r-- | src/go/unit/nxt_go_run_ctx.c | 80 |
1 files changed, 51 insertions, 29 deletions
diff --git a/src/go/unit/nxt_go_run_ctx.c b/src/go/unit/nxt_go_run_ctx.c index c5e41ee8..6aa5db77 100644 --- a/src/go/unit/nxt_go_run_ctx.c +++ b/src/go/unit/nxt_go_run_ctx.c @@ -85,6 +85,7 @@ static void nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg, size_t payload_size) { + void *data, *end; nxt_port_mmap_msg_t *mmap_msg; memset(msg, 0, sizeof(nxt_go_msg_t)); @@ -92,9 +93,17 @@ nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg, msg->port_msg = port_msg; msg->raw_size = payload_size; + data = port_msg + 1; + end = nxt_pointer_to(data, payload_size); + + if (port_msg->tracking) { + msg->tracking = data; + data = msg->tracking + 1; + } + if (nxt_fast_path(port_msg->mmap != 0)) { - msg->mmap_msg = (nxt_port_mmap_msg_t *) (port_msg + 1); - msg->end = nxt_pointer_to(msg->mmap_msg, payload_size); + msg->mmap_msg = data; + msg->end = end; mmap_msg = msg->mmap_msg; while(mmap_msg < msg->end) { @@ -134,7 +143,7 @@ nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg) e = b + mmap_msg->size; while (b < e) { - nxt_port_mmap_set_chunk_free(port_mmap->hdr, c); + nxt_port_mmap_set_chunk_free(port_mmap->hdr->free_map, c); b += PORT_MMAP_CHUNK_SIZE; c++; @@ -149,6 +158,10 @@ nxt_int_t nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t payload_size) { + nxt_atomic_t *val; + nxt_go_port_mmap_t *port_mmap; + nxt_port_mmap_tracking_msg_t *tracking; + memset(ctx, 0, sizeof(nxt_go_run_ctx_t)); ctx->process = nxt_go_get_process(port_msg->pid); @@ -160,39 +173,45 @@ nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, nxt_go_ctx_init_msg(&ctx->msg, port_msg, payload_size); - ctx->msg_last = &ctx->msg; + if (ctx->msg.tracking != NULL) { + tracking = ctx->msg.tracking; - ctx->wport_msg.stream = port_msg->stream; - ctx->wport_msg.pid = getpid(); - ctx->wport_msg.type = _NXT_PORT_MSG_DATA; - ctx->wport_msg.mmap = 1; + if (nxt_slow_path(tracking->mmap_id >= ctx->process->incoming.nelts)) { + nxt_go_warn("incoming shared memory segment #%d not found " + "for process %d", (int) tracking->mmap_id, + (int) port_msg->pid); - ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 ); + return NXT_ERROR; + } - return nxt_go_ctx_init_rbuf(ctx); -} + nxt_go_mutex_lock(&ctx->process->incoming_mutex); + port_mmap = nxt_go_array_at(&ctx->process->incoming, tracking->mmap_id); -void -nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size) -{ - nxt_go_msg_t *msg; + nxt_go_mutex_unlock(&ctx->process->incoming_mutex); - msg = malloc(sizeof(nxt_go_msg_t)); + val = port_mmap->hdr->tracking + tracking->tracking_id; - nxt_go_ctx_init_msg(msg, port_msg, size - sizeof(nxt_port_msg_t)); + ctx->cancelled = nxt_atomic_cmp_set(val, port_msg->stream, 0) == 0; - msg->start_offset = ctx->msg_last->start_offset; + if (ctx->cancelled) { + nxt_port_mmap_set_chunk_free(port_mmap->hdr->free_tracking_map, + tracking->tracking_id); - if (ctx->msg_last == &ctx->msg) { - msg->start_offset += ctx->request.body.preread_size; - - } else { - msg->start_offset += ctx->msg_last->data_size; + return NXT_OK; + } } - ctx->msg_last->next = msg; - ctx->msg_last = msg; + ctx->msg_last = &ctx->msg; + + ctx->wport_msg.stream = port_msg->stream; + ctx->wport_msg.pid = getpid(); + ctx->wport_msg.type = _NXT_PORT_MSG_DATA; + ctx->wport_msg.mmap = 1; + + ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 ); + + return nxt_go_ctx_init_rbuf(ctx); } @@ -243,7 +262,8 @@ nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) buf = &ctx->wbuf; - hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c); + hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c, + 0); if (nxt_slow_path(hdr == NULL)) { nxt_go_warn("failed to get port_mmap"); @@ -273,7 +293,7 @@ nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { break; } @@ -317,7 +337,7 @@ nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { break; } @@ -330,7 +350,7 @@ nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) { c--; while (c >= start) { - nxt_port_mmap_set_chunk_free(hdr, c); + nxt_port_mmap_set_chunk_free(hdr->free_map, c); c--; } @@ -438,6 +458,7 @@ nxt_go_ctx_read_size_(nxt_go_run_ctx_t *ctx, size_t *size) return NXT_OK; } + nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size) { @@ -452,6 +473,7 @@ nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size) return rc; } + nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str) { |