diff options
-rw-r--r-- | src/go/unit/nxt_go_port_memory.c | 5 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 38 | ||||
-rw-r--r-- | src/nxt_port_memory_int.h | 3 |
3 files changed, 30 insertions, 16 deletions
diff --git a/src/go/unit/nxt_go_port_memory.c b/src/go/unit/nxt_go_port_memory.c index d2c0a0aa..310e17cb 100644 --- a/src/go/unit/nxt_go_port_memory.c +++ b/src/go/unit/nxt_go_port_memory.c @@ -105,7 +105,8 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); hdr->id = process->outgoing.nelts - 1; - hdr->pid = process->pid; + hdr->src_pid = getpid(); + hdr->dst_pid = process->pid; hdr->sent_over = id; /* Mark first chunk as busy */ @@ -136,7 +137,7 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id) */ memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); - rc = nxt_go_port_send(hdr->pid, id, &port_msg, sizeof(port_msg), + rc = nxt_go_port_send(hdr->dst_pid, id, &port_msg, sizeof(port_msg), &cmsg, sizeof(cmsg)); nxt_go_debug("new mmap #%d created for %d -> %d", diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index 677421e3..997ce383 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -107,6 +107,13 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) hdr = data; + if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) { + nxt_debug(task, "mmap buf completion: mmap for other process pair " + "%PI->%PI", hdr->src_pid, hdr->dst_pid); + + goto release_buf; + } + if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { /* * Chunks until b->mem.pos has been sent to other side, @@ -123,9 +130,9 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_port_mmap_free_junk(p, b->mem.end - p); - nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), %PI,%d,%d", b, - b->mem.start, b->mem.end - b->mem.start, b->is_port_mmap_sent, - hdr->pid, hdr->id, c); + nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), " + "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start, + b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c); while (p < b->mem.end) { nxt_port_mmap_set_chunk_free(hdr, c); @@ -134,6 +141,8 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) c++; } +release_buf: + nxt_mp_release(mp, b); } @@ -198,6 +207,9 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_abort(); } + nxt_assert(hdr->src_pid == process->pid); + nxt_assert(hdr->dst_pid == nxt_pid); + hdr->sent_over = 0xFFFFu; fail: @@ -298,7 +310,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); hdr->id = process->outgoing->nelts - 1; - hdr->pid = process->pid; + hdr->src_pid = nxt_pid; + hdr->dst_pid = process->pid; hdr->sent_over = port->id; /* Mark first chunk as busy */ @@ -307,8 +320,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, /* Mark as busy chunk followed the last available chunk. */ nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT); - nxt_debug(task, "send mmap fd %FD to process %PI", fd, - port->pid); + nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); /* TODO handle error */ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); @@ -452,9 +464,9 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) nchunks++; } - nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI,%d,%d", b, - b->mem.start, b->mem.end - b->mem.start, - hdr->pid, hdr->id, c); + nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d", + b, b->mem.start, b->mem.end - b->mem.start, + hdr->src_pid, hdr->dst_pid, hdr->id, c); c++; nchunks--; @@ -576,9 +588,9 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, b->parent = hdr; - nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI,%d,%d", + nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start, - hdr->pid, hdr->id, mmap_msg->chunk_id); + hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id); return b; } @@ -699,10 +711,10 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) break; } - if (port->pid != hdr->pid) { + if (port->pid != hdr->dst_pid) { nxt_log_error(NXT_LOG_ERR, task->log, "send mmap buffer for %PI to %PI, " - "using plain mode", hdr->pid, port->pid); + "using plain mode", hdr->dst_pid, port->pid); m = NXT_PORT_METHOD_PLAIN; diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h index 3c6c6ac6..6ccc3d83 100644 --- a/src/nxt_port_memory_int.h +++ b/src/nxt_port_memory_int.h @@ -48,7 +48,8 @@ typedef nxt_atomic_uint_t nxt_free_map_t; /* Mapped at the start of shared memory segment. */ struct nxt_port_mmap_header_s { uint32_t id; - nxt_pid_t pid; /* For sanity check. */ + nxt_pid_t src_pid; /* For sanity check. */ + nxt_pid_t dst_pid; /* For sanity check. */ nxt_port_id_t sent_over; nxt_free_map_t free_map[MAX_FREE_IDX]; }; |