diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-08-11 18:04:04 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-08-11 18:04:04 +0300 |
commit | 162afe4719afcba65b5477a75cee4cbba8874892 (patch) | |
tree | 2644c6393d8be295329489476a4dffbcfd1852bd /src/nxt_port_rpc.c | |
parent | 1b354421c33917239a793e891e3d43e02566e550 (diff) | |
download | unit-162afe4719afcba65b5477a75cee4cbba8874892.tar.gz unit-162afe4719afcba65b5477a75cee4cbba8874892.tar.bz2 |
RPC: peer pid special value -1 may be used if pid is unknown.
Diffstat (limited to 'src/nxt_port_rpc.c')
-rw-r--r-- | src/nxt_port_rpc.c | 79 |
1 files changed, 43 insertions, 36 deletions
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c index e6126748..6f31a296 100644 --- a/src/nxt_port_rpc.c +++ b/src/nxt_port_rpc.c @@ -107,27 +107,28 @@ nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port, return 0; } + if (peer != -1) { + nxt_port_rpc_lhq_peer(&lhq, &peer); + lhq.replace = 0; + lhq.value = ®->link; + lhq.pool = port->mem_pool; - nxt_port_rpc_lhq_peer(&lhq, &peer); - lhq.replace = 0; - lhq.value = ®->link; - lhq.pool = port->mem_pool; - - switch (nxt_lvlhsh_insert(&port->rpc_peers, &lhq)) { + switch (nxt_lvlhsh_insert(&port->rpc_peers, &lhq)) { - case NXT_OK: - nxt_queue_self(®->link); - break; + case NXT_OK: + nxt_queue_self(®->link); + break; - case NXT_DECLINED: - peer_link = lhq.value; - nxt_queue_insert_before(peer_link, ®->link); - break; + case NXT_DECLINED: + peer_link = lhq.value; + nxt_queue_insert_before(peer_link, ®->link); + break; - default: - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add peer " - "for stream #%uD", stream); - break; + default: + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add peer " + "for stream #%uD", stream); + break; + } } return stream; @@ -169,7 +170,9 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) reg = lhq.value; - nxt_assert(reg->peer == msg->port_msg.pid); + if (reg->peer != -1) { + nxt_assert(reg->peer == msg->port_msg.pid); + } if (type == _NXT_PORT_MSG_RPC_ERROR) { reg->error_handler(task, msg, reg->data); @@ -183,18 +186,20 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } - if (reg->link.next == ®->link) { - nxt_port_rpc_lhq_peer(&lhq, ®->peer); - lhq.pool = port->mem_pool; + if (reg->peer != -1) { + if (reg->link.next == ®->link) { + nxt_port_rpc_lhq_peer(&lhq, ®->peer); + lhq.pool = port->mem_pool; - ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); + ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); - if (nxt_slow_path(ret != NXT_OK)) { - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete " - "peer %PI", reg->peer); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete " + "peer %PI", reg->peer); + } + } else { + nxt_queue_remove(®->link); } - } else { - nxt_queue_remove(®->link); } nxt_mp_free(port->mem_pool, reg); @@ -296,18 +301,20 @@ nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream) reg = lhq.value; - if (reg->link.next == ®->link) { - nxt_port_rpc_lhq_peer(&lhq, ®->peer); - lhq.pool = port->mem_pool; + if (reg->peer != -1) { + if (reg->link.next == ®->link) { + nxt_port_rpc_lhq_peer(&lhq, ®->peer); + lhq.pool = port->mem_pool; - ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); + ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); - if (nxt_slow_path(ret != NXT_OK)) { - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete " - "peer %PI", reg->peer); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete " + "peer %PI", reg->peer); + } + } else { + nxt_queue_remove(®->link); } - } else { - nxt_queue_remove(®->link); } nxt_mp_free(port->mem_pool, reg); |