diff options
-rw-r--r-- | src/nxt_port.h | 4 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 131 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 21 | ||||
-rw-r--r-- | src/nxt_port_memory_int.h | 7 | ||||
-rw-r--r-- | src/nxt_port_rpc.c | 1 | ||||
-rw-r--r-- | src/nxt_router.c | 1 | ||||
-rw-r--r-- | src/nxt_unit.c | 7 |
7 files changed, 0 insertions, 172 deletions
diff --git a/src/nxt_port.h b/src/nxt_port.h index 3b66edfd..6b4d3c8f 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -174,9 +174,6 @@ typedef struct { /* More Fragments followed. */ uint8_t mf; /* 1 bit */ - - /* Message delivery tracking enabled, next chunk is tracking msg. */ - uint8_t tracking; /* 1 bit */ } nxt_port_msg_t; @@ -186,7 +183,6 @@ typedef struct { size_t share; nxt_fd_t fd[2]; nxt_port_msg_t port_msg; - uint32_t tracking_msg[2]; uint8_t close_fd; /* 1 bit */ uint8_t allocated; /* 1 bit */ } nxt_port_send_msg_t; diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index e799f860..0a4a6c53 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -539,137 +539,6 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) } -nxt_int_t -nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps, - nxt_port_mmap_tracking_t *tracking, uint32_t stream) -{ - nxt_chunk_id_t c; - nxt_port_mmap_header_t *hdr; - nxt_port_mmap_handler_t *mmap_handler; - - nxt_debug(task, "request tracking for stream #%uD", stream); - - mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1); - if (nxt_slow_path(mmap_handler == NULL)) { - return NXT_ERROR; - } - - nxt_port_mmap_handler_use(mmap_handler, 1); - - hdr = mmap_handler->hdr; - - tracking->mmap_handler = mmap_handler; - tracking->tracking = hdr->tracking + c; - - *tracking->tracking = stream; - - nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d", - hdr->src_pid, hdr->dst_pid, hdr->id, c); - - return NXT_OK; -} - - -nxt_bool_t -nxt_port_mmap_tracking_cancel(nxt_task_t *task, - nxt_port_mmap_tracking_t *tracking, uint32_t stream) -{ - nxt_bool_t res; - nxt_chunk_id_t c; - nxt_port_mmap_header_t *hdr; - nxt_port_mmap_handler_t *mmap_handler; - - mmap_handler = tracking->mmap_handler; - - if (nxt_slow_path(mmap_handler == NULL)) { - return 0; - } - - hdr = mmap_handler->hdr; - - res = nxt_atomic_cmp_set(tracking->tracking, stream, 0); - - nxt_debug(task, "%s tracking for stream #%uD", - (res ? "cancelled" : "failed to cancel"), stream); - - if (!res) { - c = tracking->tracking - hdr->tracking; - nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); - } - - nxt_port_mmap_handler_use(mmap_handler, -1); - - return res; -} - - -nxt_int_t -nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t) -{ - nxt_port_mmap_handler_t *mmap_handler; - - mmap_handler = t->mmap_handler; - -#if (NXT_DEBUG) - { - nxt_atomic_t *tracking; - - tracking = mmap_handler->hdr->tracking; - - nxt_assert(t->tracking >= tracking); - nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT); - } -#endif - - buf[0] = mmap_handler->hdr->id; - buf[1] = t->tracking - mmap_handler->hdr->tracking; - - return NXT_OK; -} - -nxt_bool_t -nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) -{ - nxt_buf_t *b; - nxt_bool_t res; - nxt_chunk_id_t c; - nxt_port_mmap_header_t *hdr; - nxt_port_mmap_handler_t *mmap_handler; - nxt_port_mmap_tracking_msg_t *tracking_msg; - - b = msg->buf; - - if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { - nxt_debug(task, "too small message %O", nxt_buf_used_size(b)); - return 0; - } - - tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos; - - b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t); - mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid, - tracking_msg->mmap_id); - - if (nxt_slow_path(mmap_handler == NULL)) { - return 0; - } - - hdr = mmap_handler->hdr; - - c = tracking_msg->tracking_id; - res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0); - - nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream, - (res ? "received" : "already cancelled")); - - if (!res) { - nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); - } - - return res; -} - - nxt_buf_t * nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size) { diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index a2cdf5dd..f1e70964 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -15,27 +15,6 @@ typedef struct nxt_port_mmap_handler_s nxt_port_mmap_handler_t; void nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts); -typedef struct nxt_port_mmap_tracking_s nxt_port_mmap_tracking_t; - -struct nxt_port_mmap_tracking_s { - void *mmap_handler; - nxt_atomic_t *tracking; -}; - -nxt_int_t -nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps, - nxt_port_mmap_tracking_t *tracking, uint32_t stream); - -nxt_bool_t -nxt_port_mmap_tracking_cancel(nxt_task_t *task, - nxt_port_mmap_tracking_t *tracking, uint32_t stream); - -nxt_int_t -nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t); - -nxt_bool_t -nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg); - /* * Allocates nxt_but_t structure from task's thread engine mem_pool, assigns * this buf 'mem' pointers to first available shared mem bucket(s). 'size' diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h index d2524ee4..c84615d5 100644 --- a/src/nxt_port_memory_int.h +++ b/src/nxt_port_memory_int.h @@ -84,13 +84,6 @@ struct nxt_port_mmap_msg_s { }; -typedef struct nxt_port_mmap_tracking_msg_s nxt_port_mmap_tracking_msg_t; - -struct nxt_port_mmap_tracking_msg_s { - uint32_t mmap_id; /* Mmap index in nxt_process_t.outgoing. */ - nxt_chunk_id_t tracking_id; /* Tracking index. */ -}; - nxt_inline nxt_bool_t nxt_port_mmap_get_free_chunk(nxt_free_map_t *m, nxt_chunk_id_t *c); diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c index 0cac5cbb..28590933 100644 --- a/src/nxt_port_rpc.c +++ b/src/nxt_port_rpc.c @@ -512,7 +512,6 @@ nxt_port_rpc_close(nxt_task_t *task, nxt_port_t *port) msg.port_msg.mmap = 0; msg.port_msg.nf = 0; msg.port_msg.mf = 0; - msg.port_msg.tracking = 0; msg.size = 0; msg.cancelled = 0; msg.u.data = NULL; diff --git a/src/nxt_router.c b/src/nxt_router.c index 48547a75..6b69b2ce 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -5384,7 +5384,6 @@ nxt_router_app_prepare_request(nxt_task_t *task, msg.pm.mmap = 1; msg.pm.nf = 0; msg.pm.mf = 0; - msg.pm.tracking = 0; nxt_port_mmap_handler_t *mmap_handler = buf->parent; nxt_port_mmap_header_t *hdr = mmap_handler->hdr; diff --git a/src/nxt_unit.c b/src/nxt_unit.c index 32bb07ab..f183ac6e 100644 --- a/src/nxt_unit.c +++ b/src/nxt_unit.c @@ -942,7 +942,6 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) msg.mmap = 0; msg.nf = 0; msg.mf = 0; - msg.tracking = 0; nxt_socket_msg_oob_init(&oob, fds); @@ -2644,7 +2643,6 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0; m.msg.nf = 0; m.msg.mf = 0; - m.msg.tracking = 0; rc = NXT_UNIT_ERROR; @@ -3296,7 +3294,6 @@ skip_response_send: msg.mmap = 0; msg.nf = 0; msg.mf = 0; - msg.tracking = 0; (void) nxt_unit_port_send(req->ctx, req->response_port, &msg, sizeof(msg), NULL); @@ -3619,7 +3616,6 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) msg.mmap = 0; msg.nf = 0; msg.mf = 0; - msg.tracking = 0; res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL); if (nxt_slow_path(res != sizeof(msg))) { @@ -3905,7 +3901,6 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) msg.mmap = 0; msg.nf = 0; msg.mf = 0; - msg.tracking = 0; nxt_socket_msg_oob_init(&oob, fds); @@ -4390,7 +4385,6 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid) msg.mmap = 0; msg.nf = 0; msg.mf = 0; - msg.tracking = 0; res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL); if (nxt_slow_path(res != sizeof(msg))) { @@ -5356,7 +5350,6 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, m.msg.mmap = 0; m.msg.nf = 0; m.msg.mf = 0; - m.msg.tracking = 0; m.new_port.id = port->id.id; m.new_port.pid = port->id.pid; |