summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_upstream_round_robin.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_upstream_round_robin.c257
1 files changed, 124 insertions, 133 deletions
diff --git a/src/nxt_upstream_round_robin.c b/src/nxt_upstream_round_robin.c
index 09a3bce3..fd76ecb5 100644
--- a/src/nxt_upstream_round_robin.c
+++ b/src/nxt_upstream_round_robin.c
@@ -4,197 +4,188 @@
* Copyright (C) NGINX, Inc.
*/
-#include <nxt_main.h>
+#include <nxt_router.h>
+#include <nxt_http.h>
+#include <nxt_upstream.h>
-typedef struct {
- int32_t weight;
- int32_t effective_weight;
- int32_t current_weight;
- uint32_t down; /* 1 bit */
- nxt_msec_t last_accessed;
- nxt_sockaddr_t *sockaddr;
-} nxt_upstream_round_robin_peer_t;
+struct nxt_upstream_round_robin_server_s {
+ nxt_sockaddr_t *sockaddr;
+ int32_t current_weight;
+ int32_t effective_weight;
+ int32_t weight;
-typedef struct {
- nxt_uint_t npeers;
- nxt_upstream_round_robin_peer_t *peers;
- nxt_thread_spinlock_t lock;
-} nxt_upstream_round_robin_t;
+ uint8_t protocol;
+};
-static void nxt_upstream_round_robin_create(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_upstream_round_robin_get_peer(nxt_task_t *task,
- nxt_upstream_peer_t *up);
+struct nxt_upstream_round_robin_s {
+ uint32_t items;
+ nxt_upstream_round_robin_server_t server[0];
+};
-void
-nxt_upstream_round_robin_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
-{
- nxt_job_sockaddr_parse_t *jbs;
+static nxt_upstream_t *nxt_upstream_round_robin_joint_create(
+ nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
+static void nxt_upstream_round_robin_server_get(nxt_task_t *task,
+ nxt_upstream_server_t *us);
- if (up->upstream != NULL) {
- nxt_upstream_round_robin_get_peer(task, up);
- }
- jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t));
- if (nxt_slow_path(jbs == NULL)) {
- up->ready_handler(task, up);
- return;
- }
+static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = {
+ .joint_create = nxt_upstream_round_robin_joint_create,
+ .get = nxt_upstream_round_robin_server_get,
+};
- jbs->resolve.job.task = task;
- jbs->resolve.job.data = up;
- jbs->resolve.port = up->port;
- jbs->resolve.log_level = NXT_LOG_ERR;
- jbs->resolve.ready_handler = nxt_upstream_round_robin_create;
- jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error;
- jbs->addr = up->addr;
- nxt_job_sockaddr_parse(jbs);
-}
+static nxt_conf_map_t nxt_upstream_round_robin_server_conf[] = {
+ {
+ nxt_string("weight"),
+ NXT_CONF_MAP_INT32,
+ offsetof(nxt_upstream_round_robin_server_t, weight),
+ },
+};
-static void
-nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data)
+nxt_int_t
+nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+ nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream)
{
- nxt_uint_t i;
- nxt_sockaddr_t *sa;
- nxt_upstream_peer_t *up;
- nxt_job_sockaddr_parse_t *jbs;
- nxt_upstream_round_robin_t *urr;
- nxt_upstream_round_robin_peer_t *peer;
+ size_t size;
+ uint32_t i, n, next;
+ nxt_mp_t *mp;
+ nxt_str_t name;
+ nxt_sockaddr_t *sa;
+ nxt_conf_value_t *servers_conf, *srvcf;
+ nxt_upstream_round_robin_t *urr;
- jbs = obj;
- up = jbs->resolve.job.data;
+ static nxt_str_t servers = nxt_string("servers");
- urr = nxt_mp_zget(up->mem_pool, sizeof(nxt_upstream_round_robin_t));
- if (nxt_slow_path(urr == NULL)) {
- goto fail;
- }
+ mp = tmcf->router_conf->mem_pool;
- urr->npeers = jbs->resolve.count;
+ servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL);
+ n = nxt_conf_object_members_count(servers_conf);
- peer = nxt_mp_zget(up->mem_pool,
- urr->npeers * sizeof(nxt_upstream_round_robin_peer_t));
- if (nxt_slow_path(peer == NULL)) {
- goto fail;
+ size = sizeof(nxt_upstream_round_robin_t)
+ + n * sizeof(nxt_upstream_round_robin_server_t);
+
+ urr = nxt_mp_zalloc(mp, size);
+ if (nxt_slow_path(urr == NULL)) {
+ return NXT_ERROR;
}
- urr->peers = peer;
+ urr->items = n;
+ next = 0;
- for (i = 0; i < urr->npeers; i++) {
- peer[i].weight = 1;
- peer[i].effective_weight = 1;
+ for (i = 0; i < n; i++) {
+ srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
- sa = jbs->resolve.sockaddrs[i];
+ sa = nxt_sockaddr_parse(mp, &name);
+ if (nxt_slow_path(sa == NULL)) {
+ return NXT_ERROR;
+ }
- /* STUB */
sa->type = SOCK_STREAM;
- nxt_sockaddr_text(sa);
+ urr->server[i].sockaddr = sa;
+ urr->server[i].weight = 1;
+ urr->server[i].protocol = NXT_HTTP_PROTO_H1;
- nxt_debug(task, "upstream peer: %*s",
- (size_t) sa->length, nxt_sockaddr_start(sa));
+ nxt_conf_map_object(mp, srvcf, nxt_upstream_round_robin_server_conf,
+ nxt_nitems(nxt_upstream_round_robin_server_conf),
+ &urr->server[i]);
- /* TODO: memcpy to shared memory pool. */
- peer[i].sockaddr = sa;
+ urr->server[i].effective_weight = urr->server[i].weight;
}
- up->upstream = urr;
+ upstream->proto = &nxt_upstream_round_robin_proto;
+ upstream->type.round_robin = urr;
- /* STUB */
- up->sockaddr = peer[0].sockaddr;
+ return NXT_OK;
+}
- nxt_job_destroy(task, jbs);
- up->ready_handler(task, up);
- //nxt_upstream_round_robin_get_peer(up);
- return;
+static nxt_upstream_t *
+nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf,
+ nxt_upstream_t *upstream)
+{
+ size_t size;
+ uint32_t i, n;
+ nxt_mp_t *mp;
+ nxt_upstream_t *u;
+ nxt_upstream_round_robin_t *urr, *urrcf;
-fail:
+ mp = tmcf->router_conf->mem_pool;
- nxt_job_destroy(task, jbs);
+ u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
+ if (nxt_slow_path(u == NULL)) {
+ return NULL;
+ }
- up->ready_handler(task, up);
-}
+ *u = *upstream;
+ urrcf = upstream->type.round_robin;
-static void
-nxt_upstream_round_robin_peer_error(nxt_task_t *task, void *obj, void *data)
-{
- nxt_upstream_peer_t *up;
- nxt_job_sockaddr_parse_t *jbs;
+ size = sizeof(nxt_upstream_round_robin_t)
+ + urrcf->items * sizeof(nxt_upstream_round_robin_server_t);
- jbs = obj;
- up = jbs->resolve.job.data;
+ urr = nxt_mp_alloc(mp, size);
+ if (nxt_slow_path(urr == NULL)) {
+ return NULL;
+ }
- up->ready_handler(task, up);
-}
+ u->type.round_robin = urr;
+ n = urrcf->items;
+ urr->items = n;
-static void
-nxt_upstream_round_robin_get_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
-{
- int32_t effective_weights;
- nxt_uint_t i;
- nxt_msec_t now;
- nxt_upstream_round_robin_t *urr;
- nxt_upstream_round_robin_peer_t *peer, *best;
+ for (i = 0; i < n; i++) {
+ urr->server[i] = urrcf->server[i];
+ }
- urr = up->upstream;
+ return u;
+}
- now = task->thread->engine->timers.now;
- nxt_thread_spin_lock(&urr->lock);
+static void
+nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
+{
+ int32_t total;
+ uint32_t i, n;
+ nxt_upstream_round_robin_t *round_robin;
+ nxt_upstream_round_robin_server_t *s, *best;
best = NULL;
- effective_weights = 0;
- peer = urr->peers;
+ total = 0;
- for (i = 0; i < urr->npeers; i++) {
-
- if (peer[i].down) {
- continue;
- }
+ round_robin = us->upstream->type.round_robin;
-#if 0
- if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) {
- good = peer[i].last_accessed + peer[i].fail_timeout;
+ s = round_robin->server;
+ n = round_robin->items;
- if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) {
- continue;
- }
- }
-#endif
+ for (i = 0; i < n; i++) {
- peer[i].current_weight += peer[i].effective_weight;
- effective_weights += peer[i].effective_weight;
+ s[i].current_weight += s[i].effective_weight;
+ total += s[i].effective_weight;
- if (peer[i].effective_weight < peer[i].weight) {
- peer[i].effective_weight++;
+ if (s[i].effective_weight < s[i].weight) {
+ s[i].effective_weight++;
}
- if (best == NULL || peer[i].current_weight > best->current_weight) {
- best = &peer[i];
+ if (best == NULL || s[i].current_weight > best->current_weight) {
+ best = &s[i];
}
}
- if (best != NULL) {
- best->current_weight -= effective_weights;
- best->last_accessed = now;
-
- up->sockaddr = best->sockaddr;
-
- } else {
- up->sockaddr = NULL;
+ if (best == NULL) {
+ us->state->error(task, us);
+ return;
}
- nxt_thread_spin_unlock(&urr->lock);
+ best->current_weight -= total;
+ us->sockaddr = best->sockaddr;
+ us->protocol = best->protocol;
+ us->server.round_robin = best;
- up->ready_handler(task, up);
+ us->state->ready(task, us);
}