summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_memory.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-07-12 20:32:16 +0300
committerMax Romanov <max.romanov@nginx.com>2017-07-12 20:32:16 +0300
commitb0c1e740cf404f8fed5eed75fddb205ca74314e0 (patch)
tree08dcefc827c5dfb1570b682ea8d1e9abf17a31dc /src/nxt_port_memory.c
parentc38bcb7d70729434893ae4d5f2f58a78a36d2bd5 (diff)
downloadunit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.gz
unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.bz2
New process port exchange changed. READY message type introduced.
Application process start request DATA message from router to master. Master notifies router via NEW_PORT message after worker process become ready.
Diffstat (limited to '')
-rw-r--r--src/nxt_port_memory.c68
1 files changed, 56 insertions, 12 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index 38e7db70..32d4aa5f 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -26,6 +26,56 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
}
+static nxt_array_t *
+nxt_port_mmaps_create()
+{
+ nxt_mp_t *mp;
+
+ mp = nxt_mp_create(1024, 128, 256, 32);
+
+ if (nxt_slow_path(mp == NULL)) {
+ return NULL;
+ }
+
+ return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t));
+}
+
+
+static nxt_port_mmap_t *
+nxt_port_mmap_add(nxt_array_t *port_mmaps)
+{
+ nxt_mp_thread_adopt(port_mmaps->mem_pool);
+
+ return nxt_array_zero_add(port_mmaps);
+}
+
+
+void
+nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool)
+{
+ uint32_t i;
+ nxt_port_mmap_t *port_mmap;
+
+ if (port_mmaps == NULL) {
+ return;
+ }
+
+ nxt_mp_thread_adopt(port_mmaps->mem_pool);
+
+ port_mmap = port_mmaps->elts;
+
+ for (i = 0; i < port_mmaps->nelts; i++) {
+ nxt_port_mmap_destroy(port_mmap);
+ }
+
+ port_mmaps->nelts = 0;
+
+ if (destroy_pool != 0) {
+ nxt_mp_destroy(port_mmaps->mem_pool);
+ }
+}
+
+
static void
nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
{
@@ -63,6 +113,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
p = b->mem.pos - 1;
c = nxt_port_mmap_chunk_id(hdr, p) + 1;
p = nxt_port_mmap_chunk_start(hdr, c);
+
} else {
p = b->mem.start;
c = nxt_port_mmap_chunk_id(hdr, p);
@@ -103,9 +154,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_thread_mutex_lock(&process->incoming_mutex);
if (process->incoming == NULL) {
- process->incoming_mp = nxt_mp_create(1024, 128, 256, 32);
- process->incoming = nxt_array_create(process->incoming_mp, 1,
- sizeof(nxt_port_mmap_t));
+ process->incoming = nxt_port_mmaps_create();
}
if (nxt_slow_path(process->incoming == NULL)) {
@@ -114,9 +163,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
goto fail;
}
- nxt_mp_thread_adopt(process->incoming_mp);
-
- port_mmap = nxt_array_zero_add(process->incoming);
+ port_mmap = nxt_port_mmap_add(process->incoming);
if (nxt_slow_path(port_mmap == NULL)) {
nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
@@ -140,6 +187,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) {
nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)",
port_mmap->hdr->id, process->incoming->nelts - 1);
+ nxt_abort();
}
fail:
@@ -195,9 +243,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
port_mmap = NULL;
if (process->outgoing == NULL) {
- process->outgoing_mp = nxt_mp_create(1024, 128, 256, 32);
- process->outgoing = nxt_array_create(process->outgoing_mp, 1,
- sizeof(nxt_port_mmap_t));
+ process->outgoing = nxt_port_mmaps_create();
}
if (nxt_slow_path(process->outgoing == NULL)) {
@@ -206,9 +252,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
return NULL;
}
- nxt_mp_thread_adopt(process->outgoing_mp);
-
- port_mmap = nxt_array_zero_add(process->outgoing);
+ port_mmap = nxt_port_mmap_add(process->outgoing);
if (nxt_slow_path(port_mmap == NULL)) {
nxt_log(task, NXT_LOG_WARN,
"failed to add port mmap to outgoing array");