summary refs log blame commit diff homepage
path: root/src/nxt_upstream_round_robin.c
blob: 31e2f48a058b45c0db4d5c95723508a551bdc894 (plain) (tree)
1
2
3
4
5
6
7





                            
                 

































                                                                            



                                                                               
                                            
                                     
                                               


                                     
                                                             


                                                      
                                                    





                                                                             



















                                                                        





















                                                                        

                                                    


                                                                
 

                                             
















































































                                                                                
                                     










                                   

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <math.h>
#include <nxt_router.h>
#include <nxt_http.h>
#include <nxt_upstream.h>


/*
 * One server of a round-robin upstream: the peer address together with
 * the weights used by nxt_upstream_round_robin_server_get() to choose
 * among the servers.
 */
struct nxt_upstream_round_robin_server_s {
    /* Address parsed from the server's name in the "servers" object. */
    nxt_sockaddr_t                     *sockaddr;

    /*
     * Running selection score.  Every call of
     * nxt_upstream_round_robin_server_get() adds effective_weight to it;
     * the server that gets chosen has the sum of all effective weights
     * subtracted from it.
     */
    int32_t                            current_weight;
    /*
     * Weight actually fed into the selection.  It starts equal to
     * "weight" and is raised by one per selection round while it is
     * below "weight".
     */
    int32_t                            effective_weight;
    /* Configured weight after scaling in nxt_upstream_round_robin_create(). */
    int32_t                            weight;

    /*
     * Protocol identifier handed to the request along with sockaddr;
     * set to NXT_HTTP_PROTO_H1 when the server entry is created.
     */
    uint8_t                            protocol;
};


/*
 * Round-robin state of an upstream: the number of servers followed by the
 * servers themselves.  The structure is allocated in one piece of
 * sizeof(nxt_upstream_round_robin_t)
 * + items * sizeof(nxt_upstream_round_robin_server_t) bytes.
 */
struct nxt_upstream_round_robin_s {
    /* Number of elements stored in server[]. */
    uint32_t                           items;
    /*
     * Standard C99 flexible array member (instead of the GNU zero-length
     * array extension); its length is fixed at allocation time.
     */
    nxt_upstream_round_robin_server_t  server[];
};


/*
 * Forward declarations of the file-local handlers that are referenced by
 * the nxt_upstream_round_robin_proto table before their definitions.
 */
static nxt_upstream_t *nxt_upstream_round_robin_joint_create(
    nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
static void nxt_upstream_round_robin_server_get(nxt_task_t *task,
    nxt_upstream_server_t *us);


/*
 * Operations table of the round-robin balancer.  It is assigned to
 * upstream->proto by nxt_upstream_round_robin_create().
 */
static const nxt_upstream_server_proto_t  nxt_upstream_round_robin_proto = {
    /* Duplicates an upstream together with its round-robin state. */
    .joint_create = nxt_upstream_round_robin_joint_create,
    /* Picks the next server for a request. */
    .get          = nxt_upstream_round_robin_server_get,
};


/*
 * Builds the round-robin state of an upstream from its configuration.
 *
 * Each member of the "servers" object of upstream_conf yields one server
 * entry: the member name is parsed as a socket address, and the optional
 * "weight" member (1 if absent) is multiplied by a common factor of
 * (NXT_INT32_T_MAX / 2) / <sum of all weights> and stored as an integer.
 *
 * All memory is taken from tmcf->router_conf->mem_pool.  The "task"
 * argument is not used here.
 *
 * Returns NXT_OK on success.  Returns NXT_ERROR if the state cannot be
 * allocated or a server name cannot be parsed; in that case "upstream"
 * is left unmodified.
 */
nxt_int_t
nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
    nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream)
{
    double                             sum, scale, scaled;
    size_t                             bytes;
    uint32_t                           idx, count, cursor, int_weight;
    nxt_mp_t                           *pool;
    nxt_str_t                          addr;
    nxt_sockaddr_t                     *sockaddr;
    nxt_conf_value_t                   *list, *entry, *value;
    nxt_upstream_round_robin_t         *rr;
    nxt_upstream_round_robin_server_t  *srv, *end;

    static nxt_str_t  servers = nxt_string("servers");
    static nxt_str_t  weight = nxt_string("weight");

    pool = tmcf->router_conf->mem_pool;

    list = nxt_conf_get_object_member(upstream_conf, &servers, NULL);
    count = nxt_conf_object_members_count(list);

    /* First pass: add up the configured weights. */

    sum = 0.0;
    cursor = 0;

    for (idx = 0; idx != count; idx++) {
        entry = nxt_conf_next_object_member(list, &addr, &cursor);
        value = nxt_conf_get_object_member(entry, &weight, NULL);

        if (value != NULL) {
            sum += nxt_conf_get_number(value);

        } else {
            sum += 1;
        }
    }

    /*
     * The common factor maps the weight sum onto half of the int32_t
     * range, which prevents overflow of int32_t
     * in nxt_upstream_round_robin_server_get().
     */

    if (sum == 0) {
        scale = 0;

    } else {
        scale = (NXT_INT32_T_MAX / 2) / sum;

        /* A sum too small to divide by: leave the weights unscaled. */
        if (isinf(scale)) {
            scale = 1;
        }
    }

    bytes = sizeof(nxt_upstream_round_robin_t)
            + count * sizeof(nxt_upstream_round_robin_server_t);

    rr = nxt_mp_zalloc(pool, bytes);
    if (nxt_slow_path(rr == NULL)) {
        return NXT_ERROR;
    }

    rr->items = count;

    /* Second pass: fill in the server entries in configuration order. */

    cursor = 0;
    end = rr->server + count;

    for (srv = rr->server; srv != end; srv++) {
        entry = nxt_conf_next_object_member(list, &addr, &cursor);

        sockaddr = nxt_sockaddr_parse(pool, &addr);
        if (nxt_slow_path(sockaddr == NULL)) {
            return NXT_ERROR;
        }

        sockaddr->type = SOCK_STREAM;

        srv->sockaddr = sockaddr;
        srv->protocol = NXT_HTTP_PROTO_H1;

        value = nxt_conf_get_object_member(entry, &weight, NULL);

        if (value != NULL) {
            scaled = scale * nxt_conf_get_number(value);

        } else {
            scaled = scale;
        }

        /*
         * A scaled weight that is neither zero nor above 1 is pinned
         * to 1, so that a small non-zero weight is not rounded to zero.
         */

        if (scaled > 1 || scaled == 0) {
            int_weight = round(scaled);

        } else {
            int_weight = 1;
        }

        srv->weight = int_weight;
        srv->effective_weight = int_weight;
    }

    /* Publish the result only after every server has been set up. */

    upstream->proto = &nxt_upstream_round_robin_proto;
    upstream->type.round_robin = rr;

    return NXT_OK;
}


/*
 * Creates a copy of "upstream" that owns a separate copy of the round-robin
 * state, including the current values of every server's weights.
 *
 * Both objects are allocated from tmcf->router_conf->mem_pool.
 * NOTE(review): the copy presumably exists so that the weights mutated by
 * nxt_upstream_round_robin_server_get() are not shared with the source
 * upstream; confirm against the caller of .joint_create.
 *
 * Returns the new upstream, or NULL if an allocation fails.
 */
static nxt_upstream_t *
nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf,
    nxt_upstream_t *upstream)
{
    size_t                             bytes;
    uint32_t                           count;
    nxt_mp_t                           *pool;
    nxt_upstream_t                     *copy;
    nxt_upstream_round_robin_t         *src, *dst;
    nxt_upstream_round_robin_server_t  *from, *to, *end;

    pool = tmcf->router_conf->mem_pool;

    copy = nxt_mp_alloc(pool, sizeof(nxt_upstream_t));
    if (nxt_slow_path(copy == NULL)) {
        return NULL;
    }

    /* Shallow copy first; the round-robin pointer is replaced below. */
    *copy = *upstream;

    src = upstream->type.round_robin;
    count = src->items;

    bytes = sizeof(nxt_upstream_round_robin_t)
            + count * sizeof(nxt_upstream_round_robin_server_t);

    dst = nxt_mp_alloc(pool, bytes);
    if (nxt_slow_path(dst == NULL)) {
        return NULL;
    }

    copy->type.round_robin = dst;
    dst->items = count;

    /* Duplicate the server entries one by one. */

    from = src->server;
    end = from + count;
    to = dst->server;

    while (from != end) {
        *to = *from;

        to++;
        from++;
    }

    return copy;
}


/*
 * Chooses a server for "us" by weighted round-robin.
 *
 * Every server's current_weight is increased by its effective_weight, and
 * the server with the greatest current_weight wins (the earliest one on a
 * tie).  The winner's current_weight is then decreased by the sum of all
 * effective weights, which spreads the picks in proportion to the weights.
 *
 * On success the winner's sockaddr and protocol are stored in "us" and
 * us->state->ready() is called.  If there are no servers or all effective
 * weights add up to zero, us->state->error() is called instead.
 */
static void
nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
{
    int32_t                            sum, effective;
    nxt_upstream_round_robin_t         *rr;
    nxt_upstream_round_robin_server_t  *srv, *end, *chosen;

    rr = us->upstream->type.round_robin;

    chosen = NULL;
    sum = 0;
    end = rr->server + rr->items;

    for (srv = rr->server; srv != end; srv++) {
        effective = srv->effective_weight;

        srv->current_weight += effective;
        sum += effective;

        /* Let a lowered effective weight climb back, one step per call. */
        if (effective < srv->weight) {
            srv->effective_weight = effective + 1;
        }

        /* Strict comparison: the first of equally weighted servers wins. */
        if (chosen == NULL || srv->current_weight > chosen->current_weight) {
            chosen = srv;
        }
    }

    if (chosen == NULL || sum == 0) {
        us->state->error(task, us);
        return;
    }

    chosen->current_weight -= sum;

    us->sockaddr = chosen->sockaddr;
    us->protocol = chosen->protocol;
    us->server.round_robin = chosen;

    us->state->ready(task, us);
}