summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
authorAndrei Belov <defan@nginx.com>2021-08-19 18:17:12 +0300
committerAndrei Belov <defan@nginx.com>2021-08-19 18:17:12 +0300
commitdb442f1be7e713e6a219621ff97a51046590dbd6 (patch)
tree913734275bc890ec175e51fcb0f36b01a3c52c24 /src/nxt_router.c
parenta1d2ced6fc2317d36bc917c5d0ac339bc647dc34 (diff)
parent13c0025dfa6e041563d0ad5dd81679b44522694c (diff)
downloadunit-db442f1be7e713e6a219621ff97a51046590dbd6.tar.gz
unit-db442f1be7e713e6a219621ff97a51046590dbd6.tar.bz2
Merged with the default branch.1.25.0-1
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r--src/nxt_router.c449
1 files changed, 360 insertions, 89 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 015ae226..39d375f8 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -18,6 +18,8 @@
#include <nxt_app_queue.h>
#include <nxt_port_queue.h>
+#define NXT_SHARED_PORT_ID 0xFFFFu
+
typedef struct {
nxt_str_t type;
uint32_t processes;
@@ -44,10 +46,10 @@ typedef struct {
nxt_str_t name;
nxt_socket_conf_t *socket_conf;
nxt_router_temp_conf_t *temp_conf;
- nxt_conf_value_t *conf_cmds;
+ nxt_tls_init_t *tls_init;
nxt_bool_t last;
- nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */
+ nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */
} nxt_router_tlssock_t;
#endif
@@ -67,6 +69,12 @@ typedef struct {
} nxt_app_rpc_t;
+typedef struct {
+ nxt_app_joint_t *app_joint;
+ uint32_t generation;
+} nxt_app_joint_rpc_t;
+
+
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
@@ -79,6 +87,8 @@ static void nxt_router_new_port_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static void nxt_router_conf_data_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
+static void nxt_router_app_restart_handler(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg);
static void nxt_router_remove_pid_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
@@ -97,6 +107,9 @@ static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
+static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
+ nxt_conf_value_t *conf);
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
@@ -123,8 +136,8 @@ static void nxt_router_listen_socket_error(nxt_task_t *task,
static void nxt_router_tls_rpc_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
- nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
- nxt_conf_value_t * conf_cmds);
+ nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
+ nxt_bool_t last);
#endif
static void nxt_router_app_rpc_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
@@ -224,8 +237,6 @@ static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
void *data);
-static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj,
- void *data);
static void nxt_router_app_prepare_request(nxt_task_t *task,
nxt_request_rpc_data_t *req_rpc_data);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
@@ -281,6 +292,7 @@ static const nxt_port_handlers_t nxt_router_process_port_handlers = {
.mmap = nxt_port_mmap_handler,
.get_mmap = nxt_router_get_mmap_handler,
.data = nxt_router_conf_data_handler,
+ .app_restart = nxt_router_app_restart_handler,
.remove_pid = nxt_router_remove_pid_handler,
.access_log = nxt_router_access_log_reopen_handler,
.rpc_ready = nxt_port_rpc_handler,
@@ -379,14 +391,15 @@ static void
nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
void *data)
{
- size_t size;
- uint32_t stream;
- nxt_mp_t *mp;
- nxt_int_t ret;
- nxt_app_t *app;
- nxt_buf_t *b;
- nxt_port_t *main_port;
- nxt_runtime_t *rt;
+ size_t size;
+ uint32_t stream;
+ nxt_mp_t *mp;
+ nxt_int_t ret;
+ nxt_app_t *app;
+ nxt_buf_t *b;
+ nxt_port_t *main_port;
+ nxt_runtime_t *rt;
+ nxt_app_joint_rpc_t *app_joint_rpc;
app = data;
@@ -407,30 +420,29 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
*b->mem.free++ = '\0';
nxt_buf_cpystr(b, &app->conf);
- nxt_router_app_joint_use(task, app->joint, 1);
-
- stream = nxt_port_rpc_register_handler(task, port,
- nxt_router_app_port_ready,
- nxt_router_app_port_error,
- -1, app->joint);
-
- if (nxt_slow_path(stream == 0)) {
- nxt_router_app_joint_use(task, app->joint, -1);
-
+ app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
+ nxt_router_app_port_ready,
+ nxt_router_app_port_error,
+ sizeof(nxt_app_joint_rpc_t));
+ if (nxt_slow_path(app_joint_rpc == NULL)) {
goto failed;
}
+ stream = nxt_port_rpc_ex_stream(app_joint_rpc);
+
ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
-1, stream, port->id, b);
-
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, port, stream);
- nxt_router_app_joint_use(task, app->joint, -1);
-
goto failed;
}
+ app_joint_rpc->app_joint = app->joint;
+ app_joint_rpc->generation = app->generation;
+
+ nxt_router_app_joint_use(task, app->joint, 1);
+
nxt_router_app_use(task, app, -1);
return;
@@ -504,6 +516,7 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
{
nxt_buf_t *b, *next;
nxt_bool_t cancelled;
+ nxt_port_t *app_port;
nxt_msg_info_t *msg_info;
msg_info = &req_rpc_data->msg_info;
@@ -512,13 +525,20 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
return 0;
}
- cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
- msg_info->tracking_cookie,
- req_rpc_data->stream);
+ app_port = req_rpc_data->app_port;
+
+ if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
+ cancelled = nxt_app_queue_cancel(app_port->queue,
+ msg_info->tracking_cookie,
+ req_rpc_data->stream);
- if (cancelled) {
- nxt_debug(task, "stream #%uD: cancelled by router",
- req_rpc_data->stream);
+ if (cancelled) {
+ nxt_debug(task, "stream #%uD: cancelled by router",
+ req_rpc_data->stream);
+ }
+
+ } else {
+ cancelled = 0;
}
for (b = msg_info->buf; b != NULL; b = next) {
@@ -680,18 +700,20 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_assert(main_app_port != NULL);
app = main_app_port->app;
- nxt_assert(app != NULL);
- nxt_thread_mutex_lock(&app->mutex);
+ if (nxt_fast_path(app != NULL)) {
+ nxt_thread_mutex_lock(&app->mutex);
- /* TODO here should be find-and-add code because there can be
- port waiters in port_hash */
- nxt_port_hash_add(&app->port_hash, port);
- app->port_hash_count++;
+ /* TODO here should be find-and-add code because there can be
+ port waiters in port_hash */
+ nxt_port_hash_add(&app->port_hash, port);
+ app->port_hash_count++;
- nxt_thread_mutex_unlock(&app->mutex);
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ port->app = app;
+ }
- port->app = app;
port->main_app_port = main_app_port;
nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
@@ -792,6 +814,90 @@ cleanup:
static void
+nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_app_t *app;
+ nxt_int_t ret;
+ nxt_str_t app_name;
+ nxt_port_t *port, *reply_port, *shared_port, *old_shared_port;
+ nxt_port_msg_type_t reply;
+
+ reply_port = nxt_runtime_port_find(task->thread->runtime,
+ msg->port_msg.pid,
+ msg->port_msg.reply_port);
+ if (nxt_slow_path(reply_port == NULL)) {
+ nxt_alert(task, "app_restart_handler: reply port not found");
+ return;
+ }
+
+ app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
+ app_name.start = msg->buf->mem.pos;
+
+ nxt_debug(task, "app_restart_handler: %V", &app_name);
+
+ app = nxt_router_app_find(&nxt_router->apps, &app_name);
+
+ if (nxt_fast_path(app != NULL)) {
+ shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
+ NXT_PROCESS_APP);
+ if (nxt_slow_path(shared_port == NULL)) {
+ goto fail;
+ }
+
+ ret = nxt_port_socket_init(task, shared_port, 0);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_use(task, shared_port, -1);
+ goto fail;
+ }
+
+ ret = nxt_router_app_queue_init(task, shared_port);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_port_write_close(shared_port);
+ nxt_port_read_close(shared_port);
+ nxt_port_use(task, shared_port, -1);
+ goto fail;
+ }
+
+ nxt_port_write_enable(task, shared_port);
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
+
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
+ 0, 0, NULL);
+
+ } nxt_queue_loop;
+
+ app->generation++;
+
+ shared_port->app = app;
+
+ old_shared_port = app->shared_port;
+ old_shared_port->app = NULL;
+
+ app->shared_port = shared_port;
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_port_close(task, old_shared_port);
+ nxt_port_use(task, old_shared_port, -1);
+
+ reply = NXT_PORT_MSG_RPC_READY_LAST;
+
+ } else {
+
+fail:
+
+ reply = NXT_PORT_MSG_RPC_ERROR;
+ }
+
+ nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
+ 0, NULL);
+}
+
+
+static void
nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
void *data)
{
@@ -956,8 +1062,6 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
- tls->last = nxt_queue_is_empty(&tmcf->tls);
-
nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
nxt_router_tls_rpc_handler, tls);
return;
@@ -1341,12 +1445,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_router_t *router;
nxt_app_joint_t *app_joint;
#if (NXT_TLS)
- nxt_conf_value_t *certificate, *conf_cmds;
+ nxt_tls_init_t *tls_init;
+ nxt_conf_value_t *certificate;
#endif
nxt_conf_value_t *conf, *http, *value, *websocket;
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
- nxt_conf_value_t *routes_conf, *static_conf;
+ nxt_conf_value_t *routes_conf, *static_conf, *client_ip_conf;
nxt_socket_conf_t *skcf;
nxt_http_routes_t *routes;
nxt_event_engine_t *engine;
@@ -1363,9 +1468,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
#if (NXT_TLS)
static nxt_str_t certificate_path = nxt_string("/tls/certificate");
static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands");
+ static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size");
+ static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout");
+ static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets");
#endif
static nxt_str_t static_path = nxt_string("/settings/http/static");
static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
+ static nxt_str_t client_ip_path = nxt_string("/client_ip");
conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
if (conf == NULL) {
@@ -1604,7 +1713,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app_joint->free_app_work.task = &engine->task;
app_joint->free_app_work.obj = app_joint;
- port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
+ port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
NXT_PROCESS_APP);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
@@ -1737,11 +1846,40 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
t->length = nxt_strlen(t->start);
}
+ client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
+ ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
+ client_ip_conf);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return NXT_ERROR;
+ }
+
#if (NXT_TLS)
certificate = nxt_conf_get_path(listener, &certificate_path);
if (certificate != NULL) {
- conf_cmds = nxt_conf_get_path(listener, &conf_commands_path);
+ tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
+ if (nxt_slow_path(tls_init == NULL)) {
+ return NXT_ERROR;
+ }
+
+ tls_init->cache_size = 0;
+ tls_init->timeout = 300;
+
+ value = nxt_conf_get_path(listener, &conf_cache_path);
+ if (value != NULL) {
+ tls_init->cache_size = nxt_conf_get_number(value);
+ }
+
+ value = nxt_conf_get_path(listener, &conf_timeout_path);
+ if (value != NULL) {
+ tls_init->timeout = nxt_conf_get_number(value);
+ }
+
+ tls_init->conf_cmds = nxt_conf_get_path(listener,
+ &conf_commands_path);
+
+ tls_init->tickets_conf = nxt_conf_get_path(listener,
+ &conf_tickets);
if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
n = nxt_conf_array_elements_count(certificate);
@@ -1752,7 +1890,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_assert(value != NULL);
ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
- conf_cmds);
+ tls_init, i == 0);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
@@ -1761,7 +1899,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
} else {
/* NXT_CONF_STRING */
ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
- conf_cmds);
+ tls_init, 1);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
@@ -1856,7 +1994,7 @@ fail:
static nxt_int_t
nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
- nxt_conf_value_t *conf_cmds)
+ nxt_tls_init_t *tls_init, nxt_bool_t last)
{
nxt_router_tlssock_t *tls;
@@ -1865,9 +2003,10 @@ nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
return NXT_ERROR;
}
+ tls->tls_init = tls_init;
tls->socket_conf = skcf;
- tls->conf_cmds = conf_cmds;
tls->temp_conf = tmcf;
+ tls->last = last;
nxt_conf_get_string(value, &tls->name);
nxt_queue_insert_tail(&tmcf->tls, &tls->link);
@@ -1884,7 +2023,7 @@ nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
{
uint32_t next, i;
nxt_mp_t *mp;
- nxt_str_t *type, extension, str;
+ nxt_str_t *type, exten, str;
nxt_int_t ret;
nxt_uint_t exts;
nxt_conf_value_t *mtypes_conf, *ext_conf, *value;
@@ -1922,12 +2061,12 @@ nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
nxt_conf_get_string(ext_conf, &str);
- if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) {
+ if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
return NXT_ERROR;
}
ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
- &extension, type);
+ &exten, type);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
@@ -1942,12 +2081,12 @@ nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
nxt_conf_get_string(value, &str);
- if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) {
+ if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
return NXT_ERROR;
}
ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
- &extension, type);
+ &exten, type);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
@@ -1959,6 +2098,79 @@ nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
}
+static nxt_int_t
+nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
+ nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
+{
+ char c;
+ size_t i;
+ nxt_mp_t *mp;
+ uint32_t hash;
+ nxt_str_t header;
+ nxt_conf_value_t *source_conf, *header_conf, *recursive_conf;
+ nxt_http_client_ip_t *client_ip;
+ nxt_http_route_addr_rule_t *source;
+
+ static nxt_str_t header_path = nxt_string("/header");
+ static nxt_str_t source_path = nxt_string("/source");
+ static nxt_str_t recursive_path = nxt_string("/recursive");
+
+ if (conf == NULL) {
+ skcf->client_ip = NULL;
+
+ return NXT_OK;
+ }
+
+ mp = tmcf->router_conf->mem_pool;
+
+ source_conf = nxt_conf_get_path(conf, &source_path);
+ header_conf = nxt_conf_get_path(conf, &header_path);
+ recursive_conf = nxt_conf_get_path(conf, &recursive_path);
+
+ if (source_conf == NULL || header_conf == NULL) {
+ return NXT_ERROR;
+ }
+
+ client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
+ if (nxt_slow_path(client_ip == NULL)) {
+ return NXT_ERROR;
+ }
+
+ source = nxt_http_route_addr_rule_create(task, mp, source_conf);
+ if (nxt_slow_path(source == NULL)) {
+ return NXT_ERROR;
+ }
+
+ client_ip->source = source;
+
+ nxt_conf_get_string(header_conf, &header);
+
+ if (recursive_conf != NULL) {
+ client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
+ }
+
+ client_ip->header = nxt_str_dup(mp, NULL, &header);
+ if (nxt_slow_path(client_ip->header == NULL)) {
+ return NXT_ERROR;
+ }
+
+ hash = NXT_HTTP_FIELD_HASH_INIT;
+
+ for (i = 0; i < client_ip->header->length; i++) {
+ c = client_ip->header->start[i];
+ hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
+ }
+
+ hash = nxt_http_field_hash_end(hash) & 0xFFFF;
+
+ client_ip->header_hash = hash;
+
+ skcf->client_ip = client_ip;
+
+ return NXT_OK;
+}
+
+
static nxt_app_t *
nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
{
@@ -2135,21 +2347,46 @@ nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
}
+typedef struct {
+ nxt_app_t *app;
+ nxt_int_t target;
+} nxt_http_app_conf_t;
+
nxt_int_t
-nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name,
- nxt_http_action_t *action)
+nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
+ nxt_str_t *target, nxt_http_action_t *action)
{
- nxt_app_t *app;
+ nxt_app_t *app;
+ nxt_str_t *targets;
+ nxt_uint_t i;
+ nxt_http_app_conf_t *conf;
app = nxt_router_apps_hash_get(rtcf, name);
-
if (app == NULL) {
return NXT_DECLINED;
}
- action->u.app.application = app;
+ conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
+ if (nxt_slow_path(conf == NULL)) {
+ return NXT_ERROR;
+ }
+
action->handler = nxt_http_application_handler;
+ action->u.conf = conf;
+
+ conf->app = app;
+
+ if (target != NULL && target->length != 0) {
+ targets = app->targets;
+
+ for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
+
+ conf->target = i;
+
+ } else {
+ conf->target = 0;
+ }
return NXT_OK;
}
@@ -2297,7 +2534,7 @@ nxt_router_listen_socket_rpc_create(nxt_task_t *task,
goto fail;
}
- b->completion_handler = nxt_router_dummy_buf_completion;
+ b->completion_handler = nxt_buf_dummy_completion;
b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
@@ -2466,6 +2703,8 @@ nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
tlscf = tls->socket_conf->tls;
}
+ tls->tls_init->conf = tlscf;
+
bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
if (nxt_slow_path(bundle == NULL)) {
goto fail;
@@ -2479,8 +2718,8 @@ nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
bundle->next = tlscf->bundle;
tlscf->bundle = bundle;
- ret = task->thread->runtime->tls->server_init(task, tlscf, mp,
- tls->conf_cmds, tls->last);
+ ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
+ tls->last);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
@@ -2526,7 +2765,7 @@ nxt_router_app_rpc_create(nxt_task_t *task,
goto fail;
}
- b->completion_handler = nxt_router_dummy_buf_completion;
+ b->completion_handler = nxt_buf_dummy_completion;
nxt_buf_cpystr(b, &app->name);
*b->mem.free++ = '\0';
@@ -3555,7 +3794,7 @@ nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
goto fail;
}
- b->completion_handler = nxt_router_dummy_buf_completion;
+ b->completion_handler = nxt_buf_dummy_completion;
nxt_buf_cpystr(b, &access_log->path);
*b->mem.free++ = '\0';
@@ -4183,11 +4422,16 @@ static void
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_app_t *app;
- nxt_port_t *port;
- nxt_app_joint_t *app_joint;
+ nxt_app_t *app;
+ nxt_bool_t start_process;
+ nxt_port_t *port;
+ nxt_app_joint_t *app_joint;
+ nxt_app_joint_rpc_t *app_joint_rpc;
+
+ nxt_assert(data != NULL);
- app_joint = data;
+ app_joint_rpc = data;
+ app_joint = app_joint_rpc->app_joint;
port = msg->u.new_port;
nxt_assert(app_joint != NULL);
@@ -4207,14 +4451,37 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
return;
}
- port->app = app;
- port->main_app_port = port;
-
nxt_thread_mutex_lock(&app->mutex);
nxt_assert(app->pending_processes != 0);
app->pending_processes--;
+
+ if (nxt_slow_path(app->generation != app_joint_rpc->generation)) {
+ nxt_debug(task, "new port ready for restarted app, send QUIT");
+
+ start_process = !task->thread->engine->shutdown
+ && nxt_router_app_can_start(app)
+ && nxt_router_app_need_start(app);
+
+ if (start_process) {
+ app->pending_processes++;
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+
+ if (start_process) {
+ nxt_router_start_app_process(task, app);
+ }
+
+ return;
+ }
+
+ port->app = app;
+ port->main_app_port = port;
+
app->processes++;
nxt_port_hash_add(&app->port_hash, port);
app->port_hash_count++;
@@ -4268,12 +4535,16 @@ static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_app_t *app;
- nxt_app_joint_t *app_joint;
- nxt_queue_link_t *link;
- nxt_http_request_t *r;
+ nxt_app_t *app;
+ nxt_app_joint_t *app_joint;
+ nxt_queue_link_t *link;
+ nxt_http_request_t *r;
+ nxt_app_joint_rpc_t *app_joint_rpc;
+
+ nxt_assert(data != NULL);
- app_joint = data;
+ app_joint_rpc = data;
+ app_joint = app_joint_rpc->app_joint;
nxt_assert(app_joint != NULL);
@@ -4440,7 +4711,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
port->pid, port->id,
(int) inc_use, (int) got_response);
- if (port == app->shared_port) {
+ if (port->id == NXT_SHARED_PORT_ID) {
nxt_thread_mutex_lock(&app->mutex);
app->active_requests -= got_response + dec_requests;
@@ -4810,6 +5081,8 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
app->shared_port->app = NULL;
nxt_port_close(task, app->shared_port);
nxt_port_use(task, app->shared_port, -1);
+
+ app->shared_port = NULL;
}
nxt_thread_mutex_destroy(&app->mutex);
@@ -4876,13 +5149,17 @@ nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
void
nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
- nxt_app_t *app)
+ nxt_http_action_t *action)
{
nxt_event_engine_t *engine;
+ nxt_http_app_conf_t *conf;
nxt_request_rpc_data_t *req_rpc_data;
+ conf = action->u.conf;
engine = task->thread->engine;
+ r->app_target = conf->target;
+
req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
nxt_router_response_ready_handler,
nxt_router_response_error_handler,
@@ -4913,11 +5190,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
r->err_work.obj = r;
req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
- req_rpc_data->app = app;
+ req_rpc_data->app = conf->app;
req_rpc_data->msg_info.body_fd = -1;
req_rpc_data->rpc_cancel = 1;
- nxt_router_app_use(task, app, 1);
+ nxt_router_app_use(task, conf->app, 1);
req_rpc_data->request = r;
r->req_rpc_data = req_rpc_data;
@@ -4926,7 +5203,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
r->last->completion_handler = nxt_router_http_request_done;
}
- nxt_router_app_port_get(task, app, req_rpc_data);
+ nxt_router_app_port_get(task, conf->app, req_rpc_data);
nxt_router_app_prepare_request(task, req_rpc_data);
}
@@ -4968,12 +5245,6 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
static void
-nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
-{
-}
-
-
-static void
nxt_router_app_prepare_request(nxt_task_t *task,
nxt_request_rpc_data_t *req_rpc_data)
{