diff options
author | Valentin Bartenev <vbart@nginx.com> | 2017-08-28 10:20:40 +0300 |
---|---|---|
committer | Valentin Bartenev <vbart@nginx.com> | 2017-08-28 10:20:40 +0300 |
commit | 1a5ec7fd08e6c4efa8469a0547cdc05a282dd754 (patch) | |
tree | c10ce3755f34fddadcbfa35389020d7afc5f0813 | |
parent | 3ab49d14c0ce300189b13a8488f9cb5b1cf513ad (diff) | |
download | unit-1a5ec7fd08e6c4efa8469a0547cdc05a282dd754.tar.gz unit-1a5ec7fd08e6c4efa8469a0547cdc05a282dd754.tar.bz2 |
Improved reconfiguration requests serialization.
Previously, only applying of updated configuration was serialized,
while the changes themselves could be done in parallel on the same
configuration. That resulted in inconsistent behaviour.
-rw-r--r-- | src/nxt_controller.c | 130 |
1 files changed, 49 insertions, 81 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 313daf00..d03025b8 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -63,9 +63,8 @@ static void nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req); static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req); -static void nxt_controller_process_waiting(nxt_task_t *task); -static nxt_int_t nxt_controller_conf_pass(nxt_task_t *task, - nxt_conf_value_t *conf); +static void nxt_controller_conf_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); static void nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, nxt_controller_response_t *resp); static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now, @@ -81,10 +80,8 @@ static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = { static nxt_http_fields_hash_t *nxt_controller_fields_hash; - -static nxt_controller_conf_t nxt_controller_conf; -static nxt_queue_t nxt_controller_waiting_requests; -static nxt_controller_request_t *nxt_controller_current_request; +static nxt_controller_conf_t nxt_controller_conf; +static nxt_queue_t nxt_controller_waiting_requests; static const nxt_event_conn_state_t nxt_controller_conn_read_state; @@ -584,6 +581,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) if (nxt_str_eq(&req->parser.method, "PUT", 3)) { + if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { + nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); + return; + } + mp = nxt_mp_create(1024, 128, 256, 32); if (nxt_slow_path(mp == NULL)) { @@ -654,6 +656,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { + if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { + nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); + return; + } + if (path.length == 1) { mp = nxt_mp_create(1024, 128, 256, 32); @@ -745,20 +752,38 @@ invalid_conf: static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req) { - nxt_int_t rc; + size_t size; + uint32_t stream; + nxt_int_t rc; + nxt_buf_t *b; + nxt_port_t *router_port, *controller_port; + nxt_runtime_t *rt; - if (nxt_controller_current_request != NULL) { - nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); - return NXT_OK; - } + rt = task->thread->runtime; + + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; + + size = nxt_conf_json_length(req->conf.root, NULL); + + b = nxt_port_mmap_get_buf(task, router_port, size); - rc = nxt_controller_conf_pass(task, req->conf.root); + b->mem.free = nxt_conf_json_print(b->mem.free, req->conf.root, NULL); + + stream = nxt_port_rpc_register_handler(task, controller_port, + nxt_controller_conf_handler, + nxt_controller_conf_handler, + router_port->pid, req); + + rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, + stream, controller_port->id, b); if (nxt_slow_path(rc != NXT_OK)) { + nxt_port_rpc_cancel(task, controller_port, stream); return NXT_ERROR; } - nxt_controller_current_request = req; + nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); return NXT_OK; } @@ -768,16 +793,18 @@ static void nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { + nxt_queue_t queue; nxt_controller_request_t *req; nxt_controller_response_t resp; + req = data; + nxt_debug(task, "controller conf ready: %*s", nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); - nxt_memzero(&resp, sizeof(nxt_controller_response_t)); + nxt_queue_remove(&req->link); - req = nxt_controller_current_request; - nxt_controller_current_request = NULL; + nxt_memzero(&resp, sizeof(nxt_controller_response_t)); if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { nxt_mp_destroy(nxt_controller_conf.pool); @@ -797,76 +824,17 @@ nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_controller_response(task, req, &resp); - nxt_controller_process_waiting(task); -} - - -static void -nxt_controller_process_waiting(nxt_task_t *task) -{ - nxt_controller_request_t *req; - nxt_controller_response_t resp; + nxt_queue_init(&queue); + nxt_queue_add(&queue, &nxt_controller_waiting_requests); - nxt_queue_each(req, &nxt_controller_waiting_requests, - nxt_controller_request_t, link) - { - nxt_queue_remove(&req->link); - - if (nxt_fast_path(nxt_controller_conf_apply(task, req) == NXT_OK)) { - return; - } - - nxt_mp_destroy(req->conf.pool); - - nxt_memzero(&resp, sizeof(nxt_controller_response_t)); - - resp.status = 500; - resp.title = (u_char *) "Memory allocation failed."; - resp.offset = -1; - - nxt_controller_response(task, req, &resp); + nxt_queue_init(&nxt_controller_waiting_requests); + nxt_queue_each(req, &queue, nxt_controller_request_t, link) { + nxt_controller_process_request(task, req); } nxt_queue_loop; } -static nxt_int_t -nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf) -{ - size_t size; - uint32_t stream; - nxt_int_t rc; - nxt_buf_t *b; - nxt_port_t *router_port, *controller_port; - nxt_runtime_t *rt; - - rt = task->thread->runtime; - - router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; - controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; - - size = nxt_conf_json_length(conf, NULL); - - b = nxt_port_mmap_get_buf(task, router_port, size); - - b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); - - stream = nxt_port_rpc_register_handler(task, controller_port, - nxt_controller_conf_handler, - nxt_controller_conf_handler, - router_port->pid, NULL); - - rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, - stream, controller_port->id, b); - - if (nxt_slow_path(rc != NXT_OK)) { - nxt_port_rpc_cancel(task, controller_port, stream); - } - - return rc; -} - - static void nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, nxt_controller_response_t *resp) |