/*
 * Copyright (C) NGINX, Inc.
 */

/*
 * Micro-benchmark for the lock-free circular queues (nxt_nncq / nxt_nvbcq)
 * and for plain atomic CAS / fetch-and-add on a shared counter.
 *
 * Two queues ("free" and "active") and a counter live in a shared anonymous
 * mapping.  Workers (forked processes, or threads when NXT_QTEST_USE_THREAD
 * is set) move values between the queues or hammer the counter; main()
 * measures the wall-clock time of each run and repeats the run up to
 * MAX_ITER times until the measurements become stable.
 */

#include <nxt_main.h>
#include <math.h>
#include <inttypes.h>
#include <errno.h>
#include <signal.h>
#include <stdlib.h>


#ifndef NXT_NCQ_TEST
#define NXT_NCQ_TEST  1
#endif

#define NXT_QTEST_USE_THREAD  0

#if NXT_NCQ_TEST
#include <nxt_nncq.h>
#else
#include <nxt_nvbcq.h>
#endif


#define MAX_ITER   20      /* Maximum number of benchmark runs. */
#define STAT_ITER  5       /* Number of samples in the statistics window. */
#define MIN_COV    0.02    /* Coefficient of variation that stops the runs. */


extern char  **environ;

/* Total number of operations, split evenly between workers of one kind. */
static uintptr_t  nops = 10000000;

/* Number of workers of each kind, set from the command line. */
static uintptr_t  nprocs_enq = 0;
static uintptr_t  nprocs_deq = 0;
static uintptr_t  nprocs_wenq = 0;
static uintptr_t  nprocs_wdeq = 0;
static uintptr_t  nprocs_enq_deq = 0;
static uintptr_t  nprocs_cas = 0;
static uintptr_t  nprocs_faa = 0;

/* Sum of all nprocs_* values, computed in main(). */
static uintptr_t  nprocs = 1;


/*
 * Returns the current gettimeofday() time in microseconds minus "us".
 * Call with 0 to take a timestamp and with that timestamp to get the
 * elapsed time.
 */
static size_t
elapsed_time(size_t us)
{
    struct timeval  t;

    gettimeofday(&t, NULL);

    /* Convert before multiplying so the arithmetic is unsigned. */
    return (size_t) t.tv_sec * 1000000 + (size_t) t.tv_usec - us;
}


/* Arithmetic mean of the first "n" values of "times". */
static double
mean(const double *times, int n)
{
    int     i;
    double  sum;

    sum = 0;

    for (i = 0; i < n; i++) {
        sum += times[i];
    }

    return sum / n;
}


/*
 * Coefficient of variation of the first "n" values of "times": the
 * population standard deviation around "mean" divided by "mean".
 */
static double
cov(const double *times, double mean, int n)
{
    int     i;
    double  variance;

    variance = 0;

    for (i = 0; i < n; i++) {
        variance += (times[i] - mean) * (times[i] - mean);
    }

    variance /= n;

    return sqrt(variance) / mean;
}


/* Shared state operated on by all workers. */
typedef struct {
#if NXT_NCQ_TEST
    nxt_nncq_t   free_queue;
    nxt_nncq_t   active_queue;
#else
    nxt_nvbcq_t  free_queue;
    nxt_nvbcq_t  active_queue;
#endif
    uint32_t     counter;       /* Target of the CAS and FAA workers. */
} nxt_cq_t;


/* Points to the shared mapping created in main(). */
static nxt_cq_t  *pgq;


#if NXT_NCQ_TEST
#define nxt_cq_enqueue  nxt_nncq_enqueue
#define nxt_cq_dequeue  nxt_nncq_dequeue
#define nxt_cq_empty    nxt_nncq_empty
#define nxt_cq_init     nxt_nncq_init
#define NXT_CQ_SIZE     NXT_NNCQ_SIZE
#else
#define nxt_cq_enqueue  nxt_nvbcq_enqueue
#define nxt_cq_dequeue  nxt_nvbcq_dequeue
#define nxt_cq_empty    nxt_nvbcq_empty
#define nxt_cq_init     nxt_nvbcq_init
#define NXT_CQ_SIZE     NXT_NVBCQ_SIZE
#endif


/* Per-worker identity and operation counters. */
typedef struct {
    int                  id;
    uint64_t             enq;       /* Successful moves free -> active. */
    uint64_t             deq;       /* Successful moves active -> free. */
    uint64_t             wait_enq;  /* Failed dequeue attempts in wenq_worker. */
    uint64_t             wait_deq;  /* Failed dequeue attempts in wdeq_worker. */
    uint64_t             own_res;   /* Dequeued values equal to own id. */
    uint64_t             cas;       /* Successful compare-and-set operations. */
    uint64_t             faa;       /* Fetch-and-add operations. */
#if NXT_QTEST_USE_THREAD
    nxt_thread_handle_t  handle;
#else
    nxt_pid_t            pid;
    int                  status;    /* Filled in by waitpid(). */
#endif
} nxt_worker_info_t;


/*
 * Reads the shared counter and tries to compare-and-set it to the next
 * value; only successful attempts are counted.
 */
static void
cas_worker(void *p)
{
    nxt_cq_t           *q;
    uint32_t           c;
    uintptr_t          i;
    nxt_worker_info_t  *wi;

    q = pgq;
    wi = p;

    for (i = 0; i < nops / nprocs_cas; i++) {
        c = q->counter;

        if (nxt_atomic_cmp_set(&q->counter, c, c + 1)) {
            ++wi->cas;
        }
    }
}


/* Atomically adds 1 to the shared counter on every iteration. */
static void
faa_worker(void *p)
{
    nxt_cq_t           *q;
    uintptr_t          i;
    nxt_worker_info_t  *wi;

    q = pgq;
    wi = p;

    for (i = 0; i < nops / nprocs_faa; i++) {
        nxt_atomic_fetch_add(&q->counter, 1);
        wi->faa++;
    }
}


/*
 * On every iteration tries one free -> active move followed by one
 * active -> free move.  The value put into the active queue is the
 * worker id, not the dequeued value, so that own_res can count how
 * often a worker gets its own entry back.
 */
static void
enq_deq_worker(void *p)
{
    nxt_cq_t           *q;
    uintptr_t          i, v;
    nxt_worker_info_t  *wi;

    q = pgq;
    wi = p;

    for (i = 0; i < nops / nprocs_enq_deq; i++) {
        v = nxt_cq_dequeue(&q->free_queue);

        /* A result equal to nxt_cq_empty() means nothing was dequeued. */
        if (v != nxt_cq_empty(&q->free_queue)) {
            nxt_cq_enqueue(&q->active_queue, wi->id);
            wi->enq++;
        }

        v = nxt_cq_dequeue(&q->active_queue);

        if (v != nxt_cq_empty(&q->active_queue)) {
            nxt_cq_enqueue(&q->free_queue, v);
            wi->deq++;

            if ((int) v == wi->id) {
                wi->own_res++;
            }
        }
    }
}


/*
 * Makes one attempt per iteration to move a value from the free queue
 * to the active queue; an empty free queue is skipped without retrying.
 */
static void
enq_worker(void *p)
{
    nxt_cq_t           *q;
    uintptr_t          i, v;
    nxt_worker_info_t  *wi;

    q = pgq;
    wi = p;

    for (i = 0; i < nops / nprocs_enq; i++) {
        v = nxt_cq_dequeue(&q->free_queue);

        if (v != nxt_cq_empty(&q->free_queue)) {
            nxt_cq_enqueue(&q->active_queue, v);
            wi->enq++;
        }
    }
}


/*
 * Makes one attempt per iteration to move a value from the active queue
 * to the free queue; an empty active queue is skipped without retrying.
 */
static void
deq_worker(void *p)
{
    nxt_cq_t           *q;
    uintptr_t          i, v;
    nxt_worker_info_t  *wi;

    q = pgq;
    wi = p;

    for (i = 0; i < nops / nprocs_deq; i++) {
        v = nxt_cq_dequeue(&q->active_queue);

        if (v != nxt_cq_empty(&q->active_queue)) {
            nxt_cq_enqueue(&q->free_queue, v);
            ++wi->deq;
        }
    }
}


/*
 * Like enq_worker(), but spins until the dequeue from the free queue
 * succeeds.  wait_enq is incremented on each attempt and decremented
 * after the successful one, so it accumulates the failed attempts.
 */
static void
wenq_worker(void *p)
{
    nxt_cq_t           *q;
    uintptr_t          i, v;
    nxt_worker_info_t  *wi;

    q = pgq;
    wi = p;

    for (i = 0; i < nops / nprocs_wenq; i++) {

        do {
            wi->wait_enq++;
            v = nxt_cq_dequeue(&q->free_queue);
        } while (v == nxt_cq_empty(&q->free_queue));

        nxt_cq_enqueue(&q->active_queue, v);

        wi->enq++;
        wi->wait_enq--;
    }
}


/*
 * Like deq_worker(), but spins until the dequeue from the active queue
 * succeeds.  wait_deq accumulates the failed attempts in the same way
 * as wait_enq does in wenq_worker().
 */
static void
wdeq_worker(void *p)
{
    nxt_cq_t           *q;
    uintptr_t          i, v;
    nxt_worker_info_t  *wi;

    q = pgq;
    wi = p;

    for (i = 0; i < nops / nprocs_wdeq; i++) {

        do {
            wi->wait_deq++;
            v = nxt_cq_dequeue(&q->active_queue);
        } while (v == nxt_cq_empty(&q->active_queue));

        nxt_cq_enqueue(&q->free_queue, v);

        wi->deq++;
        wi->wait_deq--;
    }
}


/*
 * Starts one worker running "start" with "wi" as its argument.
 *
 * Returns NXT_OK on success, NXT_ERROR if the process could not be forked
 * or (thread build) the thread link could not be allocated; in the thread
 * build the result of nxt_thread_create() is returned otherwise.
 */
static nxt_int_t
worker_create(nxt_worker_info_t *wi, int id, nxt_thread_start_t start)
{
    wi->id = id;

#if NXT_QTEST_USE_THREAD
    nxt_thread_link_t  *link;

    link = nxt_zalloc(sizeof(nxt_thread_link_t));
    if (link == NULL) {
        return NXT_ERROR;
    }

    link->start = start;
    link->work.data = wi;

    return nxt_thread_create(&wi->handle, link);

#else
    pid_t  pid;

    pid = fork();

    if (pid == -1) {
        /* Do not store -1: waitpid(-1, ...) would reap an arbitrary child. */
        return NXT_ERROR;
    }

    if (pid == 0) {
        start(wi);

        /*
         * _exit() instead of exit(): the child must not flush the stdio
         * buffers it inherited from the parent, or pending output would
         * be written twice.
         */
        _exit(0);
    }

    wi->pid = pid;

    return NXT_OK;
#endif
}


/* Blocks until the worker described by "wi" has finished. */
static void
worker_wait(nxt_worker_info_t *wi)
{
#if NXT_QTEST_USE_THREAD
    pthread_join(wi->handle, NULL);
#else
    waitpid(wi->pid, &wi->status, 0);
#endif
}


/*
 * Starts "count" workers running "start", using consecutive slots of "wi"
 * beginning at *id.  *id is advanced past every worker that was started.
 * Stops at the first failure and returns NXT_ERROR.
 */
static nxt_int_t
workers_start(nxt_worker_info_t *wi, int *id, uintptr_t count,
    nxt_thread_start_t start)
{
    uintptr_t  j;

    for (j = 0; j < count; j++) {
        if (worker_create(wi + *id, *id, start) != NXT_OK) {
            return NXT_ERROR;
        }

        (*id)++;
    }

    return NXT_OK;
}


/*
 * Cleans up the first "n" workers after a failed start.  Forked workers
 * are killed (a spinning worker may otherwise never finish without its
 * counterpart) and reaped.  Threads are left alone; they end together
 * with the process.
 */
static void
workers_abort(nxt_worker_info_t *wi, uintptr_t n)
{
#if NXT_QTEST_USE_THREAD
    (void) wi;
    (void) n;
#else
    uintptr_t  i;

    for (i = 0; i < n; i++) {
        kill(wi[i].pid, SIGKILL);
        waitpid(wi[i].pid, &wi[i].status, 0);
    }
#endif
}


/* (Re)initializes both queues and fills the free queue with 0..SIZE-1. */
static void
queues_reset(void)
{
    int  i;

    nxt_cq_init(&pgq->free_queue);
    nxt_cq_init(&pgq->active_queue);

    for (i = 0; i < NXT_CQ_SIZE; i++) {
        nxt_cq_enqueue(&pgq->free_queue, i);
    }
}


/*
 * Parses "s" as an unsigned decimal number into *value.  Returns
 * NXT_ERROR, leaving *value untouched, if "s" does not start with a
 * digit, has trailing characters, or does not fit into uintptr_t.
 */
static nxt_int_t
parse_count(const char *s, uintptr_t *value)
{
    char                *end;
    unsigned long long  n;

    /* strtoull() itself would accept whitespace and a sign. */
    if (s[0] < '0' || s[0] > '9') {
        return NXT_ERROR;
    }

    errno = 0;
    n = strtoull(s, &end, 10);

    if (errno != 0 || *end != '\0' || n > UINTPTR_MAX) {
        return NXT_ERROR;
    }

    *value = (uintptr_t) n;

    return NXT_OK;
}


/* Command line options that take an unsigned count. */
static const struct {
    const char  *name;
    uintptr_t   *value;
} count_options[] = {
    { "-n",     &nops },
    { "--enq",  &nprocs_enq },
    { "--deq",  &nprocs_deq },
    { "--wenq", &nprocs_wenq },
    { "--wdeq", &nprocs_wdeq },
    { "--ed",   &nprocs_enq_deq },
    { "--cas",  &nprocs_cas },
    { "--faa",  &nprocs_faa },
};


/* Returns the variable set by count option "name", or NULL if unknown. */
static uintptr_t *
count_option_find(const char *name)
{
    size_t  i;

    for (i = 0; i < sizeof(count_options) / sizeof(count_options[0]); i++) {
        if (strcmp(name, count_options[i].name) == 0) {
            return count_options[i].value;
        }
    }

    return NULL;
}


/*
 * Options:
 *   -v            increase verbosity (may be repeated)
 *   -n N          total number of operations
 *   --enq N, --deq N, --wenq N, --wdeq N, --ed N, --cas N, --faa N
 *                 number of workers of the corresponding kind
 *   --obj N       statistic used for convergence: 0 - elapsed time,
 *                 otherwise Mops/sec
 *
 * Exit codes: 0 - done (also when no workers were requested), 1 - bad
 * option or library start failure, 2 and 3 - shared memory mapping
 * failure, 4 - a worker could not be started.
 */
int nxt_cdecl
main(int argc, char **argv)
{
    int                i, k, id, verbose, objective, rk, per_op;
    char               *a;
    size_t             start, elapsed;
    double             *stats, m, c;
    uint64_t           total_ops;
    uintptr_t          j, *count;
    nxt_task_t         task;
    nxt_thread_t       *thr;
    nxt_worker_info_t  *wi;
    double             times[MAX_ITER], mopsec[MAX_ITER];

    verbose = 0;
    objective = 0;

    for (i = 1; i < argc; i++) {
        a = argv[i];

        if (strcmp(a, "-v") == 0) {
            verbose++;
            continue;
        }

        /* All remaining options require an argument. */
        if ((i + 1) < argc) {

            if (strcmp(a, "--obj") == 0) {
                objective = atoi(argv[++i]);
                continue;
            }

            count = count_option_find(a);

            if (count != NULL) {
                if (parse_count(argv[++i], count) != NXT_OK) {
                    printf("invalid value for %s: %s\n", a, argv[i]);
                    return 1;
                }

                continue;
            }
        }

        printf("unknown option %s\n", a);

        return 1;
    }

    if (nxt_lib_start("ncq_test", argv, &environ) != NXT_OK) {
        return 1;
    }

    nprocs = nprocs_enq + nprocs_deq + nprocs_wenq + nprocs_wdeq
             + nprocs_enq_deq + nprocs_cas + nprocs_faa;

    if (nprocs == 0) {
        return 0;
    }

    nxt_main_log.level = NXT_LOG_INFO;
    task.log = &nxt_main_log;

    thr = nxt_thread();
    thr->task = &task;

    /* Shared mappings, so forked workers operate on the same memory. */
    pgq = mmap(NULL, sizeof(nxt_cq_t), PROT_READ | PROT_WRITE,
               MAP_ANON | MAP_SHARED, -1, 0);
    if (pgq == MAP_FAILED) {
        return 2;
    }

    if (verbose >= 1) {
        printf("number of workers: %d\n", (int) nprocs);
        printf("number of ops:     %d\n", (int) nops);
    }

    wi = mmap(NULL, nprocs * sizeof(nxt_worker_info_t), PROT_READ | PROT_WRITE,
              MAP_ANON | MAP_SHARED, -1, 0);
    if (wi == MAP_FAILED) {
        return 3;
    }

    for (k = 0; k < MAX_ITER; k++) {
        nxt_memzero(wi, nprocs * sizeof(nxt_worker_info_t));

        queues_reset();

        start = elapsed_time(0);

        id = 0;

        if (workers_start(wi, &id, nprocs_enq, enq_worker) != NXT_OK
            || workers_start(wi, &id, nprocs_deq, deq_worker) != NXT_OK
            || workers_start(wi, &id, nprocs_wenq, wenq_worker) != NXT_OK
            || workers_start(wi, &id, nprocs_wdeq, wdeq_worker) != NXT_OK
            || workers_start(wi, &id, nprocs_enq_deq, enq_deq_worker)
               != NXT_OK
            || workers_start(wi, &id, nprocs_cas, cas_worker) != NXT_OK
            || workers_start(wi, &id, nprocs_faa, faa_worker) != NXT_OK)
        {
            printf("failed to start worker #%d\n", id);

            /* "id" is the number of workers started so far. */
            workers_abort(wi, id);

            return 4;
        }

        for (j = 0; j < nprocs; j++) {
            worker_wait(wi + j);
        }

        elapsed = elapsed_time(start);

        /* Accumulate the counters of all workers in wi[0]. */
        for (j = 1; j < nprocs; j++) {
            wi[0].enq += wi[j].enq;
            wi[0].deq += wi[j].deq;
            wi[0].wait_enq += wi[j].wait_enq;
            wi[0].wait_deq += wi[j].wait_deq;
            wi[0].own_res += wi[j].own_res;
            wi[0].cas += wi[j].cas;
            wi[0].faa += wi[j].faa;
        }

        total_ops = wi[0].enq + wi[0].deq + wi[0].cas + wi[0].faa;

        if (total_ops == 0) {
            total_ops = nops;
        }

        times[k] = elapsed / 1000.0;

        /* Both divisors can be zero: a sub-microsecond run, or "-n 0". */
        mopsec[k] = (elapsed != 0) ? (double) total_ops / elapsed : 0.0;
        per_op = (total_ops != 0) ? (int) ((1000 * elapsed) / total_ops) : 0;

        if (verbose >= 2) {
            printf("enq      %10"PRIu64"\n", wi[0].enq);
            printf("deq      %10"PRIu64"\n", wi[0].deq);
            printf("wait_enq %10"PRIu64"\n", wi[0].wait_enq);
            printf("wait_deq %10"PRIu64"\n", wi[0].wait_deq);
            printf("own_res  %10"PRIu64"\n", wi[0].own_res);
            printf("cas      %10"PRIu64"\n", wi[0].cas);
            printf("faa      %10"PRIu64"\n", wi[0].faa);
            printf("total ops %10"PRIu64"\n", total_ops);
            printf("Mops/sec %13.2f\n", mopsec[k]);
            printf("elapsed  %10d us\n", (int) elapsed);
            printf("per op   %10d ns\n", per_op);
        }

        if (k < STAT_ITER) {
            /* Not enough samples for statistics yet. */
            if (verbose >= 1) {
                printf("  #%02d elapsed time: %.2f ms; Mops/sec %.2f\n",
                       (int) k + 1, times[k], mopsec[k]);
            }

            continue;
        }

        stats = (objective == 0) ? times : mopsec;

        /*
         * NOTE(review): the window used for mean and cov is samples
         * [k - STAT_ITER, k - 1], i.e. it excludes the sample measured in
         * this iteration, while the selection loop below scans
         * [k - STAT_ITER, k].  Kept as is; confirm whether intended.
         */
        m = mean(stats + k - STAT_ITER, STAT_ITER);
        c = cov(stats + k - STAT_ITER, m, STAT_ITER);

        if (verbose >= 1) {
            if (objective == 0) {
                printf("  #%02d elapsed time: %.2f ms; Mops/sec %.2f; "
                       "mean time %.2f ms; cov %.4f\n",
                       (int) k + 1, times[k], mopsec[k], m, c);

            } else {
                printf("  #%02d elapsed time: %.2f ms; Mops/sec %.2f; "
                       "mean Mop/sec %.2f; cov %.4f\n",
                       (int) k + 1, times[k], mopsec[k], m, c);
            }
        }

        if (c < MIN_COV) {
            /* Report the run whose statistic is closest to the mean. */
            rk = k - STAT_ITER;

            for (i = rk + 1; i <= k; i++) {
                if (fabs(stats[i] - m) < fabs(stats[rk] - m)) {
                    rk = i;
                }
            }

            printf("#%d %.2f ms; %.2f\n", rk, times[rk], mopsec[rk]);

            return 0;
        }
    }

    return 0;
}