diff options
Diffstat (limited to 'src/nxt_port_rpc.c')
-rw-r--r-- | src/nxt_port_rpc.c | 55 |
1 files changed, 37 insertions, 18 deletions
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c index 04de7775..15c550f4 100644 --- a/src/nxt_port_rpc.c +++ b/src/nxt_port_rpc.c @@ -25,6 +25,11 @@ struct nxt_port_rpc_reg_s { }; +static void +nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, + nxt_port_rpc_reg_t *reg); + + static nxt_int_t nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data) { @@ -168,10 +173,17 @@ nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port, nxt_assert(reg->data == ex); - if (peer == -1 || reg->peer != -1) { - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to " - "change peer %PI->%PI", reg->stream, reg->peer, peer); + if (nxt_slow_path(peer == reg->peer)) { + return; + } + + if (reg->peer != -1) { + nxt_port_rpc_remove_from_peers(task, port, reg); + + reg->peer = -1; + } + if (peer == -1) { return; } @@ -359,7 +371,6 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) msg.port_msg.pid = peer; msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID; - msg.port_msg.last = 1; peer_link = lhq.value; last = 0; @@ -375,20 +386,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) nxt_debug(task, "rpc: stream #%uD trigger error", stream); msg.port_msg.stream = stream; - - reg->error_handler(task, &msg, reg->data); - - nxt_port_rpc_lhq_stream(&lhq, &stream); - lhq.pool = port->mem_pool; - - ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); - - if (nxt_slow_path(ret != NXT_OK)) { - nxt_log_error(NXT_LOG_ERR, task->log, - "rpc: stream #%uD failed to delete handler", stream); - - return; - } + msg.port_msg.last = 1; if (peer_link == peer_link->next) { nxt_assert(peer_link->prev == peer_link); @@ -405,6 +403,27 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) peer_link = next_link; } + reg->peer = -1; + + reg->error_handler(task, &msg, reg->data); + + /* Reset 'last' flag to preserve rpc handler. */ + if (msg.port_msg.last == 0) { + continue; + } + + nxt_port_rpc_lhq_stream(&lhq, &stream); + lhq.pool = port->mem_pool; + + ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log_error(NXT_LOG_ERR, task->log, + "rpc: stream #%uD failed to delete handler", stream); + + return; + } + nxt_mp_free(port->mem_pool, reg); } } |