diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_conf_validation.c | 130 | ||||
-rw-r--r-- | src/nxt_h1proto.c | 3 | ||||
-rw-r--r-- | src/nxt_http.h | 18 | ||||
-rw-r--r-- | src/nxt_http_proxy.c | 124 | ||||
-rw-r--r-- | src/nxt_http_route.c | 6 | ||||
-rw-r--r-- | src/nxt_router.c | 11 | ||||
-rw-r--r-- | src/nxt_router.h | 5 | ||||
-rw-r--r-- | src/nxt_upstream.c | 135 | ||||
-rw-r--r-- | src/nxt_upstream.h | 70 | ||||
-rw-r--r-- | src/nxt_upstream_round_robin.c | 185 |
10 files changed, 653 insertions, 34 deletions
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index 8189cd44..86c1dbcb 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -110,6 +110,12 @@ static nxt_int_t nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt, nxt_conf_value_t *value); static nxt_int_t nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt, nxt_conf_value_t *value); +static nxt_int_t nxt_conf_vldt_upstream(nxt_conf_validation_t *vldt, + nxt_str_t *name, nxt_conf_value_t *value); +static nxt_int_t nxt_conf_vldt_server(nxt_conf_validation_t *vldt, + nxt_str_t *name, nxt_conf_value_t *value); +static nxt_int_t nxt_conf_vldt_server_weight(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); @@ -226,6 +232,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_root_members[] = { &nxt_conf_vldt_object_iterator, (void *) &nxt_conf_vldt_app }, + { nxt_string("upstreams"), + NXT_CONF_VLDT_OBJECT, + &nxt_conf_vldt_object_iterator, + (void *) &nxt_conf_vldt_upstream }, + { nxt_string("access_log"), NXT_CONF_VLDT_STRING, NULL, @@ -682,6 +693,26 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = { }; +static nxt_conf_vldt_object_t nxt_conf_vldt_upstream_members[] = { + { nxt_string("servers"), + NXT_CONF_VLDT_OBJECT, + &nxt_conf_vldt_object_iterator, + (void *) &nxt_conf_vldt_server }, + + NXT_CONF_VLDT_END +}; + + +static nxt_conf_vldt_object_t nxt_conf_vldt_upstream_server_members[] = { + { nxt_string("weight"), + NXT_CONF_VLDT_INTEGER, + &nxt_conf_vldt_server_weight, + NULL }, + + NXT_CONF_VLDT_END +}; + + nxt_int_t nxt_conf_validate(nxt_conf_validation_t *vldt) { @@ -1017,6 +1048,27 @@ nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, return NXT_OK; } + if (nxt_str_eq(&first, "upstreams", 9)) { + + if (second.length == 0) { + goto error; + } + + value = nxt_conf_get_object_member(vldt->conf, &first, NULL); + + if (nxt_slow_path(value == NULL)) { + goto error; + } + + value = nxt_conf_get_object_member(value, &second, NULL); + + if (nxt_slow_path(value == NULL)) { + goto error; + } + + return NXT_OK; + } + if (nxt_str_eq(&first, "routes", 6)) { value = nxt_conf_get_object_member(vldt->conf, &first, NULL); @@ -1901,3 +1953,81 @@ nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt, nxt_conf_value_t *value) return NXT_OK; } + + +static nxt_int_t +nxt_conf_vldt_upstream(nxt_conf_validation_t *vldt, nxt_str_t *name, + nxt_conf_value_t *value) +{ + nxt_int_t ret; + nxt_conf_value_t *conf; + + static nxt_str_t servers = nxt_string("servers"); + + ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT); + + if (ret != NXT_OK) { + return ret; + } + + ret = nxt_conf_vldt_object(vldt, value, nxt_conf_vldt_upstream_members); + + if (ret != NXT_OK) { + return ret; + } + + conf = nxt_conf_get_object_member(value, &servers, NULL); + if (conf == NULL) { + return nxt_conf_vldt_error(vldt, "The \"%V\" upstream must contain " + "\"servers\" object value.", name); + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_conf_vldt_server(nxt_conf_validation_t *vldt, nxt_str_t *name, + nxt_conf_value_t *value) +{ + nxt_int_t ret; + nxt_sockaddr_t *sa; + + ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT); + + if (ret != NXT_OK) { + return ret; + } + + sa = nxt_sockaddr_parse(vldt->pool, name); + + if (sa == NULL) { + return nxt_conf_vldt_error(vldt, "The \"%V\" is not valid " + "server address.", name); + } + + return nxt_conf_vldt_object(vldt, value, + nxt_conf_vldt_upstream_server_members); +} + + +static nxt_int_t +nxt_conf_vldt_server_weight(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data) +{ + int64_t int_value; + + int_value = nxt_conf_get_integer(value); + + if (int_value <= 0) { + return nxt_conf_vldt_error(vldt, "The \"weight\" number must be " + "greater than 0."); + } + + if (int_value > NXT_INT32_T_MAX) { + return nxt_conf_vldt_error(vldt, "The \"weight\" number must " + "not exceed %d.", NXT_INT32_T_MAX); + } + + return NXT_OK; +} diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index 8ce57893..1a6c537e 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -6,6 +6,7 @@ #include <nxt_router.h> #include <nxt_http.h> +#include <nxt_upstream.h> #include <nxt_h1proto.h> #include <nxt_websocket.h> #include <nxt_websocket_header.h> @@ -2004,7 +2005,7 @@ nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer) c->read_timer.task = task; c->write_timer.task = task; c->socket.data = peer; - c->remote = peer->sockaddr; + c->remote = peer->server->sockaddr; c->socket.write_ready = 1; c->write_state = &nxt_h1p_peer_connect_state; diff --git a/src/nxt_http.h b/src/nxt_http.h index 67c7645b..0e0694e5 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -106,10 +106,12 @@ typedef struct { } nxt_http_response_t; +typedef struct nxt_upstream_server_s nxt_upstream_server_t; + typedef struct { nxt_http_proto_t proto; nxt_http_request_t *request; - nxt_sockaddr_t *sockaddr; + nxt_upstream_server_t *server; nxt_list_t *fields; nxt_buf_t *body; nxt_off_t remainder; @@ -178,7 +180,6 @@ struct nxt_http_request_s { typedef struct nxt_http_route_s nxt_http_route_t; -typedef struct nxt_http_upstream_s nxt_http_upstream_t; struct nxt_http_action_s { @@ -187,9 +188,10 @@ struct nxt_http_action_s { nxt_http_action_t *action); union { nxt_http_route_t *route; - nxt_http_upstream_t *upstream; nxt_app_t *application; nxt_http_action_t *fallback; + nxt_upstream_t *upstream; + uint32_t upstream_number; } u; nxt_str_t name; @@ -275,6 +277,11 @@ nxt_http_action_t *nxt_http_pass_application(nxt_task_t *task, void nxt_http_routes_cleanup(nxt_task_t *task, nxt_http_routes_t *routes); void nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action); +nxt_int_t nxt_upstreams_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + nxt_conf_value_t *conf); +nxt_int_t nxt_upstreams_joint_create(nxt_router_temp_conf_t *tmcf, + nxt_upstream_t ***upstream_joint); + nxt_http_action_t *nxt_http_static_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action); nxt_int_t nxt_http_static_mtypes_init(nxt_mp_t *mp, nxt_lvlhsh_t *hash); @@ -285,6 +292,11 @@ nxt_str_t *nxt_http_static_mtypes_hash_find(nxt_lvlhsh_t *hash, nxt_http_action_t *nxt_http_application_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action); +void nxt_upstream_find(nxt_upstreams_t *upstreams, nxt_str_t *name, + nxt_http_action_t *action); +nxt_http_action_t *nxt_upstream_proxy_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_upstream_t *upstream); + nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action); nxt_int_t nxt_http_proxy_date(void *ctx, nxt_http_field_t *field, diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c index 7f4eeff2..893e9303 100644 --- a/src/nxt_http_proxy.c +++ b/src/nxt_http_proxy.c @@ -6,23 +6,21 @@ #include <nxt_router.h> #include <nxt_http.h> +#include <nxt_upstream.h> -typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task, - nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); - - -struct nxt_http_upstream_s { - uint32_t current; - uint32_t n; - uint8_t protocol; - nxt_http_upstream_connect_t connect; - nxt_sockaddr_t *sockaddr[1]; +struct nxt_upstream_proxy_s { + nxt_sockaddr_t *sockaddr; + uint8_t protocol; }; -static void nxt_http_upstream_connect(nxt_task_t *task, - nxt_http_upstream_t *upstream, nxt_http_peer_t *peer); +static void nxt_http_proxy_server_get(nxt_task_t *task, + nxt_upstream_server_t *us); +static void nxt_http_proxy_upstream_ready(nxt_task_t *task, + nxt_upstream_server_t *us); +static void nxt_http_proxy_upstream_error(nxt_task_t *task, + nxt_upstream_server_t *us); static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action); static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data); @@ -43,12 +41,24 @@ static const nxt_http_request_state_t nxt_http_proxy_header_read_state; static const nxt_http_request_state_t nxt_http_proxy_read_state; +static const nxt_upstream_server_proto_t nxt_upstream_simple_proto = { + .get = nxt_http_proxy_server_get, +}; + + +static const nxt_upstream_peer_state_t nxt_upstream_proxy_state = { + .ready = nxt_http_proxy_upstream_ready, + .error = nxt_http_proxy_upstream_error, +}; + + nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) { - nxt_str_t name; - nxt_sockaddr_t *sa; - nxt_http_upstream_t *upstream; + nxt_str_t name; + nxt_sockaddr_t *sa; + nxt_upstream_t *up; + nxt_upstream_proxy_t *proxy; sa = NULL; name = action->name; @@ -66,18 +76,25 @@ nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action) } if (sa != NULL) { - upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t)); - if (nxt_slow_path(upstream == NULL)) { + up = nxt_mp_alloc(mp, sizeof(nxt_upstream_t)); + if (nxt_slow_path(up == NULL)) { + return NXT_ERROR; + } + + up->name.length = sa->length; + up->name.start = nxt_sockaddr_start(sa); + up->proto = &nxt_upstream_simple_proto; + + proxy = nxt_mp_alloc(mp, sizeof(nxt_upstream_proxy_t)); + if (nxt_slow_path(proxy == NULL)) { return NXT_ERROR; } - upstream->current = 0; - upstream->n = 1; - upstream->protocol = NXT_HTTP_PROTO_H1; - upstream->connect = nxt_http_upstream_connect; - upstream->sockaddr[0] = sa; + proxy->sockaddr = sa; + proxy->protocol = NXT_HTTP_PROTO_H1; + up->type.proxy = proxy; - action->u.upstream = upstream; + action->u.upstream = up; action->handler = nxt_http_proxy_handler; } @@ -89,7 +106,22 @@ static nxt_http_action_t * nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_http_action_t *action) { - nxt_http_peer_t *peer; + return nxt_upstream_proxy_handler(task, r, action->u.upstream); +} + + +nxt_http_action_t * +nxt_upstream_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, + nxt_upstream_t *upstream) +{ + nxt_http_peer_t *peer; + nxt_upstream_server_t *us; + + us = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_upstream_server_t)); + if (nxt_slow_path(us == NULL)) { + nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + return NULL; + } peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t)); if (nxt_slow_path(peer == NULL)) { @@ -102,18 +134,39 @@ nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r, nxt_mp_retain(r->mem_pool); - action->u.upstream->connect(task, action->u.upstream, peer); + us->state = &nxt_upstream_proxy_state; + us->peer.http = peer; + peer->server = us; + + us->upstream = upstream; + upstream->proto->get(task, us); return NULL; } static void -nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream, - nxt_http_peer_t *peer) +nxt_http_proxy_server_get(nxt_task_t *task, nxt_upstream_server_t *us) { - peer->protocol = upstream->protocol; - peer->sockaddr = upstream->sockaddr[0]; + nxt_upstream_proxy_t *proxy; + + proxy = us->upstream->type.proxy; + + us->sockaddr = proxy->sockaddr; + us->protocol = proxy->protocol; + + us->state->ready(task, us); +} + + +static void +nxt_http_proxy_upstream_ready(nxt_task_t *task, nxt_upstream_server_t *us) +{ + nxt_http_peer_t *peer; + + peer = us->peer.http; + + peer->protocol = us->protocol; peer->request->state = &nxt_http_proxy_header_send_state; @@ -121,6 +174,19 @@ nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream, } +static void +nxt_http_proxy_upstream_error(nxt_task_t *task, nxt_upstream_server_t *us) +{ + nxt_http_request_t *r; + + r = us->peer.http->request; + + nxt_mp_release(r->mem_pool); + + nxt_http_request_error(task, r, NXT_HTTP_BAD_GATEWAY); +} + + static const nxt_http_request_state_t nxt_http_proxy_header_send_state nxt_aligned(64) = { diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c index df52894a..d7f20bcb 100644 --- a/src/nxt_http_route.c +++ b/src/nxt_http_route.c @@ -1114,6 +1114,12 @@ nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_router_listener_application(tmcf, &name, action); nxt_router_app_use(task, action->u.application, 1); + } else if (nxt_str_start(&name, "upstreams/", 10)) { + name.length -= 10; + name.start += 10; + + nxt_upstream_find(tmcf->router_conf->upstreams, &name, action); + } else if (nxt_str_start(&name, "routes", 6)) { if (name.length == 6) { diff --git a/src/nxt_router.c b/src/nxt_router.c index 46a6b921..d77ffa2b 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -1634,6 +1634,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, tmcf->router_conf->routes = routes; } + ret = nxt_upstreams_create(task, tmcf, conf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + http = nxt_conf_get_path(conf, &http_path); #if 0 if (http == NULL) { @@ -2526,6 +2531,7 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_work_handler_t handler) { + nxt_int_t ret; nxt_joint_job_t *job; nxt_queue_link_t *qlk; nxt_socket_conf_t *skcf; @@ -2559,6 +2565,11 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf, job->work.data = joint; + ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + joint->count = 1; skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); diff --git a/src/nxt_router.h b/src/nxt_router.h index 9ce9f3be..85ef9a6c 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -18,6 +18,8 @@ typedef struct nxt_http_request_s nxt_http_request_t; typedef struct nxt_http_action_s nxt_http_action_t; typedef struct nxt_http_routes_s nxt_http_routes_t; +typedef struct nxt_upstream_s nxt_upstream_t; +typedef struct nxt_upstreams_s nxt_upstreams_t; typedef struct nxt_router_access_log_s nxt_router_access_log_t; @@ -43,6 +45,7 @@ typedef struct { nxt_router_t *router; nxt_http_routes_t *routes; + nxt_upstreams_t *upstreams; nxt_lvlhsh_t mtypes_hash; @@ -196,6 +199,8 @@ typedef struct { nxt_event_engine_t *engine; nxt_socket_conf_t *socket_conf; + nxt_upstream_t **upstreams; + /* Modules configuraitons. */ } nxt_socket_conf_joint_t; diff --git a/src/nxt_upstream.c b/src/nxt_upstream.c index 3899a3ce..66b6619a 100644 --- a/src/nxt_upstream.c +++ b/src/nxt_upstream.c @@ -4,4 +4,137 @@ * Copyright (C) NGINX, Inc. */ -#include <nxt_main.h> +#include <nxt_router.h> +#include <nxt_http.h> +#include <nxt_upstream.h> + + +static nxt_http_action_t *nxt_upstream_handler(nxt_task_t *task, + nxt_http_request_t *r, nxt_http_action_t *action); + + +nxt_int_t +nxt_upstreams_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + nxt_conf_value_t *conf) +{ + size_t size; + uint32_t i, n, next; + nxt_mp_t *mp; + nxt_int_t ret; + nxt_str_t name, *string; + nxt_upstreams_t *upstreams; + nxt_conf_value_t *upstreams_conf, *upcf; + + static nxt_str_t upstreams_name = nxt_string("upstreams"); + + upstreams_conf = nxt_conf_get_object_member(conf, &upstreams_name, NULL); + + if (upstreams_conf == NULL) { + return NXT_OK; + } + + n = nxt_conf_object_members_count(upstreams_conf); + + if (n == 0) { + return NXT_OK; + } + + mp = tmcf->router_conf->mem_pool; + size = sizeof(nxt_upstreams_t) + n * sizeof(nxt_upstream_t); + + upstreams = nxt_mp_zalloc(mp, size); + if (nxt_slow_path(upstreams == NULL)) { + return NXT_ERROR; + } + + upstreams->items = n; + next = 0; + + for (i = 0; i < n; i++) { + upcf = nxt_conf_next_object_member(upstreams_conf, &name, &next); + + string = nxt_str_dup(mp, &upstreams->upstream[i].name, &name); + if (nxt_slow_path(string == NULL)) { + return NXT_ERROR; + } + + ret = nxt_upstream_round_robin_create(task, tmcf, upcf, + &upstreams->upstream[i]); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + } + + tmcf->router_conf->upstreams = upstreams; + + return NXT_OK; +} + + +void +nxt_upstream_find(nxt_upstreams_t *upstreams, nxt_str_t *name, + nxt_http_action_t *action) +{ + uint32_t i, n; + nxt_upstream_t *upstream; + + upstream = &upstreams->upstream[0]; + n = upstreams->items; + + for (i = 0; i < n; i++) { + if (nxt_strstr_eq(&upstream[i].name, name)) { + action->u.upstream_number = i; + action->handler = nxt_upstream_handler; + + return; + } + } +} + + +nxt_int_t +nxt_upstreams_joint_create(nxt_router_temp_conf_t *tmcf, + nxt_upstream_t ***upstream_joint) +{ + uint32_t i, n; + nxt_upstream_t *u, **up; + nxt_upstreams_t *upstreams; + nxt_router_conf_t *router_conf; + + router_conf = tmcf->router_conf; + upstreams = router_conf->upstreams; + + if (upstreams == NULL) { + *upstream_joint = NULL; + return NXT_OK; + } + + n = upstreams->items; + + up = nxt_mp_zalloc(router_conf->mem_pool, n * sizeof(nxt_upstream_t *)); + if (nxt_slow_path(up == NULL)) { + return NXT_ERROR; + } + + u = &upstreams->upstream[0]; + + for (i = 0; i < n; i++) { + up[i] = u[i].proto->joint_create(tmcf, &u[i]); + if (nxt_slow_path(up[i] == NULL)) { + return NXT_ERROR; + } + } + + *upstream_joint = up; + + return NXT_OK; +} + + +static nxt_http_action_t * +nxt_upstream_handler(nxt_task_t *task, nxt_http_request_t *r, + nxt_http_action_t *action) +{ + return nxt_upstream_proxy_handler(task, r, + r->conf->upstreams[action->u.upstream_number]); +} diff --git a/src/nxt_upstream.h b/src/nxt_upstream.h index a25e0940..afc53774 100644 --- a/src/nxt_upstream.h +++ b/src/nxt_upstream.h @@ -8,4 +8,74 @@ #define _NXT_UPSTREAM_H_INCLUDED_ +typedef struct nxt_upstream_proxy_s nxt_upstream_proxy_t; +typedef struct nxt_upstream_round_robin_s nxt_upstream_round_robin_t; +typedef struct nxt_upstream_round_robin_server_s + nxt_upstream_round_robin_server_t; + + +typedef void (*nxt_upstream_peer_ready_t)(nxt_task_t *task, + nxt_upstream_server_t *us); +typedef void (*nxt_upstream_peer_error_t)(nxt_task_t *task, + nxt_upstream_server_t *us); + + +typedef struct { + nxt_upstream_peer_ready_t ready; + nxt_upstream_peer_error_t error; +} nxt_upstream_peer_state_t; + + +typedef nxt_upstream_t *(*nxt_upstream_joint_create_t)( + nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream); +typedef void (*nxt_upstream_server_get_t)(nxt_task_t *task, + nxt_upstream_server_t *us); + + +typedef struct { + nxt_upstream_joint_create_t joint_create; + nxt_upstream_server_get_t get; +} nxt_upstream_server_proto_t; + + +struct nxt_upstream_s { + const nxt_upstream_server_proto_t *proto; + + union { + nxt_upstream_proxy_t *proxy; + nxt_upstream_round_robin_t *round_robin; + } type; + + nxt_str_t name; +}; + + +struct nxt_upstreams_s { + uint32_t items; + nxt_upstream_t upstream[0]; +}; + + +struct nxt_upstream_server_s { + nxt_sockaddr_t *sockaddr; + const nxt_upstream_peer_state_t *state; + nxt_upstream_t *upstream; + + uint8_t protocol; + + union { + nxt_upstream_round_robin_server_t *round_robin; + } server; + + union { + nxt_http_peer_t *http; + } peer; +}; + + +nxt_int_t nxt_upstream_round_robin_create(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *upstream_conf, + nxt_upstream_t *upstream); + + #endif /* _NXT_UPSTREAM_H_INCLUDED_ */ diff --git a/src/nxt_upstream_round_robin.c b/src/nxt_upstream_round_robin.c index 4d53c716..fd76ecb5 100644 --- a/src/nxt_upstream_round_robin.c +++ b/src/nxt_upstream_round_robin.c @@ -4,3 +4,188 @@ * Copyright (C) NGINX, Inc. */ +#include <nxt_router.h> +#include <nxt_http.h> +#include <nxt_upstream.h> + + +struct nxt_upstream_round_robin_server_s { + nxt_sockaddr_t *sockaddr; + + int32_t current_weight; + int32_t effective_weight; + int32_t weight; + + uint8_t protocol; +}; + + +struct nxt_upstream_round_robin_s { + uint32_t items; + nxt_upstream_round_robin_server_t server[0]; +}; + + +static nxt_upstream_t *nxt_upstream_round_robin_joint_create( + nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream); +static void nxt_upstream_round_robin_server_get(nxt_task_t *task, + nxt_upstream_server_t *us); + + +static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = { + .joint_create = nxt_upstream_round_robin_joint_create, + .get = nxt_upstream_round_robin_server_get, +}; + + +static nxt_conf_map_t nxt_upstream_round_robin_server_conf[] = { + { + nxt_string("weight"), + NXT_CONF_MAP_INT32, + offsetof(nxt_upstream_round_robin_server_t, weight), + }, +}; + + +nxt_int_t +nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream) +{ + size_t size; + uint32_t i, n, next; + nxt_mp_t *mp; + nxt_str_t name; + nxt_sockaddr_t *sa; + nxt_conf_value_t *servers_conf, *srvcf; + nxt_upstream_round_robin_t *urr; + + static nxt_str_t servers = nxt_string("servers"); + + mp = tmcf->router_conf->mem_pool; + + servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL); + n = nxt_conf_object_members_count(servers_conf); + + size = sizeof(nxt_upstream_round_robin_t) + + n * sizeof(nxt_upstream_round_robin_server_t); + + urr = nxt_mp_zalloc(mp, size); + if (nxt_slow_path(urr == NULL)) { + return NXT_ERROR; + } + + urr->items = n; + next = 0; + + for (i = 0; i < n; i++) { + srvcf = nxt_conf_next_object_member(servers_conf, &name, &next); + + sa = nxt_sockaddr_parse(mp, &name); + if (nxt_slow_path(sa == NULL)) { + return NXT_ERROR; + } + + sa->type = SOCK_STREAM; + + urr->server[i].sockaddr = sa; + urr->server[i].weight = 1; + urr->server[i].protocol = NXT_HTTP_PROTO_H1; + + nxt_conf_map_object(mp, srvcf, nxt_upstream_round_robin_server_conf, + nxt_nitems(nxt_upstream_round_robin_server_conf), + &urr->server[i]); + + urr->server[i].effective_weight = urr->server[i].weight; + } + + upstream->proto = &nxt_upstream_round_robin_proto; + upstream->type.round_robin = urr; + + return NXT_OK; +} + + +static nxt_upstream_t * +nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf, + nxt_upstream_t *upstream) +{ + size_t size; + uint32_t i, n; + nxt_mp_t *mp; + nxt_upstream_t *u; + nxt_upstream_round_robin_t *urr, *urrcf; + + mp = tmcf->router_conf->mem_pool; + + u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t)); + if (nxt_slow_path(u == NULL)) { + return NULL; + } + + *u = *upstream; + + urrcf = upstream->type.round_robin; + + size = sizeof(nxt_upstream_round_robin_t) + + urrcf->items * sizeof(nxt_upstream_round_robin_server_t); + + urr = nxt_mp_alloc(mp, size); + if (nxt_slow_path(urr == NULL)) { + return NULL; + } + + u->type.round_robin = urr; + + n = urrcf->items; + urr->items = n; + + for (i = 0; i < n; i++) { + urr->server[i] = urrcf->server[i]; + } + + return u; +} + + +static void +nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us) +{ + int32_t total; + uint32_t i, n; + nxt_upstream_round_robin_t *round_robin; + nxt_upstream_round_robin_server_t *s, *best; + + best = NULL; + total = 0; + + round_robin = us->upstream->type.round_robin; + + s = round_robin->server; + n = round_robin->items; + + for (i = 0; i < n; i++) { + + s[i].current_weight += s[i].effective_weight; + total += s[i].effective_weight; + + if (s[i].effective_weight < s[i].weight) { + s[i].effective_weight++; + } + + if (best == NULL || s[i].current_weight > best->current_weight) { + best = &s[i]; + } + } + + if (best == NULL) { + us->state->error(task, us); + return; + } + + best->current_weight -= total; + us->sockaddr = best->sockaddr; + us->protocol = best->protocol; + us->server.round_robin = best; + + us->state->ready(task, us); +} |