diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-07-07 21:09:45 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-07-07 21:09:45 +0300 |
commit | b54bcef53de43cc07c3a7ce60377d6221d49610d (patch) | |
tree | 698703fa699857230feacfb5c67bfe8ed1ca3830 | |
parent | fd0a4ff064a1d359078a06904db0402f811d739c (diff) | |
download | unit-b54bcef53de43cc07c3a7ce60377d6221d49610d.tar.gz unit-b54bcef53de43cc07c3a7ce60377d6221d49610d.tar.bz2 |
Router: processing application configuration.
-rw-r--r-- | src/nxt_application.h | 8 | ||||
-rw-r--r-- | src/nxt_router.c | 270 | ||||
-rw-r--r-- | src/nxt_router.h | 24 |
3 files changed, 267 insertions, 35 deletions
diff --git a/src/nxt_application.h b/src/nxt_application.h index 9b51ba6d..5a585c89 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -9,6 +9,14 @@ #define _NXT_APPLICATION_H_INCLUDED_ +typedef enum { + NXT_APP_PYTHON = 0, + NXT_APP_PHP, + NXT_APP_RUBY, + NXT_APP_GO, +} nxt_app_type_t; + + typedef struct { nxt_str_t name; nxt_str_t value; diff --git a/src/nxt_router.c b/src/nxt_router.c index b4f31536..f6d2ae34 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -7,12 +7,16 @@ #include <nxt_router.h> #include <nxt_conf.h> -#include <nxt_application.h> typedef struct { - nxt_str_t application_type; - uint32_t application_workers; + nxt_str_t type; + uint32_t workers; +} nxt_router_app_conf_t; + + +typedef struct { + nxt_str_t application; } nxt_router_listener_conf_t; @@ -23,6 +27,9 @@ static void nxt_router_listen_sockets_sort(nxt_router_t *router, static nxt_int_t nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); +static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); +static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, + nxt_str_t *name); static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf); static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, @@ -48,6 +55,8 @@ static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_temp_conf_t *tmcf); static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, nxt_event_engine_t *engine); +static void nxt_router_apps_sort(nxt_router_t *router, + nxt_router_temp_conf_t *tmcf); static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf); static void nxt_router_engine_post(nxt_router_engine_conf_t *recf); @@ -100,6 +109,7 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) nxt_queue_init(&router->engines); nxt_queue_init(&router->sockets); + nxt_queue_init(&router->apps); nxt_router = router; @@ -144,6 +154,8 @@ nxt_router_new_conf(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_t *router, return ret; } + nxt_router_apps_sort(router, tmcf); + nxt_router_engines_post(tmcf); nxt_queue_add(&router->sockets, &tmcf->updating); @@ -200,6 +212,8 @@ nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router) nxt_queue_init(&tmcf->updating); nxt_queue_init(&tmcf->pending); nxt_queue_init(&tmcf->creating); + nxt_queue_init(&tmcf->apps); + nxt_queue_init(&tmcf->previous); return tmcf; @@ -217,7 +231,7 @@ fail: static nxt_conf_map_t nxt_router_conf[] = { { - nxt_string("threads"), + nxt_string("listeners_threads"), NXT_CONF_MAP_INT32, offsetof(nxt_router_conf_t, threads), }, @@ -228,17 +242,30 @@ static nxt_conf_map_t nxt_router_conf[] = { }; -static nxt_conf_map_t nxt_router_listener_conf[] = { +static nxt_conf_map_t nxt_router_app_conf[] = { { - nxt_string("_application_type"), + nxt_string("type"), NXT_CONF_MAP_STR, - offsetof(nxt_router_listener_conf_t, application_type), + offsetof(nxt_router_app_conf_t, type), }, { - nxt_string("_application_workers"), + nxt_string("workers"), NXT_CONF_MAP_INT32, - offsetof(nxt_router_listener_conf_t, application_workers), + offsetof(nxt_router_app_conf_t, workers), + }, + + { + nxt_null_string, 0, 0, + }, +}; + + +static nxt_conf_map_t nxt_router_listener_conf[] = { + { + nxt_string("application"), + NXT_CONF_MAP_STR, + offsetof(nxt_router_listener_conf_t, application), }, { @@ -276,17 +303,25 @@ static nxt_int_t nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end) { + u_char *p; + size_t size; nxt_mp_t *mp; uint32_t next; nxt_int_t ret; nxt_str_t name; + nxt_app_t *app, *prev; + nxt_app_type_t type; nxt_sockaddr_t *sa; - nxt_conf_value_t *conf, *listeners, *router, *http, *listener; + nxt_queue_link_t *qlk, *nqlk; + nxt_conf_value_t *conf, *http; + nxt_conf_value_t *applications, *application; + nxt_conf_value_t *listeners, *listener; nxt_socket_conf_t *skcf; + nxt_router_app_conf_t apcf; nxt_router_listener_conf_t lscf; - static nxt_str_t router_path = nxt_string("/router"); static nxt_str_t http_path = nxt_string("/http"); + static nxt_str_t applications_path = nxt_string("/applications"); static nxt_str_t listeners_path = nxt_string("/listeners"); conf = nxt_conf_json_parse(tmcf->mem_pool, start, end); @@ -295,16 +330,9 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, return NXT_ERROR; } - router = nxt_conf_get_path(conf, &router_path); - - if (router == NULL) { - nxt_log(task, NXT_LOG_CRIT, "no \"/router\" block"); - return NXT_ERROR; - } - - ret = nxt_conf_map_object(router, nxt_router_conf, tmcf->conf); + ret = nxt_conf_map_object(conf, nxt_router_conf, tmcf->conf); if (ret != NXT_OK) { - nxt_log(task, NXT_LOG_CRIT, "router map error"); + nxt_log(task, NXT_LOG_CRIT, "root map error"); return NXT_ERROR; } @@ -312,24 +340,109 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, tmcf->conf->threads = nxt_ncpu; } - http = nxt_conf_get_path(conf, &http_path); + applications = nxt_conf_get_path(conf, &applications_path); + if (applications == NULL) { + nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block"); + return NXT_ERROR; + } + + next = 0; + + for ( ;; ) { + application = nxt_conf_next_object_member(applications, &name, &next); + if (application == NULL) { + break; + } + + nxt_debug(task, "application \"%V\"", &name); + + app = nxt_zalloc(sizeof(nxt_app_t)); + if (app == NULL) { + goto fail; + } + + size = nxt_conf_json_length(application, NULL); + + app->conf.start = nxt_malloc(size); + if (app->conf.start == NULL) { + nxt_free(app); + goto fail; + } + + p = nxt_conf_json_print(app->conf.start, application, NULL); + app->conf.length = p - app->conf.start; + + nxt_debug(task, "application conf \"%V\"", &app->conf); + + prev = nxt_router_app_find(&tmcf->conf->router->apps, &name); + + if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) { + nxt_free(app->conf.start); + nxt_free(app); + + nxt_queue_remove(&prev->link); + nxt_queue_insert_tail(&tmcf->previous, &prev->link); + continue; + } + + ret = nxt_conf_map_object(application, nxt_router_app_conf, &apcf); + if (ret != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "application map error"); + goto app_fail; + } + + nxt_debug(task, "application type: %V", &apcf.type); + nxt_debug(task, "application workers: %D", apcf.workers); + + if (nxt_str_eq(&apcf.type, "python", 6)) { + type = NXT_APP_PYTHON; + + } else if (nxt_str_eq(&apcf.type, "php", 3)) { + type = NXT_APP_PHP; + + } else if (nxt_str_eq(&apcf.type, "ruby", 4)) { + type = NXT_APP_RUBY; + + } else if (nxt_str_eq(&apcf.type, "go", 2)) { + type = NXT_APP_GO; + + } else { + nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"", + &apcf.type); + goto app_fail; + } + + ret = nxt_thread_mutex_create(&app->mutex); + if (ret != NXT_OK) { + goto app_fail; + } + + app->name = name; + app->type = type; + app->max_workers = apcf.workers; + app->live = 1; + + nxt_queue_insert_tail(&tmcf->apps, &app->link); + } + http = nxt_conf_get_path(conf, &http_path); +#if 0 if (http == NULL) { - nxt_log(task, NXT_LOG_CRIT, "no \"/http\" block"); + nxt_log(task, NXT_LOG_CRIT, "no \"http\" block"); return NXT_ERROR; } +#endif listeners = nxt_conf_get_path(conf, &listeners_path); - if (listeners == NULL) { - nxt_log(task, NXT_LOG_CRIT, "no \"/listeners\" block"); + nxt_log(task, NXT_LOG_CRIT, "no \"listeners\" block"); return NXT_ERROR; } - mp = tmcf->conf->mem_pool; - next = 0; + mp = tmcf->conf->mem_pool; + for ( ;; ) { listener = nxt_conf_next_object_member(listeners, &name, &next); if (listener == NULL) { @@ -339,7 +452,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, sa = nxt_sockaddr_parse(mp, &name); if (sa == NULL) { nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name); - return NXT_ERROR; + goto fail; } sa->type = SOCK_STREAM; @@ -349,31 +462,95 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, skcf = nxt_router_socket_conf(task, mp, sa); if (skcf == NULL) { - return NXT_ERROR; + goto fail; } ret = nxt_conf_map_object(listener, nxt_router_listener_conf, &lscf); if (ret != NXT_OK) { nxt_log(task, NXT_LOG_CRIT, "listener map error"); - return NXT_ERROR; + goto fail; } - nxt_debug(task, "router type: %V", &lscf.application_type); - nxt_debug(task, "router workers: %D", lscf.application_workers); + nxt_debug(task, "application: %V", &lscf.application); - ret = nxt_conf_map_object(http, nxt_router_http_conf, skcf); - if (ret != NXT_OK) { - nxt_log(task, NXT_LOG_CRIT, "http map error"); - return NXT_ERROR; + // STUB, default values if http block is not defined. + skcf->header_buffer_size = 2048; + skcf->large_header_buffer_size = 8192; + skcf->header_read_timeout = 5000; + + if (http != NULL) { + ret = nxt_conf_map_object(http, nxt_router_http_conf, skcf); + if (ret != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "http map error"); + goto fail; + } } skcf->listen.handler = nxt_router_conn_init; skcf->router_conf = tmcf->conf; + skcf->application = nxt_router_listener_application(tmcf, + &lscf.application); nxt_queue_insert_tail(&tmcf->pending, &skcf->link); } return NXT_OK; + +app_fail: + + nxt_free(app->conf.start); + nxt_free(app); + +fail: + + for (qlk = nxt_queue_first(&tmcf->apps); + qlk != nxt_queue_tail(&tmcf->apps); + qlk = nqlk) + { + nqlk = nxt_queue_next(qlk); + app = nxt_queue_link_data(qlk, nxt_app_t, link); + + nxt_thread_mutex_destroy(&app->mutex); + nxt_free(app); + } + + return NXT_ERROR; +} + + +static nxt_app_t * +nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) +{ + nxt_app_t *app; + nxt_queue_link_t *qlk; + + for (qlk = nxt_queue_first(queue); + qlk != nxt_queue_tail(queue); + qlk = nxt_queue_next(qlk)) + { + app = nxt_queue_link_data(qlk, nxt_app_t, link); + + if (nxt_strstr_eq(name, &app->name)) { + return app; + } + } + + return NULL; +} + + +static nxt_app_t * +nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name) +{ + nxt_app_t *app; + + app = nxt_router_app_find(&tmcf->apps, name); + + if (app == NULL) { + app = nxt_router_app_find(&tmcf->conf->router->apps, name); + } + + return app; } @@ -841,6 +1018,29 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, static void +nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) +{ + nxt_app_t *app; + nxt_queue_link_t *qlk, *nqlk; + + for (qlk = nxt_queue_first(&router->apps); + qlk != nxt_queue_tail(&router->apps); + qlk = nqlk) + { + nqlk = nxt_queue_next(qlk); + app = nxt_queue_link_data(qlk, nxt_app_t, link); + + nxt_queue_remove(&app->link); + + // RELEASE APP + } + + nxt_queue_add(&router->apps, &tmcf->previous); + nxt_queue_add(&router->apps, &tmcf->apps); +} + + +static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf) { nxt_uint_t n; diff --git a/src/nxt_router.h b/src/nxt_router.h index a7fb54a9..ff5a7c41 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -11,6 +11,7 @@ #include <nxt_main.h> #include <nxt_runtime.h> #include <nxt_master_process.h> +#include <nxt_application.h> typedef struct { @@ -18,6 +19,7 @@ typedef struct { nxt_queue_t engines; nxt_queue_t sockets; /* of nxt_socket_conf_t */ + nxt_queue_t apps; /* of nxt_app_t */ } nxt_router_t; @@ -45,6 +47,9 @@ typedef struct { nxt_queue_t keeping; /* of nxt_socket_conf_t */ nxt_queue_t deleting; /* of nxt_socket_conf_t */ + nxt_queue_t apps; /* of nxt_app_t */ + nxt_queue_t previous; /* of nxt_app_t */ + uint32_t new_threads; nxt_array_t *engines; @@ -54,6 +59,23 @@ typedef struct { typedef struct { + nxt_thread_mutex_t mutex; + nxt_queue_t ports; + nxt_str_t name; + + uint32_t workers; + uint32_t max_workers; + + nxt_app_type_t type:8; + uint8_t live; /* 1 bit */ + + nxt_queue_link_t link; + + nxt_str_t conf; +} nxt_app_t; + + +typedef struct { uint32_t count; nxt_socket_t fd; } nxt_router_socket_t; @@ -66,6 +88,8 @@ typedef struct { nxt_router_conf_t *router_conf; nxt_sockaddr_t *sockaddr; + nxt_app_t *application; + nxt_listen_socket_t listen; size_t header_buffer_size; |