diff options
Diffstat (limited to 'src/nxt_controller.c')
-rw-r--r-- | src/nxt_controller.c | 182 |
1 files changed, 116 insertions, 66 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 7aea4c91..74142c2e 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -40,6 +40,11 @@ typedef struct { static void nxt_controller_process_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static nxt_int_t nxt_controller_conf_default(void); +static void nxt_controller_conf_init_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); +static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, + nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data); static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); @@ -64,8 +69,6 @@ static nxt_int_t nxt_controller_request_content_length(void *ctx, static void nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req); -static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task, - nxt_controller_request_t *req); static void nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static void nxt_controller_response(nxt_task_t *task, @@ -134,41 +137,127 @@ static void nxt_controller_process_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { - nxt_mp_t *mp; + nxt_int_t rc; nxt_runtime_t *rt; nxt_conf_value_t *conf; - static const nxt_str_t json - = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); - nxt_port_new_port_handler(task, msg); - if (nxt_controller_conf.root != NULL - || msg->new_port->type != NXT_PROCESS_ROUTER) - { + if (msg->new_port->type != NXT_PROCESS_ROUTER) { return; } + conf = nxt_controller_conf.root; + + if (conf != NULL) { + rc = nxt_controller_conf_send(task, conf, + nxt_controller_conf_init_handler, NULL); + + if (nxt_fast_path(rc == NXT_OK)) { + return; + } + + nxt_mp_destroy(nxt_controller_conf.pool); + + if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { + nxt_abort(); + } + } + + if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { + nxt_abort(); + } + + rt = task->thread->runtime; + + if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) { + nxt_abort(); + } +} + + +static nxt_int_t +nxt_controller_conf_default(void) +{ + nxt_mp_t *mp; + nxt_conf_value_t *conf; + + static const nxt_str_t json + = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); + mp = nxt_mp_create(1024, 128, 256, 32); if (nxt_slow_path(mp == NULL)) { - nxt_abort(); + return NXT_ERROR; } conf = nxt_conf_json_parse_str(mp, &json); if (nxt_slow_path(conf == NULL)) { - nxt_abort(); + return NXT_ERROR; } nxt_controller_conf.root = conf; nxt_controller_conf.pool = mp; + return NXT_OK; +} + + +static void +nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) +{ + if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) { + nxt_mp_destroy(nxt_controller_conf.pool); + + if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { + nxt_abort(); + } + } +} + + +static nxt_int_t +nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf, + nxt_port_rpc_handler_t handler, void *data) +{ + size_t size; + uint32_t stream; + nxt_int_t rc; + nxt_buf_t *b; + nxt_port_t *router_port, *controller_port; + nxt_runtime_t *rt; + rt = task->thread->runtime; - if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) { - nxt_abort(); + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + + if (nxt_slow_path(router_port == NULL)) { + return NXT_DECLINED; } + + controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; + + size = nxt_conf_json_length(conf, NULL); + + b = nxt_port_mmap_get_buf(task, router_port, size); + + b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); + + stream = nxt_port_rpc_register_handler(task, controller_port, + handler, handler, + router_port->pid, data); + + rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, + stream, controller_port->id, b); + + if (nxt_slow_path(rc != NXT_OK)) { + nxt_port_rpc_cancel(task, controller_port, stream); + return NXT_ERROR; + } + + return NXT_OK; } @@ -679,10 +768,8 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) goto invalid_conf; } - req->conf.root = value; - req->conf.pool = mp; - - rc = nxt_controller_conf_apply(task, req); + rc = nxt_controller_conf_send(task, value, + nxt_controller_conf_handler, req); if (nxt_slow_path(rc != NXT_OK)) { nxt_mp_destroy(mp); @@ -695,6 +782,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) goto alloc_fail; } + req->conf.root = value; + req->conf.pool = mp; + + nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); + return; } @@ -746,10 +838,8 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) goto invalid_conf; } - req->conf.root = value; - req->conf.pool = mp; - - rc = nxt_controller_conf_apply(task, req); + rc = nxt_controller_conf_send(task, value, + nxt_controller_conf_handler, req); if (nxt_slow_path(rc != NXT_OK)) { nxt_mp_destroy(mp); @@ -762,6 +852,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) goto alloc_fail; } + req->conf.root = value; + req->conf.pool = mp; + + nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); + return; } @@ -810,51 +905,6 @@ no_router: } -static nxt_int_t -nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req) -{ - size_t size; - uint32_t stream; - nxt_int_t rc; - nxt_buf_t *b; - nxt_port_t *router_port, *controller_port; - nxt_runtime_t *rt; - - rt = task->thread->runtime; - - router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; - - if (nxt_slow_path(router_port == NULL)) { - return NXT_DECLINED; - } - - controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; - - size = nxt_conf_json_length(req->conf.root, NULL); - - b = nxt_port_mmap_get_buf(task, router_port, size); - - b->mem.free = nxt_conf_json_print(b->mem.free, req->conf.root, NULL); - - stream = nxt_port_rpc_register_handler(task, controller_port, - nxt_controller_conf_handler, - nxt_controller_conf_handler, - router_port->pid, req); - - rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, - stream, controller_port->id, b); - - if (nxt_slow_path(rc != NXT_OK)) { - nxt_port_rpc_cancel(task, controller_port, stream); - return NXT_ERROR; - } - - nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); - - return NXT_OK; -} - - static void nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) |