diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-07-12 20:32:16 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-07-12 20:32:16 +0300 |
commit | b0c1e740cf404f8fed5eed75fddb205ca74314e0 (patch) | |
tree | 08dcefc827c5dfb1570b682ea8d1e9abf17a31dc /src/nxt_port_memory.c | |
parent | c38bcb7d70729434893ae4d5f2f58a78a36d2bd5 (diff) | |
download | unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.gz unit-b0c1e740cf404f8fed5eed75fddb205ca74314e0.tar.bz2 |
New process port exchange changed. READY message type introduced.
Application process start request DATA message from router to master.
Master notifies router via NEW_PORT message after worker process become ready.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_port_memory.c | 68 |
1 files changed, 56 insertions, 12 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index 38e7db70..32d4aa5f 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -26,6 +26,56 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) } +static nxt_array_t * +nxt_port_mmaps_create() +{ + nxt_mp_t *mp; + + mp = nxt_mp_create(1024, 128, 256, 32); + + if (nxt_slow_path(mp == NULL)) { + return NULL; + } + + return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t)); +} + + +static nxt_port_mmap_t * +nxt_port_mmap_add(nxt_array_t *port_mmaps) +{ + nxt_mp_thread_adopt(port_mmaps->mem_pool); + + return nxt_array_zero_add(port_mmaps); +} + + +void +nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool) +{ + uint32_t i; + nxt_port_mmap_t *port_mmap; + + if (port_mmaps == NULL) { + return; + } + + nxt_mp_thread_adopt(port_mmaps->mem_pool); + + port_mmap = port_mmaps->elts; + + for (i = 0; i < port_mmaps->nelts; i++) { + nxt_port_mmap_destroy(port_mmap); + } + + port_mmaps->nelts = 0; + + if (destroy_pool != 0) { + nxt_mp_destroy(port_mmaps->mem_pool); + } +} + + static void nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) { @@ -63,6 +113,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) p = b->mem.pos - 1; c = nxt_port_mmap_chunk_id(hdr, p) + 1; p = nxt_port_mmap_chunk_start(hdr, c); + } else { p = b->mem.start; c = nxt_port_mmap_chunk_id(hdr, p); @@ -103,9 +154,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_thread_mutex_lock(&process->incoming_mutex); if (process->incoming == NULL) { - process->incoming_mp = nxt_mp_create(1024, 128, 256, 32); - process->incoming = nxt_array_create(process->incoming_mp, 1, - sizeof(nxt_port_mmap_t)); + process->incoming = nxt_port_mmaps_create(); } if (nxt_slow_path(process->incoming == NULL)) { @@ -114,9 +163,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, goto fail; } - nxt_mp_thread_adopt(process->incoming_mp); - - port_mmap = nxt_array_zero_add(process->incoming); + port_mmap = nxt_port_mmap_add(process->incoming); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); @@ -140,6 +187,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) { nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)", port_mmap->hdr->id, process->incoming->nelts - 1); + nxt_abort(); } fail: @@ -195,9 +243,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, port_mmap = NULL; if (process->outgoing == NULL) { - process->outgoing_mp = nxt_mp_create(1024, 128, 256, 32); - process->outgoing = nxt_array_create(process->outgoing_mp, 1, - sizeof(nxt_port_mmap_t)); + process->outgoing = nxt_port_mmaps_create(); } if (nxt_slow_path(process->outgoing == NULL)) { @@ -206,9 +252,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, return NULL; } - nxt_mp_thread_adopt(process->outgoing_mp); - - port_mmap = nxt_array_zero_add(process->outgoing); + port_mmap = nxt_port_mmap_add(process->outgoing); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add port mmap to outgoing array"); |