diff options
Diffstat (limited to 'src/nxt_controller.c')
-rw-r--r-- | src/nxt_controller.c | 34 |
1 files changed, 24 insertions, 10 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 5ef1f944..f26dccf5 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -776,8 +776,9 @@ nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req) } -void -nxt_port_controller_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +static void +nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) { size_t size; nxt_buf_t *b; @@ -787,15 +788,14 @@ nxt_port_controller_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) b = msg->buf; size = b->mem.free - b->mem.pos; - nxt_debug(task, "contoller data: %*s ...", size, b->mem.pos); + nxt_debug(task, "controller conf ready: %*s ...", size, b->mem.pos); nxt_memzero(&resp, sizeof(nxt_controller_response_t)); req = nxt_controller_current_request; nxt_controller_current_request = NULL; - if (size == 2 && nxt_memcmp(b->mem.pos, "OK", 2) == 0) { - + if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { nxt_mp_destroy(nxt_controller_conf.pool); nxt_controller_conf = req->conf; @@ -848,22 +848,36 @@ static nxt_int_t nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf) { size_t size; + uint32_t stream; + nxt_int_t rc; nxt_buf_t *b; - nxt_port_t *port; + nxt_port_t *router_port, *controller_port; nxt_runtime_t *rt; rt = task->thread->runtime; - port = rt->port_by_type[NXT_PROCESS_ROUTER]; + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; size = nxt_conf_json_length(conf, NULL); - b = nxt_port_mmap_get_buf(task, port, size); + b = nxt_port_mmap_get_buf(task, router_port, size); b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); - return nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1, 0, - 0, b); + stream = nxt_port_rpc_register_handler(task, controller_port, + nxt_controller_conf_handler, + nxt_controller_conf_handler, + router_port->pid, NULL); + + rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, + stream, controller_port->id, b); + + if (nxt_slow_path(rc != NXT_OK)) { + nxt_port_rpc_cancel(task, controller_port, stream); + } + + return rc; } |