diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_mp.c | 56 | ||||
-rw-r--r-- | src/nxt_mp.h | 2 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 10 | ||||
-rw-r--r-- | src/nxt_process.h | 2 | ||||
-rw-r--r-- | src/nxt_router.c | 2 | ||||
-rw-r--r-- | src/nxt_runtime.c | 44 |
6 files changed, 100 insertions, 16 deletions
diff --git a/src/nxt_mp.c b/src/nxt_mp.c index f1ddd86b..464df1c1 100644 --- a/src/nxt_mp.c +++ b/src/nxt_mp.c @@ -109,6 +109,11 @@ struct nxt_mp_s { uint32_t cluster_size; uint32_t retain; +#if (NXT_DEBUG) + nxt_pid_t pid; + nxt_tid_t tid; +#endif + /* Lists of nxt_mp_page_t. */ nxt_queue_t free_pages; nxt_queue_t nget_pages; @@ -186,6 +191,49 @@ nxt_lg2(uint64_t v) #endif +#if (NXT_DEBUG) + +nxt_inline void +nxt_mp_thread_assert(nxt_mp_t *mp) +{ + nxt_tid_t tid; + nxt_thread_t *thread; + + thread = nxt_thread(); + tid = nxt_thread_tid(thread); + + if (nxt_fast_path(mp->tid == tid)) { + return; + } + + if (nxt_slow_path(nxt_pid != mp->pid)) { + mp->pid = nxt_pid; + mp->tid = tid; + + return; + } + + nxt_log_alert(thread->log, "mem_pool locked by thread %PT", mp->tid); + nxt_abort(); +} + +#else + +#define nxt_mp_thread_assert(mp) + +#endif + + +void +nxt_mp_thread_adopt(nxt_mp_t *mp) +{ +#if (NXT_DEBUG) + mp->pid = nxt_pid; + mp->tid = nxt_thread_tid(NULL); +#endif +} + + nxt_mp_t * nxt_mp_create(size_t cluster_size, size_t page_alignment, size_t page_size, size_t min_chunk_size) @@ -417,6 +465,8 @@ nxt_mp_alloc_small(nxt_mp_t *mp, size_t size) nxt_mp_page_t *page; nxt_queue_link_t *link; + nxt_mp_thread_assert(mp); + p = NULL; if (size <= mp->page_size / 2) { @@ -489,6 +539,8 @@ nxt_mp_get_small(nxt_mp_t *mp, nxt_queue_t *pages, size_t size) nxt_mp_page_t *page; nxt_queue_link_t *link, *next; + nxt_mp_thread_assert(mp); + for (link = nxt_queue_first(pages); link != nxt_queue_tail(pages); link = next) @@ -604,6 +656,8 @@ nxt_mp_alloc_large(nxt_mp_t *mp, size_t alignment, size_t size) uint8_t type; nxt_mp_block_t *block; + nxt_mp_thread_assert(mp); + /* Allocation must be less than 4G. */ if (nxt_slow_path(size >= 0xFFFFFFFF)) { return NULL; @@ -664,6 +718,8 @@ nxt_mp_free(nxt_mp_t *mp, void *p) nxt_thread_t *thread; nxt_mp_block_t *block; + nxt_mp_thread_assert(mp); + nxt_debug_alloc("mp free %p", p); block = nxt_mp_find_block(&mp->blocks, p); diff --git a/src/nxt_mp.h b/src/nxt_mp.h index 40abc707..78040df2 100644 --- a/src/nxt_mp.h +++ b/src/nxt_mp.h @@ -109,4 +109,6 @@ NXT_EXPORT void *nxt_mp_zget(nxt_mp_t *mp, size_t size) NXT_MALLOC_LIKE; +NXT_EXPORT void nxt_mp_thread_adopt(nxt_mp_t *mp); + #endif /* _NXT_MP_H_INCLUDED_ */ diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index cf69a758..6aaff5cf 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -103,7 +103,8 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_thread_mutex_lock(&process->incoming_mutex); if (process->incoming == NULL) { - process->incoming = nxt_array_create(process->mem_pool, 1, + process->incoming_mp = nxt_mp_create(1024, 128, 256, 32); + process->incoming = nxt_array_create(process->incoming_mp, 1, sizeof(nxt_port_mmap_t)); } @@ -113,6 +114,8 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, goto fail; } + nxt_mp_thread_adopt(process->incoming_mp); + port_mmap = nxt_array_zero_add(process->incoming); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); @@ -192,7 +195,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, port_mmap = NULL; if (process->outgoing == NULL) { - process->outgoing = nxt_array_create(process->mem_pool, 1, + process->outgoing_mp = nxt_mp_create(1024, 128, 256, 32); + process->outgoing = nxt_array_create(process->outgoing_mp, 1, sizeof(nxt_port_mmap_t)); } @@ -202,6 +206,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, return NULL; } + nxt_mp_thread_adopt(process->outgoing_mp); + port_mmap = nxt_array_zero_add(process->outgoing); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, diff --git a/src/nxt_process.h b/src/nxt_process.h index d0ed7343..28824bd0 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -57,8 +57,10 @@ typedef struct { nxt_process_init_t *init; nxt_thread_mutex_t incoming_mutex; + nxt_mp_t *incoming_mp; nxt_array_t *incoming; /* of nxt_port_mmap_t */ nxt_thread_mutex_t outgoing_mutex; + nxt_mp_t *outgoing_mp; nxt_array_t *outgoing; /* of nxt_port_mmap_t */ nxt_lvlhsh_t connected_ports; /* of nxt_port_t */ diff --git a/src/nxt_router.c b/src/nxt_router.c index 9753d289..a0c5a17d 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -926,6 +926,8 @@ nxt_router_thread_start(void *data) thread->task = &engine->task; thread->fiber = &engine->fibers->fiber; + nxt_mp_thread_adopt(engine->port->mem_pool); + engine->port->socket.task = task; nxt_port_create(task, engine->port, nxt_router_app_port_handlers); diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 9d7a3fd7..91c4ba70 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -1522,8 +1522,6 @@ nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) lhq.key.start = (u_char *) &pid; lhq.proto = &lvlhsh_processes_proto; - /* TODO lock processes */ - if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { nxt_thread_log_debug("process %PI found", pid); return lhq.value; @@ -1546,8 +1544,6 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) lhq.key.start = (u_char *) &pid; lhq.proto = &lvlhsh_processes_proto; - /* TODO lock processes */ - if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { nxt_thread_log_debug("process %PI found", pid); return lhq.value; @@ -1599,8 +1595,6 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) lhq.value = process; lhq.pool = rt->mem_pool; - /* TODO lock processes */ - switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { case NXT_OK: @@ -1627,7 +1621,9 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) { + uint32_t i; nxt_port_t *port; + nxt_port_mmap_t *port_mmap; nxt_lvlhsh_query_t lhq; lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid)); @@ -1638,8 +1634,6 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) lhq.value = process; lhq.pool = rt->mem_pool; - /* TODO lock processes */ - switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { case NXT_OK: @@ -1651,6 +1645,34 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) } nxt_process_port_loop; + if (process->incoming) { + nxt_mp_thread_adopt(process->incoming_mp); + + port_mmap = process->incoming->elts; + + for (i = 0; i < process->incoming->nelts; i++) { + nxt_port_mmap_destroy(port_mmap); + } + + nxt_thread_mutex_destroy(&process->incoming_mutex); + + nxt_mp_destroy(process->incoming_mp); + } + + if (process->outgoing) { + nxt_mp_thread_adopt(process->outgoing_mp); + + port_mmap = process->outgoing->elts; + + for (i = 0; i < process->outgoing->nelts; i++) { + nxt_port_mmap_destroy(port_mmap); + } + + nxt_thread_mutex_destroy(&process->outgoing_mutex); + + nxt_mp_destroy(process->outgoing_mp); + } + nxt_mp_free(rt->mem_pool, process); break; @@ -1681,8 +1703,6 @@ nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) void nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port) { - /* TODO lock ports */ - nxt_port_hash_add(&rt->ports, rt->mem_pool, port); } @@ -1690,8 +1710,6 @@ nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port) void nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) { - /* TODO lock ports */ - nxt_port_hash_remove(&rt->ports, rt->mem_pool, port); if (port->pair[0] != -1) { @@ -1714,7 +1732,5 @@ nxt_port_t * nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, nxt_port_id_t port_id) { - /* TODO lock ports */ - return nxt_port_hash_find(&rt->ports, pid, port_id); } |