summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c630
1 files changed, 545 insertions, 85 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 1ba7072d..19a37c69 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -12,11 +12,15 @@
typedef struct {
nxt_str_t type;
- uint32_t workers;
+ uint32_t processes;
+ uint32_t max_processes;
+ uint32_t spare_processes;
nxt_msec_t timeout;
nxt_msec_t res_timeout;
+ nxt_msec_t idle_timeout;
uint32_t requests;
nxt_conf_value_t *limits_value;
+ nxt_conf_value_t *processes_value;
} nxt_router_app_conf_t;
@@ -76,6 +80,12 @@ typedef struct {
} nxt_socket_rpc_t;
+typedef struct {
+ nxt_app_t *app;
+ nxt_router_temp_conf_t *temp_conf;
+} nxt_app_rpc_t;
+
+
struct nxt_port_select_state_s {
nxt_app_t *app;
nxt_req_app_link_t *ra;
@@ -83,7 +93,7 @@ struct nxt_port_select_state_s {
nxt_port_t *failed_port;
int failed_port_use_delta;
- nxt_bool_t can_start_worker;
+ uint8_t start_process; /* 1 bit */
nxt_req_app_link_t *shared_ra;
nxt_port_t *port;
};
@@ -96,7 +106,7 @@ static void nxt_router_port_select(nxt_task_t *task,
static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
nxt_port_select_state_t *state);
-static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
+static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
nxt_inline void
nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
@@ -136,6 +146,12 @@ static void nxt_router_listen_socket_ready(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_listen_socket_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
+static void nxt_router_app_rpc_create(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
+static void nxt_router_app_prefork_ready(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
+static void nxt_router_app_prefork_error(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
@@ -193,7 +209,7 @@ static void nxt_router_app_port_ready(nxt_task_t *task,
static void nxt_router_app_port_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
-static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
+static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response);
static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
@@ -209,6 +225,12 @@ static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg);
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
+static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_router_app_release_handler(nxt_task_t *task, void *obj,
+ void *data);
static const nxt_http_request_state_t nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
@@ -253,7 +275,8 @@ nxt_router_start(nxt_task_t *task, void *data)
static void
-nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data)
+nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
+ void *data)
{
size_t size;
uint32_t stream;
@@ -268,7 +291,7 @@ nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data)
rt = task->thread->runtime;
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
- nxt_debug(task, "app '%V' %p start worker", &app->name, app);
+ nxt_debug(task, "app '%V' %p start process", &app->name, app);
size = app->name.length + 1 + app->conf.length;
@@ -304,7 +327,7 @@ failed:
nxt_thread_mutex_lock(&app->mutex);
- app->pending_workers--;
+ app->pending_processes--;
nxt_thread_mutex_unlock(&app->mutex);
@@ -313,7 +336,7 @@ failed:
static nxt_int_t
-nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
+nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
{
nxt_int_t res;
nxt_port_t *router_port;
@@ -324,7 +347,7 @@ nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
nxt_router_app_use(task, app, 1);
- res = nxt_port_post(task, router_port, nxt_router_start_worker_handler,
+ res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
app);
if (res == NXT_OK) {
@@ -333,7 +356,7 @@ nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
nxt_thread_mutex_lock(&app->mutex);
- app->pending_workers--;
+ app->pending_processes--;
nxt_thread_mutex_unlock(&app->mutex);
@@ -737,7 +760,8 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
static void
-nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data)
+nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
+ void *data)
{
union {
nxt_pid_t removed_pid;
@@ -763,7 +787,7 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
{
- nxt_port_post(task, engine->port, nxt_router_worker_remove_pid,
+ nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
msg->u.data);
}
nxt_queue_loop;
@@ -837,10 +861,27 @@ fail:
}
+nxt_inline nxt_bool_t
+nxt_router_app_can_start(nxt_app_t *app)
+{
+ return app->processes + app->pending_processes < app->max_processes
+ && app->pending_processes < app->max_pending_processes;
+}
+
+
+nxt_inline nxt_bool_t
+nxt_router_app_need_start(nxt_app_t *app)
+{
+ return app->idle_processes + app->pending_processes
+ < app->spare_processes;
+}
+
+
static void
nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
{
nxt_int_t ret;
+ nxt_app_t *app;
nxt_router_t *router;
nxt_runtime_t *rt;
nxt_queue_link_t *qlk;
@@ -863,6 +904,15 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
return;
}
+ nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
+
+ if (nxt_router_app_need_start(app)) {
+ nxt_router_app_rpc_create(task, tmcf, app);
+ return;
+ }
+
+ } nxt_queue_loop;
+
rt = task->thread->runtime;
interface = nxt_service_get(rt->services, "engine", NULL);
@@ -923,6 +973,7 @@ nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
static void
nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
{
+ nxt_app_t *app;
nxt_socket_t s;
nxt_router_t *router;
nxt_queue_link_t *qlk;
@@ -944,6 +995,12 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
nxt_free(skcf->listen);
}
+ nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
+
+ nxt_router_app_quit(task, app);
+
+ } nxt_queue_loop;
+
router = tmcf->conf->router;
nxt_queue_add(&router->sockets, &tmcf->keeping);
@@ -984,15 +1041,21 @@ static nxt_conf_map_t nxt_router_app_conf[] = {
},
{
- nxt_string("workers"),
+ nxt_string("limits"),
+ NXT_CONF_MAP_PTR,
+ offsetof(nxt_router_app_conf_t, limits_value),
+ },
+
+ {
+ nxt_string("processes"),
NXT_CONF_MAP_INT32,
- offsetof(nxt_router_app_conf_t, workers),
+ offsetof(nxt_router_app_conf_t, processes),
},
{
- nxt_string("limits"),
+ nxt_string("processes"),
NXT_CONF_MAP_PTR,
- offsetof(nxt_router_app_conf_t, limits_value),
+ offsetof(nxt_router_app_conf_t, processes_value),
},
};
@@ -1018,6 +1081,27 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = {
};
+static nxt_conf_map_t nxt_router_app_processes_conf[] = {
+ {
+ nxt_string("spare"),
+ NXT_CONF_MAP_INT32,
+ offsetof(nxt_router_app_conf_t, spare_processes),
+ },
+
+ {
+ nxt_string("max"),
+ NXT_CONF_MAP_INT32,
+ offsetof(nxt_router_app_conf_t, max_processes),
+ },
+
+ {
+ nxt_string("idle_timeout"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_router_app_conf_t, idle_timeout),
+ },
+};
+
+
static nxt_conf_map_t nxt_router_listener_conf[] = {
{
nxt_string("application"),
@@ -1100,6 +1184,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
nxt_socket_conf_t *skcf;
+ nxt_event_engine_t *engine;
nxt_app_lang_module_t *lang;
nxt_router_app_conf_t apcf;
nxt_router_listener_conf_t lscf;
@@ -1174,11 +1259,15 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
continue;
}
- apcf.workers = 1;
+ apcf.processes = 1;
+ apcf.max_processes = 1;
+ apcf.spare_processes = 1;
apcf.timeout = 0;
apcf.res_timeout = 1000;
+ apcf.idle_timeout = 15000;
apcf.requests = 0;
apcf.limits_value = NULL;
+ apcf.processes_value = NULL;
ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
nxt_nitems(nxt_router_app_conf), &apcf);
@@ -1204,10 +1293,27 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
}
+ if (apcf.processes_value != NULL
+ && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
+ {
+ ret = nxt_conf_map_object(mp, apcf.processes_value,
+ nxt_router_app_processes_conf,
+ nxt_nitems(nxt_router_app_processes_conf),
+ &apcf);
+ if (ret != NXT_OK) {
+ nxt_log(task, NXT_LOG_CRIT, "application processes map error");
+ goto app_fail;
+ }
+
+ } else {
+ apcf.max_processes = apcf.processes;
+ apcf.spare_processes = apcf.processes;
+ }
+
nxt_debug(task, "application type: %V", &apcf.type);
- nxt_debug(task, "application workers: %D", apcf.workers);
- nxt_debug(task, "application request timeout: %D", apcf.timeout);
- nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout);
+ nxt_debug(task, "application processes: %D", apcf.processes);
+ nxt_debug(task, "application request timeout: %M", apcf.timeout);
+ nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout);
nxt_debug(task, "application requests: %D", apcf.requests);
lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
@@ -1226,6 +1332,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
nxt_queue_init(&app->ports);
+ nxt_queue_init(&app->spare_ports);
+ nxt_queue_init(&app->idle_ports);
nxt_queue_init(&app->requests);
nxt_queue_init(&app->pending);
@@ -1233,14 +1341,32 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_memcpy(app->name.start, name.start, name.length);
app->type = lang->type;
- app->max_workers = apcf.workers;
+ app->max_processes = apcf.max_processes;
+ app->spare_processes = apcf.spare_processes;
+ app->max_pending_processes = apcf.spare_processes
+ ? apcf.spare_processes : 1;
app->timeout = apcf.timeout;
app->res_timeout = apcf.res_timeout * 1000000;
+ app->idle_timeout = apcf.idle_timeout;
app->live = 1;
app->max_pending_responses = 2;
app->max_requests = apcf.requests;
app->prepare_msg = nxt_app_prepare_msg[lang->type];
+ engine = task->thread->engine;
+
+ app->engine = engine;
+
+ app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION;
+ app->idle_timer.work_queue = &engine->fast_work_queue;
+ app->idle_timer.handler = nxt_router_app_idle_timeout;
+ app->idle_timer.task = &engine->task;
+ app->idle_timer.log = app->idle_timer.task->log;
+
+ app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
+ app->adjust_idle_work.task = &engine->task;
+ app->adjust_idle_work.obj = app;
+
nxt_queue_insert_tail(&tmcf->apps, &app->link);
nxt_router_app_use(task, app, 1);
@@ -1624,6 +1750,119 @@ nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
+static void
+nxt_router_app_rpc_create(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
+{
+ size_t size;
+ uint32_t stream;
+ nxt_buf_t *b;
+ nxt_port_t *main_port, *router_port;
+ nxt_runtime_t *rt;
+ nxt_app_rpc_t *rpc;
+
+ rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
+ if (rpc == NULL) {
+ goto fail;
+ }
+
+ rpc->app = app;
+ rpc->temp_conf = tmcf;
+
+ nxt_debug(task, "app '%V' prefork", &app->name);
+
+ size = app->name.length + 1 + app->conf.length;
+
+ b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
+ if (nxt_slow_path(b == NULL)) {
+ goto fail;
+ }
+
+ nxt_buf_cpystr(b, &app->name);
+ *b->mem.free++ = '\0';
+ nxt_buf_cpystr(b, &app->conf);
+
+ rt = task->thread->runtime;
+ main_port = rt->port_by_type[NXT_PROCESS_MAIN];
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+
+ stream = nxt_port_rpc_register_handler(task, router_port,
+ nxt_router_app_prefork_ready,
+ nxt_router_app_prefork_error,
+ -1, rpc);
+ if (nxt_slow_path(stream == 0)) {
+ goto fail;
+ }
+
+ app->pending_processes++;
+
+ nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
+ stream, router_port->id, b);
+
+ return;
+
+fail:
+
+ nxt_router_conf_error(task, tmcf);
+}
+
+
+static void
+nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
+{
+ nxt_app_t *app;
+ nxt_port_t *port;
+ nxt_app_rpc_t *rpc;
+ nxt_event_engine_t *engine;
+
+ rpc = data;
+ app = rpc->app;
+
+ port = msg->u.new_port;
+ port->app = app;
+
+ nxt_router_app_use(task, app, 1);
+
+ app->pending_processes--;
+ app->processes++;
+ app->idle_processes++;
+
+ engine = task->thread->engine;
+
+ nxt_queue_insert_tail(&app->ports, &port->app_link);
+ nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
+
+ port->idle_start = 0;
+
+ nxt_port_inc_use(port);
+
+ nxt_work_queue_add(&engine->fast_work_queue,
+ nxt_router_conf_apply, task, rpc->temp_conf, NULL);
+}
+
+
+static void
+nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
+{
+ nxt_app_t *app;
+ nxt_app_rpc_t *rpc;
+ nxt_router_temp_conf_t *tmcf;
+
+ rpc = data;
+ app = rpc->app;
+ tmcf = rpc->temp_conf;
+
+ nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
+ &app->name);
+
+ app->pending_processes--;
+
+ nxt_router_conf_error(task, tmcf);
+}
+
+
static nxt_int_t
nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
@@ -1933,32 +2172,11 @@ static void
nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
nxt_router_temp_conf_t *tmcf)
{
- nxt_app_t *app;
- nxt_port_t *port;
+ nxt_app_t *app;
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
- nxt_queue_remove(&app->link);
-
- nxt_debug(task, "about to free app '%V' %p", &app->name, app);
-
- app->live = 0;
-
- do {
- port = nxt_router_app_get_idle_port(app);
- if (port == NULL) {
- break;
- }
-
- nxt_debug(task, "port %p send quit", port);
-
- nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
- NULL);
-
- nxt_port_use(task, port, -1);
- } while (1);
-
- nxt_router_app_use(task, app, -1);
+ nxt_router_app_quit(task, app);
} nxt_queue_loop;
@@ -2454,6 +2672,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
if (nxt_buf_mem_used_size(&b->mem) == 0) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
+
} else {
nxt_buf_chain_add(&r->out, b);
}
@@ -2561,14 +2780,15 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_thread_mutex_lock(&app->mutex);
- nxt_assert(app->pending_workers != 0);
+ nxt_assert(app->pending_processes != 0);
- app->pending_workers--;
- app->workers++;
+ app->pending_processes--;
+ app->processes++;
nxt_thread_mutex_unlock(&app->mutex);
- nxt_debug(task, "app '%V' %p new port ready", &app->name, app);
+ nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
+ &app->name, port->pid, app->processes, app->pending_processes);
nxt_router_app_port_release(task, port, 0, 0);
}
@@ -2590,9 +2810,9 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_thread_mutex_lock(&app->mutex);
- nxt_assert(app->pending_workers != 0);
+ nxt_assert(app->pending_processes != 0);
- app->pending_workers--;
+ app->pending_processes--;
if (!nxt_queue_is_empty(&app->requests)) {
lnk = nxt_queue_last(&app->requests);
@@ -2611,7 +2831,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_debug(task, "app '%V' %p abort next stream #%uD",
&app->name, app, ra->stream);
- nxt_router_ra_error(ra, 500, "Failed to start application worker");
+ nxt_router_ra_error(ra, 500, "Failed to start application process");
nxt_router_ra_use(task, ra, -1);
}
@@ -2629,10 +2849,13 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
if (i < 0 && c == -i) {
nxt_assert(app->live == 0);
- nxt_assert(app->workers == 0);
- nxt_assert(app->pending_workers == 0);
+ nxt_assert(app->processes == 0);
+ nxt_assert(app->idle_processes == 0);
+ nxt_assert(app->pending_processes == 0);
nxt_assert(nxt_queue_is_empty(&app->requests) != 0);
nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
+ nxt_assert(nxt_queue_is_empty(&app->spare_ports) != 0);
+ nxt_assert(nxt_queue_is_empty(&app->idle_ports) != 0);
nxt_thread_mutex_destroy(&app->mutex);
nxt_free(app);
@@ -2666,6 +2889,19 @@ nxt_router_pop_first_port(nxt_app_t *app)
port->app_pending_responses++;
+ if (nxt_queue_chk_remove(&port->idle_link)) {
+ app->idle_processes--;
+
+ if (port->idle_start == 0) {
+ nxt_assert(app->idle_processes < app->spare_processes);
+
+ } else {
+ nxt_assert(app->idle_processes >= app->spare_processes);
+
+ port->idle_start = 0;
+ }
+ }
+
if ((app->max_pending_responses == 0
|| port->app_pending_responses < app->max_pending_responses)
&& (app->max_requests == 0
@@ -2684,8 +2920,8 @@ nxt_router_pop_first_port(nxt_app_t *app)
}
-static nxt_port_t *
-nxt_router_app_get_idle_port(nxt_app_t *app)
+nxt_inline nxt_port_t *
+nxt_router_app_get_port_for_quit(nxt_app_t *app)
{
nxt_port_t *port;
@@ -2701,8 +2937,16 @@ nxt_router_app_get_idle_port(nxt_app_t *app)
continue;
}
- nxt_queue_remove(&port->app_link);
- port->app_link.next = NULL;
+ /* Caller is responsible to decrease port use count. */
+ nxt_queue_chk_remove(&port->app_link);
+
+ if (nxt_queue_chk_remove(&port->idle_link)) {
+ app->idle_processes--;
+ }
+
+ /* Caller is responsible to decrease app use count. */
+ port->app = NULL;
+ app->processes--;
break;
@@ -2715,6 +2959,41 @@ nxt_router_app_get_idle_port(nxt_app_t *app)
static void
+nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app)
+{
+ nxt_port_t *port;
+
+ nxt_queue_remove(&app->link);
+
+ app->live = 0;
+
+ for ( ;; ) {
+ port = nxt_router_app_get_port_for_quit(app);
+ if (port == NULL) {
+ break;
+ }
+
+ nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+
+ nxt_port_use(task, port, -1);
+ nxt_router_app_use(task, app, -1);
+ }
+
+ if (nxt_timer_is_in_tree(&app->idle_timer)) {
+ nxt_assert(app->engine == task->thread->engine);
+
+ app->idle_timer.handler = nxt_router_app_release_handler;
+ nxt_timer_add(app->engine, &app->idle_timer, 0);
+
+ } else {
+ nxt_router_app_use(task, app, -1);
+ }
+}
+
+
+static void
nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
{
nxt_app_t *app;
@@ -2739,7 +3018,8 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response)
{
nxt_app_t *app;
- nxt_bool_t send_quit, cancelled;
+ nxt_bool_t port_unchained;
+ nxt_bool_t send_quit, cancelled, adjust_idle_timer;
nxt_queue_link_t *lnk;
nxt_req_app_link_t *ra, *pending_ra, *re_ra;
nxt_port_select_state_t state;
@@ -2856,12 +3136,52 @@ app_dead:
re_ra_cancelled:
- send_quit = (app->live == 0 && port->app_pending_responses > 0)
+ send_quit = (app->live == 0 && port->app_pending_responses == 0)
|| (app->max_requests > 0 && port->app_pending_responses == 0
&& port->app_responses >= app->max_requests);
+ if (send_quit) {
+ port_unchained = nxt_queue_chk_remove(&port->app_link);
+
+ port->app = NULL;
+ app->processes--;
+
+ } else {
+ port_unchained = 0;
+ }
+
+ adjust_idle_timer = 0;
+
+ if (!send_quit && port->app_pending_responses == 0) {
+ nxt_assert(port->idle_link.next == NULL);
+
+ if (app->idle_processes == app->spare_processes
+ && app->adjust_idle_work.data == NULL)
+ {
+ adjust_idle_timer = 1;
+ app->adjust_idle_work.data = app;
+ app->adjust_idle_work.next = NULL;
+ }
+
+ if (app->idle_processes < app->spare_processes) {
+ nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
+
+ } else {
+ nxt_queue_insert_tail(&app->idle_ports, &port->idle_link);
+
+ port->idle_start = task->thread->engine->timers.now;
+ }
+
+ app->idle_processes++;
+ }
+
nxt_thread_mutex_unlock(&app->mutex);
+ if (adjust_idle_timer) {
+ nxt_router_app_use(task, app, 1);
+ nxt_event_engine_post(app->engine, &app->adjust_idle_work);
+ }
+
if (pending_ra != NULL) {
nxt_router_ra_use(task, pending_ra, -1);
}
@@ -2893,12 +3213,18 @@ re_ra_cancelled:
}
if (send_quit) {
- nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
+ nxt_debug(task, "app '%V' %p send QUIT to port",
&app->name, app);
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
-1, 0, 0, NULL);
+ if (port_unchained) {
+ nxt_port_use(task, port, -1);
+ }
+
+ nxt_router_app_use(task, app, -1);
+
goto adjust_use;
}
@@ -2916,8 +3242,10 @@ adjust_use:
void
nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
{
- nxt_app_t *app;
- nxt_bool_t unchain, start_worker;
+ nxt_app_t *app;
+ nxt_bool_t unchain, start_process;
+ nxt_port_t *idle_port;
+ nxt_queue_link_t *idle_lnk;
app = port->app;
@@ -2925,41 +3253,162 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
nxt_thread_mutex_lock(&app->mutex);
- unchain = port->app_link.next != NULL;
+ unchain = nxt_queue_chk_remove(&port->app_link);
- if (unchain) {
- nxt_queue_remove(&port->app_link);
- port->app_link.next = NULL;
+ if (nxt_queue_chk_remove(&port->idle_link)) {
+ app->idle_processes--;
+
+ if (port->idle_start == 0
+ && app->idle_processes >= app->spare_processes)
+ {
+ nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
+
+ idle_lnk = nxt_queue_last(&app->idle_ports);
+ idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
+ nxt_queue_remove(idle_lnk);
+
+ nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
+
+ idle_port->idle_start = 0;
+ }
}
- app->workers--;
+ app->processes--;
- start_worker = app->live != 0
- && nxt_queue_is_empty(&app->requests) == 0
- && app->workers + app->pending_workers < app->max_workers;
+ start_process = app->live != 0
+ && !task->thread->engine->shutdown
+ && nxt_router_app_can_start(app)
+ && (!nxt_queue_is_empty(&app->requests)
+ || nxt_router_app_need_start(app));
- if (start_worker) {
- app->pending_workers++;
+ if (start_process) {
+ app->pending_processes++;
}
nxt_thread_mutex_unlock(&app->mutex);
- nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port);
+ nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
if (unchain) {
nxt_port_use(task, port, -1);
}
- if (start_worker) {
- nxt_router_start_worker(task, app);
+ if (start_process) {
+ nxt_router_start_app_process(task, app);
}
}
static void
+nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_app_t *app;
+ nxt_bool_t queued;
+ nxt_port_t *port;
+ nxt_msec_t timeout, threshold;
+ nxt_queue_link_t *lnk;
+ nxt_event_engine_t *engine;
+
+ app = obj;
+ queued = (data == app);
+
+ nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
+ &app->name, queued);
+
+ engine = task->thread->engine;
+
+ nxt_assert(app->engine == engine);
+
+ threshold = engine->timers.now + app->idle_timer.precision;
+ timeout = 0;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ if (queued) {
+ app->adjust_idle_work.data = NULL;
+ }
+
+ while (app->idle_processes > app->spare_processes) {
+
+ nxt_assert(nxt_queue_is_empty(&app->idle_ports) == 0);
+
+ lnk = nxt_queue_first(&app->idle_ports);
+ port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
+
+ timeout = port->idle_start + app->idle_timeout;
+
+ if (timeout > threshold) {
+ break;
+ }
+
+ nxt_queue_remove(lnk);
+ lnk->next = NULL;
+
+ nxt_queue_chk_remove(&port->app_link);
+
+ app->idle_processes--;
+ app->processes--;
+ port->app = NULL;
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_debug(task, "app '%V' send QUIT to idle port %PI",
+ &app->name, port->pid);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+
+ nxt_port_use(task, port, -1);
+ nxt_router_app_use(task, app, -1);
+
+ nxt_thread_mutex_lock(&app->mutex);
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ if (timeout > threshold) {
+ nxt_timer_add(engine, &app->idle_timer, timeout - threshold);
+
+ } else {
+ nxt_timer_disable(engine, &app->idle_timer);
+ }
+
+ if (queued) {
+ nxt_router_app_use(task, app, -1);
+ }
+}
+
+
+static void
+nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_app_t *app;
+ nxt_timer_t *timer;
+
+ timer = obj;
+ app = nxt_container_of(timer, nxt_app_t, idle_timer);
+
+ nxt_router_adjust_idle_timer(task, app, NULL);
+}
+
+
+static void
+nxt_router_app_release_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_app_t *app;
+ nxt_timer_t *timer;
+
+ timer = obj;
+ app = nxt_container_of(timer, nxt_app_t, idle_timer);
+
+ nxt_router_app_use(task, app, -1);
+}
+
+
+static void
nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
nxt_app_t *app;
+ nxt_bool_t can_start_process;
nxt_req_app_link_t *ra;
ra = state->ra;
@@ -2996,12 +3445,13 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
ra->app_port = NULL;
}
- state->can_start_worker = (app->workers + app->pending_workers)
- < app->max_workers;
+ can_start_process = nxt_router_app_can_start(app);
+
state->port = NULL;
+ state->start_process = 0;
if (nxt_queue_is_empty(&app->ports)
- || (state->can_start_worker && nxt_router_app_first_port_busy(app)) )
+ || (can_start_process && nxt_router_app_first_port_busy(app)) )
{
ra = nxt_router_ra_create(task, ra);
@@ -3020,8 +3470,9 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
- if (state->can_start_worker) {
- app->pending_workers++;
+ if (can_start_process) {
+ app->pending_processes++;
+ state->start_process = 1;
}
} else {
@@ -3038,6 +3489,11 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
nxt_router_ra_pending(task, app, ra);
}
+
+ if (can_start_process && nxt_router_app_need_start(app)) {
+ app->pending_processes++;
+ state->start_process = 1;
+ }
}
fail:
@@ -3077,20 +3533,24 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
ra->app_port = state->port;
+ if (state->start_process) {
+ nxt_router_start_app_process(task, app);
+ }
+
return NXT_OK;
}
- if (!state->can_start_worker) {
- nxt_debug(task, "app '%V' %p too many running or pending workers",
+ if (!state->start_process) {
+ nxt_debug(task, "app '%V' %p too many running or pending processes",
&app->name, app);
return NXT_AGAIN;
}
- res = nxt_router_start_worker(task, app);
+ res = nxt_router_start_app_process(task, app);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(ra, 500, "Failed to start worker");
+ nxt_router_ra_error(ra, 500, "Failed to start app process");
nxt_router_ra_use(task, ra, -1);
return NXT_ERROR;
@@ -3227,7 +3687,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
goto release_port;
}
- nxt_debug(task, "about to send %O bytes buffer to worker port %d",
+ nxt_debug(task, "about to send %O bytes buffer to app process port %d",
nxt_buf_used_size(wmsg.write),
wmsg.port->socket.fd);