summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2020-03-06 18:28:54 +0300
committerIgor Sysoev <igor@sysoev.ru>2020-03-06 18:28:54 +0300
commit7935ea45436ea832344cec945d39a61ae91f2a69 (patch)
tree6ae820cdaf2204331300bb967090f23b61ed4d0a
parent794248090a74f31cbfcf24ea8c835df2d4d21073 (diff)
downloadunit-7935ea45436ea832344cec945d39a61ae91f2a69.tar.gz
unit-7935ea45436ea832344cec945d39a61ae91f2a69.tar.bz2
Round robin upstream added.
Diffstat (limited to '')
-rw-r--r--auto/sources1
-rw-r--r--src/nxt_conf_validation.c130
-rw-r--r--src/nxt_h1proto.c3
-rw-r--r--src/nxt_http.h18
-rw-r--r--src/nxt_http_proxy.c124
-rw-r--r--src/nxt_http_route.c6
-rw-r--r--src/nxt_router.c11
-rw-r--r--src/nxt_router.h5
-rw-r--r--src/nxt_upstream.c135
-rw-r--r--src/nxt_upstream.h70
-rw-r--r--src/nxt_upstream_round_robin.c185
11 files changed, 654 insertions, 34 deletions
diff --git a/auto/sources b/auto/sources
index 98e4a1f4..2283e543 100644
--- a/auto/sources
+++ b/auto/sources
@@ -69,6 +69,7 @@ NXT_LIB_SRCS=" \
src/nxt_job_resolve.c \
src/nxt_sockaddr.c \
src/nxt_listen_socket.c \
+ src/nxt_upstream.c \
src/nxt_upstream_round_robin.c \
src/nxt_http_parse.c \
src/nxt_app_log.c \
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c
index 8189cd44..86c1dbcb 100644
--- a/src/nxt_conf_validation.c
+++ b/src/nxt_conf_validation.c
@@ -110,6 +110,12 @@ static nxt_int_t nxt_conf_vldt_java_classpath(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value);
static nxt_int_t nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value);
+static nxt_int_t nxt_conf_vldt_upstream(nxt_conf_validation_t *vldt,
+ nxt_str_t *name, nxt_conf_value_t *value);
+static nxt_int_t nxt_conf_vldt_server(nxt_conf_validation_t *vldt,
+ nxt_str_t *name, nxt_conf_value_t *value);
+static nxt_int_t nxt_conf_vldt_server_weight(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_isolation(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
@@ -226,6 +232,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_root_members[] = {
&nxt_conf_vldt_object_iterator,
(void *) &nxt_conf_vldt_app },
+ { nxt_string("upstreams"),
+ NXT_CONF_VLDT_OBJECT,
+ &nxt_conf_vldt_object_iterator,
+ (void *) &nxt_conf_vldt_upstream },
+
{ nxt_string("access_log"),
NXT_CONF_VLDT_STRING,
NULL,
@@ -682,6 +693,26 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_java_members[] = {
};
+static nxt_conf_vldt_object_t nxt_conf_vldt_upstream_members[] = {
+ { nxt_string("servers"),
+ NXT_CONF_VLDT_OBJECT,
+ &nxt_conf_vldt_object_iterator,
+ (void *) &nxt_conf_vldt_server },
+
+ NXT_CONF_VLDT_END
+};
+
+
+static nxt_conf_vldt_object_t nxt_conf_vldt_upstream_server_members[] = {
+ { nxt_string("weight"),
+ NXT_CONF_VLDT_INTEGER,
+ &nxt_conf_vldt_server_weight,
+ NULL },
+
+ NXT_CONF_VLDT_END
+};
+
+
nxt_int_t
nxt_conf_validate(nxt_conf_validation_t *vldt)
{
@@ -1017,6 +1048,27 @@ nxt_conf_vldt_pass(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
return NXT_OK;
}
+ if (nxt_str_eq(&first, "upstreams", 9)) {
+
+ if (second.length == 0) {
+ goto error;
+ }
+
+ value = nxt_conf_get_object_member(vldt->conf, &first, NULL);
+
+ if (nxt_slow_path(value == NULL)) {
+ goto error;
+ }
+
+ value = nxt_conf_get_object_member(value, &second, NULL);
+
+ if (nxt_slow_path(value == NULL)) {
+ goto error;
+ }
+
+ return NXT_OK;
+ }
+
if (nxt_str_eq(&first, "routes", 6)) {
value = nxt_conf_get_object_member(vldt->conf, &first, NULL);
@@ -1901,3 +1953,81 @@ nxt_conf_vldt_java_option(nxt_conf_validation_t *vldt, nxt_conf_value_t *value)
return NXT_OK;
}
+
+
+static nxt_int_t
+nxt_conf_vldt_upstream(nxt_conf_validation_t *vldt, nxt_str_t *name,
+ nxt_conf_value_t *value)
+{
+ nxt_int_t ret;
+ nxt_conf_value_t *conf;
+
+ static nxt_str_t servers = nxt_string("servers");
+
+ ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT);
+
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ ret = nxt_conf_vldt_object(vldt, value, nxt_conf_vldt_upstream_members);
+
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ conf = nxt_conf_get_object_member(value, &servers, NULL);
+ if (conf == NULL) {
+ return nxt_conf_vldt_error(vldt, "The \"%V\" upstream must contain "
+ "\"servers\" object value.", name);
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_conf_vldt_server(nxt_conf_validation_t *vldt, nxt_str_t *name,
+ nxt_conf_value_t *value)
+{
+ nxt_int_t ret;
+ nxt_sockaddr_t *sa;
+
+ ret = nxt_conf_vldt_type(vldt, name, value, NXT_CONF_VLDT_OBJECT);
+
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ sa = nxt_sockaddr_parse(vldt->pool, name);
+
+ if (sa == NULL) {
+ return nxt_conf_vldt_error(vldt, "The \"%V\" is not valid "
+ "server address.", name);
+ }
+
+ return nxt_conf_vldt_object(vldt, value,
+ nxt_conf_vldt_upstream_server_members);
+}
+
+
+static nxt_int_t
+nxt_conf_vldt_server_weight(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data)
+{
+ int64_t int_value;
+
+ int_value = nxt_conf_get_integer(value);
+
+ if (int_value <= 0) {
+ return nxt_conf_vldt_error(vldt, "The \"weight\" number must be "
+ "greater than 0.");
+ }
+
+ if (int_value > NXT_INT32_T_MAX) {
+ return nxt_conf_vldt_error(vldt, "The \"weight\" number must "
+ "not exceed %d.", NXT_INT32_T_MAX);
+ }
+
+ return NXT_OK;
+}
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c
index 8ce57893..1a6c537e 100644
--- a/src/nxt_h1proto.c
+++ b/src/nxt_h1proto.c
@@ -6,6 +6,7 @@
#include <nxt_router.h>
#include <nxt_http.h>
+#include <nxt_upstream.h>
#include <nxt_h1proto.h>
#include <nxt_websocket.h>
#include <nxt_websocket_header.h>
@@ -2004,7 +2005,7 @@ nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
c->read_timer.task = task;
c->write_timer.task = task;
c->socket.data = peer;
- c->remote = peer->sockaddr;
+ c->remote = peer->server->sockaddr;
c->socket.write_ready = 1;
c->write_state = &nxt_h1p_peer_connect_state;
diff --git a/src/nxt_http.h b/src/nxt_http.h
index 67c7645b..0e0694e5 100644
--- a/src/nxt_http.h
+++ b/src/nxt_http.h
@@ -106,10 +106,12 @@ typedef struct {
} nxt_http_response_t;
+typedef struct nxt_upstream_server_s nxt_upstream_server_t;
+
typedef struct {
nxt_http_proto_t proto;
nxt_http_request_t *request;
- nxt_sockaddr_t *sockaddr;
+ nxt_upstream_server_t *server;
nxt_list_t *fields;
nxt_buf_t *body;
nxt_off_t remainder;
@@ -178,7 +180,6 @@ struct nxt_http_request_s {
typedef struct nxt_http_route_s nxt_http_route_t;
-typedef struct nxt_http_upstream_s nxt_http_upstream_t;
struct nxt_http_action_s {
@@ -187,9 +188,10 @@ struct nxt_http_action_s {
nxt_http_action_t *action);
union {
nxt_http_route_t *route;
- nxt_http_upstream_t *upstream;
nxt_app_t *application;
nxt_http_action_t *fallback;
+ nxt_upstream_t *upstream;
+ uint32_t upstream_number;
} u;
nxt_str_t name;
@@ -275,6 +277,11 @@ nxt_http_action_t *nxt_http_pass_application(nxt_task_t *task,
void nxt_http_routes_cleanup(nxt_task_t *task, nxt_http_routes_t *routes);
void nxt_http_action_cleanup(nxt_task_t *task, nxt_http_action_t *action);
+nxt_int_t nxt_upstreams_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+ nxt_conf_value_t *conf);
+nxt_int_t nxt_upstreams_joint_create(nxt_router_temp_conf_t *tmcf,
+ nxt_upstream_t ***upstream_joint);
+
nxt_http_action_t *nxt_http_static_handler(nxt_task_t *task,
nxt_http_request_t *r, nxt_http_action_t *action);
nxt_int_t nxt_http_static_mtypes_init(nxt_mp_t *mp, nxt_lvlhsh_t *hash);
@@ -285,6 +292,11 @@ nxt_str_t *nxt_http_static_mtypes_hash_find(nxt_lvlhsh_t *hash,
nxt_http_action_t *nxt_http_application_handler(nxt_task_t *task,
nxt_http_request_t *r, nxt_http_action_t *action);
+void nxt_upstream_find(nxt_upstreams_t *upstreams, nxt_str_t *name,
+ nxt_http_action_t *action);
+nxt_http_action_t *nxt_upstream_proxy_handler(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_upstream_t *upstream);
+
nxt_int_t nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action);
nxt_int_t nxt_http_proxy_date(void *ctx, nxt_http_field_t *field,
diff --git a/src/nxt_http_proxy.c b/src/nxt_http_proxy.c
index 7f4eeff2..893e9303 100644
--- a/src/nxt_http_proxy.c
+++ b/src/nxt_http_proxy.c
@@ -6,23 +6,21 @@
#include <nxt_router.h>
#include <nxt_http.h>
+#include <nxt_upstream.h>
-typedef void (*nxt_http_upstream_connect_t)(nxt_task_t *task,
- nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
-
-
-struct nxt_http_upstream_s {
- uint32_t current;
- uint32_t n;
- uint8_t protocol;
- nxt_http_upstream_connect_t connect;
- nxt_sockaddr_t *sockaddr[1];
+struct nxt_upstream_proxy_s {
+ nxt_sockaddr_t *sockaddr;
+ uint8_t protocol;
};
-static void nxt_http_upstream_connect(nxt_task_t *task,
- nxt_http_upstream_t *upstream, nxt_http_peer_t *peer);
+static void nxt_http_proxy_server_get(nxt_task_t *task,
+ nxt_upstream_server_t *us);
+static void nxt_http_proxy_upstream_ready(nxt_task_t *task,
+ nxt_upstream_server_t *us);
+static void nxt_http_proxy_upstream_error(nxt_task_t *task,
+ nxt_upstream_server_t *us);
static nxt_http_action_t *nxt_http_proxy_handler(nxt_task_t *task,
nxt_http_request_t *r, nxt_http_action_t *action);
static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
@@ -43,12 +41,24 @@ static const nxt_http_request_state_t nxt_http_proxy_header_read_state;
static const nxt_http_request_state_t nxt_http_proxy_read_state;
+static const nxt_upstream_server_proto_t nxt_upstream_simple_proto = {
+ .get = nxt_http_proxy_server_get,
+};
+
+
+static const nxt_upstream_peer_state_t nxt_upstream_proxy_state = {
+ .ready = nxt_http_proxy_upstream_ready,
+ .error = nxt_http_proxy_upstream_error,
+};
+
+
nxt_int_t
nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
{
- nxt_str_t name;
- nxt_sockaddr_t *sa;
- nxt_http_upstream_t *upstream;
+ nxt_str_t name;
+ nxt_sockaddr_t *sa;
+ nxt_upstream_t *up;
+ nxt_upstream_proxy_t *proxy;
sa = NULL;
name = action->name;
@@ -66,18 +76,25 @@ nxt_http_proxy_create(nxt_mp_t *mp, nxt_http_action_t *action)
}
if (sa != NULL) {
- upstream = nxt_mp_alloc(mp, sizeof(nxt_http_upstream_t));
- if (nxt_slow_path(upstream == NULL)) {
+ up = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
+ if (nxt_slow_path(up == NULL)) {
+ return NXT_ERROR;
+ }
+
+ up->name.length = sa->length;
+ up->name.start = nxt_sockaddr_start(sa);
+ up->proto = &nxt_upstream_simple_proto;
+
+ proxy = nxt_mp_alloc(mp, sizeof(nxt_upstream_proxy_t));
+ if (nxt_slow_path(proxy == NULL)) {
return NXT_ERROR;
}
- upstream->current = 0;
- upstream->n = 1;
- upstream->protocol = NXT_HTTP_PROTO_H1;
- upstream->connect = nxt_http_upstream_connect;
- upstream->sockaddr[0] = sa;
+ proxy->sockaddr = sa;
+ proxy->protocol = NXT_HTTP_PROTO_H1;
+ up->type.proxy = proxy;
- action->u.upstream = upstream;
+ action->u.upstream = up;
action->handler = nxt_http_proxy_handler;
}
@@ -89,7 +106,22 @@ static nxt_http_action_t *
nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
nxt_http_action_t *action)
{
- nxt_http_peer_t *peer;
+ return nxt_upstream_proxy_handler(task, r, action->u.upstream);
+}
+
+
+nxt_http_action_t *
+nxt_upstream_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_upstream_t *upstream)
+{
+ nxt_http_peer_t *peer;
+ nxt_upstream_server_t *us;
+
+ us = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_upstream_server_t));
+ if (nxt_slow_path(us == NULL)) {
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+ return NULL;
+ }
peer = nxt_mp_zalloc(r->mem_pool, sizeof(nxt_http_peer_t));
if (nxt_slow_path(peer == NULL)) {
@@ -102,18 +134,39 @@ nxt_http_proxy_handler(nxt_task_t *task, nxt_http_request_t *r,
nxt_mp_retain(r->mem_pool);
- action->u.upstream->connect(task, action->u.upstream, peer);
+ us->state = &nxt_upstream_proxy_state;
+ us->peer.http = peer;
+ peer->server = us;
+
+ us->upstream = upstream;
+ upstream->proto->get(task, us);
return NULL;
}
static void
-nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream,
- nxt_http_peer_t *peer)
+nxt_http_proxy_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
{
- peer->protocol = upstream->protocol;
- peer->sockaddr = upstream->sockaddr[0];
+ nxt_upstream_proxy_t *proxy;
+
+ proxy = us->upstream->type.proxy;
+
+ us->sockaddr = proxy->sockaddr;
+ us->protocol = proxy->protocol;
+
+ us->state->ready(task, us);
+}
+
+
+static void
+nxt_http_proxy_upstream_ready(nxt_task_t *task, nxt_upstream_server_t *us)
+{
+ nxt_http_peer_t *peer;
+
+ peer = us->peer.http;
+
+ peer->protocol = us->protocol;
peer->request->state = &nxt_http_proxy_header_send_state;
@@ -121,6 +174,19 @@ nxt_http_upstream_connect(nxt_task_t *task, nxt_http_upstream_t *upstream,
}
+static void
+nxt_http_proxy_upstream_error(nxt_task_t *task, nxt_upstream_server_t *us)
+{
+ nxt_http_request_t *r;
+
+ r = us->peer.http->request;
+
+ nxt_mp_release(r->mem_pool);
+
+ nxt_http_request_error(task, r, NXT_HTTP_BAD_GATEWAY);
+}
+
+
static const nxt_http_request_state_t nxt_http_proxy_header_send_state
nxt_aligned(64) =
{
diff --git a/src/nxt_http_route.c b/src/nxt_http_route.c
index df52894a..d7f20bcb 100644
--- a/src/nxt_http_route.c
+++ b/src/nxt_http_route.c
@@ -1114,6 +1114,12 @@ nxt_http_action_resolve(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_router_listener_application(tmcf, &name, action);
nxt_router_app_use(task, action->u.application, 1);
+ } else if (nxt_str_start(&name, "upstreams/", 10)) {
+ name.length -= 10;
+ name.start += 10;
+
+ nxt_upstream_find(tmcf->router_conf->upstreams, &name, action);
+
} else if (nxt_str_start(&name, "routes", 6)) {
if (name.length == 6) {
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 46a6b921..d77ffa2b 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -1634,6 +1634,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
tmcf->router_conf->routes = routes;
}
+ ret = nxt_upstreams_create(task, tmcf, conf);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
http = nxt_conf_get_path(conf, &http_path);
#if 0
if (http == NULL) {
@@ -2526,6 +2531,7 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
nxt_work_handler_t handler)
{
+ nxt_int_t ret;
nxt_joint_job_t *job;
nxt_queue_link_t *qlk;
nxt_socket_conf_t *skcf;
@@ -2559,6 +2565,11 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
job->work.data = joint;
+ ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
joint->count = 1;
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
diff --git a/src/nxt_router.h b/src/nxt_router.h
index 9ce9f3be..85ef9a6c 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -18,6 +18,8 @@ typedef struct nxt_http_request_s nxt_http_request_t;
typedef struct nxt_http_action_s nxt_http_action_t;
typedef struct nxt_http_routes_s nxt_http_routes_t;
+typedef struct nxt_upstream_s nxt_upstream_t;
+typedef struct nxt_upstreams_s nxt_upstreams_t;
typedef struct nxt_router_access_log_s nxt_router_access_log_t;
@@ -43,6 +45,7 @@ typedef struct {
nxt_router_t *router;
nxt_http_routes_t *routes;
+ nxt_upstreams_t *upstreams;
nxt_lvlhsh_t mtypes_hash;
@@ -196,6 +199,8 @@ typedef struct {
nxt_event_engine_t *engine;
nxt_socket_conf_t *socket_conf;
+ nxt_upstream_t **upstreams;
+
/* Modules configuraitons. */
} nxt_socket_conf_joint_t;
diff --git a/src/nxt_upstream.c b/src/nxt_upstream.c
index 3899a3ce..66b6619a 100644
--- a/src/nxt_upstream.c
+++ b/src/nxt_upstream.c
@@ -4,4 +4,137 @@
* Copyright (C) NGINX, Inc.
*/
-#include <nxt_main.h>
+#include <nxt_router.h>
+#include <nxt_http.h>
+#include <nxt_upstream.h>
+
+
+static nxt_http_action_t *nxt_upstream_handler(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_http_action_t *action);
+
+
+nxt_int_t
+nxt_upstreams_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+ nxt_conf_value_t *conf)
+{
+ size_t size;
+ uint32_t i, n, next;
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_str_t name, *string;
+ nxt_upstreams_t *upstreams;
+ nxt_conf_value_t *upstreams_conf, *upcf;
+
+ static nxt_str_t upstreams_name = nxt_string("upstreams");
+
+ upstreams_conf = nxt_conf_get_object_member(conf, &upstreams_name, NULL);
+
+ if (upstreams_conf == NULL) {
+ return NXT_OK;
+ }
+
+ n = nxt_conf_object_members_count(upstreams_conf);
+
+ if (n == 0) {
+ return NXT_OK;
+ }
+
+ mp = tmcf->router_conf->mem_pool;
+ size = sizeof(nxt_upstreams_t) + n * sizeof(nxt_upstream_t);
+
+ upstreams = nxt_mp_zalloc(mp, size);
+ if (nxt_slow_path(upstreams == NULL)) {
+ return NXT_ERROR;
+ }
+
+ upstreams->items = n;
+ next = 0;
+
+ for (i = 0; i < n; i++) {
+ upcf = nxt_conf_next_object_member(upstreams_conf, &name, &next);
+
+ string = nxt_str_dup(mp, &upstreams->upstream[i].name, &name);
+ if (nxt_slow_path(string == NULL)) {
+ return NXT_ERROR;
+ }
+
+ ret = nxt_upstream_round_robin_create(task, tmcf, upcf,
+ &upstreams->upstream[i]);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+ }
+
+ tmcf->router_conf->upstreams = upstreams;
+
+ return NXT_OK;
+}
+
+
+void
+nxt_upstream_find(nxt_upstreams_t *upstreams, nxt_str_t *name,
+ nxt_http_action_t *action)
+{
+ uint32_t i, n;
+ nxt_upstream_t *upstream;
+
+ upstream = &upstreams->upstream[0];
+ n = upstreams->items;
+
+ for (i = 0; i < n; i++) {
+ if (nxt_strstr_eq(&upstream[i].name, name)) {
+ action->u.upstream_number = i;
+ action->handler = nxt_upstream_handler;
+
+ return;
+ }
+ }
+}
+
+
+nxt_int_t
+nxt_upstreams_joint_create(nxt_router_temp_conf_t *tmcf,
+ nxt_upstream_t ***upstream_joint)
+{
+ uint32_t i, n;
+ nxt_upstream_t *u, **up;
+ nxt_upstreams_t *upstreams;
+ nxt_router_conf_t *router_conf;
+
+ router_conf = tmcf->router_conf;
+ upstreams = router_conf->upstreams;
+
+ if (upstreams == NULL) {
+ *upstream_joint = NULL;
+ return NXT_OK;
+ }
+
+ n = upstreams->items;
+
+ up = nxt_mp_zalloc(router_conf->mem_pool, n * sizeof(nxt_upstream_t *));
+ if (nxt_slow_path(up == NULL)) {
+ return NXT_ERROR;
+ }
+
+ u = &upstreams->upstream[0];
+
+ for (i = 0; i < n; i++) {
+ up[i] = u[i].proto->joint_create(tmcf, &u[i]);
+ if (nxt_slow_path(up[i] == NULL)) {
+ return NXT_ERROR;
+ }
+ }
+
+ *upstream_joint = up;
+
+ return NXT_OK;
+}
+
+
+static nxt_http_action_t *
+nxt_upstream_handler(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_http_action_t *action)
+{
+ return nxt_upstream_proxy_handler(task, r,
+ r->conf->upstreams[action->u.upstream_number]);
+}
diff --git a/src/nxt_upstream.h b/src/nxt_upstream.h
index a25e0940..afc53774 100644
--- a/src/nxt_upstream.h
+++ b/src/nxt_upstream.h
@@ -8,4 +8,74 @@
#define _NXT_UPSTREAM_H_INCLUDED_
+typedef struct nxt_upstream_proxy_s nxt_upstream_proxy_t;
+typedef struct nxt_upstream_round_robin_s nxt_upstream_round_robin_t;
+typedef struct nxt_upstream_round_robin_server_s
+ nxt_upstream_round_robin_server_t;
+
+
+typedef void (*nxt_upstream_peer_ready_t)(nxt_task_t *task,
+ nxt_upstream_server_t *us);
+typedef void (*nxt_upstream_peer_error_t)(nxt_task_t *task,
+ nxt_upstream_server_t *us);
+
+
+typedef struct {
+ nxt_upstream_peer_ready_t ready;
+ nxt_upstream_peer_error_t error;
+} nxt_upstream_peer_state_t;
+
+
+typedef nxt_upstream_t *(*nxt_upstream_joint_create_t)(
+ nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
+typedef void (*nxt_upstream_server_get_t)(nxt_task_t *task,
+ nxt_upstream_server_t *us);
+
+
+typedef struct {
+ nxt_upstream_joint_create_t joint_create;
+ nxt_upstream_server_get_t get;
+} nxt_upstream_server_proto_t;
+
+
+struct nxt_upstream_s {
+ const nxt_upstream_server_proto_t *proto;
+
+ union {
+ nxt_upstream_proxy_t *proxy;
+ nxt_upstream_round_robin_t *round_robin;
+ } type;
+
+ nxt_str_t name;
+};
+
+
+struct nxt_upstreams_s {
+ uint32_t items;
+ nxt_upstream_t upstream[0];
+};
+
+
+struct nxt_upstream_server_s {
+ nxt_sockaddr_t *sockaddr;
+ const nxt_upstream_peer_state_t *state;
+ nxt_upstream_t *upstream;
+
+ uint8_t protocol;
+
+ union {
+ nxt_upstream_round_robin_server_t *round_robin;
+ } server;
+
+ union {
+ nxt_http_peer_t *http;
+ } peer;
+};
+
+
+nxt_int_t nxt_upstream_round_robin_create(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *upstream_conf,
+ nxt_upstream_t *upstream);
+
+
#endif /* _NXT_UPSTREAM_H_INCLUDED_ */
diff --git a/src/nxt_upstream_round_robin.c b/src/nxt_upstream_round_robin.c
index 4d53c716..fd76ecb5 100644
--- a/src/nxt_upstream_round_robin.c
+++ b/src/nxt_upstream_round_robin.c
@@ -4,3 +4,188 @@
* Copyright (C) NGINX, Inc.
*/
+#include <nxt_router.h>
+#include <nxt_http.h>
+#include <nxt_upstream.h>
+
+
+struct nxt_upstream_round_robin_server_s {
+ nxt_sockaddr_t *sockaddr;
+
+ int32_t current_weight;
+ int32_t effective_weight;
+ int32_t weight;
+
+ uint8_t protocol;
+};
+
+
+struct nxt_upstream_round_robin_s {
+ uint32_t items;
+ nxt_upstream_round_robin_server_t server[0];
+};
+
+
+static nxt_upstream_t *nxt_upstream_round_robin_joint_create(
+ nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
+static void nxt_upstream_round_robin_server_get(nxt_task_t *task,
+ nxt_upstream_server_t *us);
+
+
+static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = {
+ .joint_create = nxt_upstream_round_robin_joint_create,
+ .get = nxt_upstream_round_robin_server_get,
+};
+
+
+static nxt_conf_map_t nxt_upstream_round_robin_server_conf[] = {
+ {
+ nxt_string("weight"),
+ NXT_CONF_MAP_INT32,
+ offsetof(nxt_upstream_round_robin_server_t, weight),
+ },
+};
+
+
+nxt_int_t
+nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+ nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream)
+{
+ size_t size;
+ uint32_t i, n, next;
+ nxt_mp_t *mp;
+ nxt_str_t name;
+ nxt_sockaddr_t *sa;
+ nxt_conf_value_t *servers_conf, *srvcf;
+ nxt_upstream_round_robin_t *urr;
+
+ static nxt_str_t servers = nxt_string("servers");
+
+ mp = tmcf->router_conf->mem_pool;
+
+ servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL);
+ n = nxt_conf_object_members_count(servers_conf);
+
+ size = sizeof(nxt_upstream_round_robin_t)
+ + n * sizeof(nxt_upstream_round_robin_server_t);
+
+ urr = nxt_mp_zalloc(mp, size);
+ if (nxt_slow_path(urr == NULL)) {
+ return NXT_ERROR;
+ }
+
+ urr->items = n;
+ next = 0;
+
+ for (i = 0; i < n; i++) {
+ srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
+
+ sa = nxt_sockaddr_parse(mp, &name);
+ if (nxt_slow_path(sa == NULL)) {
+ return NXT_ERROR;
+ }
+
+ sa->type = SOCK_STREAM;
+
+ urr->server[i].sockaddr = sa;
+ urr->server[i].weight = 1;
+ urr->server[i].protocol = NXT_HTTP_PROTO_H1;
+
+ nxt_conf_map_object(mp, srvcf, nxt_upstream_round_robin_server_conf,
+ nxt_nitems(nxt_upstream_round_robin_server_conf),
+ &urr->server[i]);
+
+ urr->server[i].effective_weight = urr->server[i].weight;
+ }
+
+ upstream->proto = &nxt_upstream_round_robin_proto;
+ upstream->type.round_robin = urr;
+
+ return NXT_OK;
+}
+
+
+static nxt_upstream_t *
+nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf,
+ nxt_upstream_t *upstream)
+{
+ size_t size;
+ uint32_t i, n;
+ nxt_mp_t *mp;
+ nxt_upstream_t *u;
+ nxt_upstream_round_robin_t *urr, *urrcf;
+
+ mp = tmcf->router_conf->mem_pool;
+
+ u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
+ if (nxt_slow_path(u == NULL)) {
+ return NULL;
+ }
+
+ *u = *upstream;
+
+ urrcf = upstream->type.round_robin;
+
+ size = sizeof(nxt_upstream_round_robin_t)
+ + urrcf->items * sizeof(nxt_upstream_round_robin_server_t);
+
+ urr = nxt_mp_alloc(mp, size);
+ if (nxt_slow_path(urr == NULL)) {
+ return NULL;
+ }
+
+ u->type.round_robin = urr;
+
+ n = urrcf->items;
+ urr->items = n;
+
+ for (i = 0; i < n; i++) {
+ urr->server[i] = urrcf->server[i];
+ }
+
+ return u;
+}
+
+
+static void
+nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
+{
+ int32_t total;
+ uint32_t i, n;
+ nxt_upstream_round_robin_t *round_robin;
+ nxt_upstream_round_robin_server_t *s, *best;
+
+ best = NULL;
+ total = 0;
+
+ round_robin = us->upstream->type.round_robin;
+
+ s = round_robin->server;
+ n = round_robin->items;
+
+ for (i = 0; i < n; i++) {
+
+ s[i].current_weight += s[i].effective_weight;
+ total += s[i].effective_weight;
+
+ if (s[i].effective_weight < s[i].weight) {
+ s[i].effective_weight++;
+ }
+
+ if (best == NULL || s[i].current_weight > best->current_weight) {
+ best = &s[i];
+ }
+ }
+
+ if (best == NULL) {
+ us->state->error(task, us);
+ return;
+ }
+
+ best->current_weight -= total;
+ us->sockaddr = best->sockaddr;
+ us->protocol = best->protocol;
+ us->server.round_robin = best;
+
+ us->state->ready(task, us);
+}