diff options
Diffstat (limited to 'src/go/unit')
-rw-r--r-- | src/go/unit/nxt_go_lib.c | 19 | ||||
-rw-r--r-- | src/go/unit/nxt_go_lib.h | 3 | ||||
-rw-r--r-- | src/go/unit/nxt_go_port.c | 14 | ||||
-rw-r--r-- | src/go/unit/nxt_go_port_memory.c | 37 | ||||
-rw-r--r-- | src/go/unit/nxt_go_port_memory.h | 2 | ||||
-rw-r--r-- | src/go/unit/nxt_go_run_ctx.c | 80 | ||||
-rw-r--r-- | src/go/unit/nxt_go_run_ctx.h | 8 |
7 files changed, 91 insertions, 72 deletions
diff --git a/src/go/unit/nxt_go_lib.c b/src/go/unit/nxt_go_lib.c index 4c6b9ac2..4455e783 100644 --- a/src/go/unit/nxt_go_lib.c +++ b/src/go/unit/nxt_go_lib.c @@ -54,24 +54,6 @@ nxt_go_request_read(nxt_go_request_t r, uintptr_t dst, size_t dst_len) int -nxt_go_request_read_from(nxt_go_request_t r, uintptr_t dst, size_t dst_len, - uintptr_t src, size_t src_len) -{ - nxt_go_run_ctx_t *ctx; - - if (nxt_slow_path(r == 0)) { - return 0; - } - - ctx = (nxt_go_run_ctx_t *) r; - - nxt_go_ctx_add_msg(ctx, (void *) src, src_len); - - return nxt_go_request_read(r, dst, dst_len); -} - - -int nxt_go_request_close(nxt_go_request_t r) { return 0; @@ -124,6 +106,7 @@ nxt_go_ready(uint32_t stream) port_msg.mmap = 0; port_msg.nf = 0; port_msg.mf = 0; + port_msg.tracking = 0; nxt_go_main_send(&port_msg, sizeof(port_msg), NULL, 0); } diff --git a/src/go/unit/nxt_go_lib.h b/src/go/unit/nxt_go_lib.h index aecdd4af..60ac09f6 100644 --- a/src/go/unit/nxt_go_lib.h +++ b/src/go/unit/nxt_go_lib.h @@ -23,9 +23,6 @@ int nxt_go_response_write(nxt_go_request_t r, uintptr_t buf, size_t len); int nxt_go_request_read(nxt_go_request_t r, uintptr_t dst, size_t dst_len); -int nxt_go_request_read_from(nxt_go_request_t r, uintptr_t dst, size_t dst_len, - uintptr_t src, size_t src_len); - int nxt_go_request_close(nxt_go_request_t r); int nxt_go_request_done(nxt_go_request_t r); diff --git a/src/go/unit/nxt_go_port.c b/src/go/unit/nxt_go_port.c index cbf12ab2..f2f1fc5a 100644 --- a/src/go/unit/nxt_go_port.c +++ b/src/go/unit/nxt_go_port.c @@ -29,10 +29,18 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size) ctx = malloc(sizeof(nxt_go_run_ctx_t) + size); - memcpy(ctx + 1, port_msg, size); - port_msg = (nxt_port_msg_t *) (ctx + 1); + memcpy(ctx->port_msg, port_msg, size); + port_msg = ctx->port_msg; - nxt_go_ctx_init(ctx, port_msg, size - sizeof(nxt_port_msg_t)); + size -= sizeof(nxt_port_msg_t); + + nxt_go_ctx_init(ctx, port_msg, size); + + if (nxt_slow_path(ctx->cancelled)) { + nxt_go_debug("request already cancelled by router"); + free(ctx); + return 0; + } r = (nxt_go_request_t)(ctx); h = &ctx->request.header; diff --git a/src/go/unit/nxt_go_port_memory.c b/src/go/unit/nxt_go_port_memory.c index e1b732d2..a919d6cd 100644 --- a/src/go/unit/nxt_go_port_memory.c +++ b/src/go/unit/nxt_go_port_memory.c @@ -23,12 +23,14 @@ static nxt_port_mmap_header_t * -nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) +nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id, + nxt_bool_t tracking) { int name_len, rc; void *mem; char name[64]; nxt_fd_t fd; + nxt_free_map_t *free_map; nxt_port_msg_t port_msg; nxt_go_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; @@ -101,6 +103,7 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) hdr = port_mmap->hdr; memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); + memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); hdr->id = process->outgoing.nelts - 1; hdr->src_pid = getpid(); @@ -108,10 +111,13 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) hdr->sent_over = id; /* Mark first chunk as busy */ - nxt_port_mmap_set_chunk_busy(hdr, 0); + free_map = tracking ? hdr->free_tracking_map : hdr->free_map; + + nxt_port_mmap_set_chunk_busy(free_map, 0); /* Mark as busy chunk followed the last available chunk. */ - nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT); + nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); + nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); port_msg.stream = 0; port_msg.pid = getpid(); @@ -121,6 +127,7 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) port_msg.mmap = 0; port_msg.nf = 0; port_msg.mf = 0; + port_msg.tracking = 0; cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); cmsg.cm.cmsg_level = SOL_SOCKET; @@ -160,34 +167,34 @@ remove_fail: nxt_port_mmap_header_t * nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id, - nxt_chunk_id_t *c) + nxt_chunk_id_t *c, nxt_bool_t tracking) { + nxt_free_map_t *free_map; nxt_go_port_mmap_t *port_mmap; nxt_go_port_mmap_t *end_port_mmap; nxt_port_mmap_header_t *hdr; - port_mmap = NULL; - hdr = NULL; - nxt_go_mutex_lock(&process->outgoing_mutex); port_mmap = process->outgoing.elts; end_port_mmap = port_mmap + process->outgoing.nelts; - while (port_mmap < end_port_mmap) { + for (; port_mmap < end_port_mmap; port_mmap++) + { + hdr = port_mmap->hdr; - if ( (port_mmap->hdr->sent_over == 0xFFFFu || - port_mmap->hdr->sent_over == port_id) && - nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) { - hdr = port_mmap->hdr; + if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id) { + continue; + } + free_map = tracking ? hdr->free_tracking_map : hdr->free_map; + + if (nxt_port_mmap_get_free_chunk(free_map, c)) { goto unlock_return; } - - port_mmap++; } - hdr = nxt_go_new_port_mmap(process, port_id); + hdr = nxt_go_new_port_mmap(process, port_id, tracking); unlock_return: diff --git a/src/go/unit/nxt_go_port_memory.h b/src/go/unit/nxt_go_port_memory.h index d7e321c0..558eb68f 100644 --- a/src/go/unit/nxt_go_port_memory.h +++ b/src/go/unit/nxt_go_port_memory.h @@ -24,7 +24,7 @@ struct nxt_go_port_mmap_s { struct nxt_port_mmap_header_s * nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id, - nxt_chunk_id_t *c); + nxt_chunk_id_t *c, nxt_bool_t tracking); #endif /* _NXT_GO_PORT_MEMORY_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_run_ctx.c b/src/go/unit/nxt_go_run_ctx.c index c5e41ee8..6aa5db77 100644 --- a/src/go/unit/nxt_go_run_ctx.c +++ b/src/go/unit/nxt_go_run_ctx.c @@ -85,6 +85,7 @@ static void nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg, size_t payload_size) { + void *data, *end; nxt_port_mmap_msg_t *mmap_msg; memset(msg, 0, sizeof(nxt_go_msg_t)); @@ -92,9 +93,17 @@ nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg, msg->port_msg = port_msg; msg->raw_size = payload_size; + data = port_msg + 1; + end = nxt_pointer_to(data, payload_size); + + if (port_msg->tracking) { + msg->tracking = data; + data = msg->tracking + 1; + } + if (nxt_fast_path(port_msg->mmap != 0)) { - msg->mmap_msg = (nxt_port_mmap_msg_t *) (port_msg + 1); - msg->end = nxt_pointer_to(msg->mmap_msg, payload_size); + msg->mmap_msg = data; + msg->end = end; mmap_msg = msg->mmap_msg; while(mmap_msg < msg->end) { @@ -134,7 +143,7 @@ nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg) e = b + mmap_msg->size; while (b < e) { - nxt_port_mmap_set_chunk_free(port_mmap->hdr, c); + nxt_port_mmap_set_chunk_free(port_mmap->hdr->free_map, c); b += PORT_MMAP_CHUNK_SIZE; c++; @@ -149,6 +158,10 @@ nxt_int_t nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t payload_size) { + nxt_atomic_t *val; + nxt_go_port_mmap_t *port_mmap; + nxt_port_mmap_tracking_msg_t *tracking; + memset(ctx, 0, sizeof(nxt_go_run_ctx_t)); ctx->process = nxt_go_get_process(port_msg->pid); @@ -160,39 +173,45 @@ nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, nxt_go_ctx_init_msg(&ctx->msg, port_msg, payload_size); - ctx->msg_last = &ctx->msg; + if (ctx->msg.tracking != NULL) { + tracking = ctx->msg.tracking; - ctx->wport_msg.stream = port_msg->stream; - ctx->wport_msg.pid = getpid(); - ctx->wport_msg.type = _NXT_PORT_MSG_DATA; - ctx->wport_msg.mmap = 1; + if (nxt_slow_path(tracking->mmap_id >= ctx->process->incoming.nelts)) { + nxt_go_warn("incoming shared memory segment #%d not found " + "for process %d", (int) tracking->mmap_id, + (int) port_msg->pid); - ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 ); + return NXT_ERROR; + } - return nxt_go_ctx_init_rbuf(ctx); -} + nxt_go_mutex_lock(&ctx->process->incoming_mutex); + port_mmap = nxt_go_array_at(&ctx->process->incoming, tracking->mmap_id); -void -nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size) -{ - nxt_go_msg_t *msg; + nxt_go_mutex_unlock(&ctx->process->incoming_mutex); - msg = malloc(sizeof(nxt_go_msg_t)); + val = port_mmap->hdr->tracking + tracking->tracking_id; - nxt_go_ctx_init_msg(msg, port_msg, size - sizeof(nxt_port_msg_t)); + ctx->cancelled = nxt_atomic_cmp_set(val, port_msg->stream, 0) == 0; - msg->start_offset = ctx->msg_last->start_offset; + if (ctx->cancelled) { + nxt_port_mmap_set_chunk_free(port_mmap->hdr->free_tracking_map, + tracking->tracking_id); - if (ctx->msg_last == &ctx->msg) { - msg->start_offset += ctx->request.body.preread_size; - - } else { - msg->start_offset += ctx->msg_last->data_size; + return NXT_OK; + } } - ctx->msg_last->next = msg; - ctx->msg_last = msg; + ctx->msg_last = &ctx->msg; + + ctx->wport_msg.stream = port_msg->stream; + ctx->wport_msg.pid = getpid(); + ctx->wport_msg.type = _NXT_PORT_MSG_DATA; + ctx->wport_msg.mmap = 1; + + ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 ); + + return nxt_go_ctx_init_rbuf(ctx); } @@ -243,7 +262,8 @@ nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) buf = &ctx->wbuf; - hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c); + hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c, + 0); if (nxt_slow_path(hdr == NULL)) { nxt_go_warn("failed to get port_mmap"); @@ -273,7 +293,7 @@ nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { break; } @@ -317,7 +337,7 @@ nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { break; } @@ -330,7 +350,7 @@ nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) { c--; while (c >= start) { - nxt_port_mmap_set_chunk_free(hdr, c); + nxt_port_mmap_set_chunk_free(hdr->free_map, c); c--; } @@ -438,6 +458,7 @@ nxt_go_ctx_read_size_(nxt_go_run_ctx_t *ctx, size_t *size) return NXT_OK; } + nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size) { @@ -452,6 +473,7 @@ nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size) return rc; } + nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str) { diff --git a/src/go/unit/nxt_go_run_ctx.h b/src/go/unit/nxt_go_run_ctx.h index f35b48de..5c6775ab 100644 --- a/src/go/unit/nxt_go_run_ctx.h +++ b/src/go/unit/nxt_go_run_ctx.h @@ -29,6 +29,8 @@ struct nxt_go_msg_s { nxt_port_mmap_msg_t *mmap_msg; nxt_port_mmap_msg_t *end; + nxt_port_mmap_tracking_msg_t *tracking; + nxt_go_msg_t *next; }; @@ -38,6 +40,7 @@ typedef struct { nxt_go_process_t *process; nxt_port_mmap_msg_t *wmmap_msg; + nxt_bool_t cancelled; uint32_t nrbuf; nxt_buf_t rbuf; @@ -51,6 +54,8 @@ typedef struct { uintptr_t go_request; nxt_go_msg_t *msg_last; + + nxt_port_msg_t port_msg[]; } nxt_go_run_ctx_t; @@ -59,9 +64,6 @@ void nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg); nxt_int_t nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t payload_size); -void nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, - size_t size); - nxt_int_t nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last); nxt_int_t nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len); |