diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:46:17 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:46:17 +0300 |
commit | 89c0f7c5db5003b8fd8df3e1babb0c802004bf4c (patch) | |
tree | 7ac164d6fe41fd76beb3d328e6fde4020f744b63 | |
parent | 45d08d5145c63fd788f85d9e789314dcf093c99e (diff) | |
download | unit-89c0f7c5db5003b8fd8df3e1babb0c802004bf4c.tar.gz unit-89c0f7c5db5003b8fd8df3e1babb0c802004bf4c.tar.bz2 |
Implementing the ability to cancel request before worker starts processing it.
-rw-r--r-- | src/go/unit/nxt_go_lib.c | 19 | ||||
-rw-r--r-- | src/go/unit/nxt_go_lib.h | 3 | ||||
-rw-r--r-- | src/go/unit/nxt_go_port.c | 14 | ||||
-rw-r--r-- | src/go/unit/nxt_go_port_memory.c | 37 | ||||
-rw-r--r-- | src/go/unit/nxt_go_port_memory.h | 2 | ||||
-rw-r--r-- | src/go/unit/nxt_go_run_ctx.c | 80 | ||||
-rw-r--r-- | src/go/unit/nxt_go_run_ctx.h | 8 | ||||
-rw-r--r-- | src/nxt_application.c | 92 | ||||
-rw-r--r-- | src/nxt_port.h | 27 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 211 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 24 | ||||
-rw-r--r-- | src/nxt_port_memory_int.h | 44 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 104 | ||||
-rw-r--r-- | src/nxt_router.c | 234 |
14 files changed, 542 insertions, 357 deletions
diff --git a/src/go/unit/nxt_go_lib.c b/src/go/unit/nxt_go_lib.c index 4c6b9ac2..4455e783 100644 --- a/src/go/unit/nxt_go_lib.c +++ b/src/go/unit/nxt_go_lib.c @@ -54,24 +54,6 @@ nxt_go_request_read(nxt_go_request_t r, uintptr_t dst, size_t dst_len) int -nxt_go_request_read_from(nxt_go_request_t r, uintptr_t dst, size_t dst_len, - uintptr_t src, size_t src_len) -{ - nxt_go_run_ctx_t *ctx; - - if (nxt_slow_path(r == 0)) { - return 0; - } - - ctx = (nxt_go_run_ctx_t *) r; - - nxt_go_ctx_add_msg(ctx, (void *) src, src_len); - - return nxt_go_request_read(r, dst, dst_len); -} - - -int nxt_go_request_close(nxt_go_request_t r) { return 0; @@ -124,6 +106,7 @@ nxt_go_ready(uint32_t stream) port_msg.mmap = 0; port_msg.nf = 0; port_msg.mf = 0; + port_msg.tracking = 0; nxt_go_main_send(&port_msg, sizeof(port_msg), NULL, 0); } diff --git a/src/go/unit/nxt_go_lib.h b/src/go/unit/nxt_go_lib.h index aecdd4af..60ac09f6 100644 --- a/src/go/unit/nxt_go_lib.h +++ b/src/go/unit/nxt_go_lib.h @@ -23,9 +23,6 @@ int nxt_go_response_write(nxt_go_request_t r, uintptr_t buf, size_t len); int nxt_go_request_read(nxt_go_request_t r, uintptr_t dst, size_t dst_len); -int nxt_go_request_read_from(nxt_go_request_t r, uintptr_t dst, size_t dst_len, - uintptr_t src, size_t src_len); - int nxt_go_request_close(nxt_go_request_t r); int nxt_go_request_done(nxt_go_request_t r); diff --git a/src/go/unit/nxt_go_port.c b/src/go/unit/nxt_go_port.c index cbf12ab2..f2f1fc5a 100644 --- a/src/go/unit/nxt_go_port.c +++ b/src/go/unit/nxt_go_port.c @@ -29,10 +29,18 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size) ctx = malloc(sizeof(nxt_go_run_ctx_t) + size); - memcpy(ctx + 1, port_msg, size); - port_msg = (nxt_port_msg_t *) (ctx + 1); + memcpy(ctx->port_msg, port_msg, size); + port_msg = ctx->port_msg; - nxt_go_ctx_init(ctx, port_msg, size - sizeof(nxt_port_msg_t)); + size -= sizeof(nxt_port_msg_t); + + nxt_go_ctx_init(ctx, port_msg, size); + + if (nxt_slow_path(ctx->cancelled)) { + nxt_go_debug("request already cancelled by router"); + free(ctx); + return 0; + } r = (nxt_go_request_t)(ctx); h = &ctx->request.header; diff --git a/src/go/unit/nxt_go_port_memory.c b/src/go/unit/nxt_go_port_memory.c index e1b732d2..a919d6cd 100644 --- a/src/go/unit/nxt_go_port_memory.c +++ b/src/go/unit/nxt_go_port_memory.c @@ -23,12 +23,14 @@ static nxt_port_mmap_header_t * -nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) +nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id, + nxt_bool_t tracking) { int name_len, rc; void *mem; char name[64]; nxt_fd_t fd; + nxt_free_map_t *free_map; nxt_port_msg_t port_msg; nxt_go_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; @@ -101,6 +103,7 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) hdr = port_mmap->hdr; memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); + memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); hdr->id = process->outgoing.nelts - 1; hdr->src_pid = getpid(); @@ -108,10 +111,13 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) hdr->sent_over = id; /* Mark first chunk as busy */ - nxt_port_mmap_set_chunk_busy(hdr, 0); + free_map = tracking ? hdr->free_tracking_map : hdr->free_map; + + nxt_port_mmap_set_chunk_busy(free_map, 0); /* Mark as busy chunk followed the last available chunk. */ - nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT); + nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); + nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); port_msg.stream = 0; port_msg.pid = getpid(); @@ -121,6 +127,7 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) port_msg.mmap = 0; port_msg.nf = 0; port_msg.mf = 0; + port_msg.tracking = 0; cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); cmsg.cm.cmsg_level = SOL_SOCKET; @@ -160,34 +167,34 @@ remove_fail: nxt_port_mmap_header_t * nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id, - nxt_chunk_id_t *c) + nxt_chunk_id_t *c, nxt_bool_t tracking) { + nxt_free_map_t *free_map; nxt_go_port_mmap_t *port_mmap; nxt_go_port_mmap_t *end_port_mmap; nxt_port_mmap_header_t *hdr; - port_mmap = NULL; - hdr = NULL; - nxt_go_mutex_lock(&process->outgoing_mutex); port_mmap = process->outgoing.elts; end_port_mmap = port_mmap + process->outgoing.nelts; - while (port_mmap < end_port_mmap) { + for (; port_mmap < end_port_mmap; port_mmap++) + { + hdr = port_mmap->hdr; - if ( (port_mmap->hdr->sent_over == 0xFFFFu || - port_mmap->hdr->sent_over == port_id) && - nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) { - hdr = port_mmap->hdr; + if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port_id) { + continue; + } + free_map = tracking ? hdr->free_tracking_map : hdr->free_map; + + if (nxt_port_mmap_get_free_chunk(free_map, c)) { goto unlock_return; } - - port_mmap++; } - hdr = nxt_go_new_port_mmap(process, port_id); + hdr = nxt_go_new_port_mmap(process, port_id, tracking); unlock_return: diff --git a/src/go/unit/nxt_go_port_memory.h b/src/go/unit/nxt_go_port_memory.h index d7e321c0..558eb68f 100644 --- a/src/go/unit/nxt_go_port_memory.h +++ b/src/go/unit/nxt_go_port_memory.h @@ -24,7 +24,7 @@ struct nxt_go_port_mmap_s { struct nxt_port_mmap_header_s * nxt_go_port_mmap_get(nxt_go_process_t *process, nxt_port_id_t port_id, - nxt_chunk_id_t *c); + nxt_chunk_id_t *c, nxt_bool_t tracking); #endif /* _NXT_GO_PORT_MEMORY_H_INCLUDED_ */ diff --git a/src/go/unit/nxt_go_run_ctx.c b/src/go/unit/nxt_go_run_ctx.c index c5e41ee8..6aa5db77 100644 --- a/src/go/unit/nxt_go_run_ctx.c +++ b/src/go/unit/nxt_go_run_ctx.c @@ -85,6 +85,7 @@ static void nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg, size_t payload_size) { + void *data, *end; nxt_port_mmap_msg_t *mmap_msg; memset(msg, 0, sizeof(nxt_go_msg_t)); @@ -92,9 +93,17 @@ nxt_go_ctx_init_msg(nxt_go_msg_t *msg, nxt_port_msg_t *port_msg, msg->port_msg = port_msg; msg->raw_size = payload_size; + data = port_msg + 1; + end = nxt_pointer_to(data, payload_size); + + if (port_msg->tracking) { + msg->tracking = data; + data = msg->tracking + 1; + } + if (nxt_fast_path(port_msg->mmap != 0)) { - msg->mmap_msg = (nxt_port_mmap_msg_t *) (port_msg + 1); - msg->end = nxt_pointer_to(msg->mmap_msg, payload_size); + msg->mmap_msg = data; + msg->end = end; mmap_msg = msg->mmap_msg; while(mmap_msg < msg->end) { @@ -134,7 +143,7 @@ nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg) e = b + mmap_msg->size; while (b < e) { - nxt_port_mmap_set_chunk_free(port_mmap->hdr, c); + nxt_port_mmap_set_chunk_free(port_mmap->hdr->free_map, c); b += PORT_MMAP_CHUNK_SIZE; c++; @@ -149,6 +158,10 @@ nxt_int_t nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t payload_size) { + nxt_atomic_t *val; + nxt_go_port_mmap_t *port_mmap; + nxt_port_mmap_tracking_msg_t *tracking; + memset(ctx, 0, sizeof(nxt_go_run_ctx_t)); ctx->process = nxt_go_get_process(port_msg->pid); @@ -160,39 +173,45 @@ nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, nxt_go_ctx_init_msg(&ctx->msg, port_msg, payload_size); - ctx->msg_last = &ctx->msg; + if (ctx->msg.tracking != NULL) { + tracking = ctx->msg.tracking; - ctx->wport_msg.stream = port_msg->stream; - ctx->wport_msg.pid = getpid(); - ctx->wport_msg.type = _NXT_PORT_MSG_DATA; - ctx->wport_msg.mmap = 1; + if (nxt_slow_path(tracking->mmap_id >= ctx->process->incoming.nelts)) { + nxt_go_warn("incoming shared memory segment #%d not found " + "for process %d", (int) tracking->mmap_id, + (int) port_msg->pid); - ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 ); + return NXT_ERROR; + } - return nxt_go_ctx_init_rbuf(ctx); -} + nxt_go_mutex_lock(&ctx->process->incoming_mutex); + port_mmap = nxt_go_array_at(&ctx->process->incoming, tracking->mmap_id); -void -nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size) -{ - nxt_go_msg_t *msg; + nxt_go_mutex_unlock(&ctx->process->incoming_mutex); - msg = malloc(sizeof(nxt_go_msg_t)); + val = port_mmap->hdr->tracking + tracking->tracking_id; - nxt_go_ctx_init_msg(msg, port_msg, size - sizeof(nxt_port_msg_t)); + ctx->cancelled = nxt_atomic_cmp_set(val, port_msg->stream, 0) == 0; - msg->start_offset = ctx->msg_last->start_offset; + if (ctx->cancelled) { + nxt_port_mmap_set_chunk_free(port_mmap->hdr->free_tracking_map, + tracking->tracking_id); - if (ctx->msg_last == &ctx->msg) { - msg->start_offset += ctx->request.body.preread_size; - - } else { - msg->start_offset += ctx->msg_last->data_size; + return NXT_OK; + } } - ctx->msg_last->next = msg; - ctx->msg_last = msg; + ctx->msg_last = &ctx->msg; + + ctx->wport_msg.stream = port_msg->stream; + ctx->wport_msg.pid = getpid(); + ctx->wport_msg.type = _NXT_PORT_MSG_DATA; + ctx->wport_msg.mmap = 1; + + ctx->wmmap_msg = (nxt_port_mmap_msg_t *) ( &ctx->wport_msg + 1 ); + + return nxt_go_ctx_init_rbuf(ctx); } @@ -243,7 +262,8 @@ nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) buf = &ctx->wbuf; - hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c); + hdr = nxt_go_port_mmap_get(ctx->process, ctx->msg.port_msg->reply_port, &c, + 0); if (nxt_slow_path(hdr == NULL)) { nxt_go_warn("failed to get port_mmap"); @@ -273,7 +293,7 @@ nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size) /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { break; } @@ -317,7 +337,7 @@ nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { break; } @@ -330,7 +350,7 @@ nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size) { c--; while (c >= start) { - nxt_port_mmap_set_chunk_free(hdr, c); + nxt_port_mmap_set_chunk_free(hdr->free_map, c); c--; } @@ -438,6 +458,7 @@ nxt_go_ctx_read_size_(nxt_go_run_ctx_t *ctx, size_t *size) return NXT_OK; } + nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size) { @@ -452,6 +473,7 @@ nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size) return rc; } + nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str) { diff --git a/src/go/unit/nxt_go_run_ctx.h b/src/go/unit/nxt_go_run_ctx.h index f35b48de..5c6775ab 100644 --- a/src/go/unit/nxt_go_run_ctx.h +++ b/src/go/unit/nxt_go_run_ctx.h @@ -29,6 +29,8 @@ struct nxt_go_msg_s { nxt_port_mmap_msg_t *mmap_msg; nxt_port_mmap_msg_t *end; + nxt_port_mmap_tracking_msg_t *tracking; + nxt_go_msg_t *next; }; @@ -38,6 +40,7 @@ typedef struct { nxt_go_process_t *process; nxt_port_mmap_msg_t *wmmap_msg; + nxt_bool_t cancelled; uint32_t nrbuf; nxt_buf_t rbuf; @@ -51,6 +54,8 @@ typedef struct { uintptr_t go_request; nxt_go_msg_t *msg_last; + + nxt_port_msg_t port_msg[]; } nxt_go_run_ctx_t; @@ -59,9 +64,6 @@ void nxt_go_ctx_release_msg(nxt_go_run_ctx_t *ctx, nxt_go_msg_t *msg); nxt_int_t nxt_go_ctx_init(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t payload_size); -void nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, - size_t size); - nxt_int_t nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last); nxt_int_t nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len); diff --git a/src/nxt_application.c b/src/nxt_application.c index 2cd3c921..a854564e 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -413,7 +413,9 @@ nxt_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid, msg->port_msg.reply_port); if (nxt_slow_path(port == NULL)) { - // + nxt_debug(task, "stream #%uD: reply port %d not found", + msg->port_msg.stream, msg->port_msg.reply_port); + return; } wmsg.port = port; @@ -425,20 +427,12 @@ nxt_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } -nxt_inline nxt_port_t * -nxt_app_msg_get_port(nxt_task_t *task, nxt_app_wmsg_t *msg) -{ - return msg->port; -} - - u_char * nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size) { size_t free_size; u_char *res; nxt_buf_t *b; - nxt_port_t *port; res = NULL; @@ -446,12 +440,7 @@ nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size) b = *msg->buf; if (b == NULL) { - port = nxt_app_msg_get_port(task, msg); - if (nxt_slow_path(port == NULL)) { - return NULL; - } - - b = nxt_port_mmap_get_buf(task, port, size); + b = nxt_port_mmap_get_buf(task, msg->port, size); if (nxt_slow_path(b == NULL)) { return NULL; } @@ -573,10 +562,9 @@ nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg, } -nxt_int_t -nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str) +nxt_inline nxt_int_t +nxt_app_msg_read_size_(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size) { - size_t length; nxt_buf_t *buf; do { @@ -603,7 +591,25 @@ nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str) break; } while (1); - buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, &length); + buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size); + + return NXT_OK; +} + + +nxt_int_t +nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str) +{ + size_t length; + nxt_int_t ret; + nxt_buf_t *buf; + + ret = nxt_app_msg_read_size_(task, msg, &length); + if (ret != NXT_OK) { + return ret; + } + + buf = msg->buf; if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t) length)) { return NXT_ERROR; @@ -688,37 +694,13 @@ nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n, nxt_int_t nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size) { - nxt_buf_t *buf; - - do { - buf = msg->buf; + nxt_int_t ret; - if (nxt_slow_path(buf == NULL)) { - return NXT_DONE; - } - - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) { - if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) { - msg->buf = buf->next; - continue; - } - return NXT_ERROR; - } - - if (buf->mem.pos[0] >= 128) { - if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) { - return NXT_ERROR; - } - } - - break; - } while (1); - - buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size); + ret = nxt_app_msg_read_size_(task, msg, size); nxt_debug(task, "nxt_read_size: %d", (int) *size); - return NXT_OK; + return ret; } @@ -921,21 +903,15 @@ nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last) { nxt_int_t rc; nxt_buf_t *b; - nxt_port_t *port; rc = NXT_OK; - port = nxt_app_msg_get_port(task, msg); - if (nxt_slow_path(port == NULL)) { - return NXT_ERROR; - } - if (nxt_slow_path(last == 1)) { do { b = *msg->buf; if (b == NULL) { - b = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST); + b = nxt_buf_sync_alloc(msg->port->mem_pool, NXT_BUF_SYNC_LAST); *msg->buf = b; break; } @@ -945,7 +921,7 @@ nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last) } if (nxt_slow_path(msg->write != NULL)) { - rc = nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, + rc = nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_DATA, -1, msg->stream, 0, msg->write); msg->write = NULL; @@ -962,7 +938,6 @@ nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, { size_t free_size, copy_size; nxt_buf_t *b; - nxt_port_t *port; nxt_debug(task, "nxt_app_msg_write_raw: %uz", size); @@ -970,12 +945,7 @@ nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, b = *msg->buf; if (b == NULL) { - port = nxt_app_msg_get_port(task, msg); - if (nxt_slow_path(port == NULL)) { - return NXT_ERROR; - } - - b = nxt_port_mmap_get_buf(task, port, size); + b = nxt_port_mmap_get_buf(task, msg->port, size); if (nxt_slow_path(b == NULL)) { return NXT_ERROR; } diff --git a/src/nxt_port.h b/src/nxt_port.h index 2c9a1a99..c54a1537 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -102,13 +102,21 @@ typedef struct { nxt_port_id_t reply_port; uint8_t type; + + /* Last message for this stream. */ uint8_t last; /* 1 bit */ /* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */ uint8_t mmap; /* 1 bit */ - uint8_t nf; - uint8_t mf; + /* Non-First fragment in fragmented message sequence. */ + uint8_t nf; /* 1 bit */ + + /* More Fragments followed. */ + uint8_t mf; /* 1 bit */ + + /* Message delivery tracking enabled, next chunk is tracking msg. */ + uint8_t tracking; /* 1 bit */ } nxt_port_msg_t; @@ -119,6 +127,7 @@ typedef struct { nxt_fd_t fd; nxt_bool_t close_fd; nxt_port_msg_t port_msg; + uint32_t tracking_msg[2]; nxt_work_t work; } nxt_port_send_msg_t; @@ -130,6 +139,7 @@ struct nxt_port_recv_msg_s { nxt_port_t *port; nxt_port_msg_t port_msg; size_t size; + nxt_bool_t cancelled; union { nxt_port_t *new_port; nxt_pid_t removed_pid; @@ -221,9 +231,18 @@ void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port); void nxt_port_write_close(nxt_port_t *port); void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port); void nxt_port_read_close(nxt_port_t *port); -nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, +nxt_int_t nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, + nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, + nxt_buf_t *b, void *tracking); + +nxt_inline nxt_int_t +nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, - nxt_buf_t *b); + nxt_buf_t *b) +{ + return nxt_port_socket_twrite(task, port, type, fd, stream, reply_port, b, + NULL); +} void nxt_port_enable(nxt_task_t *task, nxt_port_t *port, nxt_port_handlers_t *handlers); diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index 9f27ab09..3a1ec198 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -163,7 +163,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c); while (p < b->mem.end) { - nxt_port_mmap_set_chunk_free(hdr, c); + nxt_port_mmap_set_chunk_free(hdr->free_map, c); p += PORT_MMAP_CHUNK_SIZE; c++; @@ -253,11 +253,12 @@ fail: static nxt_port_mmap_handler_t * nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, - nxt_port_t *port) + nxt_port_t *port, nxt_bool_t tracking) { void *mem; u_char *p, name[64]; nxt_fd_t fd; + nxt_free_map_t *free_map; nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; nxt_port_mmap_handler_t *mmap_handler; @@ -342,6 +343,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, hdr = mmap_handler->hdr; nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); + nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); hdr->id = process->outgoing.size - 1; hdr->src_pid = nxt_pid; @@ -349,10 +351,13 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, hdr->sent_over = port->id; /* Mark first chunk as busy */ - nxt_port_mmap_set_chunk_busy(hdr, 0); + free_map = tracking ? hdr->free_tracking_map : hdr->free_map; + + nxt_port_mmap_set_chunk_busy(free_map, 0); /* Mark as busy chunk followed the last available chunk. */ - nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT); + nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); + nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); @@ -376,9 +381,10 @@ remove_fail: static nxt_port_mmap_handler_t * nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, - size_t size) + nxt_bool_t tracking) { nxt_process_t *process; + nxt_free_map_t *free_map; nxt_port_mmap_t *port_mmap; nxt_port_mmap_t *end_port_mmap; nxt_port_mmap_header_t *hdr; @@ -406,14 +412,16 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, continue; } - if (nxt_port_mmap_get_free_chunk(hdr, c)) { + free_map = tracking ? hdr->free_tracking_map : hdr->free_map; + + if (nxt_port_mmap_get_free_chunk(free_map, c)) { goto unlock_return; } } /* TODO introduce port_mmap limit and release wait. */ - mmap_handler = nxt_port_new_port_mmap(task, process, port); + mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking); unlock_return: @@ -434,12 +442,15 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) return NULL; } - mmap_handler = NULL; - nxt_thread_mutex_lock(&process->incoming.mutex); if (nxt_fast_path(process->incoming.size > id)) { mmap_handler = process->incoming.elts[id].mmap_handler; + + } else { + mmap_handler = NULL; + + nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid); } nxt_thread_mutex_unlock(&process->incoming.mutex); @@ -448,6 +459,131 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) } +nxt_int_t +nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, + nxt_port_mmap_tracking_t *tracking, uint32_t stream) +{ + nxt_chunk_id_t c; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; + + nxt_debug(task, "request tracking for stream #%uD", stream); + + mmap_handler = nxt_port_mmap_get(task, port, &c, 1); + if (nxt_slow_path(mmap_handler == NULL)) { + return NXT_ERROR; + } + + nxt_port_mmap_handler_use(mmap_handler, 1); + + hdr = mmap_handler->hdr; + + tracking->mmap_handler = mmap_handler; + tracking->tracking = hdr->tracking + c; + + *tracking->tracking = stream; + + nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d", + hdr->src_pid, hdr->dst_pid, hdr->id, c); + + return NXT_OK; +} + + +nxt_bool_t +nxt_port_mmap_tracking_cancel(nxt_task_t *task, + nxt_port_mmap_tracking_t *tracking, uint32_t stream) +{ + nxt_bool_t res; + nxt_chunk_id_t c; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; + + mmap_handler = tracking->mmap_handler; + + if (nxt_slow_path(mmap_handler == NULL)) { + return 0; + } + + hdr = mmap_handler->hdr; + + res = nxt_atomic_cmp_set(tracking->tracking, stream, 0); + + nxt_debug(task, "%s tracking for stream #%uD", + (res ? "cancelled" : "failed to cancel"), stream); + + if (!res) { + c = tracking->tracking - hdr->tracking; + nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); + } + + nxt_port_mmap_handler_use(mmap_handler, -1); + + return res; +} + + +nxt_int_t +nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t) +{ + nxt_atomic_t *tracking; + nxt_port_mmap_handler_t *mmap_handler; + + mmap_handler = t->mmap_handler; + tracking = mmap_handler->hdr->tracking; + + nxt_assert(t->tracking >= tracking); + nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT); + + buf[0] = mmap_handler->hdr->id; + buf[1] = t->tracking - mmap_handler->hdr->tracking; + + return NXT_OK; +} + +nxt_bool_t +nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_buf_t *b; + nxt_bool_t res; + nxt_chunk_id_t c; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; + nxt_port_mmap_tracking_msg_t *tracking_msg; + + b = msg->buf; + + if (nxt_buf_used_size(b) < (int)sizeof(nxt_port_mmap_tracking_msg_t)) { + nxt_debug(task, "too small message %u", nxt_buf_used_size(b)); + return 0; + } + + tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos; + + b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t); + mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid, + tracking_msg->mmap_id); + + if (nxt_slow_path(mmap_handler == NULL)) { + return 0; + } + + hdr = mmap_handler->hdr; + + c = tracking_msg->tracking_id; + res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0); + + nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream, + (res ? "received" : "already cancelled")); + + if (!res) { + nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); + } + + return res; +} + + nxt_buf_t * nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) { @@ -467,7 +603,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) b->completion_handler = nxt_port_mmap_buf_completion; nxt_buf_set_port_mmap(b); - mmap_handler = nxt_port_mmap_get(task, port, &c, size); + mmap_handler = nxt_port_mmap_get(task, port, &c, 0); if (nxt_slow_path(mmap_handler == NULL)) { nxt_mp_release(task->thread->engine->mem_pool, b); return NULL; @@ -499,7 +635,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { break; } @@ -552,7 +688,7 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { break; } @@ -565,7 +701,7 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, { c--; while (c >= start) { - nxt_port_mmap_set_chunk_free(hdr, c); + nxt_port_mmap_set_chunk_free(hdr->free_map, c); c--; } @@ -683,40 +819,43 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, void -nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port, - nxt_port_recv_msg_t *msg) +nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_buf_t *b, **pb; nxt_port_mmap_msg_t *end, *mmap_msg; - b = msg->buf; - - mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; - end = (nxt_port_mmap_msg_t *) b->mem.free; - pb = &msg->buf; msg->size = 0; - while (mmap_msg < end) { - nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", - mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, - msg->port_msg.pid); + for (b = msg->buf; b != NULL; b = b->next) { - *pb = nxt_port_mmap_get_incoming_buf(task, port, msg->port_msg.pid, - mmap_msg); - if (nxt_slow_path(*pb == NULL)) { - nxt_log_error(NXT_LOG_ERR, task->log, "failed to get mmap buffer"); + mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; + end = (nxt_port_mmap_msg_t *) b->mem.free; - break; - } + while (mmap_msg < end) { + nxt_assert(mmap_msg + 1 <= end); - msg->size += mmap_msg->size; - pb = &(*pb)->next; - mmap_msg++; - } + nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", + mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, + msg->port_msg.pid); - /* Mark original buf as complete. */ - b->mem.pos += nxt_buf_used_size(b); + *pb = nxt_port_mmap_get_incoming_buf(task, msg->port, + msg->port_msg.pid, mmap_msg); + if (nxt_slow_path(*pb == NULL)) { + nxt_log_error(NXT_LOG_ERR, task->log, + "failed to get mmap buffer"); + + break; + } + + msg->size += mmap_msg->size; + pb = &(*pb)->next; + mmap_msg++; + + /* Mark original buf as complete. */ + b->mem.pos += sizeof(nxt_port_mmap_msg_t); + } + } } diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index d1f29df8..c6a49ccf 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -15,6 +15,27 @@ typedef struct nxt_port_mmap_handler_s nxt_port_mmap_handler_t; void nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts); +typedef struct nxt_port_mmap_tracking_s nxt_port_mmap_tracking_t; + +struct nxt_port_mmap_tracking_s { + void *mmap_handler; + nxt_atomic_t *tracking; +}; + +nxt_int_t +nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, + nxt_port_mmap_tracking_t *tracking, uint32_t stream); + +nxt_bool_t +nxt_port_mmap_tracking_cancel(nxt_task_t *task, + nxt_port_mmap_tracking_t *tracking, uint32_t stream); + +nxt_int_t +nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t); + +nxt_bool_t +nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg); + /* * Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem' * pointers to first available shared mem bucket(s). 'size' used as a hint to @@ -37,8 +58,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb); void -nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port, - nxt_port_recv_msg_t *msg); +nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg); enum nxt_port_method_e { NXT_PORT_METHOD_ANY = 0, diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h index 0168e1ae..589416b6 100644 --- a/src/nxt_port_memory_int.h +++ b/src/nxt_port_memory_int.h @@ -52,6 +52,10 @@ struct nxt_port_mmap_header_s { nxt_pid_t dst_pid; /* For sanity check. */ nxt_port_id_t sent_over; nxt_free_map_t free_map[MAX_FREE_IDX]; + nxt_free_map_t free_map_padding; + nxt_free_map_t free_tracking_map[MAX_FREE_IDX]; + nxt_free_map_t free_tracking_map_padding; + nxt_atomic_t tracking[PORT_MMAP_CHUNK_COUNT]; }; @@ -78,20 +82,27 @@ struct nxt_port_mmap_msg_s { }; +typedef struct nxt_port_mmap_tracking_msg_s nxt_port_mmap_tracking_msg_t; + +struct nxt_port_mmap_tracking_msg_s { + uint32_t mmap_id; /* Mmap index in nxt_process_t.outgoing. */ + nxt_chunk_id_t tracking_id; /* Tracking index. */ +}; + static nxt_bool_t -nxt_port_mmap_get_free_chunk(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t *c); +nxt_port_mmap_get_free_chunk(nxt_free_map_t *m, nxt_chunk_id_t *c); -#define nxt_port_mmap_get_chunk_busy(hdr, c) \ - ((hdr->free_map[FREE_IDX(c)] & FREE_MASK(c)) == 0) +#define nxt_port_mmap_get_chunk_busy(m, c) \ + ((m[FREE_IDX(c)] & FREE_MASK(c)) == 0) nxt_inline void -nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c); +nxt_port_mmap_set_chunk_busy(nxt_free_map_t *m, nxt_chunk_id_t c); nxt_inline nxt_bool_t -nxt_port_mmap_chk_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c); +nxt_port_mmap_chk_set_chunk_busy(nxt_free_map_t *m, nxt_chunk_id_t c); nxt_inline void -nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c); +nxt_port_mmap_set_chunk_free(nxt_free_map_t *m, nxt_chunk_id_t c); nxt_inline nxt_chunk_id_t nxt_port_mmap_chunk_id(nxt_port_mmap_header_t *hdr, u_char *p) @@ -116,18 +127,15 @@ nxt_port_mmap_chunk_start(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) static nxt_bool_t -nxt_port_mmap_get_free_chunk(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t *c) +nxt_port_mmap_get_free_chunk(nxt_free_map_t *m, nxt_chunk_id_t *c) { int ffs; size_t i; nxt_chunk_id_t chunk; nxt_free_map_t bits; - nxt_free_map_t *free_map; - - free_map = hdr->free_map; for (i = 0; i < MAX_FREE_IDX; i++) { - bits = free_map[i]; + bits = m[i]; if (bits == 0) { continue; } @@ -136,7 +144,7 @@ nxt_port_mmap_get_free_chunk(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t *c) if (ffs != 0) { chunk = i * FREE_BITS + ffs - 1; - if (nxt_port_mmap_chk_set_chunk_busy(hdr, chunk)) { + if (nxt_port_mmap_chk_set_chunk_busy(m, chunk)) { *c = chunk; return 1; } @@ -148,19 +156,19 @@ nxt_port_mmap_get_free_chunk(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t *c) nxt_inline void -nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) +nxt_port_mmap_set_chunk_busy(nxt_free_map_t *m, nxt_chunk_id_t c) { - nxt_atomic_and_fetch(hdr->free_map + FREE_IDX(c), ~FREE_MASK(c)); + nxt_atomic_and_fetch(m + FREE_IDX(c), ~FREE_MASK(c)); } nxt_inline nxt_bool_t -nxt_port_mmap_chk_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) +nxt_port_mmap_chk_set_chunk_busy(nxt_free_map_t *m, nxt_chunk_id_t c) { nxt_free_map_t *f; nxt_free_map_t free_val, busy_val; - f = hdr->free_map + FREE_IDX(c); + f = m + FREE_IDX(c); while ( (*f & FREE_MASK(c)) != 0 ) { @@ -177,9 +185,9 @@ nxt_port_mmap_chk_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) nxt_inline void -nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) +nxt_port_mmap_set_chunk_free(nxt_free_map_t *m, nxt_chunk_id_t c) { - nxt_atomic_or_fetch(hdr->free_map + FREE_IDX(c), FREE_MASK(c)); + nxt_atomic_or_fetch(m + FREE_IDX(c), FREE_MASK(c)); } diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 08f87e84..6d762bbd 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -223,8 +223,9 @@ nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) nxt_int_t -nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) +nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, + nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, + void *tracking) { nxt_port_send_msg_t msg, *res; @@ -236,6 +237,10 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; msg.share = 0; + if (tracking != NULL) { + nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); + } + msg.port_msg.stream = stream; msg.port_msg.pid = nxt_pid; msg.port_msg.reply_port = reply_port; @@ -244,6 +249,7 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.port_msg.mmap = 0; msg.port_msg.nf = 0; msg.port_msg.mf = 0; + msg.port_msg.tracking = tracking != NULL; msg.work.data = NULL; @@ -337,6 +343,10 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) port->max_size / PORT_MMAP_MIN_SIZE); } + if (msg->port_msg.tracking) { + iov[0].iov_len += sizeof(msg->tracking_msg); + } + nxt_sendbuf_mem_coalesce(task, &sb); plain_size = sb.size; @@ -385,6 +395,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) msg->fd = -1; msg->share += n; msg->port_msg.nf = 1; + msg->port_msg.tracking = 0; if (msg->share >= port->max_share) { msg->share = 0; @@ -677,7 +688,12 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { nxt_log(task, NXT_LOG_CRIT, "port %d: too small message:%uz", port->socket.fd, msg->size); - goto fail; + + if (msg->fd != -1) { + nxt_fd_close(msg->fd); + } + + return; } /* adjust size to actual buffer used size */ @@ -686,52 +702,82 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, b = orig_b = msg->buf; b->mem.free += msg->size; - if (msg->port_msg.mmap) { - nxt_port_mmap_read(task, port, msg); - b = msg->buf; + if (msg->port_msg.tracking) { + msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0; + + } else { + msg->cancelled = 0; } if (nxt_slow_path(msg->port_msg.nf != 0)) { + fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream, msg->port_msg.mf == 0); - if (nxt_slow_path(fmsg == NULL)) { - nxt_assert(fmsg != NULL); - } + nxt_assert(fmsg != NULL); - nxt_buf_chain_add(&fmsg->buf, msg->buf); + if (nxt_fast_path(fmsg->cancelled == 0)) { - fmsg->size += msg->size; + if (msg->port_msg.mmap) { + nxt_port_mmap_read(task, msg); + b = msg->buf; + } - msg->buf = NULL; - b = NULL; + nxt_buf_chain_add(&fmsg->buf, msg->buf); - if (nxt_fast_path(msg->port_msg.mf == 0)) { - b = fmsg->buf; + fmsg->size += msg->size; + msg->buf = NULL; + b = NULL; - port->handler(task, fmsg); + if (nxt_fast_path(msg->port_msg.mf == 0)) { - msg->buf = fmsg->buf; - msg->fd = fmsg->fd; + b = fmsg->buf; + port->handler(task, fmsg); + + msg->buf = fmsg->buf; + msg->fd = fmsg->fd; + } + } + + if (nxt_fast_path(msg->port_msg.mf == 0)) { nxt_mp_free(port->mem_pool, fmsg); } } else { if (nxt_slow_path(msg->port_msg.mf != 0)) { - fmsg = nxt_port_frag_start(task, port, msg); - if (nxt_slow_path(fmsg == NULL)) { - nxt_assert(fmsg != NULL); + if (msg->port_msg.mmap && msg->cancelled == 0) { + nxt_port_mmap_read(task, msg); + b = msg->buf; } + fmsg = nxt_port_frag_start(task, port, msg); + + nxt_assert(fmsg != NULL); + fmsg->port_msg.nf = 0; fmsg->port_msg.mf = 0; - msg->buf = NULL; - msg->fd = -1; - b = NULL; + if (nxt_fast_path(msg->cancelled == 0)) { + msg->buf = NULL; + msg->fd = -1; + b = NULL; + + } else { + if (msg->fd != -1) { + nxt_fd_close(msg->fd); + } + } } else { - port->handler(task, msg); + if (nxt_fast_path(msg->cancelled == 0)) { + + if (msg->port_msg.mmap) { + nxt_port_mmap_read(task, msg); + b = msg->buf; + } + + port->handler(task, msg); + } } } @@ -754,14 +800,6 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, /* restore original buf */ msg->buf = orig_b; } - - return; - -fail: - - if (msg->fd != -1) { - nxt_fd_close(msg->fd); - } } diff --git a/src/nxt_router.c b/src/nxt_router.c index 103ba78c..f637c864 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -23,6 +23,13 @@ typedef struct { } nxt_router_listener_conf_t; +typedef struct nxt_msg_info_s { + nxt_buf_t *buf; + nxt_port_mmap_tracking_t tracking; + nxt_work_handler_t completion_handler; +} nxt_msg_info_t; + + typedef struct nxt_req_app_link_s nxt_req_app_link_t; @@ -32,6 +39,7 @@ typedef struct { nxt_app_t *app; nxt_port_t *app_port; nxt_app_parse_ctx_t *ap; + nxt_msg_info_t msg_info; nxt_req_app_link_t *ra; nxt_queue_link_t link; /* for nxt_conn_t.requests */ @@ -44,6 +52,7 @@ struct nxt_req_app_link_s { nxt_pid_t app_pid; nxt_port_t *reply_port; nxt_app_parse_ctx_t *ap; + nxt_msg_info_t msg_info; nxt_req_conn_link_t *rc; nxt_queue_link_t link; /* for nxt_app_t.requests */ @@ -64,9 +73,6 @@ typedef struct { static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); -static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, - int code, const char* str); - static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); static void nxt_router_conf_ready(nxt_task_t *task, @@ -352,93 +358,87 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) } -static void -nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) +nxt_inline nxt_bool_t +nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, + uint32_t stream) { - nxt_req_app_link_t *ra; - nxt_event_engine_t *engine; - nxt_req_conn_link_t *rc; + nxt_buf_t *b, *next; + nxt_bool_t cancelled; - ra = obj; - engine = data; - - if (task->thread->engine != engine) { - if (ra->app_port != NULL) { - ra->app_pid = ra->app_port->pid; - } - - ra->work.handler = nxt_router_ra_release; - ra->work.task = &engine->task; - ra->work.next = NULL; + if (msg_info->buf == NULL) { + return 0; + } - nxt_debug(task, "ra stream #%uD post release to %p", - ra->stream, engine); + cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking, + stream); - nxt_event_engine_post(engine, &ra->work); - - return; + if (cancelled) { + nxt_debug(task, "stream #%uD: cancelled by router", stream); } - nxt_debug(task, "ra stream #%uD release", ra->stream); + for (b = msg_info->buf; b != NULL; b = next) { + next = b->next; - rc = ra->rc; + b->completion_handler = msg_info->completion_handler; - if (rc != NULL) { - if (ra->app_pid != -1) { - nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid); + if (b->is_port_mmap_sent) { + b->is_port_mmap_sent = cancelled == 0; + b->completion_handler(task, b, b->parent); } - - rc->app_port = ra->app_port; - - ra->app_port = NULL; - rc->ra = NULL; - ra->rc = NULL; } - if (ra->app_port != NULL) { - nxt_router_app_port_release(task, ra->app_port, 0, 1); - - ra->app_port = NULL; - } + msg_info->buf = NULL; - if (ra->mem_pool != NULL) { - nxt_mp_release(ra->mem_pool, ra); - } + return cancelled; } static void -nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) +nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) { - nxt_conn_t *c; nxt_req_app_link_t *ra; - nxt_req_conn_link_t *rc; nxt_event_engine_t *engine; + nxt_req_conn_link_t *rc; ra = obj; engine = data; if (task->thread->engine != engine) { - ra->work.handler = nxt_router_ra_abort; + if (ra->app_port != NULL) { + ra->app_pid = ra->app_port->pid; + } + + ra->work.handler = nxt_router_ra_release; ra->work.task = &engine->task; ra->work.next = NULL; - nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine); + nxt_debug(task, "ra stream #%uD post release to %p", + ra->stream, engine); nxt_event_engine_post(engine, &ra->work); return; } - nxt_debug(task, "ra stream #%uD abort", ra->stream); + nxt_debug(task, "ra stream #%uD release", ra->stream); rc = ra->rc; if (rc != NULL) { - c = rc->conn; + if (ra->app_pid != -1) { + nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid); + } - nxt_router_gen_error(task, c, 500, - "Failed to start application worker"); + if (nxt_slow_path(ra->err_code != 0)) { + nxt_router_gen_error(task, rc->conn, ra->err_code, ra->err_str); + + } else { + rc->app_port = ra->app_port; + rc->msg_info = ra->msg_info; + + ra->app_port = NULL; + ra->msg_info.buf = NULL; + } rc->ra = NULL; ra->rc = NULL; @@ -450,70 +450,20 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) ra->app_port = NULL; } + nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); + if (ra->mem_pool != NULL) { nxt_mp_release(ra->mem_pool, ra); } } -static void -nxt_router_ra_error_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_req_app_link_t *ra; - - ra = obj; - - nxt_router_ra_error(task, ra, ra->err_code, ra->err_str); -} - - -static void -nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code, - const char* str) +nxt_inline void +nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str) { - nxt_conn_t *c; - nxt_req_conn_link_t *rc; - nxt_event_engine_t *engine; - - engine = ra->work.data; - - if (task->thread->engine != engine) { - ra->err_code = code; - ra->err_str = str; - - ra->work.handler = nxt_router_ra_error_handler; - ra->work.task = &engine->task; - ra->work.next = NULL; - - nxt_debug(task, "ra stream #%uD post error to %p", ra->stream, engine); - - nxt_event_engine_post(engine, &ra->work); - - return; - } - - nxt_debug(task, "ra stream #%uD error", ra->stream); - - rc = ra->rc; - - if (rc != NULL) { - c = rc->conn; - - nxt_router_gen_error(task, c, code, str); - - rc->ra = NULL; - ra->rc = NULL; - } - - if (ra->app_port != NULL) { - nxt_router_app_port_release(task, ra->app_port, 0, 1); - - ra->app_port = NULL; - } - - if (ra->mem_pool != NULL) { - nxt_mp_release(ra->mem_pool, ra); - } + ra->app_port = NULL; + ra->err_code = code; + ra->err_str = str; } @@ -528,6 +478,8 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) rc->app_port = NULL; } + nxt_router_msg_cancel(task, &rc->msg_info, rc->stream); + ra = rc->ra; if (ra != NULL) { @@ -601,6 +553,10 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } + nxt_debug(task, "nxt_router_conf_data_handler(%d): %*s", + nxt_buf_used_size(msg->buf), + nxt_buf_used_size(msg->buf), msg->buf->mem.pos); + b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size); nxt_assert(b != NULL); @@ -2417,8 +2373,6 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str)); - nxt_log_alert(task->log, "error %d: %s", code, str); - last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST); if (nxt_slow_path(last == NULL)) { @@ -2442,13 +2396,16 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, /* TODO: fix when called in the middle of response */ + nxt_log_alert(task->log, "error %d: %s", code, str); + + if (c->socket.fd == -1) { + return; + } + mp = c->mem_pool; b = nxt_router_get_error_buf(task, mp, code, str); - - if (c->socket.fd == -1) { - nxt_mp_free(mp, b->next); - nxt_mp_free(mp, b); + if (nxt_slow_path(b == NULL)) { return; } @@ -2533,7 +2490,8 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' %p abort next stream #%uD", &app->name, app, ra->stream); - nxt_router_ra_abort(task, ra, ra->work.data); + nxt_router_ra_error(ra, 500, "Failed to start application worker"); + nxt_router_ra_release(task, ra, ra->work.data); } nxt_router_app_use(task, app, -1); @@ -3198,9 +3156,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, static void +nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) +{ +} + + +static void nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) { uint32_t request_failed; + nxt_buf_t *b; nxt_int_t res; nxt_port_t *port, *c_port, *reply_port; nxt_app_wmsg_t wmsg; @@ -3220,9 +3185,8 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) res = nxt_port_send_port(task, port, reply_port, 0); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(task, ra, 500, + nxt_router_ra_error(ra, 500, "Failed to send reply port to application"); - ra = NULL; goto release_port; } @@ -3237,9 +3201,8 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) res = port->app->prepare_msg(task, &ap->r, &wmsg); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(task, ra, 500, + nxt_router_ra_error(ra, 500, "Failed to prepare message for application"); - ra = NULL; goto release_port; } @@ -3249,13 +3212,28 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) request_failed = 0; - res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, - -1, ra->stream, reply_port->id, wmsg.write); + ra->msg_info.buf = wmsg.write; + ra->msg_info.completion_handler = wmsg.write->completion_handler; + + for (b = wmsg.write; b != NULL; b = b->next) { + b->completion_handler = nxt_router_dummy_buf_completion; + } + + res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking, + ra->stream); + if (nxt_slow_path(res != NXT_OK)) { + nxt_router_ra_error(ra, 500, + "Failed to get tracking area"); + goto release_port; + } + + res = nxt_port_socket_twrite(task, wmsg.port, NXT_PORT_MSG_DATA, + -1, ra->stream, reply_port->id, wmsg.write, + &ra->msg_info.tracking); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(task, ra, 500, + nxt_router_ra_error(ra, 500, "Failed to send message to application"); - ra = NULL; goto release_port; } @@ -3263,13 +3241,7 @@ release_port: nxt_router_app_port_release(task, port, request_failed, 0); - if (ra != NULL) { - if (request_failed != 0) { - ra->app_port = 0; - } - - nxt_router_ra_release(task, ra, ra->work.data); - } + nxt_router_ra_release(task, ra, ra->work.data); } |