diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 449 |
1 files changed, 360 insertions, 89 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 015ae226..39d375f8 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -18,6 +18,8 @@ #include <nxt_app_queue.h> #include <nxt_port_queue.h> +#define NXT_SHARED_PORT_ID 0xFFFFu + typedef struct { nxt_str_t type; uint32_t processes; @@ -44,10 +46,10 @@ typedef struct { nxt_str_t name; nxt_socket_conf_t *socket_conf; nxt_router_temp_conf_t *temp_conf; - nxt_conf_value_t *conf_cmds; + nxt_tls_init_t *tls_init; nxt_bool_t last; - nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ + nxt_queue_link_t link; /* for nxt_socket_conf_t.tls */ } nxt_router_tlssock_t; #endif @@ -67,6 +69,12 @@ typedef struct { } nxt_app_rpc_t; +typedef struct { + nxt_app_joint_t *app_joint; + uint32_t generation; +} nxt_app_joint_rpc_t; + + static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp); static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data); @@ -79,6 +87,8 @@ static void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_router_app_restart_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); static void nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_router_access_log_reopen_handler(nxt_task_t *task, @@ -97,6 +107,9 @@ static nxt_int_t nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, nxt_conf_value_t *conf); +static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf, + nxt_conf_value_t *conf); static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name); static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data); @@ -123,8 +136,8 @@ static void nxt_router_listen_socket_error(nxt_task_t *task, static void nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, - nxt_conf_value_t *value, nxt_socket_conf_t *skcf, - nxt_conf_value_t * conf_cmds); + nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init, + nxt_bool_t last); #endif static void nxt_router_app_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_app_t *app); @@ -224,8 +237,6 @@ static void nxt_router_http_request_error(nxt_task_t *task, void *obj, static void nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data); -static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, - void *data); static void nxt_router_app_prepare_request(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data); static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, @@ -281,6 +292,7 @@ static const nxt_port_handlers_t nxt_router_process_port_handlers = { .mmap = nxt_port_mmap_handler, .get_mmap = nxt_router_get_mmap_handler, .data = nxt_router_conf_data_handler, + .app_restart = nxt_router_app_restart_handler, .remove_pid = nxt_router_remove_pid_handler, .access_log = nxt_router_access_log_reopen_handler, .rpc_ready = nxt_port_rpc_handler, @@ -379,14 +391,15 @@ static void nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, void *data) { - size_t size; - uint32_t stream; - nxt_mp_t *mp; - nxt_int_t ret; - nxt_app_t *app; - nxt_buf_t *b; - nxt_port_t *main_port; - nxt_runtime_t *rt; + size_t size; + uint32_t stream; + nxt_mp_t *mp; + nxt_int_t ret; + nxt_app_t *app; + nxt_buf_t *b; + nxt_port_t *main_port; + nxt_runtime_t *rt; + nxt_app_joint_rpc_t *app_joint_rpc; app = data; @@ -407,30 +420,29 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, *b->mem.free++ = '\0'; nxt_buf_cpystr(b, &app->conf); - nxt_router_app_joint_use(task, app->joint, 1); - - stream = nxt_port_rpc_register_handler(task, port, - nxt_router_app_port_ready, - nxt_router_app_port_error, - -1, app->joint); - - if (nxt_slow_path(stream == 0)) { - nxt_router_app_joint_use(task, app->joint, -1); - + app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port, + nxt_router_app_port_ready, + nxt_router_app_port_error, + sizeof(nxt_app_joint_rpc_t)); + if (nxt_slow_path(app_joint_rpc == NULL)) { goto failed; } + stream = nxt_port_rpc_ex_stream(app_joint_rpc); + ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS, -1, stream, port->id, b); - if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); - nxt_router_app_joint_use(task, app->joint, -1); - goto failed; } + app_joint_rpc->app_joint = app->joint; + app_joint_rpc->generation = app->generation; + + nxt_router_app_joint_use(task, app->joint, 1); + nxt_router_app_use(task, app, -1); return; @@ -504,6 +516,7 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) { nxt_buf_t *b, *next; nxt_bool_t cancelled; + nxt_port_t *app_port; nxt_msg_info_t *msg_info; msg_info = &req_rpc_data->msg_info; @@ -512,13 +525,20 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) return 0; } - cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue, - msg_info->tracking_cookie, - req_rpc_data->stream); + app_port = req_rpc_data->app_port; + + if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) { + cancelled = nxt_app_queue_cancel(app_port->queue, + msg_info->tracking_cookie, + req_rpc_data->stream); - if (cancelled) { - nxt_debug(task, "stream #%uD: cancelled by router", - req_rpc_data->stream); + if (cancelled) { + nxt_debug(task, "stream #%uD: cancelled by router", + req_rpc_data->stream); + } + + } else { + cancelled = 0; } for (b = msg_info->buf; b != NULL; b = next) { @@ -680,18 +700,20 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_assert(main_app_port != NULL); app = main_app_port->app; - nxt_assert(app != NULL); - nxt_thread_mutex_lock(&app->mutex); + if (nxt_fast_path(app != NULL)) { + nxt_thread_mutex_lock(&app->mutex); - /* TODO here should be find-and-add code because there can be - port waiters in port_hash */ - nxt_port_hash_add(&app->port_hash, port); - app->port_hash_count++; + /* TODO here should be find-and-add code because there can be + port waiters in port_hash */ + nxt_port_hash_add(&app->port_hash, port); + app->port_hash_count++; - nxt_thread_mutex_unlock(&app->mutex); + nxt_thread_mutex_unlock(&app->mutex); + + port->app = app; + } - port->app = app; port->main_app_port = main_app_port; nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL); @@ -792,6 +814,90 @@ cleanup: static void +nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_app_t *app; + nxt_int_t ret; + nxt_str_t app_name; + nxt_port_t *port, *reply_port, *shared_port, *old_shared_port; + nxt_port_msg_type_t reply; + + reply_port = nxt_runtime_port_find(task->thread->runtime, + msg->port_msg.pid, + msg->port_msg.reply_port); + if (nxt_slow_path(reply_port == NULL)) { + nxt_alert(task, "app_restart_handler: reply port not found"); + return; + } + + app_name.length = nxt_buf_mem_used_size(&msg->buf->mem); + app_name.start = msg->buf->mem.pos; + + nxt_debug(task, "app_restart_handler: %V", &app_name); + + app = nxt_router_app_find(&nxt_router->apps, &app_name); + + if (nxt_fast_path(app != NULL)) { + shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, + NXT_PROCESS_APP); + if (nxt_slow_path(shared_port == NULL)) { + goto fail; + } + + ret = nxt_port_socket_init(task, shared_port, 0); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_use(task, shared_port, -1); + goto fail; + } + + ret = nxt_router_app_queue_init(task, shared_port); + if (nxt_slow_path(ret != NXT_OK)) { + nxt_port_write_close(shared_port); + nxt_port_read_close(shared_port); + nxt_port_use(task, shared_port, -1); + goto fail; + } + + nxt_port_write_enable(task, shared_port); + + nxt_thread_mutex_lock(&app->mutex); + + nxt_queue_each(port, &app->ports, nxt_port_t, app_link) { + + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, + 0, 0, NULL); + + } nxt_queue_loop; + + app->generation++; + + shared_port->app = app; + + old_shared_port = app->shared_port; + old_shared_port->app = NULL; + + app->shared_port = shared_port; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_port_close(task, old_shared_port); + nxt_port_use(task, old_shared_port, -1); + + reply = NXT_PORT_MSG_RPC_READY_LAST; + + } else { + +fail: + + reply = NXT_PORT_MSG_RPC_ERROR; + } + + nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream, + 0, NULL); +} + + +static void nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data) { @@ -956,8 +1062,6 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link); - tls->last = nxt_queue_is_empty(&tmcf->tls); - nxt_cert_store_get(task, &tls->name, tmcf->mem_pool, nxt_router_tls_rpc_handler, tls); return; @@ -1341,12 +1445,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_router_t *router; nxt_app_joint_t *app_joint; #if (NXT_TLS) - nxt_conf_value_t *certificate, *conf_cmds; + nxt_tls_init_t *tls_init; + nxt_conf_value_t *certificate; #endif nxt_conf_value_t *conf, *http, *value, *websocket; nxt_conf_value_t *applications, *application; nxt_conf_value_t *listeners, *listener; - nxt_conf_value_t *routes_conf, *static_conf; + nxt_conf_value_t *routes_conf, *static_conf, *client_ip_conf; nxt_socket_conf_t *skcf; nxt_http_routes_t *routes; nxt_event_engine_t *engine; @@ -1363,9 +1468,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, #if (NXT_TLS) static nxt_str_t certificate_path = nxt_string("/tls/certificate"); static nxt_str_t conf_commands_path = nxt_string("/tls/conf_commands"); + static nxt_str_t conf_cache_path = nxt_string("/tls/session/cache_size"); + static nxt_str_t conf_timeout_path = nxt_string("/tls/session/timeout"); + static nxt_str_t conf_tickets = nxt_string("/tls/session/tickets"); #endif static nxt_str_t static_path = nxt_string("/settings/http/static"); static nxt_str_t websocket_path = nxt_string("/settings/http/websocket"); + static nxt_str_t client_ip_path = nxt_string("/client_ip"); conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL); if (conf == NULL) { @@ -1604,7 +1713,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app_joint->free_app_work.task = &engine->task; app_joint->free_app_work.obj = app_joint; - port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid, + port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid, NXT_PROCESS_APP); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; @@ -1737,11 +1846,40 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, t->length = nxt_strlen(t->start); } + client_ip_conf = nxt_conf_get_path(listener, &client_ip_path); + ret = nxt_router_conf_process_client_ip(task, tmcf, skcf, + client_ip_conf); + if (nxt_slow_path(ret != NXT_OK)) { + return NXT_ERROR; + } + #if (NXT_TLS) certificate = nxt_conf_get_path(listener, &certificate_path); if (certificate != NULL) { - conf_cmds = nxt_conf_get_path(listener, &conf_commands_path); + tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t)); + if (nxt_slow_path(tls_init == NULL)) { + return NXT_ERROR; + } + + tls_init->cache_size = 0; + tls_init->timeout = 300; + + value = nxt_conf_get_path(listener, &conf_cache_path); + if (value != NULL) { + tls_init->cache_size = nxt_conf_get_number(value); + } + + value = nxt_conf_get_path(listener, &conf_timeout_path); + if (value != NULL) { + tls_init->timeout = nxt_conf_get_number(value); + } + + tls_init->conf_cmds = nxt_conf_get_path(listener, + &conf_commands_path); + + tls_init->tickets_conf = nxt_conf_get_path(listener, + &conf_tickets); if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) { n = nxt_conf_array_elements_count(certificate); @@ -1752,7 +1890,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_assert(value != NULL); ret = nxt_router_conf_tls_insert(tmcf, value, skcf, - conf_cmds); + tls_init, i == 0); if (nxt_slow_path(ret != NXT_OK)) { goto fail; } @@ -1761,7 +1899,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } else { /* NXT_CONF_STRING */ ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf, - conf_cmds); + tls_init, 1); if (nxt_slow_path(ret != NXT_OK)) { goto fail; } @@ -1856,7 +1994,7 @@ fail: static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *value, nxt_socket_conf_t *skcf, - nxt_conf_value_t *conf_cmds) + nxt_tls_init_t *tls_init, nxt_bool_t last) { nxt_router_tlssock_t *tls; @@ -1865,9 +2003,10 @@ nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf, return NXT_ERROR; } + tls->tls_init = tls_init; tls->socket_conf = skcf; - tls->conf_cmds = conf_cmds; tls->temp_conf = tmcf; + tls->last = last; nxt_conf_get_string(value, &tls->name); nxt_queue_insert_tail(&tmcf->tls, &tls->link); @@ -1884,7 +2023,7 @@ nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, { uint32_t next, i; nxt_mp_t *mp; - nxt_str_t *type, extension, str; + nxt_str_t *type, exten, str; nxt_int_t ret; nxt_uint_t exts; nxt_conf_value_t *mtypes_conf, *ext_conf, *value; @@ -1922,12 +2061,12 @@ nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) { nxt_conf_get_string(ext_conf, &str); - if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) { + if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { return NXT_ERROR; } ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, - &extension, type); + &exten, type); if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } @@ -1942,12 +2081,12 @@ nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, nxt_conf_get_string(value, &str); - if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) { + if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) { return NXT_ERROR; } ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash, - &extension, type); + &exten, type); if (nxt_slow_path(ret != NXT_OK)) { return NXT_ERROR; } @@ -1959,6 +2098,79 @@ nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf, } +static nxt_int_t +nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, + nxt_socket_conf_t *skcf, nxt_conf_value_t *conf) +{ + char c; + size_t i; + nxt_mp_t *mp; + uint32_t hash; + nxt_str_t header; + nxt_conf_value_t *source_conf, *header_conf, *recursive_conf; + nxt_http_client_ip_t *client_ip; + nxt_http_route_addr_rule_t *source; + + static nxt_str_t header_path = nxt_string("/header"); + static nxt_str_t source_path = nxt_string("/source"); + static nxt_str_t recursive_path = nxt_string("/recursive"); + + if (conf == NULL) { + skcf->client_ip = NULL; + + return NXT_OK; + } + + mp = tmcf->router_conf->mem_pool; + + source_conf = nxt_conf_get_path(conf, &source_path); + header_conf = nxt_conf_get_path(conf, &header_path); + recursive_conf = nxt_conf_get_path(conf, &recursive_path); + + if (source_conf == NULL || header_conf == NULL) { + return NXT_ERROR; + } + + client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t)); + if (nxt_slow_path(client_ip == NULL)) { + return NXT_ERROR; + } + + source = nxt_http_route_addr_rule_create(task, mp, source_conf); + if (nxt_slow_path(source == NULL)) { + return NXT_ERROR; + } + + client_ip->source = source; + + nxt_conf_get_string(header_conf, &header); + + if (recursive_conf != NULL) { + client_ip->recursive = nxt_conf_get_boolean(recursive_conf); + } + + client_ip->header = nxt_str_dup(mp, NULL, &header); + if (nxt_slow_path(client_ip->header == NULL)) { + return NXT_ERROR; + } + + hash = NXT_HTTP_FIELD_HASH_INIT; + + for (i = 0; i < client_ip->header->length; i++) { + c = client_ip->header->start[i]; + hash = nxt_http_field_hash_char(hash, nxt_lowcase(c)); + } + + hash = nxt_http_field_hash_end(hash) & 0xFFFF; + + client_ip->header_hash = hash; + + skcf->client_ip = client_ip; + + return NXT_OK; +} + + static nxt_app_t * nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) { @@ -2135,21 +2347,46 @@ nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i) } +typedef struct { + nxt_app_t *app; + nxt_int_t target; +} nxt_http_app_conf_t; + nxt_int_t -nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name, - nxt_http_action_t *action) +nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name, + nxt_str_t *target, nxt_http_action_t *action) { - nxt_app_t *app; + nxt_app_t *app; + nxt_str_t *targets; + nxt_uint_t i; + nxt_http_app_conf_t *conf; app = nxt_router_apps_hash_get(rtcf, name); - if (app == NULL) { return NXT_DECLINED; } - action->u.app.application = app; + conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t)); + if (nxt_slow_path(conf == NULL)) { + return NXT_ERROR; + } + action->handler = nxt_http_application_handler; + action->u.conf = conf; + + conf->app = app; + + if (target != NULL && target->length != 0) { + targets = app->targets; + + for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++); + + conf->target = i; + + } else { + conf->target = 0; + } return NXT_OK; } @@ -2297,7 +2534,7 @@ nxt_router_listen_socket_rpc_create(nxt_task_t *task, goto fail; } - b->completion_handler = nxt_router_dummy_buf_completion; + b->completion_handler = nxt_buf_dummy_completion; b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size); @@ -2466,6 +2703,8 @@ nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, tlscf = tls->socket_conf->tls; } + tls->tls_init->conf = tlscf; + bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t)); if (nxt_slow_path(bundle == NULL)) { goto fail; @@ -2479,8 +2718,8 @@ nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, bundle->next = tlscf->bundle; tlscf->bundle = bundle; - ret = task->thread->runtime->tls->server_init(task, tlscf, mp, - tls->conf_cmds, tls->last); + ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init, + tls->last); if (nxt_slow_path(ret != NXT_OK)) { goto fail; } @@ -2526,7 +2765,7 @@ nxt_router_app_rpc_create(nxt_task_t *task, goto fail; } - b->completion_handler = nxt_router_dummy_buf_completion; + b->completion_handler = nxt_buf_dummy_completion; nxt_buf_cpystr(b, &app->name); *b->mem.free++ = '\0'; @@ -3555,7 +3794,7 @@ nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) goto fail; } - b->completion_handler = nxt_router_dummy_buf_completion; + b->completion_handler = nxt_buf_dummy_completion; nxt_buf_cpystr(b, &access_log->path); *b->mem.free++ = '\0'; @@ -4183,11 +4422,16 @@ static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_port_t *port; - nxt_app_joint_t *app_joint; + nxt_app_t *app; + nxt_bool_t start_process; + nxt_port_t *port; + nxt_app_joint_t *app_joint; + nxt_app_joint_rpc_t *app_joint_rpc; + + nxt_assert(data != NULL); - app_joint = data; + app_joint_rpc = data; + app_joint = app_joint_rpc->app_joint; port = msg->u.new_port; nxt_assert(app_joint != NULL); @@ -4207,14 +4451,37 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, return; } - port->app = app; - port->main_app_port = port; - nxt_thread_mutex_lock(&app->mutex); nxt_assert(app->pending_processes != 0); app->pending_processes--; + + if (nxt_slow_path(app->generation != app_joint_rpc->generation)) { + nxt_debug(task, "new port ready for restarted app, send QUIT"); + + start_process = !task->thread->engine->shutdown + && nxt_router_app_can_start(app) + && nxt_router_app_need_start(app); + + if (start_process) { + app->pending_processes++; + } + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + + if (start_process) { + nxt_router_start_app_process(task, app); + } + + return; + } + + port->app = app; + port->main_app_port = port; + app->processes++; nxt_port_hash_add(&app->port_hash, port); app->port_hash_count++; @@ -4268,12 +4535,16 @@ static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_app_joint_t *app_joint; - nxt_queue_link_t *link; - nxt_http_request_t *r; + nxt_app_t *app; + nxt_app_joint_t *app_joint; + nxt_queue_link_t *link; + nxt_http_request_t *r; + nxt_app_joint_rpc_t *app_joint_rpc; + + nxt_assert(data != NULL); - app_joint = data; + app_joint_rpc = data; + app_joint = app_joint_rpc->app_joint; nxt_assert(app_joint != NULL); @@ -4440,7 +4711,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, port->pid, port->id, (int) inc_use, (int) got_response); - if (port == app->shared_port) { + if (port->id == NXT_SHARED_PORT_ID) { nxt_thread_mutex_lock(&app->mutex); app->active_requests -= got_response + dec_requests; @@ -4810,6 +5081,8 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data) app->shared_port->app = NULL; nxt_port_close(task, app->shared_port); nxt_port_use(task, app->shared_port, -1); + + app->shared_port = NULL; } nxt_thread_mutex_destroy(&app->mutex); @@ -4876,13 +5149,17 @@ nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, void nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, - nxt_app_t *app) + nxt_http_action_t *action) { nxt_event_engine_t *engine; + nxt_http_app_conf_t *conf; nxt_request_rpc_data_t *req_rpc_data; + conf = action->u.conf; engine = task->thread->engine; + r->app_target = conf->target; + req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port, nxt_router_response_ready_handler, nxt_router_response_error_handler, @@ -4913,11 +5190,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, r->err_work.obj = r; req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); - req_rpc_data->app = app; + req_rpc_data->app = conf->app; req_rpc_data->msg_info.body_fd = -1; req_rpc_data->rpc_cancel = 1; - nxt_router_app_use(task, app, 1); + nxt_router_app_use(task, conf->app, 1); req_rpc_data->request = r; r->req_rpc_data = req_rpc_data; @@ -4926,7 +5203,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, r->last->completion_handler = nxt_router_http_request_done; } - nxt_router_app_port_get(task, app, req_rpc_data); + nxt_router_app_port_get(task, conf->app, req_rpc_data); nxt_router_app_prepare_request(task, req_rpc_data); } @@ -4968,12 +5245,6 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) static void -nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) -{ -} - - -static void nxt_router_app_prepare_request(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) { |