From 7935ea45436ea832344cec945d39a61ae91f2a69 Mon Sep 17 00:00:00 2001 From: Igor Sysoev Date: Fri, 6 Mar 2020 18:28:54 +0300 Subject: Round robin upstream added. --- src/nxt_http_proxy.c | 124 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 95 insertions(+), 29 deletions(-) (limited to 'src/nxt_http_proxy.c') diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c index 7f4eeff2..893e9303 100644 --- a/src/nxt_http_proxy.c +++ b/src/nxt_http_proxy.c @@ -6,23 +6,21 @@ #include #include +#include -typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task, - nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); - - -struct nxt_http_upstream_s { - uint32_t current; - uint32_t n; - uint8_t protocol; - nxt_http_upstream_connect_t connect; - nxt_sockaddr_t *sockaddr[1]; +struct nxt_upstream_proxy_s { + nxt_sockaddr_t *sockaddr; + uint8_t protocol; }; -static void nxt_http_upstream_connect(nxt_task_t *task, - nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); +static void nxt_http_proxy_server_get(nxt_task_t *task, + nxt_upstream_server_t *us); +static void nxt_http_proxy_upstream_ready(nxt_task_t *task, + nxt_upstream_server_t *us); +static void nxt_http_proxy_upstream_error(nxt_task_t *task, + nxt_upstream_server_t *us); static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action); static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data); @@ -43,12 +41,24 @@ static const nxt_http_request_state_t nxt_http_proxy_header_read_state; static const nxt_http_request_state_t nxt_http_proxy_read_state; +static const nxt_upstream_server_proto_t nxt_upstream_simple_proto = { + .get = nxt_http_proxy_server_get, +}; + + +static const nxt_upstream_peer_state_t nxt_upstream_proxy_state = { + .ready = nxt_http_proxy_upstream_ready, + .error = nxt_http_proxy_upstream_error, +}; + + nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) { - nxt_str_t name; - nxt_sockaddr_t *sa; - nxt_http_upstream_t *upstream; + nxt_str_t name; + nxt_sockaddr_t *sa; + nxt_upstream_t *up; + nxt_upstream_proxy_t *proxy; sa = NULL; name = action->name; @@ -66,18 +76,25 @@ nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) } if (sa != NULL) { - upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t)); - if (nxt_slow_path(upstream == NULL)) { + up = nxt_mp_alloc(mp, sizeof(nxt_upstream_t)); + if (nxt_slow_path(up == NULL)) { + return NXT_ERROR; + } + + up->name.length = sa->length; + up->name.start = nxt_sockaddr_start(sa); + up->proto = &nxt_upstream_simple_proto; + + proxy = nxt_mp_alloc(mp, sizeof(nxt_upstream_proxy_t)); + if (nxt_slow_path(proxy == NULL)) { return NXT_ERROR; } - upstream->current = 0; - upstream->n = 1; - upstream->protocol = NXT_HTTP_PROTO_H1; - upstream->connect = nxt_http_upstream_connect; - upstream->sockaddr[0] = sa; + proxy->sockaddr = sa; + proxy->protocol = NXT_HTTP_PROTO_H1; + up->type.proxy = proxy; - action->u.upstream = upstream; + action->u.upstream = up; action->handler = nxt_http_proxy_handler; } @@ -89,7 +106,22 @@ static nxt_http_action_t * nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action) { - nxt_http_peer_t *peer; + return nxt_upstream_proxy_handler(task, r, action->u.upstream); +} + + +nxt_http_action_t * +nxt_upstream_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, + nxt_upstream_t *upstream) +{ + nxt_http_peer_t *peer; + nxt_upstream_server_t *us; + + us = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_upstream_server_t)); + if (nxt_slow_path(us == NULL)) { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + return NULL; + } peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t)); if (nxt_slow_path(peer == NULL)) { @@ -102,18 +134,39 @@ nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_mp_retain(r->mem_pool); - action->u.upstream->connect(task, action->u.upstream, peer); + us->state = &nxt_upstream_proxy_state; + us->peer.http = peer; + peer->server = us; + + us->upstream = upstream; + upstream->proto->get(task, us); return NULL; } static void -nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream, - nxt_http_peer_t *peer) +nxt_http_proxy_server_get(nxt_task_t *task, nxt_upstream_server_t *us) { - peer->protocol = upstream->protocol; - peer->sockaddr = upstream->sockaddr[0]; + nxt_upstream_proxy_t *proxy; + + proxy = us->upstream->type.proxy; + + us->sockaddr = proxy->sockaddr; + us->protocol = proxy->protocol; + + us->state->ready(task, us); +} + + +static void +nxt_http_proxy_upstream_ready(nxt_task_t *task, nxt_upstream_server_t *us) +{ + nxt_http_peer_t *peer; + + peer = us->peer.http; + + peer->protocol = us->protocol; peer->request->state = &nxt_http_proxy_header_send_state; @@ -121,6 +174,19 @@ nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream, } +static void +nxt_http_proxy_upstream_error(nxt_task_t *task, nxt_upstream_server_t *us) +{ + nxt_http_request_t *r; + + r = us->peer.http->request; + + nxt_mp_release(r->mem_pool); + + nxt_http_request_error(task, r, NXT_HTTP_BAD_GATEWAY); +} + + static const nxt_http_request_state_t nxt_http_proxy_header_send_state nxt_aligned(64) = { -- cgit