summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_rpc.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-12-27 17:47:18 +0300
committerMax Romanov <max.romanov@nginx.com>2017-12-27 17:47:18 +0300
commitbef2ec483eaa525da770c024069b31320b63977e (patch)
treea0e89da5619533e84a045ae357b9fb44cb250c30 /src/nxt_port_rpc.c
parentab138c91661aa9b3ba36fb9a1a2154461a1e2372 (diff)
downloadunit-bef2ec483eaa525da770c024069b31320b63977e.tar.gz
unit-bef2ec483eaa525da770c024069b31320b63977e.tar.bz2
Fixing application timeout.
Application timeout limits maximum time of worker response in processing particular request. Not including the time required to start worker, time in request queue etc.
Diffstat (limited to 'src/nxt_port_rpc.c')
-rw-r--r--src/nxt_port_rpc.c55
1 files changed, 37 insertions, 18 deletions
diff --git a/src/nxt_port_rpc.c b/src/nxt_port_rpc.c
index 04de7775..15c550f4 100644
--- a/src/nxt_port_rpc.c
+++ b/src/nxt_port_rpc.c
@@ -25,6 +25,11 @@ struct nxt_port_rpc_reg_s {
};
+static void
+nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_rpc_reg_t *reg);
+
+
static nxt_int_t
nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
{
@@ -168,10 +173,17 @@ nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
nxt_assert(reg->data == ex);
- if (peer == -1 || reg->peer != -1) {
- nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to "
- "change peer %PI->%PI", reg->stream, reg->peer, peer);
+ if (nxt_slow_path(peer == reg->peer)) {
+ return;
+ }
+
+ if (reg->peer != -1) {
+ nxt_port_rpc_remove_from_peers(task, port, reg);
+
+ reg->peer = -1;
+ }
+ if (peer == -1) {
return;
}
@@ -359,7 +371,6 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
msg.port_msg.pid = peer;
msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
- msg.port_msg.last = 1;
peer_link = lhq.value;
last = 0;
@@ -375,20 +386,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
nxt_debug(task, "rpc: stream #%uD trigger error", stream);
msg.port_msg.stream = stream;
-
- reg->error_handler(task, &msg, reg->data);
-
- nxt_port_rpc_lhq_stream(&lhq, &stream);
- lhq.pool = port->mem_pool;
-
- ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
-
- if (nxt_slow_path(ret != NXT_OK)) {
- nxt_log_error(NXT_LOG_ERR, task->log,
- "rpc: stream #%uD failed to delete handler", stream);
-
- return;
- }
+ msg.port_msg.last = 1;
if (peer_link == peer_link->next) {
nxt_assert(peer_link->prev == peer_link);
@@ -405,6 +403,27 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
peer_link = next_link;
}
+ reg->peer = -1;
+
+ reg->error_handler(task, &msg, reg->data);
+
+ /* Reset 'last' flag to preserve rpc handler. */
+ if (msg.port_msg.last == 0) {
+ continue;
+ }
+
+ nxt_port_rpc_lhq_stream(&lhq, &stream);
+ lhq.pool = port->mem_pool;
+
+ ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_log_error(NXT_LOG_ERR, task->log,
+ "rpc: stream #%uD failed to delete handler", stream);
+
+ return;
+ }
+
nxt_mp_free(port->mem_pool, reg);
}
}