summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_application.h8
-rw-r--r--src/nxt_router.c270
-rw-r--r--src/nxt_router.h24
3 files changed, 267 insertions, 35 deletions
diff --git a/src/nxt_application.h b/src/nxt_application.h
index 9b51ba6d..5a585c89 100644
--- a/src/nxt_application.h
+++ b/src/nxt_application.h
@@ -9,6 +9,14 @@
#define _NXT_APPLICATION_H_INCLUDED_
+typedef enum {
+ NXT_APP_PYTHON = 0,
+ NXT_APP_PHP,
+ NXT_APP_RUBY,
+ NXT_APP_GO,
+} nxt_app_type_t;
+
+
typedef struct {
nxt_str_t name;
nxt_str_t value;
diff --git a/src/nxt_router.c b/src/nxt_router.c
index b4f31536..f6d2ae34 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -7,12 +7,16 @@
#include <nxt_router.h>
#include <nxt_conf.h>
-#include <nxt_application.h>
typedef struct {
- nxt_str_t application_type;
- uint32_t application_workers;
+ nxt_str_t type;
+ uint32_t workers;
+} nxt_router_app_conf_t;
+
+
+typedef struct {
+ nxt_str_t application;
} nxt_router_listener_conf_t;
@@ -23,6 +27,9 @@ static void nxt_router_listen_sockets_sort(nxt_router_t *router,
static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
+static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
+static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
+ nxt_str_t *name);
static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
@@ -48,6 +55,8 @@ static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine);
+static void nxt_router_apps_sort(nxt_router_t *router,
+ nxt_router_temp_conf_t *tmcf);
static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);
@@ -100,6 +109,7 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
nxt_queue_init(&router->engines);
nxt_queue_init(&router->sockets);
+ nxt_queue_init(&router->apps);
nxt_router = router;
@@ -144,6 +154,8 @@ nxt_router_new_conf(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_t *router,
return ret;
}
+ nxt_router_apps_sort(router, tmcf);
+
nxt_router_engines_post(tmcf);
nxt_queue_add(&router->sockets, &tmcf->updating);
@@ -200,6 +212,8 @@ nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router)
nxt_queue_init(&tmcf->updating);
nxt_queue_init(&tmcf->pending);
nxt_queue_init(&tmcf->creating);
+ nxt_queue_init(&tmcf->apps);
+ nxt_queue_init(&tmcf->previous);
return tmcf;
@@ -217,7 +231,7 @@ fail:
static nxt_conf_map_t nxt_router_conf[] = {
{
- nxt_string("threads"),
+ nxt_string("listeners_threads"),
NXT_CONF_MAP_INT32,
offsetof(nxt_router_conf_t, threads),
},
@@ -228,17 +242,30 @@ static nxt_conf_map_t nxt_router_conf[] = {
};
-static nxt_conf_map_t nxt_router_listener_conf[] = {
+static nxt_conf_map_t nxt_router_app_conf[] = {
{
- nxt_string("_application_type"),
+ nxt_string("type"),
NXT_CONF_MAP_STR,
- offsetof(nxt_router_listener_conf_t, application_type),
+ offsetof(nxt_router_app_conf_t, type),
},
{
- nxt_string("_application_workers"),
+ nxt_string("workers"),
NXT_CONF_MAP_INT32,
- offsetof(nxt_router_listener_conf_t, application_workers),
+ offsetof(nxt_router_app_conf_t, workers),
+ },
+
+ {
+ nxt_null_string, 0, 0,
+ },
+};
+
+
+static nxt_conf_map_t nxt_router_listener_conf[] = {
+ {
+ nxt_string("application"),
+ NXT_CONF_MAP_STR,
+ offsetof(nxt_router_listener_conf_t, application),
},
{
@@ -276,17 +303,25 @@ static nxt_int_t
nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
u_char *start, u_char *end)
{
+ u_char *p;
+ size_t size;
nxt_mp_t *mp;
uint32_t next;
nxt_int_t ret;
nxt_str_t name;
+ nxt_app_t *app, *prev;
+ nxt_app_type_t type;
nxt_sockaddr_t *sa;
- nxt_conf_value_t *conf, *listeners, *router, *http, *listener;
+ nxt_queue_link_t *qlk, *nqlk;
+ nxt_conf_value_t *conf, *http;
+ nxt_conf_value_t *applications, *application;
+ nxt_conf_value_t *listeners, *listener;
nxt_socket_conf_t *skcf;
+ nxt_router_app_conf_t apcf;
nxt_router_listener_conf_t lscf;
- static nxt_str_t router_path = nxt_string("/router");
static nxt_str_t http_path = nxt_string("/http");
+ static nxt_str_t applications_path = nxt_string("/applications");
static nxt_str_t listeners_path = nxt_string("/listeners");
conf = nxt_conf_json_parse(tmcf->mem_pool, start, end);
@@ -295,16 +330,9 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
return NXT_ERROR;
}
- router = nxt_conf_get_path(conf, &router_path);
-
- if (router == NULL) {
- nxt_log(task, NXT_LOG_CRIT, "no \"/router\" block");
- return NXT_ERROR;
- }
-
- ret = nxt_conf_map_object(router, nxt_router_conf, tmcf->conf);
+ ret = nxt_conf_map_object(conf, nxt_router_conf, tmcf->conf);
if (ret != NXT_OK) {
- nxt_log(task, NXT_LOG_CRIT, "router map error");
+ nxt_log(task, NXT_LOG_CRIT, "root map error");
return NXT_ERROR;
}
@@ -312,24 +340,109 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
tmcf->conf->threads = nxt_ncpu;
}
- http = nxt_conf_get_path(conf, &http_path);
+ applications = nxt_conf_get_path(conf, &applications_path);
+ if (applications == NULL) {
+ nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block");
+ return NXT_ERROR;
+ }
+
+ next = 0;
+
+ for ( ;; ) {
+ application = nxt_conf_next_object_member(applications, &name, &next);
+ if (application == NULL) {
+ break;
+ }
+
+ nxt_debug(task, "application \"%V\"", &name);
+
+ app = nxt_zalloc(sizeof(nxt_app_t));
+ if (app == NULL) {
+ goto fail;
+ }
+
+ size = nxt_conf_json_length(application, NULL);
+
+ app->conf.start = nxt_malloc(size);
+ if (app->conf.start == NULL) {
+ nxt_free(app);
+ goto fail;
+ }
+
+ p = nxt_conf_json_print(app->conf.start, application, NULL);
+ app->conf.length = p - app->conf.start;
+
+ nxt_debug(task, "application conf \"%V\"", &app->conf);
+
+ prev = nxt_router_app_find(&tmcf->conf->router->apps, &name);
+
+ if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
+ nxt_free(app->conf.start);
+ nxt_free(app);
+
+ nxt_queue_remove(&prev->link);
+ nxt_queue_insert_tail(&tmcf->previous, &prev->link);
+ continue;
+ }
+
+ ret = nxt_conf_map_object(application, nxt_router_app_conf, &apcf);
+ if (ret != NXT_OK) {
+ nxt_log(task, NXT_LOG_CRIT, "application map error");
+ goto app_fail;
+ }
+
+ nxt_debug(task, "application type: %V", &apcf.type);
+ nxt_debug(task, "application workers: %D", apcf.workers);
+
+ if (nxt_str_eq(&apcf.type, "python", 6)) {
+ type = NXT_APP_PYTHON;
+
+ } else if (nxt_str_eq(&apcf.type, "php", 3)) {
+ type = NXT_APP_PHP;
+
+ } else if (nxt_str_eq(&apcf.type, "ruby", 4)) {
+ type = NXT_APP_RUBY;
+
+ } else if (nxt_str_eq(&apcf.type, "go", 2)) {
+ type = NXT_APP_GO;
+
+ } else {
+ nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"",
+ &apcf.type);
+ goto app_fail;
+ }
+
+ ret = nxt_thread_mutex_create(&app->mutex);
+ if (ret != NXT_OK) {
+ goto app_fail;
+ }
+
+ app->name = name;
+ app->type = type;
+ app->max_workers = apcf.workers;
+ app->live = 1;
+
+ nxt_queue_insert_tail(&tmcf->apps, &app->link);
+ }
+ http = nxt_conf_get_path(conf, &http_path);
+#if 0
if (http == NULL) {
- nxt_log(task, NXT_LOG_CRIT, "no \"/http\" block");
+ nxt_log(task, NXT_LOG_CRIT, "no \"http\" block");
return NXT_ERROR;
}
+#endif
listeners = nxt_conf_get_path(conf, &listeners_path);
-
if (listeners == NULL) {
- nxt_log(task, NXT_LOG_CRIT, "no \"/listeners\" block");
+ nxt_log(task, NXT_LOG_CRIT, "no \"listeners\" block");
return NXT_ERROR;
}
- mp = tmcf->conf->mem_pool;
-
next = 0;
+ mp = tmcf->conf->mem_pool;
+
for ( ;; ) {
listener = nxt_conf_next_object_member(listeners, &name, &next);
if (listener == NULL) {
@@ -339,7 +452,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
sa = nxt_sockaddr_parse(mp, &name);
if (sa == NULL) {
nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name);
- return NXT_ERROR;
+ goto fail;
}
sa->type = SOCK_STREAM;
@@ -349,31 +462,95 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
skcf = nxt_router_socket_conf(task, mp, sa);
if (skcf == NULL) {
- return NXT_ERROR;
+ goto fail;
}
ret = nxt_conf_map_object(listener, nxt_router_listener_conf, &lscf);
if (ret != NXT_OK) {
nxt_log(task, NXT_LOG_CRIT, "listener map error");
- return NXT_ERROR;
+ goto fail;
}
- nxt_debug(task, "router type: %V", &lscf.application_type);
- nxt_debug(task, "router workers: %D", lscf.application_workers);
+ nxt_debug(task, "application: %V", &lscf.application);
- ret = nxt_conf_map_object(http, nxt_router_http_conf, skcf);
- if (ret != NXT_OK) {
- nxt_log(task, NXT_LOG_CRIT, "http map error");
- return NXT_ERROR;
+ // STUB, default values if http block is not defined.
+ skcf->header_buffer_size = 2048;
+ skcf->large_header_buffer_size = 8192;
+ skcf->header_read_timeout = 5000;
+
+ if (http != NULL) {
+ ret = nxt_conf_map_object(http, nxt_router_http_conf, skcf);
+ if (ret != NXT_OK) {
+ nxt_log(task, NXT_LOG_CRIT, "http map error");
+ goto fail;
+ }
}
skcf->listen.handler = nxt_router_conn_init;
skcf->router_conf = tmcf->conf;
+ skcf->application = nxt_router_listener_application(tmcf,
+ &lscf.application);
nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
}
return NXT_OK;
+
+app_fail:
+
+ nxt_free(app->conf.start);
+ nxt_free(app);
+
+fail:
+
+ for (qlk = nxt_queue_first(&tmcf->apps);
+ qlk != nxt_queue_tail(&tmcf->apps);
+ qlk = nqlk)
+ {
+ nqlk = nxt_queue_next(qlk);
+ app = nxt_queue_link_data(qlk, nxt_app_t, link);
+
+ nxt_thread_mutex_destroy(&app->mutex);
+ nxt_free(app);
+ }
+
+ return NXT_ERROR;
+}
+
+
+static nxt_app_t *
+nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
+{
+ nxt_app_t *app;
+ nxt_queue_link_t *qlk;
+
+ for (qlk = nxt_queue_first(queue);
+ qlk != nxt_queue_tail(queue);
+ qlk = nxt_queue_next(qlk))
+ {
+ app = nxt_queue_link_data(qlk, nxt_app_t, link);
+
+ if (nxt_strstr_eq(name, &app->name)) {
+ return app;
+ }
+ }
+
+ return NULL;
+}
+
+
+static nxt_app_t *
+nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
+{
+ nxt_app_t *app;
+
+ app = nxt_router_app_find(&tmcf->apps, name);
+
+ if (app == NULL) {
+ app = nxt_router_app_find(&tmcf->conf->router->apps, name);
+ }
+
+ return app;
}
@@ -841,6 +1018,29 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
static void
+nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
+{
+ nxt_app_t *app;
+ nxt_queue_link_t *qlk, *nqlk;
+
+ for (qlk = nxt_queue_first(&router->apps);
+ qlk != nxt_queue_tail(&router->apps);
+ qlk = nqlk)
+ {
+ nqlk = nxt_queue_next(qlk);
+ app = nxt_queue_link_data(qlk, nxt_app_t, link);
+
+ nxt_queue_remove(&app->link);
+
+ // RELEASE APP
+ }
+
+ nxt_queue_add(&router->apps, &tmcf->previous);
+ nxt_queue_add(&router->apps, &tmcf->apps);
+}
+
+
+static void
nxt_router_engines_post(nxt_router_temp_conf_t *tmcf)
{
nxt_uint_t n;
diff --git a/src/nxt_router.h b/src/nxt_router.h
index a7fb54a9..ff5a7c41 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -11,6 +11,7 @@
#include <nxt_main.h>
#include <nxt_runtime.h>
#include <nxt_master_process.h>
+#include <nxt_application.h>
typedef struct {
@@ -18,6 +19,7 @@ typedef struct {
nxt_queue_t engines;
nxt_queue_t sockets; /* of nxt_socket_conf_t */
+ nxt_queue_t apps; /* of nxt_app_t */
} nxt_router_t;
@@ -45,6 +47,9 @@ typedef struct {
nxt_queue_t keeping; /* of nxt_socket_conf_t */
nxt_queue_t deleting; /* of nxt_socket_conf_t */
+ nxt_queue_t apps; /* of nxt_app_t */
+ nxt_queue_t previous; /* of nxt_app_t */
+
uint32_t new_threads;
nxt_array_t *engines;
@@ -54,6 +59,23 @@ typedef struct {
typedef struct {
+ nxt_thread_mutex_t mutex;
+ nxt_queue_t ports;
+ nxt_str_t name;
+
+ uint32_t workers;
+ uint32_t max_workers;
+
+ nxt_app_type_t type:8;
+ uint8_t live; /* 1 bit */
+
+ nxt_queue_link_t link;
+
+ nxt_str_t conf;
+} nxt_app_t;
+
+
+typedef struct {
uint32_t count;
nxt_socket_t fd;
} nxt_router_socket_t;
@@ -66,6 +88,8 @@ typedef struct {
nxt_router_conf_t *router_conf;
nxt_sockaddr_t *sockaddr;
+ nxt_app_t *application;
+
nxt_listen_socket_t listen;
size_t header_buffer_size;