diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 243 |
1 files changed, 157 insertions, 86 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 4c25341f..8f820895 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -236,7 +236,7 @@ static void nxt_router_app_port_ready(nxt_task_t *task, static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); -static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app); +static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, uint32_t request_failed, uint32_t got_response); static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, @@ -252,12 +252,16 @@ static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data); static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data); -static void nxt_router_app_release_handler(nxt_task_t *task, void *obj, +static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data); static const nxt_http_request_state_t nxt_http_request_send_state; static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data); +static void nxt_router_app_joint_use(nxt_task_t *task, + nxt_app_joint_t *app_joint, int i); + static nxt_router_t *nxt_router; static const nxt_str_t http_prefix = nxt_string("HTTP_"); @@ -360,12 +364,16 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, *b->mem.free++ = '\0'; nxt_buf_cpystr(b, &app->conf); + nxt_router_app_joint_use(task, app->joint, 1); + stream = nxt_port_rpc_register_handler(task, port, nxt_router_app_port_ready, nxt_router_app_port_error, - -1, app); + -1, app->joint); if (nxt_slow_path(stream == 0)) { + nxt_router_app_joint_use(task, app->joint, -1); + goto failed; } @@ -374,9 +382,14 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, if (nxt_slow_path(ret != NXT_OK)) { nxt_port_rpc_cancel(task, port, stream); + + nxt_router_app_joint_use(task, app->joint, -1); + goto failed; } + nxt_router_app_use(task, app, -1); + return; failed: @@ -397,6 +410,19 @@ failed: } +static void +nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i) +{ + app_joint->use_count += i; + + if (app_joint->use_count == 0) { + nxt_assert(app_joint->app == NULL); + + nxt_free(app_joint); + } +} + + static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app) { @@ -1098,7 +1124,7 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) { - nxt_router_app_quit(task, app); + nxt_router_app_unlink(task, app); } nxt_queue_loop; @@ -1284,6 +1310,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_str_t name, path; nxt_app_t *app, *prev; nxt_router_t *router; + nxt_app_joint_t *app_joint; nxt_conf_value_t *conf, *http, *value; nxt_conf_value_t *applications, *application; nxt_conf_value_t *listeners, *listener; @@ -1375,6 +1402,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, apcf.limits_value = NULL; apcf.processes_value = NULL; + app_joint = nxt_malloc(sizeof(nxt_app_joint_t)); + if (nxt_slow_path(app_joint == NULL)) { + goto app_fail; + } + + nxt_memzero(app_joint, sizeof(nxt_app_joint_t)); + ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, nxt_nitems(nxt_router_app_conf), &apcf); if (ret != NXT_OK) { @@ -1453,7 +1487,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->timeout = apcf.timeout; app->res_timeout = apcf.res_timeout * 1000000; app->idle_timeout = apcf.idle_timeout; - app->live = 1; app->max_pending_responses = 2; app->max_requests = apcf.requests; @@ -1461,12 +1494,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, app->engine = engine; - app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION; - app->idle_timer.work_queue = &engine->fast_work_queue; - app->idle_timer.handler = nxt_router_app_idle_timeout; - app->idle_timer.task = &engine->task; - app->idle_timer.log = app->idle_timer.task->log; - app->adjust_idle_work.handler = nxt_router_adjust_idle_timer; app->adjust_idle_work.task = &engine->task; app->adjust_idle_work.obj = app; @@ -1474,6 +1501,21 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_queue_insert_tail(&tmcf->apps, &app->link); nxt_router_app_use(task, app, 1); + + app->joint = app_joint; + + app_joint->use_count = 1; + app_joint->app = app; + + app_joint->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION; + app_joint->idle_timer.work_queue = &engine->fast_work_queue; + app_joint->idle_timer.handler = nxt_router_app_idle_timeout; + app_joint->idle_timer.task = &engine->task; + app_joint->idle_timer.log = app_joint->idle_timer.task->log; + + app_joint->free_app_work.handler = nxt_router_free_app; + app_joint->free_app_work.task = &engine->task; + app_joint->free_app_work.obj = app_joint; } http = nxt_conf_get_path(conf, &http_path); @@ -1975,8 +2017,6 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, port = msg->u.new_port; port->app = app; - nxt_router_app_use(task, app, 1); - app->pending_processes--; app->processes++; app->idle_processes++; @@ -2329,7 +2369,7 @@ nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router, nxt_queue_each(app, &router->apps, nxt_app_t, link) { - nxt_router_app_quit(task, app); + nxt_router_app_unlink(task, app); } nxt_queue_loop; @@ -3365,15 +3405,28 @@ static void nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_port_t *port; + nxt_app_t *app; + nxt_port_t *port; + nxt_app_joint_t *app_joint; - app = data; + app_joint = data; port = msg->u.new_port; - nxt_assert(app != NULL); + nxt_assert(app_joint != NULL); nxt_assert(port != NULL); + app = app_joint->app; + + nxt_router_app_joint_use(task, app_joint, -1); + + if (nxt_slow_path(app == NULL)) { + nxt_debug(task, "new port ready for released app, send QUIT"); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + + return; + } + port->app = app; nxt_thread_mutex_lock(&app->mutex); @@ -3397,12 +3450,23 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { nxt_app_t *app; + nxt_app_joint_t *app_joint; nxt_queue_link_t *lnk; nxt_req_app_link_t *ra; - app = data; + app_joint = data; - nxt_assert(app != NULL); + nxt_assert(app_joint != NULL); + + app = app_joint->app; + + nxt_router_app_joint_use(task, app_joint, -1); + + if (nxt_slow_path(app == NULL)) { + nxt_debug(task, "start error for released app"); + + return; + } nxt_debug(task, "app '%V' %p start error", &app->name, app); @@ -3432,10 +3496,10 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_router_ra_error(ra, 500, "Failed to start application process"); nxt_router_ra_use(task, ra, -1); } - - nxt_router_app_use(task, app, -1); } +nxt_inline nxt_port_t * +nxt_router_app_get_port_for_quit(nxt_app_t *app); void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) @@ -3446,17 +3510,12 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) if (i < 0 && c == -i) { - nxt_assert(app->live == 0); - nxt_assert(app->processes == 0); - nxt_assert(app->idle_processes == 0); - nxt_assert(app->pending_processes == 0); - nxt_assert(nxt_queue_is_empty(&app->requests)); - nxt_assert(nxt_queue_is_empty(&app->ports)); - nxt_assert(nxt_queue_is_empty(&app->spare_ports)); - nxt_assert(nxt_queue_is_empty(&app->idle_ports)); + if (task->thread->engine != app->engine) { + nxt_event_engine_post(app->engine, &app->joint->free_app_work); - nxt_thread_mutex_destroy(&app->mutex); - nxt_free(app); + } else { + nxt_router_free_app(task, app->joint, NULL); + } } } @@ -3542,7 +3601,6 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app) app->idle_processes--; } - /* Caller is responsible to decrease app use count. */ port->app = NULL; app->processes--; @@ -3557,37 +3615,13 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app) static void -nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app) +nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app) { - nxt_port_t *port; + nxt_debug(task, "app '%V' %p unlink", &app->name, app); nxt_queue_remove(&app->link); - app->live = 0; - - for ( ;; ) { - port = nxt_router_app_get_port_for_quit(app); - if (port == NULL) { - break; - } - - nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); - - nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); - - nxt_port_use(task, port, -1); - nxt_router_app_use(task, app, -1); - } - - if (nxt_timer_is_in_tree(&app->idle_timer)) { - nxt_assert(app->engine == task->thread->engine); - - app->idle_timer.handler = nxt_router_app_release_handler; - nxt_timer_add(app->engine, &app->idle_timer, 0); - - } else { - nxt_router_app_use(task, app, -1); - } + nxt_router_app_use(task, app, -1); } @@ -3640,10 +3674,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, port->app_pending_responses -= request_failed + got_response; port->app_responses += got_response; - if (nxt_slow_path(app->live == 0)) { - goto app_dead; - } - if (port->pair[1] != -1 && (app->max_pending_responses == 0 || port->app_pending_responses < app->max_pending_responses) @@ -3687,8 +3717,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, } } -app_dead: - /* Pop first pending request for this port. */ if ((request_failed > 0 || got_response > 0) && !nxt_queue_is_empty(&port->pending_requests)) @@ -3740,9 +3768,9 @@ app_dead: re_ra_cancelled: - send_quit = (app->live == 0 && port->app_pending_responses == 0) - || (app->max_requests > 0 && port->app_pending_responses == 0 - && port->app_responses >= app->max_requests); + send_quit = (app->max_requests > 0 + && port->app_pending_responses == 0 + && port->app_responses >= app->max_requests); if (send_quit) { port_unchained = nxt_queue_chk_remove(&port->app_link); @@ -3827,8 +3855,6 @@ re_ra_cancelled: nxt_port_use(task, port, -1); } - nxt_router_app_use(task, app, -1); - goto adjust_use; } @@ -3879,8 +3905,7 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port) app->processes--; - start_process = app->live != 0 - && !task->thread->engine->shutdown + start_process = !task->thread->engine->shutdown && nxt_router_app_can_start(app) && (!nxt_queue_is_empty(&app->requests) || nxt_router_app_need_start(app)); @@ -3923,7 +3948,7 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) nxt_assert(app->engine == engine); - threshold = engine->timers.now + app->idle_timer.precision; + threshold = engine->timers.now + app->joint->idle_timer.precision; timeout = 0; nxt_thread_mutex_lock(&app->mutex); @@ -3962,7 +3987,6 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); nxt_port_use(task, port, -1); - nxt_router_app_use(task, app, -1); nxt_thread_mutex_lock(&app->mutex); } @@ -3970,10 +3994,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) nxt_thread_mutex_unlock(&app->mutex); if (timeout > threshold) { - nxt_timer_add(engine, &app->idle_timer, timeout - threshold); + nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold); } else { - nxt_timer_disable(engine, &app->idle_timer); + nxt_timer_disable(engine, &app->joint->idle_timer); } if (queued) { @@ -3985,26 +4009,73 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data) static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_app_t *app; - nxt_timer_t *timer; + nxt_timer_t *timer; + nxt_app_joint_t *app_joint; timer = obj; - app = nxt_container_of(timer, nxt_app_t, idle_timer); + app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); - nxt_router_adjust_idle_timer(task, app, NULL); + if (nxt_fast_path(app_joint->app != NULL)) { + nxt_router_adjust_idle_timer(task, app_joint->app, NULL); + } } static void -nxt_router_app_release_handler(nxt_task_t *task, void *obj, void *data) +nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data) { - nxt_app_t *app; - nxt_timer_t *timer; + nxt_timer_t *timer; + nxt_app_joint_t *app_joint; timer = obj; - app = nxt_container_of(timer, nxt_app_t, idle_timer); + app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer); - nxt_router_app_use(task, app, -1); + nxt_router_app_joint_use(task, app_joint, -1); +} + + +static void +nxt_router_free_app(nxt_task_t *task, void *obj, void *data) +{ + nxt_app_t *app; + nxt_port_t *port; + nxt_app_joint_t *app_joint; + + app_joint = obj; + app = app_joint->app; + + for ( ;; ) { + port = nxt_router_app_get_port_for_quit(app); + if (port == NULL) { + break; + } + + nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); + + nxt_port_use(task, port, -1); + } + + nxt_assert(app->processes == 0); + nxt_assert(app->idle_processes == 0); + nxt_assert(nxt_queue_is_empty(&app->requests)); + nxt_assert(nxt_queue_is_empty(&app->ports)); + nxt_assert(nxt_queue_is_empty(&app->spare_ports)); + nxt_assert(nxt_queue_is_empty(&app->idle_ports)); + + nxt_thread_mutex_destroy(&app->mutex); + nxt_free(app); + + app_joint->app = NULL; + + if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) { + app_joint->idle_timer.handler = nxt_router_app_joint_release_handler; + nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0); + + } else { + nxt_router_app_joint_use(task, app_joint, -1); + } } |