diff options
-rw-r--r-- | src/nxt_port_memory.c | 221 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 5 | ||||
-rw-r--r-- | src/nxt_port_memory_int.h | 7 |
3 files changed, 146 insertions, 87 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index db2ad2df..fa2ce449 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -16,12 +16,21 @@ #include <nxt_port_memory_int.h> + nxt_inline void -nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) +nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i) { - if (port_mmap->hdr != NULL) { - nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE); - port_mmap->hdr = NULL; + int c; + + c = nxt_atomic_fetch_add(&mmap_handler->use_count, i); + + if (i < 0 && c == -i) { + if (mmap_handler->hdr != NULL) { + nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE); + mmap_handler->hdr = NULL; + } + + nxt_free(mmap_handler); } } @@ -70,7 +79,7 @@ nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) void -nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free) +nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts) { uint32_t i; nxt_port_mmap_t *port_mmap; @@ -82,12 +91,12 @@ nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free) port_mmap = port_mmaps->elts; for (i = 0; i < port_mmaps->size; i++) { - nxt_port_mmap_destroy(port_mmap + i); + nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1); } port_mmaps->size = 0; - if (free != 0) { + if (free_elts != 0) { nxt_free(port_mmaps->elts); } } @@ -100,11 +109,12 @@ nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free) static void nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) { - u_char *p; - nxt_mp_t *mp; - nxt_buf_t *b; - nxt_chunk_id_t c; - nxt_port_mmap_header_t *hdr; + u_char *p; + nxt_mp_t *mp; + nxt_buf_t *b; + nxt_chunk_id_t c; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; if (nxt_buf_ts_handle(task, obj, data)) { return; @@ -122,7 +132,8 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) } #endif - hdr = data; + mmap_handler = data; + hdr = mmap_handler->hdr; if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) { nxt_debug(task, "mmap buf completion: mmap for other process pair " @@ -160,18 +171,21 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) release_buf: + nxt_port_mmap_handler_use(mmap_handler, -1); + nxt_mp_release(mp, b); } -nxt_port_mmap_header_t * +nxt_port_mmap_handler_t * nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_fd_t fd) { - void *mem; - struct stat mmap_stat; - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_header_t *hdr; + void *mem; + struct stat mmap_stat; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; nxt_debug(task, "got new mmap fd #%FD from process %PI", fd, process->pid); @@ -196,6 +210,15 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, hdr = mem; + mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); + if (nxt_slow_path(mmap_handler == NULL)) { + nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); + + return NULL; + } + + mmap_handler->hdr = hdr; + nxt_thread_mutex_lock(&process->incoming.mutex); port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); @@ -205,13 +228,17 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_mem_munmap(mem, PORT_MMAP_SIZE); hdr = NULL; + nxt_free(mmap_handler); + mmap_handler = NULL; + goto fail; } nxt_assert(hdr->src_pid == process->pid); nxt_assert(hdr->dst_pid == nxt_pid); - port_mmap->hdr = hdr; + port_mmap->mmap_handler = mmap_handler; + nxt_port_mmap_handler_use(mmap_handler, 1); hdr->sent_over = 0xFFFFu; @@ -219,25 +246,34 @@ fail: nxt_thread_mutex_unlock(&process->incoming.mutex); - return hdr; + return mmap_handler; } -static nxt_port_mmap_header_t * +static nxt_port_mmap_handler_t * nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) { - void *mem; - u_char *p, name[64]; - nxt_fd_t fd; - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_header_t *hdr; + void *mem; + u_char *p, name[64]; + nxt_fd_t fd; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; + + mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); + if (nxt_slow_path(mmap_handler == NULL)) { + nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); + + return NULL; + } port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add port mmap to outgoing array"); + nxt_free(mmap_handler); return NULL; } @@ -293,10 +329,12 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, goto remove_fail; } - port_mmap->hdr = mem; + mmap_handler->hdr = mem; + port_mmap->mmap_handler = mmap_handler; + nxt_port_mmap_handler_use(mmap_handler, 1); /* Init segment header. */ - hdr = port_mmap->hdr; + hdr = mmap_handler->hdr; nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); @@ -319,24 +357,27 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", hdr->id, nxt_pid, process->pid); - return hdr; + return mmap_handler; remove_fail: + nxt_free(mmap_handler); + process->outgoing.size--; return NULL; } -static nxt_port_mmap_header_t * +static nxt_port_mmap_handler_t * nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, size_t size) { - nxt_process_t *process; - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_t *end_port_mmap; - nxt_port_mmap_header_t *hdr; + nxt_process_t *process; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_t *end_port_mmap; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; process = port->process; if (nxt_slow_path(process == NULL)) { @@ -344,75 +385,72 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, } *c = 0; - port_mmap = NULL; - hdr = NULL; nxt_thread_mutex_lock(&process->outgoing.mutex); - port_mmap = process->outgoing.elts; - end_port_mmap = port_mmap + process->outgoing.size; + end_port_mmap = process->outgoing.elts + process->outgoing.size; - while (port_mmap < end_port_mmap) { + for (port_mmap = process->outgoing.elts; + port_mmap < end_port_mmap; + port_mmap++) + { + mmap_handler = port_mmap->mmap_handler; + hdr = mmap_handler->hdr; - if ( (port_mmap->hdr->sent_over == 0xFFFFu || - port_mmap->hdr->sent_over == port->id) && - nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) { - hdr = port_mmap->hdr; + if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) { + continue; + } + if (nxt_port_mmap_get_free_chunk(hdr, c)) { goto unlock_return; } - - port_mmap++; } /* TODO introduce port_mmap limit and release wait. */ - hdr = nxt_port_new_port_mmap(task, process, port); + mmap_handler = nxt_port_new_port_mmap(task, process, port); unlock_return: nxt_thread_mutex_unlock(&process->outgoing.mutex); - return hdr; + return mmap_handler; } -static nxt_port_mmap_header_t * +static nxt_port_mmap_handler_t * nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) { - nxt_process_t *process; - nxt_port_mmap_header_t *hdr; + nxt_process_t *process; + nxt_port_mmap_handler_t *mmap_handler; process = nxt_runtime_process_find(task->thread->runtime, spid); if (nxt_slow_path(process == NULL)) { return NULL; } - hdr = NULL; + mmap_handler = NULL; nxt_thread_mutex_lock(&process->incoming.mutex); if (nxt_fast_path(process->incoming.size > id)) { - hdr = process->incoming.elts[id].hdr; - - } else { - nxt_log(task, NXT_LOG_WARN, - "failed to get incoming mmap #%d for process %PI", id, spid); + mmap_handler = process->incoming.elts[id].mmap_handler; } nxt_thread_mutex_unlock(&process->incoming.mutex); - return hdr; + return mmap_handler; } nxt_buf_t * nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) { - size_t nchunks; - nxt_buf_t *b; - nxt_chunk_id_t c; - nxt_port_mmap_header_t *hdr; + size_t nchunks; + nxt_buf_t *b; + nxt_chunk_id_t c; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; nxt_debug(task, "request %z bytes shm buffer", size); @@ -424,13 +462,17 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) b->completion_handler = nxt_port_mmap_buf_completion; nxt_buf_set_port_mmap(b); - hdr = nxt_port_mmap_get(task, port, &c, size); - if (nxt_slow_path(hdr == NULL)) { + mmap_handler = nxt_port_mmap_get(task, port, &c, size); + if (nxt_slow_path(mmap_handler == NULL)) { nxt_mp_release(task->thread->engine->mem_pool, b); return NULL; } - b->parent = hdr; + b->parent = mmap_handler; + + nxt_port_mmap_handler_use(mmap_handler, 1); + + hdr = mmap_handler->hdr; b->mem.start = nxt_port_mmap_chunk_start(hdr, c); b->mem.pos = b->mem.start; @@ -469,9 +511,10 @@ nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, size_t min_size) { - size_t nchunks, free_size; - nxt_chunk_id_t c, start; - nxt_port_mmap_header_t *hdr; + size_t nchunks, free_size; + nxt_chunk_id_t c, start; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; nxt_debug(task, "request increase %z bytes shm buffer", size); @@ -487,7 +530,8 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, return NXT_OK; } - hdr = b->parent; + mmap_handler = b->parent; + hdr = mmap_handler->hdr; start = nxt_port_mmap_chunk_id(hdr, b->mem.end); @@ -536,12 +580,14 @@ static nxt_buf_t * nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) { - size_t nchunks; - nxt_buf_t *b; - nxt_port_mmap_header_t *hdr; - - hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id); - if (nxt_slow_path(hdr == NULL)) { + size_t nchunks; + nxt_buf_t *b; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; + + mmap_handler = nxt_port_get_port_incoming_mmap(task, spid, + mmap_msg->mmap_id); + if (nxt_slow_path(mmap_handler == NULL)) { return NULL; } @@ -559,12 +605,15 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, nchunks++; } + hdr = mmap_handler->hdr; + b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); b->mem.pos = b->mem.start; b->mem.free = b->mem.start + mmap_msg->size; b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; - b->parent = hdr; + b->parent = mmap_handler; + nxt_port_mmap_handler_use(mmap_handler, 1); nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start, @@ -578,11 +627,12 @@ void nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) { - size_t bsize; - nxt_buf_t *bmem; - nxt_uint_t i; - nxt_port_mmap_msg_t *mmap_msg; - nxt_port_mmap_header_t *hdr; + size_t bsize; + nxt_buf_t *bmem; + nxt_uint_t i; + nxt_port_mmap_msg_t *mmap_msg; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; nxt_debug(task, "prepare %z bytes message for transfer to process %PI " "via shared memory", sb->size, port->pid); @@ -606,7 +656,8 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, /* TODO clear b and exit */ } - hdr = bmem->parent; + mmap_handler = bmem->parent; + hdr = mmap_handler->hdr; mmap_msg->mmap_id = hdr->id; mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); @@ -667,8 +718,9 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port, nxt_port_method_t nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) { - nxt_port_method_t m; - nxt_port_mmap_header_t *hdr; + nxt_port_method_t m; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; m = NXT_PORT_METHOD_ANY; @@ -679,7 +731,8 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) } if (nxt_buf_is_port_mmap(b)) { - hdr = b->parent; + mmap_handler = b->parent; + hdr = mmap_handler->hdr; if (m == NXT_PORT_METHOD_PLAIN) { nxt_log_error(NXT_LOG_ERR, task->log, diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index 51c40411..d1f29df8 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -11,8 +11,9 @@ #define PORT_MMAP_MIN_SIZE (3 * sizeof(uint32_t)) typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t; +typedef struct nxt_port_mmap_handler_s nxt_port_mmap_handler_t; -void nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free); +void nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts); /* * Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem' @@ -27,7 +28,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size); nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, size_t min_size); -nxt_port_mmap_header_t * +nxt_port_mmap_handler_t * nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_fd_t fd); diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h index 6ccc3d83..0168e1ae 100644 --- a/src/nxt_port_memory_int.h +++ b/src/nxt_port_memory_int.h @@ -55,12 +55,17 @@ struct nxt_port_mmap_header_s { }; +struct nxt_port_mmap_handler_s { + nxt_port_mmap_header_t *hdr; + nxt_atomic_t use_count; +}; + /* * Element of nxt_process_t.incoming/outgoing, shared memory segment * descriptor. */ struct nxt_port_mmap_s { - nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; }; typedef struct nxt_port_mmap_msg_s nxt_port_mmap_msg_t; |