summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_http.h4
-rw-r--r--src/nxt_router.c142
-rw-r--r--src/nxt_router.h1
3 files changed, 129 insertions, 18 deletions
diff --git a/src/nxt_http.h b/src/nxt_http.h
index 67ac00d8..6c84843f 100644
--- a/src/nxt_http.h
+++ b/src/nxt_http.h
@@ -169,6 +169,10 @@ struct nxt_http_request_s {
nxt_http_peer_t *peer;
nxt_buf_t *last;
+ nxt_queue_link_t app_link; /* nxt_app_t.ack_waiting_req */
+ nxt_event_engine_t *engine;
+ nxt_work_t err_work;
+
nxt_http_response_t resp;
nxt_http_status_t status:16;
diff --git a/src/nxt_router.c b/src/nxt_router.c
index df0d96ad..0ccf6593 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -204,6 +204,8 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_apr_action_t action);
static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
nxt_request_rpc_data_t *req_rpc_data);
+static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
void *data);
@@ -539,6 +541,8 @@ nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t *task,
nxt_request_rpc_data_t *req_rpc_data)
{
+ nxt_app_t *app;
+ nxt_bool_t unlinked;
nxt_http_request_t *r;
nxt_router_msg_cancel(task, req_rpc_data);
@@ -550,12 +554,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
req_rpc_data->app_port = NULL;
}
- if (req_rpc_data->app != NULL) {
- nxt_router_app_use(task, req_rpc_data->app, -1);
-
- req_rpc_data->app = NULL;
- }
-
+ app = req_rpc_data->app;
r = req_rpc_data->request;
if (r != NULL) {
@@ -565,6 +564,31 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
r->req_rpc_data = NULL;
req_rpc_data->request = NULL;
+
+ if (app != NULL) {
+ unlinked = 0;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ if (r->app_link.next != NULL) {
+ nxt_queue_remove(&r->app_link);
+ r->app_link.next = NULL;
+
+ unlinked = 1;
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ if (unlinked) {
+ nxt_mp_release(r->mem_pool);
+ }
+ }
+ }
+
+ if (app != NULL) {
+ nxt_router_app_use(task, app, -1);
+
+ req_rpc_data->app = NULL;
}
if (req_rpc_data->msg_info.body_fd != -1) {
@@ -1492,6 +1516,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_queue_init(&app->ports);
nxt_queue_init(&app->spare_ports);
nxt_queue_init(&app->idle_ports);
+ nxt_queue_init(&app->ack_waiting_req);
app->name.length = name.length;
nxt_memcpy(app->name.start, name.start, name.length);
@@ -3784,7 +3809,7 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
{
int res;
nxt_app_t *app;
- nxt_bool_t start_process;
+ nxt_bool_t start_process, unlinked;
nxt_port_t *app_port, *main_app_port, *idle_port;
nxt_queue_link_t *idle_lnk;
nxt_http_request_t *r;
@@ -3797,19 +3822,31 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
msg->port_msg.pid);
app = req_rpc_data->app;
+ r = req_rpc_data->request;
start_process = 0;
+ unlinked = 0;
nxt_thread_mutex_lock(&app->mutex);
+ if (r->app_link.next != NULL) {
+ nxt_queue_remove(&r->app_link);
+ r->app_link.next = NULL;
+
+ unlinked = 1;
+ }
+
app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
msg->port_msg.reply_port);
if (nxt_slow_path(app_port == NULL)) {
nxt_thread_mutex_unlock(&app->mutex);
- r = req_rpc_data->request;
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+ if (unlinked) {
+ nxt_mp_release(r->mem_pool);
+ }
+
return;
}
@@ -3857,6 +3894,10 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
nxt_thread_mutex_unlock(&app->mutex);
+ if (unlinked) {
+ nxt_mp_release(r->mem_pool);
+ }
+
if (start_process) {
nxt_router_start_app_process(task, app);
}
@@ -3877,15 +3918,11 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
task->thread->engine->port->id, NULL);
if (nxt_slow_path(res != NXT_OK)) {
- r = req_rpc_data->request;
-
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
}
}
if (app->timeout != 0) {
- r = req_rpc_data->request;
-
r->timer.handler = nxt_router_app_timeout;
r->timer_data = req_rpc_data;
nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
@@ -4028,8 +4065,10 @@ static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_app_t *app;
- nxt_app_joint_t *app_joint;
+ nxt_app_t *app;
+ nxt_app_joint_t *app_joint;
+ nxt_queue_link_t *link;
+ nxt_http_request_t *r;
app_joint = data;
@@ -4047,15 +4086,43 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_debug(task, "app '%V' %p start error", &app->name, app);
+ link = NULL;
+
nxt_thread_mutex_lock(&app->mutex);
nxt_assert(app->pending_processes != 0);
app->pending_processes--;
+ if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
+ link = nxt_queue_first(&app->ack_waiting_req);
+
+ nxt_queue_remove(link);
+ link->next = NULL;
+ }
+
nxt_thread_mutex_unlock(&app->mutex);
- /* TODO req_app_link to cancel first pending message */
+ while (link != NULL) {
+ r = nxt_container_of(link, nxt_http_request_t, app_link);
+
+ nxt_event_engine_post(r->engine, &r->err_work);
+
+ link = NULL;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ if (app->processes == 0 && app->pending_processes == 0
+ && !nxt_queue_is_empty(&app->ack_waiting_req))
+ {
+ link = nxt_queue_first(&app->ack_waiting_req);
+
+ nxt_queue_remove(link);
+ link->next = NULL;
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+ }
}
@@ -4541,8 +4608,9 @@ static void
nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_bool_t start_process;
- nxt_port_t *port;
+ nxt_bool_t start_process;
+ nxt_port_t *port;
+ nxt_http_request_t *r;
start_process = 0;
@@ -4558,8 +4626,22 @@ nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
start_process = 1;
}
+ r = req_rpc_data->request;
+
+ /*
+ * Put request into application-wide list to be able to cancel request
+ * if something goes wrong with application processes.
+ */
+ nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
+
nxt_thread_mutex_unlock(&app->mutex);
+ /*
+ * Retain request memory pool while request is linked in ack_waiting_req
+ * to guarantee request structure memory is accessble.
+ */
+ nxt_mp_retain(r->mem_pool);
+
req_rpc_data->app_port = port;
req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
@@ -4602,6 +4684,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
r->timer.log = engine->task.log;
r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
+ r->engine = engine;
+ r->err_work.handler = nxt_router_http_request_error;
+ r->err_work.task = task;
+ r->err_work.obj = r;
+
req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
req_rpc_data->app = app;
req_rpc_data->msg_info.body_fd = -1;
@@ -4622,6 +4709,25 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
static void
+nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_request_t *r;
+
+ r = obj;
+
+ nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
+
+ nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
+
+ if (r->req_rpc_data != NULL) {
+ nxt_request_rpc_data_unlink(task, r->req_rpc_data);
+ }
+
+ nxt_mp_release(r->mem_pool);
+}
+
+
+static void
nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
@@ -4630,7 +4736,7 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
- if (r->req_rpc_data) {
+ if (r->req_rpc_data != NULL) {
nxt_request_rpc_data_unlink(task, r->req_rpc_data);
}
diff --git a/src/nxt_router.h b/src/nxt_router.h
index ead8f292..0b1147f8 100644
--- a/src/nxt_router.h
+++ b/src/nxt_router.h
@@ -138,6 +138,7 @@ struct nxt_app_s {
nxt_str_t conf;
nxt_atomic_t use_count;
+ nxt_queue_t ack_waiting_req; /* of nxt_http_request_t.app_link */
nxt_app_joint_t *joint;
nxt_port_t *shared_port;