summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_port.c6
-rw-r--r--src/nxt_router.c243
-rw-r--r--src/nxt_router.h12
3 files changed, 168 insertions, 93 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c
index ae08b71d..c4b2af96 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -109,11 +109,7 @@ nxt_port_release(nxt_task_t *task, nxt_port_t *port)
nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
port->id, port->type);
- if (port->app != NULL) {
- nxt_router_app_use(task, port->app, -1);
-
- port->app = NULL;
- }
+ port->app = NULL;
if (port->link.next != NULL) {
nxt_assert(port->process != NULL);
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 4c25341f..8f820895 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -236,7 +236,7 @@ static void nxt_router_app_port_ready(nxt_task_t *task,
static void nxt_router_app_port_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
-static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app);
+static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response);
static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
@@ -252,12 +252,16 @@ static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
void *data);
-static void nxt_router_app_release_handler(nxt_task_t *task, void *obj,
+static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
void *data);
+static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
static const nxt_http_request_state_t nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
+static void nxt_router_app_joint_use(nxt_task_t *task,
+ nxt_app_joint_t *app_joint, int i);
+
static nxt_router_t *nxt_router;
static const nxt_str_t http_prefix = nxt_string("HTTP_");
@@ -360,12 +364,16 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
*b->mem.free++ = '\0';
nxt_buf_cpystr(b, &app->conf);
+ nxt_router_app_joint_use(task, app->joint, 1);
+
stream = nxt_port_rpc_register_handler(task, port,
nxt_router_app_port_ready,
nxt_router_app_port_error,
- -1, app);
+ -1, app->joint);
if (nxt_slow_path(stream == 0)) {
+ nxt_router_app_joint_use(task, app->joint, -1);
+
goto failed;
}
@@ -374,9 +382,14 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, port, stream);
+
+ nxt_router_app_joint_use(task, app->joint, -1);
+
goto failed;
}
+ nxt_router_app_use(task, app, -1);
+
return;
failed:
@@ -397,6 +410,19 @@ failed:
}
+static void
+nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
+{
+ app_joint->use_count += i;
+
+ if (app_joint->use_count == 0) {
+ nxt_assert(app_joint->app == NULL);
+
+ nxt_free(app_joint);
+ }
+}
+
+
static nxt_int_t
nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
{
@@ -1098,7 +1124,7 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
- nxt_router_app_quit(task, app);
+ nxt_router_app_unlink(task, app);
} nxt_queue_loop;
@@ -1284,6 +1310,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_str_t name, path;
nxt_app_t *app, *prev;
nxt_router_t *router;
+ nxt_app_joint_t *app_joint;
nxt_conf_value_t *conf, *http, *value;
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
@@ -1375,6 +1402,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
apcf.limits_value = NULL;
apcf.processes_value = NULL;
+ app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
+ if (nxt_slow_path(app_joint == NULL)) {
+ goto app_fail;
+ }
+
+ nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
+
ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
nxt_nitems(nxt_router_app_conf), &apcf);
if (ret != NXT_OK) {
@@ -1453,7 +1487,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->timeout = apcf.timeout;
app->res_timeout = apcf.res_timeout * 1000000;
app->idle_timeout = apcf.idle_timeout;
- app->live = 1;
app->max_pending_responses = 2;
app->max_requests = apcf.requests;
@@ -1461,12 +1494,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->engine = engine;
- app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION;
- app->idle_timer.work_queue = &engine->fast_work_queue;
- app->idle_timer.handler = nxt_router_app_idle_timeout;
- app->idle_timer.task = &engine->task;
- app->idle_timer.log = app->idle_timer.task->log;
-
app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
app->adjust_idle_work.task = &engine->task;
app->adjust_idle_work.obj = app;
@@ -1474,6 +1501,21 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_queue_insert_tail(&tmcf->apps, &app->link);
nxt_router_app_use(task, app, 1);
+
+ app->joint = app_joint;
+
+ app_joint->use_count = 1;
+ app_joint->app = app;
+
+ app_joint->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION;
+ app_joint->idle_timer.work_queue = &engine->fast_work_queue;
+ app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
+ app_joint->idle_timer.task = &engine->task;
+ app_joint->idle_timer.log = app_joint->idle_timer.task->log;
+
+ app_joint->free_app_work.handler = nxt_router_free_app;
+ app_joint->free_app_work.task = &engine->task;
+ app_joint->free_app_work.obj = app_joint;
}
http = nxt_conf_get_path(conf, &http_path);
@@ -1975,8 +2017,6 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
port = msg->u.new_port;
port->app = app;
- nxt_router_app_use(task, app, 1);
-
app->pending_processes--;
app->processes++;
app->idle_processes++;
@@ -2329,7 +2369,7 @@ nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
- nxt_router_app_quit(task, app);
+ nxt_router_app_unlink(task, app);
} nxt_queue_loop;
@@ -3365,15 +3405,28 @@ static void
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_app_t *app;
- nxt_port_t *port;
+ nxt_app_t *app;
+ nxt_port_t *port;
+ nxt_app_joint_t *app_joint;
- app = data;
+ app_joint = data;
port = msg->u.new_port;
- nxt_assert(app != NULL);
+ nxt_assert(app_joint != NULL);
nxt_assert(port != NULL);
+ app = app_joint->app;
+
+ nxt_router_app_joint_use(task, app_joint, -1);
+
+ if (nxt_slow_path(app == NULL)) {
+ nxt_debug(task, "new port ready for released app, send QUIT");
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+
+ return;
+ }
+
port->app = app;
nxt_thread_mutex_lock(&app->mutex);
@@ -3397,12 +3450,23 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_app_t *app;
+ nxt_app_joint_t *app_joint;
nxt_queue_link_t *lnk;
nxt_req_app_link_t *ra;
- app = data;
+ app_joint = data;
- nxt_assert(app != NULL);
+ nxt_assert(app_joint != NULL);
+
+ app = app_joint->app;
+
+ nxt_router_app_joint_use(task, app_joint, -1);
+
+ if (nxt_slow_path(app == NULL)) {
+ nxt_debug(task, "start error for released app");
+
+ return;
+ }
nxt_debug(task, "app '%V' %p start error", &app->name, app);
@@ -3432,10 +3496,10 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_router_ra_error(ra, 500, "Failed to start application process");
nxt_router_ra_use(task, ra, -1);
}
-
- nxt_router_app_use(task, app, -1);
}
+nxt_inline nxt_port_t *
+nxt_router_app_get_port_for_quit(nxt_app_t *app);
void
nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
@@ -3446,17 +3510,12 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
if (i < 0 && c == -i) {
- nxt_assert(app->live == 0);
- nxt_assert(app->processes == 0);
- nxt_assert(app->idle_processes == 0);
- nxt_assert(app->pending_processes == 0);
- nxt_assert(nxt_queue_is_empty(&app->requests));
- nxt_assert(nxt_queue_is_empty(&app->ports));
- nxt_assert(nxt_queue_is_empty(&app->spare_ports));
- nxt_assert(nxt_queue_is_empty(&app->idle_ports));
+ if (task->thread->engine != app->engine) {
+ nxt_event_engine_post(app->engine, &app->joint->free_app_work);
- nxt_thread_mutex_destroy(&app->mutex);
- nxt_free(app);
+ } else {
+ nxt_router_free_app(task, app->joint, NULL);
+ }
}
}
@@ -3542,7 +3601,6 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
app->idle_processes--;
}
- /* Caller is responsible to decrease app use count. */
port->app = NULL;
app->processes--;
@@ -3557,37 +3615,13 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
static void
-nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app)
+nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
{
- nxt_port_t *port;
+ nxt_debug(task, "app '%V' %p unlink", &app->name, app);
nxt_queue_remove(&app->link);
- app->live = 0;
-
- for ( ;; ) {
- port = nxt_router_app_get_port_for_quit(app);
- if (port == NULL) {
- break;
- }
-
- nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
-
- nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
-
- nxt_port_use(task, port, -1);
- nxt_router_app_use(task, app, -1);
- }
-
- if (nxt_timer_is_in_tree(&app->idle_timer)) {
- nxt_assert(app->engine == task->thread->engine);
-
- app->idle_timer.handler = nxt_router_app_release_handler;
- nxt_timer_add(app->engine, &app->idle_timer, 0);
-
- } else {
- nxt_router_app_use(task, app, -1);
- }
+ nxt_router_app_use(task, app, -1);
}
@@ -3640,10 +3674,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
port->app_pending_responses -= request_failed + got_response;
port->app_responses += got_response;
- if (nxt_slow_path(app->live == 0)) {
- goto app_dead;
- }
-
if (port->pair[1] != -1
&& (app->max_pending_responses == 0
|| port->app_pending_responses < app->max_pending_responses)
@@ -3687,8 +3717,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
}
}
-app_dead:
-
/* Pop first pending request for this port. */
if ((request_failed > 0 || got_response > 0)
&& !nxt_queue_is_empty(&port->pending_requests))
@@ -3740,9 +3768,9 @@ app_dead:
re_ra_cancelled:
- send_quit = (app->live == 0 && port->app_pending_responses == 0)
- || (app->max_requests > 0 && port->app_pending_responses == 0
- && port->app_responses >= app->max_requests);
+ send_quit = (app->max_requests > 0
+ && port->app_pending_responses == 0
+ && port->app_responses >= app->max_requests);
if (send_quit) {
port_unchained = nxt_queue_chk_remove(&port->app_link);
@@ -3827,8 +3855,6 @@ re_ra_cancelled:
nxt_port_use(task, port, -1);
}
- nxt_router_app_use(task, app, -1);
-
goto adjust_use;
}
@@ -3879,8 +3905,7 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
app->processes--;
- start_process = app->live != 0
- && !task->thread->engine->shutdown
+ start_process = !task->thread->engine->shutdown
&& nxt_router_app_can_start(app)
&& (!nxt_queue_is_empty(&app->requests)
|| nxt_router_app_need_start(app));
@@ -3923,7 +3948,7 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
nxt_assert(app->engine == engine);
- threshold = engine->timers.now + app->idle_timer.precision;
+ threshold = engine->timers.now + app->joint->idle_timer.precision;
timeout = 0;
nxt_thread_mutex_lock(&app->mutex);
@@ -3962,7 +3987,6 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
nxt_port_use(task, port, -1);
- nxt_router_app_use(task, app, -1);
nxt_thread_mutex_lock(&app->mutex);
}
@@ -3970,10 +3994,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
nxt_thread_mutex_unlock(&app->mutex);
if (timeout > threshold) {
- nxt_timer_add(engine, &app->idle_timer, timeout - threshold);
+ nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
} else {
- nxt_timer_disable(engine, &app->idle_timer);
+ nxt_timer_disable(engine, &app->joint->idle_timer);
}
if (queued) {
@@ -3985,26 +4009,73 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
{
- nxt_app_t *app;
- nxt_timer_t *timer;
+ nxt_timer_t *timer;
+ nxt_app_joint_t *app_joint;
timer = obj;
- app = nxt_container_of(timer, nxt_app_t, idle_timer);
+ app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
- nxt_router_adjust_idle_timer(task, app, NULL);
+ if (nxt_fast_path(app_joint->app != NULL)) {
+ nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
+ }
}
static void
-nxt_router_app_release_handler(nxt_task_t *task, void *obj, void *data)
+nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_app_t *app;
- nxt_timer_t *timer;
+ nxt_timer_t *timer;
+ nxt_app_joint_t *app_joint;
timer = obj;
- app = nxt_container_of(timer, nxt_app_t, idle_timer);
+ app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
- nxt_router_app_use(task, app, -1);
+ nxt_router_app_joint_use(task, app_joint, -1);
+}
+
+
+static void
+nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_app_t *app;
+ nxt_port_t *port;
+ nxt_app_joint_t *app_joint;
+
+ app_joint = obj;
+ app = app_joint->app;
+
+ for ( ;; ) {
+ port = nxt_router_app_get_port_for_quit(app);
+ if (port == NULL) {
+ break;
+ }
+
+ nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
+
+ nxt_port_use(task, port, -1);
+ }
+
+ nxt_assert(app->processes == 0);
+ nxt_assert(app->idle_processes == 0);
+ nxt_assert(nxt_queue_is_empty(&app->requests));
+ nxt_assert(nxt_queue_is_empty(&app->ports));
+ nxt_assert(nxt_queue_is_empty(&app->spare_ports));
+ nxt_assert(nxt_queue_is_empty(&app->idle_ports));
+
+ nxt_thread_mutex_destroy(&app->mutex);
+ nxt_free(app);
+
+ app_joint->app = NULL;
+
+ if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
+ app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
+ nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
+
+ } else {
+ nxt_router_app_joint_use(task, app_joint, -1);
+ }
}
diff --git a/src/nxt_router.h b/src/nxt_router.h
index 57c526f3..008fc328 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -81,13 +81,20 @@ typedef struct {
} nxt_joint_job_t;
+typedef struct {
+ uint32_t use_count;
+ nxt_app_t *app;
+ nxt_timer_t idle_timer;
+ nxt_work_t free_app_work;
+} nxt_app_joint_t;
+
+
struct nxt_app_s {
nxt_thread_mutex_t mutex; /* Protects ports queue. */
nxt_queue_t ports; /* of nxt_port_t.app_link */
nxt_queue_t spare_ports; /* of nxt_port_t.idle_link */
nxt_queue_t idle_ports; /* of nxt_port_t.idle_link */
- nxt_timer_t idle_timer;
nxt_work_t adjust_idle_work;
nxt_event_engine_t *engine;
@@ -110,13 +117,14 @@ struct nxt_app_s {
nxt_msec_t idle_timeout;
nxt_app_type_t type:8;
- uint8_t live; /* 1 bit */
nxt_queue_link_t link;
nxt_str_t conf;
nxt_atomic_t use_count;
+
+ nxt_app_joint_t *joint;
};