summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c121
1 files changed, 98 insertions, 23 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 93b750a0..788199c7 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -27,6 +27,7 @@ typedef struct {
uint32_t requests;
nxt_conf_value_t *limits_value;
nxt_conf_value_t *processes_value;
+ nxt_conf_value_t *targets_value;
} nxt_router_app_conf_t;
@@ -74,6 +75,9 @@ struct nxt_port_select_state_s {
typedef struct nxt_port_select_state_s nxt_port_select_state_t;
+static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
+ nxt_mp_t *mp);
+static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
static void nxt_router_greet_controller(nxt_task_t *task,
nxt_port_t *controller_port);
@@ -267,8 +271,8 @@ static const nxt_str_t *nxt_app_msg_prefix[] = {
};
-nxt_port_handlers_t nxt_router_process_port_handlers = {
- .quit = nxt_worker_process_quit_handler,
+static const nxt_port_handlers_t nxt_router_process_port_handlers = {
+ .quit = nxt_signal_quit_handler,
.new_port = nxt_router_new_port_handler,
.change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler,
@@ -281,8 +285,29 @@ nxt_port_handlers_t nxt_router_process_port_handlers = {
};
-nxt_int_t
-nxt_router_start(nxt_task_t *task, void *data)
+const nxt_process_init_t nxt_router_process = {
+ .name = "router",
+ .type = NXT_PROCESS_ROUTER,
+ .prefork = nxt_router_prefork,
+ .restart = 1,
+ .setup = nxt_process_core_setup,
+ .start = nxt_router_start,
+ .port_handlers = &nxt_router_process_port_handlers,
+ .signals = nxt_process_signals,
+};
+
+
+static nxt_int_t
+nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
+{
+ nxt_runtime_stop_app_processes(task, task->thread->runtime);
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
{
nxt_int_t ret;
nxt_port_t *controller_port;
@@ -291,6 +316,8 @@ nxt_router_start(nxt_task_t *task, void *data)
rt = task->thread->runtime;
+ nxt_log(task, NXT_LOG_INFO, "router started");
+
#if (NXT_TLS)
rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
if (nxt_slow_path(rt->tls == NULL)) {
@@ -381,8 +408,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
goto failed;
}
- ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
- stream, port->id, b);
+ ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
+ -1, stream, port->id, b);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, port, stream);
@@ -861,7 +888,7 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
if (msg->u.new_port == NULL
- || msg->u.new_port->type != NXT_PROCESS_WORKER)
+ || msg->u.new_port->type != NXT_PROCESS_APP)
{
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
}
@@ -943,8 +970,10 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
{
- nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
- msg->u.data);
+ if (nxt_fast_path(engine->port != NULL)) {
+ nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
+ msg->u.data);
+ }
}
nxt_queue_loop;
@@ -1272,6 +1301,12 @@ static nxt_conf_map_t nxt_router_app_conf[] = {
NXT_CONF_MAP_PTR,
offsetof(nxt_router_app_conf_t, processes_value),
},
+
+ {
+ nxt_string("targets"),
+ NXT_CONF_MAP_PTR,
+ offsetof(nxt_router_app_conf_t, targets_value),
+ },
};
@@ -1423,12 +1458,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
{
u_char *p;
size_t size;
- nxt_mp_t *mp;
- uint32_t next;
+ nxt_mp_t *mp, *app_mp;
+ uint32_t next, next_target;
nxt_int_t ret;
- nxt_str_t name, path;
+ nxt_str_t name, path, target;
nxt_app_t *app, *prev;
- nxt_str_t *t;
+ nxt_str_t *t, *s, *targets;
+ nxt_uint_t n, i;
nxt_router_t *router;
nxt_app_joint_t *app_joint;
nxt_conf_value_t *conf, *http, *value, *websocket;
@@ -1501,13 +1537,20 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
size = nxt_conf_json_length(application, NULL);
- app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
- if (app == NULL) {
+ app_mp = nxt_mp_create(4096, 128, 1024, 64);
+ if (nxt_slow_path(app_mp == NULL)) {
goto fail;
}
+ app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
+ if (app == NULL) {
+ goto app_fail;
+ }
+
nxt_memzero(app, sizeof(nxt_app_t));
+ app->mem_pool = app_mp;
+
app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
+ name.length);
@@ -1522,7 +1565,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
prev = nxt_router_app_find(&router->apps, &name);
if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
- nxt_free(app);
+ nxt_mp_destroy(app_mp);
nxt_queue_remove(&prev->link);
nxt_queue_insert_tail(&tmcf->previous, &prev->link);
@@ -1538,6 +1581,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
apcf.requests = 0;
apcf.limits_value = NULL;
apcf.processes_value = NULL;
+ apcf.targets_value = NULL;
app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
if (nxt_slow_path(app_joint == NULL)) {
@@ -1587,6 +1631,30 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
apcf.spare_processes = apcf.processes;
}
+ if (apcf.targets_value != NULL) {
+ n = nxt_conf_object_members_count(apcf.targets_value);
+
+ targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
+ if (nxt_slow_path(targets == NULL)) {
+ goto app_fail;
+ }
+
+ next_target = 0;
+
+ for (i = 0; i < n; i++) {
+ (void) nxt_conf_next_object_member(apcf.targets_value,
+ &target, &next_target);
+
+ s = nxt_str_dup(app_mp, &targets[i], &target);
+ if (nxt_slow_path(s == NULL)) {
+ goto app_fail;
+ }
+ }
+
+ } else {
+ targets = NULL;
+ }
+
nxt_debug(task, "application type: %V", &apcf.type);
nxt_debug(task, "application processes: %D", apcf.processes);
nxt_debug(task, "application request timeout: %M", apcf.timeout);
@@ -1628,6 +1696,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->max_pending_responses = 2;
app->max_requests = apcf.requests;
+ app->targets = targets;
+
engine = task->thread->engine;
app->engine = engine;
@@ -1793,6 +1863,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
}
+ ret = nxt_http_routes_resolve(task, tmcf);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ goto fail;
+ }
+
value = nxt_conf_get_path(conf, &access_log_path);
if (value != NULL) {
@@ -1827,8 +1902,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
tmcf->router_conf->access_log = access_log;
}
- nxt_http_routes_resolve(task, tmcf);
-
nxt_queue_add(&tmcf->deleting, &router->sockets);
nxt_queue_init(&router->sockets);
@@ -1836,7 +1909,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app_fail:
- nxt_free(app);
+ nxt_mp_destroy(app_mp);
fail:
@@ -1844,7 +1917,7 @@ fail:
nxt_queue_remove(&app->link);
nxt_thread_mutex_destroy(&app->mutex);
- nxt_free(app);
+ nxt_mp_destroy(app->mem_pool);
} nxt_queue_loop;
@@ -2353,8 +2426,8 @@ nxt_router_app_rpc_create(nxt_task_t *task,
goto fail;
}
- ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
- stream, router_port->id, b);
+ ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
+ -1, stream, router_port->id, b);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, router_port, stream);
@@ -4535,7 +4608,7 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
nxt_assert(nxt_queue_is_empty(&app->idle_ports));
nxt_thread_mutex_destroy(&app->mutex);
- nxt_free(app);
+ nxt_mp_destroy(app->mem_pool);
app_joint->app = NULL;
@@ -4989,6 +5062,8 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
req = (nxt_unit_request_t *) out->mem.free;
out->mem.free += req_size;
+ req->app_target = r->app_target;
+
req->content_length = content_length;
p = (u_char *) (req->fields + fields_count);