summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_application.h2
-rw-r--r--src/nxt_conf.c7
-rw-r--r--src/nxt_conf.h1
-rw-r--r--src/nxt_conf_validation.c143
-rw-r--r--src/nxt_main_process.c6
-rw-r--r--src/nxt_port.c1
-rw-r--r--src/nxt_port.h3
-rw-r--r--src/nxt_router.c630
-rw-r--r--src/nxt_router.h17
-rw-r--r--test/test_configuration.py30
-rw-r--r--test/test_php_basic.py18
-rw-r--r--test/test_python_application.py2
-rw-r--r--test/test_python_atexit.py2
-rw-r--r--test/test_python_basic.py18
-rw-r--r--test/test_python_keepalive.py2
-rw-r--r--test/test_python_procman.py237
16 files changed, 982 insertions, 137 deletions
diff --git a/src/nxt_application.h b/src/nxt_application.h
index 016214b3..d36d8cd1 100644
--- a/src/nxt_application.h
+++ b/src/nxt_application.h
@@ -60,8 +60,6 @@ struct nxt_common_app_conf_s {
char *working_directory;
- uint32_t workers;
-
union {
nxt_python_app_conf_t python;
nxt_php_app_conf_t php;
diff --git a/src/nxt_conf.c b/src/nxt_conf.c
index 872a62af..684135cf 100644
--- a/src/nxt_conf.c
+++ b/src/nxt_conf.c
@@ -163,6 +163,13 @@ nxt_conf_get_string(nxt_conf_value_t *value, nxt_str_t *str)
}
+int64_t
+nxt_conf_get_integer(nxt_conf_value_t *value)
+{
+ return value->u.integer;
+}
+
+
nxt_uint_t
nxt_conf_object_members_count(nxt_conf_value_t *value)
{
diff --git a/src/nxt_conf.h b/src/nxt_conf.h
index d0c898a4..a7675d3b 100644
--- a/src/nxt_conf.h
+++ b/src/nxt_conf.h
@@ -100,6 +100,7 @@ void nxt_conf_json_position(u_char *start, u_char *pos, nxt_uint_t *line,
nxt_int_t nxt_conf_validate(nxt_conf_validation_t *vldt);
void nxt_conf_get_string(nxt_conf_value_t *value, nxt_str_t *str);
+int64_t nxt_conf_get_integer(nxt_conf_value_t *value);
// FIXME reimplement and reorder functions below
nxt_uint_t nxt_conf_object_members_count(nxt_conf_value_t *value);
diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c
index cb17a62b..e4109b00 100644
--- a/src/nxt_conf_validation.c
+++ b/src/nxt_conf_validation.c
@@ -54,6 +54,8 @@ static nxt_int_t nxt_conf_vldt_app(nxt_conf_validation_t *vldt,
nxt_str_t *name, nxt_conf_value_t *value);
static nxt_int_t nxt_conf_vldt_object(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
+static nxt_int_t nxt_conf_vldt_processes(nxt_conf_validation_t *vldt,
+ nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_object_iterator(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data);
static nxt_int_t nxt_conf_vldt_system(nxt_conf_validation_t *vldt,
@@ -107,22 +109,42 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_limits_members[] = {
};
-static nxt_conf_vldt_object_t nxt_conf_vldt_common_members[] = {
- { nxt_string("type"),
- NXT_CONF_VLDT_STRING,
+static nxt_conf_vldt_object_t nxt_conf_vldt_app_processes_members[] = {
+ { nxt_string("spare"),
+ NXT_CONF_VLDT_INTEGER,
+ NULL,
+ NULL },
+
+ { nxt_string("max"),
+ NXT_CONF_VLDT_INTEGER,
NULL,
NULL },
- { nxt_string("workers"),
+ { nxt_string("idle_timeout"),
NXT_CONF_VLDT_INTEGER,
NULL,
NULL },
+ NXT_CONF_VLDT_END
+};
+
+
+static nxt_conf_vldt_object_t nxt_conf_vldt_common_members[] = {
+ { nxt_string("type"),
+ NXT_CONF_VLDT_STRING,
+ NULL,
+ NULL },
+
{ nxt_string("limits"),
NXT_CONF_VLDT_OBJECT,
&nxt_conf_vldt_object,
(void *) &nxt_conf_vldt_app_limits_members },
+ { nxt_string("processes"),
+ NXT_CONF_VLDT_INTEGER | NXT_CONF_VLDT_OBJECT,
+ &nxt_conf_vldt_processes,
+ (void *) &nxt_conf_vldt_app_processes_members },
+
{ nxt_string("user"),
NXT_CONF_VLDT_STRING,
nxt_conf_vldt_system,
@@ -475,6 +497,119 @@ nxt_conf_vldt_object(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
}
+typedef struct {
+ int64_t spare;
+ int64_t max;
+ int64_t idle_timeout;
+} nxt_conf_vldt_processes_conf_t;
+
+
+static nxt_conf_map_t nxt_conf_vldt_processes_conf_map[] = {
+ {
+ nxt_string("spare"),
+ NXT_CONF_MAP_INT64,
+ offsetof(nxt_conf_vldt_processes_conf_t, spare),
+ },
+
+ {
+ nxt_string("max"),
+ NXT_CONF_MAP_INT64,
+ offsetof(nxt_conf_vldt_processes_conf_t, max),
+ },
+
+ {
+ nxt_string("idle_timeout"),
+ NXT_CONF_MAP_INT64,
+ offsetof(nxt_conf_vldt_processes_conf_t, idle_timeout),
+ },
+};
+
+
+static nxt_int_t
+nxt_conf_vldt_processes(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
+ void *data)
+{
+ int64_t int_value;
+ nxt_int_t ret;
+ nxt_conf_vldt_processes_conf_t proc;
+
+ static nxt_str_t max_str = nxt_string("max");
+
+ if (nxt_conf_type(value) == NXT_CONF_INTEGER) {
+ int_value = nxt_conf_get_integer(value);
+
+ if (int_value < 1) {
+ return nxt_conf_vldt_error(vldt, "The \"processes\" number must be "
+ "equal to or greater than 1.");
+ }
+
+ if (int_value > NXT_INT32_T_MAX) {
+ return nxt_conf_vldt_error(vldt, "The \"processes\" number must "
+ "not exceed %d.", NXT_INT32_T_MAX);
+ }
+
+ return NXT_OK;
+ }
+
+ ret = nxt_conf_vldt_object(vldt, value, data);
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ proc.spare = 1;
+ proc.max = 1;
+ proc.idle_timeout = 15;
+
+ ret = nxt_conf_map_object(vldt->pool, value,
+ nxt_conf_vldt_processes_conf_map,
+ nxt_nitems(nxt_conf_vldt_processes_conf_map),
+ &proc);
+ if (ret != NXT_OK) {
+ return ret;
+ }
+
+ if (proc.spare < 0) {
+ return nxt_conf_vldt_error(vldt, "The \"spare\" number must not be "
+ "negative.");
+ }
+
+ if (proc.spare > NXT_INT32_T_MAX) {
+ return nxt_conf_vldt_error(vldt, "The \"spare\" number must not "
+ "not exceed %d.", NXT_INT32_T_MAX);
+ }
+
+ if (nxt_conf_get_object_member(value, &max_str, NULL) != NULL) {
+
+ if (proc.max < 1) {
+ return nxt_conf_vldt_error(vldt, "The \"max\" number must be equal "
+ "to or greater than 1.");
+ }
+
+ if (proc.max > NXT_INT32_T_MAX) {
+ return nxt_conf_vldt_error(vldt, "The \"max\" number must not "
+ "not exceed %d.", NXT_INT32_T_MAX);
+ }
+
+ if (proc.max < proc.spare) {
+ return nxt_conf_vldt_error(vldt, "The \"spare\" number must be "
+ "lower than \"max\".");
+ }
+ }
+
+ if (proc.idle_timeout < 0) {
+ return nxt_conf_vldt_error(vldt, "The \"idle_timeout\" number must not "
+ "be negative.");
+ }
+
+ if (proc.idle_timeout > NXT_INT32_T_MAX / 1000) {
+ return nxt_conf_vldt_error(vldt, "The \"idle_timeout\" number must not "
+ "not exceed %d.", NXT_INT32_T_MAX / 1000);
+ }
+
+ return NXT_OK;
+}
+
+
static nxt_int_t
nxt_conf_vldt_object_iterator(nxt_conf_validation_t *vldt,
nxt_conf_value_t *value, void *data)
diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c
index f50475ed..fedd30cd 100644
--- a/src/nxt_main_process.c
+++ b/src/nxt_main_process.c
@@ -115,12 +115,6 @@ static nxt_conf_map_t nxt_common_app_conf[] = {
},
{
- nxt_string("workers"),
- NXT_CONF_MAP_INT32,
- offsetof(nxt_common_app_conf_t, workers),
- },
-
- {
nxt_string("home"),
NXT_CONF_MAP_CSTRZ,
offsetof(nxt_common_app_conf_t, u.python.home),
diff --git a/src/nxt_port.c b/src/nxt_port.c
index e8e94a94..1fcb0244 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -29,6 +29,7 @@ nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
nxt_assert(port->use_count == 0);
nxt_assert(port->app_link.next == NULL);
+ nxt_assert(port->idle_link.next == NULL);
nxt_assert(nxt_queue_is_empty(&port->messages));
nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 74d37dcb..d916b34b 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -158,6 +158,9 @@ struct nxt_port_s {
nxt_queue_link_t app_link; /* for nxt_app_t.ports */
nxt_app_t *app;
+ nxt_queue_link_t idle_link; /* for nxt_app_t.idle_ports */
+ nxt_msec_t idle_start;
+
nxt_queue_t messages; /* of nxt_port_send_msg_t */
nxt_thread_mutex_t write_mutex;
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 1ba7072d..19a37c69 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -12,11 +12,15 @@
typedef struct {
nxt_str_t type;
- uint32_t workers;
+ uint32_t processes;
+ uint32_t max_processes;
+ uint32_t spare_processes;
nxt_msec_t timeout;
nxt_msec_t res_timeout;
+ nxt_msec_t idle_timeout;
uint32_t requests;
nxt_conf_value_t *limits_value;
+ nxt_conf_value_t *processes_value;
} nxt_router_app_conf_t;
@@ -76,6 +80,12 @@ typedef struct {
} nxt_socket_rpc_t;
+typedef struct {
+ nxt_app_t *app;
+ nxt_router_temp_conf_t *temp_conf;
+} nxt_app_rpc_t;
+
+
struct nxt_port_select_state_s {
nxt_app_t *app;
nxt_req_app_link_t *ra;
@@ -83,7 +93,7 @@ struct nxt_port_select_state_s {
nxt_port_t *failed_port;
int failed_port_use_delta;
- nxt_bool_t can_start_worker;
+ uint8_t start_process; /* 1 bit */
nxt_req_app_link_t *shared_ra;
nxt_port_t *port;
};
@@ -96,7 +106,7 @@ static void nxt_router_port_select(nxt_task_t *task,
static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
nxt_port_select_state_t *state);
-static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
+static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
nxt_inline void
nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
@@ -136,6 +146,12 @@ static void nxt_router_listen_socket_ready(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_listen_socket_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
+static void nxt_router_app_rpc_create(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
+static void nxt_router_app_prefork_ready(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
+static void nxt_router_app_prefork_error(nxt_task_t *task,
+ nxt_port_recv_msg_t *msg, void *data);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
@@ -193,7 +209,7 @@ static void nxt_router_app_port_ready(nxt_task_t *task,
static void nxt_router_app_port_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
-static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
+static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response);
static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
@@ -209,6 +225,12 @@ static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg);
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
+static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_router_app_release_handler(nxt_task_t *task, void *obj,
+ void *data);
static const nxt_http_request_state_t nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
@@ -253,7 +275,8 @@ nxt_router_start(nxt_task_t *task, void *data)
static void
-nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data)
+nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
+ void *data)
{
size_t size;
uint32_t stream;
@@ -268,7 +291,7 @@ nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data)
rt = task->thread->runtime;
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
- nxt_debug(task, "app '%V' %p start worker", &app->name, app);
+ nxt_debug(task, "app '%V' %p start process", &app->name, app);
size = app->name.length + 1 + app->conf.length;
@@ -304,7 +327,7 @@ failed:
nxt_thread_mutex_lock(&app->mutex);
- app->pending_workers--;
+ app->pending_processes--;
nxt_thread_mutex_unlock(&app->mutex);
@@ -313,7 +336,7 @@ failed:
static nxt_int_t
-nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
+nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
{
nxt_int_t res;
nxt_port_t *router_port;
@@ -324,7 +347,7 @@ nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
nxt_router_app_use(task, app, 1);
- res = nxt_port_post(task, router_port, nxt_router_start_worker_handler,
+ res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
app);
if (res == NXT_OK) {
@@ -333,7 +356,7 @@ nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
nxt_thread_mutex_lock(&app->mutex);
- app->pending_workers--;
+ app->pending_processes--;
nxt_thread_mutex_unlock(&app->mutex);
@@ -737,7 +760,8 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
static void
-nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data)
+nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
+ void *data)
{
union {
nxt_pid_t removed_pid;
@@ -763,7 +787,7 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
{
- nxt_port_post(task, engine->port, nxt_router_worker_remove_pid,
+ nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
msg->u.data);
}
nxt_queue_loop;
@@ -837,10 +861,27 @@ fail:
}
+nxt_inline nxt_bool_t
+nxt_router_app_can_start(nxt_app_t *app)
+{
+ return app->processes + app->pending_processes < app->max_processes
+ && app->pending_processes < app->max_pending_processes;
+}
+
+
+nxt_inline nxt_bool_t
+nxt_router_app_need_start(nxt_app_t *app)
+{
+ return app->idle_processes + app->pending_processes
+ < app->spare_processes;
+}
+
+
static void
nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
{
nxt_int_t ret;
+ nxt_app_t *app;
nxt_router_t *router;
nxt_runtime_t *rt;
nxt_queue_link_t *qlk;
@@ -863,6 +904,15 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
return;
}
+ nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
+
+ if (nxt_router_app_need_start(app)) {
+ nxt_router_app_rpc_create(task, tmcf, app);
+ return;
+ }
+
+ } nxt_queue_loop;
+
rt = task->thread->runtime;
interface = nxt_service_get(rt->services, "engine", NULL);
@@ -923,6 +973,7 @@ nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
static void
nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
{
+ nxt_app_t *app;
nxt_socket_t s;
nxt_router_t *router;
nxt_queue_link_t *qlk;
@@ -944,6 +995,12 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
nxt_free(skcf->listen);
}
+ nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
+
+ nxt_router_app_quit(task, app);
+
+ } nxt_queue_loop;
+
router = tmcf->conf->router;
nxt_queue_add(&router->sockets, &tmcf->keeping);
@@ -984,15 +1041,21 @@ static nxt_conf_map_t nxt_router_app_conf[] = {
},
{
- nxt_string("workers"),
+ nxt_string("limits"),
+ NXT_CONF_MAP_PTR,
+ offsetof(nxt_router_app_conf_t, limits_value),
+ },
+
+ {
+ nxt_string("processes"),
NXT_CONF_MAP_INT32,
- offsetof(nxt_router_app_conf_t, workers),
+ offsetof(nxt_router_app_conf_t, processes),
},
{
- nxt_string("limits"),
+ nxt_string("processes"),
NXT_CONF_MAP_PTR,
- offsetof(nxt_router_app_conf_t, limits_value),
+ offsetof(nxt_router_app_conf_t, processes_value),
},
};
@@ -1018,6 +1081,27 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = {
};
+static nxt_conf_map_t nxt_router_app_processes_conf[] = {
+ {
+ nxt_string("spare"),
+ NXT_CONF_MAP_INT32,
+ offsetof(nxt_router_app_conf_t, spare_processes),
+ },
+
+ {
+ nxt_string("max"),
+ NXT_CONF_MAP_INT32,
+ offsetof(nxt_router_app_conf_t, max_processes),
+ },
+
+ {
+ nxt_string("idle_timeout"),
+ NXT_CONF_MAP_MSEC,
+ offsetof(nxt_router_app_conf_t, idle_timeout),
+ },
+};
+
+
static nxt_conf_map_t nxt_router_listener_conf[] = {
{
nxt_string("application"),
@@ -1100,6 +1184,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
nxt_socket_conf_t *skcf;
+ nxt_event_engine_t *engine;
nxt_app_lang_module_t *lang;
nxt_router_app_conf_t apcf;
nxt_router_listener_conf_t lscf;
@@ -1174,11 +1259,15 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
continue;
}
- apcf.workers = 1;
+ apcf.processes = 1;
+ apcf.max_processes = 1;
+ apcf.spare_processes = 1;
apcf.timeout = 0;
apcf.res_timeout = 1000;
+ apcf.idle_timeout = 15000;
apcf.requests = 0;
apcf.limits_value = NULL;
+ apcf.processes_value = NULL;
ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
nxt_nitems(nxt_router_app_conf), &apcf);
@@ -1204,10 +1293,27 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
}
+ if (apcf.processes_value != NULL
+ && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
+ {
+ ret = nxt_conf_map_object(mp, apcf.processes_value,
+ nxt_router_app_processes_conf,
+ nxt_nitems(nxt_router_app_processes_conf),
+ &apcf);
+ if (ret != NXT_OK) {
+ nxt_log(task, NXT_LOG_CRIT, "application processes map error");
+ goto app_fail;
+ }
+
+ } else {
+ apcf.max_processes = apcf.processes;
+ apcf.spare_processes = apcf.processes;
+ }
+
nxt_debug(task, "application type: %V", &apcf.type);
- nxt_debug(task, "application workers: %D", apcf.workers);
- nxt_debug(task, "application request timeout: %D", apcf.timeout);
- nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout);
+ nxt_debug(task, "application processes: %D", apcf.processes);
+ nxt_debug(task, "application request timeout: %M", apcf.timeout);
+ nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout);
nxt_debug(task, "application requests: %D", apcf.requests);
lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
@@ -1226,6 +1332,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
nxt_queue_init(&app->ports);
+ nxt_queue_init(&app->spare_ports);
+ nxt_queue_init(&app->idle_ports);
nxt_queue_init(&app->requests);
nxt_queue_init(&app->pending);
@@ -1233,14 +1341,32 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_memcpy(app->name.start, name.start, name.length);
app->type = lang->type;
- app->max_workers = apcf.workers;
+ app->max_processes = apcf.max_processes;
+ app->spare_processes = apcf.spare_processes;
+ app->max_pending_processes = apcf.spare_processes
+ ? apcf.spare_processes : 1;
app->timeout = apcf.timeout;
app->res_timeout = apcf.res_timeout * 1000000;
+ app->idle_timeout = apcf.idle_timeout;
app->live = 1;
app->max_pending_responses = 2;
app->max_requests = apcf.requests;
app->prepare_msg = nxt_app_prepare_msg[lang->type];
+ engine = task->thread->engine;
+
+ app->engine = engine;
+
+ app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION;
+ app->idle_timer.work_queue = &engine->fast_work_queue;
+ app->idle_timer.handler = nxt_router_app_idle_timeout;
+ app->idle_timer.task = &engine->task;
+ app->idle_timer.log = app->idle_timer.task->log;
+
+ app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
+ app->adjust_idle_work.task = &engine->task;
+ app->adjust_idle_work.obj = app;
+
nxt_queue_insert_tail(&tmcf->apps, &app->link);
nxt_router_app_use(task, app, 1);
@@ -1624,6 +1750,119 @@ nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
+static void
+nxt_router_app_rpc_create(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
+{
+ size_t size;
+ uint32_t stream;
+ nxt_buf_t *b;
+ nxt_port_t *main_port, *router_port;
+ nxt_runtime_t *rt;
+ nxt_app_rpc_t *rpc;
+
+ rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
+ if (rpc == NULL) {
+ goto fail;
+ }
+
+ rpc->app = app;
+ rpc->temp_conf = tmcf;
+
+ nxt_debug(task, "app '%V' prefork", &app->name);
+
+ size = app->name.length + 1 + app->conf.length;
+
+ b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
+ if (nxt_slow_path(b == NULL)) {
+ goto fail;
+ }
+
+ nxt_buf_cpystr(b, &app->name);
+ *b->mem.free++ = '\0';
+ nxt_buf_cpystr(b, &app->conf);
+
+ rt = task->thread->runtime;
+ main_port = rt->port_by_type[NXT_PROCESS_MAIN];
+ router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
+
+ stream = nxt_port_rpc_register_handler(task, router_port,
+ nxt_router_app_prefork_ready,
+ nxt_router_app_prefork_error,
+ -1, rpc);
+ if (nxt_slow_path(stream == 0)) {
+ goto fail;
+ }
+
+ app->pending_processes++;
+
+ nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
+ stream, router_port->id, b);
+
+ return;
+
+fail:
+
+ nxt_router_conf_error(task, tmcf);
+}
+
+
+static void
+nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
+{
+ nxt_app_t *app;
+ nxt_port_t *port;
+ nxt_app_rpc_t *rpc;
+ nxt_event_engine_t *engine;
+
+ rpc = data;
+ app = rpc->app;
+
+ port = msg->u.new_port;
+ port->app = app;
+
+ nxt_router_app_use(task, app, 1);
+
+ app->pending_processes--;
+ app->processes++;
+ app->idle_processes++;
+
+ engine = task->thread->engine;
+
+ nxt_queue_insert_tail(&app->ports, &port->app_link);
+ nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
+
+ port->idle_start = 0;
+
+ nxt_port_inc_use(port);
+
+ nxt_work_queue_add(&engine->fast_work_queue,
+ nxt_router_conf_apply, task, rpc->temp_conf, NULL);
+}
+
+
+static void
+nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ void *data)
+{
+ nxt_app_t *app;
+ nxt_app_rpc_t *rpc;
+ nxt_router_temp_conf_t *tmcf;
+
+ rpc = data;
+ app = rpc->app;
+ tmcf = rpc->temp_conf;
+
+ nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
+ &app->name);
+
+ app->pending_processes--;
+
+ nxt_router_conf_error(task, tmcf);
+}
+
+
static nxt_int_t
nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
@@ -1933,32 +2172,11 @@ static void
nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
nxt_router_temp_conf_t *tmcf)
{
- nxt_app_t *app;
- nxt_port_t *port;
+ nxt_app_t *app;
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
- nxt_queue_remove(&app->link);
-
- nxt_debug(task, "about to free app '%V' %p", &app->name, app);
-
- app->live = 0;
-
- do {
- port = nxt_router_app_get_idle_port(app);
- if (port == NULL) {
- break;
- }
-
- nxt_debug(task, "port %p send quit", port);
-
- nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
- NULL);
-
- nxt_port_use(task, port, -1);
- } while (1);
-
- nxt_router_app_use(task, app, -1);
+ nxt_router_app_quit(task, app);
} nxt_queue_loop;
@@ -2454,6 +2672,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
if (nxt_buf_mem_used_size(&b->mem) == 0) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
+
} else {
nxt_buf_chain_add(&r->out, b);
}
@@ -2561,14 +2780,15 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_thread_mutex_lock(&app->mutex);
- nxt_assert(app->pending_workers != 0);
+ nxt_assert(app->pending_processes != 0);
- app->pending_workers--;
- app->workers++;
+ app->pending_processes--;
+ app->processes++;
nxt_thread_mutex_unlock(&app->mutex);
- nxt_debug(task, "app '%V' %p new port ready", &app->name, app);
+ nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
+ &app->name, port->pid, app->processes, app->pending_processes);
nxt_router_app_port_release(task, port, 0, 0);
}
@@ -2590,9 +2810,9 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_thread_mutex_lock(&app->mutex);
- nxt_assert(app->pending_workers != 0);
+ nxt_assert(app->pending_processes != 0);
- app->pending_workers--;
+ app->pending_processes--;
if (!nxt_queue_is_empty(&app->requests)) {
lnk = nxt_queue_last(&app->requests);
@@ -2611,7 +2831,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_debug(task, "app '%V' %p abort next stream #%uD",
&app->name, app, ra->stream);
- nxt_router_ra_error(ra, 500, "Failed to start application worker");
+ nxt_router_ra_error(ra, 500, "Failed to start application process");
nxt_router_ra_use(task, ra, -1);
}
@@ -2629,10 +2849,13 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
if (i < 0 && c == -i) {
nxt_assert(app->live == 0);
- nxt_assert(app->workers == 0);
- nxt_assert(app->pending_workers == 0);
+ nxt_assert(app->processes == 0);
+ nxt_assert(app->idle_processes == 0);
+ nxt_assert(app->pending_processes == 0);
nxt_assert(nxt_queue_is_empty(&app->requests) != 0);
nxt_assert(nxt_queue_is_empty(&app->ports) != 0);
+ nxt_assert(nxt_queue_is_empty(&app->spare_ports) != 0);
+ nxt_assert(nxt_queue_is_empty(&app->idle_ports) != 0);
nxt_thread_mutex_destroy(&app->mutex);
nxt_free(app);
@@ -2666,6 +2889,19 @@ nxt_router_pop_first_port(nxt_app_t *app)
port->app_pending_responses++;
+ if (nxt_queue_chk_remove(&port->idle_link)) {
+ app->idle_processes--;
+
+ if (port->idle_start == 0) {
+ nxt_assert(app->idle_processes < app->spare_processes);
+
+ } else {
+ nxt_assert(app->idle_processes >= app->spare_processes);
+
+ port->idle_start = 0;
+ }
+ }
+
if ((app->max_pending_responses == 0
|| port->app_pending_responses < app->max_pending_responses)
&& (app->max_requests == 0
@@ -2684,8 +2920,8 @@ nxt_router_pop_first_port(nxt_app_t *app)
}
-static nxt_port_t *
-nxt_router_app_get_idle_port(nxt_app_t *app)
+nxt_inline nxt_port_t *
+nxt_router_app_get_port_for_quit(nxt_app_t *app)
{
nxt_port_t *port;
@@ -2701,8 +2937,16 @@ nxt_router_app_get_idle_port(nxt_app_t *app)
continue;
}
- nxt_queue_remove(&port->app_link);
- port->app_link.next = NULL;
+ /* Caller is responsible to decrease port use count. */
+ nxt_queue_chk_remove(&port->app_link);
+
+ if (nxt_queue_chk_remove(&port->idle_link)) {
+ app->idle_processes--;
+ }
+
+ /* Caller is responsible to decrease app use count. */
+ port->app = NULL;
+ app->processes--;
break;
@@ -2715,6 +2959,41 @@ nxt_router_app_get_idle_port(nxt_app_t *app)
static void
+nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app)
+{
+ nxt_port_t *port;
+
+ nxt_queue_remove(&app->link);
+
+ app->live = 0;
+
+ for ( ;; ) {
+ port = nxt_router_app_get_port_for_quit(app);
+ if (port == NULL) {
+ break;
+ }
+
+ nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+
+ nxt_port_use(task, port, -1);
+ nxt_router_app_use(task, app, -1);
+ }
+
+ if (nxt_timer_is_in_tree(&app->idle_timer)) {
+ nxt_assert(app->engine == task->thread->engine);
+
+ app->idle_timer.handler = nxt_router_app_release_handler;
+ nxt_timer_add(app->engine, &app->idle_timer, 0);
+
+ } else {
+ nxt_router_app_use(task, app, -1);
+ }
+}
+
+
+static void
nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
{
nxt_app_t *app;
@@ -2739,7 +3018,8 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response)
{
nxt_app_t *app;
- nxt_bool_t send_quit, cancelled;
+ nxt_bool_t port_unchained;
+ nxt_bool_t send_quit, cancelled, adjust_idle_timer;
nxt_queue_link_t *lnk;
nxt_req_app_link_t *ra, *pending_ra, *re_ra;
nxt_port_select_state_t state;
@@ -2856,12 +3136,52 @@ app_dead:
re_ra_cancelled:
- send_quit = (app->live == 0 && port->app_pending_responses > 0)
+ send_quit = (app->live == 0 && port->app_pending_responses == 0)
|| (app->max_requests > 0 && port->app_pending_responses == 0
&& port->app_responses >= app->max_requests);
+ if (send_quit) {
+ port_unchained = nxt_queue_chk_remove(&port->app_link);
+
+ port->app = NULL;
+ app->processes--;
+
+ } else {
+ port_unchained = 0;
+ }
+
+ adjust_idle_timer = 0;
+
+ if (!send_quit && port->app_pending_responses == 0) {
+ nxt_assert(port->idle_link.next == NULL);
+
+ if (app->idle_processes == app->spare_processes
+ && app->adjust_idle_work.data == NULL)
+ {
+ adjust_idle_timer = 1;
+ app->adjust_idle_work.data = app;
+ app->adjust_idle_work.next = NULL;
+ }
+
+ if (app->idle_processes < app->spare_processes) {
+ nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
+
+ } else {
+ nxt_queue_insert_tail(&app->idle_ports, &port->idle_link);
+
+ port->idle_start = task->thread->engine->timers.now;
+ }
+
+ app->idle_processes++;
+ }
+
nxt_thread_mutex_unlock(&app->mutex);
+ if (adjust_idle_timer) {
+ nxt_router_app_use(task, app, 1);
+ nxt_event_engine_post(app->engine, &app->adjust_idle_work);
+ }
+
if (pending_ra != NULL) {
nxt_router_ra_use(task, pending_ra, -1);
}
@@ -2893,12 +3213,18 @@ re_ra_cancelled:
}
if (send_quit) {
- nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
+ nxt_debug(task, "app '%V' %p send QUIT to port",
&app->name, app);
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
-1, 0, 0, NULL);
+ if (port_unchained) {
+ nxt_port_use(task, port, -1);
+ }
+
+ nxt_router_app_use(task, app, -1);
+
goto adjust_use;
}
@@ -2916,8 +3242,10 @@ adjust_use:
void
nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
{
- nxt_app_t *app;
- nxt_bool_t unchain, start_worker;
+ nxt_app_t *app;
+ nxt_bool_t unchain, start_process;
+ nxt_port_t *idle_port;
+ nxt_queue_link_t *idle_lnk;
app = port->app;
@@ -2925,41 +3253,162 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
nxt_thread_mutex_lock(&app->mutex);
- unchain = port->app_link.next != NULL;
+ unchain = nxt_queue_chk_remove(&port->app_link);
- if (unchain) {
- nxt_queue_remove(&port->app_link);
- port->app_link.next = NULL;
+ if (nxt_queue_chk_remove(&port->idle_link)) {
+ app->idle_processes--;
+
+ if (port->idle_start == 0
+ && app->idle_processes >= app->spare_processes)
+ {
+ nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
+
+ idle_lnk = nxt_queue_last(&app->idle_ports);
+ idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
+ nxt_queue_remove(idle_lnk);
+
+ nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
+
+ idle_port->idle_start = 0;
+ }
}
- app->workers--;
+ app->processes--;
- start_worker = app->live != 0
- && nxt_queue_is_empty(&app->requests) == 0
- && app->workers + app->pending_workers < app->max_workers;
+ start_process = app->live != 0
+ && !task->thread->engine->shutdown
+ && nxt_router_app_can_start(app)
+ && (!nxt_queue_is_empty(&app->requests)
+ || nxt_router_app_need_start(app));
- if (start_worker) {
- app->pending_workers++;
+ if (start_process) {
+ app->pending_processes++;
}
nxt_thread_mutex_unlock(&app->mutex);
- nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port);
+ nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
if (unchain) {
nxt_port_use(task, port, -1);
}
- if (start_worker) {
- nxt_router_start_worker(task, app);
+ if (start_process) {
+ nxt_router_start_app_process(task, app);
}
}
static void
+nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_app_t *app;
+ nxt_bool_t queued;
+ nxt_port_t *port;
+ nxt_msec_t timeout, threshold;
+ nxt_queue_link_t *lnk;
+ nxt_event_engine_t *engine;
+
+ app = obj;
+ queued = (data == app);
+
+ nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
+ &app->name, queued);
+
+ engine = task->thread->engine;
+
+ nxt_assert(app->engine == engine);
+
+ threshold = engine->timers.now + app->idle_timer.precision;
+ timeout = 0;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ if (queued) {
+ app->adjust_idle_work.data = NULL;
+ }
+
+ while (app->idle_processes > app->spare_processes) {
+
+ nxt_assert(nxt_queue_is_empty(&app->idle_ports) == 0);
+
+ lnk = nxt_queue_first(&app->idle_ports);
+ port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
+
+ timeout = port->idle_start + app->idle_timeout;
+
+ if (timeout > threshold) {
+ break;
+ }
+
+ nxt_queue_remove(lnk);
+ lnk->next = NULL;
+
+ nxt_queue_chk_remove(&port->app_link);
+
+ app->idle_processes--;
+ app->processes--;
+ port->app = NULL;
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ nxt_debug(task, "app '%V' send QUIT to idle port %PI",
+ &app->name, port->pid);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+
+ nxt_port_use(task, port, -1);
+ nxt_router_app_use(task, app, -1);
+
+ nxt_thread_mutex_lock(&app->mutex);
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ if (timeout > threshold) {
+ nxt_timer_add(engine, &app->idle_timer, timeout - threshold);
+
+ } else {
+ nxt_timer_disable(engine, &app->idle_timer);
+ }
+
+ if (queued) {
+ nxt_router_app_use(task, app, -1);
+ }
+}
+
+
+static void
+nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_app_t *app;
+ nxt_timer_t *timer;
+
+ timer = obj;
+ app = nxt_container_of(timer, nxt_app_t, idle_timer);
+
+ nxt_router_adjust_idle_timer(task, app, NULL);
+}
+
+
+static void
+nxt_router_app_release_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_app_t *app;
+ nxt_timer_t *timer;
+
+ timer = obj;
+ app = nxt_container_of(timer, nxt_app_t, idle_timer);
+
+ nxt_router_app_use(task, app, -1);
+}
+
+
+static void
nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
nxt_app_t *app;
+ nxt_bool_t can_start_process;
nxt_req_app_link_t *ra;
ra = state->ra;
@@ -2996,12 +3445,13 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
ra->app_port = NULL;
}
- state->can_start_worker = (app->workers + app->pending_workers)
- < app->max_workers;
+ can_start_process = nxt_router_app_can_start(app);
+
state->port = NULL;
+ state->start_process = 0;
if (nxt_queue_is_empty(&app->ports)
- || (state->can_start_worker && nxt_router_app_first_port_busy(app)) )
+ || (can_start_process && nxt_router_app_first_port_busy(app)) )
{
ra = nxt_router_ra_create(task, ra);
@@ -3020,8 +3470,9 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
- if (state->can_start_worker) {
- app->pending_workers++;
+ if (can_start_process) {
+ app->pending_processes++;
+ state->start_process = 1;
}
} else {
@@ -3038,6 +3489,11 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
nxt_router_ra_pending(task, app, ra);
}
+
+ if (can_start_process && nxt_router_app_need_start(app)) {
+ app->pending_processes++;
+ state->start_process = 1;
+ }
}
fail:
@@ -3077,20 +3533,24 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
ra->app_port = state->port;
+ if (state->start_process) {
+ nxt_router_start_app_process(task, app);
+ }
+
return NXT_OK;
}
- if (!state->can_start_worker) {
- nxt_debug(task, "app '%V' %p too many running or pending workers",
+ if (!state->start_process) {
+ nxt_debug(task, "app '%V' %p too many running or pending processes",
&app->name, app);
return NXT_AGAIN;
}
- res = nxt_router_start_worker(task, app);
+ res = nxt_router_start_app_process(task, app);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(ra, 500, "Failed to start worker");
+ nxt_router_ra_error(ra, 500, "Failed to start app process");
nxt_router_ra_use(task, ra, -1);
return NXT_ERROR;
@@ -3227,7 +3687,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
goto release_port;
}
- nxt_debug(task, "about to send %O bytes buffer to worker port %d",
+ nxt_debug(task, "about to send %O bytes buffer to app process port %d",
nxt_buf_used_size(wmsg.write),
wmsg.port->socket.fd);
diff --git a/src/nxt_router.h b/src/nxt_router.h
index 1370262c..a904901c 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -83,18 +83,29 @@ struct nxt_app_s {
nxt_thread_mutex_t mutex; /* Protects ports queue. */
nxt_queue_t ports; /* of nxt_port_t.app_link */
+ nxt_queue_t spare_ports; /* of nxt_port_t.idle_link */
+ nxt_queue_t idle_ports; /* of nxt_port_t.idle_link */
+ nxt_timer_t idle_timer;
+ nxt_work_t adjust_idle_work;
+ nxt_event_engine_t *engine;
+
nxt_queue_t requests; /* of nxt_req_app_link_t */
nxt_queue_t pending; /* of nxt_req_app_link_t */
nxt_str_t name;
- uint32_t pending_workers;
- uint32_t workers;
- uint32_t max_workers;
+ uint32_t pending_processes;
+ uint32_t processes;
+ uint32_t idle_processes;
+
+ uint32_t max_processes;
+ uint32_t spare_processes;
+ uint32_t max_pending_processes;
uint32_t max_pending_responses;
uint32_t max_requests;
nxt_msec_t timeout;
nxt_nsec_t res_timeout;
+ nxt_msec_t idle_timeout;
nxt_app_type_t type:8;
uint8_t live; /* 1 bit */
diff --git a/test/test_configuration.py b/test/test_configuration.py
index f7490069..700e4aa1 100644
--- a/test/test_configuration.py
+++ b/test/test_configuration.py
@@ -14,7 +14,7 @@ class TestUnitConfiguration(unit.TestUnitControl):
{
"ap\u0070": {
"type": "\u0070ython",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "\u002Fapp",
"module": "wsgi"
}
@@ -25,7 +25,7 @@ class TestUnitConfiguration(unit.TestUnitControl):
self.assertIn('success', self.conf({
"приложение": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "/app",
"module": "wsgi"
}
@@ -36,7 +36,7 @@ class TestUnitConfiguration(unit.TestUnitControl):
{
"app": {
"type": "python",
- "workers": \u0031,
+ "processes": { "spare": \u0030 },
"path": "/app",
"module": "wsgi"
}
@@ -49,18 +49,16 @@ class TestUnitConfiguration(unit.TestUnitControl):
def test_applications_string(self):
self.assertIn('error', self.conf('"{}"', '/applications'), 'string')
- @unittest.expectedFailure
- def test_negative_workers(self):
+ def test_negative_spare(self):
self.assertIn('error', self.conf({
"app": {
"type": "python",
- "workers": -1,
+ "processes": { "spare": -1 },
"path": "/app",
"module": "wsgi"
}
- }, '/applications'), 'negative workers')
+ }, '/applications'), 'negative spare')
- @unittest.expectedFailure
def test_applications_type_only(self):
self.assertIn('error', self.conf({
"app": {
@@ -73,7 +71,7 @@ class TestUnitConfiguration(unit.TestUnitControl):
{
app": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "/app",
"module": "wsgi"
}
@@ -85,7 +83,7 @@ class TestUnitConfiguration(unit.TestUnitControl):
{
"app" {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "/app",
"module": "wsgi"
}
@@ -97,7 +95,7 @@ class TestUnitConfiguration(unit.TestUnitControl):
{
"app": {
"type": "python"
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "/app",
"module": "wsgi"
}
@@ -112,7 +110,7 @@ class TestUnitConfiguration(unit.TestUnitControl):
self.assertIn('success', self.conf({
"app": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "../app",
"module": "wsgi"
}
@@ -137,7 +135,7 @@ class TestUnitConfiguration(unit.TestUnitControl):
"applications": {
"app": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "/app",
"module": "wsgi"
}
@@ -154,7 +152,7 @@ class TestUnitConfiguration(unit.TestUnitControl):
"applications": {
"app": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "/app",
"module": "wsgi"
}
@@ -171,7 +169,7 @@ class TestUnitConfiguration(unit.TestUnitControl):
"applications": {
"app": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "/app",
"module": "wsgi"
}
@@ -188,7 +186,7 @@ class TestUnitConfiguration(unit.TestUnitControl):
"applications": {
"app": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "/app",
"module": "wsgi"
}
diff --git a/test/test_php_basic.py b/test/test_php_basic.py
index dd757925..1487a9c5 100644
--- a/test/test_php_basic.py
+++ b/test/test_php_basic.py
@@ -9,7 +9,7 @@ class TestUnitBasic(unit.TestUnitControl):
conf_app = {
"app": {
"type": "php",
- "workers": 1,
+ "processes": { "spare": 0 },
"root": "/app",
"index": "index.php"
}
@@ -34,7 +34,7 @@ class TestUnitBasic(unit.TestUnitControl):
{
"app": {
"type": "php",
- "workers": 1,
+ "processes": { "spare": 0 },
"root": "/app",
"index": "index.php"
}
@@ -48,7 +48,7 @@ class TestUnitBasic(unit.TestUnitControl):
{
"app": {
"type": "php",
- "workers": 1,
+ "processes": { "spare": 0 },
"root": "/app",
"index": "index.php"
}
@@ -61,7 +61,7 @@ class TestUnitBasic(unit.TestUnitControl):
self.assertEqual(self.conf_get('/applications/app'),
{
"type": "php",
- "workers": 1,
+ "processes": { "spare": 0 },
"root": "/app",
"index": "index.php"
},
@@ -72,8 +72,8 @@ class TestUnitBasic(unit.TestUnitControl):
self.assertEqual(self.conf_get('/applications/app/type'), 'php',
'type')
- self.assertEqual(self.conf_get('/applications/app/workers'), 1,
- 'workers')
+ self.assertEqual(self.conf_get('/applications/app/processes/spare'), 0,
+ 'spare processes')
def test_php_get_listeners(self):
self.conf(self.conf_basic)
@@ -118,9 +118,9 @@ class TestUnitBasic(unit.TestUnitControl):
def test_php_change_application(self):
self.conf(self.conf_basic)
- self.conf('30', '/applications/app/workers')
- self.assertEqual(self.conf_get('/applications/app/workers'), 30,
- 'change application workers')
+ self.conf('30', '/applications/app/processes/max')
+ self.assertEqual(self.conf_get('/applications/app/processes/max'), 30,
+ 'change application max')
self.conf('"/www"', '/applications/app/root')
self.assertEqual(self.conf_get('/applications/app/root'), '/www',
diff --git a/test/test_python_application.py b/test/test_python_application.py
index 4fcedc43..1b99ea44 100644
--- a/test/test_python_application.py
+++ b/test/test_python_application.py
@@ -19,7 +19,7 @@ class TestUnitApplication(unit.TestUnitControl):
"applications": {
"app": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": self.testdir + '/' + name,
"module": "wsgi"
}
diff --git a/test/test_python_atexit.py b/test/test_python_atexit.py
index 4dc984c1..4c8a6543 100644
--- a/test/test_python_atexit.py
+++ b/test/test_python_atexit.py
@@ -37,7 +37,7 @@ def application(env, start_response):
"applications": {
"app": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": self.testdir + '/' + name,
"module": "wsgi"
}
diff --git a/test/test_python_basic.py b/test/test_python_basic.py
index ba63f4c6..ed0bd738 100644
--- a/test/test_python_basic.py
+++ b/test/test_python_basic.py
@@ -9,7 +9,7 @@ class TestUnitBasic(unit.TestUnitControl):
conf_app = {
"app": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "/app",
"module": "wsgi"
}
@@ -44,7 +44,7 @@ class TestUnitBasic(unit.TestUnitControl):
{
"app": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "/app",
"module": "wsgi"
}
@@ -58,7 +58,7 @@ class TestUnitBasic(unit.TestUnitControl):
{
"app": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "/app",
"module":"wsgi"
}
@@ -71,7 +71,7 @@ class TestUnitBasic(unit.TestUnitControl):
self.assertEqual(self.conf_get('/applications/app'),
{
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": "/app",
"module": "wsgi"
},
@@ -82,8 +82,8 @@ class TestUnitBasic(unit.TestUnitControl):
self.assertEqual(self.conf_get('/applications/app/type'), 'python',
'type')
- self.assertEqual(self.conf_get('/applications/app/workers'), 1,
- 'workers')
+ self.assertEqual(self.conf_get('/applications/app/processes/spare'), 0,
+ 'spare')
def test_python_get_listeners(self):
self.conf(self.conf_basic)
@@ -128,9 +128,9 @@ class TestUnitBasic(unit.TestUnitControl):
def test_python_change_application(self):
self.conf(self.conf_basic)
- self.conf('30', '/applications/app/workers')
- self.assertEqual(self.conf_get('/applications/app/workers'), 30,
- 'change application workers')
+ self.conf('30', '/applications/app/processes/max')
+ self.assertEqual(self.conf_get('/applications/app/processes/max'), 30,
+ 'change application max')
self.conf('"/www"', '/applications/app/path')
self.assertEqual(self.conf_get('/applications/app/path'), '/www',
diff --git a/test/test_python_keepalive.py b/test/test_python_keepalive.py
index 518befba..55607e9c 100644
--- a/test/test_python_keepalive.py
+++ b/test/test_python_keepalive.py
@@ -37,7 +37,7 @@ def application(environ, start_response):
"applications": {
"app": {
"type": "python",
- "workers": 1,
+ "processes": { "spare": 0 },
"path": self.testdir + '/' + name,
"module": "wsgi"
}
diff --git a/test/test_python_procman.py b/test/test_python_procman.py
new file mode 100644
index 00000000..5d2a5f50
--- /dev/null
+++ b/test/test_python_procman.py
@@ -0,0 +1,237 @@
+import os
+import time
+import unittest
+import unit
+
+class TestUnitApplication(unit.TestUnitControl):
+
+ def setUpClass():
+ u = unit.TestUnit()
+
+ u.check_modules('python')
+ u.check_version('0.3')
+
+ def getWorkerCount(self):
+ n = 0
+ for f in os.listdir(self.testdir):
+ if f.startswith('proctest.'):
+ n += 1
+
+ return n
+
+ def getTestCode(self):
+ return """
+import atexit
+import os
+
+fname = "%s.%%d" %% os.getpid()
+
+def remove_file():
+ os.remove(fname)
+
+atexit.register(remove_file)
+
+open(fname, 'w')
+
+def application(env, start_response):
+ start_response('200 OK', [('Content-Type','text/html')])
+ return [b'body']
+
+""" % (self.testdir + '/proctest')
+
+
+ def test_python_prefork(self):
+ code, name = self.getTestCode(), 'py_app'
+
+ self.python_application(name, code)
+
+ self.conf({
+ "listeners": {
+ "*:7080": {
+ "application": "app"
+ }
+ },
+ "applications": {
+ "app": {
+ "type": "python",
+ "processes": 2,
+ "path": self.testdir + '/' + name,
+ "module": "wsgi"
+ }
+ }
+ })
+
+ self.assertEqual(self.getWorkerCount(), 2, 'python prefork 2 processes')
+
+ self.get()
+ self.assertEqual(self.getWorkerCount(), 2, 'python prefork, still 2')
+
+ self.conf('4', '/applications/app/processes')
+
+ time.sleep(0.2)
+
+ self.assertEqual(self.getWorkerCount(), 4, 'python prefork 4 processes')
+
+ self.get()
+ self.assertEqual(self.getWorkerCount(), 4, 'python prefork, still 4')
+
+ self.conf({
+ "listeners": {},
+ "applications": {}
+ })
+
+ time.sleep(0.2)
+ self.assertEqual(self.getWorkerCount(), 0, 'python stop all processes')
+
+ time.sleep(2.2)
+
+
+ def test_python_ondemand(self):
+ code, name = self.getTestCode(), 'py_app'
+
+ self.python_application(name, code)
+
+ self.conf({
+ "listeners": {
+ "*:7080": {
+ "application": "app"
+ }
+ },
+ "applications": {
+ "app": {
+ "type": "python",
+ "processes": {
+ "spare": 0,
+ "max": 8,
+ "idle_timeout": 2
+ },
+ "path": self.testdir + '/' + name,
+ "module": "wsgi"
+ }
+ }
+ })
+
+ self.assertEqual(self.getWorkerCount(), 0, 'python on-demand')
+
+ self.get()
+ self.assertEqual(self.getWorkerCount(), 1, 'python start on-demand')
+
+ self.get()
+ self.assertEqual(self.getWorkerCount(), 1, 'python still 1')
+
+ time.sleep(2.2)
+ self.assertEqual(self.getWorkerCount(), 0, 'python stop idle')
+
+ self.conf({
+ "listeners": {},
+ "applications": {}
+ })
+
+ time.sleep(0.2)
+ self.assertEqual(self.getWorkerCount(), 0, 'python stop all processes')
+
+ time.sleep(2.2)
+
+ def test_python_scale_updown(self):
+ code, name = self.getTestCode(), 'py_app'
+
+ self.python_application(name, code)
+
+ self.conf({
+ "listeners": {
+ "*:7080": {
+ "application": "app"
+ }
+ },
+ "applications": {
+ "app": {
+ "type": "python",
+ "processes": {
+ "spare": 2,
+ "max": 8,
+ "idle_timeout": 2
+ },
+ "path": self.testdir + '/' + name,
+ "module": "wsgi"
+ }
+ }
+ })
+
+ self.assertEqual(self.getWorkerCount(), 2, 'python prefork 2')
+
+ self.get()
+ time.sleep(0.2)
+ self.assertEqual(self.getWorkerCount(), 3, 'python keep 2 idle, 1 busy')
+
+ self.get()
+ time.sleep(0.2)
+ self.assertEqual(self.getWorkerCount(), 3, 'python still 3')
+
+ time.sleep(2.2)
+ self.assertEqual(self.getWorkerCount(), 2, 'python stop idle')
+
+ self.get()
+
+ time.sleep(0.5)
+ self.assertEqual(self.getWorkerCount(), 3, 'python keep 2 idle, 1 busy')
+
+ self.conf({
+ "listeners": {},
+ "applications": {}
+ })
+
+ time.sleep(0.2)
+ self.assertEqual(self.getWorkerCount(), 0, 'python stop all processes')
+
+ time.sleep(2.2)
+
+ def test_python_reconfigure(self):
+ code, name = self.getTestCode(), 'py_app'
+
+ self.python_application(name, code)
+
+ self.conf({
+ "listeners": {
+ "*:7080": {
+ "application": "app"
+ }
+ },
+ "applications": {
+ "app": {
+ "type": "python",
+ "processes": {
+ "spare": 2,
+ "max": 6,
+ "idle_timeout": 2
+ },
+ "path": self.testdir + '/' + name,
+ "module": "wsgi"
+ }
+ }
+ })
+
+ self.assertEqual(self.getWorkerCount(), 2, 'python prefork 2')
+
+ self.get()
+ time.sleep(0.2)
+ self.assertEqual(self.getWorkerCount(), 3, 'python keep 2 idle, 1 busy')
+
+ self.conf('6', '/applications/app/processes/spare')
+ self.assertEqual(self.getWorkerCount(), 6, 'python prefork 6')
+
+ self.get()
+ time.sleep(0.2)
+ self.assertEqual(self.getWorkerCount(), 6, 'python still 6')
+
+ self.conf({
+ "listeners": {},
+ "applications": {}
+ })
+
+ time.sleep(0.2)
+ self.assertEqual(self.getWorkerCount(), 0, 'python stop all processes')
+
+ time.sleep(2.2)
+
+if __name__ == '__main__':
+ unittest.main()