diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:20:32 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:20:32 +0300 |
commit | a82cf4ffb68126f2831ab9877a7ef283dd517690 (patch) | |
tree | 68c8c2e28dcf8b19c0caf2662a62b19132857671 /src/test | |
parent | a1e9df2aef5a3917728c6fd37280b03020d51123 (diff) | |
download | unit-a82cf4ffb68126f2831ab9877a7ef283dd517690.tar.gz unit-a82cf4ffb68126f2831ab9877a7ef283dd517690.tar.bz2 |
Circular queues implementations and a test.
- naive circular queue, described in the article "A Scalable, Portable, and
Memory-Efficient Lock-Free FIFO Queue" by Ruslan Nikolaev:
https://drops.dagstuhl.de/opus/volltexte/2019/11335/pdf/LIPIcs-DISC-2019-28.pdf
- circular queue, proposed by Valentin Bartenev in the "Unit router application
IPC" design draft
Diffstat (limited to '')
-rw-r--r-- | src/test/nxt_cq_test.c | 578 |
1 files changed, 578 insertions, 0 deletions
diff --git a/src/test/nxt_cq_test.c b/src/test/nxt_cq_test.c new file mode 100644 index 00000000..ae69505a --- /dev/null +++ b/src/test/nxt_cq_test.c @@ -0,0 +1,578 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <math.h> +#include <inttypes.h> + +#ifndef NXT_NCQ_TEST +#define NXT_NCQ_TEST 1 +#endif + +#define NXT_QTEST_USE_THREAD 0 + +#if NXT_NCQ_TEST +#include <nxt_nncq.h> +#else +#include <nxt_nvbcq.h> +#endif + + +#define MAX_ITER 20 +#define STAT_ITER 5 +#define MIN_COV 0.02 + +extern char **environ; +static uintptr_t nops = 10000000; + +static uintptr_t nprocs_enq = 0; +static uintptr_t nprocs_deq = 0; +static uintptr_t nprocs_wenq = 0; +static uintptr_t nprocs_wdeq = 0; +static uintptr_t nprocs_enq_deq = 0; +static uintptr_t nprocs_cas = 0; +static uintptr_t nprocs_faa = 0; + +static uintptr_t nprocs = 1; + + +static size_t +elapsed_time(size_t us) +{ + struct timeval t; + + gettimeofday(&t, NULL); + + return t.tv_sec * 1000000 + t.tv_usec - us; +} + + +static double +mean(const double *times, int n) +{ + int i; + double sum; + + sum = 0; + + for (i = 0; i < n; i++) { + sum += times[i]; + } + + return sum / n; +} + + +static double +cov(const double *times, double mean, int n) +{ + int i; + double variance; + + variance = 0; + + for (i = 0; i < n; i++) { + variance += (times[i] - mean) * (times[i] - mean); + } + + variance /= n; + + return sqrt(variance) / mean; +} + +typedef struct { +#if NXT_NCQ_TEST + nxt_nncq_t free_queue; + nxt_nncq_t active_queue; +#else + nxt_nvbcq_t free_queue; + nxt_nvbcq_t active_queue; +#endif + uint32_t counter; +} nxt_cq_t; + + +static nxt_cq_t *pgq; + + +#if NXT_NCQ_TEST +#define nxt_cq_enqueue nxt_nncq_enqueue +#define nxt_cq_dequeue nxt_nncq_dequeue +#define nxt_cq_empty nxt_nncq_empty +#define nxt_cq_init nxt_nncq_init +#define NXT_CQ_SIZE NXT_NNCQ_SIZE +#else +#define nxt_cq_enqueue nxt_nvbcq_enqueue +#define nxt_cq_dequeue nxt_nvbcq_dequeue +#define nxt_cq_empty nxt_nvbcq_empty +#define nxt_cq_init nxt_nvbcq_init +#define NXT_CQ_SIZE NXT_NVBCQ_SIZE +#endif + +typedef struct { + int id; + uint64_t enq; + uint64_t deq; + uint64_t wait_enq; + uint64_t wait_deq; + uint64_t own_res; + uint64_t cas; + uint64_t faa; + +#if NXT_QTEST_USE_THREAD + nxt_thread_handle_t handle; +#else + nxt_pid_t pid; + int status; +#endif +} nxt_worker_info_t; + + +static void +cas_worker(void *p) +{ + nxt_cq_t *q; + uint32_t c; + uintptr_t i; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_cas; i++) { + c = q->counter; + + if (nxt_atomic_cmp_set(&q->counter, c, c + 1)) { + ++wi->cas; + } + } +} + + +static void +faa_worker(void *p) +{ + nxt_cq_t *q; + uintptr_t i; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_faa; i++) { + nxt_atomic_fetch_add(&q->counter, 1); + wi->faa++; + } +} + + +static void +enq_deq_worker(void *p) +{ + nxt_cq_t *q; + uintptr_t i, v; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_enq_deq; i++) { + v = nxt_cq_dequeue(&q->free_queue); + + if (v != nxt_cq_empty(&q->free_queue)) { + nxt_cq_enqueue(&q->active_queue, wi->id); + wi->enq++; + } + + v = nxt_cq_dequeue(&q->active_queue); + + if (v != nxt_cq_empty(&q->active_queue)) { + nxt_cq_enqueue(&q->free_queue, v); + wi->deq++; + + if ((int) v == wi->id) { + wi->own_res++; + } + } + } +} + + +static void +enq_worker(void *p) +{ + nxt_cq_t *q; + uintptr_t i, v; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_enq; i++) { + v = nxt_cq_dequeue(&q->free_queue); + + if (v != nxt_cq_empty(&q->free_queue)) { + nxt_cq_enqueue(&q->active_queue, v); + wi->enq++; + } + } +} + + +static void +deq_worker(void *p) +{ + nxt_cq_t *q; + uintptr_t i, v; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_deq; i++) { + v = nxt_cq_dequeue(&q->active_queue); + + if (v != nxt_cq_empty(&q->active_queue)) { + nxt_cq_enqueue(&q->free_queue, v); + ++wi->deq; + } + } +} + + +static void +wenq_worker(void *p) +{ + nxt_cq_t *q; + uintptr_t i, v; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_wenq; i++) { + + do { + wi->wait_enq++; + v = nxt_cq_dequeue(&q->free_queue); + } while (v == nxt_cq_empty(&q->free_queue)); + + nxt_cq_enqueue(&q->active_queue, v); + + wi->enq++; + wi->wait_enq--; + } +} + + +static void +wdeq_worker(void *p) +{ + nxt_cq_t *q; + uintptr_t i, v; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_wdeq; i++) { + + do { + wi->wait_deq++; + v = nxt_cq_dequeue(&q->active_queue); + } while (v == nxt_cq_empty(&q->active_queue)); + + nxt_cq_enqueue(&q->free_queue, v); + + wi->deq++; + wi->wait_deq--; + } +} + + +static nxt_int_t +worker_create(nxt_worker_info_t *wi, int id, nxt_thread_start_t start) +{ + wi->id = id; + +#if NXT_QTEST_USE_THREAD + nxt_thread_link_t *link; + + link = nxt_zalloc(sizeof(nxt_thread_link_t)); + + link->start = start; + link->work.data = wi; + + return nxt_thread_create(&wi->handle, link); + +#else + pid_t pid = fork(); + + if (pid == 0) { + start(wi); + exit(0); + + } else { + wi->pid = pid; + } + + return NXT_OK; +#endif +} + + +static void +worker_wait(nxt_worker_info_t *wi) +{ +#if NXT_QTEST_USE_THREAD + pthread_join(wi->handle, NULL); + +#else + waitpid(wi->pid, &wi->status, 0); +#endif +} + + +int nxt_cdecl +main(int argc, char **argv) +{ + int i, k, id, verbose, objective, rk; + char *a; + size_t start, elapsed; + double *stats, m, c; + uint64_t total_ops; + uintptr_t j; + nxt_task_t task; + nxt_thread_t *thr; + nxt_worker_info_t *wi; + double times[MAX_ITER], mopsec[MAX_ITER]; + + verbose = 0; + objective = 0; + + for (i = 1; i < argc; i++) { + a = argv[i]; + + if (strcmp(a, "-v") == 0) { + verbose++; + continue; + } + + if (strcmp(a, "-n") == 0 && (i + 1) < argc) { + nops = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--enq") == 0 && (i + 1) < argc) { + nprocs_enq = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--deq") == 0 && (i + 1) < argc) { + nprocs_deq = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--wenq") == 0 && (i + 1) < argc) { + nprocs_wenq = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--wdeq") == 0 && (i + 1) < argc) { + nprocs_wdeq = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--ed") == 0 && (i + 1) < argc) { + nprocs_enq_deq = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--cas") == 0 && (i + 1) < argc) { + nprocs_cas = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--faa") == 0 && (i + 1) < argc) { + nprocs_faa = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--obj") == 0 && (i + 1) < argc) { + objective = atoi(argv[++i]); + continue; + } + + printf("unknown option %s", a); + + return 1; + } + + if (nxt_lib_start("ncq_test", argv, &environ) != NXT_OK) { + return 1; + } + + nprocs = nprocs_enq + nprocs_deq + nprocs_wenq + nprocs_wdeq + + nprocs_enq_deq + nprocs_cas + nprocs_faa; + + if (nprocs == 0) { + return 0; + } + + nxt_main_log.level = NXT_LOG_INFO; + task.log = &nxt_main_log; + + thr = nxt_thread(); + thr->task = &task; + + pgq = mmap(NULL, sizeof(nxt_cq_t), PROT_READ | PROT_WRITE, + MAP_ANON | MAP_SHARED, -1, 0); + if (pgq == MAP_FAILED) { + return 2; + } + + nxt_cq_init(&pgq->free_queue); + nxt_cq_init(&pgq->active_queue); + + for(i = 0; i < NXT_CQ_SIZE; i++) { + nxt_cq_enqueue(&pgq->free_queue, i); + } + + if (verbose >= 1) { + printf("number of workers: %d\n", (int) nprocs); + printf("number of ops: %d\n", (int) nops); + } + + wi = mmap(NULL, nprocs * sizeof(nxt_worker_info_t), PROT_READ | PROT_WRITE, + MAP_ANON | MAP_SHARED, -1, 0); + if (wi == MAP_FAILED) { + return 3; + } + + for (k = 0; k < MAX_ITER; k++) { + nxt_memzero(wi, nprocs * sizeof(nxt_worker_info_t)); + + nxt_cq_init(&pgq->free_queue); + nxt_cq_init(&pgq->active_queue); + + for(i = 0; i < NXT_CQ_SIZE; i++) { + nxt_cq_enqueue(&pgq->free_queue, i); + } + + start = elapsed_time(0); + + id = 0; + + for (j = 0; j < nprocs_enq; j++, id++) { + worker_create(wi + id, id, enq_worker); + } + + for (j = 0; j < nprocs_deq; j++, id++) { + worker_create(wi + id, id, deq_worker); + } + + for (j = 0; j < nprocs_wenq; j++, id++) { + worker_create(wi + id, id, wenq_worker); + } + + for (j = 0; j < nprocs_wdeq; j++, id++) { + worker_create(wi + id, id, wdeq_worker); + } + + for (j = 0; j < nprocs_enq_deq; j++, id++) { + worker_create(wi + id, id, enq_deq_worker); + } + + for (j = 0; j < nprocs_cas; j++, id++) { + worker_create(wi + id, id, cas_worker); + } + + for (j = 0; j < nprocs_faa; j++, id++) { + worker_create(wi + id, id, faa_worker); + } + + for (j = 0; j < nprocs; j++) { + worker_wait(wi + j); + } + + elapsed = elapsed_time(start); + + for (j = 1; j < nprocs; j++) { + wi[0].enq += wi[j].enq; + wi[0].deq += wi[j].deq; + wi[0].wait_enq += wi[j].wait_enq; + wi[0].wait_deq += wi[j].wait_deq; + wi[0].own_res += wi[j].own_res; + wi[0].cas += wi[j].cas; + wi[0].faa += wi[j].faa; + } + + total_ops = wi[0].enq + wi[0].deq + wi[0].cas + wi[0].faa; + + if (total_ops == 0) { + total_ops = nops; + } + + times[k] = elapsed / 1000.0; + mopsec[k] = (double) total_ops / elapsed; + + if (verbose >= 2) { + printf("enq %10"PRIu64"\n", wi[0].enq); + printf("deq %10"PRIu64"\n", wi[0].deq); + printf("wait_enq %10"PRIu64"\n", wi[0].wait_enq); + printf("wait_deq %10"PRIu64"\n", wi[0].wait_deq); + printf("own_res %10"PRIu64"\n", wi[0].own_res); + printf("cas %10"PRIu64"\n", wi[0].cas); + printf("faa %10"PRIu64"\n", wi[0].faa); + printf("total ops %10"PRIu64"\n", total_ops); + printf("Mops/sec %13.2f\n", mopsec[k]); + + printf("elapsed %10d us\n", (int) elapsed); + printf("per op %10d ns\n", (int) ((1000 * elapsed) / total_ops)); + } + + if (k >= STAT_ITER) { + stats = (objective == 0) ? times : mopsec; + + m = mean(stats + k - STAT_ITER, STAT_ITER); + c = cov(stats + k - STAT_ITER, m, STAT_ITER); + + if (verbose >= 1) { + if (objective == 0) { + printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f; " + "mean time %.2f ms; cov %.4f\n", + (int) k + 1, times[k], mopsec[k], m, c); + + } else { + printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f; " + "mean Mop/sec %.2f; cov %.4f\n", + (int) k + 1, times[k], mopsec[k], m, c); + } + } + + if (c < MIN_COV) { + rk = k - STAT_ITER; + + for (i = rk + 1; i <= k; i++) { + if (fabs(stats[i] - m) < fabs(stats[rk] - m)) { + rk = i; + } + } + + printf("#%d %.2f ms; %.2f\n", rk, times[rk], mopsec[rk]); + + return 0; + } + + } else { + if (verbose >= 1) { + printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f\n", + (int) k + 1, times[k], mopsec[k]); + } + } + } + + return 0; +} |