diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-10-19 17:37:02 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-10-19 17:37:02 +0300 |
commit | 6031c63225838d6bc266c4f015d9a1085f600324 (patch) | |
tree | 5cf5e63f69820d087a91ebdcc786c46cd5bc1be0 /src/nxt_port_memory.c | |
parent | 6532e46465276efcedae299ce290eb8dff0ece57 (diff) | |
download | unit-6031c63225838d6bc266c4f015d9a1085f600324.tar.gz unit-6031c63225838d6bc266c4f015d9a1085f600324.tar.bz2 |
Introducing mmap_handler to count references to shared memory.
"All problems in computer science can be
solved by another level of indirection"
Butler Lampson
Completion handlers for application response buffers executed after
sending the data to client. Application worker can be stopped right
after send response buffers to router. Worker stop causes removal
of all data structures for the worker.
To prevent shared memory segment unmap, need to count the number of
buffers which uses it. So instead of direct reference to shared
memory, need to reference to intermediate 'handler' structure with
use counter and pointer to shared memory.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_port_memory.c | 221 |
1 files changed, 137 insertions, 84 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index db2ad2df..fa2ce449 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -16,12 +16,21 @@ #include <nxt_port_memory_int.h> + nxt_inline void -nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) +nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i) { - if (port_mmap->hdr != NULL) { - nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE); - port_mmap->hdr = NULL; + int c; + + c = nxt_atomic_fetch_add(&mmap_handler->use_count, i); + + if (i < 0 && c == -i) { + if (mmap_handler->hdr != NULL) { + nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE); + mmap_handler->hdr = NULL; + } + + nxt_free(mmap_handler); } } @@ -70,7 +79,7 @@ nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) void -nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free) +nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts) { uint32_t i; nxt_port_mmap_t *port_mmap; @@ -82,12 +91,12 @@ nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free) port_mmap = port_mmaps->elts; for (i = 0; i < port_mmaps->size; i++) { - nxt_port_mmap_destroy(port_mmap + i); + nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1); } port_mmaps->size = 0; - if (free != 0) { + if (free_elts != 0) { nxt_free(port_mmaps->elts); } } @@ -100,11 +109,12 @@ nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free) static void nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) { - u_char *p; - nxt_mp_t *mp; - nxt_buf_t *b; - nxt_chunk_id_t c; - nxt_port_mmap_header_t *hdr; + u_char *p; + nxt_mp_t *mp; + nxt_buf_t *b; + nxt_chunk_id_t c; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; if (nxt_buf_ts_handle(task, obj, data)) { return; @@ -122,7 +132,8 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) } #endif - hdr = data; + mmap_handler = data; + hdr = mmap_handler->hdr; if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) { nxt_debug(task, "mmap buf completion: mmap for other process pair " @@ -160,18 +171,21 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) release_buf: + nxt_port_mmap_handler_use(mmap_handler, -1); + nxt_mp_release(mp, b); } -nxt_port_mmap_header_t * +nxt_port_mmap_handler_t * nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_fd_t fd) { - void *mem; - struct stat mmap_stat; - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_header_t *hdr; + void *mem; + struct stat mmap_stat; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; nxt_debug(task, "got new mmap fd #%FD from process %PI", fd, process->pid); @@ -196,6 +210,15 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, hdr = mem; + mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); + if (nxt_slow_path(mmap_handler == NULL)) { + nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); + + return NULL; + } + + mmap_handler->hdr = hdr; + nxt_thread_mutex_lock(&process->incoming.mutex); port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); @@ -205,13 +228,17 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_mem_munmap(mem, PORT_MMAP_SIZE); hdr = NULL; + nxt_free(mmap_handler); + mmap_handler = NULL; + goto fail; } nxt_assert(hdr->src_pid == process->pid); nxt_assert(hdr->dst_pid == nxt_pid); - port_mmap->hdr = hdr; + port_mmap->mmap_handler = mmap_handler; + nxt_port_mmap_handler_use(mmap_handler, 1); hdr->sent_over = 0xFFFFu; @@ -219,25 +246,34 @@ fail: nxt_thread_mutex_unlock(&process->incoming.mutex); - return hdr; + return mmap_handler; } -static nxt_port_mmap_header_t * +static nxt_port_mmap_handler_t * nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) { - void *mem; - u_char *p, name[64]; - nxt_fd_t fd; - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_header_t *hdr; + void *mem; + u_char *p, name[64]; + nxt_fd_t fd; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; + + mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); + if (nxt_slow_path(mmap_handler == NULL)) { + nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); + + return NULL; + } port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add port mmap to outgoing array"); + nxt_free(mmap_handler); return NULL; } @@ -293,10 +329,12 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, goto remove_fail; } - port_mmap->hdr = mem; + mmap_handler->hdr = mem; + port_mmap->mmap_handler = mmap_handler; + nxt_port_mmap_handler_use(mmap_handler, 1); /* Init segment header. */ - hdr = port_mmap->hdr; + hdr = mmap_handler->hdr; nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); @@ -319,24 +357,27 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", hdr->id, nxt_pid, process->pid); - return hdr; + return mmap_handler; remove_fail: + nxt_free(mmap_handler); + process->outgoing.size--; return NULL; } -static nxt_port_mmap_header_t * +static nxt_port_mmap_handler_t * nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, size_t size) { - nxt_process_t *process; - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_t *end_port_mmap; - nxt_port_mmap_header_t *hdr; + nxt_process_t *process; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_t *end_port_mmap; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; process = port->process; if (nxt_slow_path(process == NULL)) { @@ -344,75 +385,72 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, } *c = 0; - port_mmap = NULL; - hdr = NULL; nxt_thread_mutex_lock(&process->outgoing.mutex); - port_mmap = process->outgoing.elts; - end_port_mmap = port_mmap + process->outgoing.size; + end_port_mmap = process->outgoing.elts + process->outgoing.size; - while (port_mmap < end_port_mmap) { + for (port_mmap = process->outgoing.elts; + port_mmap < end_port_mmap; + port_mmap++) + { + mmap_handler = port_mmap->mmap_handler; + hdr = mmap_handler->hdr; - if ( (port_mmap->hdr->sent_over == 0xFFFFu || - port_mmap->hdr->sent_over == port->id) && - nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) { - hdr = port_mmap->hdr; + if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) { + continue; + } + if (nxt_port_mmap_get_free_chunk(hdr, c)) { goto unlock_return; } - - port_mmap++; } /* TODO introduce port_mmap limit and release wait. */ - hdr = nxt_port_new_port_mmap(task, process, port); + mmap_handler = nxt_port_new_port_mmap(task, process, port); unlock_return: nxt_thread_mutex_unlock(&process->outgoing.mutex); - return hdr; + return mmap_handler; } -static nxt_port_mmap_header_t * +static nxt_port_mmap_handler_t * nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) { - nxt_process_t *process; - nxt_port_mmap_header_t *hdr; + nxt_process_t *process; + nxt_port_mmap_handler_t *mmap_handler; process = nxt_runtime_process_find(task->thread->runtime, spid); if (nxt_slow_path(process == NULL)) { return NULL; } - hdr = NULL; + mmap_handler = NULL; nxt_thread_mutex_lock(&process->incoming.mutex); if (nxt_fast_path(process->incoming.size > id)) { - hdr = process->incoming.elts[id].hdr; - - } else { - nxt_log(task, NXT_LOG_WARN, - "failed to get incoming mmap #%d for process %PI", id, spid); + mmap_handler = process->incoming.elts[id].mmap_handler; } nxt_thread_mutex_unlock(&process->incoming.mutex); - return hdr; + return mmap_handler; } nxt_buf_t * nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) { - size_t nchunks; - nxt_buf_t *b; - nxt_chunk_id_t c; - nxt_port_mmap_header_t *hdr; + size_t nchunks; + nxt_buf_t *b; + nxt_chunk_id_t c; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; nxt_debug(task, "request %z bytes shm buffer", size); @@ -424,13 +462,17 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) b->completion_handler = nxt_port_mmap_buf_completion; nxt_buf_set_port_mmap(b); - hdr = nxt_port_mmap_get(task, port, &c, size); - if (nxt_slow_path(hdr == NULL)) { + mmap_handler = nxt_port_mmap_get(task, port, &c, size); + if (nxt_slow_path(mmap_handler == NULL)) { nxt_mp_release(task->thread->engine->mem_pool, b); return NULL; } - b->parent = hdr; + b->parent = mmap_handler; + + nxt_port_mmap_handler_use(mmap_handler, 1); + + hdr = mmap_handler->hdr; b->mem.start = nxt_port_mmap_chunk_start(hdr, c); b->mem.pos = b->mem.start; @@ -469,9 +511,10 @@ nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, size_t min_size) { - size_t nchunks, free_size; - nxt_chunk_id_t c, start; - nxt_port_mmap_header_t *hdr; + size_t nchunks, free_size; + nxt_chunk_id_t c, start; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; nxt_debug(task, "request increase %z bytes shm buffer", size); @@ -487,7 +530,8 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, return NXT_OK; } - hdr = b->parent; + mmap_handler = b->parent; + hdr = mmap_handler->hdr; start = nxt_port_mmap_chunk_id(hdr, b->mem.end); @@ -536,12 +580,14 @@ static nxt_buf_t * nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) { - size_t nchunks; - nxt_buf_t *b; - nxt_port_mmap_header_t *hdr; - - hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id); - if (nxt_slow_path(hdr == NULL)) { + size_t nchunks; + nxt_buf_t *b; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; + + mmap_handler = nxt_port_get_port_incoming_mmap(task, spid, + mmap_msg->mmap_id); + if (nxt_slow_path(mmap_handler == NULL)) { return NULL; } @@ -559,12 +605,15 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, nchunks++; } + hdr = mmap_handler->hdr; + b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); b->mem.pos = b->mem.start; b->mem.free = b->mem.start + mmap_msg->size; b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; - b->parent = hdr; + b->parent = mmap_handler; + nxt_port_mmap_handler_use(mmap_handler, 1); nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start, @@ -578,11 +627,12 @@ void nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) { - size_t bsize; - nxt_buf_t *bmem; - nxt_uint_t i; - nxt_port_mmap_msg_t *mmap_msg; - nxt_port_mmap_header_t *hdr; + size_t bsize; + nxt_buf_t *bmem; + nxt_uint_t i; + nxt_port_mmap_msg_t *mmap_msg; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; nxt_debug(task, "prepare %z bytes message for transfer to process %PI " "via shared memory", sb->size, port->pid); @@ -606,7 +656,8 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, /* TODO clear b and exit */ } - hdr = bmem->parent; + mmap_handler = bmem->parent; + hdr = mmap_handler->hdr; mmap_msg->mmap_id = hdr->id; mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); @@ -667,8 +718,9 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port, nxt_port_method_t nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) { - nxt_port_method_t m; - nxt_port_mmap_header_t *hdr; + nxt_port_method_t m; + nxt_port_mmap_header_t *hdr; + nxt_port_mmap_handler_t *mmap_handler; m = NXT_PORT_METHOD_ANY; @@ -679,7 +731,8 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) } if (nxt_buf_is_port_mmap(b)) { - hdr = b->parent; + mmap_handler = b->parent; + hdr = mmap_handler->hdr; if (m == NXT_PORT_METHOD_PLAIN) { nxt_log_error(NXT_LOG_ERR, task->log, |