summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/nxt_port_rpc.c79
1 files changed, 43 insertions, 36 deletions
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c
index e6126748..6f31a296 100644
--- a/src/nxt_port_rpc.c
+++ b/src/nxt_port_rpc.c
@@ -107,27 +107,28 @@ nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
return 0;
}
+ if (peer != -1) {
+ nxt_port_rpc_lhq_peer(&lhq, &peer);
+ lhq.replace = 0;
+ lhq.value = &reg->link;
+ lhq.pool = port->mem_pool;
- nxt_port_rpc_lhq_peer(&lhq, &peer);
- lhq.replace = 0;
- lhq.value = &reg->link;
- lhq.pool = port->mem_pool;
-
- switch (nxt_lvlhsh_insert(&port->rpc_peers, &lhq)) {
+ switch (nxt_lvlhsh_insert(&port->rpc_peers, &lhq)) {
- case NXT_OK:
- nxt_queue_self(&reg->link);
- break;
+ case NXT_OK:
+ nxt_queue_self(&reg->link);
+ break;
- case NXT_DECLINED:
- peer_link = lhq.value;
- nxt_queue_insert_before(peer_link, &reg->link);
- break;
+ case NXT_DECLINED:
+ peer_link = lhq.value;
+ nxt_queue_insert_before(peer_link, &reg->link);
+ break;
- default:
- nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add peer "
- "for stream #%uD", stream);
- break;
+ default:
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add peer "
+ "for stream #%uD", stream);
+ break;
+ }
}
return stream;
@@ -169,7 +170,9 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
reg = lhq.value;
- nxt_assert(reg->peer == msg->port_msg.pid);
+ if (reg->peer != -1) {
+ nxt_assert(reg->peer == msg->port_msg.pid);
+ }
if (type == _NXT_PORT_MSG_RPC_ERROR) {
reg->error_handler(task, msg, reg->data);
@@ -183,18 +186,20 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
- if (reg->link.next == &reg->link) {
- nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
- lhq.pool = port->mem_pool;
+ if (reg->peer != -1) {
+ if (reg->link.next == &reg->link) {
+ nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
+ lhq.pool = port->mem_pool;
- ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
+ ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
- if (nxt_slow_path(ret != NXT_OK)) {
- nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete "
- "peer %PI", reg->peer);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete "
+ "peer %PI", reg->peer);
+ }
+ } else {
+ nxt_queue_remove(&reg->link);
}
- } else {
- nxt_queue_remove(&reg->link);
}
nxt_mp_free(port->mem_pool, reg);
@@ -296,18 +301,20 @@ nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
reg = lhq.value;
- if (reg->link.next == &reg->link) {
- nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
- lhq.pool = port->mem_pool;
+ if (reg->peer != -1) {
+ if (reg->link.next == &reg->link) {
+ nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
+ lhq.pool = port->mem_pool;
- ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
+ ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
- if (nxt_slow_path(ret != NXT_OK)) {
- nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete "
- "peer %PI", reg->peer);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete "
+ "peer %PI", reg->peer);
+ }
+ } else {
+ nxt_queue_remove(&reg->link);
}
- } else {
- nxt_queue_remove(&reg->link);
}
nxt_mp_free(port->mem_pool, reg);