diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_controller.c | 179 | ||||
-rw-r--r-- | src/nxt_master_process.c | 2 | ||||
-rw-r--r-- | src/nxt_master_process.h | 1 | ||||
-rw-r--r-- | src/nxt_router.c | 2 | ||||
-rw-r--r-- | src/nxt_runtime.h | 3 | ||||
-rw-r--r-- | src/nxt_worker_process.c | 10 |
6 files changed, 153 insertions, 44 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 85454550..4cf6ff95 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -20,8 +20,9 @@ typedef struct { typedef struct { nxt_http_request_parse_t parser; size_t length; - nxt_controller_conf_t conf; + nxt_conn_t *conn; + nxt_queue_link_t link; } nxt_controller_request_t; @@ -54,11 +55,14 @@ static nxt_int_t nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, nxt_log_t *log); static void nxt_controller_process_request(nxt_task_t *task, - nxt_conn_t *c, nxt_controller_request_t *r); + nxt_controller_request_t *req); static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task, + nxt_controller_request_t *req); +static void nxt_controller_process_waiting(nxt_task_t *task); +static nxt_int_t nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf); -static nxt_int_t nxt_controller_response(nxt_task_t *task, nxt_conn_t *c, - nxt_controller_response_t *resp); +static void nxt_controller_response(nxt_task_t *task, + nxt_controller_request_t *req, nxt_controller_response_t *resp); static nxt_buf_t *nxt_controller_response_body(nxt_controller_response_t *resp, nxt_mp_t *pool); @@ -73,7 +77,9 @@ static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = { static nxt_http_fields_hash_t *nxt_controller_fields_hash; -static nxt_controller_conf_t nxt_controller_conf; +static nxt_controller_conf_t nxt_controller_conf; +static nxt_queue_t nxt_controller_waiting_requests; +static nxt_controller_request_t *nxt_controller_current_request; static const nxt_event_conn_state_t nxt_controller_conn_read_state; @@ -119,6 +125,8 @@ nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt) nxt_controller_conf.root = conf; nxt_controller_conf.pool = mp; + nxt_queue_init(&nxt_controller_waiting_requests); + return NXT_OK; } @@ -210,6 +218,8 @@ nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) return; } + r->conn = c; + if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool) != NXT_OK)) { @@ -307,7 +317,7 @@ nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) r->length, preread); if (preread >= r->length) { - nxt_controller_process_request(task, c, r); + nxt_controller_process_request(task, r); return; } @@ -399,7 +409,7 @@ nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) read, r->length); if (read >= r->length) { - nxt_controller_process_request(task, c, data); + nxt_controller_process_request(task, r); return; } @@ -542,12 +552,12 @@ nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, static void -nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c, - nxt_controller_request_t *req) +nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) { nxt_mp_t *mp; nxt_int_t rc; nxt_str_t path; + nxt_conn_t *c; nxt_uint_t status; nxt_buf_mem_t *mbuf; nxt_conf_op_t *ops; @@ -556,6 +566,7 @@ nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c, static const nxt_str_t empty_obj = nxt_string("{}"); + c = req->conn; path = req->parser.path; if (path.length > 1 && path.start[path.length - 1] == '/') { @@ -631,21 +642,16 @@ nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c, goto done; } - if (nxt_slow_path(nxt_controller_conf_apply(task, value) != NXT_OK)) { + req->conf.root = value; + req->conf.pool = mp; + + if (nxt_controller_conf_apply(task, req) != NXT_OK) { nxt_mp_destroy(mp); status = 500; goto done; } - nxt_mp_destroy(nxt_controller_conf.pool); - - nxt_controller_conf.root = value; - nxt_controller_conf.pool = mp; - - nxt_str_set(&resp.json, "{ \"success\": \"Updated.\" }"); - - status = 200; - goto done; + return; } if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { @@ -699,21 +705,16 @@ nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c, goto done; } - if (nxt_slow_path(nxt_controller_conf_apply(task, value) != NXT_OK)) { + req->conf.root = value; + req->conf.pool = mp; + + if (nxt_controller_conf_apply(task, req) != NXT_OK) { nxt_mp_destroy(mp); status = 500; goto done; } - nxt_mp_destroy(nxt_controller_conf.pool); - - nxt_controller_conf.root = value; - nxt_controller_conf.pool = mp; - - nxt_str_set(&resp.json, "{ \"success\": \"Deleted.\" }"); - - status = 200; - goto done; + return; } status = 405; @@ -746,14 +747,104 @@ done: break; } - if (nxt_controller_response(task, c, &resp) != NXT_OK) { - nxt_controller_conn_close(task, c, req); + nxt_controller_response(task, req, &resp); +} + + +static nxt_int_t +nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req) +{ + nxt_int_t rc; + + if (nxt_controller_current_request != NULL) { + nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); + return NXT_OK; + } + + rc = nxt_controller_conf_pass(task, req->conf.root); + + if (nxt_slow_path(rc != NXT_OK)) { + return NXT_ERROR; } + + nxt_controller_current_request = req; + + return NXT_OK; +} + + +void +nxt_port_controller_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + size_t size, dump_size; + nxt_buf_t *b; + nxt_controller_request_t *req; + nxt_controller_response_t resp; + + b = msg->buf; + size = b->mem.free - b->mem.pos; + + dump_size = size > 300 ? 300 : size; + + nxt_debug(task, "contoller data: %*s ...", dump_size, b->mem.pos); + + nxt_memzero(&resp, sizeof(nxt_controller_response_t)); + + req = nxt_controller_current_request; + nxt_controller_current_request = NULL; + + if (size == 2 && nxt_memcmp(b->mem.pos, "OK", 2) == 0) { + + nxt_mp_destroy(nxt_controller_conf.pool); + + nxt_controller_conf = req->conf; + + nxt_str_set(&resp.status_line, "200 OK"); + nxt_str_set(&resp.json, "{ \"success\": \"Reconfiguration done.\" }"); + + } else { + nxt_mp_destroy(req->conf.pool); + + nxt_str_set(&resp.status_line, "500 Internal Server Error"); + nxt_str_set(&resp.json, + "{ \"error\": \"Failed to apply new configuration.\" }"); + } + + nxt_controller_response(task, req, &resp); + + nxt_controller_process_waiting(task); +} + + +static void +nxt_controller_process_waiting(nxt_task_t *task) +{ + nxt_controller_request_t *req; + nxt_controller_response_t resp; + + nxt_queue_each(req, &nxt_controller_waiting_requests, + nxt_controller_request_t, link) + { + nxt_queue_remove(&req->link); + + if (nxt_fast_path(nxt_controller_conf_apply(task, req) == NXT_OK)) { + return; + } + + nxt_mp_destroy(req->conf.pool); + + nxt_str_set(&resp.status_line, "500 Internal Server Error"); + nxt_str_set(&resp.json, + "{ \"error\": \"Memory allocation failed.\" }"); + + nxt_controller_response(task, req, &resp); + + } nxt_queue_loop; } static nxt_int_t -nxt_controller_conf_apply(nxt_task_t *task, nxt_conf_value_t *conf) +nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf) { size_t size; nxt_buf_t *b; @@ -776,24 +867,27 @@ nxt_controller_conf_apply(nxt_task_t *task, nxt_conf_value_t *conf) b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, 0, b); - - return NXT_OK; + return nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, 0, b); } -static nxt_int_t -nxt_controller_response(nxt_task_t *task, nxt_conn_t *c, + +static void +nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, nxt_controller_response_t *resp) { - size_t size; - nxt_buf_t *b; + size_t size; + nxt_buf_t *b; + nxt_conn_t *c; + + c = req->conn; size = sizeof("HTTP/1.0 " "\r\n\r\n") - 1 + resp->status_line.length; b = nxt_buf_mem_alloc(c->mem_pool, size, 0); if (nxt_slow_path(b == NULL)) { - return NXT_ERROR; + nxt_controller_conn_close(task, c, req); + return; } b->mem.free = nxt_cpymem(b->mem.free, "HTTP/1.0 ", sizeof("HTTP/1.0 ") - 1); @@ -805,15 +899,14 @@ nxt_controller_response(nxt_task_t *task, nxt_conn_t *c, b->next = nxt_controller_response_body(resp, c->mem_pool); if (nxt_slow_path(b->next == NULL)) { - return NXT_ERROR; + nxt_controller_conn_close(task, c, req); + return; } c->write = b; c->write_state = &nxt_controller_conn_write_state; nxt_conn_write(task->thread->engine, c); - - return NXT_OK; } diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index c9617c9f..3b638389 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -145,7 +145,7 @@ nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt) init->start = nxt_controller_start; init->name = "controller process"; init->user_cred = &rt->user_cred; - init->port_handlers = nxt_worker_process_port_handlers; + init->port_handlers = nxt_controller_process_port_handlers; init->signals = nxt_worker_process_signals; init->type = NXT_PROCESS_CONTROLLER; diff --git a/src/nxt_master_process.h b/src/nxt_master_process.h index 417c0aa5..6f7cc067 100644 --- a/src/nxt_master_process.h +++ b/src/nxt_master_process.h @@ -16,6 +16,7 @@ nxt_int_t nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt); nxt_int_t nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt); +extern nxt_port_handler_t nxt_controller_process_port_handlers[]; extern nxt_port_handler_t nxt_worker_process_port_handlers[]; extern nxt_port_handler_t nxt_app_process_port_handlers[]; extern nxt_port_handler_t nxt_router_process_port_handlers[]; diff --git a/src/nxt_router.c b/src/nxt_router.c index 28765deb..b54d1cf3 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -323,6 +323,8 @@ nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, return; } + b->mem.free = nxt_cpymem(b->mem.free, start, size); + b->parent = tmcf->mem_pool; b->completion_handler = nxt_router_conf_buf_completion; diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 46e8e9d2..7cbd466a 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -132,6 +132,9 @@ nxt_port_t *nxt_runtime_port_first(nxt_runtime_t *rt, /* STUB */ nxt_int_t nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt); +void nxt_port_controller_data_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); + nxt_str_t *nxt_current_directory(nxt_mp_t *mp); nxt_listen_socket_t *nxt_runtime_listen_socket_add(nxt_runtime_t *rt, diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index cdc1b1c5..86ee6408 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -22,6 +22,16 @@ static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj, void *data); +nxt_port_handler_t nxt_controller_process_port_handlers[] = { + nxt_worker_process_quit_handler, + nxt_port_new_port_handler, + nxt_port_change_log_file_handler, + nxt_port_mmap_handler, + nxt_port_controller_data_handler, + nxt_port_remove_pid_handler, +}; + + nxt_port_handler_t nxt_worker_process_port_handlers[] = { nxt_worker_process_quit_handler, nxt_port_new_port_handler, |