diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 630 |
1 files changed, 545 insertions, 85 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 1ba7072d..19a37c69 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -12,11 +12,15 @@ typedef struct { nxt_str_t type; - uint32_t workers; + uint32_t processes; + uint32_t max_processes; + uint32_t spare_processes; nxt_msec_t timeout; nxt_msec_t res_timeout; + nxt_msec_t idle_timeout; uint32_t requests; nxt_conf_value_t *limits_value; + nxt_conf_value_t *processes_value; } nxt_router_app_conf_t; @@ -76,6 +80,12 @@ typedef struct { } nxt_socket_rpc_t; +typedef struct { + nxt_app_t *app; + nxt_router_temp_conf_t *temp_conf; +} nxt_app_rpc_t; + + struct nxt_port_select_state_s { nxt_app_t *app; nxt_req_app_link_t *ra; @@ -83,7 +93,7 @@ struct nxt_port_select_state_s { nxt_port_t *failed_port; int failed_port_use_delta; - nxt_bool_t can_start_worker; + uint8_t start_process; /* 1 bit */ nxt_req_app_link_t *shared_ra; nxt_port_t *port; }; @@ -96,7 +106,7 @@ static void nxt_router_port_select(nxt_task_t *task, static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state); -static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); +static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app); nxt_inline void nxt_router_ra_inc_use(nxt_req_app_link_t *ra) @@ -136,6 +146,12 @@ static void nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); static void nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); +static void nxt_router_app_rpc_create(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf, nxt_app_t *app); +static void nxt_router_app_prefork_ready(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); +static void nxt_router_app_prefork_error(nxt_task_t *task, + nxt_port_recv_msg_t *msg, void *data); static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_str_t *name); static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf, @@ -193,7 +209,7 @@ static void nxt_router_app_port_ready(nxt_task_t *task, static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); -static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); +static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app); static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response); static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, @@ -209,6 +225,12 @@ static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); +static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_app_release_handler(nxt_task_t *task, void *obj, + void *data); static const nxt_http_request_state_t nxt_http_request_send_state; static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); @@ -253,7 +275,8 @@ nxt_router_start(nxt_task_t *task, void *data) static void -nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data) +nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, + void *data) { size_t size; uint32_t stream; @@ -268,7 +291,7 @@ nxt_router_start_worker_handler(nxt_task_t *task, nxt_port_t *port, void *data) rt = task->thread->runtime; main_port = rt->port_by_type[NXT_PROCESS_MAIN]; - nxt_debug(task, "app '%V' %p start worker", &app->name, app); + nxt_debug(task, "app '%V' %p start process", &app->name, app); size = app->name.length + 1 + app->conf.length; @@ -304,7 +327,7 @@ failed: nxt_thread_mutex_lock(&app->mutex); - app->pending_workers--; + app->pending_processes--; nxt_thread_mutex_unlock(&app->mutex); @@ -313,7 +336,7 @@ failed: static nxt_int_t -nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app) +nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) { nxt_int_t res; nxt_port_t *router_port; @@ -324,7 +347,7 @@ nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app) nxt_router_app_use(task, app, 1); - res = nxt_port_post(task, router_port, nxt_router_start_worker_handler, + res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler, app); if (res == NXT_OK) { @@ -333,7 +356,7 @@ nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app) nxt_thread_mutex_lock(&app->mutex); - app->pending_workers--; + app->pending_processes--; nxt_thread_mutex_unlock(&app->mutex); @@ -737,7 +760,8 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) static void -nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data) +nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port, + void *data) { union { nxt_pid_t removed_pid; @@ -763,7 +787,7 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) { - nxt_port_post(task, engine->port, nxt_router_worker_remove_pid, + nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid, msg->u.data); } nxt_queue_loop; @@ -837,10 +861,27 @@ fail: } +nxt_inline nxt_bool_t +nxt_router_app_can_start(nxt_app_t *app) +{ + return app->processes + app->pending_processes < app->max_processes + && app->pending_processes < app->max_pending_processes; +} + + +nxt_inline nxt_bool_t +nxt_router_app_need_start(nxt_app_t *app) +{ + return app->idle_processes + app->pending_processes + < app->spare_processes; +} + + static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) { nxt_int_t ret; + nxt_app_t *app; nxt_router_t *router; nxt_runtime_t *rt; nxt_queue_link_t *qlk; @@ -863,6 +904,15 @@ nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data) return; } + nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { + + if (nxt_router_app_need_start(app)) { + nxt_router_app_rpc_create(task, tmcf, app); + return; + } + + } nxt_queue_loop; + rt = task->thread->runtime; interface = nxt_service_get(rt->services, "engine", NULL); @@ -923,6 +973,7 @@ nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) static void nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) { + nxt_app_t *app; nxt_socket_t s; nxt_router_t *router; nxt_queue_link_t *qlk; @@ -944,6 +995,12 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) nxt_free(skcf->listen); } + nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { + + nxt_router_app_quit(task, app); + + } nxt_queue_loop; + router = tmcf->conf->router; nxt_queue_add(&router->sockets, &tmcf->keeping); @@ -984,15 +1041,21 @@ static nxt_conf_map_t nxt_router_app_conf[] = { }, { - nxt_string("workers"), + nxt_string("limits"), + NXT_CONF_MAP_PTR, + offsetof(nxt_router_app_conf_t, limits_value), + }, + + { + nxt_string("processes"), NXT_CONF_MAP_INT32, - offsetof(nxt_router_app_conf_t, workers), + offsetof(nxt_router_app_conf_t, processes), }, { - nxt_string("limits"), + nxt_string("processes"), NXT_CONF_MAP_PTR, - offsetof(nxt_router_app_conf_t, limits_value), + offsetof(nxt_router_app_conf_t, processes_value), }, }; @@ -1018,6 +1081,27 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = { }; +static nxt_conf_map_t nxt_router_app_processes_conf[] = { + { + nxt_string("spare"), + NXT_CONF_MAP_INT32, + offsetof(nxt_router_app_conf_t, spare_processes), + }, + + { + nxt_string("max"), + NXT_CONF_MAP_INT32, + offsetof(nxt_router_app_conf_t, max_processes), + }, + + { + nxt_string("idle_timeout"), + NXT_CONF_MAP_MSEC, + offsetof(nxt_router_app_conf_t, idle_timeout), + }, +}; + + static nxt_conf_map_t nxt_router_listener_conf[] = { { nxt_string("application"), @@ -1100,6 +1184,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_conf_value_t *applications, *application; nxt_conf_value_t *listeners, *listener; nxt_socket_conf_t *skcf; + nxt_event_engine_t *engine; nxt_app_lang_module_t *lang; nxt_router_app_conf_t apcf; nxt_router_listener_conf_t lscf; @@ -1174,11 +1259,15 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, continue; } - apcf.workers = 1; + apcf.processes = 1; + apcf.max_processes = 1; + apcf.spare_processes = 1; apcf.timeout = 0; apcf.res_timeout = 1000; + apcf.idle_timeout = 15000; apcf.requests = 0; apcf.limits_value = NULL; + apcf.processes_value = NULL; ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, nxt_nitems(nxt_router_app_conf), &apcf); @@ -1204,10 +1293,27 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } } + if (apcf.processes_value != NULL + && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT) + { + ret = nxt_conf_map_object(mp, apcf.processes_value, + nxt_router_app_processes_conf, + nxt_nitems(nxt_router_app_processes_conf), + &apcf); + if (ret != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "application processes map error"); + goto app_fail; + } + + } else { + apcf.max_processes = apcf.processes; + apcf.spare_processes = apcf.processes; + } + nxt_debug(task, "application type: %V", &apcf.type); - nxt_debug(task, "application workers: %D", apcf.workers); - nxt_debug(task, "application request timeout: %D", apcf.timeout); - nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout); + nxt_debug(task, "application processes: %D", apcf.processes); + nxt_debug(task, "application request timeout: %M", apcf.timeout); + nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout); nxt_debug(task, "application requests: %D", apcf.requests); lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); @@ -1226,6 +1332,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, } nxt_queue_init(&app->ports); + nxt_queue_init(&app->spare_ports); + nxt_queue_init(&app->idle_ports); nxt_queue_init(&app->requests); nxt_queue_init(&app->pending); @@ -1233,14 +1341,32 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_memcpy(app->name.start, name.start, name.length); app->type = lang->type; - app->max_workers = apcf.workers; + app->max_processes = apcf.max_processes; + app->spare_processes = apcf.spare_processes; + app->max_pending_processes = apcf.spare_processes + ? apcf.spare_processes : 1; app->timeout = apcf.timeout; app->res_timeout = apcf.res_timeout * 1000000; + app->idle_timeout = apcf.idle_timeout; app->live = 1; app->max_pending_responses = 2; app->max_requests = apcf.requests; app->prepare_msg = nxt_app_prepare_msg[lang->type]; + engine = task->thread->engine; + + app->engine = engine; + + app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION; + app->idle_timer.work_queue = &engine->fast_work_queue; + app->idle_timer.handler = nxt_router_app_idle_timeout; + app->idle_timer.task = &engine->task; + app->idle_timer.log = app->idle_timer.task->log; + + app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; + app->adjust_idle_work.task = &engine->task; + app->adjust_idle_work.obj = app; + nxt_queue_insert_tail(&tmcf->apps, &app->link); nxt_router_app_use(task, app, 1); @@ -1624,6 +1750,119 @@ nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, } +static void +nxt_router_app_rpc_create(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf, nxt_app_t *app) +{ + size_t size; + uint32_t stream; + nxt_buf_t *b; + nxt_port_t *main_port, *router_port; + nxt_runtime_t *rt; + nxt_app_rpc_t *rpc; + + rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t)); + if (rpc == NULL) { + goto fail; + } + + rpc->app = app; + rpc->temp_conf = tmcf; + + nxt_debug(task, "app '%V' prefork", &app->name); + + size = app->name.length + 1 + app->conf.length; + + b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + goto fail; + } + + nxt_buf_cpystr(b, &app->name); + *b->mem.free++ = '\0'; + nxt_buf_cpystr(b, &app->conf); + + rt = task->thread->runtime; + main_port = rt->port_by_type[NXT_PROCESS_MAIN]; + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + + stream = nxt_port_rpc_register_handler(task, router_port, + nxt_router_app_prefork_ready, + nxt_router_app_prefork_error, + -1, rpc); + if (nxt_slow_path(stream == 0)) { + goto fail; + } + + app->pending_processes++; + + nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, + stream, router_port->id, b); + + return; + +fail: + + nxt_router_conf_error(task, tmcf); +} + + +static void +nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) +{ + nxt_app_t *app; + nxt_port_t *port; + nxt_app_rpc_t *rpc; + nxt_event_engine_t *engine; + + rpc = data; + app = rpc->app; + + port = msg->u.new_port; + port->app = app; + + nxt_router_app_use(task, app, 1); + + app->pending_processes--; + app->processes++; + app->idle_processes++; + + engine = task->thread->engine; + + nxt_queue_insert_tail(&app->ports, &port->app_link); + nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); + + port->idle_start = 0; + + nxt_port_inc_use(port); + + nxt_work_queue_add(&engine->fast_work_queue, + nxt_router_conf_apply, task, rpc->temp_conf, NULL); +} + + +static void +nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, + void *data) +{ + nxt_app_t *app; + nxt_app_rpc_t *rpc; + nxt_router_temp_conf_t *tmcf; + + rpc = data; + app = rpc->app; + tmcf = rpc->temp_conf; + + nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"", + &app->name); + + app->pending_processes--; + + nxt_router_conf_error(task, tmcf); +} + + static nxt_int_t nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) @@ -1933,32 +2172,11 @@ static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, nxt_router_temp_conf_t *tmcf) { - nxt_app_t *app; - nxt_port_t *port; + nxt_app_t *app; nxt_queue_each(app, &router->apps, nxt_app_t, link) { - nxt_queue_remove(&app->link); - - nxt_debug(task, "about to free app '%V' %p", &app->name, app); - - app->live = 0; - - do { - port = nxt_router_app_get_idle_port(app); - if (port == NULL) { - break; - } - - nxt_debug(task, "port %p send quit", port); - - nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, - NULL); - - nxt_port_use(task, port, -1); - } while (1); - - nxt_router_app_use(task, app, -1); + nxt_router_app_quit(task, app); } nxt_queue_loop; @@ -2454,6 +2672,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, if (nxt_buf_mem_used_size(&b->mem) == 0) { nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); + } else { nxt_buf_chain_add(&r->out, b); } @@ -2561,14 +2780,15 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_thread_mutex_lock(&app->mutex); - nxt_assert(app->pending_workers != 0); + nxt_assert(app->pending_processes != 0); - app->pending_workers--; - app->workers++; + app->pending_processes--; + app->processes++; nxt_thread_mutex_unlock(&app->mutex); - nxt_debug(task, "app '%V' %p new port ready", &app->name, app); + nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d", + &app->name, port->pid, app->processes, app->pending_processes); nxt_router_app_port_release(task, port, 0, 0); } @@ -2590,9 +2810,9 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_thread_mutex_lock(&app->mutex); - nxt_assert(app->pending_workers != 0); + nxt_assert(app->pending_processes != 0); - app->pending_workers--; + app->pending_processes--; if (!nxt_queue_is_empty(&app->requests)) { lnk = nxt_queue_last(&app->requests); @@ -2611,7 +2831,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' %p abort next stream #%uD", &app->name, app, ra->stream); - nxt_router_ra_error(ra, 500, "Failed to start application worker"); + nxt_router_ra_error(ra, 500, "Failed to start application process"); nxt_router_ra_use(task, ra, -1); } @@ -2629,10 +2849,13 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) if (i < 0 && c == -i) { nxt_assert(app->live == 0); - nxt_assert(app->workers == 0); - nxt_assert(app->pending_workers == 0); + nxt_assert(app->processes == 0); + nxt_assert(app->idle_processes == 0); + nxt_assert(app->pending_processes == 0); nxt_assert(nxt_queue_is_empty(&app->requests) != 0); nxt_assert(nxt_queue_is_empty(&app->ports) != 0); + nxt_assert(nxt_queue_is_empty(&app->spare_ports) != 0); + nxt_assert(nxt_queue_is_empty(&app->idle_ports) != 0); nxt_thread_mutex_destroy(&app->mutex); nxt_free(app); @@ -2666,6 +2889,19 @@ nxt_router_pop_first_port(nxt_app_t *app) port->app_pending_responses++; + if (nxt_queue_chk_remove(&port->idle_link)) { + app->idle_processes--; + + if (port->idle_start == 0) { + nxt_assert(app->idle_processes < app->spare_processes); + + } else { + nxt_assert(app->idle_processes >= app->spare_processes); + + port->idle_start = 0; + } + } + if ((app->max_pending_responses == 0 || port->app_pending_responses < app->max_pending_responses) && (app->max_requests == 0 @@ -2684,8 +2920,8 @@ nxt_router_pop_first_port(nxt_app_t *app) } -static nxt_port_t * -nxt_router_app_get_idle_port(nxt_app_t *app) +nxt_inline nxt_port_t * +nxt_router_app_get_port_for_quit(nxt_app_t *app) { nxt_port_t *port; @@ -2701,8 +2937,16 @@ nxt_router_app_get_idle_port(nxt_app_t *app) continue; } - nxt_queue_remove(&port->app_link); - port->app_link.next = NULL; + /* Caller is responsible to decrease port use count. */ + nxt_queue_chk_remove(&port->app_link); + + if (nxt_queue_chk_remove(&port->idle_link)) { + app->idle_processes--; + } + + /* Caller is responsible to decrease app use count. */ + port->app = NULL; + app->processes--; break; @@ -2715,6 +2959,41 @@ nxt_router_app_get_idle_port(nxt_app_t *app) static void +nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app) +{ + nxt_port_t *port; + + nxt_queue_remove(&app->link); + + app->live = 0; + + for ( ;; ) { + port = nxt_router_app_get_port_for_quit(app); + if (port == NULL) { + break; + } + + nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + + nxt_port_use(task, port, -1); + nxt_router_app_use(task, app, -1); + } + + if (nxt_timer_is_in_tree(&app->idle_timer)) { + nxt_assert(app->engine == task->thread->engine); + + app->idle_timer.handler = nxt_router_app_release_handler; + nxt_timer_add(app->engine, &app->idle_timer, 0); + + } else { + nxt_router_app_use(task, app, -1); + } +} + + +static void nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) { nxt_app_t *app; @@ -2739,7 +3018,8 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response) { nxt_app_t *app; - nxt_bool_t send_quit, cancelled; + nxt_bool_t port_unchained; + nxt_bool_t send_quit, cancelled, adjust_idle_timer; nxt_queue_link_t *lnk; nxt_req_app_link_t *ra, *pending_ra, *re_ra; nxt_port_select_state_t state; @@ -2856,12 +3136,52 @@ app_dead: re_ra_cancelled: - send_quit = (app->live == 0 && port->app_pending_responses > 0) + send_quit = (app->live == 0 && port->app_pending_responses == 0) || (app->max_requests > 0 && port->app_pending_responses == 0 && port->app_responses >= app->max_requests); + if (send_quit) { + port_unchained = nxt_queue_chk_remove(&port->app_link); + + port->app = NULL; + app->processes--; + + } else { + port_unchained = 0; + } + + adjust_idle_timer = 0; + + if (!send_quit && port->app_pending_responses == 0) { + nxt_assert(port->idle_link.next == NULL); + + if (app->idle_processes == app->spare_processes + && app->adjust_idle_work.data == NULL) + { + adjust_idle_timer = 1; + app->adjust_idle_work.data = app; + app->adjust_idle_work.next = NULL; + } + + if (app->idle_processes < app->spare_processes) { + nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); + + } else { + nxt_queue_insert_tail(&app->idle_ports, &port->idle_link); + + port->idle_start = task->thread->engine->timers.now; + } + + app->idle_processes++; + } + nxt_thread_mutex_unlock(&app->mutex); + if (adjust_idle_timer) { + nxt_router_app_use(task, app, 1); + nxt_event_engine_post(app->engine, &app->adjust_idle_work); + } + if (pending_ra != NULL) { nxt_router_ra_use(task, pending_ra, -1); } @@ -2893,12 +3213,18 @@ re_ra_cancelled: } if (send_quit) { - nxt_debug(task, "app '%V' %p is not alive, send QUIT to port", + nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app); nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + if (port_unchained) { + nxt_port_use(task, port, -1); + } + + nxt_router_app_use(task, app, -1); + goto adjust_use; } @@ -2916,8 +3242,10 @@ adjust_use: void nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) { - nxt_app_t *app; - nxt_bool_t unchain, start_worker; + nxt_app_t *app; + nxt_bool_t unchain, start_process; + nxt_port_t *idle_port; + nxt_queue_link_t *idle_lnk; app = port->app; @@ -2925,41 +3253,162 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) nxt_thread_mutex_lock(&app->mutex); - unchain = port->app_link.next != NULL; + unchain = nxt_queue_chk_remove(&port->app_link); - if (unchain) { - nxt_queue_remove(&port->app_link); - port->app_link.next = NULL; + if (nxt_queue_chk_remove(&port->idle_link)) { + app->idle_processes--; + + if (port->idle_start == 0 + && app->idle_processes >= app->spare_processes) + { + nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); + + idle_lnk = nxt_queue_last(&app->idle_ports); + idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); + nxt_queue_remove(idle_lnk); + + nxt_queue_insert_tail(&app->spare_ports, idle_lnk); + + idle_port->idle_start = 0; + } } - app->workers--; + app->processes--; - start_worker = app->live != 0 - && nxt_queue_is_empty(&app->requests) == 0 - && app->workers + app->pending_workers < app->max_workers; + start_process = app->live != 0 + && !task->thread->engine->shutdown + && nxt_router_app_can_start(app) + && (!nxt_queue_is_empty(&app->requests) + || nxt_router_app_need_start(app)); - if (start_worker) { - app->pending_workers++; + if (start_process) { + app->pending_processes++; } nxt_thread_mutex_unlock(&app->mutex); - nxt_debug(task, "app '%V' %p port %p close", &app->name, app, port); + nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid); if (unchain) { nxt_port_use(task, port, -1); } - if (start_worker) { - nxt_router_start_worker(task, app); + if (start_process) { + nxt_router_start_app_process(task, app); } } static void +nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) +{ + nxt_app_t *app; + nxt_bool_t queued; + nxt_port_t *port; + nxt_msec_t timeout, threshold; + nxt_queue_link_t *lnk; + nxt_event_engine_t *engine; + + app = obj; + queued = (data == app); + + nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b", + &app->name, queued); + + engine = task->thread->engine; + + nxt_assert(app->engine == engine); + + threshold = engine->timers.now + app->idle_timer.precision; + timeout = 0; + + nxt_thread_mutex_lock(&app->mutex); + + if (queued) { + app->adjust_idle_work.data = NULL; + } + + while (app->idle_processes > app->spare_processes) { + + nxt_assert(nxt_queue_is_empty(&app->idle_ports) == 0); + + lnk = nxt_queue_first(&app->idle_ports); + port = nxt_queue_link_data(lnk, nxt_port_t, idle_link); + + timeout = port->idle_start + app->idle_timeout; + + if (timeout > threshold) { + break; + } + + nxt_queue_remove(lnk); + lnk->next = NULL; + + nxt_queue_chk_remove(&port->app_link); + + app->idle_processes--; + app->processes--; + port->app = NULL; + + nxt_thread_mutex_unlock(&app->mutex); + + nxt_debug(task, "app '%V' send QUIT to idle port %PI", + &app->name, port->pid); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + + nxt_port_use(task, port, -1); + nxt_router_app_use(task, app, -1); + + nxt_thread_mutex_lock(&app->mutex); + } + + nxt_thread_mutex_unlock(&app->mutex); + + if (timeout > threshold) { + nxt_timer_add(engine, &app->idle_timer, timeout - threshold); + + } else { + nxt_timer_disable(engine, &app->idle_timer); + } + + if (queued) { + nxt_router_app_use(task, app, -1); + } +} + + +static void +nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_app_t *app; + nxt_timer_t *timer; + + timer = obj; + app = nxt_container_of(timer, nxt_app_t, idle_timer); + + nxt_router_adjust_idle_timer(task, app, NULL); +} + + +static void +nxt_router_app_release_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_app_t *app; + nxt_timer_t *timer; + + timer = obj; + app = nxt_container_of(timer, nxt_app_t, idle_timer); + + nxt_router_app_use(task, app, -1); +} + + +static void nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) { nxt_app_t *app; + nxt_bool_t can_start_process; nxt_req_app_link_t *ra; ra = state->ra; @@ -2996,12 +3445,13 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) ra->app_port = NULL; } - state->can_start_worker = (app->workers + app->pending_workers) - < app->max_workers; + can_start_process = nxt_router_app_can_start(app); + state->port = NULL; + state->start_process = 0; if (nxt_queue_is_empty(&app->ports) - || (state->can_start_worker && nxt_router_app_first_port_busy(app)) ) + || (can_start_process && nxt_router_app_first_port_busy(app)) ) { ra = nxt_router_ra_create(task, ra); @@ -3020,8 +3470,9 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream); - if (state->can_start_worker) { - app->pending_workers++; + if (can_start_process) { + app->pending_processes++; + state->start_process = 1; } } else { @@ -3038,6 +3489,11 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) nxt_router_ra_pending(task, app, ra); } + + if (can_start_process && nxt_router_app_need_start(app)) { + app->pending_processes++; + state->start_process = 1; + } } fail: @@ -3077,20 +3533,24 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) ra->app_port = state->port; + if (state->start_process) { + nxt_router_start_app_process(task, app); + } + return NXT_OK; } - if (!state->can_start_worker) { - nxt_debug(task, "app '%V' %p too many running or pending workers", + if (!state->start_process) { + nxt_debug(task, "app '%V' %p too many running or pending processes", &app->name, app); return NXT_AGAIN; } - res = nxt_router_start_worker(task, app); + res = nxt_router_start_app_process(task, app); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(ra, 500, "Failed to start worker"); + nxt_router_ra_error(ra, 500, "Failed to start app process"); nxt_router_ra_use(task, ra, -1); return NXT_ERROR; @@ -3227,7 +3687,7 @@ nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra) goto release_port; } - nxt_debug(task, "about to send %O bytes buffer to worker port %d", + nxt_debug(task, "about to send %O bytes buffer to app process port %d", nxt_buf_used_size(wmsg.write), wmsg.port->socket.fd); |