summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-08-02 13:20:57 +0300
committerMax Romanov <max.romanov@nginx.com>2017-08-02 13:20:57 +0300
commitbcf99f87e25d3702ec01595a5cf3e91c00cc6a98 (patch)
treeb425438c319b92f902427267f53d8aafdde3ea2c
parent82c0304ab8f4fb2d406fe709639bee535b45d888 (diff)
downloadunit-bcf99f87e25d3702ec01595a5cf3e91c00cc6a98.tar.gz
unit-bcf99f87e25d3702ec01595a5cf3e91c00cc6a98.tar.bz2
Using port rpc in controller->router configuration update.
Diffstat (limited to '')
-rw-r--r--src/nxt_controller.c34
-rw-r--r--src/nxt_router.c37
-rw-r--r--src/nxt_runtime.h3
-rw-r--r--src/nxt_worker_process.c2
4 files changed, 30 insertions, 46 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index 5ef1f944..f26dccf5 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -776,8 +776,9 @@ nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req)
}
-void
-nxt_port_controller_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+static void
+nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
{
size_t size;
nxt_buf_t *b;
@@ -787,15 +788,14 @@ nxt_port_controller_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
b = msg->buf;
size = b->mem.free - b->mem.pos;
- nxt_debug(task, "contoller data: %*s ...", size, b->mem.pos);
+ nxt_debug(task, "controller conf ready: %*s ...", size, b->mem.pos);
nxt_memzero(&resp, sizeof(nxt_controller_response_t));
req = nxt_controller_current_request;
nxt_controller_current_request = NULL;
- if (size == 2 && nxt_memcmp(b->mem.pos, "OK", 2) == 0) {
-
+ if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
nxt_mp_destroy(nxt_controller_conf.pool);
nxt_controller_conf = req->conf;
@@ -848,22 +848,36 @@ static nxt_int_t
nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf)
{
size_t size;
+ uint32_t stream;
+ nxt_int_t rc;
nxt_buf_t *b;
- nxt_port_t *port;
+ nxt_port_t *router_port, *controller_port;
nxt_runtime_t *rt;
rt = task->thread->runtime;
- port = rt->port_by_type[NXT_PROCESS_ROUTER];
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+ controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
size = nxt_conf_json_length(conf, NULL);
- b = nxt_port_mmap_get_buf(task, port, size);
+ b = nxt_port_mmap_get_buf(task, router_port, size);
b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
- return nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA_LAST, -1, 0,
- 0, b);
+ stream = nxt_port_rpc_register_handler(task, controller_port,
+ nxt_controller_conf_handler,
+ nxt_controller_conf_handler,
+ router_port->pid, NULL);
+
+ rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
+ stream, controller_port->id, b);
+
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_port_rpc_cancel(task, controller_port, stream);
+ }
+
+ return rc;
}
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 40da942a..4cf476f6 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -53,9 +53,7 @@ static void nxt_router_conf_success(nxt_task_t *task,
static void nxt_router_conf_error(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_send(nxt_task_t *task,
- nxt_router_temp_conf_t *tmcf, u_char *start, size_t size);
-static void nxt_router_conf_buf_completion(nxt_task_t *task, void *obj,
- void *data);
+ nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
static void nxt_router_listen_sockets_sort(nxt_router_t *router,
nxt_router_temp_conf_t *tmcf);
@@ -490,7 +488,7 @@ nxt_router_conf_success(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
nxt_debug(task, "temp conf count:%D", tmcf->count);
if (--tmcf->count == 0) {
- nxt_router_conf_send(task, tmcf, (u_char *) "OK", 2);
+ nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
}
}
@@ -526,40 +524,15 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
nxt_mp_destroy(tmcf->conf->mem_pool);
- nxt_router_conf_send(task, tmcf, (u_char *) "ERROR", 5);
+ nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
}
static void
nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
- u_char *start, size_t size)
+ nxt_port_msg_type_t type)
{
- nxt_buf_t *b;
-
- b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
- if (nxt_slow_path(b == NULL)) {
- return;
- }
-
- b->mem.free = nxt_cpymem(b->mem.free, start, size);
-
- b->parent = tmcf->mem_pool;
- b->completion_handler = nxt_router_conf_buf_completion;
-
- nxt_port_socket_write(task, tmcf->port, NXT_PORT_MSG_DATA_LAST, -1,
- tmcf->stream, 0, b);
-}
-
-
-static void
-nxt_router_conf_buf_completion(nxt_task_t *task, void *obj, void *data)
-{
- nxt_mp_t *mp;
-
- /* nxt_router_temp_conf_t mem pool. */
- mp = data;
-
- nxt_mp_destroy(mp);
+ nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
}
diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h
index 01087719..8ee8560b 100644
--- a/src/nxt_runtime.h
+++ b/src/nxt_runtime.h
@@ -135,9 +135,6 @@ nxt_port_t *nxt_runtime_port_first(nxt_runtime_t *rt,
/* STUB */
nxt_int_t nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt);
-void nxt_port_controller_data_handler(nxt_task_t *task,
- nxt_port_recv_msg_t *msg);
-
nxt_str_t *nxt_current_directory(nxt_mp_t *mp);
nxt_listen_socket_t *nxt_runtime_listen_socket_add(nxt_runtime_t *rt,
diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c
index 181d854e..fd705f97 100644
--- a/src/nxt_worker_process.c
+++ b/src/nxt_worker_process.c
@@ -27,7 +27,7 @@ nxt_port_handler_t nxt_controller_process_port_handlers[] = {
nxt_port_new_port_handler,
nxt_port_change_log_file_handler,
nxt_port_mmap_handler,
- nxt_port_controller_data_handler,
+ nxt_port_data_handler,
nxt_port_remove_pid_handler,
NULL, /* NXT_PORT_MSG_READY */
NULL, /* NXT_PORT_MSG_START_WORKER */