summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_controller.c182
1 files changed, 116 insertions, 66 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index 7aea4c91..74142c2e 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -40,6 +40,11 @@ typedef struct {
static void nxt_controller_process_new_port_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
+static nxt_int_t nxt_controller_conf_default(void);
+static void nxt_controller_conf_init_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
+static nxt_int_t nxt_controller_conf_send(nxt_task_t *task,
+ nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
@@ -64,8 +69,6 @@ static nxt_int_t nxt_controller_request_content_length(void *ctx,
static void nxt_controller_process_request(nxt_task_t *task,
nxt_controller_request_t *req);
-static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task,
- nxt_controller_request_t *req);
static void nxt_controller_conf_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_controller_response(nxt_task_t *task,
@@ -134,41 +137,127 @@ static void
nxt_controller_process_new_port_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg)
{
- nxt_mp_t *mp;
+ nxt_int_t rc;
nxt_runtime_t *rt;
nxt_conf_value_t *conf;
- static const nxt_str_t json
- = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
-
nxt_port_new_port_handler(task, msg);
- if (nxt_controller_conf.root != NULL
- || msg->new_port->type != NXT_PROCESS_ROUTER)
- {
+ if (msg->new_port->type != NXT_PROCESS_ROUTER) {
return;
}
+ conf = nxt_controller_conf.root;
+
+ if (conf != NULL) {
+ rc = nxt_controller_conf_send(task, conf,
+ nxt_controller_conf_init_handler, NULL);
+
+ if (nxt_fast_path(rc == NXT_OK)) {
+ return;
+ }
+
+ nxt_mp_destroy(nxt_controller_conf.pool);
+
+ if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
+ nxt_abort();
+ }
+ }
+
+ if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
+ nxt_abort();
+ }
+
+ rt = task->thread->runtime;
+
+ if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) {
+ nxt_abort();
+ }
+}
+
+
+static nxt_int_t
+nxt_controller_conf_default(void)
+{
+ nxt_mp_t *mp;
+ nxt_conf_value_t *conf;
+
+ static const nxt_str_t json
+ = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
+
mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(mp == NULL)) {
- nxt_abort();
+ return NXT_ERROR;
}
conf = nxt_conf_json_parse_str(mp, &json);
if (nxt_slow_path(conf == NULL)) {
- nxt_abort();
+ return NXT_ERROR;
}
nxt_controller_conf.root = conf;
nxt_controller_conf.pool = mp;
+ return NXT_OK;
+}
+
+
+static void
+nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
+{
+ if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) {
+ nxt_mp_destroy(nxt_controller_conf.pool);
+
+ if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
+ nxt_abort();
+ }
+ }
+}
+
+
+static nxt_int_t
+nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
+ nxt_port_rpc_handler_t handler, void *data)
+{
+ size_t size;
+ uint32_t stream;
+ nxt_int_t rc;
+ nxt_buf_t *b;
+ nxt_port_t *router_port, *controller_port;
+ nxt_runtime_t *rt;
+
rt = task->thread->runtime;
- if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) {
- nxt_abort();
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+
+ if (nxt_slow_path(router_port == NULL)) {
+ return NXT_DECLINED;
}
+
+ controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
+
+ size = nxt_conf_json_length(conf, NULL);
+
+ b = nxt_port_mmap_get_buf(task, router_port, size);
+
+ b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
+
+ stream = nxt_port_rpc_register_handler(task, controller_port,
+ handler, handler,
+ router_port->pid, data);
+
+ rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
+ stream, controller_port->id, b);
+
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_port_rpc_cancel(task, controller_port, stream);
+ return NXT_ERROR;
+ }
+
+ return NXT_OK;
}
@@ -679,10 +768,8 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
goto invalid_conf;
}
- req->conf.root = value;
- req->conf.pool = mp;
-
- rc = nxt_controller_conf_apply(task, req);
+ rc = nxt_controller_conf_send(task, value,
+ nxt_controller_conf_handler, req);
if (nxt_slow_path(rc != NXT_OK)) {
nxt_mp_destroy(mp);
@@ -695,6 +782,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
goto alloc_fail;
}
+ req->conf.root = value;
+ req->conf.pool = mp;
+
+ nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
+
return;
}
@@ -746,10 +838,8 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
goto invalid_conf;
}
- req->conf.root = value;
- req->conf.pool = mp;
-
- rc = nxt_controller_conf_apply(task, req);
+ rc = nxt_controller_conf_send(task, value,
+ nxt_controller_conf_handler, req);
if (nxt_slow_path(rc != NXT_OK)) {
nxt_mp_destroy(mp);
@@ -762,6 +852,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
goto alloc_fail;
}
+ req->conf.root = value;
+ req->conf.pool = mp;
+
+ nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
+
return;
}
@@ -810,51 +905,6 @@ no_router:
}
-static nxt_int_t
-nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req)
-{
- size_t size;
- uint32_t stream;
- nxt_int_t rc;
- nxt_buf_t *b;
- nxt_port_t *router_port, *controller_port;
- nxt_runtime_t *rt;
-
- rt = task->thread->runtime;
-
- router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
-
- if (nxt_slow_path(router_port == NULL)) {
- return NXT_DECLINED;
- }
-
- controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
-
- size = nxt_conf_json_length(req->conf.root, NULL);
-
- b = nxt_port_mmap_get_buf(task, router_port, size);
-
- b->mem.free = nxt_conf_json_print(b->mem.free, req->conf.root, NULL);
-
- stream = nxt_port_rpc_register_handler(task, controller_port,
- nxt_controller_conf_handler,
- nxt_controller_conf_handler,
- router_port->pid, req);
-
- rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
- stream, controller_port->id, b);
-
- if (nxt_slow_path(rc != NXT_OK)) {
- nxt_port_rpc_cancel(task, controller_port, stream);
- return NXT_ERROR;
- }
-
- nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
-
- return NXT_OK;
-}
-
-
static void
nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)