summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_app_queue.h
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:34 +0300
committerMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:34 +0300
commite227fc9e6281c280c46139a81646ecd7b0510e2b (patch)
tree8f6e631790e7a83bb46c28bd45ebaa5e737a3424 /src/nxt_app_queue.h
parenta82cf4ffb68126f2831ab9877a7ef283dd517690 (diff)
downloadunit-e227fc9e6281c280c46139a81646ecd7b0510e2b.tar.gz
unit-e227fc9e6281c280c46139a81646ecd7b0510e2b.tar.bz2
Introducing application and port shared memory queues.
The goal is to minimize the number of syscalls needed to deliver a message.
Diffstat (limited to 'src/nxt_app_queue.h')
-rw-r--r--src/nxt_app_queue.h119
1 files changed, 119 insertions, 0 deletions
diff --git a/src/nxt_app_queue.h b/src/nxt_app_queue.h
new file mode 100644
index 00000000..127cb8f3
--- /dev/null
+++ b/src/nxt_app_queue.h
@@ -0,0 +1,119 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_APP_QUEUE_H_INCLUDED_
+#define _NXT_APP_QUEUE_H_INCLUDED_
+
+
+#include <nxt_app_nncq.h>
+
+
+/* Using Numeric Naive Circular Queue as a backend. */
+
+#define NXT_APP_QUEUE_SIZE NXT_APP_NNCQ_SIZE
+#define NXT_APP_QUEUE_MSG_SIZE 31
+
+typedef struct {
+ uint8_t size;
+ uint8_t data[NXT_APP_QUEUE_MSG_SIZE];
+ uint32_t tracking;
+} nxt_app_queue_item_t;
+
+
+typedef struct {
+ nxt_app_nncq_atomic_t nitems;
+ nxt_app_nncq_t free_items;
+ nxt_app_nncq_t queue;
+ nxt_app_queue_item_t items[NXT_APP_QUEUE_SIZE];
+} nxt_app_queue_t;
+
+
+nxt_inline void
+nxt_app_queue_init(nxt_app_queue_t volatile *q)
+{
+ nxt_app_nncq_atomic_t i;
+
+ nxt_app_nncq_init(&q->free_items);
+ nxt_app_nncq_init(&q->queue);
+
+ for (i = 0; i < NXT_APP_QUEUE_SIZE; i++) {
+ nxt_app_nncq_enqueue(&q->free_items, i);
+ }
+
+ q->nitems = 0;
+}
+
+
+nxt_inline nxt_int_t
+nxt_app_queue_send(nxt_app_queue_t volatile *q, const void *p,
+ uint8_t size, uint32_t tracking, int *notify, uint32_t *cookie)
+{
+ nxt_app_queue_item_t *qi;
+ nxt_app_nncq_atomic_t i;
+
+ i = nxt_app_nncq_dequeue(&q->free_items);
+ if (i == nxt_app_nncq_empty(&q->free_items)) {
+ return NXT_AGAIN;
+ }
+
+ qi = (nxt_app_queue_item_t *) &q->items[i];
+
+ qi->size = size;
+ nxt_memcpy(qi->data, p, size);
+ qi->tracking = tracking;
+ *cookie = i;
+
+ nxt_app_nncq_enqueue(&q->queue, i);
+
+ i = nxt_atomic_fetch_add(&q->nitems, 1);
+
+ if (notify != NULL) {
+ *notify = (i == 0);
+ }
+
+ return NXT_OK;
+}
+
+
+nxt_inline nxt_bool_t
+nxt_app_queue_cancel(nxt_app_queue_t volatile *q, uint32_t cookie,
+ uint32_t tracking)
+{
+ nxt_app_queue_item_t *qi;
+
+ qi = (nxt_app_queue_item_t *) &q->items[cookie];
+
+ return nxt_atomic_cmp_set(&qi->tracking, tracking, 0);
+}
+
+
+nxt_inline ssize_t
+nxt_app_queue_recv(nxt_app_queue_t volatile *q, void *p, uint32_t *cookie)
+{
+ ssize_t res;
+ nxt_app_queue_item_t *qi;
+ nxt_app_nncq_atomic_t i;
+
+ i = nxt_app_nncq_dequeue(&q->queue);
+ if (i == nxt_app_nncq_empty(&q->queue)) {
+ *cookie = 0;
+ return -1;
+ }
+
+ qi = (nxt_app_queue_item_t *) &q->items[i];
+
+ res = qi->size;
+ nxt_memcpy(p, qi->data, qi->size);
+ *cookie = i;
+
+ nxt_app_nncq_enqueue(&q->free_items, i);
+
+ nxt_atomic_fetch_add(&q->nitems, -1);
+
+ return res;
+}
+
+
+#endif /* _NXT_APP_QUEUE_H_INCLUDED_ */