summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-07-05 14:05:21 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-07-05 14:05:21 +0300
commit4fe5d22dcc5d6e42c5faa6fe06dd076cde799324 (patch)
tree91f346ac5380888617f4ed32fa0933445c4bd65a /src
parentd28d502aa5c6982552ce6626088a77f3295465f6 (diff)
downloadunit-4fe5d22dcc5d6e42c5faa6fe06dd076cde799324.tar.gz
unit-4fe5d22dcc5d6e42c5faa6fe06dd076cde799324.tar.bz2
Router: processing JSON configuration.
Diffstat (limited to 'src')
-rw-r--r--src/nxt_conn_accept.c2
-rw-r--r--src/nxt_event_engine.h2
-rw-r--r--src/nxt_listen_socket.c84
-rw-r--r--src/nxt_listen_socket.h7
-rw-r--r--src/nxt_queue.h6
-rw-r--r--src/nxt_router.c446
-rw-r--r--src/nxt_router.h27
-rw-r--r--src/nxt_socket.c2
8 files changed, 445 insertions, 131 deletions
diff --git a/src/nxt_conn_accept.c b/src/nxt_conn_accept.c
index 4fcd530d..6f42cf3d 100644
--- a/src/nxt_conn_accept.c
+++ b/src/nxt_conn_accept.c
@@ -69,7 +69,7 @@ nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
if (nxt_conn_accept_alloc(task, lev) != NULL) {
nxt_fd_event_enable_accept(engine, &lev->socket);
- nxt_queue_insert_head(&engine->listen_connections, &lev->link);
+ nxt_queue_insert_tail(&engine->listen_connections, &lev->link);
}
return lev;
diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h
index ceb1654e..d8741669 100644
--- a/src/nxt_event_engine.h
+++ b/src/nxt_event_engine.h
@@ -494,6 +494,8 @@ struct nxt_event_engine_s {
nxt_lvlhsh_t requests; /* req_id to nxt_req_conn_link_t */
nxt_queue_link_t link;
+ // STUB: router link
+ nxt_queue_link_t link0;
};
diff --git a/src/nxt_listen_socket.c b/src/nxt_listen_socket.c
index 56bff6ea..bbd6d34a 100644
--- a/src/nxt_listen_socket.c
+++ b/src/nxt_listen_socket.c
@@ -11,6 +11,90 @@ static u_char *nxt_listen_socket_log_handler(void *ctx, u_char *pos,
u_char *last);
+nxt_socket_t
+nxt_listen_socket_create0(nxt_task_t *task, nxt_sockaddr_t *sa,
+ nxt_uint_t flags)
+{
+ nxt_int_t ret;
+ nxt_socket_t s;
+
+ s = nxt_socket_create(task, sa->u.sockaddr.sa_family, sa->type, 0, flags);
+ if (nxt_slow_path(s == -1)) {
+ return s;
+ }
+
+ ret = nxt_socket_setsockopt(task, s, SOL_SOCKET, SO_REUSEADDR, 1);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto fail;
+ }
+
+#if (NXT_INET6)
+
+ if (sa->u.sockaddr.sa_family == AF_INET6) {
+ ret = nxt_socket_setsockopt(task, s, IPPROTO_IPV6, IPV6_V6ONLY, 1);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto fail;
+ }
+ }
+
+#endif
+
+#ifdef TCP_DEFER_ACCEPT
+
+ /* Defer Linux accept() up to for 1 second. */
+ (void) nxt_socket_setsockopt(task, s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
+
+#endif
+
+ ret = nxt_socket_bind(task, s, sa, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto fail;
+ }
+
+#if (NXT_HAVE_UNIX_DOMAIN)
+
+ if (sa->u.sockaddr.sa_family == AF_UNIX) {
+ nxt_file_name_t *name;
+ nxt_file_access_t access;
+
+ name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
+
+ access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
+
+ ret = nxt_file_set_access(name, access);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto fail;
+ }
+ }
+
+#endif
+
+ return s;
+
+fail:
+
+ nxt_socket_close(task, s);
+
+ return -1;
+}
+
+
+nxt_int_t
+nxt_listen_socket(nxt_task_t *task, nxt_socket_t s, int backlog)
+{
+ nxt_debug(task, "listen(%d, %d)", s, backlog);
+
+ if (nxt_fast_path(listen(s, backlog) == 0)) {
+ return NXT_OK;
+ }
+
+ nxt_log(task, NXT_LOG_CRIT, "listen(%d, %d) failed %E",
+ s, backlog, nxt_socket_errno);
+
+ return NXT_ERROR;
+}
+
+
nxt_int_t
nxt_listen_socket_create(nxt_task_t *task, nxt_listen_socket_t *ls,
nxt_bool_t bind_test)
diff --git a/src/nxt_listen_socket.h b/src/nxt_listen_socket.h
index 907c3a6e..c58850ba 100644
--- a/src/nxt_listen_socket.h
+++ b/src/nxt_listen_socket.h
@@ -30,8 +30,6 @@ typedef struct {
uint8_t socklen;
uint8_t address_length;
-
- uint32_t count;
} nxt_listen_socket_t;
@@ -51,6 +49,11 @@ typedef struct {
#endif
+NXT_EXPORT nxt_socket_t nxt_listen_socket_create0(nxt_task_t *task,
+ nxt_sockaddr_t *sa, nxt_uint_t flags);
+NXT_EXPORT nxt_int_t nxt_listen_socket(nxt_task_t *task, nxt_socket_t s,
+ int backlog);
+
NXT_EXPORT nxt_int_t nxt_listen_socket_create(nxt_task_t *task,
nxt_listen_socket_t *ls, nxt_bool_t bind_test);
NXT_EXPORT nxt_int_t nxt_listen_socket_update(nxt_task_t *task,
diff --git a/src/nxt_queue.h b/src/nxt_queue.h
index f1efd328..e107b6aa 100644
--- a/src/nxt_queue.h
+++ b/src/nxt_queue.h
@@ -193,7 +193,11 @@ nxt_queue_truncate(queue, link) \
} while (0)
-/* Add the queue "tail" to the queue "queue". */
+/*
+ * Add the queue "tail" to the queue "queue".
+ * If the queue "tail" is intended to be reused again,
+ * it must be initiated with nxt_queue_init(tail).
+ */
#define \
nxt_queue_add(queue, tail) \
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 34d163dc..6e9bb994 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -6,37 +6,43 @@
*/
#include <nxt_router.h>
+#include <nxt_conf.h>
#include <nxt_application.h>
+typedef struct {
+ nxt_str_t application_type;
+ uint32_t application_workers;
+} nxt_router_listener_conf_t;
+
+
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
nxt_router_t *router);
static void nxt_router_listen_sockets_sort(nxt_router_t *router,
nxt_router_temp_conf_t *tmcf);
-static nxt_int_t nxt_router_stub_conf(nxt_task_t *task,
- nxt_router_temp_conf_t *tmcf);
+static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
nxt_sockaddr_t *sa);
-static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task,
- nxt_mp_t *mp, uint32_t port);
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
const nxt_event_interface_t *interface);
-static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp,
- nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
-static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
- nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
-static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
- nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
-static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
+static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
+ nxt_router_engine_conf_t *recf);
+static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
+ nxt_router_engine_conf_t *recf);
+static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
+ nxt_router_engine_conf_t *recf);
+static void nxt_router_engine_socket_count(nxt_queue_t *sockets);
+static nxt_int_t nxt_router_engine_joints_create(nxt_mp_t *mp,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
nxt_work_handler_t handler);
-static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task,
- nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array);
+static nxt_int_t nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf,
+ nxt_queue_t *sockets);
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_router_temp_conf_t *tmcf);
@@ -78,10 +84,8 @@ static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
nxt_int_t
nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
{
- nxt_int_t ret;
- nxt_router_t *router;
- nxt_router_temp_conf_t *tmcf;
- const nxt_event_interface_t *interface;
+ nxt_int_t ret;
+ nxt_router_t *router;
ret = nxt_app_http_init(task, rt);
if (nxt_slow_path(ret != NXT_OK)) {
@@ -96,14 +100,24 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
nxt_queue_init(&router->engines);
nxt_queue_init(&router->sockets);
- /**/
+ return NXT_OK;
+}
+
+
+nxt_int_t
+nxt_router_new_conf(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_t *router,
+ u_char *start, u_char *end)
+{
+ nxt_int_t ret;
+ nxt_router_temp_conf_t *tmcf;
+ const nxt_event_interface_t *interface;
tmcf = nxt_router_temp_conf(task, router);
if (nxt_slow_path(tmcf == NULL)) {
return NXT_ERROR;
}
- ret = nxt_router_stub_conf(task, tmcf);
+ ret = nxt_router_conf_create(task, tmcf, start, end);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
@@ -132,6 +146,8 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
nxt_queue_add(&router->sockets, &tmcf->updating);
nxt_queue_add(&router->sockets, &tmcf->creating);
+// nxt_mp_destroy(tmcf->mem_pool);
+
return NXT_OK;
}
@@ -196,34 +212,159 @@ fail:
}
+static nxt_conf_map_t nxt_router_conf[] = {
+ {
+ nxt_string("threads"),
+ NXT_CONF_MAP_INT32,
+ offsetof(nxt_router_conf_t, threads),
+ },
+
+ {
+ nxt_null_string, 0, 0,
+ },
+};
+
+
+static nxt_conf_map_t nxt_router_listener_conf[] = {
+ {
+ nxt_string("_application_type"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_router_listener_conf_t, application_type),
+ },
+
+ {
+ nxt_string("_application_workers"),
+ NXT_CONF_MAP_INT32,
+ offsetof(nxt_router_listener_conf_t, application_workers),
+ },
+
+ {
+ nxt_null_string, 0, 0,
+ },
+};
+
+
+static nxt_conf_map_t nxt_router_http_conf[] = {
+ {
+ nxt_string("header_buffer_size"),
+ NXT_CONF_MAP_SIZE,
+ offsetof(nxt_socket_conf_t, header_buffer_size),
+ },
+
+ {
+ nxt_string("large_header_buffer_size"),
+ NXT_CONF_MAP_SIZE,
+ offsetof(nxt_socket_conf_t, large_header_buffer_size),
+ },
+
+ {
+ nxt_string("header_read_timeout"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_socket_conf_t, header_read_timeout),
+ },
+
+ {
+ nxt_null_string, 0, 0,
+ },
+};
+
+
static nxt_int_t
-nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
+nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+ u_char *start, u_char *end)
{
- nxt_mp_t *mp;
- nxt_sockaddr_t *sa;
- nxt_socket_conf_t *skcf;
+ nxt_mp_t *mp;
+ uint32_t next;
+ nxt_int_t ret;
+ nxt_str_t name;
+ nxt_sockaddr_t *sa;
+ nxt_conf_value_t *conf, *listeners, *router, *http, *listener;
+ nxt_socket_conf_t *skcf;
+ nxt_router_listener_conf_t lscf;
+
+ static nxt_str_t router_path = nxt_string("/router");
+ static nxt_str_t http_path = nxt_string("/http");
+ static nxt_str_t listeners_path = nxt_string("/listeners");
+
+ conf = nxt_conf_json_parse(tmcf->mem_pool, start, end);
+ if (conf == NULL) {
+ nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
+ return NXT_ERROR;
+ }
+
+ router = nxt_conf_get_path(conf, &router_path);
+
+ if (router == NULL) {
+ nxt_log(task, NXT_LOG_CRIT, "no \"/router\" block");
+ return NXT_ERROR;
+ }
+
+ ret = nxt_conf_map_object(router, nxt_router_conf, tmcf->conf);
+ if (ret != NXT_OK) {
+ nxt_log(task, NXT_LOG_CRIT, "router map error");
+ return NXT_ERROR;
+ }
- tmcf->conf->threads = 1;
+ http = nxt_conf_get_path(conf, &http_path);
+
+ if (http == NULL) {
+ nxt_log(task, NXT_LOG_CRIT, "no \"/http\" block");
+ return NXT_ERROR;
+ }
+
+ listeners = nxt_conf_get_path(conf, &listeners_path);
+
+ if (listeners == NULL) {
+ nxt_log(task, NXT_LOG_CRIT, "no \"/listeners\" block");
+ return NXT_ERROR;
+ }
mp = tmcf->conf->mem_pool;
- sa = nxt_router_listen_sockaddr_stub(task, mp, 8000);
- skcf = nxt_router_socket_conf(task, mp, sa);
+ next = 0;
+
+ for ( ;; ) {
+ listener = nxt_conf_next_object_member(listeners, &name, &next);
+ if (listener == NULL) {
+ break;
+ }
+
+ sa = nxt_sockaddr_parse(mp, &name);
+ if (sa == NULL) {
+ nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name);
+ return NXT_ERROR;
+ }
+
+ sa->type = SOCK_STREAM;
+
+ nxt_debug(task, "router listener: \"%*s\"",
+ sa->length, nxt_sockaddr_start(sa));
+
+ skcf = nxt_router_socket_conf(task, mp, sa);
+ if (skcf == NULL) {
+ return NXT_ERROR;
+ }
- skcf->listen.handler = nxt_router_conn_init;
- skcf->header_buffer_size = 2048;
- skcf->large_header_buffer_size = 8192;
- skcf->header_read_timeout = 5000;
+ ret = nxt_conf_map_object(listener, nxt_router_listener_conf, &lscf);
+ if (ret != NXT_OK) {
+ nxt_log(task, NXT_LOG_CRIT, "listener map error");
+ return NXT_ERROR;
+ }
- nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
+ nxt_debug(task, "router type: %V", &lscf.application_type);
+ nxt_debug(task, "router workers: %D", lscf.application_workers);
- sa = nxt_router_listen_sockaddr_stub(task, mp, 8001);
- skcf = nxt_router_socket_conf(task, mp, sa);
+ ret = nxt_conf_map_object(http, nxt_router_http_conf, skcf);
+ if (ret != NXT_OK) {
+ nxt_log(task, NXT_LOG_CRIT, "http map error");
+ return NXT_ERROR;
+ }
- skcf->listen.handler = nxt_stream_connection_init;
- skcf->header_read_timeout = 5000;
+ skcf->listen.handler = nxt_router_conn_init;
+ skcf->router_conf = tmcf->conf;
- nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
+ nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
+ }
return NXT_OK;
}
@@ -239,6 +380,8 @@ nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
return NULL;
}
+ conf->sockaddr = sa;
+
conf->listen.sockaddr = sa;
conf->listen.socklen = sa->socklen;
conf->listen.address_length = sa->length;
@@ -252,31 +395,6 @@ nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
}
-static nxt_sockaddr_t *
-nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mp_t *mp, uint32_t port)
-{
- nxt_sockaddr_t *sa;
- struct sockaddr_in sin;
-
- nxt_memzero(&sin, sizeof(struct sockaddr_in));
-
- sin.sin_family = AF_INET;
- sin.sin_port = htons(port);
-
- sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin,
- sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN);
- if (nxt_slow_path(sa == NULL)) {
- return NULL;
- }
-
- sa->type = SOCK_STREAM;
-
- nxt_sockaddr_text(sa);
-
- return sa;
-}
-
-
static void
nxt_router_listen_sockets_sort(nxt_router_t *router,
nxt_router_temp_conf_t *tmcf)
@@ -297,9 +415,10 @@ nxt_router_listen_sockets_sort(nxt_router_t *router,
{
oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
- if (nxt_sockaddr_cmp(nskcf->listen.sockaddr,
- oskcf->listen.sockaddr))
- {
+ if (nxt_sockaddr_cmp(nskcf->sockaddr, oskcf->sockaddr)) {
+ nskcf->socket = oskcf->socket;
+ nskcf->listen.socket = oskcf->listen.socket;
+
nxt_queue_remove(oqlk);
nxt_queue_insert_tail(&tmcf->keeping, oqlk);
@@ -312,6 +431,7 @@ nxt_router_listen_sockets_sort(nxt_router_t *router,
}
nxt_queue_add(&tmcf->deleting, &router->sockets);
+ nxt_queue_init(&router->sockets);
}
@@ -319,19 +439,40 @@ static nxt_int_t
nxt_router_listen_sockets_stub_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf)
{
- nxt_queue_link_t *qlk, *nqlk;
- nxt_socket_conf_t *skcf;
+ nxt_int_t ret;
+ nxt_socket_t s;
+ nxt_queue_link_t *qlk, *nqlk;
+ nxt_socket_conf_t *skcf;
+ nxt_router_socket_t *rtsk;
for (qlk = nxt_queue_first(&tmcf->pending);
qlk != nxt_queue_tail(&tmcf->pending);
qlk = nqlk)
{
+ rtsk = nxt_malloc(sizeof(nxt_router_socket_t));
+ if (nxt_slow_path(rtsk == NULL)) {
+ return NXT_ERROR;
+ }
+
+ rtsk->count = 0;
+
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
+ skcf->socket = rtsk;
+
+ s = nxt_listen_socket_create0(task, skcf->sockaddr, NXT_NONBLOCK);
+ if (nxt_slow_path(s == -1)) {
+ return NXT_ERROR;
+ }
- if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) {
+ ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
+ if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
+ skcf->listen.socket = s;
+
+ rtsk->fd = s;
+
nqlk = nxt_queue_next(qlk);
nxt_queue_remove(qlk);
nxt_queue_insert_tail(&tmcf->creating, qlk);
@@ -345,13 +486,11 @@ static nxt_int_t
nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
{
- nxt_mp_t *mp;
nxt_int_t ret;
nxt_uint_t n, threads;
nxt_queue_link_t *qlk;
nxt_router_engine_conf_t *recf;
- mp = tmcf->conf->mem_pool;
threads = tmcf->conf->threads;
tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
@@ -371,15 +510,15 @@ nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
return NXT_ERROR;
}
- recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link);
+ recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
// STUB
recf->task = recf->engine->task;
if (n < threads) {
- ret = nxt_router_engine_conf_update(task, mp, tmcf, recf);
+ ret = nxt_router_engine_conf_update(tmcf, recf);
} else {
- ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf);
+ ret = nxt_router_engine_conf_delete(tmcf, recf);
}
if (nxt_slow_path(ret != NXT_OK)) {
@@ -404,11 +543,13 @@ nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
// STUB
recf->task = recf->engine->task;
- ret = nxt_router_engine_conf_create(task, mp, tmcf, recf);
+ ret = nxt_router_engine_conf_create(tmcf, recf);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
+ nxt_queue_insert_tail(&router->engines, &recf->engine->link0);
+
n++;
}
@@ -417,39 +558,61 @@ nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
static nxt_int_t
-nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp,
- nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
+nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
+ nxt_router_engine_conf_t *recf)
{
- nxt_int_t ret;
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_thread_spinlock_t *lock;
recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
if (nxt_slow_path(recf->creating == NULL)) {
return NXT_ERROR;
}
- ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
+ mp = tmcf->conf->mem_pool;
+
+ ret = nxt_router_engine_joints_create(mp, recf, &tmcf->creating,
recf->creating, nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
- return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
+ ret = nxt_router_engine_joints_create(mp, recf, &tmcf->updating,
recf->creating, nxt_router_listen_socket_create);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ lock = &tmcf->conf->router->lock;
+
+ nxt_thread_spin_lock(lock);
+
+ nxt_router_engine_socket_count(&tmcf->creating);
+ nxt_router_engine_socket_count(&tmcf->updating);
+
+ nxt_thread_spin_unlock(lock);
+
+ return ret;
}
static nxt_int_t
-nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
- nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
+nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
+ nxt_router_engine_conf_t *recf)
{
- nxt_int_t ret;
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_thread_spinlock_t *lock;
recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
if (nxt_slow_path(recf->creating == NULL)) {
return NXT_ERROR;
}
- ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
+ mp = tmcf->conf->mem_pool;
+
+ ret = nxt_router_engine_joints_create(mp, recf, &tmcf->creating,
recf->creating, nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
@@ -460,7 +623,7 @@ nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
return NXT_ERROR;
}
- ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
+ ret = nxt_router_engine_joints_create(mp, recf, &tmcf->updating,
recf->updating, nxt_router_listen_socket_update);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
@@ -471,14 +634,26 @@ nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
return NXT_ERROR;
}
- return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
- recf->deleting);
+ ret = nxt_router_engine_joints_delete(recf, &tmcf->deleting);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ lock = &tmcf->conf->router->lock;
+
+ nxt_thread_spin_lock(lock);
+
+ nxt_router_engine_socket_count(&tmcf->creating);
+
+ nxt_thread_spin_unlock(lock);
+
+ return ret;
}
static nxt_int_t
-nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
- nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
+nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
+ nxt_router_engine_conf_t *recf)
{
nxt_int_t ret;
@@ -487,20 +662,18 @@ nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
return NXT_ERROR;
}
- ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating,
- recf->deleting);
+ ret = nxt_router_engine_joints_delete(recf, &tmcf->updating);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
- return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
- recf->deleting);
+ return nxt_router_engine_joints_delete(recf, &tmcf->deleting);
}
static nxt_int_t
-nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
- nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
+nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_engine_conf_t *recf,
+ nxt_queue_t *sockets, nxt_array_t *array,
nxt_work_handler_t handler)
{
nxt_work_t *work;
@@ -531,15 +704,33 @@ nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
joint->count = 1;
joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
joint->engine = recf->engine;
+
+ nxt_queue_insert_tail(&joint->engine->joints, &joint->link);
}
return NXT_OK;
}
+static void
+nxt_router_engine_socket_count(nxt_queue_t *sockets)
+{
+ nxt_queue_link_t *qlk;
+ nxt_socket_conf_t *skcf;
+
+ for (qlk = nxt_queue_first(sockets);
+ qlk != nxt_queue_tail(sockets);
+ qlk = nxt_queue_next(qlk))
+ {
+ skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
+ skcf->socket->count++;
+ }
+}
+
+
static nxt_int_t
-nxt_router_engine_joints_delete(nxt_task_t *task,
- nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array)
+nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf,
+ nxt_queue_t *sockets)
{
nxt_work_t *work;
nxt_queue_link_t *qlk;
@@ -548,7 +739,7 @@ nxt_router_engine_joints_delete(nxt_task_t *task,
qlk != nxt_queue_tail(sockets);
qlk = nxt_queue_next(qlk))
{
- work = nxt_array_add(array);
+ work = nxt_array_add(recf->deleting);
if (nxt_slow_path(work == NULL)) {
return NXT_ERROR;
}
@@ -611,7 +802,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_queue_insert_tail(&rt->engines, &engine->link);
-
process = nxt_runtime_process_find(rt, nxt_pid);
if (nxt_slow_path(process == NULL)) {
return NXT_ERROR;
@@ -640,7 +830,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_runtime_port_add(rt, port);
-
ret = nxt_thread_create(&handle, link);
if (nxt_slow_path(ret != NXT_OK)) {
@@ -672,11 +861,31 @@ nxt_router_engine_post(nxt_router_engine_conf_t *recf)
nxt_uint_t n;
nxt_work_t *work;
- work = recf->creating->elts;
+ if (recf->creating != NULL) {
+ work = recf->creating->elts;
+
+ for (n = recf->creating->nelts; n != 0; n--) {
+ nxt_event_engine_post(recf->engine, work);
+ work++;
+ }
+ }
+
+ if (recf->updating != NULL) {
+ work = recf->updating->elts;
+
+ for (n = recf->updating->nelts; n != 0; n--) {
+ nxt_event_engine_post(recf->engine, work);
+ work++;
+ }
+ }
+
+ if (recf->deleting != NULL) {
+ work = recf->deleting->elts;
- for (n = recf->creating->nelts; n != 0; n--) {
- nxt_event_engine_post(recf->engine, work);
- work++;
+ for (n = recf->deleting->nelts; n != 0; n--) {
+ nxt_event_engine_post(recf->engine, work);
+ work++;
+ }
}
}
@@ -750,19 +959,19 @@ nxt_inline nxt_listen_event_t *
nxt_router_listen_event(nxt_queue_t *listen_connections,
nxt_socket_conf_t *skcf)
{
- nxt_socket_t socket;
- nxt_queue_link_t *link;
+ nxt_socket_t fd;
+ nxt_queue_link_t *qlk;
nxt_listen_event_t *listen;
- socket = skcf->listen.socket;
+ fd = skcf->socket->fd;
- for (link = nxt_queue_first(listen_connections);
- link != nxt_queue_tail(listen_connections);
- link = nxt_queue_next(link))
+ for (qlk = nxt_queue_first(listen_connections);
+ qlk != nxt_queue_tail(listen_connections);
+ qlk = nxt_queue_next(qlk))
{
- listen = nxt_queue_link_data(link, nxt_listen_event_t, link);
+ listen = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
- if (socket == listen->socket.fd) {
+ if (fd == listen->socket.fd) {
return listen;
}
}
@@ -834,25 +1043,23 @@ static void
nxt_router_listen_socket_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint)
{
- nxt_socket_t s;
- nxt_listen_socket_t *ls;
+ nxt_router_socket_t *rtsk;
nxt_thread_spinlock_t *lock;
- s = -1;
- ls = &joint->socket_conf->listen;
+ rtsk = joint->socket_conf->socket;
lock = &joint->socket_conf->router_conf->router->lock;
nxt_thread_spin_lock(lock);
- if (--ls->count == 0) {
- s = ls->socket;
- ls->socket = -1;
+ if (--rtsk->count != 0) {
+ rtsk = NULL;
}
nxt_thread_spin_unlock(lock);
- if (s != -1) {
- nxt_socket_close(task, s);
+ if (rtsk != NULL) {
+ nxt_socket_close(task, rtsk->fd);
+ nxt_free(rtsk);
}
nxt_router_conf_release(task, joint);
@@ -894,6 +1101,7 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
nxt_thread_spin_unlock(lock);
if (rtcf != NULL) {
+ nxt_debug(task, "old router conf is destroyed");
nxt_mp_destroy(rtcf->mem_pool);
}
diff --git a/src/nxt_router.h b/src/nxt_router.h
index 9e2dd0a6..9c6f75b4 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -17,7 +17,7 @@ typedef struct {
nxt_thread_spinlock_t lock;
nxt_queue_t engines;
- nxt_queue_t sockets;
+ nxt_queue_t sockets; /* of nxt_socket_conf_t */
} nxt_router_t;
@@ -39,11 +39,11 @@ typedef struct {
typedef struct {
- nxt_queue_t creating;
- nxt_queue_t pending;
- nxt_queue_t updating;
- nxt_queue_t keeping;
- nxt_queue_t deleting;
+ nxt_queue_t creating; /* of nxt_socket_conf_t */
+ nxt_queue_t pending; /* of nxt_socket_conf_t */
+ nxt_queue_t updating; /* of nxt_socket_conf_t */
+ nxt_queue_t keeping; /* of nxt_socket_conf_t */
+ nxt_queue_t deleting; /* of nxt_socket_conf_t */
uint32_t new_threads;
@@ -55,9 +55,18 @@ typedef struct {
typedef struct {
uint32_t count;
- nxt_listen_socket_t listen;
+ nxt_socket_t fd;
+} nxt_router_socket_t;
+
+
+typedef struct {
+ uint32_t count;
nxt_queue_link_t link;
+ nxt_router_socket_t *socket;
nxt_router_conf_t *router_conf;
+ nxt_sockaddr_t *sockaddr;
+
+ nxt_listen_socket_t listen;
size_t header_buffer_size;
size_t large_header_buffer_size;
@@ -75,4 +84,8 @@ typedef struct {
} nxt_socket_conf_joint_t;
+nxt_int_t nxt_router_new_conf(nxt_task_t *task, nxt_runtime_t *rt,
+ nxt_router_t *router, u_char *start, u_char *end);
+
+
#endif /* _NXT_ROUTER_H_INCLUDED_ */
diff --git a/src/nxt_socket.c b/src/nxt_socket.c
index 2663b855..c174081c 100644
--- a/src/nxt_socket.c
+++ b/src/nxt_socket.c
@@ -145,7 +145,7 @@ nxt_socket_sockopt_name(nxt_uint_t level, nxt_uint_t sockopt)
break;
-#if (NXT_INET6 && defined IPV6_V6ONLY)
+#if (NXT_INET6)
case IPPROTO_IPV6:
switch (sockopt) {