diff options
Diffstat (limited to 'src/nxt_upstream_round_robin.c')
-rw-r--r-- | src/nxt_upstream_round_robin.c | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/src/nxt_upstream_round_robin.c b/src/nxt_upstream_round_robin.c new file mode 100644 index 00000000..f8035762 --- /dev/null +++ b/src/nxt_upstream_round_robin.c @@ -0,0 +1,200 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +typedef struct { + int32_t weight; + int32_t effective_weight; + int32_t current_weight; + uint32_t down; /* 1 bit */ + nxt_msec_t last_accessed; + nxt_sockaddr_t *sockaddr; +} nxt_upstream_round_robin_peer_t; + + +typedef struct { + nxt_uint_t npeers; + nxt_upstream_round_robin_peer_t *peers; + nxt_thread_spinlock_t lock; +} nxt_upstream_round_robin_t; + + +static void nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_upstream_round_robin_peer_error(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_upstream_round_robin_get_peer(nxt_upstream_peer_t *up); + + +void +nxt_upstream_round_robin_peer(nxt_upstream_peer_t *up) +{ + nxt_job_sockaddr_parse_t *jbs; + + if (up->upstream != NULL) { + nxt_upstream_round_robin_get_peer(up); + } + + jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t)); + if (nxt_slow_path(jbs == NULL)) { + up->ready_handler(up); + return; + } + + jbs->resolve.job.data = up; + jbs->resolve.port = up->port; + jbs->resolve.log_level = NXT_LOG_ERR; + jbs->resolve.ready_handler = nxt_upstream_round_robin_create; + jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error; + jbs->addr = up->addr; + + nxt_job_sockaddr_parse(jbs); +} + + +static void +nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_uint_t i; + nxt_sockaddr_t *sa; + nxt_upstream_peer_t *up; + nxt_job_sockaddr_parse_t *jbs; + nxt_upstream_round_robin_t *urr; + nxt_upstream_round_robin_peer_t *peer; + + jbs = obj; + up = jbs->resolve.job.data; + + urr = nxt_mem_zalloc(up->mem_pool, sizeof(nxt_upstream_round_robin_t)); + if (nxt_slow_path(urr == NULL)) { + goto fail; + } + + urr->npeers = jbs->resolve.count; + + peer = nxt_mem_zalloc(up->mem_pool, + urr->npeers * sizeof(nxt_upstream_round_robin_peer_t)); + if (nxt_slow_path(peer == NULL)) { + goto fail; + } + + urr->peers = peer; + + for (i = 0; i < urr->npeers; i++) { + peer[i].weight = 1; + peer[i].effective_weight = 1; + + sa = jbs->resolve.sockaddrs[i]; + + /* STUB */ + sa->type = SOCK_STREAM; + + /* TODO: test ret */ + (void) nxt_sockaddr_text(up->mem_pool, sa, 1); + + nxt_log_debug(thr->log, "upstream peer: %*s", sa->text_len, sa->text); + + /* TODO: memcpy to shared memory pool. */ + peer[i].sockaddr = sa; + } + + up->upstream = urr; + + /* STUB */ + up->sockaddr = peer[0].sockaddr; + + nxt_job_destroy(jbs); + up->ready_handler(up); + + //nxt_upstream_round_robin_get_peer(up); + return; + +fail: + + nxt_job_destroy(jbs); + + up->ready_handler(up); +} + + +static void +nxt_upstream_round_robin_peer_error(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_upstream_peer_t *up; + nxt_job_sockaddr_parse_t *jbs; + + jbs = obj; + up = jbs->resolve.job.data; + + up->ready_handler(up); +} + + +static void +nxt_upstream_round_robin_get_peer(nxt_upstream_peer_t *up) +{ + int32_t effective_weights; + nxt_uint_t i; + nxt_msec_t now; + nxt_event_engine_t *engine; + nxt_upstream_round_robin_t *urr; + nxt_upstream_round_robin_peer_t *peer, *best; + + urr = up->upstream; + + engine = nxt_thread_event_engine(); + now = engine->timers.now; + + nxt_thread_spin_lock(&urr->lock); + + best = NULL; + effective_weights = 0; + peer = urr->peers; + + for (i = 0; i < urr->npeers; i++) { + + if (peer[i].down) { + continue; + } + +#if 0 + if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) { + good = peer[i].last_accessed + peer[i].fail_timeout; + + if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) { + continue; + } + } +#endif + + peer[i].current_weight += peer[i].effective_weight; + effective_weights += peer[i].effective_weight; + + if (peer[i].effective_weight < peer[i].weight) { + peer[i].effective_weight++; + } + + if (best == NULL || peer[i].current_weight > best->current_weight) { + best = &peer[i]; + } + } + + if (best != NULL) { + best->current_weight -= effective_weights; + best->last_accessed = now; + + up->sockaddr = best->sockaddr; + + } else { + up->sockaddr = NULL; + } + + nxt_thread_spin_unlock(&urr->lock); + + up->ready_handler(up); +} |