author | Max Romanov <max.romanov@nginx.com> | 2020-12-18 00:25:27 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-12-18 00:25:27 +0300 |
commit | 7389a50835696fe256c5decf31bec129f1d59bbf (patch) | |
tree | c067cade372516da24d997c3955a639a61c5a223 | |
parent | c0449e13f80312a2e09f643e1a69e536384eae79 (diff) | |
download | unit-7389a50835696fe256c5decf31bec129f1d59bbf.tar.gz unit-7389a50835696fe256c5decf31bec129f1d59bbf.tar.bz2 |
Limiting app queue notifications count in socket.
Under high load, a queue synchronization issue may occur, starting from a steady state in which each app queue message is dequeued immediately after it has been enqueued. In this state, the router always puts the first message into an empty queue and is therefore forced to notify the app about the new message through a socket pair. The application, on the other hand, dequeues and processes the message without reading the notification from the socket, so the socket buffer eventually overflows with unread notifications.
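For illustration, below is a minimal, self-contained sketch of the pre-patch behaviour described above. It is a simplified model, not Unit's actual code: the names `router_send()` and `app_recv()`, the bare socket pair, and the iteration count are all hypothetical. The producer writes a notification byte whenever it enqueues into an empty queue, while the consumer drains the queue without ever reading from the socket, so unread bytes accumulate until the buffer is full and further notifications are lost.

```c
/* Simplified model of the pre-patch scheme: notify whenever the queue was
 * empty at enqueue time, never read the notifications on the consumer side.
 * Hypothetical demo code, not part of Unit. */
#include <stdatomic.h>
#include <stdio.h>
#include <sys/ioctl.h>
#include <sys/socket.h>

static atomic_int  nitems;   /* messages currently in the queue */
static int         sp[2];    /* notification socket pair: [0] router, [1] app */

static int
router_send(void)
{
    /* Old algorithm: write a notification byte if the queue was empty. */
    if (atomic_fetch_add(&nitems, 1) == 0) {
        return (int) send(sp[0], "", 1, MSG_DONTWAIT);
    }

    return 0;
}

static void
app_recv(void)
{
    /* The app dequeues immediately but never reads the socket. */
    atomic_fetch_add(&nitems, -1);
}

int
main(void)
{
    int  i, lost, pending;

    socketpair(AF_UNIX, SOCK_STREAM, 0, sp);

    lost = 0;

    /* Steady state from the commit message: every message is consumed right
     * after it is produced, so every enqueue sees nitems == 0 and writes one
     * more unread byte; once the buffers fill up, send() starts failing and
     * wakeups are lost. */
    for (i = 0; i < 1000000; i++) {
        if (router_send() < 0) {
            lost++;
        }

        app_recv();
    }

    ioctl(sp[1], FIONREAD, &pending);
    printf("unread notification bytes: %d, lost notifications: %d\n",
           pending, lost);

    return 0;
}
```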
The issue was reproduced during Unit load tests. Once the socket buffer has overflowed, the router can no longer notify the app about the first message in an empty queue. When further messages are enqueued, no notification is required, so the queue grows without ever being read by the app and request processing stops.
This patch changes the notification algorithm: instead of deriving the need for a notification from the number of messages in the queue, the router tracks whether a notification is already pending in the pipe and writes a new one only when none is outstanding.
-rw-r--r-- | docs/changes.xml | 7
-rw-r--r-- | src/nxt_app_queue.h | 18
-rw-r--r-- | src/nxt_unit.c | 7

3 files changed, 25 insertions, 7 deletions
```diff
diff --git a/docs/changes.xml b/docs/changes.xml
index ca4ffe5f..f157e9bf 100644
--- a/docs/changes.xml
+++ b/docs/changes.xml
@@ -41,6 +41,13 @@
 the router process could crash with multithreaded applications under high
 load.
 </para>
 </change>
 
+<change type="bugfix">
+<para>
+applications could stop processing new requests under high load; the bug had
+appeared in 1.19.0.
+</para>
+</change>
+
 </changes>
diff --git a/src/nxt_app_queue.h b/src/nxt_app_queue.h
index 127cb8f3..a1cc2f11 100644
--- a/src/nxt_app_queue.h
+++ b/src/nxt_app_queue.h
@@ -23,7 +23,7 @@ typedef struct {
 
 
 typedef struct {
-    nxt_app_nncq_atomic_t  nitems;
+    nxt_app_nncq_atomic_t  notified;
     nxt_app_nncq_t         free_items;
     nxt_app_nncq_t         queue;
     nxt_app_queue_item_t   items[NXT_APP_QUEUE_SIZE];
@@ -42,7 +42,7 @@ nxt_app_queue_init(nxt_app_queue_t volatile *q)
         nxt_app_nncq_enqueue(&q->free_items, i);
     }
 
-    q->nitems = 0;
+    q->notified = 0;
 }
 
 
@@ -50,6 +50,7 @@ nxt_inline nxt_int_t
 nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p,
     uint8_t size, uint32_t tracking, int *notify, uint32_t *cookie)
 {
+    int                    n;
     nxt_app_queue_item_t   *qi;
     nxt_app_nncq_atomic_t  i;
 
@@ -67,16 +68,23 @@
 
     nxt_app_nncq_enqueue(&q->queue, i);
 
-    i = nxt_atomic_fetch_add(&q->nitems, 1);
+    n = nxt_atomic_cmp_set(&q->notified, 0, 1);
 
     if (notify != NULL) {
-        *notify = (i == 0);
+        *notify = n;
     }
 
     return NXT_OK;
 }
 
 
+nxt_inline void
+nxt_app_queue_notification_received(nxt_app_queue_t volatile *q)
+{
+    q->notified = 0;
+}
+
+
 nxt_inline nxt_bool_t
 nxt_app_queue_cancel(nxt_app_queue_t volatile *q, uint32_t cookie,
     uint32_t tracking)
@@ -110,8 +118,6 @@ nxt_app_queue_recv(nxt_app_queue_t volatile *q, void *p, uint32_t *cookie)
 
     nxt_app_nncq_enqueue(&q->free_items, i);
 
-    nxt_atomic_fetch_add(&q->nitems, -1);
-
     return res;
 }
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index b6904ce9..2cdc75f8 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -6092,7 +6092,10 @@ static int
 nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
     nxt_unit_read_buf_t *rbuf)
 {
-    int  res;
+    int                   res;
+    nxt_unit_port_impl_t  *port_impl;
+
+    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
 
 retry:
@@ -6105,6 +6108,8 @@ retry:
     }
 
     if (nxt_unit_is_read_queue(rbuf)) {
+        nxt_app_queue_notification_received(port_impl->queue);
+
         nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
                        (int) port->id.pid, (int) port->id.id,
                        (int) rbuf->size);
```