diff options
-rw-r--r-- | docs/changes.xml | 6 | ||||
-rw-r--r-- | src/nxt_controller.c | 161 | ||||
-rw-r--r-- | src/nxt_port.h | 3 | ||||
-rw-r--r-- | src/nxt_router.c | 219 | ||||
-rw-r--r-- | src/nxt_router.h | 2 | ||||
-rw-r--r-- | test/python/restart/longstart.py | 10 | ||||
-rw-r--r-- | test/python/restart/v1.py | 7 | ||||
-rw-r--r-- | test/python/restart/v2.py | 7 | ||||
-rw-r--r-- | test/test_python_procman.py | 79 | ||||
-rw-r--r-- | test/unit/applications/lang/python.py | 1 |
10 files changed, 454 insertions, 41 deletions
diff --git a/docs/changes.xml b/docs/changes.xml index 634bf9cd..ce18875a 100644 --- a/docs/changes.xml +++ b/docs/changes.xml @@ -43,6 +43,12 @@ process and thread lifecycle hooks in Ruby. </para> </change> +<change type="feature"> +<para> +application restart control. +</para> +</change> + <change type="bugfix"> <para> TLS connection was rejected for configuration with more than one diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 772d10c8..779a625d 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -92,6 +92,10 @@ static nxt_bool_t nxt_controller_cert_in_use(nxt_str_t *name); static void nxt_controller_cert_cleanup(nxt_task_t *task, void *obj, void *data); #endif +static void nxt_controller_process_control(nxt_task_t *task, + nxt_controller_request_t *req, nxt_str_t *path); +static void nxt_controller_app_restart_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); static void nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static void nxt_controller_conf_store(nxt_task_t *task, @@ -1022,6 +1026,14 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) #endif + if (nxt_str_start(&path, "/control/", 9)) { + path.length -= 9; + path.start += 9; + + nxt_controller_process_control(task, req, &path); + return; + } + nxt_memzero(&resp, sizeof(nxt_controller_response_t)); if (path.length == 1 && path.start[0] == '/') { @@ -1684,6 +1696,155 @@ nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, static void +nxt_controller_process_control(nxt_task_t *task, + nxt_controller_request_t *req, nxt_str_t *path) +{ + uint32_t stream; + nxt_buf_t *b; + nxt_int_t rc; + nxt_port_t *router_port, *controller_port; + nxt_runtime_t *rt; + nxt_conf_value_t *value; + nxt_controller_response_t resp; + + static nxt_str_t applications = nxt_string("applications"); + + nxt_memzero(&resp, sizeof(nxt_controller_response_t)); + + if (!nxt_str_eq(&req->parser.method, "GET", 3)) { + goto not_allowed; + } + + if (!nxt_str_start(path, "applications/", 13) + || nxt_memcmp(path->start + path->length - 8, "/restart", 8) != 0) + { + goto not_found; + } + + path->start += 13; + path->length -= 13 + 8; + + if (nxt_controller_check_postpone_request(task)) { + nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); + return; + } + + value = nxt_controller_conf.root; + if (value == NULL) { + goto not_found; + } + + value = nxt_conf_get_object_member(value, &applications, NULL); + if (value == NULL) { + goto not_found; + } + + value = nxt_conf_get_object_member(value, path, NULL); + if (value == NULL) { + goto not_found; + } + + b = nxt_buf_mem_alloc(req->conn->mem_pool, path->length, 0); + if (nxt_slow_path(b == NULL)) { + goto alloc_fail; + } + + b->mem.free = nxt_cpymem(b->mem.pos, path->start, path->length); + + rt = task->thread->runtime; + + controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + + stream = nxt_port_rpc_register_handler(task, controller_port, + nxt_controller_app_restart_handler, + nxt_controller_app_restart_handler, + router_port->pid, req); + if (nxt_slow_path(stream == 0)) { + goto alloc_fail; + } + + rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_APP_RESTART, + -1, stream, 0, b); + if (nxt_slow_path(rc != NXT_OK)) { + nxt_port_rpc_cancel(task, controller_port, stream); + + goto fail; + } + + nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); + + return; + +not_allowed: + + resp.status = 405; + resp.title = (u_char *) "Method isn't allowed."; + resp.offset = -1; + + nxt_controller_response(task, req, &resp); + return; + +not_found: + + resp.status = 404; + resp.title = (u_char *) "Value doesn't exist."; + resp.offset = -1; + + nxt_controller_response(task, req, &resp); + return; + +alloc_fail: + + resp.status = 500; + resp.title = (u_char *) "Memory allocation failed."; + resp.offset = -1; + + nxt_controller_response(task, req, &resp); + return; + +fail: + + resp.status = 500; + resp.title = (u_char *) "Send restart failed."; + resp.offset = -1; + + nxt_controller_response(task, req, &resp); +} + + +static void +nxt_controller_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) +{ + nxt_controller_request_t *req; + nxt_controller_response_t resp; + + req = data; + + nxt_debug(task, "controller app restart handler"); + + nxt_queue_remove(&req->link); + + nxt_memzero(&resp, sizeof(nxt_controller_response_t)); + + if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { + resp.status = 200; + resp.title = (u_char *) "Ok"; + + } else { + resp.status = 500; + resp.title = (u_char *) "Failed to restart app."; + resp.offset = -1; + } + + nxt_controller_response(task, req, &resp); + + nxt_controller_flush_requests(task); +} + + +static void nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf) { void *mem; diff --git a/src/nxt_port.h b/src/nxt_port.h index 5ece3bfa..a0bc2512 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -50,6 +50,7 @@ struct nxt_port_handlers_s { /* Various data. */ nxt_port_handler_t data; + nxt_port_handler_t app_restart; nxt_port_handler_t oosm; nxt_port_handler_t shm_ack; @@ -100,6 +101,7 @@ typedef enum { _NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame), _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data), + _NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart), _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm), _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack), @@ -139,6 +141,7 @@ typedef enum { NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA, NXT_PORT_MSG_DATA_LAST = nxt_msg_last(_NXT_PORT_MSG_DATA), + NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART), NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM), NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK), diff --git a/src/nxt_router.c b/src/nxt_router.c index c766c25e..8360e75a 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -18,6 +18,8 @@ #include <nxt_app_queue.h> #include <nxt_port_queue.h> +#define NXT_SHARED_PORT_ID 0xFFFFu + typedef struct { nxt_str_t type; uint32_t processes; @@ -67,6 +69,12 @@ typedef struct { } nxt_app_rpc_t; +typedef struct { + nxt_app_joint_t *app_joint; + uint32_t generation; +} nxt_app_joint_rpc_t; + + static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp); static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); @@ -79,6 +87,8 @@ static void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_router_app_restart_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); static void nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_router_access_log_reopen_handler(nxt_task_t *task, @@ -281,6 +291,7 @@ static const nxt_port_handlers_t nxt_router_process_port_handlers = { .mmap = nxt_port_mmap_handler, .get_mmap = nxt_router_get_mmap_handler, .data = nxt_router_conf_data_handler, + .app_restart = nxt_router_app_restart_handler, .remove_pid = nxt_router_remove_pid_handler, .access_log = nxt_router_access_log_reopen_handler, .rpc_ready = nxt_port_rpc_handler, @@ -379,14 +390,15 @@ static void nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, void *data) { - size_t size; - uint32_t stream; - nxt_mp_t *mp; - nxt_int_t ret; - nxt_app_t *app; - nxt_buf_t *b; - nxt_port_t *main_port; - nxt_runtime_t *rt; + size_t size; + uint32_t stream; + nxt_mp_t *mp; + nxt_int_t ret; + nxt_app_t *app; + nxt_buf_t *b; + nxt_port_t *main_port; + nxt_runtime_t *rt; + nxt_app_joint_rpc_t *app_joint_rpc; app = data; @@ -407,30 +419,29 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, *b->mem.free++ = '\0'; nxt_buf_cpystr(b, &app->conf); - nxt_router_app_joint_use(task, app->joint, 1); - - stream = nxt_port_rpc_register_handler(task, port, - nxt_router_app_port_ready, - nxt_router_app_port_error, - -1, app->joint); - - if (nxt_slow_path(stream == 0)) { - nxt_router_app_joint_use(task, app->joint, -1); - + app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, + nxt_router_app_port_ready, + nxt_router_app_port_error, + sizeof(nxt_app_joint_rpc_t)); + if (nxt_slow_path(app_joint_rpc == NULL)) { goto failed; } + stream = nxt_port_rpc_ex_stream(app_joint_rpc); + ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, -1, stream, port->id, b); - if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); - nxt_router_app_joint_use(task, app->joint, -1); - goto failed; } + app_joint_rpc->app_joint = app->joint; + app_joint_rpc->generation = app->generation; + + nxt_router_app_joint_use(task, app->joint, 1); + nxt_router_app_use(task, app, -1); return; @@ -504,6 +515,7 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) { nxt_buf_t *b, *next; nxt_bool_t cancelled; + nxt_port_t *app_port; nxt_msg_info_t *msg_info; msg_info = &req_rpc_data->msg_info; @@ -512,13 +524,20 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) return 0; } - cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue, - msg_info->tracking_cookie, - req_rpc_data->stream); + app_port = req_rpc_data->app_port; + + if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) { + cancelled = nxt_app_queue_cancel(app_port->queue, + msg_info->tracking_cookie, + req_rpc_data->stream); + + if (cancelled) { + nxt_debug(task, "stream #%uD: cancelled by router", + req_rpc_data->stream); + } - if (cancelled) { - nxt_debug(task, "stream #%uD: cancelled by router", - req_rpc_data->stream); + } else { + cancelled = 0; } for (b = msg_info->buf; b != NULL; b = next) { @@ -794,6 +813,90 @@ cleanup: static void +nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_app_t *app; + nxt_int_t ret; + nxt_str_t app_name; + nxt_port_t *port, *reply_port, *shared_port, *old_shared_port; + nxt_port_msg_type_t reply; + + reply_port = nxt_runtime_port_find(task->thread->runtime, + msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(reply_port == NULL)) { + nxt_alert(task, "app_restart_handler: reply port not found"); + return; + } + + app_name.length = nxt_buf_mem_used_size(&msg->buf->mem); + app_name.start = msg->buf->mem.pos; + + nxt_debug(task, "app_restart_handler: %V", &app_name); + + app = nxt_router_app_find(&nxt_router->apps, &app_name); + + if (nxt_fast_path(app != NULL)) { + shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, + NXT_PROCESS_APP); + if (nxt_slow_path(shared_port == NULL)) { + goto fail; + } + + ret = nxt_port_socket_init(task, shared_port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, shared_port, -1); + goto fail; + } + + ret = nxt_router_app_queue_init(task, shared_port); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_write_close(shared_port); + nxt_port_read_close(shared_port); + nxt_port_use(task, shared_port, -1); + goto fail; + } + + nxt_port_write_enable(task, shared_port); + + nxt_thread_mutex_lock(&app->mutex); + + nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, + 0, 0, NULL); + + } nxt_queue_loop; + + app->generation++; + + shared_port->app = app; + + old_shared_port = app->shared_port; + old_shared_port->app = NULL; + + app->shared_port = shared_port; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_port_close(task, old_shared_port); + nxt_port_use(task, old_shared_port, -1); + + reply = NXT_PORT_MSG_RPC_READY_LAST; + + } else { + +fail: + + reply = NXT_PORT_MSG_RPC_ERROR; + } + + nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream, + 0, NULL); +} + + +static void nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data) { @@ -1607,7 +1710,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app_joint->free_app_work.task = &engine->task; app_joint->free_app_work.obj = app_joint; - port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid, + port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, NXT_PROCESS_APP); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; @@ -4233,11 +4336,16 @@ static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_port_t *port; - nxt_app_joint_t *app_joint; + nxt_app_t *app; + nxt_bool_t start_process; + nxt_port_t *port; + nxt_app_joint_t *app_joint; + nxt_app_joint_rpc_t *app_joint_rpc; - app_joint = data; + nxt_assert(data != NULL); + + app_joint_rpc = data; + app_joint = app_joint_rpc->app_joint; port = msg->u.new_port; nxt_assert(app_joint != NULL); @@ -4257,14 +4365,37 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, return; } - port->app = app; - port->main_app_port = port; - nxt_thread_mutex_lock(&app->mutex); nxt_assert(app->pending_processes != 0); app->pending_processes--; + + if (nxt_slow_path(app->generation != app_joint_rpc->generation)) { + nxt_debug(task, "new port ready for restarted app, send QUIT"); + + start_process = !task->thread->engine->shutdown + && nxt_router_app_can_start(app) + && nxt_router_app_need_start(app); + + if (start_process) { + app->pending_processes++; + } + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + + if (start_process) { + nxt_router_start_app_process(task, app); + } + + return; + } + + port->app = app; + port->main_app_port = port; + app->processes++; nxt_port_hash_add(&app->port_hash, port); app->port_hash_count++; @@ -4318,12 +4449,16 @@ static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_app_joint_t *app_joint; - nxt_queue_link_t *link; - nxt_http_request_t *r; + nxt_app_t *app; + nxt_app_joint_t *app_joint; + nxt_queue_link_t *link; + nxt_http_request_t *r; + nxt_app_joint_rpc_t *app_joint_rpc; + + nxt_assert(data != NULL); - app_joint = data; + app_joint_rpc = data; + app_joint = app_joint_rpc->app_joint; nxt_assert(app_joint != NULL); @@ -4490,7 +4625,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, port->pid, port->id, (int) inc_use, (int) got_response); - if (port == app->shared_port) { + if (port->id == NXT_SHARED_PORT_ID) { nxt_thread_mutex_lock(&app->mutex); app->active_requests -= got_response + dec_requests; @@ -4860,6 +4995,8 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) app->shared_port->app = NULL; nxt_port_close(task, app->shared_port); nxt_port_use(task, app->shared_port, -1); + + app->shared_port = NULL; } nxt_thread_mutex_destroy(&app->mutex); diff --git a/src/nxt_router.h b/src/nxt_router.h index 9f26a622..6611cf45 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -125,6 +125,8 @@ struct nxt_app_s { uint32_t max_pending_processes; uint32_t max_requests; + uint32_t generation; + nxt_msec_t timeout; nxt_msec_t idle_timeout; diff --git a/test/python/restart/longstart.py b/test/python/restart/longstart.py new file mode 100644 index 00000000..777398ac --- /dev/null +++ b/test/python/restart/longstart.py @@ -0,0 +1,10 @@ +import os +import time + +time.sleep(2) + +def application(environ, start_response): + body = str(os.getpid()).encode() + + start_response('200', [('Content-Length', str(len(body)))]) + return [body] diff --git a/test/python/restart/v1.py b/test/python/restart/v1.py new file mode 100644 index 00000000..2e45b269 --- /dev/null +++ b/test/python/restart/v1.py @@ -0,0 +1,7 @@ +import os + +def application(environ, start_response): + body = "v1".encode() + + start_response('200', [('Content-Length', str(len(body)))]) + return [body] diff --git a/test/python/restart/v2.py b/test/python/restart/v2.py new file mode 100644 index 00000000..59e3d30f --- /dev/null +++ b/test/python/restart/v2.py @@ -0,0 +1,7 @@ +import os + +def application(environ, start_response): + body = "v2".encode() + + start_response('200', [('Content-Length', str(len(body)))]) + return [body] diff --git a/test/test_python_procman.py b/test/test_python_procman.py index b0d0f5af..a95c5680 100644 --- a/test/test_python_procman.py +++ b/test/test_python_procman.py @@ -1,4 +1,5 @@ import re +import shutil import subprocess import time @@ -201,3 +202,81 @@ class TestPythonProcman(TestApplicationPython): assert 'success' in self.conf({"listeners": {}, "applications": {}}) assert len(self.pids_for_process()) == 0, 'stop all' + + def test_python_restart(self, temp_dir): + shutil.copyfile( + option.test_dir + '/python/restart/v1.py', temp_dir + '/wsgi.py' + ) + + self.load( + temp_dir, + name=self.app_name, + processes=1, + environment={'PYTHONDONTWRITEBYTECODE': '1'}, + ) + + b = self.get()['body'] + assert b == "v1", 'process started' + + shutil.copyfile( + option.test_dir + '/python/restart/v2.py', temp_dir + '/wsgi.py' + ) + + b = self.get()['body'] + assert b == "v1", 'still old process' + + assert 'success' in self.conf_get( + '/control/applications/' + self.app_name + '/restart' + ), 'restart processes' + + b = self.get()['body'] + assert b == "v2", 'new process started' + + assert 'error' in self.conf_get( + '/control/applications/blah/restart' + ), 'application incorrect' + + assert 'error' in self.conf_delete( + '/control/applications/' + self.app_name + '/restart' + ), 'method incorrect' + + def test_python_restart_multi(self): + self.conf_proc('2') + + pids = self.pids_for_process() + assert len(pids) == 2, 'restart 2 started' + + assert 'success' in self.conf_get( + '/control/applications/' + self.app_name + '/restart' + ), 'restart processes' + + new_pids = self.pids_for_process() + assert len(new_pids) == 2, 'restart still 2' + + assert len(new_pids.intersection(pids)) == 0, 'restart all new' + + def test_python_restart_longstart(self): + self.load( + 'restart', + name=self.app_name, + module="longstart", + processes={"spare": 1, "max": 2, "idle_timeout": 5}, + ) + + assert len(self.pids_for_process()) == 1, 'longstarts == 1' + + pid = self.get()['body'] + pids = self.pids_for_process() + assert len(pids) == 2, 'longstarts == 2' + + assert 'success' in self.conf_get( + '/control/applications/' + self.app_name + '/restart' + ), 'restart processes' + + # wait for longstarted app + time.sleep(2) + + new_pids = self.pids_for_process() + assert len(new_pids) == 1, 'restart 1' + + assert len(new_pids.intersection(pids)) == 0, 'restart all new' diff --git a/test/unit/applications/lang/python.py b/test/unit/applications/lang/python.py index b399dffd..215aa332 100644 --- a/test/unit/applications/lang/python.py +++ b/test/unit/applications/lang/python.py @@ -44,6 +44,7 @@ class TestApplicationPython(TestApplicationProto): for attr in ( 'callable', + 'environment', 'home', 'limits', 'path', |