summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_upstream_round_robin.c
blob: f80357625db255409a2d4fbbd11e688561fc4074 (plain) (tree)







































































































































































































                                                                              

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


typedef struct {
    int32_t                          weight;
    int32_t                          effective_weight;
    int32_t                          current_weight;
    uint32_t                         down;              /* 1 bit */
    nxt_msec_t                       last_accessed;
    nxt_sockaddr_t                   *sockaddr;
} nxt_upstream_round_robin_peer_t;


typedef struct {
    nxt_uint_t                       npeers;
    nxt_upstream_round_robin_peer_t  *peers;
    nxt_thread_spinlock_t            lock;
} nxt_upstream_round_robin_t;


static void nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj,
    void *data);
static void nxt_upstream_round_robin_peer_error(nxt_thread_t *thr, void *obj,
    void *data);
static void nxt_upstream_round_robin_get_peer(nxt_upstream_peer_t *up);


void
nxt_upstream_round_robin_peer(nxt_upstream_peer_t *up)
{
    nxt_job_sockaddr_parse_t  *jbs;

    if (up->upstream != NULL) {
        nxt_upstream_round_robin_get_peer(up);
    }

    jbs = nxt_job_create(up->mem_pool, sizeof(nxt_job_sockaddr_parse_t));
    if (nxt_slow_path(jbs == NULL)) {
        up->ready_handler(up);
        return;
    }

    jbs->resolve.job.data = up;
    jbs->resolve.port = up->port;
    jbs->resolve.log_level = NXT_LOG_ERR;
    jbs->resolve.ready_handler = nxt_upstream_round_robin_create;
    jbs->resolve.error_handler = nxt_upstream_round_robin_peer_error;
    jbs->addr = up->addr;

    nxt_job_sockaddr_parse(jbs);
}


static void
nxt_upstream_round_robin_create(nxt_thread_t *thr, void *obj, void *data)
{
    nxt_uint_t                       i;
    nxt_sockaddr_t                   *sa;
    nxt_upstream_peer_t              *up;
    nxt_job_sockaddr_parse_t         *jbs;
    nxt_upstream_round_robin_t       *urr;
    nxt_upstream_round_robin_peer_t  *peer;

    jbs = obj;
    up = jbs->resolve.job.data;

    urr = nxt_mem_zalloc(up->mem_pool, sizeof(nxt_upstream_round_robin_t));
    if (nxt_slow_path(urr == NULL)) {
        goto fail;
    }

    urr->npeers = jbs->resolve.count;

    peer = nxt_mem_zalloc(up->mem_pool,
                   urr->npeers * sizeof(nxt_upstream_round_robin_peer_t));
    if (nxt_slow_path(peer == NULL)) {
        goto fail;
    }

    urr->peers = peer;

    for (i = 0; i < urr->npeers; i++) {
        peer[i].weight = 1;
        peer[i].effective_weight = 1;

        sa = jbs->resolve.sockaddrs[i];

        /* STUB */
        sa->type = SOCK_STREAM;

        /* TODO: test ret */
        (void) nxt_sockaddr_text(up->mem_pool, sa, 1);

        nxt_log_debug(thr->log, "upstream peer: %*s", sa->text_len, sa->text);

        /* TODO: memcpy to shared memory pool. */
        peer[i].sockaddr = sa;
    }

    up->upstream = urr;

    /* STUB */
    up->sockaddr = peer[0].sockaddr;

    nxt_job_destroy(jbs);
    up->ready_handler(up);

    //nxt_upstream_round_robin_get_peer(up);
    return;

fail:

    nxt_job_destroy(jbs);

    up->ready_handler(up);
}


static void
nxt_upstream_round_robin_peer_error(nxt_thread_t *thr, void *obj, void *data)
{
    nxt_upstream_peer_t       *up;
    nxt_job_sockaddr_parse_t  *jbs;

    jbs = obj;
    up = jbs->resolve.job.data;

    up->ready_handler(up);
}


static void
nxt_upstream_round_robin_get_peer(nxt_upstream_peer_t *up)
{
    int32_t                          effective_weights;
    nxt_uint_t                       i;
    nxt_msec_t                       now;
    nxt_event_engine_t               *engine;
    nxt_upstream_round_robin_t       *urr;
    nxt_upstream_round_robin_peer_t  *peer, *best;

    urr = up->upstream;

    engine = nxt_thread_event_engine();
    now = engine->timers.now;

    nxt_thread_spin_lock(&urr->lock);

    best = NULL;
    effective_weights = 0;
    peer = urr->peers;

    for (i = 0; i < urr->npeers; i++) {

        if (peer[i].down) {
            continue;
        }

#if 0
        if (peer[i].max_fails != 0 && peer[i].fails >= peer->max_fails) {
            good = peer[i].last_accessed + peer[i].fail_timeout;

            if (nxt_msec_diff(now, peer[i].last_accessed) <= 0) {
                continue;
            }
        }
#endif

        peer[i].current_weight += peer[i].effective_weight;
        effective_weights += peer[i].effective_weight;

        if (peer[i].effective_weight < peer[i].weight) {
            peer[i].effective_weight++;
        }

        if (best == NULL || peer[i].current_weight > best->current_weight) {
            best = &peer[i];
        }
    }

    if (best != NULL) {
        best->current_weight -= effective_weights;
        best->last_accessed = now;

        up->sockaddr = best->sockaddr;

    } else {
        up->sockaddr = NULL;
    }

    nxt_thread_spin_unlock(&urr->lock);

    up->ready_handler(up);
}