summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_unit.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-12-18 00:25:27 +0300
committerMax Romanov <max.romanov@nginx.com>2020-12-18 00:25:27 +0300
commit7389a50835696fe256c5decf31bec129f1d59bbf (patch)
treec067cade372516da24d997c3955a639a61c5a223 /src/nxt_unit.c
parentc0449e13f80312a2e09f643e1a69e536384eae79 (diff)
downloadunit-7389a50835696fe256c5decf31bec129f1d59bbf.tar.gz
unit-7389a50835696fe256c5decf31bec129f1d59bbf.tar.bz2
Limiting app queue notifications count in socket.
Under high load, a queue synchonization issue may occur, starting from the steady state when an app queue message is dequeued immediately after it has been enqueued. In this state, the router always puts the first message in the queue and is forced to notify the app about a new message in an empty queue using a socket pair. On the other hand, the application dequeues and processes the message without reading the notification from the socket, so the socket buffer overflows with notifications. The issue was reproduced during Unit load tests. After a socket buffer overflow, the router is unable to notify the app about a new first message. When another message is enqueued, a notification is not required, so the queue grows without being read by the app. As a result, request processing stops. This patch changes the notification algorithm by counting the notifications in the pipe instead of getting the number of messages in the queue.
Diffstat (limited to 'src/nxt_unit.c')
-rw-r--r--src/nxt_unit.c7
1 files changed, 6 insertions, 1 deletions
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index b6904ce9..2cdc75f8 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -6092,7 +6092,10 @@ static int
nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf)
{
- int res;
+ int res;
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
retry:
@@ -6105,6 +6108,8 @@ retry:
}
if (nxt_unit_is_read_queue(rbuf)) {
+ nxt_app_queue_notification_received(port_impl->queue);
+
nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
(int) port->id.pid, (int) port->id.id, (int) rbuf->size);