diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-10-19 17:36:56 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-10-19 17:36:56 +0300 |
commit | 6532e46465276efcedae299ce290eb8dff0ece57 (patch) | |
tree | 89c2716d8eb96a9f527b9b87df522fb9e8661f3a | |
parent | 6fd465f9d24040946f4218c281bd82517d5a350c (diff) | |
download | unit-6532e46465276efcedae299ce290eb8dff0ece57.tar.gz unit-6532e46465276efcedae299ce290eb8dff0ece57.tar.bz2 |
Supporting concurrent shared memory fd receive in router.
Two different router threads may send different requests to single
application worker. In this case shared memory fds from worker
to router will be send over 2 different router ports. These fds
will be received and processed by different threads in any order.
This patch made possible to add incoming shared memory segments in
arbitrary order. Additionally, array and memory pool are no longer
used to store segments because of pool's single threaded nature.
Custom array-like structure nxt_port_mmaps_t introduced.
-rw-r--r-- | src/nxt_main.h | 1 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 150 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 5 | ||||
-rw-r--r-- | src/nxt_process.c | 4 | ||||
-rw-r--r-- | src/nxt_process.h | 18 | ||||
-rw-r--r-- | src/nxt_runtime.c | 12 |
6 files changed, 86 insertions, 104 deletions
diff --git a/src/nxt_main.h b/src/nxt_main.h index 978381b9..a12e8153 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -103,7 +103,6 @@ typedef struct { #include <nxt_service.h> typedef struct nxt_buf_s nxt_buf_t; -typedef struct nxt_port_mmap_s nxt_port_mmap_t; #include <nxt_buf.h> #include <nxt_buf_pool.h> #include <nxt_recvbuf.h> diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index 997ce383..db2ad2df 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -16,7 +16,7 @@ #include <nxt_port_memory_int.h> -void +nxt_inline void nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) { if (port_mmap->hdr != NULL) { @@ -26,32 +26,51 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) } -static nxt_array_t * -nxt_port_mmaps_create() +static nxt_port_mmap_t * +nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) { - nxt_mp_t *mp; + uint32_t cap; - mp = nxt_mp_create(1024, 128, 256, 32); + cap = port_mmaps->cap; - if (nxt_slow_path(mp == NULL)) { - return NULL; + if (cap == 0) { + cap = i + 1; } - return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t)); -} + while (i + 1 > cap) { + if (cap < 16) { + cap = cap * 2; -static nxt_port_mmap_t * -nxt_port_mmap_add(nxt_array_t *port_mmaps) -{ - nxt_mp_thread_adopt(port_mmaps->mem_pool); + } else { + cap = cap + cap / 2; + } + } + + if (cap != port_mmaps->cap) { + + port_mmaps->elts = nxt_realloc(port_mmaps->elts, + cap * sizeof(nxt_port_mmap_t)); + if (nxt_slow_path(port_mmaps->elts == NULL)) { + return NULL; + } + + nxt_memzero(port_mmaps->elts + port_mmaps->cap, + sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap)); + + port_mmaps->cap = cap; + } + + if (i + 1 > port_mmaps->size) { + port_mmaps->size = i + 1; + } - return nxt_array_zero_add(port_mmaps); + return port_mmaps->elts + i; } void -nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool) +nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free) { uint32_t i; nxt_port_mmap_t *port_mmap; @@ -60,18 +79,16 @@ nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool) return; } - nxt_mp_thread_adopt(port_mmaps->mem_pool); - port_mmap = port_mmaps->elts; - for (i = 0; i < port_mmaps->nelts; i++) { - nxt_port_mmap_destroy(port_mmap); + for (i = 0; i < port_mmaps->size; i++) { + nxt_port_mmap_destroy(port_mmap + i); } - port_mmaps->nelts = 0; + port_mmaps->size = 0; - if (destroy_pool != 0) { - nxt_mp_destroy(port_mmaps->mem_pool); + if (free != 0) { + nxt_free(port_mmaps->elts); } } @@ -168,53 +185,39 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, return NULL; } - nxt_thread_mutex_lock(&process->incoming_mutex); + mem = nxt_mem_mmap(NULL, mmap_stat.st_size, + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); - if (process->incoming == NULL) { - process->incoming = nxt_port_mmaps_create(); + return NULL; } - if (nxt_slow_path(process->incoming == NULL)) { - nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array"); + hdr = mem; - goto fail; - } + nxt_thread_mutex_lock(&process->incoming.mutex); - port_mmap = nxt_port_mmap_add(process->incoming); + port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); - goto fail; - } - - mem = nxt_mem_mmap(NULL, mmap_stat.st_size, - PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - - if (nxt_slow_path(mem == MAP_FAILED)) { - nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); - - port_mmap = NULL; + nxt_mem_munmap(mem, PORT_MMAP_SIZE); + hdr = NULL; goto fail; } - port_mmap->hdr = mem; - hdr = port_mmap->hdr; - - if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) { - nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)", - port_mmap->hdr->id, process->incoming->nelts - 1); - nxt_abort(); - } - nxt_assert(hdr->src_pid == process->pid); nxt_assert(hdr->dst_pid == nxt_pid); + port_mmap->hdr = hdr; + hdr->sent_over = 0xFFFFu; fail: - nxt_thread_mutex_unlock(&process->incoming_mutex); + nxt_thread_mutex_unlock(&process->incoming.mutex); return hdr; } @@ -230,19 +233,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; - port_mmap = NULL; - - if (process->outgoing == NULL) { - process->outgoing = nxt_port_mmaps_create(); - } - - if (nxt_slow_path(process->outgoing == NULL)) { - nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array"); - - return NULL; - } - - port_mmap = nxt_port_mmap_add(process->outgoing); + port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add port mmap to outgoing array"); @@ -309,7 +300,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); - hdr->id = process->outgoing->nelts - 1; + hdr->id = process->outgoing.size - 1; hdr->src_pid = nxt_pid; hdr->dst_pid = process->pid; hdr->sent_over = port->id; @@ -332,7 +323,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, remove_fail: - nxt_array_remove(process->outgoing, port_mmap); + process->outgoing.size--; return NULL; } @@ -342,7 +333,6 @@ static nxt_port_mmap_header_t * nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, size_t size) { - nxt_array_t *outgoing; nxt_process_t *process; nxt_port_mmap_t *port_mmap; nxt_port_mmap_t *end_port_mmap; @@ -357,17 +347,10 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, port_mmap = NULL; hdr = NULL; - nxt_thread_mutex_lock(&process->outgoing_mutex); - - if (process->outgoing == NULL) { - hdr = nxt_port_new_port_mmap(task, process, port); - - goto unlock_return; - } + nxt_thread_mutex_lock(&process->outgoing.mutex); - outgoing = process->outgoing; - port_mmap = outgoing->elts; - end_port_mmap = port_mmap + outgoing->nelts; + port_mmap = process->outgoing.elts; + end_port_mmap = port_mmap + process->outgoing.size; while (port_mmap < end_port_mmap) { @@ -388,7 +371,7 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, unlock_return: - nxt_thread_mutex_unlock(&process->outgoing_mutex); + nxt_thread_mutex_unlock(&process->outgoing.mutex); return hdr; } @@ -397,9 +380,7 @@ unlock_return: static nxt_port_mmap_header_t * nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) { - nxt_array_t *incoming; nxt_process_t *process; - nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; process = nxt_runtime_process_find(task->thread->runtime, spid); @@ -409,20 +390,17 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) hdr = NULL; - nxt_thread_mutex_lock(&process->incoming_mutex); - - incoming = process->incoming; + nxt_thread_mutex_lock(&process->incoming.mutex); - if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) { - port_mmap = incoming->elts; - hdr = port_mmap[id].hdr; + if (nxt_fast_path(process->incoming.size > id)) { + hdr = process->incoming.elts[id].hdr; } else { nxt_log(task, NXT_LOG_WARN, "failed to get incoming mmap #%d for process %PI", id, spid); } - nxt_thread_mutex_unlock(&process->incoming_mutex); + nxt_thread_mutex_unlock(&process->incoming.mutex); return hdr; } diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index ea51d001..51c40411 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -12,10 +12,7 @@ typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t; -void -nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap); - -void nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool); +void nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free); /* * Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem' diff --git a/src/nxt_process.c b/src/nxt_process.c index 272837b6..1b01713d 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -61,8 +61,8 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) nxt_process_close_ports(task, p); } else { - nxt_port_mmaps_destroy(p->incoming, 0); - nxt_port_mmaps_destroy(p->outgoing, 0); + nxt_port_mmaps_destroy(&p->incoming, 0); + nxt_port_mmaps_destroy(&p->outgoing, 0); } } nxt_runtime_process_loop; diff --git a/src/nxt_process.h b/src/nxt_process.h index 26e7de18..ae0e1661 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -42,6 +42,17 @@ struct nxt_process_init_s { }; +typedef struct nxt_port_mmap_s nxt_port_mmap_t; +typedef struct nxt_port_mmaps_s nxt_port_mmaps_t; + +struct nxt_port_mmaps_s { + nxt_thread_mutex_t mutex; + uint32_t size; + uint32_t cap; + nxt_port_mmap_t *elts; +}; + + typedef struct { nxt_pid_t pid; nxt_queue_t ports; /* of nxt_port_t */ @@ -51,11 +62,8 @@ typedef struct { nxt_process_init_t *init; - nxt_thread_mutex_t incoming_mutex; - nxt_array_t *incoming; /* of nxt_port_mmap_t */ - - nxt_thread_mutex_t outgoing_mutex; - nxt_array_t *outgoing; /* of nxt_port_mmap_t */ + nxt_port_mmaps_t incoming; + nxt_port_mmaps_t outgoing; nxt_thread_mutex_t cp_mutex; nxt_lvlhsh_t connected_ports; /* of nxt_port_t */ diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 8fc2bc53..60ce45f6 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -1576,8 +1576,8 @@ nxt_runtime_process_new(nxt_runtime_t *rt) nxt_queue_init(&process->ports); - nxt_thread_mutex_create(&process->incoming_mutex); - nxt_thread_mutex_create(&process->outgoing_mutex); + nxt_thread_mutex_create(&process->incoming.mutex); + nxt_thread_mutex_create(&process->outgoing.mutex); nxt_thread_mutex_create(&process->cp_mutex); process->use_count = 1; @@ -1595,8 +1595,8 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) nxt_assert(process->use_count == 0); nxt_assert(process->registered == 0); - nxt_port_mmaps_destroy(process->incoming, 1); - nxt_port_mmaps_destroy(process->outgoing, 1); + nxt_port_mmaps_destroy(&process->incoming, 1); + nxt_port_mmaps_destroy(&process->outgoing, 1); port = nxt_port_hash_first(&process->connected_ports, &lhe); @@ -1606,8 +1606,8 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) port = nxt_port_hash_first(&process->connected_ports, &lhe); } - nxt_thread_mutex_destroy(&process->incoming_mutex); - nxt_thread_mutex_destroy(&process->outgoing_mutex); + nxt_thread_mutex_destroy(&process->incoming.mutex); + nxt_thread_mutex_destroy(&process->outgoing.mutex); nxt_thread_mutex_destroy(&process->cp_mutex); nxt_mp_free(rt->mem_pool, process); |