diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_nncq.h | 162 | ||||
-rw-r--r-- | src/nxt_nvbcq.h | 146 | ||||
-rw-r--r-- | src/test/nxt_cq_test.c | 578 |
3 files changed, 886 insertions, 0 deletions
diff --git a/src/nxt_nncq.h b/src/nxt_nncq.h new file mode 100644 index 00000000..20e7ecff --- /dev/null +++ b/src/nxt_nncq.h @@ -0,0 +1,162 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_NNCQ_H_INCLUDED_ +#define _NXT_NNCQ_H_INCLUDED_ + + +/* Numeric Naive Circular Queue */ + +#define NXT_NNCQ_SIZE 16384 + +typedef uint32_t nxt_nncq_atomic_t; +typedef uint16_t nxt_nncq_cycle_t; + +typedef struct { + nxt_nncq_atomic_t head; + nxt_nncq_atomic_t entries[NXT_NNCQ_SIZE]; + nxt_nncq_atomic_t tail; +} nxt_nncq_t; + + +static inline nxt_nncq_atomic_t +nxt_nncq_head(nxt_nncq_t const volatile *q) +{ + return q->head; +} + + +static inline nxt_nncq_atomic_t +nxt_nncq_tail(nxt_nncq_t const volatile *q) +{ + return q->tail; +} + + +static inline void +nxt_nncq_tail_cmp_inc(nxt_nncq_t volatile *q, nxt_nncq_atomic_t t) +{ + nxt_atomic_cmp_set(&q->tail, t, t + 1); +} + + +static inline nxt_nncq_atomic_t +nxt_nncq_index(nxt_nncq_t const volatile *q, nxt_nncq_atomic_t i) +{ + return i % NXT_NNCQ_SIZE; +} + + +static inline nxt_nncq_atomic_t +nxt_nncq_map(nxt_nncq_t const volatile *q, nxt_nncq_atomic_t i) +{ + return i % NXT_NNCQ_SIZE; +} + + +static inline nxt_nncq_cycle_t +nxt_nncq_cycle(nxt_nncq_t const volatile *q, nxt_nncq_atomic_t i) +{ + return i / NXT_NNCQ_SIZE; +} + + +static inline nxt_nncq_cycle_t +nxt_nncq_next_cycle(nxt_nncq_t const volatile *q, nxt_nncq_cycle_t i) +{ + return i + 1; +} + + +static inline nxt_nncq_atomic_t +nxt_nncq_new_entry(nxt_nncq_t const volatile *q, nxt_nncq_cycle_t cycle, + nxt_nncq_atomic_t i) +{ + return cycle * NXT_NNCQ_SIZE + (i % NXT_NNCQ_SIZE); +} + + +static inline nxt_nncq_atomic_t +nxt_nncq_empty(nxt_nncq_t const volatile *q) +{ + return NXT_NNCQ_SIZE; +} + + +static void +nxt_nncq_init(nxt_nncq_t volatile *q) +{ + q->head = NXT_NNCQ_SIZE; + nxt_memzero((void *) q->entries, NXT_NNCQ_SIZE * sizeof(nxt_nncq_atomic_t)); + q->tail = NXT_NNCQ_SIZE; +} + + +static void +nxt_nncq_enqueue(nxt_nncq_t volatile *q, nxt_nncq_atomic_t val) +{ + nxt_nncq_cycle_t e_cycle, t_cycle; + nxt_nncq_atomic_t n, t, e, j; + + for ( ;; ) { + t = nxt_nncq_tail(q); + j = nxt_nncq_map(q, t); + e = q->entries[j]; + + e_cycle = nxt_nncq_cycle(q, e); + t_cycle = nxt_nncq_cycle(q, t); + + if (e_cycle == t_cycle) { + nxt_nncq_tail_cmp_inc(q, t); + continue; + } + + if (nxt_nncq_next_cycle(q, e_cycle) != t_cycle) { + continue; + } + + n = nxt_nncq_new_entry(q, t_cycle, val); + + if (nxt_atomic_cmp_set(&q->entries[j], e, n)) { + break; + } + } + + nxt_nncq_tail_cmp_inc(q, t); +} + + +static nxt_nncq_atomic_t +nxt_nncq_dequeue(nxt_nncq_t volatile *q) +{ + nxt_nncq_cycle_t e_cycle, h_cycle; + nxt_nncq_atomic_t h, j, e; + + for ( ;; ) { + h = nxt_nncq_head(q); + j = nxt_nncq_map(q, h); + e = q->entries[j]; + + e_cycle = nxt_nncq_cycle(q, e); + h_cycle = nxt_nncq_cycle(q, h); + + if (e_cycle != h_cycle) { + if (nxt_nncq_next_cycle(q, e_cycle) == h_cycle) { + return nxt_nncq_empty(q); + } + + continue; + } + + if (nxt_atomic_cmp_set(&q->head, h, h + 1)) { + break; + } + } + + return nxt_nncq_index(q, e); +} + + +#endif /* _NXT_NNCQ_H_INCLUDED_ */ diff --git a/src/nxt_nvbcq.h b/src/nxt_nvbcq.h new file mode 100644 index 00000000..2b019dcc --- /dev/null +++ b/src/nxt_nvbcq.h @@ -0,0 +1,146 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_NVBCQ_H_INCLUDED_ +#define _NXT_NVBCQ_H_INCLUDED_ + + +/* Numeric VBart Circular Queue */ + +#define NXT_NVBCQ_SIZE 16384 + +typedef uint32_t nxt_nvbcq_atomic_t; + +struct nxt_nvbcq_s { + nxt_nvbcq_atomic_t head; + nxt_nvbcq_atomic_t entries[NXT_NVBCQ_SIZE]; + nxt_nvbcq_atomic_t tail; +}; + +typedef struct nxt_nvbcq_s nxt_nvbcq_t; + + +static inline nxt_nvbcq_atomic_t +nxt_nvbcq_head(nxt_nvbcq_t const volatile *q) +{ + return q->head; +} + + +static inline nxt_nvbcq_atomic_t +nxt_nvbcq_tail(nxt_nvbcq_t const volatile *q) +{ + return q->tail; +} + + +static inline void +nxt_nvbcq_tail_cmp_inc(nxt_nvbcq_t volatile *q, nxt_nvbcq_atomic_t t) +{ + nxt_atomic_cmp_set(&q->tail, t, t + 1); +} + + +static inline nxt_nvbcq_atomic_t +nxt_nvbcq_index(nxt_nvbcq_t const volatile *q, nxt_nvbcq_atomic_t i) +{ + return i % NXT_NVBCQ_SIZE; +} + + +static inline nxt_nvbcq_atomic_t +nxt_nvbcq_map(nxt_nvbcq_t const volatile *q, nxt_nvbcq_atomic_t i) +{ + return i % NXT_NVBCQ_SIZE; +} + + +static inline nxt_nvbcq_atomic_t +nxt_nvbcq_empty(nxt_nvbcq_t const volatile *q) +{ + return NXT_NVBCQ_SIZE; +} + + +static void +nxt_nvbcq_init(nxt_nvbcq_t volatile *q) +{ + nxt_nvbcq_atomic_t i; + + q->head = 0; + + for (i = 0; i < NXT_NVBCQ_SIZE; i++) { + q->entries[i] = NXT_NVBCQ_SIZE; + } + + q->tail = NXT_NVBCQ_SIZE; +} + + +static void +nxt_nvbcq_enqueue(nxt_nvbcq_t volatile *q, nxt_nvbcq_atomic_t val) +{ + nxt_nvbcq_atomic_t t, h, i; + + t = nxt_nvbcq_tail(q); + h = t - NXT_NVBCQ_SIZE; + + for ( ;; ) { + i = nxt_nvbcq_map(q, t); + + if (q->entries[i] == NXT_NVBCQ_SIZE + && nxt_atomic_cmp_set(&q->entries[i], NXT_NVBCQ_SIZE, val)) + { + nxt_nvbcq_tail_cmp_inc(q, t); + return; + } + + if ((t - h) == NXT_NVBCQ_SIZE) { + h = nxt_nvbcq_head(q); + + if ((t - h) == NXT_NVBCQ_SIZE) { + return; + } + } + + t++; + } +} + + +static nxt_nvbcq_atomic_t +nxt_nvbcq_dequeue(nxt_nvbcq_t volatile *q) +{ + nxt_nvbcq_atomic_t h, t, i, e; + + h = nxt_nvbcq_head(q); + t = h + NXT_NVBCQ_SIZE; + + for ( ;; ) { + i = nxt_nvbcq_map(q, h); + e = q->entries[i]; + + if (e < NXT_NVBCQ_SIZE + && nxt_atomic_cmp_set(&q->entries[i], e, NXT_NVBCQ_SIZE)) + { + nxt_atomic_cmp_set(&q->head, h, h + 1); + + return e; + } + + if ((t - h) == NXT_NVBCQ_SIZE) { + t = nxt_nvbcq_tail(q); + + if ((t - h) == NXT_NVBCQ_SIZE) { + return NXT_NVBCQ_SIZE; + } + } + + h++; + } +} + + +#endif /* _NXT_NVBCQ_H_INCLUDED_ */ diff --git a/src/test/nxt_cq_test.c b/src/test/nxt_cq_test.c new file mode 100644 index 00000000..ae69505a --- /dev/null +++ b/src/test/nxt_cq_test.c @@ -0,0 +1,578 @@ + +/* + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> +#include <math.h> +#include <inttypes.h> + +#ifndef NXT_NCQ_TEST +#define NXT_NCQ_TEST 1 +#endif + +#define NXT_QTEST_USE_THREAD 0 + +#if NXT_NCQ_TEST +#include <nxt_nncq.h> +#else +#include <nxt_nvbcq.h> +#endif + + +#define MAX_ITER 20 +#define STAT_ITER 5 +#define MIN_COV 0.02 + +extern char **environ; +static uintptr_t nops = 10000000; + +static uintptr_t nprocs_enq = 0; +static uintptr_t nprocs_deq = 0; +static uintptr_t nprocs_wenq = 0; +static uintptr_t nprocs_wdeq = 0; +static uintptr_t nprocs_enq_deq = 0; +static uintptr_t nprocs_cas = 0; +static uintptr_t nprocs_faa = 0; + +static uintptr_t nprocs = 1; + + +static size_t +elapsed_time(size_t us) +{ + struct timeval t; + + gettimeofday(&t, NULL); + + return t.tv_sec * 1000000 + t.tv_usec - us; +} + + +static double +mean(const double *times, int n) +{ + int i; + double sum; + + sum = 0; + + for (i = 0; i < n; i++) { + sum += times[i]; + } + + return sum / n; +} + + +static double +cov(const double *times, double mean, int n) +{ + int i; + double variance; + + variance = 0; + + for (i = 0; i < n; i++) { + variance += (times[i] - mean) * (times[i] - mean); + } + + variance /= n; + + return sqrt(variance) / mean; +} + +typedef struct { +#if NXT_NCQ_TEST + nxt_nncq_t free_queue; + nxt_nncq_t active_queue; +#else + nxt_nvbcq_t free_queue; + nxt_nvbcq_t active_queue; +#endif + uint32_t counter; +} nxt_cq_t; + + +static nxt_cq_t *pgq; + + +#if NXT_NCQ_TEST +#define nxt_cq_enqueue nxt_nncq_enqueue +#define nxt_cq_dequeue nxt_nncq_dequeue +#define nxt_cq_empty nxt_nncq_empty +#define nxt_cq_init nxt_nncq_init +#define NXT_CQ_SIZE NXT_NNCQ_SIZE +#else +#define nxt_cq_enqueue nxt_nvbcq_enqueue +#define nxt_cq_dequeue nxt_nvbcq_dequeue +#define nxt_cq_empty nxt_nvbcq_empty +#define nxt_cq_init nxt_nvbcq_init +#define NXT_CQ_SIZE NXT_NVBCQ_SIZE +#endif + +typedef struct { + int id; + uint64_t enq; + uint64_t deq; + uint64_t wait_enq; + uint64_t wait_deq; + uint64_t own_res; + uint64_t cas; + uint64_t faa; + +#if NXT_QTEST_USE_THREAD + nxt_thread_handle_t handle; +#else + nxt_pid_t pid; + int status; +#endif +} nxt_worker_info_t; + + +static void +cas_worker(void *p) +{ + nxt_cq_t *q; + uint32_t c; + uintptr_t i; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_cas; i++) { + c = q->counter; + + if (nxt_atomic_cmp_set(&q->counter, c, c + 1)) { + ++wi->cas; + } + } +} + + +static void +faa_worker(void *p) +{ + nxt_cq_t *q; + uintptr_t i; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_faa; i++) { + nxt_atomic_fetch_add(&q->counter, 1); + wi->faa++; + } +} + + +static void +enq_deq_worker(void *p) +{ + nxt_cq_t *q; + uintptr_t i, v; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_enq_deq; i++) { + v = nxt_cq_dequeue(&q->free_queue); + + if (v != nxt_cq_empty(&q->free_queue)) { + nxt_cq_enqueue(&q->active_queue, wi->id); + wi->enq++; + } + + v = nxt_cq_dequeue(&q->active_queue); + + if (v != nxt_cq_empty(&q->active_queue)) { + nxt_cq_enqueue(&q->free_queue, v); + wi->deq++; + + if ((int) v == wi->id) { + wi->own_res++; + } + } + } +} + + +static void +enq_worker(void *p) +{ + nxt_cq_t *q; + uintptr_t i, v; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_enq; i++) { + v = nxt_cq_dequeue(&q->free_queue); + + if (v != nxt_cq_empty(&q->free_queue)) { + nxt_cq_enqueue(&q->active_queue, v); + wi->enq++; + } + } +} + + +static void +deq_worker(void *p) +{ + nxt_cq_t *q; + uintptr_t i, v; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_deq; i++) { + v = nxt_cq_dequeue(&q->active_queue); + + if (v != nxt_cq_empty(&q->active_queue)) { + nxt_cq_enqueue(&q->free_queue, v); + ++wi->deq; + } + } +} + + +static void +wenq_worker(void *p) +{ + nxt_cq_t *q; + uintptr_t i, v; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_wenq; i++) { + + do { + wi->wait_enq++; + v = nxt_cq_dequeue(&q->free_queue); + } while (v == nxt_cq_empty(&q->free_queue)); + + nxt_cq_enqueue(&q->active_queue, v); + + wi->enq++; + wi->wait_enq--; + } +} + + +static void +wdeq_worker(void *p) +{ + nxt_cq_t *q; + uintptr_t i, v; + nxt_worker_info_t *wi; + + q = pgq; + wi = p; + + for (i = 0; i < nops / nprocs_wdeq; i++) { + + do { + wi->wait_deq++; + v = nxt_cq_dequeue(&q->active_queue); + } while (v == nxt_cq_empty(&q->active_queue)); + + nxt_cq_enqueue(&q->free_queue, v); + + wi->deq++; + wi->wait_deq--; + } +} + + +static nxt_int_t +worker_create(nxt_worker_info_t *wi, int id, nxt_thread_start_t start) +{ + wi->id = id; + +#if NXT_QTEST_USE_THREAD + nxt_thread_link_t *link; + + link = nxt_zalloc(sizeof(nxt_thread_link_t)); + + link->start = start; + link->work.data = wi; + + return nxt_thread_create(&wi->handle, link); + +#else + pid_t pid = fork(); + + if (pid == 0) { + start(wi); + exit(0); + + } else { + wi->pid = pid; + } + + return NXT_OK; +#endif +} + + +static void +worker_wait(nxt_worker_info_t *wi) +{ +#if NXT_QTEST_USE_THREAD + pthread_join(wi->handle, NULL); + +#else + waitpid(wi->pid, &wi->status, 0); +#endif +} + + +int nxt_cdecl +main(int argc, char **argv) +{ + int i, k, id, verbose, objective, rk; + char *a; + size_t start, elapsed; + double *stats, m, c; + uint64_t total_ops; + uintptr_t j; + nxt_task_t task; + nxt_thread_t *thr; + nxt_worker_info_t *wi; + double times[MAX_ITER], mopsec[MAX_ITER]; + + verbose = 0; + objective = 0; + + for (i = 1; i < argc; i++) { + a = argv[i]; + + if (strcmp(a, "-v") == 0) { + verbose++; + continue; + } + + if (strcmp(a, "-n") == 0 && (i + 1) < argc) { + nops = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--enq") == 0 && (i + 1) < argc) { + nprocs_enq = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--deq") == 0 && (i + 1) < argc) { + nprocs_deq = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--wenq") == 0 && (i + 1) < argc) { + nprocs_wenq = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--wdeq") == 0 && (i + 1) < argc) { + nprocs_wdeq = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--ed") == 0 && (i + 1) < argc) { + nprocs_enq_deq = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--cas") == 0 && (i + 1) < argc) { + nprocs_cas = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--faa") == 0 && (i + 1) < argc) { + nprocs_faa = atoi(argv[++i]); + continue; + } + + if (strcmp(a, "--obj") == 0 && (i + 1) < argc) { + objective = atoi(argv[++i]); + continue; + } + + printf("unknown option %s", a); + + return 1; + } + + if (nxt_lib_start("ncq_test", argv, &environ) != NXT_OK) { + return 1; + } + + nprocs = nprocs_enq + nprocs_deq + nprocs_wenq + nprocs_wdeq + + nprocs_enq_deq + nprocs_cas + nprocs_faa; + + if (nprocs == 0) { + return 0; + } + + nxt_main_log.level = NXT_LOG_INFO; + task.log = &nxt_main_log; + + thr = nxt_thread(); + thr->task = &task; + + pgq = mmap(NULL, sizeof(nxt_cq_t), PROT_READ | PROT_WRITE, + MAP_ANON | MAP_SHARED, -1, 0); + if (pgq == MAP_FAILED) { + return 2; + } + + nxt_cq_init(&pgq->free_queue); + nxt_cq_init(&pgq->active_queue); + + for(i = 0; i < NXT_CQ_SIZE; i++) { + nxt_cq_enqueue(&pgq->free_queue, i); + } + + if (verbose >= 1) { + printf("number of workers: %d\n", (int) nprocs); + printf("number of ops: %d\n", (int) nops); + } + + wi = mmap(NULL, nprocs * sizeof(nxt_worker_info_t), PROT_READ | PROT_WRITE, + MAP_ANON | MAP_SHARED, -1, 0); + if (wi == MAP_FAILED) { + return 3; + } + + for (k = 0; k < MAX_ITER; k++) { + nxt_memzero(wi, nprocs * sizeof(nxt_worker_info_t)); + + nxt_cq_init(&pgq->free_queue); + nxt_cq_init(&pgq->active_queue); + + for(i = 0; i < NXT_CQ_SIZE; i++) { + nxt_cq_enqueue(&pgq->free_queue, i); + } + + start = elapsed_time(0); + + id = 0; + + for (j = 0; j < nprocs_enq; j++, id++) { + worker_create(wi + id, id, enq_worker); + } + + for (j = 0; j < nprocs_deq; j++, id++) { + worker_create(wi + id, id, deq_worker); + } + + for (j = 0; j < nprocs_wenq; j++, id++) { + worker_create(wi + id, id, wenq_worker); + } + + for (j = 0; j < nprocs_wdeq; j++, id++) { + worker_create(wi + id, id, wdeq_worker); + } + + for (j = 0; j < nprocs_enq_deq; j++, id++) { + worker_create(wi + id, id, enq_deq_worker); + } + + for (j = 0; j < nprocs_cas; j++, id++) { + worker_create(wi + id, id, cas_worker); + } + + for (j = 0; j < nprocs_faa; j++, id++) { + worker_create(wi + id, id, faa_worker); + } + + for (j = 0; j < nprocs; j++) { + worker_wait(wi + j); + } + + elapsed = elapsed_time(start); + + for (j = 1; j < nprocs; j++) { + wi[0].enq += wi[j].enq; + wi[0].deq += wi[j].deq; + wi[0].wait_enq += wi[j].wait_enq; + wi[0].wait_deq += wi[j].wait_deq; + wi[0].own_res += wi[j].own_res; + wi[0].cas += wi[j].cas; + wi[0].faa += wi[j].faa; + } + + total_ops = wi[0].enq + wi[0].deq + wi[0].cas + wi[0].faa; + + if (total_ops == 0) { + total_ops = nops; + } + + times[k] = elapsed / 1000.0; + mopsec[k] = (double) total_ops / elapsed; + + if (verbose >= 2) { + printf("enq %10"PRIu64"\n", wi[0].enq); + printf("deq %10"PRIu64"\n", wi[0].deq); + printf("wait_enq %10"PRIu64"\n", wi[0].wait_enq); + printf("wait_deq %10"PRIu64"\n", wi[0].wait_deq); + printf("own_res %10"PRIu64"\n", wi[0].own_res); + printf("cas %10"PRIu64"\n", wi[0].cas); + printf("faa %10"PRIu64"\n", wi[0].faa); + printf("total ops %10"PRIu64"\n", total_ops); + printf("Mops/sec %13.2f\n", mopsec[k]); + + printf("elapsed %10d us\n", (int) elapsed); + printf("per op %10d ns\n", (int) ((1000 * elapsed) / total_ops)); + } + + if (k >= STAT_ITER) { + stats = (objective == 0) ? times : mopsec; + + m = mean(stats + k - STAT_ITER, STAT_ITER); + c = cov(stats + k - STAT_ITER, m, STAT_ITER); + + if (verbose >= 1) { + if (objective == 0) { + printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f; " + "mean time %.2f ms; cov %.4f\n", + (int) k + 1, times[k], mopsec[k], m, c); + + } else { + printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f; " + "mean Mop/sec %.2f; cov %.4f\n", + (int) k + 1, times[k], mopsec[k], m, c); + } + } + + if (c < MIN_COV) { + rk = k - STAT_ITER; + + for (i = rk + 1; i <= k; i++) { + if (fabs(stats[i] - m) < fabs(stats[rk] - m)) { + rk = i; + } + } + + printf("#%d %.2f ms; %.2f\n", rk, times[rk], mopsec[rk]); + + return 0; + } + + } else { + if (verbose >= 1) { + printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f\n", + (int) k + 1, times[k], mopsec[k]); + } + } + } + + return 0; +} |