summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorValentin Bartenev <vbart@nginx.com>2017-07-12 20:21:17 +0300
committerValentin Bartenev <vbart@nginx.com>2017-07-12 20:21:17 +0300
commitc38bcb7d70729434893ae4d5f2f58a78a36d2bd5 (patch)
treee05be6b95fc1fda97255e9c03dbb89e8acf8842b /src
parent4f4061061b023cd88a2c54121853a8cb1922f100 (diff)
downloadunit-c38bcb7d70729434893ae4d5f2f58a78a36d2bd5.tar.gz
unit-c38bcb7d70729434893ae4d5f2f58a78a36d2bd5.tar.bz2
Controller: proper reconfiguration requests handling.
Now controller serializes all reconfiguration requests and waits for result from router.
Diffstat (limited to 'src')
-rw-r--r--src/nxt_controller.c179
-rw-r--r--src/nxt_master_process.c2
-rw-r--r--src/nxt_master_process.h1
-rw-r--r--src/nxt_router.c2
-rw-r--r--src/nxt_runtime.h3
-rw-r--r--src/nxt_worker_process.c10
6 files changed, 153 insertions, 44 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index 85454550..4cf6ff95 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -20,8 +20,9 @@ typedef struct {
typedef struct {
nxt_http_request_parse_t parser;
size_t length;
-
nxt_controller_conf_t conf;
+ nxt_conn_t *conn;
+ nxt_queue_link_t link;
} nxt_controller_request_t;
@@ -54,11 +55,14 @@ static nxt_int_t nxt_controller_request_content_length(void *ctx,
nxt_http_field_t *field, nxt_log_t *log);
static void nxt_controller_process_request(nxt_task_t *task,
- nxt_conn_t *c, nxt_controller_request_t *r);
+ nxt_controller_request_t *req);
static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task,
+ nxt_controller_request_t *req);
+static void nxt_controller_process_waiting(nxt_task_t *task);
+static nxt_int_t nxt_controller_conf_pass(nxt_task_t *task,
nxt_conf_value_t *conf);
-static nxt_int_t nxt_controller_response(nxt_task_t *task, nxt_conn_t *c,
- nxt_controller_response_t *resp);
+static void nxt_controller_response(nxt_task_t *task,
+ nxt_controller_request_t *req, nxt_controller_response_t *resp);
static nxt_buf_t *nxt_controller_response_body(nxt_controller_response_t *resp,
nxt_mp_t *pool);
@@ -73,7 +77,9 @@ static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = {
static nxt_http_fields_hash_t *nxt_controller_fields_hash;
-static nxt_controller_conf_t nxt_controller_conf;
+static nxt_controller_conf_t nxt_controller_conf;
+static nxt_queue_t nxt_controller_waiting_requests;
+static nxt_controller_request_t *nxt_controller_current_request;
static const nxt_event_conn_state_t nxt_controller_conn_read_state;
@@ -119,6 +125,8 @@ nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt)
nxt_controller_conf.root = conf;
nxt_controller_conf.pool = mp;
+ nxt_queue_init(&nxt_controller_waiting_requests);
+
return NXT_OK;
}
@@ -210,6 +218,8 @@ nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
return;
}
+ r->conn = c;
+
if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
!= NXT_OK))
{
@@ -307,7 +317,7 @@ nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
r->length, preread);
if (preread >= r->length) {
- nxt_controller_process_request(task, c, r);
+ nxt_controller_process_request(task, r);
return;
}
@@ -399,7 +409,7 @@ nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
read, r->length);
if (read >= r->length) {
- nxt_controller_process_request(task, c, data);
+ nxt_controller_process_request(task, r);
return;
}
@@ -542,12 +552,12 @@ nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
static void
-nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c,
- nxt_controller_request_t *req)
+nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
{
nxt_mp_t *mp;
nxt_int_t rc;
nxt_str_t path;
+ nxt_conn_t *c;
nxt_uint_t status;
nxt_buf_mem_t *mbuf;
nxt_conf_op_t *ops;
@@ -556,6 +566,7 @@ nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c,
static const nxt_str_t empty_obj = nxt_string("{}");
+ c = req->conn;
path = req->parser.path;
if (path.length > 1 && path.start[path.length - 1] == '/') {
@@ -631,21 +642,16 @@ nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c,
goto done;
}
- if (nxt_slow_path(nxt_controller_conf_apply(task, value) != NXT_OK)) {
+ req->conf.root = value;
+ req->conf.pool = mp;
+
+ if (nxt_controller_conf_apply(task, req) != NXT_OK) {
nxt_mp_destroy(mp);
status = 500;
goto done;
}
- nxt_mp_destroy(nxt_controller_conf.pool);
-
- nxt_controller_conf.root = value;
- nxt_controller_conf.pool = mp;
-
- nxt_str_set(&resp.json, "{ \"success\": \"Updated.\" }");
-
- status = 200;
- goto done;
+ return;
}
if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
@@ -699,21 +705,16 @@ nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c,
goto done;
}
- if (nxt_slow_path(nxt_controller_conf_apply(task, value) != NXT_OK)) {
+ req->conf.root = value;
+ req->conf.pool = mp;
+
+ if (nxt_controller_conf_apply(task, req) != NXT_OK) {
nxt_mp_destroy(mp);
status = 500;
goto done;
}
- nxt_mp_destroy(nxt_controller_conf.pool);
-
- nxt_controller_conf.root = value;
- nxt_controller_conf.pool = mp;
-
- nxt_str_set(&resp.json, "{ \"success\": \"Deleted.\" }");
-
- status = 200;
- goto done;
+ return;
}
status = 405;
@@ -746,14 +747,104 @@ done:
break;
}
- if (nxt_controller_response(task, c, &resp) != NXT_OK) {
- nxt_controller_conn_close(task, c, req);
+ nxt_controller_response(task, req, &resp);
+}
+
+
+static nxt_int_t
+nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req)
+{
+ nxt_int_t rc;
+
+ if (nxt_controller_current_request != NULL) {
+ nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
+ return NXT_OK;
+ }
+
+ rc = nxt_controller_conf_pass(task, req->conf.root);
+
+ if (nxt_slow_path(rc != NXT_OK)) {
+ return NXT_ERROR;
}
+
+ nxt_controller_current_request = req;
+
+ return NXT_OK;
+}
+
+
+void
+nxt_port_controller_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ size_t size, dump_size;
+ nxt_buf_t *b;
+ nxt_controller_request_t *req;
+ nxt_controller_response_t resp;
+
+ b = msg->buf;
+ size = b->mem.free - b->mem.pos;
+
+ dump_size = size > 300 ? 300 : size;
+
+ nxt_debug(task, "contoller data: %*s ...", dump_size, b->mem.pos);
+
+ nxt_memzero(&resp, sizeof(nxt_controller_response_t));
+
+ req = nxt_controller_current_request;
+ nxt_controller_current_request = NULL;
+
+ if (size == 2 && nxt_memcmp(b->mem.pos, "OK", 2) == 0) {
+
+ nxt_mp_destroy(nxt_controller_conf.pool);
+
+ nxt_controller_conf = req->conf;
+
+ nxt_str_set(&resp.status_line, "200 OK");
+ nxt_str_set(&resp.json, "{ \"success\": \"Reconfiguration done.\" }");
+
+ } else {
+ nxt_mp_destroy(req->conf.pool);
+
+ nxt_str_set(&resp.status_line, "500 Internal Server Error");
+ nxt_str_set(&resp.json,
+ "{ \"error\": \"Failed to apply new configuration.\" }");
+ }
+
+ nxt_controller_response(task, req, &resp);
+
+ nxt_controller_process_waiting(task);
+}
+
+
+static void
+nxt_controller_process_waiting(nxt_task_t *task)
+{
+ nxt_controller_request_t *req;
+ nxt_controller_response_t resp;
+
+ nxt_queue_each(req, &nxt_controller_waiting_requests,
+ nxt_controller_request_t, link)
+ {
+ nxt_queue_remove(&req->link);
+
+ if (nxt_fast_path(nxt_controller_conf_apply(task, req) == NXT_OK)) {
+ return;
+ }
+
+ nxt_mp_destroy(req->conf.pool);
+
+ nxt_str_set(&resp.status_line, "500 Internal Server Error");
+ nxt_str_set(&resp.json,
+ "{ \"error\": \"Memory allocation failed.\" }");
+
+ nxt_controller_response(task, req, &resp);
+
+ } nxt_queue_loop;
}
static nxt_int_t
-nxt_controller_conf_apply(nxt_task_t *task, nxt_conf_value_t *conf)
+nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf)
{
size_t size;
nxt_buf_t *b;
@@ -776,24 +867,27 @@ nxt_controller_conf_apply(nxt_task_t *task, nxt_conf_value_t *conf)
b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
- (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, 0, b);
-
- return NXT_OK;
+ return nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, 0, b);
}
-static nxt_int_t
-nxt_controller_response(nxt_task_t *task, nxt_conn_t *c,
+
+static void
+nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
nxt_controller_response_t *resp)
{
- size_t size;
- nxt_buf_t *b;
+ size_t size;
+ nxt_buf_t *b;
+ nxt_conn_t *c;
+
+ c = req->conn;
size = sizeof("HTTP/1.0 " "\r\n\r\n") - 1 + resp->status_line.length;
b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
if (nxt_slow_path(b == NULL)) {
- return NXT_ERROR;
+ nxt_controller_conn_close(task, c, req);
+ return;
}
b->mem.free = nxt_cpymem(b->mem.free, "HTTP/1.0 ", sizeof("HTTP/1.0 ") - 1);
@@ -805,15 +899,14 @@ nxt_controller_response(nxt_task_t *task, nxt_conn_t *c,
b->next = nxt_controller_response_body(resp, c->mem_pool);
if (nxt_slow_path(b->next == NULL)) {
- return NXT_ERROR;
+ nxt_controller_conn_close(task, c, req);
+ return;
}
c->write = b;
c->write_state = &nxt_controller_conn_write_state;
nxt_conn_write(task->thread->engine, c);
-
- return NXT_OK;
}
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c
index c9617c9f..3b638389 100644
--- a/src/nxt_master_process.c
+++ b/src/nxt_master_process.c
@@ -145,7 +145,7 @@ nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
init->start = nxt_controller_start;
init->name = "controller process";
init->user_cred = &rt->user_cred;
- init->port_handlers = nxt_worker_process_port_handlers;
+ init->port_handlers = nxt_controller_process_port_handlers;
init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_CONTROLLER;
diff --git a/src/nxt_master_process.h b/src/nxt_master_process.h
index 417c0aa5..6f7cc067 100644
--- a/src/nxt_master_process.h
+++ b/src/nxt_master_process.h
@@ -16,6 +16,7 @@ nxt_int_t nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt);
nxt_int_t nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt);
+extern nxt_port_handler_t nxt_controller_process_port_handlers[];
extern nxt_port_handler_t nxt_worker_process_port_handlers[];
extern nxt_port_handler_t nxt_app_process_port_handlers[];
extern nxt_port_handler_t nxt_router_process_port_handlers[];
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 28765deb..b54d1cf3 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -323,6 +323,8 @@ nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
return;
}
+ b->mem.free = nxt_cpymem(b->mem.free, start, size);
+
b->parent = tmcf->mem_pool;
b->completion_handler = nxt_router_conf_buf_completion;
diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h
index 46e8e9d2..7cbd466a 100644
--- a/src/nxt_runtime.h
+++ b/src/nxt_runtime.h
@@ -132,6 +132,9 @@ nxt_port_t *nxt_runtime_port_first(nxt_runtime_t *rt,
/* STUB */
nxt_int_t nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt);
+void nxt_port_controller_data_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
+
nxt_str_t *nxt_current_directory(nxt_mp_t *mp);
nxt_listen_socket_t *nxt_runtime_listen_socket_add(nxt_runtime_t *rt,
diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c
index cdc1b1c5..86ee6408 100644
--- a/src/nxt_worker_process.c
+++ b/src/nxt_worker_process.c
@@ -22,6 +22,16 @@ static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj,
void *data);
+nxt_port_handler_t nxt_controller_process_port_handlers[] = {
+ nxt_worker_process_quit_handler,
+ nxt_port_new_port_handler,
+ nxt_port_change_log_file_handler,
+ nxt_port_mmap_handler,
+ nxt_port_controller_data_handler,
+ nxt_port_remove_pid_handler,
+};
+
+
nxt_port_handler_t nxt_worker_process_port_handlers[] = {
nxt_worker_process_quit_handler,
nxt_port_new_port_handler,