summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_main.h1
-rw-r--r--src/nxt_port_memory.c150
-rw-r--r--src/nxt_port_memory.h5
-rw-r--r--src/nxt_process.c4
-rw-r--r--src/nxt_process.h18
-rw-r--r--src/nxt_runtime.c12
6 files changed, 86 insertions, 104 deletions
diff --git a/src/nxt_main.h b/src/nxt_main.h
index 978381b9..a12e8153 100644
--- a/src/nxt_main.h
+++ b/src/nxt_main.h
@@ -103,7 +103,6 @@ typedef struct {
#include <nxt_service.h>
typedef struct nxt_buf_s nxt_buf_t;
-typedef struct nxt_port_mmap_s nxt_port_mmap_t;
#include <nxt_buf.h>
#include <nxt_buf_pool.h>
#include <nxt_recvbuf.h>
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index 997ce383..db2ad2df 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -16,7 +16,7 @@
#include <nxt_port_memory_int.h>
-void
+nxt_inline void
nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
{
if (port_mmap->hdr != NULL) {
@@ -26,32 +26,51 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
}
-static nxt_array_t *
-nxt_port_mmaps_create()
+static nxt_port_mmap_t *
+nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
{
- nxt_mp_t *mp;
+ uint32_t cap;
- mp = nxt_mp_create(1024, 128, 256, 32);
+ cap = port_mmaps->cap;
- if (nxt_slow_path(mp == NULL)) {
- return NULL;
+ if (cap == 0) {
+ cap = i + 1;
}
- return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t));
-}
+ while (i + 1 > cap) {
+ if (cap < 16) {
+ cap = cap * 2;
-static nxt_port_mmap_t *
-nxt_port_mmap_add(nxt_array_t *port_mmaps)
-{
- nxt_mp_thread_adopt(port_mmaps->mem_pool);
+ } else {
+ cap = cap + cap / 2;
+ }
+ }
+
+ if (cap != port_mmaps->cap) {
+
+ port_mmaps->elts = nxt_realloc(port_mmaps->elts,
+ cap * sizeof(nxt_port_mmap_t));
+ if (nxt_slow_path(port_mmaps->elts == NULL)) {
+ return NULL;
+ }
+
+ nxt_memzero(port_mmaps->elts + port_mmaps->cap,
+ sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));
+
+ port_mmaps->cap = cap;
+ }
+
+ if (i + 1 > port_mmaps->size) {
+ port_mmaps->size = i + 1;
+ }
- return nxt_array_zero_add(port_mmaps);
+ return port_mmaps->elts + i;
}
void
-nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool)
+nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free)
{
uint32_t i;
nxt_port_mmap_t *port_mmap;
@@ -60,18 +79,16 @@ nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool)
return;
}
- nxt_mp_thread_adopt(port_mmaps->mem_pool);
-
port_mmap = port_mmaps->elts;
- for (i = 0; i < port_mmaps->nelts; i++) {
- nxt_port_mmap_destroy(port_mmap);
+ for (i = 0; i < port_mmaps->size; i++) {
+ nxt_port_mmap_destroy(port_mmap + i);
}
- port_mmaps->nelts = 0;
+ port_mmaps->size = 0;
- if (destroy_pool != 0) {
- nxt_mp_destroy(port_mmaps->mem_pool);
+ if (free != 0) {
+ nxt_free(port_mmaps->elts);
}
}
@@ -168,53 +185,39 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
return NULL;
}
- nxt_thread_mutex_lock(&process->incoming_mutex);
+ mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+
+ if (nxt_slow_path(mem == MAP_FAILED)) {
+ nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
- if (process->incoming == NULL) {
- process->incoming = nxt_port_mmaps_create();
+ return NULL;
}
- if (nxt_slow_path(process->incoming == NULL)) {
- nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array");
+ hdr = mem;
- goto fail;
- }
+ nxt_thread_mutex_lock(&process->incoming.mutex);
- port_mmap = nxt_port_mmap_add(process->incoming);
+ port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
if (nxt_slow_path(port_mmap == NULL)) {
nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
- goto fail;
- }
-
- mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
- PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
-
- if (nxt_slow_path(mem == MAP_FAILED)) {
- nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
-
- port_mmap = NULL;
+ nxt_mem_munmap(mem, PORT_MMAP_SIZE);
+ hdr = NULL;
goto fail;
}
- port_mmap->hdr = mem;
- hdr = port_mmap->hdr;
-
- if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) {
- nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)",
- port_mmap->hdr->id, process->incoming->nelts - 1);
- nxt_abort();
- }
-
nxt_assert(hdr->src_pid == process->pid);
nxt_assert(hdr->dst_pid == nxt_pid);
+ port_mmap->hdr = hdr;
+
hdr->sent_over = 0xFFFFu;
fail:
- nxt_thread_mutex_unlock(&process->incoming_mutex);
+ nxt_thread_mutex_unlock(&process->incoming.mutex);
return hdr;
}
@@ -230,19 +233,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
- port_mmap = NULL;
-
- if (process->outgoing == NULL) {
- process->outgoing = nxt_port_mmaps_create();
- }
-
- if (nxt_slow_path(process->outgoing == NULL)) {
- nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array");
-
- return NULL;
- }
-
- port_mmap = nxt_port_mmap_add(process->outgoing);
+ port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
if (nxt_slow_path(port_mmap == NULL)) {
nxt_log(task, NXT_LOG_WARN,
"failed to add port mmap to outgoing array");
@@ -309,7 +300,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
- hdr->id = process->outgoing->nelts - 1;
+ hdr->id = process->outgoing.size - 1;
hdr->src_pid = nxt_pid;
hdr->dst_pid = process->pid;
hdr->sent_over = port->id;
@@ -332,7 +323,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
remove_fail:
- nxt_array_remove(process->outgoing, port_mmap);
+ process->outgoing.size--;
return NULL;
}
@@ -342,7 +333,6 @@ static nxt_port_mmap_header_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
size_t size)
{
- nxt_array_t *outgoing;
nxt_process_t *process;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_t *end_port_mmap;
@@ -357,17 +347,10 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
port_mmap = NULL;
hdr = NULL;
- nxt_thread_mutex_lock(&process->outgoing_mutex);
-
- if (process->outgoing == NULL) {
- hdr = nxt_port_new_port_mmap(task, process, port);
-
- goto unlock_return;
- }
+ nxt_thread_mutex_lock(&process->outgoing.mutex);
- outgoing = process->outgoing;
- port_mmap = outgoing->elts;
- end_port_mmap = port_mmap + outgoing->nelts;
+ port_mmap = process->outgoing.elts;
+ end_port_mmap = port_mmap + process->outgoing.size;
while (port_mmap < end_port_mmap) {
@@ -388,7 +371,7 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
unlock_return:
- nxt_thread_mutex_unlock(&process->outgoing_mutex);
+ nxt_thread_mutex_unlock(&process->outgoing.mutex);
return hdr;
}
@@ -397,9 +380,7 @@ unlock_return:
static nxt_port_mmap_header_t *
nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
{
- nxt_array_t *incoming;
nxt_process_t *process;
- nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
process = nxt_runtime_process_find(task->thread->runtime, spid);
@@ -409,20 +390,17 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
hdr = NULL;
- nxt_thread_mutex_lock(&process->incoming_mutex);
-
- incoming = process->incoming;
+ nxt_thread_mutex_lock(&process->incoming.mutex);
- if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) {
- port_mmap = incoming->elts;
- hdr = port_mmap[id].hdr;
+ if (nxt_fast_path(process->incoming.size > id)) {
+ hdr = process->incoming.elts[id].hdr;
} else {
nxt_log(task, NXT_LOG_WARN,
"failed to get incoming mmap #%d for process %PI", id, spid);
}
- nxt_thread_mutex_unlock(&process->incoming_mutex);
+ nxt_thread_mutex_unlock(&process->incoming.mutex);
return hdr;
}
diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h
index ea51d001..51c40411 100644
--- a/src/nxt_port_memory.h
+++ b/src/nxt_port_memory.h
@@ -12,10 +12,7 @@
typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t;
-void
-nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap);
-
-void nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool);
+void nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free);
/*
* Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem'
diff --git a/src/nxt_process.c b/src/nxt_process.c
index 272837b6..1b01713d 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -61,8 +61,8 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
nxt_process_close_ports(task, p);
} else {
- nxt_port_mmaps_destroy(p->incoming, 0);
- nxt_port_mmaps_destroy(p->outgoing, 0);
+ nxt_port_mmaps_destroy(&p->incoming, 0);
+ nxt_port_mmaps_destroy(&p->outgoing, 0);
}
} nxt_runtime_process_loop;
diff --git a/src/nxt_process.h b/src/nxt_process.h
index 26e7de18..ae0e1661 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -42,6 +42,17 @@ struct nxt_process_init_s {
};
+typedef struct nxt_port_mmap_s nxt_port_mmap_t;
+typedef struct nxt_port_mmaps_s nxt_port_mmaps_t;
+
+struct nxt_port_mmaps_s {
+ nxt_thread_mutex_t mutex;
+ uint32_t size;
+ uint32_t cap;
+ nxt_port_mmap_t *elts;
+};
+
+
typedef struct {
nxt_pid_t pid;
nxt_queue_t ports; /* of nxt_port_t */
@@ -51,11 +62,8 @@ typedef struct {
nxt_process_init_t *init;
- nxt_thread_mutex_t incoming_mutex;
- nxt_array_t *incoming; /* of nxt_port_mmap_t */
-
- nxt_thread_mutex_t outgoing_mutex;
- nxt_array_t *outgoing; /* of nxt_port_mmap_t */
+ nxt_port_mmaps_t incoming;
+ nxt_port_mmaps_t outgoing;
nxt_thread_mutex_t cp_mutex;
nxt_lvlhsh_t connected_ports; /* of nxt_port_t */
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index 8fc2bc53..60ce45f6 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -1576,8 +1576,8 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
nxt_queue_init(&process->ports);
- nxt_thread_mutex_create(&process->incoming_mutex);
- nxt_thread_mutex_create(&process->outgoing_mutex);
+ nxt_thread_mutex_create(&process->incoming.mutex);
+ nxt_thread_mutex_create(&process->outgoing.mutex);
nxt_thread_mutex_create(&process->cp_mutex);
process->use_count = 1;
@@ -1595,8 +1595,8 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
nxt_assert(process->use_count == 0);
nxt_assert(process->registered == 0);
- nxt_port_mmaps_destroy(process->incoming, 1);
- nxt_port_mmaps_destroy(process->outgoing, 1);
+ nxt_port_mmaps_destroy(&process->incoming, 1);
+ nxt_port_mmaps_destroy(&process->outgoing, 1);
port = nxt_port_hash_first(&process->connected_ports, &lhe);
@@ -1606,8 +1606,8 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
port = nxt_port_hash_first(&process->connected_ports, &lhe);
}
- nxt_thread_mutex_destroy(&process->incoming_mutex);
- nxt_thread_mutex_destroy(&process->outgoing_mutex);
+ nxt_thread_mutex_destroy(&process->incoming.mutex);
+ nxt_thread_mutex_destroy(&process->outgoing.mutex);
nxt_thread_mutex_destroy(&process->cp_mutex);
nxt_mp_free(rt->mem_pool, process);