summaryrefslogtreecommitdiffhomepage
path: root/src/test
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:32 +0300
committerMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:32 +0300
commita82cf4ffb68126f2831ab9877a7ef283dd517690 (patch)
tree68c8c2e28dcf8b19c0caf2662a62b19132857671 /src/test
parenta1e9df2aef5a3917728c6fd37280b03020d51123 (diff)
downloadunit-a82cf4ffb68126f2831ab9877a7ef283dd517690.tar.gz
unit-a82cf4ffb68126f2831ab9877a7ef283dd517690.tar.bz2
Circular queues implementations and a test.
- naive circular queue, described in the article "A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue" by Ruslan Nikolaev: https://drops.dagstuhl.de/opus/volltexte/2019/11335/pdf/LIPIcs-DISC-2019-28.pdf - circular queue, proposed by Valentin Bartenev in the "Unit router application IPC" design draft
Diffstat (limited to 'src/test')
-rw-r--r--src/test/nxt_cq_test.c578
1 files changed, 578 insertions, 0 deletions
diff --git a/src/test/nxt_cq_test.c b/src/test/nxt_cq_test.c
new file mode 100644
index 00000000..ae69505a
--- /dev/null
+++ b/src/test/nxt_cq_test.c
@@ -0,0 +1,578 @@
+
+/*
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+#include <math.h>
+#include <inttypes.h>
+
+#ifndef NXT_NCQ_TEST
+#define NXT_NCQ_TEST 1
+#endif
+
+#define NXT_QTEST_USE_THREAD 0
+
+#if NXT_NCQ_TEST
+#include <nxt_nncq.h>
+#else
+#include <nxt_nvbcq.h>
+#endif
+
+
+#define MAX_ITER 20
+#define STAT_ITER 5
+#define MIN_COV 0.02
+
+extern char **environ;
+static uintptr_t nops = 10000000;
+
+static uintptr_t nprocs_enq = 0;
+static uintptr_t nprocs_deq = 0;
+static uintptr_t nprocs_wenq = 0;
+static uintptr_t nprocs_wdeq = 0;
+static uintptr_t nprocs_enq_deq = 0;
+static uintptr_t nprocs_cas = 0;
+static uintptr_t nprocs_faa = 0;
+
+static uintptr_t nprocs = 1;
+
+
+static size_t
+elapsed_time(size_t us)
+{
+ struct timeval t;
+
+ gettimeofday(&t, NULL);
+
+ return t.tv_sec * 1000000 + t.tv_usec - us;
+}
+
+
+static double
+mean(const double *times, int n)
+{
+ int i;
+ double sum;
+
+ sum = 0;
+
+ for (i = 0; i < n; i++) {
+ sum += times[i];
+ }
+
+ return sum / n;
+}
+
+
+static double
+cov(const double *times, double mean, int n)
+{
+ int i;
+ double variance;
+
+ variance = 0;
+
+ for (i = 0; i < n; i++) {
+ variance += (times[i] - mean) * (times[i] - mean);
+ }
+
+ variance /= n;
+
+ return sqrt(variance) / mean;
+}
+
+typedef struct {
+#if NXT_NCQ_TEST
+ nxt_nncq_t free_queue;
+ nxt_nncq_t active_queue;
+#else
+ nxt_nvbcq_t free_queue;
+ nxt_nvbcq_t active_queue;
+#endif
+ uint32_t counter;
+} nxt_cq_t;
+
+
+static nxt_cq_t *pgq;
+
+
+#if NXT_NCQ_TEST
+#define nxt_cq_enqueue nxt_nncq_enqueue
+#define nxt_cq_dequeue nxt_nncq_dequeue
+#define nxt_cq_empty nxt_nncq_empty
+#define nxt_cq_init nxt_nncq_init
+#define NXT_CQ_SIZE NXT_NNCQ_SIZE
+#else
+#define nxt_cq_enqueue nxt_nvbcq_enqueue
+#define nxt_cq_dequeue nxt_nvbcq_dequeue
+#define nxt_cq_empty nxt_nvbcq_empty
+#define nxt_cq_init nxt_nvbcq_init
+#define NXT_CQ_SIZE NXT_NVBCQ_SIZE
+#endif
+
+typedef struct {
+ int id;
+ uint64_t enq;
+ uint64_t deq;
+ uint64_t wait_enq;
+ uint64_t wait_deq;
+ uint64_t own_res;
+ uint64_t cas;
+ uint64_t faa;
+
+#if NXT_QTEST_USE_THREAD
+ nxt_thread_handle_t handle;
+#else
+ nxt_pid_t pid;
+ int status;
+#endif
+} nxt_worker_info_t;
+
+
+static void
+cas_worker(void *p)
+{
+ nxt_cq_t *q;
+ uint32_t c;
+ uintptr_t i;
+ nxt_worker_info_t *wi;
+
+ q = pgq;
+ wi = p;
+
+ for (i = 0; i < nops / nprocs_cas; i++) {
+ c = q->counter;
+
+ if (nxt_atomic_cmp_set(&q->counter, c, c + 1)) {
+ ++wi->cas;
+ }
+ }
+}
+
+
+static void
+faa_worker(void *p)
+{
+ nxt_cq_t *q;
+ uintptr_t i;
+ nxt_worker_info_t *wi;
+
+ q = pgq;
+ wi = p;
+
+ for (i = 0; i < nops / nprocs_faa; i++) {
+ nxt_atomic_fetch_add(&q->counter, 1);
+ wi->faa++;
+ }
+}
+
+
+static void
+enq_deq_worker(void *p)
+{
+ nxt_cq_t *q;
+ uintptr_t i, v;
+ nxt_worker_info_t *wi;
+
+ q = pgq;
+ wi = p;
+
+ for (i = 0; i < nops / nprocs_enq_deq; i++) {
+ v = nxt_cq_dequeue(&q->free_queue);
+
+ if (v != nxt_cq_empty(&q->free_queue)) {
+ nxt_cq_enqueue(&q->active_queue, wi->id);
+ wi->enq++;
+ }
+
+ v = nxt_cq_dequeue(&q->active_queue);
+
+ if (v != nxt_cq_empty(&q->active_queue)) {
+ nxt_cq_enqueue(&q->free_queue, v);
+ wi->deq++;
+
+ if ((int) v == wi->id) {
+ wi->own_res++;
+ }
+ }
+ }
+}
+
+
+static void
+enq_worker(void *p)
+{
+ nxt_cq_t *q;
+ uintptr_t i, v;
+ nxt_worker_info_t *wi;
+
+ q = pgq;
+ wi = p;
+
+ for (i = 0; i < nops / nprocs_enq; i++) {
+ v = nxt_cq_dequeue(&q->free_queue);
+
+ if (v != nxt_cq_empty(&q->free_queue)) {
+ nxt_cq_enqueue(&q->active_queue, v);
+ wi->enq++;
+ }
+ }
+}
+
+
+static void
+deq_worker(void *p)
+{
+ nxt_cq_t *q;
+ uintptr_t i, v;
+ nxt_worker_info_t *wi;
+
+ q = pgq;
+ wi = p;
+
+ for (i = 0; i < nops / nprocs_deq; i++) {
+ v = nxt_cq_dequeue(&q->active_queue);
+
+ if (v != nxt_cq_empty(&q->active_queue)) {
+ nxt_cq_enqueue(&q->free_queue, v);
+ ++wi->deq;
+ }
+ }
+}
+
+
+static void
+wenq_worker(void *p)
+{
+ nxt_cq_t *q;
+ uintptr_t i, v;
+ nxt_worker_info_t *wi;
+
+ q = pgq;
+ wi = p;
+
+ for (i = 0; i < nops / nprocs_wenq; i++) {
+
+ do {
+ wi->wait_enq++;
+ v = nxt_cq_dequeue(&q->free_queue);
+ } while (v == nxt_cq_empty(&q->free_queue));
+
+ nxt_cq_enqueue(&q->active_queue, v);
+
+ wi->enq++;
+ wi->wait_enq--;
+ }
+}
+
+
+static void
+wdeq_worker(void *p)
+{
+ nxt_cq_t *q;
+ uintptr_t i, v;
+ nxt_worker_info_t *wi;
+
+ q = pgq;
+ wi = p;
+
+ for (i = 0; i < nops / nprocs_wdeq; i++) {
+
+ do {
+ wi->wait_deq++;
+ v = nxt_cq_dequeue(&q->active_queue);
+ } while (v == nxt_cq_empty(&q->active_queue));
+
+ nxt_cq_enqueue(&q->free_queue, v);
+
+ wi->deq++;
+ wi->wait_deq--;
+ }
+}
+
+
+static nxt_int_t
+worker_create(nxt_worker_info_t *wi, int id, nxt_thread_start_t start)
+{
+ wi->id = id;
+
+#if NXT_QTEST_USE_THREAD
+ nxt_thread_link_t *link;
+
+ link = nxt_zalloc(sizeof(nxt_thread_link_t));
+
+ link->start = start;
+ link->work.data = wi;
+
+ return nxt_thread_create(&wi->handle, link);
+
+#else
+ pid_t pid = fork();
+
+ if (pid == 0) {
+ start(wi);
+ exit(0);
+
+ } else {
+ wi->pid = pid;
+ }
+
+ return NXT_OK;
+#endif
+}
+
+
+static void
+worker_wait(nxt_worker_info_t *wi)
+{
+#if NXT_QTEST_USE_THREAD
+ pthread_join(wi->handle, NULL);
+
+#else
+ waitpid(wi->pid, &wi->status, 0);
+#endif
+}
+
+
+int nxt_cdecl
+main(int argc, char **argv)
+{
+ int i, k, id, verbose, objective, rk;
+ char *a;
+ size_t start, elapsed;
+ double *stats, m, c;
+ uint64_t total_ops;
+ uintptr_t j;
+ nxt_task_t task;
+ nxt_thread_t *thr;
+ nxt_worker_info_t *wi;
+ double times[MAX_ITER], mopsec[MAX_ITER];
+
+ verbose = 0;
+ objective = 0;
+
+ for (i = 1; i < argc; i++) {
+ a = argv[i];
+
+ if (strcmp(a, "-v") == 0) {
+ verbose++;
+ continue;
+ }
+
+ if (strcmp(a, "-n") == 0 && (i + 1) < argc) {
+ nops = atoi(argv[++i]);
+ continue;
+ }
+
+ if (strcmp(a, "--enq") == 0 && (i + 1) < argc) {
+ nprocs_enq = atoi(argv[++i]);
+ continue;
+ }
+
+ if (strcmp(a, "--deq") == 0 && (i + 1) < argc) {
+ nprocs_deq = atoi(argv[++i]);
+ continue;
+ }
+
+ if (strcmp(a, "--wenq") == 0 && (i + 1) < argc) {
+ nprocs_wenq = atoi(argv[++i]);
+ continue;
+ }
+
+ if (strcmp(a, "--wdeq") == 0 && (i + 1) < argc) {
+ nprocs_wdeq = atoi(argv[++i]);
+ continue;
+ }
+
+ if (strcmp(a, "--ed") == 0 && (i + 1) < argc) {
+ nprocs_enq_deq = atoi(argv[++i]);
+ continue;
+ }
+
+ if (strcmp(a, "--cas") == 0 && (i + 1) < argc) {
+ nprocs_cas = atoi(argv[++i]);
+ continue;
+ }
+
+ if (strcmp(a, "--faa") == 0 && (i + 1) < argc) {
+ nprocs_faa = atoi(argv[++i]);
+ continue;
+ }
+
+ if (strcmp(a, "--obj") == 0 && (i + 1) < argc) {
+ objective = atoi(argv[++i]);
+ continue;
+ }
+
+ printf("unknown option %s", a);
+
+ return 1;
+ }
+
+ if (nxt_lib_start("ncq_test", argv, &environ) != NXT_OK) {
+ return 1;
+ }
+
+ nprocs = nprocs_enq + nprocs_deq + nprocs_wenq + nprocs_wdeq
+ + nprocs_enq_deq + nprocs_cas + nprocs_faa;
+
+ if (nprocs == 0) {
+ return 0;
+ }
+
+ nxt_main_log.level = NXT_LOG_INFO;
+ task.log = &nxt_main_log;
+
+ thr = nxt_thread();
+ thr->task = &task;
+
+ pgq = mmap(NULL, sizeof(nxt_cq_t), PROT_READ | PROT_WRITE,
+ MAP_ANON | MAP_SHARED, -1, 0);
+ if (pgq == MAP_FAILED) {
+ return 2;
+ }
+
+ nxt_cq_init(&pgq->free_queue);
+ nxt_cq_init(&pgq->active_queue);
+
+ for(i = 0; i < NXT_CQ_SIZE; i++) {
+ nxt_cq_enqueue(&pgq->free_queue, i);
+ }
+
+ if (verbose >= 1) {
+ printf("number of workers: %d\n", (int) nprocs);
+ printf("number of ops: %d\n", (int) nops);
+ }
+
+ wi = mmap(NULL, nprocs * sizeof(nxt_worker_info_t), PROT_READ | PROT_WRITE,
+ MAP_ANON | MAP_SHARED, -1, 0);
+ if (wi == MAP_FAILED) {
+ return 3;
+ }
+
+ for (k = 0; k < MAX_ITER; k++) {
+ nxt_memzero(wi, nprocs * sizeof(nxt_worker_info_t));
+
+ nxt_cq_init(&pgq->free_queue);
+ nxt_cq_init(&pgq->active_queue);
+
+ for(i = 0; i < NXT_CQ_SIZE; i++) {
+ nxt_cq_enqueue(&pgq->free_queue, i);
+ }
+
+ start = elapsed_time(0);
+
+ id = 0;
+
+ for (j = 0; j < nprocs_enq; j++, id++) {
+ worker_create(wi + id, id, enq_worker);
+ }
+
+ for (j = 0; j < nprocs_deq; j++, id++) {
+ worker_create(wi + id, id, deq_worker);
+ }
+
+ for (j = 0; j < nprocs_wenq; j++, id++) {
+ worker_create(wi + id, id, wenq_worker);
+ }
+
+ for (j = 0; j < nprocs_wdeq; j++, id++) {
+ worker_create(wi + id, id, wdeq_worker);
+ }
+
+ for (j = 0; j < nprocs_enq_deq; j++, id++) {
+ worker_create(wi + id, id, enq_deq_worker);
+ }
+
+ for (j = 0; j < nprocs_cas; j++, id++) {
+ worker_create(wi + id, id, cas_worker);
+ }
+
+ for (j = 0; j < nprocs_faa; j++, id++) {
+ worker_create(wi + id, id, faa_worker);
+ }
+
+ for (j = 0; j < nprocs; j++) {
+ worker_wait(wi + j);
+ }
+
+ elapsed = elapsed_time(start);
+
+ for (j = 1; j < nprocs; j++) {
+ wi[0].enq += wi[j].enq;
+ wi[0].deq += wi[j].deq;
+ wi[0].wait_enq += wi[j].wait_enq;
+ wi[0].wait_deq += wi[j].wait_deq;
+ wi[0].own_res += wi[j].own_res;
+ wi[0].cas += wi[j].cas;
+ wi[0].faa += wi[j].faa;
+ }
+
+ total_ops = wi[0].enq + wi[0].deq + wi[0].cas + wi[0].faa;
+
+ if (total_ops == 0) {
+ total_ops = nops;
+ }
+
+ times[k] = elapsed / 1000.0;
+ mopsec[k] = (double) total_ops / elapsed;
+
+ if (verbose >= 2) {
+ printf("enq %10"PRIu64"\n", wi[0].enq);
+ printf("deq %10"PRIu64"\n", wi[0].deq);
+ printf("wait_enq %10"PRIu64"\n", wi[0].wait_enq);
+ printf("wait_deq %10"PRIu64"\n", wi[0].wait_deq);
+ printf("own_res %10"PRIu64"\n", wi[0].own_res);
+ printf("cas %10"PRIu64"\n", wi[0].cas);
+ printf("faa %10"PRIu64"\n", wi[0].faa);
+ printf("total ops %10"PRIu64"\n", total_ops);
+ printf("Mops/sec %13.2f\n", mopsec[k]);
+
+ printf("elapsed %10d us\n", (int) elapsed);
+ printf("per op %10d ns\n", (int) ((1000 * elapsed) / total_ops));
+ }
+
+ if (k >= STAT_ITER) {
+ stats = (objective == 0) ? times : mopsec;
+
+ m = mean(stats + k - STAT_ITER, STAT_ITER);
+ c = cov(stats + k - STAT_ITER, m, STAT_ITER);
+
+ if (verbose >= 1) {
+ if (objective == 0) {
+ printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f; "
+ "mean time %.2f ms; cov %.4f\n",
+ (int) k + 1, times[k], mopsec[k], m, c);
+
+ } else {
+ printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f; "
+ "mean Mop/sec %.2f; cov %.4f\n",
+ (int) k + 1, times[k], mopsec[k], m, c);
+ }
+ }
+
+ if (c < MIN_COV) {
+ rk = k - STAT_ITER;
+
+ for (i = rk + 1; i <= k; i++) {
+ if (fabs(stats[i] - m) < fabs(stats[rk] - m)) {
+ rk = i;
+ }
+ }
+
+ printf("#%d %.2f ms; %.2f\n", rk, times[rk], mopsec[rk]);
+
+ return 0;
+ }
+
+ } else {
+ if (verbose >= 1) {
+ printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f\n",
+ (int) k + 1, times[k], mopsec[k]);
+ }
+ }
+ }
+
+ return 0;
+}