summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_controller.c
diff options
context:
space:
mode:
authorValentin Bartenev <vbart@nginx.com>2017-08-28 10:20:40 +0300
committerValentin Bartenev <vbart@nginx.com>2017-08-28 10:20:40 +0300
commit1a5ec7fd08e6c4efa8469a0547cdc05a282dd754 (patch)
treec10ce3755f34fddadcbfa35389020d7afc5f0813 /src/nxt_controller.c
parent3ab49d14c0ce300189b13a8488f9cb5b1cf513ad (diff)
downloadunit-1a5ec7fd08e6c4efa8469a0547cdc05a282dd754.tar.gz
unit-1a5ec7fd08e6c4efa8469a0547cdc05a282dd754.tar.bz2
Improved reconfiguration requests serialization.
Previously, only applying of updated configuration was serialized, while the changes themselves could be done in parallel on the same configuration. That resulted in inconsistent behaviour.
Diffstat (limited to 'src/nxt_controller.c')
-rw-r--r--src/nxt_controller.c130
1 files changed, 49 insertions, 81 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index 313daf00..d03025b8 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -63,9 +63,8 @@ static void nxt_controller_process_request(nxt_task_t *task,
nxt_controller_request_t *req);
static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task,
nxt_controller_request_t *req);
-static void nxt_controller_process_waiting(nxt_task_t *task);
-static nxt_int_t nxt_controller_conf_pass(nxt_task_t *task,
- nxt_conf_value_t *conf);
+static void nxt_controller_conf_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
static void nxt_controller_response(nxt_task_t *task,
nxt_controller_request_t *req, nxt_controller_response_t *resp);
static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
@@ -81,10 +80,8 @@ static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = {
static nxt_http_fields_hash_t *nxt_controller_fields_hash;
-
-static nxt_controller_conf_t nxt_controller_conf;
-static nxt_queue_t nxt_controller_waiting_requests;
-static nxt_controller_request_t *nxt_controller_current_request;
+static nxt_controller_conf_t nxt_controller_conf;
+static nxt_queue_t nxt_controller_waiting_requests;
static const nxt_event_conn_state_t nxt_controller_conn_read_state;
@@ -584,6 +581,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
+ if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
+ nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
+ return;
+ }
+
mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(mp == NULL)) {
@@ -654,6 +656,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
+ if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
+ nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
+ return;
+ }
+
if (path.length == 1) {
mp = nxt_mp_create(1024, 128, 256, 32);
@@ -745,20 +752,38 @@ invalid_conf:
static nxt_int_t
nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req)
{
- nxt_int_t rc;
+ size_t size;
+ uint32_t stream;
+ nxt_int_t rc;
+ nxt_buf_t *b;
+ nxt_port_t *router_port, *controller_port;
+ nxt_runtime_t *rt;
- if (nxt_controller_current_request != NULL) {
- nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
- return NXT_OK;
- }
+ rt = task->thread->runtime;
+
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+ controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
+
+ size = nxt_conf_json_length(req->conf.root, NULL);
+
+ b = nxt_port_mmap_get_buf(task, router_port, size);
- rc = nxt_controller_conf_pass(task, req->conf.root);
+ b->mem.free = nxt_conf_json_print(b->mem.free, req->conf.root, NULL);
+
+ stream = nxt_port_rpc_register_handler(task, controller_port,
+ nxt_controller_conf_handler,
+ nxt_controller_conf_handler,
+ router_port->pid, req);
+
+ rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
+ stream, controller_port->id, b);
if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_port_rpc_cancel(task, controller_port, stream);
return NXT_ERROR;
}
- nxt_controller_current_request = req;
+ nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
return NXT_OK;
}
@@ -768,16 +793,18 @@ static void
nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
+ nxt_queue_t queue;
nxt_controller_request_t *req;
nxt_controller_response_t resp;
+ req = data;
+
nxt_debug(task, "controller conf ready: %*s",
nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
- nxt_memzero(&resp, sizeof(nxt_controller_response_t));
+ nxt_queue_remove(&req->link);
- req = nxt_controller_current_request;
- nxt_controller_current_request = NULL;
+ nxt_memzero(&resp, sizeof(nxt_controller_response_t));
if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
nxt_mp_destroy(nxt_controller_conf.pool);
@@ -797,76 +824,17 @@ nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_controller_response(task, req, &resp);
- nxt_controller_process_waiting(task);
-}
-
-
-static void
-nxt_controller_process_waiting(nxt_task_t *task)
-{
- nxt_controller_request_t *req;
- nxt_controller_response_t resp;
+ nxt_queue_init(&queue);
+ nxt_queue_add(&queue, &nxt_controller_waiting_requests);
- nxt_queue_each(req, &nxt_controller_waiting_requests,
- nxt_controller_request_t, link)
- {
- nxt_queue_remove(&req->link);
-
- if (nxt_fast_path(nxt_controller_conf_apply(task, req) == NXT_OK)) {
- return;
- }
-
- nxt_mp_destroy(req->conf.pool);
-
- nxt_memzero(&resp, sizeof(nxt_controller_response_t));
-
- resp.status = 500;
- resp.title = (u_char *) "Memory allocation failed.";
- resp.offset = -1;
-
- nxt_controller_response(task, req, &resp);
+ nxt_queue_init(&nxt_controller_waiting_requests);
+ nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
+ nxt_controller_process_request(task, req);
} nxt_queue_loop;
}
-static nxt_int_t
-nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf)
-{
- size_t size;
- uint32_t stream;
- nxt_int_t rc;
- nxt_buf_t *b;
- nxt_port_t *router_port, *controller_port;
- nxt_runtime_t *rt;
-
- rt = task->thread->runtime;
-
- router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
- controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
-
- size = nxt_conf_json_length(conf, NULL);
-
- b = nxt_port_mmap_get_buf(task, router_port, size);
-
- b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
-
- stream = nxt_port_rpc_register_handler(task, controller_port,
- nxt_controller_conf_handler,
- nxt_controller_conf_handler,
- router_port->pid, NULL);
-
- rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
- stream, controller_port->id, b);
-
- if (nxt_slow_path(rc != NXT_OK)) {
- nxt_port_rpc_cancel(task, controller_port, stream);
- }
-
- return rc;
-}
-
-
static void
nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
nxt_controller_response_t *resp)