summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_rpc.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_port_rpc.c')
-rw-r--r--src/nxt_port_rpc.c55
1 files changed, 37 insertions, 18 deletions
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c
index 04de7775..15c550f4 100644
--- a/src/nxt_port_rpc.c
+++ b/src/nxt_port_rpc.c
@@ -25,6 +25,11 @@ struct nxt_port_rpc_reg_s {
};
+static void
+nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_rpc_reg_t *reg);
+
+
static nxt_int_t
nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
{
@@ -168,10 +173,17 @@ nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
nxt_assert(reg->data == ex);
- if (peer == -1 || reg->peer != -1) {
- nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to "
- "change peer %PI->%PI", reg->stream, reg->peer, peer);
+ if (nxt_slow_path(peer == reg->peer)) {
+ return;
+ }
+
+ if (reg->peer != -1) {
+ nxt_port_rpc_remove_from_peers(task, port, reg);
+
+ reg->peer = -1;
+ }
+ if (peer == -1) {
return;
}
@@ -359,7 +371,6 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
msg.port_msg.pid = peer;
msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
- msg.port_msg.last = 1;
peer_link = lhq.value;
last = 0;
@@ -375,20 +386,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
nxt_debug(task, "rpc: stream #%uD trigger error", stream);
msg.port_msg.stream = stream;
-
- reg->error_handler(task, &msg, reg->data);
-
- nxt_port_rpc_lhq_stream(&lhq, &stream);
- lhq.pool = port->mem_pool;
-
- ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
-
- if (nxt_slow_path(ret != NXT_OK)) {
- nxt_log_error(NXT_LOG_ERR, task->log,
- "rpc: stream #%uD failed to delete handler", stream);
-
- return;
- }
+ msg.port_msg.last = 1;
if (peer_link == peer_link->next) {
nxt_assert(peer_link->prev == peer_link);
@@ -405,6 +403,27 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
peer_link = next_link;
}
+ reg->peer = -1;
+
+ reg->error_handler(task, &msg, reg->data);
+
+ /* Reset 'last' flag to preserve rpc handler. */
+ if (msg.port_msg.last == 0) {
+ continue;
+ }
+
+ nxt_port_rpc_lhq_stream(&lhq, &stream);
+ lhq.pool = port->mem_pool;
+
+ ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_log_error(NXT_LOG_ERR, task->log,
+ "rpc: stream #%uD failed to delete handler", stream);
+
+ return;
+ }
+
nxt_mp_free(port->mem_pool, reg);
}
}