diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_main.h | 1 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 150 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 5 | ||||
-rw-r--r-- | src/nxt_process.c | 4 | ||||
-rw-r--r-- | src/nxt_process.h | 18 | ||||
-rw-r--r-- | src/nxt_runtime.c | 12 |
6 files changed, 86 insertions, 104 deletions
diff --git a/src/nxt_main.h b/src/nxt_main.h index 978381b9..a12e8153 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -103,7 +103,6 @@ typedef struct { #include <nxt_service.h> typedef struct nxt_buf_s nxt_buf_t; -typedef struct nxt_port_mmap_s nxt_port_mmap_t; #include <nxt_buf.h> #include <nxt_buf_pool.h> #include <nxt_recvbuf.h> diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index 997ce383..db2ad2df 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -16,7 +16,7 @@ #include <nxt_port_memory_int.h> -void +nxt_inline void nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) { if (port_mmap->hdr != NULL) { @@ -26,32 +26,51 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) } -static nxt_array_t * -nxt_port_mmaps_create() +static nxt_port_mmap_t * +nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) { - nxt_mp_t *mp; + uint32_t cap; - mp = nxt_mp_create(1024, 128, 256, 32); + cap = port_mmaps->cap; - if (nxt_slow_path(mp == NULL)) { - return NULL; + if (cap == 0) { + cap = i + 1; } - return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t)); -} + while (i + 1 > cap) { + if (cap < 16) { + cap = cap * 2; -static nxt_port_mmap_t * -nxt_port_mmap_add(nxt_array_t *port_mmaps) -{ - nxt_mp_thread_adopt(port_mmaps->mem_pool); + } else { + cap = cap + cap / 2; + } + } + + if (cap != port_mmaps->cap) { + + port_mmaps->elts = nxt_realloc(port_mmaps->elts, + cap * sizeof(nxt_port_mmap_t)); + if (nxt_slow_path(port_mmaps->elts == NULL)) { + return NULL; + } + + nxt_memzero(port_mmaps->elts + port_mmaps->cap, + sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap)); + + port_mmaps->cap = cap; + } + + if (i + 1 > port_mmaps->size) { + port_mmaps->size = i + 1; + } - return nxt_array_zero_add(port_mmaps); + return port_mmaps->elts + i; } void -nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool) +nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free) { uint32_t i; nxt_port_mmap_t *port_mmap; @@ -60,18 +79,16 @@ nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool) return; } - nxt_mp_thread_adopt(port_mmaps->mem_pool); - port_mmap = port_mmaps->elts; - for (i = 0; i < port_mmaps->nelts; i++) { - nxt_port_mmap_destroy(port_mmap); + for (i = 0; i < port_mmaps->size; i++) { + nxt_port_mmap_destroy(port_mmap + i); } - port_mmaps->nelts = 0; + port_mmaps->size = 0; - if (destroy_pool != 0) { - nxt_mp_destroy(port_mmaps->mem_pool); + if (free != 0) { + nxt_free(port_mmaps->elts); } } @@ -168,53 +185,39 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, return NULL; } - nxt_thread_mutex_lock(&process->incoming_mutex); + mem = nxt_mem_mmap(NULL, mmap_stat.st_size, + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + + if (nxt_slow_path(mem == MAP_FAILED)) { + nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); - if (process->incoming == NULL) { - process->incoming = nxt_port_mmaps_create(); + return NULL; } - if (nxt_slow_path(process->incoming == NULL)) { - nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array"); + hdr = mem; - goto fail; - } + nxt_thread_mutex_lock(&process->incoming.mutex); - port_mmap = nxt_port_mmap_add(process->incoming); + port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); - goto fail; - } - - mem = nxt_mem_mmap(NULL, mmap_stat.st_size, - PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - - if (nxt_slow_path(mem == MAP_FAILED)) { - nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); - - port_mmap = NULL; + nxt_mem_munmap(mem, PORT_MMAP_SIZE); + hdr = NULL; goto fail; } - port_mmap->hdr = mem; - hdr = port_mmap->hdr; - - if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) { - nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)", - port_mmap->hdr->id, process->incoming->nelts - 1); - nxt_abort(); - } - nxt_assert(hdr->src_pid == process->pid); nxt_assert(hdr->dst_pid == nxt_pid); + port_mmap->hdr = hdr; + hdr->sent_over = 0xFFFFu; fail: - nxt_thread_mutex_unlock(&process->incoming_mutex); + nxt_thread_mutex_unlock(&process->incoming.mutex); return hdr; } @@ -230,19 +233,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; - port_mmap = NULL; - - if (process->outgoing == NULL) { - process->outgoing = nxt_port_mmaps_create(); - } - - if (nxt_slow_path(process->outgoing == NULL)) { - nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array"); - - return NULL; - } - - port_mmap = nxt_port_mmap_add(process->outgoing); + port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add port mmap to outgoing array"); @@ -309,7 +300,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); - hdr->id = process->outgoing->nelts - 1; + hdr->id = process->outgoing.size - 1; hdr->src_pid = nxt_pid; hdr->dst_pid = process->pid; hdr->sent_over = port->id; @@ -332,7 +323,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, remove_fail: - nxt_array_remove(process->outgoing, port_mmap); + process->outgoing.size--; return NULL; } @@ -342,7 +333,6 @@ static nxt_port_mmap_header_t * nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, size_t size) { - nxt_array_t *outgoing; nxt_process_t *process; nxt_port_mmap_t *port_mmap; nxt_port_mmap_t *end_port_mmap; @@ -357,17 +347,10 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, port_mmap = NULL; hdr = NULL; - nxt_thread_mutex_lock(&process->outgoing_mutex); - - if (process->outgoing == NULL) { - hdr = nxt_port_new_port_mmap(task, process, port); - - goto unlock_return; - } + nxt_thread_mutex_lock(&process->outgoing.mutex); - outgoing = process->outgoing; - port_mmap = outgoing->elts; - end_port_mmap = port_mmap + outgoing->nelts; + port_mmap = process->outgoing.elts; + end_port_mmap = port_mmap + process->outgoing.size; while (port_mmap < end_port_mmap) { @@ -388,7 +371,7 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, unlock_return: - nxt_thread_mutex_unlock(&process->outgoing_mutex); + nxt_thread_mutex_unlock(&process->outgoing.mutex); return hdr; } @@ -397,9 +380,7 @@ unlock_return: static nxt_port_mmap_header_t * nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) { - nxt_array_t *incoming; nxt_process_t *process; - nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; process = nxt_runtime_process_find(task->thread->runtime, spid); @@ -409,20 +390,17 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) hdr = NULL; - nxt_thread_mutex_lock(&process->incoming_mutex); - - incoming = process->incoming; + nxt_thread_mutex_lock(&process->incoming.mutex); - if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) { - port_mmap = incoming->elts; - hdr = port_mmap[id].hdr; + if (nxt_fast_path(process->incoming.size > id)) { + hdr = process->incoming.elts[id].hdr; } else { nxt_log(task, NXT_LOG_WARN, "failed to get incoming mmap #%d for process %PI", id, spid); } - nxt_thread_mutex_unlock(&process->incoming_mutex); + nxt_thread_mutex_unlock(&process->incoming.mutex); return hdr; } diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index ea51d001..51c40411 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -12,10 +12,7 @@ typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t; -void -nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap); - -void nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool); +void nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free); /* * Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem' diff --git a/src/nxt_process.c b/src/nxt_process.c index 272837b6..1b01713d 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -61,8 +61,8 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process) nxt_process_close_ports(task, p); } else { - nxt_port_mmaps_destroy(p->incoming, 0); - nxt_port_mmaps_destroy(p->outgoing, 0); + nxt_port_mmaps_destroy(&p->incoming, 0); + nxt_port_mmaps_destroy(&p->outgoing, 0); } } nxt_runtime_process_loop; diff --git a/src/nxt_process.h b/src/nxt_process.h index 26e7de18..ae0e1661 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -42,6 +42,17 @@ struct nxt_process_init_s { }; +typedef struct nxt_port_mmap_s nxt_port_mmap_t; +typedef struct nxt_port_mmaps_s nxt_port_mmaps_t; + +struct nxt_port_mmaps_s { + nxt_thread_mutex_t mutex; + uint32_t size; + uint32_t cap; + nxt_port_mmap_t *elts; +}; + + typedef struct { nxt_pid_t pid; nxt_queue_t ports; /* of nxt_port_t */ @@ -51,11 +62,8 @@ typedef struct { nxt_process_init_t *init; - nxt_thread_mutex_t incoming_mutex; - nxt_array_t *incoming; /* of nxt_port_mmap_t */ - - nxt_thread_mutex_t outgoing_mutex; - nxt_array_t *outgoing; /* of nxt_port_mmap_t */ + nxt_port_mmaps_t incoming; + nxt_port_mmaps_t outgoing; nxt_thread_mutex_t cp_mutex; nxt_lvlhsh_t connected_ports; /* of nxt_port_t */ diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 8fc2bc53..60ce45f6 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -1576,8 +1576,8 @@ nxt_runtime_process_new(nxt_runtime_t *rt) nxt_queue_init(&process->ports); - nxt_thread_mutex_create(&process->incoming_mutex); - nxt_thread_mutex_create(&process->outgoing_mutex); + nxt_thread_mutex_create(&process->incoming.mutex); + nxt_thread_mutex_create(&process->outgoing.mutex); nxt_thread_mutex_create(&process->cp_mutex); process->use_count = 1; @@ -1595,8 +1595,8 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) nxt_assert(process->use_count == 0); nxt_assert(process->registered == 0); - nxt_port_mmaps_destroy(process->incoming, 1); - nxt_port_mmaps_destroy(process->outgoing, 1); + nxt_port_mmaps_destroy(&process->incoming, 1); + nxt_port_mmaps_destroy(&process->outgoing, 1); port = nxt_port_hash_first(&process->connected_ports, &lhe); @@ -1606,8 +1606,8 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process) port = nxt_port_hash_first(&process->connected_ports, &lhe); } - nxt_thread_mutex_destroy(&process->incoming_mutex); - nxt_thread_mutex_destroy(&process->outgoing_mutex); + nxt_thread_mutex_destroy(&process->incoming.mutex); + nxt_thread_mutex_destroy(&process->outgoing.mutex); nxt_thread_mutex_destroy(&process->cp_mutex); nxt_mp_free(rt->mem_pool, process); |