diff options
Diffstat (limited to 'src/nxt_port_memory.c')
-rw-r--r-- | src/nxt_port_memory.c | 211 |
1 files changed, 175 insertions, 36 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index 9f27ab09..3a1ec198 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -163,7 +163,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c); while (p < b->mem.end) { - nxt_port_mmap_set_chunk_free(hdr, c); + nxt_port_mmap_set_chunk_free(hdr->free_map, c); p += PORT_MMAP_CHUNK_SIZE; c++; @@ -253,11 +253,12 @@ fail: static nxt_port_mmap_handler_t * nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, - nxt_port_t *port) + nxt_port_t *port, nxt_bool_t tracking) { void *mem; u_char *p, name[64]; nxt_fd_t fd; + nxt_free_map_t *free_map; nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; nxt_port_mmap_handler_t *mmap_handler; @@ -342,6 +343,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, hdr = mmap_handler->hdr; nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); + nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); hdr->id = process->outgoing.size - 1; hdr->src_pid = nxt_pid; @@ -349,10 +351,13 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, hdr->sent_over = port->id; /* Mark first chunk as busy */ - nxt_port_mmap_set_chunk_busy(hdr, 0); + free_map = tracking ? hdr->free_tracking_map : hdr->free_map; + + nxt_port_mmap_set_chunk_busy(free_map, 0); /* Mark as busy chunk followed the last available chunk. */ - nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT); + nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); + nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); @@ -376,9 +381,10 @@ remove_fail: static nxt_port_mmap_handler_t * nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, - size_t size) + nxt_bool_t tracking) { nxt_process_t *process; + nxt_free_map_t *free_map; nxt_port_mmap_t *port_mmap; nxt_port_mmap_t *end_port_mmap; nxt_port_mmap_header_t *hdr; @@ -406,14 +412,16 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, continue; } - if (nxt_port_mmap_get_free_chunk(hdr, c)) { + free_map = tracking ? hdr->free_tracking_map : hdr->free_map; + + if (nxt_port_mmap_get_free_chunk(free_map, c)) { goto unlock_return; } } /* TODO introduce port_mmap limit and release wait. */ - mmap_handler = nxt_port_new_port_mmap(task, process, port); + mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking); unlock_return: @@ -434,12 +442,15 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) return NULL; } - mmap_handler = NULL; - nxt_thread_mutex_lock(&process->incoming.mutex); if (nxt_fast_path(process->incoming.size > id)) { mmap_handler = process->incoming.elts[id].mmap_handler; + + } else { + mmap_handler = NULL; + + nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid); } nxt_thread_mutex_unlock(&process->incoming.mutex); @@ -448,6 +459,131 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) } +nxt_int_t +nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, + nxt_port_mmap_tracking_t *tracking, uint32_t stream) +{ + nxt_chunk_id_t c; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; + + nxt_debug(task, "request tracking for stream #%uD", stream); + + mmap_handler = nxt_port_mmap_get(task, port, &c, 1); + if (nxt_slow_path(mmap_handler == NULL)) { + return NXT_ERROR; + } + + nxt_port_mmap_handler_use(mmap_handler, 1); + + hdr = mmap_handler->hdr; + + tracking->mmap_handler = mmap_handler; + tracking->tracking = hdr->tracking + c; + + *tracking->tracking = stream; + + nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d", + hdr->src_pid, hdr->dst_pid, hdr->id, c); + + return NXT_OK; +} + + +nxt_bool_t +nxt_port_mmap_tracking_cancel(nxt_task_t *task, + nxt_port_mmap_tracking_t *tracking, uint32_t stream) +{ + nxt_bool_t res; + nxt_chunk_id_t c; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; + + mmap_handler = tracking->mmap_handler; + + if (nxt_slow_path(mmap_handler == NULL)) { + return 0; + } + + hdr = mmap_handler->hdr; + + res = nxt_atomic_cmp_set(tracking->tracking, stream, 0); + + nxt_debug(task, "%s tracking for stream #%uD", + (res ? "cancelled" : "failed to cancel"), stream); + + if (!res) { + c = tracking->tracking - hdr->tracking; + nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); + } + + nxt_port_mmap_handler_use(mmap_handler, -1); + + return res; +} + + +nxt_int_t +nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t) +{ + nxt_atomic_t *tracking; + nxt_port_mmap_handler_t *mmap_handler; + + mmap_handler = t->mmap_handler; + tracking = mmap_handler->hdr->tracking; + + nxt_assert(t->tracking >= tracking); + nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT); + + buf[0] = mmap_handler->hdr->id; + buf[1] = t->tracking - mmap_handler->hdr->tracking; + + return NXT_OK; +} + +nxt_bool_t +nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_buf_t *b; + nxt_bool_t res; + nxt_chunk_id_t c; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; + nxt_port_mmap_tracking_msg_t *tracking_msg; + + b = msg->buf; + + if (nxt_buf_used_size(b) < (int)sizeof(nxt_port_mmap_tracking_msg_t)) { + nxt_debug(task, "too small message %u", nxt_buf_used_size(b)); + return 0; + } + + tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos; + + b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t); + mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid, + tracking_msg->mmap_id); + + if (nxt_slow_path(mmap_handler == NULL)) { + return 0; + } + + hdr = mmap_handler->hdr; + + c = tracking_msg->tracking_id; + res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0); + + nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream, + (res ? "received" : "already cancelled")); + + if (!res) { + nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); + } + + return res; +} + + nxt_buf_t * nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) { @@ -467,7 +603,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) b->completion_handler = nxt_port_mmap_buf_completion; nxt_buf_set_port_mmap(b); - mmap_handler = nxt_port_mmap_get(task, port, &c, size); + mmap_handler = nxt_port_mmap_get(task, port, &c, 0); if (nxt_slow_path(mmap_handler == NULL)) { nxt_mp_release(task->thread->engine->mem_pool, b); return NULL; @@ -499,7 +635,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { break; } @@ -552,7 +688,7 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { break; } @@ -565,7 +701,7 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, { c--; while (c >= start) { - nxt_port_mmap_set_chunk_free(hdr, c); + nxt_port_mmap_set_chunk_free(hdr->free_map, c); c--; } @@ -683,40 +819,43 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, void -nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port, - nxt_port_recv_msg_t *msg) +nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_buf_t *b, **pb; nxt_port_mmap_msg_t *end, *mmap_msg; - b = msg->buf; - - mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; - end = (nxt_port_mmap_msg_t *) b->mem.free; - pb = &msg->buf; msg->size = 0; - while (mmap_msg < end) { - nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", - mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, - msg->port_msg.pid); + for (b = msg->buf; b != NULL; b = b->next) { - *pb = nxt_port_mmap_get_incoming_buf(task, port, msg->port_msg.pid, - mmap_msg); - if (nxt_slow_path(*pb == NULL)) { - nxt_log_error(NXT_LOG_ERR, task->log, "failed to get mmap buffer"); + mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; + end = (nxt_port_mmap_msg_t *) b->mem.free; - break; - } + while (mmap_msg < end) { + nxt_assert(mmap_msg + 1 <= end); - msg->size += mmap_msg->size; - pb = &(*pb)->next; - mmap_msg++; - } + nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", + mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, + msg->port_msg.pid); - /* Mark original buf as complete. */ - b->mem.pos += nxt_buf_used_size(b); + *pb = nxt_port_mmap_get_incoming_buf(task, msg->port, + msg->port_msg.pid, mmap_msg); + if (nxt_slow_path(*pb == NULL)) { + nxt_log_error(NXT_LOG_ERR, task->log, + "failed to get mmap buffer"); + + break; + } + + msg->size += mmap_msg->size; + pb = &(*pb)->next; + mmap_msg++; + + /* Mark original buf as complete. */ + b->mem.pos += sizeof(nxt_port_mmap_msg_t); + } + } } |