diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:47:18 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-12-27 17:47:18 +0300 |
commit | bef2ec483eaa525da770c024069b31320b63977e (patch) | |
tree | a0e89da5619533e84a045ae357b9fb44cb250c30 /src/nxt_port_rpc.c | |
parent | ab138c91661aa9b3ba36fb9a1a2154461a1e2372 (diff) | |
download | unit-bef2ec483eaa525da770c024069b31320b63977e.tar.gz unit-bef2ec483eaa525da770c024069b31320b63977e.tar.bz2 |
Fixing application timeout.
Application timeout limits maximum time of worker response in processing
particular request. Not including the time required to start worker,
time in request queue etc.
Diffstat (limited to 'src/nxt_port_rpc.c')
-rw-r--r-- | src/nxt_port_rpc.c | 55 |
1 files changed, 37 insertions, 18 deletions
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c index 04de7775..15c550f4 100644 --- a/src/nxt_port_rpc.c +++ b/src/nxt_port_rpc.c @@ -25,6 +25,11 @@ struct nxt_port_rpc_reg_s { }; +static void +nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, + nxt_port_rpc_reg_t *reg); + + static nxt_int_t nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data) { @@ -168,10 +173,17 @@ nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port, nxt_assert(reg->data == ex); - if (peer == -1 || reg->peer != -1) { - nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to " - "change peer %PI->%PI", reg->stream, reg->peer, peer); + if (nxt_slow_path(peer == reg->peer)) { + return; + } + + if (reg->peer != -1) { + nxt_port_rpc_remove_from_peers(task, port, reg); + + reg->peer = -1; + } + if (peer == -1) { return; } @@ -359,7 +371,6 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) msg.port_msg.pid = peer; msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID; - msg.port_msg.last = 1; peer_link = lhq.value; last = 0; @@ -375,20 +386,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) nxt_debug(task, "rpc: stream #%uD trigger error", stream); msg.port_msg.stream = stream; - - reg->error_handler(task, &msg, reg->data); - - nxt_port_rpc_lhq_stream(&lhq, &stream); - lhq.pool = port->mem_pool; - - ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); - - if (nxt_slow_path(ret != NXT_OK)) { - nxt_log_error(NXT_LOG_ERR, task->log, - "rpc: stream #%uD failed to delete handler", stream); - - return; - } + msg.port_msg.last = 1; if (peer_link == peer_link->next) { nxt_assert(peer_link->prev == peer_link); @@ -405,6 +403,27 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) peer_link = next_link; } + reg->peer = -1; + + reg->error_handler(task, &msg, reg->data); + + /* Reset 'last' flag to preserve rpc handler. */ + if (msg.port_msg.last == 0) { + continue; + } + + nxt_port_rpc_lhq_stream(&lhq, &stream); + lhq.pool = port->mem_pool; + + ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_log_error(NXT_LOG_ERR, task->log, + "rpc: stream #%uD failed to delete handler", stream); + + return; + } + nxt_mp_free(port->mem_pool, reg); } } |