summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_controller.c161
-rw-r--r--src/nxt_port.h3
-rw-r--r--src/nxt_router.c219
-rw-r--r--src/nxt_router.h2
4 files changed, 344 insertions, 41 deletions
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index 772d10c8..779a625d 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -92,6 +92,10 @@ static nxt_bool_t nxt_controller_cert_in_use(nxt_str_t *name);
static void nxt_controller_cert_cleanup(nxt_task_t *task, void *obj,
void *data);
#endif
+static void nxt_controller_process_control(nxt_task_t *task,
+ nxt_controller_request_t *req, nxt_str_t *path);
+static void nxt_controller_app_restart_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
static void nxt_controller_conf_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_controller_conf_store(nxt_task_t *task,
@@ -1022,6 +1026,14 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
#endif
+ if (nxt_str_start(&path, "/control/", 9)) {
+ path.length -= 9;
+ path.start += 9;
+
+ nxt_controller_process_control(task, req, &path);
+ return;
+ }
+
nxt_memzero(&resp, sizeof(nxt_controller_response_t));
if (path.length == 1 && path.start[0] == '/') {
@@ -1684,6 +1696,155 @@ nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
static void
+nxt_controller_process_control(nxt_task_t *task,
+ nxt_controller_request_t *req, nxt_str_t *path)
+{
+ uint32_t stream;
+ nxt_buf_t *b;
+ nxt_int_t rc;
+ nxt_port_t *router_port, *controller_port;
+ nxt_runtime_t *rt;
+ nxt_conf_value_t *value;
+ nxt_controller_response_t resp;
+
+ static nxt_str_t applications = nxt_string("applications");
+
+ nxt_memzero(&resp, sizeof(nxt_controller_response_t));
+
+ if (!nxt_str_eq(&req->parser.method, "GET", 3)) {
+ goto not_allowed;
+ }
+
+ if (!nxt_str_start(path, "applications/", 13)
+ || nxt_memcmp(path->start + path->length - 8, "/restart", 8) != 0)
+ {
+ goto not_found;
+ }
+
+ path->start += 13;
+ path->length -= 13 + 8;
+
+ if (nxt_controller_check_postpone_request(task)) {
+ nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
+ return;
+ }
+
+ value = nxt_controller_conf.root;
+ if (value == NULL) {
+ goto not_found;
+ }
+
+ value = nxt_conf_get_object_member(value, &applications, NULL);
+ if (value == NULL) {
+ goto not_found;
+ }
+
+ value = nxt_conf_get_object_member(value, path, NULL);
+ if (value == NULL) {
+ goto not_found;
+ }
+
+ b = nxt_buf_mem_alloc(req->conn->mem_pool, path->length, 0);
+ if (nxt_slow_path(b == NULL)) {
+ goto alloc_fail;
+ }
+
+ b->mem.free = nxt_cpymem(b->mem.pos, path->start, path->length);
+
+ rt = task->thread->runtime;
+
+ controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+
+ stream = nxt_port_rpc_register_handler(task, controller_port,
+ nxt_controller_app_restart_handler,
+ nxt_controller_app_restart_handler,
+ router_port->pid, req);
+ if (nxt_slow_path(stream == 0)) {
+ goto alloc_fail;
+ }
+
+ rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_APP_RESTART,
+ -1, stream, 0, b);
+ if (nxt_slow_path(rc != NXT_OK)) {
+ nxt_port_rpc_cancel(task, controller_port, stream);
+
+ goto fail;
+ }
+
+ nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
+
+ return;
+
+not_allowed:
+
+ resp.status = 405;
+ resp.title = (u_char *) "Method isn't allowed.";
+ resp.offset = -1;
+
+ nxt_controller_response(task, req, &resp);
+ return;
+
+not_found:
+
+ resp.status = 404;
+ resp.title = (u_char *) "Value doesn't exist.";
+ resp.offset = -1;
+
+ nxt_controller_response(task, req, &resp);
+ return;
+
+alloc_fail:
+
+ resp.status = 500;
+ resp.title = (u_char *) "Memory allocation failed.";
+ resp.offset = -1;
+
+ nxt_controller_response(task, req, &resp);
+ return;
+
+fail:
+
+ resp.status = 500;
+ resp.title = (u_char *) "Send restart failed.";
+ resp.offset = -1;
+
+ nxt_controller_response(task, req, &resp);
+}
+
+
+static void
+nxt_controller_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
+{
+ nxt_controller_request_t *req;
+ nxt_controller_response_t resp;
+
+ req = data;
+
+ nxt_debug(task, "controller app restart handler");
+
+ nxt_queue_remove(&req->link);
+
+ nxt_memzero(&resp, sizeof(nxt_controller_response_t));
+
+ if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
+ resp.status = 200;
+ resp.title = (u_char *) "Ok";
+
+ } else {
+ resp.status = 500;
+ resp.title = (u_char *) "Failed to restart app.";
+ resp.offset = -1;
+ }
+
+ nxt_controller_response(task, req, &resp);
+
+ nxt_controller_flush_requests(task);
+}
+
+
+static void
nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf)
{
void *mem;
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 5ece3bfa..a0bc2512 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -50,6 +50,7 @@ struct nxt_port_handlers_s {
/* Various data. */
nxt_port_handler_t data;
+ nxt_port_handler_t app_restart;
nxt_port_handler_t oosm;
nxt_port_handler_t shm_ack;
@@ -100,6 +101,7 @@ typedef enum {
_NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame),
_NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
+ _NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart),
_NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
_NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
@@ -139,6 +141,7 @@ typedef enum {
NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
NXT_PORT_MSG_DATA_LAST = nxt_msg_last(_NXT_PORT_MSG_DATA),
+ NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
diff --git a/src/nxt_router.c b/src/nxt_router.c
index c766c25e..8360e75a 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -18,6 +18,8 @@
#include <nxt_app_queue.h>
#include <nxt_port_queue.h>
+#define NXT_SHARED_PORT_ID 0xFFFFu
+
typedef struct {
nxt_str_t type;
uint32_t processes;
@@ -67,6 +69,12 @@ typedef struct {
} nxt_app_rpc_t;
+typedef struct {
+ nxt_app_joint_t *app_joint;
+ uint32_t generation;
+} nxt_app_joint_rpc_t;
+
+
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
@@ -79,6 +87,8 @@ static void nxt_router_new_port_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static void nxt_router_conf_data_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
+static void nxt_router_app_restart_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
static void nxt_router_remove_pid_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
@@ -281,6 +291,7 @@ static const nxt_port_handlers_t nxt_router_process_port_handlers = {
.mmap = nxt_port_mmap_handler,
.get_mmap = nxt_router_get_mmap_handler,
.data = nxt_router_conf_data_handler,
+ .app_restart = nxt_router_app_restart_handler,
.remove_pid = nxt_router_remove_pid_handler,
.access_log = nxt_router_access_log_reopen_handler,
.rpc_ready = nxt_port_rpc_handler,
@@ -379,14 +390,15 @@ static void
nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
void *data)
{
- size_t size;
- uint32_t stream;
- nxt_mp_t *mp;
- nxt_int_t ret;
- nxt_app_t *app;
- nxt_buf_t *b;
- nxt_port_t *main_port;
- nxt_runtime_t *rt;
+ size_t size;
+ uint32_t stream;
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_app_t *app;
+ nxt_buf_t *b;
+ nxt_port_t *main_port;
+ nxt_runtime_t *rt;
+ nxt_app_joint_rpc_t *app_joint_rpc;
app = data;
@@ -407,30 +419,29 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
*b->mem.free++ = '\0';
nxt_buf_cpystr(b, &app->conf);
- nxt_router_app_joint_use(task, app->joint, 1);
-
- stream = nxt_port_rpc_register_handler(task, port,
- nxt_router_app_port_ready,
- nxt_router_app_port_error,
- -1, app->joint);
-
- if (nxt_slow_path(stream == 0)) {
- nxt_router_app_joint_use(task, app->joint, -1);
-
+ app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
+ nxt_router_app_port_ready,
+ nxt_router_app_port_error,
+ sizeof(nxt_app_joint_rpc_t));
+ if (nxt_slow_path(app_joint_rpc == NULL)) {
goto failed;
}
+ stream = nxt_port_rpc_ex_stream(app_joint_rpc);
+
ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
-1, stream, port->id, b);
-
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, port, stream);
- nxt_router_app_joint_use(task, app->joint, -1);
-
goto failed;
}
+ app_joint_rpc->app_joint = app->joint;
+ app_joint_rpc->generation = app->generation;
+
+ nxt_router_app_joint_use(task, app->joint, 1);
+
nxt_router_app_use(task, app, -1);
return;
@@ -504,6 +515,7 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
{
nxt_buf_t *b, *next;
nxt_bool_t cancelled;
+ nxt_port_t *app_port;
nxt_msg_info_t *msg_info;
msg_info = &req_rpc_data->msg_info;
@@ -512,13 +524,20 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
return 0;
}
- cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
- msg_info->tracking_cookie,
- req_rpc_data->stream);
+ app_port = req_rpc_data->app_port;
+
+ if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
+ cancelled = nxt_app_queue_cancel(app_port->queue,
+ msg_info->tracking_cookie,
+ req_rpc_data->stream);
+
+ if (cancelled) {
+ nxt_debug(task, "stream #%uD: cancelled by router",
+ req_rpc_data->stream);
+ }
- if (cancelled) {
- nxt_debug(task, "stream #%uD: cancelled by router",
- req_rpc_data->stream);
+ } else {
+ cancelled = 0;
}
for (b = msg_info->buf; b != NULL; b = next) {
@@ -794,6 +813,90 @@ cleanup:
static void
+nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_app_t *app;
+ nxt_int_t ret;
+ nxt_str_t app_name;
+ nxt_port_t *port, *reply_port, *shared_port, *old_shared_port;
+ nxt_port_msg_type_t reply;
+
+ reply_port = nxt_runtime_port_find(task->thread->runtime,
+ msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(reply_port == NULL)) {
+ nxt_alert(task, "app_restart_handler: reply port not found");
+ return;
+ }
+
+ app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
+ app_name.start = msg->buf->mem.pos;
+
+ nxt_debug(task, "app_restart_handler: %V", &app_name);
+
+ app = nxt_router_app_find(&nxt_router->apps, &app_name);
+
+ if (nxt_fast_path(app != NULL)) {
+ shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
+ NXT_PROCESS_APP);
+ if (nxt_slow_path(shared_port == NULL)) {
+ goto fail;
+ }
+
+ ret = nxt_port_socket_init(task, shared_port, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, shared_port, -1);
+ goto fail;
+ }
+
+ ret = nxt_router_app_queue_init(task, shared_port);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_write_close(shared_port);
+ nxt_port_read_close(shared_port);
+ nxt_port_use(task, shared_port, -1);
+ goto fail;
+ }
+
+ nxt_port_write_enable(task, shared_port);
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
+
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
+ 0, 0, NULL);
+
+ } nxt_queue_loop;
+
+ app->generation++;
+
+ shared_port->app = app;
+
+ old_shared_port = app->shared_port;
+ old_shared_port->app = NULL;
+
+ app->shared_port = shared_port;
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_port_close(task, old_shared_port);
+ nxt_port_use(task, old_shared_port, -1);
+
+ reply = NXT_PORT_MSG_RPC_READY_LAST;
+
+ } else {
+
+fail:
+
+ reply = NXT_PORT_MSG_RPC_ERROR;
+ }
+
+ nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
+ 0, NULL);
+}
+
+
+static void
nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
void *data)
{
@@ -1607,7 +1710,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app_joint->free_app_work.task = &engine->task;
app_joint->free_app_work.obj = app_joint;
- port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
+ port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
NXT_PROCESS_APP);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
@@ -4233,11 +4336,16 @@ static void
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_app_t *app;
- nxt_port_t *port;
- nxt_app_joint_t *app_joint;
+ nxt_app_t *app;
+ nxt_bool_t start_process;
+ nxt_port_t *port;
+ nxt_app_joint_t *app_joint;
+ nxt_app_joint_rpc_t *app_joint_rpc;
- app_joint = data;
+ nxt_assert(data != NULL);
+
+ app_joint_rpc = data;
+ app_joint = app_joint_rpc->app_joint;
port = msg->u.new_port;
nxt_assert(app_joint != NULL);
@@ -4257,14 +4365,37 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
return;
}
- port->app = app;
- port->main_app_port = port;
-
nxt_thread_mutex_lock(&app->mutex);
nxt_assert(app->pending_processes != 0);
app->pending_processes--;
+
+ if (nxt_slow_path(app->generation != app_joint_rpc->generation)) {
+ nxt_debug(task, "new port ready for restarted app, send QUIT");
+
+ start_process = !task->thread->engine->shutdown
+ && nxt_router_app_can_start(app)
+ && nxt_router_app_need_start(app);
+
+ if (start_process) {
+ app->pending_processes++;
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+
+ if (start_process) {
+ nxt_router_start_app_process(task, app);
+ }
+
+ return;
+ }
+
+ port->app = app;
+ port->main_app_port = port;
+
app->processes++;
nxt_port_hash_add(&app->port_hash, port);
app->port_hash_count++;
@@ -4318,12 +4449,16 @@ static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_app_t *app;
- nxt_app_joint_t *app_joint;
- nxt_queue_link_t *link;
- nxt_http_request_t *r;
+ nxt_app_t *app;
+ nxt_app_joint_t *app_joint;
+ nxt_queue_link_t *link;
+ nxt_http_request_t *r;
+ nxt_app_joint_rpc_t *app_joint_rpc;
+
+ nxt_assert(data != NULL);
- app_joint = data;
+ app_joint_rpc = data;
+ app_joint = app_joint_rpc->app_joint;
nxt_assert(app_joint != NULL);
@@ -4490,7 +4625,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
port->pid, port->id,
(int) inc_use, (int) got_response);
- if (port == app->shared_port) {
+ if (port->id == NXT_SHARED_PORT_ID) {
nxt_thread_mutex_lock(&app->mutex);
app->active_requests -= got_response + dec_requests;
@@ -4860,6 +4995,8 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
app->shared_port->app = NULL;
nxt_port_close(task, app->shared_port);
nxt_port_use(task, app->shared_port, -1);
+
+ app->shared_port = NULL;
}
nxt_thread_mutex_destroy(&app->mutex);
diff --git a/src/nxt_router.h b/src/nxt_router.h
index 9f26a622..6611cf45 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -125,6 +125,8 @@ struct nxt_app_s {
uint32_t max_pending_processes;
uint32_t max_requests;
+ uint32_t generation;
+
nxt_msec_t timeout;
nxt_msec_t idle_timeout;