diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-05-12 20:32:41 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-05-12 20:32:41 +0300 |
commit | f7b4bdfd892a0b479dc946896435a3ba7f9615dd (patch) | |
tree | a6f0c4ebaeed2d9f0fcb1c07178b52a684a53280 /src/nxt_runtime.c | |
parent | 1782c771fab999b37a8c04ed72760e3528205be7 (diff) | |
download | unit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.gz unit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.bz2 |
Using shared memory to send data via nxt_port.
Usage:
b = nxt_port_mmap_get_buf(task, port, size);
b->mem.free = nxt_cpymem(b->mem.free, data, size);
nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, b);
Diffstat (limited to 'src/nxt_runtime.c')
-rw-r--r-- | src/nxt_runtime.c | 327 |
1 files changed, 306 insertions, 21 deletions
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 84812cae..8337712a 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -16,7 +16,6 @@ static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt); static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt); -static nxt_int_t nxt_runtime_processes(nxt_runtime_t *rt); static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); static void nxt_runtime_initial_start(nxt_task_t *task); @@ -105,10 +104,6 @@ nxt_runtime_create(nxt_task_t *task) goto fail; } - if (nxt_slow_path(nxt_runtime_processes(rt) != NXT_OK)) { - goto fail; - } - if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { goto fail; } @@ -294,18 +289,6 @@ nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) static nxt_int_t -nxt_runtime_processes(nxt_runtime_t *rt) -{ - rt->processes = nxt_array_create(rt->mem_pool, 4, sizeof(nxt_process_t)); - if (nxt_slow_path(rt->processes == NULL)) { - return NXT_ERROR; - } - - return NXT_OK; -} - - -static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) { #if (NXT_THREADS) @@ -1479,21 +1462,323 @@ nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) nxt_process_t * -nxt_runtime_new_process(nxt_runtime_t *rt) +nxt_runtime_process_new(nxt_runtime_t *rt) { nxt_process_t *process; /* TODO: memory failures. */ - process = nxt_array_zero_add(rt->processes); + process = nxt_mem_cache_zalloc0(rt->mem_pool, sizeof(nxt_process_t)); if (nxt_slow_path(process == NULL)) { return NULL; } - process->ports = nxt_array_create(rt->mem_pool, 1, sizeof(nxt_port_t)); - if (nxt_slow_path(process->ports == NULL)) { + nxt_queue_init(&process->ports); + + /* TODO each process should have it's own mem_pool for ports allocation */ + process->mem_pool = rt->mem_pool; + + return process; +} + + +static nxt_int_t +nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + nxt_process_t *process; + + process = data; + + if (lhq->key.length == sizeof(nxt_pid_t) && + *(nxt_pid_t *) lhq->key.start == process->pid) { + return NXT_OK; + } + + return NXT_DECLINED; +} + +static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + 0, + nxt_runtime_lvlhsh_pid_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +typedef struct { + nxt_pid_t pid; + nxt_port_id_t port_id; +} nxt_pid_port_id_t; + +static nxt_int_t +nxt_runtime_lvlhsh_port_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + nxt_port_t *port; + nxt_pid_port_id_t *pid_port_id; + + port = data; + pid_port_id = (nxt_pid_port_id_t *) lhq->key.start; + + if (lhq->key.length == sizeof(nxt_pid_port_id_t) && + pid_port_id->pid == port->pid && + pid_port_id->port_id == port->id) { + return NXT_OK; + } + + return NXT_DECLINED; +} + +static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + 0, + nxt_runtime_lvlhsh_port_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +nxt_process_t * +nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) +{ + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid)); + lhq.key.length = sizeof(pid); + lhq.key.start = (u_char *) &pid; + lhq.proto = &lvlhsh_processes_proto; + + /* TODO lock processes */ + + if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { + nxt_thread_log_debug("process %PI found", pid); + return lhq.value; + } + + nxt_thread_log_debug("process %PI not found", pid); + + return NULL; +} + + +nxt_process_t * +nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) +{ + nxt_process_t *process; + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid)); + lhq.key.length = sizeof(pid); + lhq.key.start = (u_char *) &pid; + lhq.proto = &lvlhsh_processes_proto; + + /* TODO lock processes */ + + if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { + nxt_thread_log_debug("process %PI found", pid); + return lhq.value; + } + + process = nxt_runtime_process_new(rt); + if (nxt_slow_path(process == NULL)) { return NULL; } + process->pid = pid; + + lhq.replace = 0; + lhq.value = process; + lhq.pool = rt->mem_pool; + + switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { + + case NXT_OK: + if (rt->nprocesses == 0) { + rt->mprocess = process; + } + + rt->nprocesses++; + + nxt_thread_log_debug("process %PI insert", pid); + break; + + default: + nxt_thread_log_debug("process %PI insert failed", pid); + break; + } + return process; } + + +void +nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) +{ + nxt_port_t *port; + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid)); + lhq.key.length = sizeof(process->pid); + lhq.key.start = (u_char *) &process->pid; + lhq.proto = &lvlhsh_processes_proto; + lhq.replace = 0; + lhq.value = process; + lhq.pool = rt->mem_pool; + + /* TODO lock processes */ + + switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { + + case NXT_OK: + if (rt->nprocesses == 0) { + rt->mprocess = process; + } + + rt->nprocesses++; + + nxt_process_port_each(process, port) { + + nxt_runtime_port_add(rt, port); + + } nxt_process_port_loop; + + break; + + default: + break; + } +} + + +void +nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) +{ + nxt_port_t *port; + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid)); + lhq.key.length = sizeof(process->pid); + lhq.key.start = (u_char *) &process->pid; + lhq.proto = &lvlhsh_processes_proto; + lhq.replace = 0; + lhq.value = process; + lhq.pool = rt->mem_pool; + + /* TODO lock processes */ + + switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { + + case NXT_OK: + rt->nprocesses--; + + nxt_process_port_each(process, port) { + + nxt_runtime_port_remove(rt, port); + + } nxt_process_port_loop; + + break; + + default: + break; + } +} + + +nxt_process_t * +nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) +{ + nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t)); + + lhe->proto = &lvlhsh_processes_proto; + + return nxt_runtime_process_next(rt, lhe); +} + + +void +nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port) +{ + nxt_pid_port_id_t pid_port; + nxt_lvlhsh_query_t lhq; + + pid_port.pid = port->pid; + pid_port.port_id = port->id; + + lhq.key_hash = nxt_murmur_hash2(&pid_port, sizeof(pid_port)); + lhq.key.length = sizeof(pid_port); + lhq.key.start = (u_char *) &pid_port; + lhq.proto = &lvlhsh_ports_proto; + lhq.replace = 0; + lhq.value = port; + lhq.pool = rt->mem_pool; + + /* TODO lock ports */ + + switch (nxt_lvlhsh_insert(&rt->ports, &lhq)) { + + case NXT_OK: + break; + + default: + break; + } +} + + +void +nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) +{ + nxt_pid_port_id_t pid_port; + nxt_lvlhsh_query_t lhq; + + pid_port.pid = port->pid; + pid_port.port_id = port->id; + + lhq.key_hash = nxt_murmur_hash2(&pid_port, sizeof(pid_port)); + lhq.key.length = sizeof(pid_port); + lhq.key.start = (u_char *) &pid_port; + lhq.proto = &lvlhsh_ports_proto; + lhq.replace = 0; + lhq.value = port; + lhq.pool = rt->mem_pool; + + /* TODO lock ports */ + + switch (nxt_lvlhsh_delete(&rt->ports, &lhq)) { + + case NXT_OK: + break; + + default: + break; + } +} + + +nxt_port_t * +nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, + nxt_port_id_t port_id) +{ + nxt_pid_port_id_t pid_port; + nxt_lvlhsh_query_t lhq; + + pid_port.pid = pid; + pid_port.port_id = port_id; + + lhq.key_hash = nxt_murmur_hash2(&pid_port, sizeof(pid_port)); + lhq.key.length = sizeof(pid_port); + lhq.key.start = (u_char *) &pid_port; + lhq.proto = &lvlhsh_ports_proto; + + /* TODO lock ports */ + + if (nxt_lvlhsh_find(&rt->ports, &lhq) == NXT_OK) { + nxt_thread_log_debug("process port (%PI, %d) found", pid, port_id); + return lhq.value; + } + + nxt_thread_log_debug("process port (%PI, %d) not found", pid, port_id); + + return NULL; +} |