summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_controller.c130
1 files changed, 49 insertions, 81 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index 313daf00..d03025b8 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -63,9 +63,8 @@ static void nxt_controller_process_request(nxt_task_t *task,
nxt_controller_request_t *req);
static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task,
nxt_controller_request_t *req);
-static void nxt_controller_process_waiting(nxt_task_t *task);
-static nxt_int_t nxt_controller_conf_pass(nxt_task_t *task,
- nxt_conf_value_t *conf);
+static void nxt_controller_conf_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
static void nxt_controller_response(nxt_task_t *task,
nxt_controller_request_t *req, nxt_controller_response_t *resp);
static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
@@ -81,10 +80,8 @@ static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = {
static nxt_http_fields_hash_t *nxt_controller_fields_hash;
-
-static nxt_controller_conf_t nxt_controller_conf;
-static nxt_queue_t nxt_controller_waiting_requests;
-static nxt_controller_request_t *nxt_controller_current_request;
+static nxt_controller_conf_t nxt_controller_conf;
+static nxt_queue_t nxt_controller_waiting_requests;
static const nxt_event_conn_state_t nxt_controller_conn_read_state;
@@ -584,6 +581,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
+ if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
+ nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
+ return;
+ }
+
mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(mp == NULL)) {
@@ -654,6 +656,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
+ if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
+ nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
+ return;
+ }
+
if (path.length == 1) {
mp = nxt_mp_create(1024, 128, 256, 32);
@@ -745,20 +752,38 @@ invalid_conf:
static nxt_int_t
nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req)
{
- nxt_int_t rc;
+ size_t size;
+ uint32_t stream;
+ nxt_int_t rc;
+ nxt_buf_t *b;
+ nxt_port_t *router_port, *controller_port;
+ nxt_runtime_t *rt;
- if (nxt_controller_current_request != NULL) {
- nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
- return NXT_OK;
- }
+ rt = task->thread->runtime;
+
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+ controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
+
+ size = nxt_conf_json_length(req->conf.root, NULL);
+
+ b = nxt_port_mmap_get_buf(task, router_port, size);
- rc = nxt_controller_conf_pass(task, req->conf.root);
+ b->mem.free = nxt_conf_json_print(b->mem.free, req->conf.root, NULL);
+
+ stream = nxt_port_rpc_register_handler(task, controller_port,
+ nxt_controller_conf_handler,
+ nxt_controller_conf_handler,
+ router_port->pid, req);
+
+ rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
+ stream, controller_port->id, b);
if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_port_rpc_cancel(task, controller_port, stream);
return NXT_ERROR;
}
- nxt_controller_current_request = req;
+ nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
return NXT_OK;
}
@@ -768,16 +793,18 @@ static void
nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
+ nxt_queue_t queue;
nxt_controller_request_t *req;
nxt_controller_response_t resp;
+ req = data;
+
nxt_debug(task, "controller conf ready: %*s",
nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
- nxt_memzero(&resp, sizeof(nxt_controller_response_t));
+ nxt_queue_remove(&req->link);
- req = nxt_controller_current_request;
- nxt_controller_current_request = NULL;
+ nxt_memzero(&resp, sizeof(nxt_controller_response_t));
if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
nxt_mp_destroy(nxt_controller_conf.pool);
@@ -797,76 +824,17 @@ nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_controller_response(task, req, &resp);
- nxt_controller_process_waiting(task);
-}
-
-
-static void
-nxt_controller_process_waiting(nxt_task_t *task)
-{
- nxt_controller_request_t *req;
- nxt_controller_response_t resp;
+ nxt_queue_init(&queue);
+ nxt_queue_add(&queue, &nxt_controller_waiting_requests);
- nxt_queue_each(req, &nxt_controller_waiting_requests,
- nxt_controller_request_t, link)
- {
- nxt_queue_remove(&req->link);
-
- if (nxt_fast_path(nxt_controller_conf_apply(task, req) == NXT_OK)) {
- return;
- }
-
- nxt_mp_destroy(req->conf.pool);
-
- nxt_memzero(&resp, sizeof(nxt_controller_response_t));
-
- resp.status = 500;
- resp.title = (u_char *) "Memory allocation failed.";
- resp.offset = -1;
-
- nxt_controller_response(task, req, &resp);
+ nxt_queue_init(&nxt_controller_waiting_requests);
+ nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
+ nxt_controller_process_request(task, req);
} nxt_queue_loop;
}
-static nxt_int_t
-nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf)
-{
- size_t size;
- uint32_t stream;
- nxt_int_t rc;
- nxt_buf_t *b;
- nxt_port_t *router_port, *controller_port;
- nxt_runtime_t *rt;
-
- rt = task->thread->runtime;
-
- router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
- controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
-
- size = nxt_conf_json_length(conf, NULL);
-
- b = nxt_port_mmap_get_buf(task, router_port, size);
-
- b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
-
- stream = nxt_port_rpc_register_handler(task, controller_port,
- nxt_controller_conf_handler,
- nxt_controller_conf_handler,
- router_port->pid, NULL);
-
- rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
- stream, controller_port->id, b);
-
- if (nxt_slow_path(rc != NXT_OK)) {
- nxt_port_rpc_cancel(task, controller_port, stream);
- }
-
- return rc;
-}
-
-
static void
nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
nxt_controller_response_t *resp)