diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-10-19 17:36:56 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-10-19 17:36:56 +0300 |
commit | 6532e46465276efcedae299ce290eb8dff0ece57 (patch) | |
tree | 89c2716d8eb96a9f527b9b87df522fb9e8661f3a /src/nxt_port_memory.c | |
parent | 6fd465f9d24040946f4218c281bd82517d5a350c (diff) | |
download | unit-6532e46465276efcedae299ce290eb8dff0ece57.tar.gz unit-6532e46465276efcedae299ce290eb8dff0ece57.tar.bz2 |
Supporting concurrent shared memory fd receive in router.
Two different router threads may send different requests to a single
application worker. In this case shared memory fds from worker
to router will be sent over 2 different router ports. These fds
will be received and processed by different threads in any order.
This patch makes it possible to add incoming shared memory segments in
arbitrary order. Additionally, the array and memory pool are no longer
used to store segments because of the pool's single-threaded nature.
A custom array-like structure, nxt_port_mmaps_t, is introduced.
Diffstat (limited to 'src/nxt_port_memory.c')
-rw-r--r-- | src/nxt_port_memory.c | 150 |
1 files changed, 64 insertions, 86 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index 997ce383..db2ad2df 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -16,7 +16,7 @@ #include <nxt_port_memory_int.h> -void +nxt_inline void nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) { if (port_mmap->hdr != NULL) { @@ -26,32 +26,51 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) } -static nxt_array_t * -nxt_port_mmaps_create() +static nxt_port_mmap_t * +nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) { - nxt_mp_t *mp; + uint32_t cap; - mp = nxt_mp_create(1024, 128, 256, 32); + cap = port_mmaps->cap; - if (nxt_slow_path(mp == NULL)) { - return NULL; + if (cap == 0) { + cap = i + 1; } - return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t)); -} + while (i + 1 > cap) { + if (cap < 16) { + cap = cap * 2; -static nxt_port_mmap_t * -nxt_port_mmap_add(nxt_array_t *port_mmaps) -{ - nxt_mp_thread_adopt(port_mmaps->mem_pool); + } else { + cap = cap + cap / 2; + } + } + + if (cap != port_mmaps->cap) { + + port_mmaps->elts = nxt_realloc(port_mmaps->elts, + cap * sizeof(nxt_port_mmap_t)); + if (nxt_slow_path(port_mmaps->elts == NULL)) { + return NULL; + } + + nxt_memzero(port_mmaps->elts + port_mmaps->cap, + sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap)); + + port_mmaps->cap = cap; + } + + if (i + 1 > port_mmaps->size) { + port_mmaps->size = i + 1; + } - return nxt_array_zero_add(port_mmaps); + return port_mmaps->elts + i; } void -nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool) +nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free) { uint32_t i; nxt_port_mmap_t *port_mmap; @@ -60,18 +79,16 @@ nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool) return; } - nxt_mp_thread_adopt(port_mmaps->mem_pool); - port_mmap = port_mmaps->elts; - for (i = 0; i < port_mmaps->nelts; i++) { - nxt_port_mmap_destroy(port_mmap); + for (i = 0; i < port_mmaps->size; i++) { + nxt_port_mmap_destroy(port_mmap + i); } - port_mmaps->nelts 
= 0; + port_mmaps->size = 0; - if (destroy_pool != 0) { - nxt_mp_destroy(port_mmaps->mem_pool); + if (free != 0) { + nxt_free(port_mmaps->elts); } } @@ -168,53 +185,39 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, return NULL; } - nxt_thread_mutex_lock(&process->incoming_mutex); + mem = nxt_mem_mmap(NULL, mmap_stat.st_size, + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); - if (process->incoming == NULL) { - process->incoming = nxt_port_mmaps_create(); + return NULL; } - if (nxt_slow_path(process->incoming == NULL)) { - nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array"); + hdr = mem; - goto fail; - } + nxt_thread_mutex_lock(&process->incoming.mutex); - port_mmap = nxt_port_mmap_add(process->incoming); + port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); - goto fail; - } - - mem = nxt_mem_mmap(NULL, mmap_stat.st_size, - PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - - if (nxt_slow_path(mem == MAP_FAILED)) { - nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); - - port_mmap = NULL; + nxt_mem_munmap(mem, PORT_MMAP_SIZE); + hdr = NULL; goto fail; } - port_mmap->hdr = mem; - hdr = port_mmap->hdr; - - if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) { - nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)", - port_mmap->hdr->id, process->incoming->nelts - 1); - nxt_abort(); - } - nxt_assert(hdr->src_pid == process->pid); nxt_assert(hdr->dst_pid == nxt_pid); + port_mmap->hdr = hdr; + hdr->sent_over = 0xFFFFu; fail: - nxt_thread_mutex_unlock(&process->incoming_mutex); + nxt_thread_mutex_unlock(&process->incoming.mutex); return hdr; } @@ -230,19 +233,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; - 
port_mmap = NULL; - - if (process->outgoing == NULL) { - process->outgoing = nxt_port_mmaps_create(); - } - - if (nxt_slow_path(process->outgoing == NULL)) { - nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array"); - - return NULL; - } - - port_mmap = nxt_port_mmap_add(process->outgoing); + port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add port mmap to outgoing array"); @@ -309,7 +300,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); - hdr->id = process->outgoing->nelts - 1; + hdr->id = process->outgoing.size - 1; hdr->src_pid = nxt_pid; hdr->dst_pid = process->pid; hdr->sent_over = port->id; @@ -332,7 +323,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, remove_fail: - nxt_array_remove(process->outgoing, port_mmap); + process->outgoing.size--; return NULL; } @@ -342,7 +333,6 @@ static nxt_port_mmap_header_t * nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, size_t size) { - nxt_array_t *outgoing; nxt_process_t *process; nxt_port_mmap_t *port_mmap; nxt_port_mmap_t *end_port_mmap; @@ -357,17 +347,10 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, port_mmap = NULL; hdr = NULL; - nxt_thread_mutex_lock(&process->outgoing_mutex); - - if (process->outgoing == NULL) { - hdr = nxt_port_new_port_mmap(task, process, port); - - goto unlock_return; - } + nxt_thread_mutex_lock(&process->outgoing.mutex); - outgoing = process->outgoing; - port_mmap = outgoing->elts; - end_port_mmap = port_mmap + outgoing->nelts; + port_mmap = process->outgoing.elts; + end_port_mmap = port_mmap + process->outgoing.size; while (port_mmap < end_port_mmap) { @@ -388,7 +371,7 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, unlock_return: - nxt_thread_mutex_unlock(&process->outgoing_mutex); + 
nxt_thread_mutex_unlock(&process->outgoing.mutex); return hdr; } @@ -397,9 +380,7 @@ unlock_return: static nxt_port_mmap_header_t * nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) { - nxt_array_t *incoming; nxt_process_t *process; - nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; process = nxt_runtime_process_find(task->thread->runtime, spid); @@ -409,20 +390,17 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) hdr = NULL; - nxt_thread_mutex_lock(&process->incoming_mutex); - - incoming = process->incoming; + nxt_thread_mutex_lock(&process->incoming.mutex); - if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) { - port_mmap = incoming->elts; - hdr = port_mmap[id].hdr; + if (nxt_fast_path(process->incoming.size > id)) { + hdr = process->incoming.elts[id].hdr; } else { nxt_log(task, NXT_LOG_WARN, "failed to get incoming mmap #%d for process %PI", id, spid); } - nxt_thread_mutex_unlock(&process->incoming_mutex); + nxt_thread_mutex_unlock(&process->incoming.mutex); return hdr; } |