summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--docs/changes.xml7
-rw-r--r--src/nxt_app_queue.h18
-rw-r--r--src/nxt_unit.c7
3 files changed, 25 insertions, 7 deletions
diff --git a/docs/changes.xml b/docs/changes.xml
index ca4ffe5f..f157e9bf 100644
--- a/docs/changes.xml
+++ b/docs/changes.xml
@@ -41,6 +41,13 @@ the router process could crash with multithreaded applications under high load.
</para>
</change>
+<change type="bugfix">
+<para>
+applications could stop processing new requests under high load; the bug had
+appeared in 1.19.0.
+</para>
+</change>
+
</changes>
diff --git a/src/nxt_app_queue.h b/src/nxt_app_queue.h
index 127cb8f3..a1cc2f11 100644
--- a/src/nxt_app_queue.h
+++ b/src/nxt_app_queue.h
@@ -23,7 +23,7 @@ typedef struct {
typedef struct {
- nxt_app_nncq_atomic_t nitems;
+ nxt_app_nncq_atomic_t notified;
nxt_app_nncq_t free_items;
nxt_app_nncq_t queue;
nxt_app_queue_item_t items[NXT_APP_QUEUE_SIZE];
@@ -42,7 +42,7 @@ nxt_app_queue_init(nxt_app_queue_t volatile *q)
nxt_app_nncq_enqueue(&q->free_items, i);
}
- q->nitems = 0;
+ q->notified = 0;
}
@@ -50,6 +50,7 @@ nxt_inline nxt_int_t
nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p,
uint8_t size, uint32_t tracking, int *notify, uint32_t *cookie)
{
+ int n;
nxt_app_queue_item_t *qi;
nxt_app_nncq_atomic_t i;
@@ -67,16 +68,23 @@ nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p,
nxt_app_nncq_enqueue(&q->queue, i);
- i = nxt_atomic_fetch_add(&q->nitems, 1);
+ n = nxt_atomic_cmp_set(&q->notified, 0, 1);
if (notify != NULL) {
- *notify = (i == 0);
+ *notify = n;
}
return NXT_OK;
}
+nxt_inline void
+nxt_app_queue_notification_received(nxt_app_queue_t volatile *q)
+{
+ q->notified = 0;
+}
+
+
nxt_inline nxt_bool_t
nxt_app_queue_cancel(nxt_app_queue_t volatile *q, uint32_t cookie,
uint32_t tracking)
@@ -110,8 +118,6 @@ nxt_app_queue_recv(nxt_app_queue_t volatile *q, void *p, uint32_t *cookie)
nxt_app_nncq_enqueue(&q->free_items, i);
- nxt_atomic_fetch_add(&q->nitems, -1);
-
return res;
}
diff --git a/src/nxt_unit.c b/src/nxt_unit.c
index b6904ce9..2cdc75f8 100644
--- a/src/nxt_unit.c
+++ b/src/nxt_unit.c
@@ -6092,7 +6092,10 @@ static int
nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
nxt_unit_read_buf_t *rbuf)
{
- int res;
+ int res;
+ nxt_unit_port_impl_t *port_impl;
+
+ port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
retry:
@@ -6105,6 +6108,8 @@ retry:
}
if (nxt_unit_is_read_queue(rbuf)) {
+ nxt_app_queue_notification_received(port_impl->queue);
+
nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
(int) port->id.pid, (int) port->id.id, (int) rbuf->size);