diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-08-02 13:20:57 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-08-02 13:20:57 +0300 |
commit | bcf99f87e25d3702ec01595a5cf3e91c00cc6a98 (patch) | |
tree | b425438c319b92f902427267f53d8aafdde3ea2c | |
parent | 82c0304ab8f4fb2d406fe709639bee535b45d888 (diff) | |
download | unit-bcf99f87e25d3702ec01595a5cf3e91c00cc6a98.tar.gz unit-bcf99f87e25d3702ec01595a5cf3e91c00cc6a98.tar.bz2 |
Using port rpc in controller->router configuration update.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_controller.c | 34 | ||||
-rw-r--r-- | src/nxt_router.c | 37 | ||||
-rw-r--r-- | src/nxt_runtime.h | 3 | ||||
-rw-r--r-- | src/nxt_worker_process.c | 2 |
4 files changed, 30 insertions, 46 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 5ef1f944..f26dccf5 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -776,8 +776,9 @@ nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req) } -void -nxt_port_controller_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +static void +nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) { size_t size; nxt_buf_t *b; @@ -787,15 +788,14 @@ nxt_port_controller_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) b = msg->buf; size = b->mem.free - b->mem.pos; - nxt_debug(task, "contoller data: %*s ...", size, b->mem.pos); + nxt_debug(task, "controller conf ready: %*s ...", size, b->mem.pos); nxt_memzero(&resp, sizeof(nxt_controller_response_t)); req = nxt_controller_current_request; nxt_controller_current_request = NULL; - if (size == 2 && nxt_memcmp(b->mem.pos, "OK", 2) == 0) { - + if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { nxt_mp_destroy(nxt_controller_conf.pool); nxt_controller_conf = req->conf; @@ -848,22 +848,36 @@ static nxt_int_t nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf) { size_t size; + uint32_t stream; + nxt_int_t rc; nxt_buf_t *b; - nxt_port_t *port; + nxt_port_t *router_port, *controller_port; nxt_runtime_t *rt; rt = task->thread->runtime; - port = rt->port_by_type[NXT_PROCESS_ROUTER]; + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; size = nxt_conf_json_length(conf, NULL); - b = nxt_port_mmap_get_buf(task, port, size); + b = nxt_port_mmap_get_buf(task, router_port, size); b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); - return nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1, 0, - 0, b); + stream = nxt_port_rpc_register_handler(task, controller_port, + nxt_controller_conf_handler, + nxt_controller_conf_handler, + router_port->pid, NULL); + + rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, + stream, controller_port->id, b); + + if (nxt_slow_path(rc != NXT_OK)) { + nxt_port_rpc_cancel(task, controller_port, stream); + } + + return rc; } diff --git a/src/nxt_router.c b/src/nxt_router.c index 40da942a..4cf476f6 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -53,9 +53,7 @@ static void nxt_router_conf_success(nxt_task_t *task, static void nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf); static void nxt_router_conf_send(nxt_task_t *task, - nxt_router_temp_conf_t *tmcf, u_char *start, size_t size); -static void nxt_router_conf_buf_completion(nxt_task_t *task, void *obj, - void *data); + nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); static void nxt_router_listen_sockets_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf); @@ -490,7 +488,7 @@ nxt_router_conf_success(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) nxt_debug(task, "temp conf count:%D", tmcf->count); if (--tmcf->count == 0) { - nxt_router_conf_send(task, tmcf, (u_char *) "OK", 2); + nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST); } } @@ -526,40 +524,15 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) nxt_mp_destroy(tmcf->conf->mem_pool); - nxt_router_conf_send(task, tmcf, (u_char *) "ERROR", 5); + nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR); } static void nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, - u_char *start, size_t size) + nxt_port_msg_type_t type) { - nxt_buf_t *b; - - b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); - if (nxt_slow_path(b == NULL)) { - return; - } - - b->mem.free = nxt_cpymem(b->mem.free, start, size); - - b->parent = tmcf->mem_pool; - b->completion_handler = nxt_router_conf_buf_completion; - - nxt_port_socket_write(task, tmcf->port, NXT_PORT_MSG_DATA_LAST, -1, - tmcf->stream, 0, b); -} - - -static void -nxt_router_conf_buf_completion(nxt_task_t *task, void *obj, void *data) -{ - nxt_mp_t *mp; - - /* nxt_router_temp_conf_t mem pool. */ - mp = data; - - nxt_mp_destroy(mp); + nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL); } diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 01087719..8ee8560b 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -135,9 +135,6 @@ nxt_port_t *nxt_runtime_port_first(nxt_runtime_t *rt, /* STUB */ nxt_int_t nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt); -void nxt_port_controller_data_handler(nxt_task_t *task, - nxt_port_recv_msg_t *msg); - nxt_str_t *nxt_current_directory(nxt_mp_t *mp); nxt_listen_socket_t *nxt_runtime_listen_socket_add(nxt_runtime_t *rt, diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index 181d854e..fd705f97 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -27,7 +27,7 @@ nxt_port_handler_t nxt_controller_process_port_handlers[] = { nxt_port_new_port_handler, nxt_port_change_log_file_handler, nxt_port_mmap_handler, - nxt_port_controller_data_handler, + nxt_port_data_handler, nxt_port_remove_pid_handler, NULL, /* NXT_PORT_MSG_READY */ NULL, /* NXT_PORT_MSG_START_WORKER */ |