summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2018-04-26 16:44:20 +0300
committerMax Romanov <max.romanov@nginx.com>2018-04-26 16:44:20 +0300
commit179819dbee58242f95c2b866c67a4d6a0a863ef6 (patch)
treea60a57504830cc93ef1c53104ab769d712cbccf2 /src
parent17160e93529ad72805617b4b7674f28bd1a627d7 (diff)
downloadunit-179819dbee58242f95c2b866c67a4d6a0a863ef6.tar.gz
unit-179819dbee58242f95c2b866c67a4d6a0a863ef6.tar.bz2
Controller waits READY message from router.
This required to avoid racing condition when controller receive router port before router receives controller port.
Diffstat (limited to '')
-rw-r--r--src/nxt_controller.c61
-rw-r--r--src/nxt_router.c34
-rw-r--r--src/nxt_worker_process.c13
3 files changed, 81 insertions, 27 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index 0b863f0c..17627e83 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -40,6 +40,9 @@ typedef struct {
static void nxt_controller_process_new_port_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
+static void nxt_controller_send_current_conf(nxt_task_t *task);
+static void nxt_controller_router_ready_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
static nxt_int_t nxt_controller_conf_default(void);
static void nxt_controller_conf_init_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
@@ -87,6 +90,7 @@ static nxt_http_field_proc_t nxt_controller_request_fields[] = {
static nxt_lvlhsh_t nxt_controller_fields_hash;
static nxt_uint_t nxt_controller_listening;
+static nxt_uint_t nxt_controller_router_ready;
static nxt_controller_conf_t nxt_controller_conf;
static nxt_queue_t nxt_controller_waiting_requests;
@@ -98,14 +102,15 @@ static const nxt_event_conn_state_t nxt_controller_conn_close_state;
nxt_port_handlers_t nxt_controller_process_port_handlers = {
- .quit = nxt_worker_process_quit_handler,
- .new_port = nxt_controller_process_new_port_handler,
- .change_file = nxt_port_change_log_file_handler,
- .mmap = nxt_port_mmap_handler,
- .data = nxt_port_data_handler,
- .remove_pid = nxt_port_remove_pid_handler,
- .rpc_ready = nxt_port_rpc_handler,
- .rpc_error = nxt_port_rpc_handler,
+ .quit = nxt_worker_process_quit_handler,
+ .new_port = nxt_controller_process_new_port_handler,
+ .change_file = nxt_port_change_log_file_handler,
+ .mmap = nxt_port_mmap_handler,
+ .process_ready = nxt_controller_router_ready_handler,
+ .data = nxt_port_data_handler,
+ .remove_pid = nxt_port_remove_pid_handler,
+ .rpc_ready = nxt_port_rpc_handler,
+ .rpc_error = nxt_port_rpc_handler,
};
@@ -202,16 +207,25 @@ static void
nxt_controller_process_new_port_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg)
{
- nxt_int_t rc;
- nxt_runtime_t *rt;
- nxt_conf_value_t *conf;
-
nxt_port_new_port_handler(task, msg);
- if (msg->u.new_port->type != NXT_PROCESS_ROUTER) {
+ if (msg->u.new_port->type != NXT_PROCESS_ROUTER
+ || !nxt_controller_router_ready)
+ {
return;
}
+ nxt_controller_send_current_conf(task);
+}
+
+
+static void
+nxt_controller_send_current_conf(nxt_task_t *task)
+{
+ nxt_int_t rc;
+ nxt_runtime_t *rt;
+ nxt_conf_value_t *conf;
+
conf = nxt_controller_conf.root;
if (conf != NULL) {
@@ -243,6 +257,25 @@ nxt_controller_process_new_port_handler(nxt_task_t *task,
}
+static void
+nxt_controller_router_ready_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg)
+{
+ nxt_port_t *router_port;
+ nxt_runtime_t *rt;
+
+ rt = task->thread->runtime;
+
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+
+ nxt_controller_router_ready = 1;
+
+ if (router_port != NULL) {
+ nxt_controller_send_current_conf(task);
+ }
+}
+
+
static nxt_int_t
nxt_controller_conf_default(void)
{
@@ -316,7 +349,7 @@ nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
- if (nxt_slow_path(router_port == NULL)) {
+ if (nxt_slow_path(router_port == NULL || !nxt_controller_router_ready)) {
return NXT_DECLINED;
}
diff --git a/src/nxt_router.c b/src/nxt_router.c
index ddedaa59..48ad55cd 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -100,6 +100,9 @@ struct nxt_port_select_state_s {
typedef struct nxt_port_select_state_s nxt_port_select_state_t;
+static void nxt_router_greet_controller(nxt_task_t *task,
+ nxt_port_t *controller_port);
+
static void nxt_router_port_select(nxt_task_t *task,
nxt_port_select_state_t *state);
@@ -275,10 +278,24 @@ static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = {
};
+nxt_port_handlers_t nxt_router_process_port_handlers = {
+ .quit = nxt_worker_process_quit_handler,
+ .new_port = nxt_router_new_port_handler,
+ .change_file = nxt_port_change_log_file_handler,
+ .mmap = nxt_port_mmap_handler,
+ .data = nxt_router_conf_data_handler,
+ .remove_pid = nxt_router_remove_pid_handler,
+ .access_log = nxt_router_access_log_reopen_handler,
+ .rpc_ready = nxt_port_rpc_handler,
+ .rpc_error = nxt_port_rpc_handler,
+};
+
+
nxt_int_t
nxt_router_start(nxt_task_t *task, void *data)
{
nxt_int_t ret;
+ nxt_port_t *controller_port;
nxt_router_t *router;
nxt_runtime_t *rt;
@@ -300,11 +317,24 @@ nxt_router_start(nxt_task_t *task, void *data)
nxt_router = router;
+ controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
+ if (controller_port != NULL) {
+ nxt_router_greet_controller(task, controller_port);
+ }
+
return NXT_OK;
}
static void
+nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
+{
+ nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
+ -1, 0, 0, NULL);
+}
+
+
+static void
nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
void *data)
{
@@ -749,6 +779,10 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_port_new_port_handler(task, msg);
+ if (msg->u.new_port->type == NXT_PROCESS_CONTROLLER) {
+ nxt_router_greet_controller(task, msg->u.new_port);
+ }
+
if (msg->port_msg.stream == 0) {
return;
}
diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c
index 0e23aea0..bf35f2b5 100644
--- a/src/nxt_worker_process.c
+++ b/src/nxt_worker_process.c
@@ -30,19 +30,6 @@ nxt_port_handlers_t nxt_app_process_port_handlers = {
};
-nxt_port_handlers_t nxt_router_process_port_handlers = {
- .quit = nxt_worker_process_quit_handler,
- .new_port = nxt_router_new_port_handler,
- .change_file = nxt_port_change_log_file_handler,
- .mmap = nxt_port_mmap_handler,
- .data = nxt_router_conf_data_handler,
- .remove_pid = nxt_router_remove_pid_handler,
- .access_log = nxt_router_access_log_reopen_handler,
- .rpc_ready = nxt_port_rpc_handler,
- .rpc_error = nxt_port_rpc_handler,
-};
-
-
nxt_port_handlers_t nxt_discovery_process_port_handlers = {
.quit = nxt_worker_process_quit_handler,
.new_port = nxt_port_new_port_handler,