diff options
Diffstat (limited to 'src/nxt_port_rpc.c')
-rw-r--r-- | src/nxt_port_rpc.c | 261 |
1 files changed, 188 insertions, 73 deletions
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c index c5821439..04de7775 100644 --- a/src/nxt_port_rpc.c +++ b/src/nxt_port_rpc.c @@ -8,6 +8,8 @@ #include <nxt_port_rpc.h> +static nxt_atomic_t nxt_stream_ident = 1; + typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t; struct nxt_port_rpc_reg_s { @@ -15,6 +17,7 @@ struct nxt_port_rpc_reg_s { nxt_pid_t peer; nxt_queue_link_t link; + nxt_bool_t link_first; nxt_port_rpc_handler_t ready_handler; nxt_port_rpc_handler_t error_handler; @@ -62,30 +65,57 @@ nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port, nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, nxt_pid_t peer, void *data) { + void *ex; + nxt_port_rpc_reg_t *reg; + + ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler, + error_handler, 0); + + if (ex == NULL) { + return 0; + } + + if (peer != -1) { + nxt_port_rpc_ex_set_peer(task, port, ex, peer); + } + + reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); + + nxt_assert(reg->data == ex); + + reg->data = data; + + return reg->stream; +} + + +void * +nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port, + nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, + size_t ex_size) +{ uint32_t stream; - nxt_queue_link_t *peer_link; nxt_port_rpc_reg_t *reg; nxt_lvlhsh_query_t lhq; nxt_assert(port->pair[0] != -1); - stream = port->next_stream++; + stream = + (uint32_t) nxt_atomic_fetch_add(&nxt_stream_ident, 1) & 0x3fffffff; - reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t)); + reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size); if (nxt_slow_path(reg == NULL)) { - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to allocate " - "reg for stream #%uD", stream); + nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream); - return 0; + return NULL; } reg->stream = stream; - reg->peer = peer; + reg->peer = -1; reg->ready_handler = ready_handler; reg->error_handler = error_handler; - reg->data = data; - + reg->data = reg + 1; nxt_port_rpc_lhq_stream(&lhq, &stream); lhq.replace = 0; @@ -98,39 +128,140 @@ nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port, break; default: - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add handler " - "for stream #%uD", stream); + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add " + "reg ", stream); nxt_mp_free(port->mem_pool, reg); - return 0; + return NULL; } - if (peer != -1) { - nxt_port_rpc_lhq_peer(&lhq, &peer); - lhq.replace = 0; - lhq.value = ®->link; + nxt_debug(task, "rpc: stream #%uD registered", stream); + + return reg->data; +} + + +uint32_t +nxt_port_rpc_ex_stream(void *ex) +{ + nxt_port_rpc_reg_t *reg; + + reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); + + nxt_assert(reg->data == ex); + + return reg->stream; +} + + +void +nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port, + void *ex, nxt_pid_t peer) +{ + nxt_int_t ret; + nxt_queue_link_t *peer_link; + nxt_port_rpc_reg_t *reg; + nxt_lvlhsh_query_t lhq; + + reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); + + nxt_assert(reg->data == ex); + + if (peer == -1 || reg->peer != -1) { + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to " + "change peer %PI->%PI", reg->stream, reg->peer, peer); + + return; + } + + reg->peer = peer; + + nxt_port_rpc_lhq_peer(&lhq, &peer); + lhq.replace = 0; + lhq.value = ®->link; + lhq.pool = port->mem_pool; + + ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq); + + switch (ret) { + + case NXT_OK: + reg->link_first = 1; + nxt_queue_self(®->link); + + nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)", + reg->stream, reg->peer, reg->link.next); + break; + + case NXT_DECLINED: + reg->link_first = 0; + peer_link = lhq.value; + nxt_queue_insert_after(peer_link, ®->link); + + nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)", + reg->stream, reg->peer, reg->link.next); + break; + + default: + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add " + "peer for stream #%uD (%d)", reg->stream, ret); + + reg->peer = -1; + break; + } + +} + + +static void +nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, + nxt_port_rpc_reg_t *reg) +{ + uint32_t stream; + nxt_int_t ret; + nxt_lvlhsh_query_t lhq; + nxt_port_rpc_reg_t *r; + + stream = reg->stream; + + if (reg->link_first != 0) { + nxt_port_rpc_lhq_peer(&lhq, ®->peer); lhq.pool = port->mem_pool; - switch (nxt_lvlhsh_insert(&port->rpc_peers, &lhq)) { + if (reg->link.next == ®->link) { + nxt_assert(reg->link.prev == ®->link); - case NXT_OK: - nxt_queue_self(®->link); - break; + nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI " + "registration (%p)", stream, reg->peer, reg->link.next); - case NXT_DECLINED: - peer_link = lhq.value; - nxt_queue_insert_before(peer_link, ®->link); - break; + ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); + } else { + nxt_debug(task, "rpc: stream #%uD remove first pid %PI " + "registration (%p)", stream, reg->peer, reg->link.next); + + lhq.replace = 1; + lhq.value = reg->link.next; - default: - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add peer " - "for stream #%uD", stream); - break; + r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link); + r->link_first = 1; + + nxt_queue_remove(®->link); + + ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq); } + } else { + nxt_debug(task, "rpc: stream #%uD remove pid %PI " + "registration (%p)", stream, reg->peer, reg->link.next); + + nxt_queue_remove(®->link); + ret = NXT_OK; } - return stream; + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed" + " to delete peer %PI (%d)", stream, reg->peer, ret); + } } @@ -150,8 +281,6 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) last = msg->port_msg.last; type = msg->port_msg.type; - nxt_debug(task, "rpc: handler for stream #%uD, type %d", stream, type); - nxt_port_rpc_lhq_stream(&lhq, &stream); lhq.pool = port->mem_pool; @@ -163,11 +292,14 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } if (ret != NXT_OK) { - nxt_debug(task, "rpc: no handler found for stream #%uD", stream); + nxt_debug(task, "rpc: stream #%uD no handler found", stream); return; } + nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream, + (last ? "last " : ""), type); + reg = lhq.value; if (reg->peer != -1) { @@ -182,28 +314,15 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } if (last == 0) { - nxt_debug(task, "rpc: keep handler for stream #%uD", stream); - return; } if (reg->peer != -1) { - if (reg->link.next == ®->link) { - nxt_port_rpc_lhq_peer(&lhq, ®->peer); - lhq.pool = port->mem_pool; - - ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); - - if (nxt_slow_path(ret != NXT_OK)) { - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete " - "peer %PI", reg->peer); - } - - } else { - nxt_queue_remove(®->link); - } + nxt_port_rpc_remove_from_peers(task, port, reg); } + nxt_debug(task, "rpc: stream #%uD free registration", stream); + nxt_mp_free(port->mem_pool, reg); } @@ -215,7 +334,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) uint32_t stream; nxt_int_t ret; nxt_buf_t buf; - nxt_queue_link_t *peer_link; + nxt_queue_link_t *peer_link, *next_link; nxt_port_rpc_reg_t *reg; nxt_lvlhsh_query_t lhq; nxt_port_recv_msg_t msg; @@ -226,7 +345,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); if (nxt_slow_path(ret != NXT_OK)) { - nxt_debug(task, "rpc: no handler found for peer %PI", peer); + nxt_debug(task, "rpc: no reg found for peer %PI", peer); return; } @@ -249,14 +368,15 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link); - nxt_debug(task, "rpc: trigger error for stream #%uD", reg->stream); + nxt_assert(reg->peer == peer); - msg.port_msg.stream = reg->stream; + stream = reg->stream; - reg->error_handler(task, &msg, reg->data); + nxt_debug(task, "rpc: stream #%uD trigger error", stream); + msg.port_msg.stream = stream; - stream = reg->stream; + reg->error_handler(task, &msg, reg->data); nxt_port_rpc_lhq_stream(&lhq, &stream); lhq.pool = port->mem_pool; @@ -265,18 +385,24 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) if (nxt_slow_path(ret != NXT_OK)) { nxt_log_error(NXT_LOG_ERR, task->log, - "rpc: failed to delete handler for stream #%uD", - stream); + "rpc: stream #%uD failed to delete handler", stream); return; } if (peer_link == peer_link->next) { + nxt_assert(peer_link->prev == peer_link); + last = 1; } else { - peer_link = peer_link->next; - nxt_queue_remove(peer_link->prev); + nxt_assert(peer_link->next->prev == peer_link); + nxt_assert(peer_link->prev->next == peer_link); + + next_link = peer_link->next; + nxt_queue_remove(peer_link); + + peer_link = next_link; } nxt_mp_free(port->mem_pool, reg); @@ -297,7 +423,7 @@ nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream) ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); if (ret != NXT_OK) { - nxt_debug(task, "rpc: no handler found for stream %uxD", stream); + nxt_debug(task, "rpc: stream #%uD no handler found", stream); return; } @@ -305,21 +431,10 @@ nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream) reg = lhq.value; if (reg->peer != -1) { - if (reg->link.next == ®->link) { - nxt_port_rpc_lhq_peer(&lhq, ®->peer); - lhq.pool = port->mem_pool; - - ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); - - if (nxt_slow_path(ret != NXT_OK)) { - nxt_log_error(NXT_LOG_ERR, task->log, - "rpc: failed to delete peer %PI", reg->peer); - } - - } else { - nxt_queue_remove(®->link); - } + nxt_port_rpc_remove_from_peers(task, port, reg); } + nxt_debug(task, "rpc: stream #%uD cancel registration", stream); + nxt_mp_free(port->mem_pool, reg); } |