summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--auto/sources1
-rw-r--r--src/nxt_main.h1
-rw-r--r--src/nxt_master_process.c2
-rw-r--r--src/nxt_port.c3
-rw-r--r--src/nxt_port.h9
-rw-r--r--src/nxt_port_rpc.c314
-rw-r--r--src/nxt_port_rpc.h24
-rw-r--r--src/nxt_router.c3
-rw-r--r--src/nxt_worker_process.c12
9 files changed, 369 insertions, 0 deletions
diff --git a/auto/sources b/auto/sources
index 04cc6297..f4447c2a 100644
--- a/auto/sources
+++ b/auto/sources
@@ -90,6 +90,7 @@ NXT_LIB_SRCS=" \
src/nxt_signal.c \
src/nxt_port_socket.c \
src/nxt_port_memory.c \
+ src/nxt_port_rpc.c \
src/nxt_port.c \
src/nxt_dyld.c \
src/nxt_random.c \
diff --git a/src/nxt_main.h b/src/nxt_main.h
index 547a0321..72b243f4 100644
--- a/src/nxt_main.h
+++ b/src/nxt_main.h
@@ -126,6 +126,7 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context);
#include <nxt_port.h>
#include <nxt_port_memory.h>
+#include <nxt_port_rpc.h>
#if (NXT_THREADS)
#include <nxt_thread_pool.h>
#endif
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c
index bd72a752..9e32785f 100644
--- a/src/nxt_master_process.c
+++ b/src/nxt_master_process.c
@@ -200,6 +200,8 @@ static nxt_port_handler_t nxt_master_process_port_handlers[] = {
nxt_port_master_data_handler,
NULL,
nxt_port_ready_handler,
+ nxt_port_rpc_handler,
+ nxt_port_rpc_handler,
};
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 3c6b60b7..27b5ddbe 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -34,6 +34,7 @@ nxt_port_new(nxt_port_id_t id, nxt_pid_t pid, nxt_process_type_t type)
port->pid = pid;
port->type = type;
port->mem_pool = mp;
+ port->next_stream = 1;
nxt_queue_init(&port->messages);
@@ -421,6 +422,8 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
rt = task->thread->runtime;
pid = msg->port_msg.stream;
+ nxt_port_rpc_remove_peer(task, msg->port, pid);
+
process = nxt_runtime_process_find(rt, pid);
if (process) {
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 27c00132..1cbaaef1 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -21,6 +21,8 @@ typedef enum {
_NXT_PORT_MSG_DATA,
_NXT_PORT_MSG_REMOVE_PID,
_NXT_PORT_MSG_READY,
+ _NXT_PORT_MSG_RPC_READY,
+ _NXT_PORT_MSG_RPC_ERROR,
NXT_PORT_MSG_MAX,
@@ -33,6 +35,9 @@ typedef enum {
NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_READY = _NXT_PORT_MSG_READY | NXT_PORT_MSG_LAST,
+ NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY,
+ NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST,
+ NXT_PORT_MSG_RPC_ERROR = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST,
} nxt_port_msg_type_t;
@@ -104,6 +109,10 @@ struct nxt_port_s {
nxt_port_id_t id;
nxt_pid_t pid;
+ nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */
+ nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */
+ uint32_t next_stream;
+
nxt_process_type_t type;
nxt_work_t work;
};
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c
new file mode 100644
index 00000000..e6126748
--- /dev/null
+++ b/src/nxt_port_rpc.c
@@ -0,0 +1,314 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+#include <nxt_port_rpc.h>
+
+
+typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
+
+struct nxt_port_rpc_reg_s {
+ uint32_t stream;
+
+ nxt_pid_t peer;
+ nxt_queue_link_t link;
+
+ nxt_port_rpc_handler_t ready_handler;
+ nxt_port_rpc_handler_t error_handler;
+ void *data;
+};
+
+
+static nxt_int_t
+nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ return NXT_OK;
+}
+
+
+static const nxt_lvlhsh_proto_t lvlhsh_rpc_reg_proto nxt_aligned(64) = {
+ NXT_LVLHSH_DEFAULT,
+ nxt_rpc_reg_test,
+ nxt_lvlhsh_alloc,
+ nxt_lvlhsh_free,
+};
+
+
+nxt_inline void
+nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t *lhq, uint32_t *stream)
+{
+ lhq->key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
+ lhq->key.length = sizeof(*stream);
+ lhq->key.start = (u_char *) stream;
+ lhq->proto = &lvlhsh_rpc_reg_proto;
+}
+
+
+nxt_inline void
+nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t *lhq, nxt_pid_t *peer)
+{
+ lhq->key_hash = nxt_murmur_hash2(peer, sizeof(*peer));
+ lhq->key.length = sizeof(*peer);
+ lhq->key.start = (u_char *) peer;
+ lhq->proto = &lvlhsh_rpc_reg_proto;
+}
+
+
+uint32_t
+nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
+ nxt_pid_t peer, void *data)
+{
+ uint32_t stream;
+ nxt_queue_link_t *peer_link;
+ nxt_port_rpc_reg_t *reg;
+ nxt_lvlhsh_query_t lhq;
+
+
+ nxt_assert(port->pair[0] != -1);
+
+ stream = port->next_stream++;
+
+ reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t));
+
+ if (nxt_slow_path(reg == NULL)) {
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to allocate "
+ "reg for stream #%uD", stream);
+
+ return 0;
+ }
+
+ reg->stream = stream;
+ reg->peer = peer;
+ reg->ready_handler = ready_handler;
+ reg->error_handler = error_handler;
+ reg->data = data;
+
+
+ nxt_port_rpc_lhq_stream(&lhq, &stream);
+ lhq.replace = 0;
+ lhq.value = reg;
+ lhq.pool = port->mem_pool;
+
+ switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) {
+
+ case NXT_OK:
+ break;
+
+ default:
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add handler "
+ "for stream #%uD", stream);
+
+ nxt_mp_free(port->mem_pool, reg);
+
+ return 0;
+ }
+
+
+ nxt_port_rpc_lhq_peer(&lhq, &peer);
+ lhq.replace = 0;
+ lhq.value = &reg->link;
+ lhq.pool = port->mem_pool;
+
+ switch (nxt_lvlhsh_insert(&port->rpc_peers, &lhq)) {
+
+ case NXT_OK:
+ nxt_queue_self(&reg->link);
+ break;
+
+ case NXT_DECLINED:
+ peer_link = lhq.value;
+ nxt_queue_insert_before(peer_link, &reg->link);
+ break;
+
+ default:
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add peer "
+ "for stream #%uD", stream);
+ break;
+ }
+
+ return stream;
+}
+
+
+void
+nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ uint8_t last;
+ uint32_t stream;
+ nxt_int_t ret;
+ nxt_port_t *port;
+ nxt_port_rpc_reg_t *reg;
+ nxt_lvlhsh_query_t lhq;
+ nxt_port_msg_type_t type;
+
+ stream = msg->port_msg.stream;
+ port = msg->port;
+ last = msg->port_msg.last;
+ type = msg->port_msg.type;
+
+ nxt_debug(task, "rpc: handler for stream #%uD, type %d", stream, type);
+
+ nxt_port_rpc_lhq_stream(&lhq, &stream);
+ lhq.pool = port->mem_pool;
+
+ if (last != 0) {
+ ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
+ } else {
+ ret = nxt_lvlhsh_find(&port->rpc_streams, &lhq);
+ }
+
+ if (ret != NXT_OK) {
+ nxt_debug(task, "rpc: no handler found for stream #%uD", stream);
+
+ return;
+ }
+
+ reg = lhq.value;
+
+ nxt_assert(reg->peer == msg->port_msg.pid);
+
+ if (type == _NXT_PORT_MSG_RPC_ERROR) {
+ reg->error_handler(task, msg, reg->data);
+ } else {
+ reg->ready_handler(task, msg, reg->data);
+ }
+
+ if (last == 0) {
+ nxt_debug(task, "rpc: keep handler for stream #%uD", stream);
+
+ return;
+ }
+
+ if (reg->link.next == &reg->link) {
+ nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
+ lhq.pool = port->mem_pool;
+
+ ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete "
+ "peer %PI", reg->peer);
+ }
+ } else {
+ nxt_queue_remove(&reg->link);
+ }
+
+ nxt_mp_free(port->mem_pool, reg);
+}
+
+
+void
+nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
+{
+ uint8_t last;
+ uint32_t stream;
+ nxt_int_t ret;
+ nxt_buf_t buf;
+ nxt_queue_link_t *peer_link;
+ nxt_port_rpc_reg_t *reg;
+ nxt_lvlhsh_query_t lhq;
+ nxt_port_recv_msg_t msg;
+
+ nxt_port_rpc_lhq_peer(&lhq, &peer);
+ lhq.pool = port->mem_pool;
+
+ ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_debug(task, "rpc: no handler found for peer %PI", peer);
+
+ return;
+ }
+
+ nxt_memzero(&msg, sizeof(msg));
+ nxt_memzero(&buf, sizeof(buf));
+
+ msg.fd = -1;
+ msg.buf = &buf;
+ msg.port = port;
+
+ msg.port_msg.pid = peer;
+ msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
+ msg.port_msg.last = 1;
+
+ peer_link = lhq.value;
+ last = 0;
+
+ while (last == 0) {
+
+ reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
+
+ nxt_debug(task, "rpc: trigger error for stream #%uD", reg->stream);
+
+ msg.port_msg.stream = reg->stream;
+
+ reg->error_handler(task, &msg, reg->data);
+
+
+ stream = reg->stream;
+
+ nxt_port_rpc_lhq_stream(&lhq, &stream);
+ lhq.pool = port->mem_pool;
+
+ ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete "
+ "handler for stream #%uD", stream);
+
+ return;
+ }
+
+ if (peer_link == peer_link->next) {
+ last = 1;
+
+ } else {
+ peer_link = peer_link->next;
+ nxt_queue_remove(peer_link->prev);
+ }
+
+ nxt_mp_free(port->mem_pool, reg);
+ }
+}
+
+
+void
+nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
+{
+ nxt_int_t ret;
+ nxt_port_rpc_reg_t *reg;
+ nxt_lvlhsh_query_t lhq;
+
+ nxt_port_rpc_lhq_stream(&lhq, &stream);
+ lhq.pool = port->mem_pool;
+
+ ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
+
+ if (ret != NXT_OK) {
+ nxt_debug(task, "rpc: no handler found for stream %uxD", stream);
+
+ return;
+ }
+
+ reg = lhq.value;
+
+ if (reg->link.next == &reg->link) {
+ nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
+ lhq.pool = port->mem_pool;
+
+ ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete "
+ "peer %PI", reg->peer);
+ }
+ } else {
+ nxt_queue_remove(&reg->link);
+ }
+
+ nxt_mp_free(port->mem_pool, reg);
+}
diff --git a/src/nxt_port_rpc.h b/src/nxt_port_rpc.h
new file mode 100644
index 00000000..cae20539
--- /dev/null
+++ b/src/nxt_port_rpc.h
@@ -0,0 +1,24 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_PORT_RPC_H_INCLUDED_
+#define _NXT_PORT_RPC_H_INCLUDED_
+
+
+typedef void (*nxt_port_rpc_handler_t)(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
+
+uint32_t nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
+ nxt_pid_t peer, void *data);
+void nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
+void nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port,
+ nxt_pid_t peer);
+void nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream);
+
+
+#endif /* _NXT_PORT_RPC_H_INCLUDED_ */
+
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 64cc53b7..ecce72ac 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -1407,6 +1407,9 @@ static nxt_port_handler_t nxt_router_app_port_handlers[] = {
nxt_port_mmap_handler,
nxt_router_app_data_handler,
nxt_port_remove_pid_handler,
+ NULL,
+ nxt_port_rpc_handler,
+ nxt_port_rpc_handler,
};
diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c
index a7db8b6b..83c63445 100644
--- a/src/nxt_worker_process.c
+++ b/src/nxt_worker_process.c
@@ -29,6 +29,9 @@ nxt_port_handler_t nxt_controller_process_port_handlers[] = {
nxt_port_mmap_handler,
nxt_port_controller_data_handler,
nxt_port_remove_pid_handler,
+ NULL,
+ nxt_port_rpc_handler,
+ nxt_port_rpc_handler,
};
@@ -39,6 +42,9 @@ nxt_port_handler_t nxt_worker_process_port_handlers[] = {
nxt_port_mmap_handler,
nxt_port_data_handler,
nxt_port_remove_pid_handler,
+ NULL,
+ nxt_port_rpc_handler,
+ nxt_port_rpc_handler,
};
@@ -49,6 +55,9 @@ nxt_port_handler_t nxt_app_process_port_handlers[] = {
nxt_port_mmap_handler,
nxt_port_app_data_handler,
nxt_port_remove_pid_handler,
+ NULL,
+ nxt_port_rpc_handler,
+ nxt_port_rpc_handler,
};
@@ -59,6 +68,9 @@ nxt_port_handler_t nxt_router_process_port_handlers[] = {
nxt_port_mmap_handler,
nxt_router_conf_data_handler,
nxt_port_remove_pid_handler,
+ NULL,
+ nxt_port_rpc_handler,
+ nxt_port_rpc_handler,
};