diff options
Diffstat (limited to 'src/nxt_upstream_round_robin.c')
-rw-r--r-- | src/nxt_upstream_round_robin.c | 257 |
1 files changed, 124 insertions, 133 deletions
diff --git a/src/nxt_upstream_round_robin.c b/src/nxt_upstream_round_robin.c index 09a3bce3..fd76ecb5 100644 --- a/src/nxt_upstream_round_robin.c +++ b/src/nxt_upstream_round_robin.c @@ -4,197 +4,188 @@ * Copyright (C) NGINX, Inc. */ -#include <nxt_main.h> +#include <nxt_router.h> +#include <nxt_http.h> +#include <nxt_upstream.h> -typedef struct { - int32_t weight; - int32_t effective_weight; - int32_t current_weight; - uint32_t down; /* 1 bit */ - nxt_msec_t last_accessed; - nxt_sockaddr_t *sockaddr; -} nxt_upstream_round_robin_peer_t; +struct nxt_upstream_round_robin_server_s { + nxt_sockaddr_t *sockaddr; + int32_t current_weight; + int32_t effective_weight; + int32_t weight; -typedef struct { - nxt_uint_t npeers; - nxt_upstream_round_robin_peer_t *peers; - nxt_thread_spinlock_t lock; -} nxt_upstream_round_robin_t; + uint8_t protocol; +}; -static void nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, - void *data); -static void nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, - void *data); -static void nxt_upstream_round_robin_get_peer(nxt_task_t *task, - nxt_upstream_peer_t *up); +struct nxt_upstream_round_robin_s { + uint32_t items; + nxt_upstream_round_robin_server_t server[0]; +}; -void -nxt_upstream_round_robin_peer(nxt_task_t *task, nxt_upstream_peer_t *up) -{ - nxt_job_sockaddr_parse_t *jbs; +static nxt_upstream_t *nxt_upstream_round_robin_joint_create( + nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream); +static void nxt_upstream_round_robin_server_get(nxt_task_t *task, + nxt_upstream_server_t *us); - if (up->upstream != NULL) { - nxt_upstream_round_robin_get_peer(task, up); - } - jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t)); - if (nxt_slow_path(jbs == NULL)) { - up->ready_handler(task, up); - return; - } +static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = { + .joint_create = nxt_upstream_round_robin_joint_create, + .get = nxt_upstream_round_robin_server_get, +}; - jbs->resolve.job.task = task; - jbs->resolve.job.data = up; - jbs->resolve.port = up->port; - jbs->resolve.log_level = NXT_LOG_ERR; - jbs->resolve.ready_handler = nxt_upstream_round_robin_create; - jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error; - jbs->addr = up->addr; - nxt_job_sockaddr_parse(jbs); -} +static nxt_conf_map_t nxt_upstream_round_robin_server_conf[] = { + { + nxt_string("weight"), + NXT_CONF_MAP_INT32, + offsetof(nxt_upstream_round_robin_server_t, weight), + }, +}; -static void -nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data) +nxt_int_t +nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream) { - nxt_uint_t i; - nxt_sockaddr_t *sa; - nxt_upstream_peer_t *up; - nxt_job_sockaddr_parse_t *jbs; - nxt_upstream_round_robin_t *urr; - nxt_upstream_round_robin_peer_t *peer; + size_t size; + uint32_t i, n, next; + nxt_mp_t *mp; + nxt_str_t name; + nxt_sockaddr_t *sa; + nxt_conf_value_t *servers_conf, *srvcf; + nxt_upstream_round_robin_t *urr; - jbs = obj; - up = jbs->resolve.job.data; + static nxt_str_t servers = nxt_string("servers"); - urr = nxt_mp_zget(up->mem_pool, sizeof(nxt_upstream_round_robin_t)); - if (nxt_slow_path(urr == NULL)) { - goto fail; - } + mp = tmcf->router_conf->mem_pool; - urr->npeers = jbs->resolve.count; + servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL); + n = nxt_conf_object_members_count(servers_conf); - peer = nxt_mp_zget(up->mem_pool, - urr->npeers * sizeof(nxt_upstream_round_robin_peer_t)); - if (nxt_slow_path(peer == NULL)) { - goto fail; + size = sizeof(nxt_upstream_round_robin_t) + + n * sizeof(nxt_upstream_round_robin_server_t); + + urr = nxt_mp_zalloc(mp, size); + if (nxt_slow_path(urr == NULL)) { + return NXT_ERROR; } - urr->peers = peer; + urr->items = n; + next = 0; - for (i = 0; i < urr->npeers; i++) { - peer[i].weight = 1; - peer[i].effective_weight = 1; + for (i = 0; i < n; i++) { + srvcf = nxt_conf_next_object_member(servers_conf, &name, &next); - sa = jbs->resolve.sockaddrs[i]; + sa = nxt_sockaddr_parse(mp, &name); + if (nxt_slow_path(sa == NULL)) { + return NXT_ERROR; + } - /* STUB */ sa->type = SOCK_STREAM; - nxt_sockaddr_text(sa); + urr->server[i].sockaddr = sa; + urr->server[i].weight = 1; + urr->server[i].protocol = NXT_HTTP_PROTO_H1; - nxt_debug(task, "upstream peer: %*s", - (size_t) sa->length, nxt_sockaddr_start(sa)); + nxt_conf_map_object(mp, srvcf, nxt_upstream_round_robin_server_conf, + nxt_nitems(nxt_upstream_round_robin_server_conf), + &urr->server[i]); - /* TODO: memcpy to shared memory pool. */ - peer[i].sockaddr = sa; + urr->server[i].effective_weight = urr->server[i].weight; } - up->upstream = urr; + upstream->proto = &nxt_upstream_round_robin_proto; + upstream->type.round_robin = urr; - /* STUB */ - up->sockaddr = peer[0].sockaddr; + return NXT_OK; +} - nxt_job_destroy(task, jbs); - up->ready_handler(task, up); - //nxt_upstream_round_robin_get_peer(up); - return; +static nxt_upstream_t * +nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf, + nxt_upstream_t *upstream) +{ + size_t size; + uint32_t i, n; + nxt_mp_t *mp; + nxt_upstream_t *u; + nxt_upstream_round_robin_t *urr, *urrcf; -fail: + mp = tmcf->router_conf->mem_pool; - nxt_job_destroy(task, jbs); + u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t)); + if (nxt_slow_path(u == NULL)) { + return NULL; + } - up->ready_handler(task, up); -} + *u = *upstream; + urrcf = upstream->type.round_robin; -static void -nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_upstream_peer_t *up; - nxt_job_sockaddr_parse_t *jbs; + size = sizeof(nxt_upstream_round_robin_t) + + urrcf->items * sizeof(nxt_upstream_round_robin_server_t); - jbs = obj; - up = jbs->resolve.job.data; + urr = nxt_mp_alloc(mp, size); + if (nxt_slow_path(urr == NULL)) { + return NULL; + } - up->ready_handler(task, up); -} + u->type.round_robin = urr; + n = urrcf->items; + urr->items = n; -static void -nxt_upstream_round_robin_get_peer(nxt_task_t *task, nxt_upstream_peer_t *up) -{ - int32_t effective_weights; - nxt_uint_t i; - nxt_msec_t now; - nxt_upstream_round_robin_t *urr; - nxt_upstream_round_robin_peer_t *peer, *best; + for (i = 0; i < n; i++) { + urr->server[i] = urrcf->server[i]; + } - urr = up->upstream; + return u; +} - now = task->thread->engine->timers.now; - nxt_thread_spin_lock(&urr->lock); +static void +nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us) +{ + int32_t total; + uint32_t i, n; + nxt_upstream_round_robin_t *round_robin; + nxt_upstream_round_robin_server_t *s, *best; best = NULL; - effective_weights = 0; - peer = urr->peers; + total = 0; - for (i = 0; i < urr->npeers; i++) { - - if (peer[i].down) { - continue; - } + round_robin = us->upstream->type.round_robin; -#if 0 - if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) { - good = peer[i].last_accessed + peer[i].fail_timeout; + s = round_robin->server; + n = round_robin->items; - if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) { - continue; - } - } -#endif + for (i = 0; i < n; i++) { - peer[i].current_weight += peer[i].effective_weight; - effective_weights += peer[i].effective_weight; + s[i].current_weight += s[i].effective_weight; + total += s[i].effective_weight; - if (peer[i].effective_weight < peer[i].weight) { - peer[i].effective_weight++; + if (s[i].effective_weight < s[i].weight) { + s[i].effective_weight++; } - if (best == NULL || peer[i].current_weight > best->current_weight) { - best = &peer[i]; + if (best == NULL || s[i].current_weight > best->current_weight) { + best = &s[i]; } } - if (best != NULL) { - best->current_weight -= effective_weights; - best->last_accessed = now; - - up->sockaddr = best->sockaddr; - - } else { - up->sockaddr = NULL; + if (best == NULL) { + us->state->error(task, us); + return; } - nxt_thread_spin_unlock(&urr->lock); + best->current_weight -= total; + us->sockaddr = best->sockaddr; + us->protocol = best->protocol; + us->server.round_robin = best; - up->ready_handler(task, up); + us->state->ready(task, us); } |