summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_rpc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_port_rpc.c')
-rw-r--r--src/nxt_port_rpc.c261
1 files changed, 188 insertions, 73 deletions
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c
index c5821439..04de7775 100644
--- a/src/nxt_port_rpc.c
+++ b/src/nxt_port_rpc.c
@@ -8,6 +8,8 @@
#include <nxt_port_rpc.h>
+static nxt_atomic_t nxt_stream_ident = 1;
+
typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
struct nxt_port_rpc_reg_s {
@@ -15,6 +17,7 @@ struct nxt_port_rpc_reg_s {
nxt_pid_t peer;
nxt_queue_link_t link;
+ nxt_bool_t link_first;
nxt_port_rpc_handler_t ready_handler;
nxt_port_rpc_handler_t error_handler;
@@ -62,30 +65,57 @@ nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
nxt_pid_t peer, void *data)
{
+ void *ex;
+ nxt_port_rpc_reg_t *reg;
+
+ ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler,
+ error_handler, 0);
+
+ if (ex == NULL) {
+ return 0;
+ }
+
+ if (peer != -1) {
+ nxt_port_rpc_ex_set_peer(task, port, ex, peer);
+ }
+
+ reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
+
+ nxt_assert(reg->data == ex);
+
+ reg->data = data;
+
+ return reg->stream;
+}
+
+
+void *
+nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
+ size_t ex_size)
+{
uint32_t stream;
- nxt_queue_link_t *peer_link;
nxt_port_rpc_reg_t *reg;
nxt_lvlhsh_query_t lhq;
nxt_assert(port->pair[0] != -1);
- stream = port->next_stream++;
+ stream =
+ (uint32_t) nxt_atomic_fetch_add(&nxt_stream_ident, 1) & 0x3fffffff;
- reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t));
+ reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size);
if (nxt_slow_path(reg == NULL)) {
- nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to allocate "
- "reg for stream #%uD", stream);
+ nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream);
- return 0;
+ return NULL;
}
reg->stream = stream;
- reg->peer = peer;
+ reg->peer = -1;
reg->ready_handler = ready_handler;
reg->error_handler = error_handler;
- reg->data = data;
-
+ reg->data = reg + 1;
nxt_port_rpc_lhq_stream(&lhq, &stream);
lhq.replace = 0;
@@ -98,39 +128,140 @@ nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
break;
default:
- nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add handler "
- "for stream #%uD", stream);
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add "
+ "reg ", stream);
nxt_mp_free(port->mem_pool, reg);
- return 0;
+ return NULL;
}
- if (peer != -1) {
- nxt_port_rpc_lhq_peer(&lhq, &peer);
- lhq.replace = 0;
- lhq.value = &reg->link;
+ nxt_debug(task, "rpc: stream #%uD registered", stream);
+
+ return reg->data;
+}
+
+
+uint32_t
+nxt_port_rpc_ex_stream(void *ex)
+{
+ nxt_port_rpc_reg_t *reg;
+
+ reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
+
+ nxt_assert(reg->data == ex);
+
+ return reg->stream;
+}
+
+
+void
+nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
+ void *ex, nxt_pid_t peer)
+{
+ nxt_int_t ret;
+ nxt_queue_link_t *peer_link;
+ nxt_port_rpc_reg_t *reg;
+ nxt_lvlhsh_query_t lhq;
+
+ reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
+
+ nxt_assert(reg->data == ex);
+
+ if (peer == -1 || reg->peer != -1) {
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to "
+ "change peer %PI->%PI", reg->stream, reg->peer, peer);
+
+ return;
+ }
+
+ reg->peer = peer;
+
+ nxt_port_rpc_lhq_peer(&lhq, &peer);
+ lhq.replace = 0;
+ lhq.value = &reg->link;
+ lhq.pool = port->mem_pool;
+
+ ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
+
+ switch (ret) {
+
+ case NXT_OK:
+ reg->link_first = 1;
+ nxt_queue_self(&reg->link);
+
+ nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)",
+ reg->stream, reg->peer, reg->link.next);
+ break;
+
+ case NXT_DECLINED:
+ reg->link_first = 0;
+ peer_link = lhq.value;
+ nxt_queue_insert_after(peer_link, &reg->link);
+
+ nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)",
+ reg->stream, reg->peer, reg->link.next);
+ break;
+
+ default:
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add "
+ "peer for stream #%uD (%d)", reg->stream, ret);
+
+ reg->peer = -1;
+ break;
+ }
+
+}
+
+
+static void
+nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_rpc_reg_t *reg)
+{
+ uint32_t stream;
+ nxt_int_t ret;
+ nxt_lvlhsh_query_t lhq;
+ nxt_port_rpc_reg_t *r;
+
+ stream = reg->stream;
+
+ if (reg->link_first != 0) {
+ nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
lhq.pool = port->mem_pool;
- switch (nxt_lvlhsh_insert(&port->rpc_peers, &lhq)) {
+ if (reg->link.next == &reg->link) {
+ nxt_assert(reg->link.prev == &reg->link);
- case NXT_OK:
- nxt_queue_self(&reg->link);
- break;
+ nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI "
+ "registration (%p)", stream, reg->peer, reg->link.next);
- case NXT_DECLINED:
- peer_link = lhq.value;
- nxt_queue_insert_before(peer_link, &reg->link);
- break;
+ ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
+ } else {
+ nxt_debug(task, "rpc: stream #%uD remove first pid %PI "
+ "registration (%p)", stream, reg->peer, reg->link.next);
+
+ lhq.replace = 1;
+ lhq.value = reg->link.next;
- default:
- nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add peer "
- "for stream #%uD", stream);
- break;
+ r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link);
+ r->link_first = 1;
+
+ nxt_queue_remove(&reg->link);
+
+ ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
}
+ } else {
+ nxt_debug(task, "rpc: stream #%uD remove pid %PI "
+ "registration (%p)", stream, reg->peer, reg->link.next);
+
+ nxt_queue_remove(&reg->link);
+ ret = NXT_OK;
}
- return stream;
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed"
+ " to delete peer %PI (%d)", stream, reg->peer, ret);
+ }
}
@@ -150,8 +281,6 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
last = msg->port_msg.last;
type = msg->port_msg.type;
- nxt_debug(task, "rpc: handler for stream #%uD, type %d", stream, type);
-
nxt_port_rpc_lhq_stream(&lhq, &stream);
lhq.pool = port->mem_pool;
@@ -163,11 +292,14 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
if (ret != NXT_OK) {
- nxt_debug(task, "rpc: no handler found for stream #%uD", stream);
+ nxt_debug(task, "rpc: stream #%uD no handler found", stream);
return;
}
+ nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream,
+ (last ? "last " : ""), type);
+
reg = lhq.value;
if (reg->peer != -1) {
@@ -182,28 +314,15 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
if (last == 0) {
- nxt_debug(task, "rpc: keep handler for stream #%uD", stream);
-
return;
}
if (reg->peer != -1) {
- if (reg->link.next == &reg->link) {
- nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
- lhq.pool = port->mem_pool;
-
- ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
-
- if (nxt_slow_path(ret != NXT_OK)) {
- nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete "
- "peer %PI", reg->peer);
- }
-
- } else {
- nxt_queue_remove(&reg->link);
- }
+ nxt_port_rpc_remove_from_peers(task, port, reg);
}
+ nxt_debug(task, "rpc: stream #%uD free registration", stream);
+
nxt_mp_free(port->mem_pool, reg);
}
@@ -215,7 +334,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
uint32_t stream;
nxt_int_t ret;
nxt_buf_t buf;
- nxt_queue_link_t *peer_link;
+ nxt_queue_link_t *peer_link, *next_link;
nxt_port_rpc_reg_t *reg;
nxt_lvlhsh_query_t lhq;
nxt_port_recv_msg_t msg;
@@ -226,7 +345,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
if (nxt_slow_path(ret != NXT_OK)) {
- nxt_debug(task, "rpc: no handler found for peer %PI", peer);
+ nxt_debug(task, "rpc: no reg found for peer %PI", peer);
return;
}
@@ -249,14 +368,15 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
- nxt_debug(task, "rpc: trigger error for stream #%uD", reg->stream);
+ nxt_assert(reg->peer == peer);
- msg.port_msg.stream = reg->stream;
+ stream = reg->stream;
- reg->error_handler(task, &msg, reg->data);
+ nxt_debug(task, "rpc: stream #%uD trigger error", stream);
+ msg.port_msg.stream = stream;
- stream = reg->stream;
+ reg->error_handler(task, &msg, reg->data);
nxt_port_rpc_lhq_stream(&lhq, &stream);
lhq.pool = port->mem_pool;
@@ -265,18 +385,24 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
if (nxt_slow_path(ret != NXT_OK)) {
nxt_log_error(NXT_LOG_ERR, task->log,
- "rpc: failed to delete handler for stream #%uD",
- stream);
+ "rpc: stream #%uD failed to delete handler", stream);
return;
}
if (peer_link == peer_link->next) {
+ nxt_assert(peer_link->prev == peer_link);
+
last = 1;
} else {
- peer_link = peer_link->next;
- nxt_queue_remove(peer_link->prev);
+ nxt_assert(peer_link->next->prev == peer_link);
+ nxt_assert(peer_link->prev->next == peer_link);
+
+ next_link = peer_link->next;
+ nxt_queue_remove(peer_link);
+
+ peer_link = next_link;
}
nxt_mp_free(port->mem_pool, reg);
@@ -297,7 +423,7 @@ nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
if (ret != NXT_OK) {
- nxt_debug(task, "rpc: no handler found for stream %uxD", stream);
+ nxt_debug(task, "rpc: stream #%uD no handler found", stream);
return;
}
@@ -305,21 +431,10 @@ nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
reg = lhq.value;
if (reg->peer != -1) {
- if (reg->link.next == &reg->link) {
- nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
- lhq.pool = port->mem_pool;
-
- ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
-
- if (nxt_slow_path(ret != NXT_OK)) {
- nxt_log_error(NXT_LOG_ERR, task->log,
- "rpc: failed to delete peer %PI", reg->peer);
- }
-
- } else {
- nxt_queue_remove(&reg->link);
- }
+ nxt_port_rpc_remove_from_peers(task, port, reg);
}
+ nxt_debug(task, "rpc: stream #%uD cancel registration", stream);
+
nxt_mp_free(port->mem_pool, reg);
}