diff options
-rw-r--r-- | src/nxt_application.h | 2 | ||||
-rw-r--r-- | src/nxt_conf.c | 7 | ||||
-rw-r--r-- | src/nxt_conf.h | 1 | ||||
-rw-r--r-- | src/nxt_conf_validation.c | 143 | ||||
-rw-r--r-- | src/nxt_main_process.c | 6 | ||||
-rw-r--r-- | src/nxt_port.c | 1 | ||||
-rw-r--r-- | src/nxt_port.h | 3 | ||||
-rw-r--r-- | src/nxt_router.c | 630 | ||||
-rw-r--r-- | src/nxt_router.h | 17 | ||||
-rw-r--r-- | test/test_configuration.py | 30 | ||||
-rw-r--r-- | test/test_php_basic.py | 18 | ||||
-rw-r--r-- | test/test_python_application.py | 2 | ||||
-rw-r--r-- | test/test_python_atexit.py | 2 | ||||
-rw-r--r-- | test/test_python_basic.py | 18 | ||||
-rw-r--r-- | test/test_python_keepalive.py | 2 | ||||
-rw-r--r-- | test/test_python_procman.py | 237 |
16 files changed, 982 insertions, 137 deletions
diff --git a/src/nxt_application.h b/src/nxt_application.h index 016214b3..d36d8cd1 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -60,8 +60,6 @@ struct nxt_common_app_conf_s { char *working_directory; - uint32_t workers; - union { nxt_python_app_conf_t python; nxt_php_app_conf_t php; diff --git a/src/nxt_conf.c b/src/nxt_conf.c index 872a62af..684135cf 100644 --- a/src/nxt_conf.c +++ b/src/nxt_conf.c @@ -163,6 +163,13 @@ nxt_conf_get_string(nxt_conf_value_t *value, nxt_str_t *str) } +int64_t +nxt_conf_get_integer(nxt_conf_value_t *value) +{ + return value->u.integer; +} + + nxt_uint_t nxt_conf_object_members_count(nxt_conf_value_t *value) { diff --git a/src/nxt_conf.h b/src/nxt_conf.h index d0c898a4..a7675d3b 100644 --- a/src/nxt_conf.h +++ b/src/nxt_conf.h @@ -100,6 +100,7 @@ void nxt_conf_json_position(u_char *start, u_char *pos, nxt_uint_t *line, nxt_int_t nxt_conf_validate(nxt_conf_validation_t *vldt); void nxt_conf_get_string(nxt_conf_value_t *value, nxt_str_t *str); +int64_t nxt_conf_get_integer(nxt_conf_value_t *value); // FIXME reimplement and reorder functions below nxt_uint_t nxt_conf_object_members_count(nxt_conf_value_t *value); diff --git a/src/nxt_conf_validation.c b/src/nxt_conf_validation.c index cb17a62b..e4109b00 100644 --- a/src/nxt_conf_validation.c +++ b/src/nxt_conf_validation.c @@ -54,6 +54,8 @@ static nxt_int_t nxt_conf_vldt_app(nxt_conf_validation_t *vldt, nxt_str_t *name, nxt_conf_value_t *value); static nxt_int_t nxt_conf_vldt_object(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); +static nxt_int_t nxt_conf_vldt_processes(nxt_conf_validation_t *vldt, + nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_object_iterator(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data); static nxt_int_t nxt_conf_vldt_system(nxt_conf_validation_t *vldt, @@ -107,22 +109,42 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_limits_members[] = { }; -static nxt_conf_vldt_object_t nxt_conf_vldt_common_members[] = { - { nxt_string("type"), - NXT_CONF_VLDT_STRING, +static nxt_conf_vldt_object_t nxt_conf_vldt_app_processes_members[] = { + { nxt_string("spare"), + NXT_CONF_VLDT_INTEGER, + NULL, + NULL }, + + { nxt_string("max"), + NXT_CONF_VLDT_INTEGER, NULL, NULL }, - { nxt_string("workers"), + { nxt_string("idle_timeout"), NXT_CONF_VLDT_INTEGER, NULL, NULL }, + NXT_CONF_VLDT_END +}; + + +static nxt_conf_vldt_object_t nxt_conf_vldt_common_members[] = { + { nxt_string("type"), + NXT_CONF_VLDT_STRING, + NULL, + NULL }, + { nxt_string("limits"), NXT_CONF_VLDT_OBJECT, &nxt_conf_vldt_object, (void *) &nxt_conf_vldt_app_limits_members }, + { nxt_string("processes"), + NXT_CONF_VLDT_INTEGER | NXT_CONF_VLDT_OBJECT, + &nxt_conf_vldt_processes, + (void *) &nxt_conf_vldt_app_processes_members }, + { nxt_string("user"), NXT_CONF_VLDT_STRING, nxt_conf_vldt_system, @@ -475,6 +497,119 @@ nxt_conf_vldt_object(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, } +typedef struct { + int64_t spare; + int64_t max; + int64_t idle_timeout; +} nxt_conf_vldt_processes_conf_t; + + +static nxt_conf_map_t nxt_conf_vldt_processes_conf_map[] = { + { + nxt_string("spare"), + NXT_CONF_MAP_INT64, + offsetof(nxt_conf_vldt_processes_conf_t, spare), + }, + + { + nxt_string("max"), + NXT_CONF_MAP_INT64, + offsetof(nxt_conf_vldt_processes_conf_t, max), + }, + + { + nxt_string("idle_timeout"), + NXT_CONF_MAP_INT64, + offsetof(nxt_conf_vldt_processes_conf_t, idle_timeout), + }, +}; + + +static nxt_int_t +nxt_conf_vldt_processes(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, + void *data) +{ + int64_t int_value; + nxt_int_t ret; + nxt_conf_vldt_processes_conf_t proc; + + static nxt_str_t max_str = nxt_string("max"); + + if (nxt_conf_type(value) == NXT_CONF_INTEGER) { + int_value = nxt_conf_get_integer(value); + + if (int_value < 1) { + return nxt_conf_vldt_error(vldt, "The \"processes\" number must be " + "equal to or greater than 1."); + } + + if (int_value > NXT_INT32_T_MAX) { + return nxt_conf_vldt_error(vldt, "The \"processes\" number must " + "not exceed %d.", NXT_INT32_T_MAX); + } + + return NXT_OK; + } + + ret = nxt_conf_vldt_object(vldt, value, data); + if (ret != NXT_OK) { + return ret; + } + + proc.spare = 1; + proc.max = 1; + proc.idle_timeout = 15; + + ret = nxt_conf_map_object(vldt->pool, value, + nxt_conf_vldt_processes_conf_map, + nxt_nitems(nxt_conf_vldt_processes_conf_map), + &proc); + if (ret != NXT_OK) { + return ret; + } + + if (proc.spare < 0) { + return nxt_conf_vldt_error(vldt, "The \"spare\" number must not be " + "negative."); + } + + if (proc.spare > NXT_INT32_T_MAX) { + return nxt_conf_vldt_error(vldt, "The \"spare\" number must not " + "not exceed %d.", NXT_INT32_T_MAX); + } + + if (nxt_conf_get_object_member(value, &max_str, NULL) != NULL) { + + if (proc.max < 1) { + return nxt_conf_vldt_error(vldt, "The \"max\" number must be equal " + "to or greater than 1."); + } + + if (proc.max > NXT_INT32_T_MAX) { + return nxt_conf_vldt_error(vldt, "The \"max\" number must not " + "not exceed %d.", NXT_INT32_T_MAX); + } + + if (proc.max < proc.spare) { + return nxt_conf_vldt_error(vldt, "The \"spare\" number must be " + "lower than \"max\"."); + } + } + + if (proc.idle_timeout < 0) { + return nxt_conf_vldt_error(vldt, "The \"idle_timeout\" number must not " + "be negative."); + } + + if (proc.idle_timeout > NXT_INT32_T_MAX / 1000) { + return nxt_conf_vldt_error(vldt, "The \"idle_timeout\" number must not " + "not exceed %d.", NXT_INT32_T_MAX / 1000); + } + + return NXT_OK; +} + + static nxt_int_t nxt_conf_vldt_object_iterator(nxt_conf_validation_t *vldt, nxt_conf_value_t *value, void *data) diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index f50475ed..fedd30cd 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -115,12 +115,6 @@ static nxt_conf_map_t nxt_common_app_conf[] = { }, { - nxt_string("workers"), - NXT_CONF_MAP_INT32, - offsetof(nxt_common_app_conf_t, workers), - }, - - { nxt_string("home"), NXT_CONF_MAP_CSTRZ, offsetof(nxt_common_app_conf_t, u.python.home), diff --git a/src/nxt_port.c b/src/nxt_port.c index e8e94a94..1fcb0244 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -29,6 +29,7 @@ nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) nxt_assert(port->use_count == 0); nxt_assert(port->app_link.next == NULL); + nxt_assert(port->idle_link.next == NULL); nxt_assert(nxt_queue_is_empty(&port->messages)); nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams)); diff --git a/src/nxt_port.h b/src/nxt_port.h index 74d37dcb..d916b34b 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -158,6 +158,9 @@ struct nxt_port_s { nxt_queue_link_t app_link; /* for nxt_app_t.ports */ nxt_app_t *app; + nxt_queue_link_t idle_link; /* for nxt_app_t.idle_ports */ + nxt_msec_t idle_start; + nxt_queue_t messages; /* of nxt_port_send_msg_t */ nxt_thread_mutex_t write_mutex; diff --git a/src/nxt_router.c b/src/nxt_router.c index 1ba7072d..19a37c69 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -12,11 +12,15 @@ typedef struct { nxt_str_t type; - uint32_t workers; + uint32_t processes; + uint32_t max_processes; + uint32_t spare_processes; nxt_msec_t timeout; nxt_msec_t res_timeout; + nxt_msec_t idle_timeout; uint32_t requests; nxt_conf_value_t *limits_value; + nxt_conf_value_t *processes_value; } nxt_router_app_conf_t; @@ -76,6 +80,12 @@ typedef struct { } nxt_socket_rpc_t; +typedef struct { + nxt_app_t *app; + nxt_router_temp_conf_t *temp_conf; +} nxt_app_rpc_t; + + struct nxt_port_select_state_s { nxt_app_t *app; nxt_req_app_link_t *ra; @@ -83,7 +93,7 @@ struct nxt_port_select_state_s { nxt_port_t *failed_port; int failed_port_use_delta; - nxt_bool_t can_start_worker; + uint8_t start_process; /* 1 bit */ nxt_req_app_link_t *shared_ra; nxt_port_t *port; }; @@ -96,7 +106,7 @@ static void nxt_router_port_select(nxt_task_t *task, static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state); -static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); +static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); nxt_inline void nxt_router_ra_inc_use(nxt_req_app_link_t *ra) @@ -136,6 +146,12 @@ static void nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static void nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); +static void nxt_router_app_rpc_create(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf, nxt_app_t *app); +static void nxt_router_app_prefork_ready(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); +static void nxt_router_app_prefork_error(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_str_t *name); static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, @@ -193,7 +209,7 @@ static void nxt_router_app_port_ready(nxt_task_t *task, static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); -static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); +static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app); static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response); static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, @@ -209,6 +225,12 @@ static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); +static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_app_release_handler(nxt_task_t *task, void *obj, + void *data); static const nxt_http_request_state_t nxt_http_request_send_state; static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); @@ -253,7 +275,8 @@ nxt_router_start(nxt_task_t *task, void *data) static void -nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data) +nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, + void *data) { size_t size; uint32_t stream; @@ -268,7 +291,7 @@ nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data) rt = task->thread->runtime; main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - nxt_debug(task, "app '%V' %p start worker", &app->name, app); + nxt_debug(task, "app '%V' %p start process", &app->name, app); size = app->name.length + 1 + app->conf.length; @@ -304,7 +327,7 @@ failed: nxt_thread_mutex_lock(&app->mutex); - app->pending_workers--; + app->pending_processes--; nxt_thread_mutex_unlock(&app->mutex); @@ -313,7 +336,7 @@ failed: static nxt_int_t -nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app) +nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) { nxt_int_t res; nxt_port_t *router_port; @@ -324,7 +347,7 @@ nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app) nxt_router_app_use(task, app, 1); - res = nxt_port_post(task, router_port, nxt_router_start_worker_handler, + res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, app); if (res == NXT_OK) { @@ -333,7 +356,7 @@ nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app) nxt_thread_mutex_lock(&app->mutex); - app->pending_workers--; + app->pending_processes--; nxt_thread_mutex_unlock(&app->mutex); @@ -737,7 +760,8 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void -nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data) +nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, + void *data) { union { nxt_pid_t removed_pid; @@ -763,7 +787,7 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) { - nxt_port_post(task, engine->port, nxt_router_worker_remove_pid, + nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, msg->u.data); } nxt_queue_loop; @@ -837,10 +861,27 @@ fail: } +nxt_inline nxt_bool_t +nxt_router_app_can_start(nxt_app_t *app) +{ + return app->processes + app->pending_processes < app->max_processes + && app->pending_processes < app->max_pending_processes; +} + + +nxt_inline nxt_bool_t +nxt_router_app_need_start(nxt_app_t *app) +{ + return app->idle_processes + app->pending_processes + < app->spare_processes; +} + + static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) { nxt_int_t ret; + nxt_app_t *app; nxt_router_t *router; nxt_runtime_t *rt; nxt_queue_link_t *qlk; @@ -863,6 +904,15 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) return; } + nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { + + if (nxt_router_app_need_start(app)) { + nxt_router_app_rpc_create(task, tmcf, app); + return; + } + + } nxt_queue_loop; + rt = task->thread->runtime; interface = nxt_service_get(rt->services, "engine", NULL); @@ -923,6 +973,7 @@ nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) static void nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) { + nxt_app_t *app; nxt_socket_t s; nxt_router_t *router; nxt_queue_link_t *qlk; @@ -944,6 +995,12 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) nxt_free(skcf->listen); } + nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { + + nxt_router_app_quit(task, app); + + } nxt_queue_loop; + router = tmcf->conf->router; nxt_queue_add(&router->sockets, &tmcf->keeping); @@ -984,15 +1041,21 @@ static nxt_conf_map_t nxt_router_app_conf[] = { }, { - nxt_string("workers"), + nxt_string("limits"), + NXT_CONF_MAP_PTR, + offsetof(nxt_router_app_conf_t, limits_value), + }, + + { + nxt_string("processes"), NXT_CONF_MAP_INT32, - offsetof(nxt_router_app_conf_t, workers), + offsetof(nxt_router_app_conf_t, processes), }, { - nxt_string("limits"), + nxt_string("processes"), NXT_CONF_MAP_PTR, - offsetof(nxt_router_app_conf_t, limits_value), + offsetof(nxt_router_app_conf_t, processes_value), }, }; @@ -1018,6 +1081,27 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = { }; +static nxt_conf_map_t nxt_router_app_processes_conf[] = { + { + nxt_string("spare"), + NXT_CONF_MAP_INT32, + offsetof(nxt_router_app_conf_t, spare_processes), + }, + + { + nxt_string("max"), + NXT_CONF_MAP_INT32, + offsetof(nxt_router_app_conf_t, max_processes), + }, + + { + nxt_string("idle_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_router_app_conf_t, idle_timeout), + }, +}; + + static nxt_conf_map_t nxt_router_listener_conf[] = { { nxt_string("application"), @@ -1100,6 +1184,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *applications, *application; nxt_conf_value_t *listeners, *listener; nxt_socket_conf_t *skcf; + nxt_event_engine_t *engine; nxt_app_lang_module_t *lang; nxt_router_app_conf_t apcf; nxt_router_listener_conf_t lscf; @@ -1174,11 +1259,15 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, continue; } - apcf.workers = 1; + apcf.processes = 1; + apcf.max_processes = 1; + apcf.spare_processes = 1; apcf.timeout = 0; apcf.res_timeout = 1000; + apcf.idle_timeout = 15000; apcf.requests = 0; apcf.limits_value = NULL; + apcf.processes_value = NULL; ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, nxt_nitems(nxt_router_app_conf), &apcf); @@ -1204,10 +1293,27 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } } + if (apcf.processes_value != NULL + && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) + { + ret = nxt_conf_map_object(mp, apcf.processes_value, + nxt_router_app_processes_conf, + nxt_nitems(nxt_router_app_processes_conf), + &apcf); + if (ret != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "application processes map error"); + goto app_fail; + } + + } else { + apcf.max_processes = apcf.processes; + apcf.spare_processes = apcf.processes; + } + nxt_debug(task, "application type: %V", &apcf.type); - nxt_debug(task, "application workers: %D", apcf.workers); - nxt_debug(task, "application request timeout: %D", apcf.timeout); - nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout); + nxt_debug(task, "application processes: %D", apcf.processes); + nxt_debug(task, "application request timeout: %M", apcf.timeout); + nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout); nxt_debug(task, "application requests: %D", apcf.requests); lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); @@ -1226,6 +1332,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } nxt_queue_init(&app->ports); + nxt_queue_init(&app->spare_ports); + nxt_queue_init(&app->idle_ports); nxt_queue_init(&app->requests); nxt_queue_init(&app->pending); @@ -1233,14 +1341,32 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_memcpy(app->name.start, name.start, name.length); app->type = lang->type; - app->max_workers = apcf.workers; + app->max_processes = apcf.max_processes; + app->spare_processes = apcf.spare_processes; + app->max_pending_processes = apcf.spare_processes + ? apcf.spare_processes : 1; app->timeout = apcf.timeout; app->res_timeout = apcf.res_timeout * 1000000; + app->idle_timeout = apcf.idle_timeout; app->live = 1; app->max_pending_responses = 2; app->max_requests = apcf.requests; app->prepare_msg = nxt_app_prepare_msg[lang->type]; + engine = task->thread->engine; + + app->engine = engine; + + app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION; + app->idle_timer.work_queue = &engine->fast_work_queue; + app->idle_timer.handler = nxt_router_app_idle_timeout; + app->idle_timer.task = &engine->task; + app->idle_timer.log = app->idle_timer.task->log; + + app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; + app->adjust_idle_work.task = &engine->task; + app->adjust_idle_work.obj = app; + nxt_queue_insert_tail(&tmcf->apps, &app->link); nxt_router_app_use(task, app, 1); @@ -1624,6 +1750,119 @@ nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, } +static void +nxt_router_app_rpc_create(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf, nxt_app_t *app) +{ + size_t size; + uint32_t stream; + nxt_buf_t *b; + nxt_port_t *main_port, *router_port; + nxt_runtime_t *rt; + nxt_app_rpc_t *rpc; + + rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); + if (rpc == NULL) { + goto fail; + } + + rpc->app = app; + rpc->temp_conf = tmcf; + + nxt_debug(task, "app '%V' prefork", &app->name); + + size = app->name.length + 1 + app->conf.length; + + b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + goto fail; + } + + nxt_buf_cpystr(b, &app->name); + *b->mem.free++ = '\0'; + nxt_buf_cpystr(b, &app->conf); + + rt = task->thread->runtime; + main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + + stream = nxt_port_rpc_register_handler(task, router_port, + nxt_router_app_prefork_ready, + nxt_router_app_prefork_error, + -1, rpc); + if (nxt_slow_path(stream == 0)) { + goto fail; + } + + app->pending_processes++; + + nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, + stream, router_port->id, b); + + return; + +fail: + + nxt_router_conf_error(task, tmcf); +} + + +static void +nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) +{ + nxt_app_t *app; + nxt_port_t *port; + nxt_app_rpc_t *rpc; + nxt_event_engine_t *engine; + + rpc = data; + app = rpc->app; + + port = msg->u.new_port; + port->app = app; + + nxt_router_app_use(task, app, 1); + + app->pending_processes--; + app->processes++; + app->idle_processes++; + + engine = task->thread->engine; + + nxt_queue_insert_tail(&app->ports, &port->app_link); + nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); + + port->idle_start = 0; + + nxt_port_inc_use(port); + + nxt_work_queue_add(&engine->fast_work_queue, + nxt_router_conf_apply, task, rpc->temp_conf, NULL); +} + + +static void +nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) +{ + nxt_app_t *app; + nxt_app_rpc_t *rpc; + nxt_router_temp_conf_t *tmcf; + + rpc = data; + app = rpc->app; + tmcf = rpc->temp_conf; + + nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", + &app->name); + + app->pending_processes--; + + nxt_router_conf_error(task, tmcf); +} + + static nxt_int_t nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) @@ -1933,32 +2172,11 @@ static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, nxt_router_temp_conf_t *tmcf) { - nxt_app_t *app; - nxt_port_t *port; + nxt_app_t *app; nxt_queue_each(app, &router->apps, nxt_app_t, link) { - nxt_queue_remove(&app->link); - - nxt_debug(task, "about to free app '%V' %p", &app->name, app); - - app->live = 0; - - do { - port = nxt_router_app_get_idle_port(app); - if (port == NULL) { - break; - } - - nxt_debug(task, "port %p send quit", port); - - nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, - NULL); - - nxt_port_use(task, port, -1); - } while (1); - - nxt_router_app_use(task, app, -1); + nxt_router_app_quit(task, app); } nxt_queue_loop; @@ -2454,6 +2672,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, if (nxt_buf_mem_used_size(&b->mem) == 0) { nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); + } else { nxt_buf_chain_add(&r->out, b); } @@ -2561,14 +2780,15 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_thread_mutex_lock(&app->mutex); - nxt_assert(app->pending_workers != 0); + nxt_assert(app->pending_processes != 0); - app->pending_workers--; - app->workers++; + app->pending_processes--; + app->processes++; nxt_thread_mutex_unlock(&app->mutex); - nxt_debug(task, "app '%V' %p new port ready", &app->name, app); + nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", + &app->name, port->pid, app->processes, app->pending_processes); nxt_router_app_port_release(task, port, 0, 0); } @@ -2590,9 +2810,9 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_thread_mutex_lock(&app->mutex); - nxt_assert(app->pending_workers != 0); + nxt_assert(app->pending_processes != 0); - app->pending_workers--; + app->pending_processes--; if (!nxt_queue_is_empty(&app->requests)) { lnk = nxt_queue_last(&app->requests); @@ -2611,7 +2831,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' %p abort next stream #%uD", &app->name, app, ra->stream); - nxt_router_ra_error(ra, 500, "Failed to start application worker"); + nxt_router_ra_error(ra, 500, "Failed to start application process"); nxt_router_ra_use(task, ra, -1); } @@ -2629,10 +2849,13 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) if (i < 0 && c == -i) { nxt_assert(app->live == 0); - nxt_assert(app->workers == 0); - nxt_assert(app->pending_workers == 0); + nxt_assert(app->processes == 0); + nxt_assert(app->idle_processes == 0); + nxt_assert(app->pending_processes == 0); nxt_assert(nxt_queue_is_empty(&app->requests) != 0); nxt_assert(nxt_queue_is_empty(&app->ports) != 0); + nxt_assert(nxt_queue_is_empty(&app->spare_ports) != 0); + nxt_assert(nxt_queue_is_empty(&app->idle_ports) != 0); nxt_thread_mutex_destroy(&app->mutex); nxt_free(app); @@ -2666,6 +2889,19 @@ nxt_router_pop_first_port(nxt_app_t *app) port->app_pending_responses++; + if (nxt_queue_chk_remove(&port->idle_link)) { + app->idle_processes--; + + if (port->idle_start == 0) { + nxt_assert(app->idle_processes < app->spare_processes); + + } else { + nxt_assert(app->idle_processes >= app->spare_processes); + + port->idle_start = 0; + } + } + if ((app->max_pending_responses == 0 || port->app_pending_responses < app->max_pending_responses) && (app->max_requests == 0 @@ -2684,8 +2920,8 @@ nxt_router_pop_first_port(nxt_app_t *app) } -static nxt_port_t * -nxt_router_app_get_idle_port(nxt_app_t *app) +nxt_inline nxt_port_t * +nxt_router_app_get_port_for_quit(nxt_app_t *app) { nxt_port_t *port; @@ -2701,8 +2937,16 @@ nxt_router_app_get_idle_port(nxt_app_t *app) continue; } - nxt_queue_remove(&port->app_link); - port->app_link.next = NULL; + /* Caller is responsible to decrease port use count. */ + nxt_queue_chk_remove(&port->app_link); + + if (nxt_queue_chk_remove(&port->idle_link)) { + app->idle_processes--; + } + + /* Caller is responsible to decrease app use count. */ + port->app = NULL; + app->processes--; break; @@ -2715,6 +2959,41 @@ nxt_router_app_get_idle_port(nxt_app_t *app) static void +nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app) +{ + nxt_port_t *port; + + nxt_queue_remove(&app->link); + + app->live = 0; + + for ( ;; ) { + port = nxt_router_app_get_port_for_quit(app); + if (port == NULL) { + break; + } + + nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + + nxt_port_use(task, port, -1); + nxt_router_app_use(task, app, -1); + } + + if (nxt_timer_is_in_tree(&app->idle_timer)) { + nxt_assert(app->engine == task->thread->engine); + + app->idle_timer.handler = nxt_router_app_release_handler; + nxt_timer_add(app->engine, &app->idle_timer, 0); + + } else { + nxt_router_app_use(task, app, -1); + } +} + + +static void nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) { nxt_app_t *app; @@ -2739,7 +3018,8 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response) { nxt_app_t *app; - nxt_bool_t send_quit, cancelled; + nxt_bool_t port_unchained; + nxt_bool_t send_quit, cancelled, adjust_idle_timer; nxt_queue_link_t *lnk; nxt_req_app_link_t *ra, *pending_ra, *re_ra; nxt_port_select_state_t state; @@ -2856,12 +3136,52 @@ app_dead: re_ra_cancelled: - send_quit = (app->live == 0 && port->app_pending_responses > 0) + send_quit = (app->live == 0 && port->app_pending_responses == 0) || (app->max_requests > 0 && port->app_pending_responses == 0 && port->app_responses >= app->max_requests); + if (send_quit) { + port_unchained = nxt_queue_chk_remove(&port->app_link); + + port->app = NULL; + app->processes--; + + } else { + port_unchained = 0; + } + + adjust_idle_timer = 0; + + if (!send_quit && port->app_pending_responses == 0) { + nxt_assert(port->idle_link.next == NULL); + + if (app->idle_processes == app->spare_processes + && app->adjust_idle_work.data == NULL) + { + adjust_idle_timer = 1; + app->adjust_idle_work.data = app; + app->adjust_idle_work.next = NULL; + } + + if (app->idle_processes < app->spare_processes) { + nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); + + } else { + nxt_queue_insert_tail(&app->idle_ports, &port->idle_link); + + port->idle_start = task->thread->engine->timers.now; + } + + app->idle_processes++; + } + nxt_thread_mutex_unlock(&app->mutex); + if (adjust_idle_timer) { + nxt_router_app_use(task, app, 1); + nxt_event_engine_post(app->engine, &app->adjust_idle_work); + } + if (pending_ra != NULL) { nxt_router_ra_use(task, pending_ra, -1); } @@ -2893,12 +3213,18 @@ re_ra_cancelled: } if (send_quit) { - nxt_debug(task, "app '%V' %p is not alive, send QUIT to port", + nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app); nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + if (port_unchained) { + nxt_port_use(task, port, -1); + } + + nxt_router_app_use(task, app, -1); + goto adjust_use; } @@ -2916,8 +3242,10 @@ adjust_use: void nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) { - nxt_app_t *app; - nxt_bool_t unchain, start_worker; + nxt_app_t *app; + nxt_bool_t unchain, start_process; + nxt_port_t *idle_port; + nxt_queue_link_t *idle_lnk; app = port->app; @@ -2925,41 +3253,162 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) nxt_thread_mutex_lock(&app->mutex); - unchain = port->app_link.next != NULL; + unchain = nxt_queue_chk_remove(&port->app_link); - if (unchain) { - nxt_queue_remove(&port->app_link); - port->app_link.next = NULL; + if (nxt_queue_chk_remove(&port->idle_link)) { + app->idle_processes--; + + if (port->idle_start == 0 + && app->idle_processes >= app->spare_processes) + { + nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); + + idle_lnk = nxt_queue_last(&app->idle_ports); + idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); + nxt_queue_remove(idle_lnk); + + nxt_queue_insert_tail(&app->spare_ports, idle_lnk); + + idle_port->idle_start = 0; + } } - app->workers--; + app->processes--; - start_worker = app->live != 0 - && nxt_queue_is_empty(&app->requests) == 0 - && app->workers + app->pending_workers < app->max_workers; + start_process = app->live != 0 + && !task->thread->engine->shutdown + && nxt_router_app_can_start(app) + && (!nxt_queue_is_empty(&app->requests) + || nxt_router_app_need_start(app)); - if (start_worker) { - app->pending_workers++; + if (start_process) { + app->pending_processes++; } nxt_thread_mutex_unlock(&app->mutex); - nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port); + nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid); if (unchain) { nxt_port_use(task, port, -1); } - if (start_worker) { - nxt_router_start_worker(task, app); + if (start_process) { + nxt_router_start_app_process(task, app); } } static void +nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) +{ + nxt_app_t *app; + nxt_bool_t queued; + nxt_port_t *port; + nxt_msec_t timeout, threshold; + nxt_queue_link_t *lnk; + nxt_event_engine_t *engine; + + app = obj; + queued = (data == app); + + nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b", + &app->name, queued); + + engine = task->thread->engine; + + nxt_assert(app->engine == engine); + + threshold = engine->timers.now + app->idle_timer.precision; + timeout = 0; + + nxt_thread_mutex_lock(&app->mutex); + + if (queued) { + app->adjust_idle_work.data = NULL; + } + + while (app->idle_processes > app->spare_processes) { + + nxt_assert(nxt_queue_is_empty(&app->idle_ports) == 0); + + lnk = nxt_queue_first(&app->idle_ports); + port = nxt_queue_link_data(lnk, nxt_port_t, idle_link); + + timeout = port->idle_start + app->idle_timeout; + + if (timeout > threshold) { + break; + } + + nxt_queue_remove(lnk); + lnk->next = NULL; + + nxt_queue_chk_remove(&port->app_link); + + app->idle_processes--; + app->processes--; + port->app = NULL; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_debug(task, "app '%V' send QUIT to idle port %PI", + &app->name, port->pid); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + + nxt_port_use(task, port, -1); + nxt_router_app_use(task, app, -1); + + nxt_thread_mutex_lock(&app->mutex); + } + + nxt_thread_mutex_unlock(&app->mutex); + + if (timeout > threshold) { + nxt_timer_add(engine, &app->idle_timer, timeout - threshold); + + } else { + nxt_timer_disable(engine, &app->idle_timer); + } + + if (queued) { + nxt_router_app_use(task, app, -1); + } +} + + +static void +nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_app_t *app; + nxt_timer_t *timer; + + timer = obj; + app = nxt_container_of(timer, nxt_app_t, idle_timer); + + nxt_router_adjust_idle_timer(task, app, NULL); +} + + +static void +nxt_router_app_release_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_app_t *app; + nxt_timer_t *timer; + + timer = obj; + app = nxt_container_of(timer, nxt_app_t, idle_timer); + + nxt_router_app_use(task, app, -1); +} + + +static void nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) { nxt_app_t *app; + nxt_bool_t can_start_process; nxt_req_app_link_t *ra; ra = state->ra; @@ -2996,12 +3445,13 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) ra->app_port = NULL; } - state->can_start_worker = (app->workers + app->pending_workers) - < app->max_workers; + can_start_process = nxt_router_app_can_start(app); + state->port = NULL; + state->start_process = 0; if (nxt_queue_is_empty(&app->ports) - || (state->can_start_worker && nxt_router_app_first_port_busy(app)) ) + || (can_start_process && nxt_router_app_first_port_busy(app)) ) { ra = nxt_router_ra_create(task, ra); @@ -3020,8 +3470,9 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream); - if (state->can_start_worker) { - app->pending_workers++; + if (can_start_process) { + app->pending_processes++; + state->start_process = 1; } } else { @@ -3038,6 +3489,11 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) nxt_router_ra_pending(task, app, ra); } + + if (can_start_process && nxt_router_app_need_start(app)) { + app->pending_processes++; + state->start_process = 1; + } } fail: @@ -3077,20 +3533,24 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) ra->app_port = state->port; + if (state->start_process) { + nxt_router_start_app_process(task, app); + } + return NXT_OK; } - if (!state->can_start_worker) { - nxt_debug(task, "app '%V' %p too many running or pending workers", + if (!state->start_process) { + nxt_debug(task, "app '%V' %p too many running or pending processes", &app->name, app); return NXT_AGAIN; } - res = nxt_router_start_worker(task, app); + res = nxt_router_start_app_process(task, app); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(ra, 500, "Failed to start worker"); + nxt_router_ra_error(ra, 500, "Failed to start app process"); nxt_router_ra_use(task, ra, -1); return NXT_ERROR; @@ -3227,7 +3687,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) goto release_port; } - nxt_debug(task, "about to send %O bytes buffer to worker port %d", + nxt_debug(task, "about to send %O bytes buffer to app process port %d", nxt_buf_used_size(wmsg.write), wmsg.port->socket.fd); diff --git a/src/nxt_router.h b/src/nxt_router.h index 1370262c..a904901c 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -83,18 +83,29 @@ struct nxt_app_s { nxt_thread_mutex_t mutex; /* Protects ports queue. */ nxt_queue_t ports; /* of nxt_port_t.app_link */ + nxt_queue_t spare_ports; /* of nxt_port_t.idle_link */ + nxt_queue_t idle_ports; /* of nxt_port_t.idle_link */ + nxt_timer_t idle_timer; + nxt_work_t adjust_idle_work; + nxt_event_engine_t *engine; + nxt_queue_t requests; /* of nxt_req_app_link_t */ nxt_queue_t pending; /* of nxt_req_app_link_t */ nxt_str_t name; - uint32_t pending_workers; - uint32_t workers; - uint32_t max_workers; + uint32_t pending_processes; + uint32_t processes; + uint32_t idle_processes; + + uint32_t max_processes; + uint32_t spare_processes; + uint32_t max_pending_processes; uint32_t max_pending_responses; uint32_t max_requests; nxt_msec_t timeout; nxt_nsec_t res_timeout; + nxt_msec_t idle_timeout; nxt_app_type_t type:8; uint8_t live; /* 1 bit */ diff --git a/test/test_configuration.py b/test/test_configuration.py index f7490069..700e4aa1 100644 --- a/test/test_configuration.py +++ b/test/test_configuration.py @@ -14,7 +14,7 @@ class TestUnitConfiguration(unit.TestUnitControl): { "ap\u0070": { "type": "\u0070ython", - "workers": 1, + "processes": { "spare": 0 }, "path": "\u002Fapp", "module": "wsgi" } @@ -25,7 +25,7 @@ class TestUnitConfiguration(unit.TestUnitControl): self.assertIn('success', self.conf({ "приложение": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": "/app", "module": "wsgi" } @@ -36,7 +36,7 @@ class TestUnitConfiguration(unit.TestUnitControl): { "app": { "type": "python", - "workers": \u0031, + "processes": { "spare": \u0030 }, "path": "/app", "module": "wsgi" } @@ -49,18 +49,16 @@ class TestUnitConfiguration(unit.TestUnitControl): def test_applications_string(self): self.assertIn('error', self.conf('"{}"', '/applications'), 'string') - @unittest.expectedFailure - def test_negative_workers(self): + def test_negative_spare(self): self.assertIn('error', self.conf({ "app": { "type": "python", - "workers": -1, + "processes": { "spare": -1 }, "path": "/app", "module": "wsgi" } - }, '/applications'), 'negative workers') + }, '/applications'), 'negative spare') - @unittest.expectedFailure def test_applications_type_only(self): self.assertIn('error', self.conf({ "app": { @@ -73,7 +71,7 @@ class TestUnitConfiguration(unit.TestUnitControl): { app": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": "/app", "module": "wsgi" } @@ -85,7 +83,7 @@ class TestUnitConfiguration(unit.TestUnitControl): { "app" { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": "/app", "module": "wsgi" } @@ -97,7 +95,7 @@ class TestUnitConfiguration(unit.TestUnitControl): { "app": { "type": "python" - "workers": 1, + "processes": { "spare": 0 }, "path": "/app", "module": "wsgi" } @@ -112,7 +110,7 @@ class TestUnitConfiguration(unit.TestUnitControl): self.assertIn('success', self.conf({ "app": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": "../app", "module": "wsgi" } @@ -137,7 +135,7 @@ class TestUnitConfiguration(unit.TestUnitControl): "applications": { "app": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": "/app", "module": "wsgi" } @@ -154,7 +152,7 @@ class TestUnitConfiguration(unit.TestUnitControl): "applications": { "app": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": "/app", "module": "wsgi" } @@ -171,7 +169,7 @@ class TestUnitConfiguration(unit.TestUnitControl): "applications": { "app": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": "/app", "module": "wsgi" } @@ -188,7 +186,7 @@ class TestUnitConfiguration(unit.TestUnitControl): "applications": { "app": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": "/app", "module": "wsgi" } diff --git a/test/test_php_basic.py b/test/test_php_basic.py index dd757925..1487a9c5 100644 --- a/test/test_php_basic.py +++ b/test/test_php_basic.py @@ -9,7 +9,7 @@ class TestUnitBasic(unit.TestUnitControl): conf_app = { "app": { "type": "php", - "workers": 1, + "processes": { "spare": 0 }, "root": "/app", "index": "index.php" } @@ -34,7 +34,7 @@ class TestUnitBasic(unit.TestUnitControl): { "app": { "type": "php", - "workers": 1, + "processes": { "spare": 0 }, "root": "/app", "index": "index.php" } @@ -48,7 +48,7 @@ class TestUnitBasic(unit.TestUnitControl): { "app": { "type": "php", - "workers": 1, + "processes": { "spare": 0 }, "root": "/app", "index": "index.php" } @@ -61,7 +61,7 @@ class TestUnitBasic(unit.TestUnitControl): self.assertEqual(self.conf_get('/applications/app'), { "type": "php", - "workers": 1, + "processes": { "spare": 0 }, "root": "/app", "index": "index.php" }, @@ -72,8 +72,8 @@ class TestUnitBasic(unit.TestUnitControl): self.assertEqual(self.conf_get('/applications/app/type'), 'php', 'type') - self.assertEqual(self.conf_get('/applications/app/workers'), 1, - 'workers') + self.assertEqual(self.conf_get('/applications/app/processes/spare'), 0, + 'spare processes') def test_php_get_listeners(self): self.conf(self.conf_basic) @@ -118,9 +118,9 @@ class TestUnitBasic(unit.TestUnitControl): def test_php_change_application(self): self.conf(self.conf_basic) - self.conf('30', '/applications/app/workers') - self.assertEqual(self.conf_get('/applications/app/workers'), 30, - 'change application workers') + self.conf('30', '/applications/app/processes/max') + self.assertEqual(self.conf_get('/applications/app/processes/max'), 30, + 'change application max') self.conf('"/www"', '/applications/app/root') self.assertEqual(self.conf_get('/applications/app/root'), '/www', diff --git a/test/test_python_application.py b/test/test_python_application.py index 4fcedc43..1b99ea44 100644 --- a/test/test_python_application.py +++ b/test/test_python_application.py @@ -19,7 +19,7 @@ class TestUnitApplication(unit.TestUnitControl): "applications": { "app": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": self.testdir + '/' + name, "module": "wsgi" } diff --git a/test/test_python_atexit.py b/test/test_python_atexit.py index 4dc984c1..4c8a6543 100644 --- a/test/test_python_atexit.py +++ b/test/test_python_atexit.py @@ -37,7 +37,7 @@ def application(env, start_response): "applications": { "app": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": self.testdir + '/' + name, "module": "wsgi" } diff --git a/test/test_python_basic.py b/test/test_python_basic.py index ba63f4c6..ed0bd738 100644 --- a/test/test_python_basic.py +++ b/test/test_python_basic.py @@ -9,7 +9,7 @@ class TestUnitBasic(unit.TestUnitControl): conf_app = { "app": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": "/app", "module": "wsgi" } @@ -44,7 +44,7 @@ class TestUnitBasic(unit.TestUnitControl): { "app": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": "/app", "module": "wsgi" } @@ -58,7 +58,7 @@ class TestUnitBasic(unit.TestUnitControl): { "app": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": "/app", "module":"wsgi" } @@ -71,7 +71,7 @@ class TestUnitBasic(unit.TestUnitControl): self.assertEqual(self.conf_get('/applications/app'), { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": "/app", "module": "wsgi" }, @@ -82,8 +82,8 @@ class TestUnitBasic(unit.TestUnitControl): self.assertEqual(self.conf_get('/applications/app/type'), 'python', 'type') - self.assertEqual(self.conf_get('/applications/app/workers'), 1, - 'workers') + self.assertEqual(self.conf_get('/applications/app/processes/spare'), 0, + 'spare') def test_python_get_listeners(self): self.conf(self.conf_basic) @@ -128,9 +128,9 @@ class TestUnitBasic(unit.TestUnitControl): def test_python_change_application(self): self.conf(self.conf_basic) - self.conf('30', '/applications/app/workers') - self.assertEqual(self.conf_get('/applications/app/workers'), 30, - 'change application workers') + self.conf('30', '/applications/app/processes/max') + self.assertEqual(self.conf_get('/applications/app/processes/max'), 30, + 'change application max') self.conf('"/www"', '/applications/app/path') self.assertEqual(self.conf_get('/applications/app/path'), '/www', diff --git a/test/test_python_keepalive.py b/test/test_python_keepalive.py index 518befba..55607e9c 100644 --- a/test/test_python_keepalive.py +++ b/test/test_python_keepalive.py @@ -37,7 +37,7 @@ def application(environ, start_response): "applications": { "app": { "type": "python", - "workers": 1, + "processes": { "spare": 0 }, "path": self.testdir + '/' + name, "module": "wsgi" } diff --git a/test/test_python_procman.py b/test/test_python_procman.py new file mode 100644 index 00000000..5d2a5f50 --- /dev/null +++ b/test/test_python_procman.py @@ -0,0 +1,237 @@ +import os +import time +import unittest +import unit + +class TestUnitApplication(unit.TestUnitControl): + + def setUpClass(): + u = unit.TestUnit() + + u.check_modules('python') + u.check_version('0.3') + + def getWorkerCount(self): + n = 0 + for f in os.listdir(self.testdir): + if f.startswith('proctest.'): + n += 1 + + return n + + def getTestCode(self): + return """ +import atexit +import os + +fname = "%s.%%d" %% os.getpid() + +def remove_file(): + os.remove(fname) + +atexit.register(remove_file) + +open(fname, 'w') + +def application(env, start_response): + start_response('200 OK', [('Content-Type','text/html')]) + return [b'body'] + +""" % (self.testdir + '/proctest') + + + def test_python_prefork(self): + code, name = self.getTestCode(), 'py_app' + + self.python_application(name, code) + + self.conf({ + "listeners": { + "*:7080": { + "application": "app" + } + }, + "applications": { + "app": { + "type": "python", + "processes": 2, + "path": self.testdir + '/' + name, + "module": "wsgi" + } + } + }) + + self.assertEqual(self.getWorkerCount(), 2, 'python prefork 2 processes') + + self.get() + self.assertEqual(self.getWorkerCount(), 2, 'python prefork, still 2') + + self.conf('4', '/applications/app/processes') + + time.sleep(0.2) + + self.assertEqual(self.getWorkerCount(), 4, 'python prefork 4 processes') + + self.get() + self.assertEqual(self.getWorkerCount(), 4, 'python prefork, still 4') + + self.conf({ + "listeners": {}, + "applications": {} + }) + + time.sleep(0.2) + self.assertEqual(self.getWorkerCount(), 0, 'python stop all processes') + + time.sleep(2.2) + + + def test_python_ondemand(self): + code, name = self.getTestCode(), 'py_app' + + self.python_application(name, code) + + self.conf({ + "listeners": { + "*:7080": { + "application": "app" + } + }, + "applications": { + "app": { + "type": "python", + "processes": { + "spare": 0, + "max": 8, + "idle_timeout": 2 + }, + "path": self.testdir + '/' + name, + "module": "wsgi" + } + } + }) + + self.assertEqual(self.getWorkerCount(), 0, 'python on-demand') + + self.get() + self.assertEqual(self.getWorkerCount(), 1, 'python start on-demand') + + self.get() + self.assertEqual(self.getWorkerCount(), 1, 'python still 1') + + time.sleep(2.2) + self.assertEqual(self.getWorkerCount(), 0, 'python stop idle') + + self.conf({ + "listeners": {}, + "applications": {} + }) + + time.sleep(0.2) + self.assertEqual(self.getWorkerCount(), 0, 'python stop all processes') + + time.sleep(2.2) + + def test_python_scale_updown(self): + code, name = self.getTestCode(), 'py_app' + + self.python_application(name, code) + + self.conf({ + "listeners": { + "*:7080": { + "application": "app" + } + }, + "applications": { + "app": { + "type": "python", + "processes": { + "spare": 2, + "max": 8, + "idle_timeout": 2 + }, + "path": self.testdir + '/' + name, + "module": "wsgi" + } + } + }) + + self.assertEqual(self.getWorkerCount(), 2, 'python prefork 2') + + self.get() + time.sleep(0.2) + self.assertEqual(self.getWorkerCount(), 3, 'python keep 2 idle, 1 busy') + + self.get() + time.sleep(0.2) + self.assertEqual(self.getWorkerCount(), 3, 'python still 3') + + time.sleep(2.2) + self.assertEqual(self.getWorkerCount(), 2, 'python stop idle') + + self.get() + + time.sleep(0.5) + self.assertEqual(self.getWorkerCount(), 3, 'python keep 2 idle, 1 busy') + + self.conf({ + "listeners": {}, + "applications": {} + }) + + time.sleep(0.2) + self.assertEqual(self.getWorkerCount(), 0, 'python stop all processes') + + time.sleep(2.2) + + def test_python_reconfigure(self): + code, name = self.getTestCode(), 'py_app' + + self.python_application(name, code) + + self.conf({ + "listeners": { + "*:7080": { + "application": "app" + } + }, + "applications": { + "app": { + "type": "python", + "processes": { + "spare": 2, + "max": 6, + "idle_timeout": 2 + }, + "path": self.testdir + '/' + name, + "module": "wsgi" + } + } + }) + + self.assertEqual(self.getWorkerCount(), 2, 'python prefork 2') + + self.get() + time.sleep(0.2) + self.assertEqual(self.getWorkerCount(), 3, 'python keep 2 idle, 1 busy') + + self.conf('6', '/applications/app/processes/spare') + self.assertEqual(self.getWorkerCount(), 6, 'python prefork 6') + + self.get() + time.sleep(0.2) + self.assertEqual(self.getWorkerCount(), 6, 'python still 6') + + self.conf({ + "listeners": {}, + "applications": {} + }) + + time.sleep(0.2) + self.assertEqual(self.getWorkerCount(), 0, 'python stop all processes') + + time.sleep(2.2) + +if __name__ == '__main__': + unittest.main() |