diff options
-rw-r--r-- | auto/sources | 1 | ||||
-rw-r--r-- | src/nxt_main.h | 1 | ||||
-rw-r--r-- | src/nxt_master_process.c | 2 | ||||
-rw-r--r-- | src/nxt_port.c | 3 | ||||
-rw-r--r-- | src/nxt_port.h | 9 | ||||
-rw-r--r-- | src/nxt_port_rpc.c | 314 | ||||
-rw-r--r-- | src/nxt_port_rpc.h | 24 | ||||
-rw-r--r-- | src/nxt_router.c | 3 | ||||
-rw-r--r-- | src/nxt_worker_process.c | 12 |
9 files changed, 369 insertions, 0 deletions
diff --git a/auto/sources b/auto/sources index 04cc6297..f4447c2a 100644 --- a/auto/sources +++ b/auto/sources @@ -90,6 +90,7 @@ NXT_LIB_SRCS=" \ src/nxt_signal.c \ src/nxt_port_socket.c \ src/nxt_port_memory.c \ + src/nxt_port_rpc.c \ src/nxt_port.c \ src/nxt_dyld.c \ src/nxt_random.c \ diff --git a/src/nxt_main.h b/src/nxt_main.h index 547a0321..72b243f4 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -126,6 +126,7 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context); #include <nxt_port.h> #include <nxt_port_memory.h> +#include <nxt_port_rpc.h> #if (NXT_THREADS) #include <nxt_thread_pool.h> #endif diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index bd72a752..9e32785f 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -200,6 +200,8 @@ static nxt_port_handler_t nxt_master_process_port_handlers[] = { nxt_port_master_data_handler, NULL, nxt_port_ready_handler, + nxt_port_rpc_handler, + nxt_port_rpc_handler, }; diff --git a/src/nxt_port.c b/src/nxt_port.c index 3c6b60b7..27b5ddbe 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -34,6 +34,7 @@ nxt_port_new(nxt_port_id_t id, nxt_pid_t pid, nxt_process_type_t type) port->pid = pid; port->type = type; port->mem_pool = mp; + port->next_stream = 1; nxt_queue_init(&port->messages); @@ -421,6 +422,8 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; pid = msg->port_msg.stream; + nxt_port_rpc_remove_peer(task, msg->port, pid); + process = nxt_runtime_process_find(rt, pid); if (process) { diff --git a/src/nxt_port.h b/src/nxt_port.h index 27c00132..1cbaaef1 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -21,6 +21,8 @@ typedef enum { _NXT_PORT_MSG_DATA, _NXT_PORT_MSG_REMOVE_PID, _NXT_PORT_MSG_READY, + _NXT_PORT_MSG_RPC_READY, + _NXT_PORT_MSG_RPC_ERROR, NXT_PORT_MSG_MAX, @@ -33,6 +35,9 @@ typedef enum { NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST, NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST, NXT_PORT_MSG_READY = _NXT_PORT_MSG_READY | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY, + NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST, + NXT_PORT_MSG_RPC_ERROR = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST, } nxt_port_msg_type_t; @@ -104,6 +109,10 @@ struct nxt_port_s { nxt_port_id_t id; nxt_pid_t pid; + nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */ + nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */ + uint32_t next_stream; + nxt_process_type_t type; nxt_work_t work; }; diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c new file mode 100644 index 00000000..e6126748 --- /dev/null +++ b/src/nxt_port_rpc.c @@ -0,0 +1,314 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <nxt_port_rpc.h> + + +typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t; + +struct nxt_port_rpc_reg_s { + uint32_t stream; + + nxt_pid_t peer; + nxt_queue_link_t link; + + nxt_port_rpc_handler_t ready_handler; + nxt_port_rpc_handler_t error_handler; + void *data; +}; + + +static nxt_int_t +nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + return NXT_OK; +} + + +static const nxt_lvlhsh_proto_t lvlhsh_rpc_reg_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + nxt_rpc_reg_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +nxt_inline void +nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t *lhq, uint32_t *stream) +{ + lhq->key_hash = nxt_murmur_hash2(stream, sizeof(*stream)); + lhq->key.length = sizeof(*stream); + lhq->key.start = (u_char *) stream; + lhq->proto = &lvlhsh_rpc_reg_proto; +} + + +nxt_inline void +nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t *lhq, nxt_pid_t *peer) +{ + lhq->key_hash = nxt_murmur_hash2(peer, sizeof(*peer)); + lhq->key.length = sizeof(*peer); + lhq->key.start = (u_char *) peer; + lhq->proto = &lvlhsh_rpc_reg_proto; +} + + +uint32_t +nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port, + nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, + nxt_pid_t peer, void *data) +{ + uint32_t stream; + nxt_queue_link_t *peer_link; + nxt_port_rpc_reg_t *reg; + nxt_lvlhsh_query_t lhq; + + + nxt_assert(port->pair[0] != -1); + + stream = port->next_stream++; + + reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t)); + + if (nxt_slow_path(reg == NULL)) { + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to allocate " + "reg for stream #%uD", stream); + + return 0; + } + + reg->stream = stream; + reg->peer = peer; + reg->ready_handler = ready_handler; + reg->error_handler = error_handler; + reg->data = data; + + + nxt_port_rpc_lhq_stream(&lhq, &stream); + lhq.replace = 0; + lhq.value = reg; + lhq.pool = port->mem_pool; + + switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) { + + case NXT_OK: + break; + + default: + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add handler " + "for stream #%uD", stream); + + nxt_mp_free(port->mem_pool, reg); + + return 0; + } + + + nxt_port_rpc_lhq_peer(&lhq, &peer); + lhq.replace = 0; + lhq.value = ®->link; + lhq.pool = port->mem_pool; + + switch (nxt_lvlhsh_insert(&port->rpc_peers, &lhq)) { + + case NXT_OK: + nxt_queue_self(®->link); + break; + + case NXT_DECLINED: + peer_link = lhq.value; + nxt_queue_insert_before(peer_link, ®->link); + break; + + default: + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add peer " + "for stream #%uD", stream); + break; + } + + return stream; +} + + +void +nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + uint8_t last; + uint32_t stream; + nxt_int_t ret; + nxt_port_t *port; + nxt_port_rpc_reg_t *reg; + nxt_lvlhsh_query_t lhq; + nxt_port_msg_type_t type; + + stream = msg->port_msg.stream; + port = msg->port; + last = msg->port_msg.last; + type = msg->port_msg.type; + + nxt_debug(task, "rpc: handler for stream #%uD, type %d", stream, type); + + nxt_port_rpc_lhq_stream(&lhq, &stream); + lhq.pool = port->mem_pool; + + if (last != 0) { + ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); + } else { + ret = nxt_lvlhsh_find(&port->rpc_streams, &lhq); + } + + if (ret != NXT_OK) { + nxt_debug(task, "rpc: no handler found for stream #%uD", stream); + + return; + } + + reg = lhq.value; + + nxt_assert(reg->peer == msg->port_msg.pid); + + if (type == _NXT_PORT_MSG_RPC_ERROR) { + reg->error_handler(task, msg, reg->data); + } else { + reg->ready_handler(task, msg, reg->data); + } + + if (last == 0) { + nxt_debug(task, "rpc: keep handler for stream #%uD", stream); + + return; + } + + if (reg->link.next == ®->link) { + nxt_port_rpc_lhq_peer(&lhq, ®->peer); + lhq.pool = port->mem_pool; + + ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete " + "peer %PI", reg->peer); + } + } else { + nxt_queue_remove(®->link); + } + + nxt_mp_free(port->mem_pool, reg); +} + + +void +nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) +{ + uint8_t last; + uint32_t stream; + nxt_int_t ret; + nxt_buf_t buf; + nxt_queue_link_t *peer_link; + nxt_port_rpc_reg_t *reg; + nxt_lvlhsh_query_t lhq; + nxt_port_recv_msg_t msg; + + nxt_port_rpc_lhq_peer(&lhq, &peer); + lhq.pool = port->mem_pool; + + ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_debug(task, "rpc: no handler found for peer %PI", peer); + + return; + } + + nxt_memzero(&msg, sizeof(msg)); + nxt_memzero(&buf, sizeof(buf)); + + msg.fd = -1; + msg.buf = &buf; + msg.port = port; + + msg.port_msg.pid = peer; + msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID; + msg.port_msg.last = 1; + + peer_link = lhq.value; + last = 0; + + while (last == 0) { + + reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link); + + nxt_debug(task, "rpc: trigger error for stream #%uD", reg->stream); + + msg.port_msg.stream = reg->stream; + + reg->error_handler(task, &msg, reg->data); + + + stream = reg->stream; + + nxt_port_rpc_lhq_stream(&lhq, &stream); + lhq.pool = port->mem_pool; + + ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete " + "handler for stream #%uD", stream); + + return; + } + + if (peer_link == peer_link->next) { + last = 1; + + } else { + peer_link = peer_link->next; + nxt_queue_remove(peer_link->prev); + } + + nxt_mp_free(port->mem_pool, reg); + } +} + + +void +nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream) +{ + nxt_int_t ret; + nxt_port_rpc_reg_t *reg; + nxt_lvlhsh_query_t lhq; + + nxt_port_rpc_lhq_stream(&lhq, &stream); + lhq.pool = port->mem_pool; + + ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); + + if (ret != NXT_OK) { + nxt_debug(task, "rpc: no handler found for stream %uxD", stream); + + return; + } + + reg = lhq.value; + + if (reg->link.next == ®->link) { + nxt_port_rpc_lhq_peer(&lhq, ®->peer); + lhq.pool = port->mem_pool; + + ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete " + "peer %PI", reg->peer); + } + } else { + nxt_queue_remove(®->link); + } + + nxt_mp_free(port->mem_pool, reg); +} diff --git a/src/nxt_port_rpc.h b/src/nxt_port_rpc.h new file mode 100644 index 00000000..cae20539 --- /dev/null +++ b/src/nxt_port_rpc.h @@ -0,0 +1,24 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PORT_RPC_H_INCLUDED_ +#define _NXT_PORT_RPC_H_INCLUDED_ + + +typedef void (*nxt_port_rpc_handler_t)(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); + +uint32_t nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port, + nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, + nxt_pid_t peer, void *data); +void nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, + nxt_pid_t peer); +void nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream); + + +#endif /* _NXT_PORT_RPC_H_INCLUDED_ */ + diff --git a/src/nxt_router.c b/src/nxt_router.c index 64cc53b7..ecce72ac 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -1407,6 +1407,9 @@ static nxt_port_handler_t nxt_router_app_port_handlers[] = { nxt_port_mmap_handler, nxt_router_app_data_handler, nxt_port_remove_pid_handler, + NULL, + nxt_port_rpc_handler, + nxt_port_rpc_handler, }; diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index a7db8b6b..83c63445 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -29,6 +29,9 @@ nxt_port_handler_t nxt_controller_process_port_handlers[] = { nxt_port_mmap_handler, nxt_port_controller_data_handler, nxt_port_remove_pid_handler, + NULL, + nxt_port_rpc_handler, + nxt_port_rpc_handler, }; @@ -39,6 +42,9 @@ nxt_port_handler_t nxt_worker_process_port_handlers[] = { nxt_port_mmap_handler, nxt_port_data_handler, nxt_port_remove_pid_handler, + NULL, + nxt_port_rpc_handler, + nxt_port_rpc_handler, }; @@ -49,6 +55,9 @@ nxt_port_handler_t nxt_app_process_port_handlers[] = { nxt_port_mmap_handler, nxt_port_app_data_handler, nxt_port_remove_pid_handler, + NULL, + nxt_port_rpc_handler, + nxt_port_rpc_handler, }; @@ -59,6 +68,9 @@ nxt_port_handler_t nxt_router_process_port_handlers[] = { nxt_port_mmap_handler, nxt_router_conf_data_handler, nxt_port_remove_pid_handler, + NULL, + nxt_port_rpc_handler, + nxt_port_rpc_handler, }; |