summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-08-12 15:25:29 +0300
committerMax Romanov <max.romanov@nginx.com>2020-08-12 15:25:29 +0300
commit09685e2b4143ec19afef7673a455cf7e4d1414b7 (patch)
tree411475465b305cce6307fa6687398251fb916744 /src/nxt_router.c
parent2136eb411c9b99ffd65751bd13e10ce426be2492 (diff)
downloadunit-09685e2b4143ec19afef7673a455cf7e4d1414b7.tar.gz
unit-09685e2b4143ec19afef7673a455cf7e4d1414b7.tar.bz2
Responding with error in case of first process start failure.
After shared application port introducing, request queue in router was removed and requests may stuck forever waiting for another process start.
Diffstat (limited to '')
-rw-r--r--src/nxt_router.c142
1 files changed, 124 insertions, 18 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index df0d96ad..0ccf6593 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -204,6 +204,8 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_apr_action_t action);
static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
nxt_request_rpc_data_t *req_rpc_data);
+static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
void *data);
@@ -539,6 +541,8 @@ nxt_inline void
nxt_request_rpc_data_unlink(nxt_task_t *task,
nxt_request_rpc_data_t *req_rpc_data)
{
+ nxt_app_t *app;
+ nxt_bool_t unlinked;
nxt_http_request_t *r;
nxt_router_msg_cancel(task, req_rpc_data);
@@ -550,12 +554,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
req_rpc_data->app_port = NULL;
}
- if (req_rpc_data->app != NULL) {
- nxt_router_app_use(task, req_rpc_data->app, -1);
-
- req_rpc_data->app = NULL;
- }
-
+ app = req_rpc_data->app;
r = req_rpc_data->request;
if (r != NULL) {
@@ -565,6 +564,31 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
r->req_rpc_data = NULL;
req_rpc_data->request = NULL;
+
+ if (app != NULL) {
+ unlinked = 0;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ if (r->app_link.next != NULL) {
+ nxt_queue_remove(&r->app_link);
+ r->app_link.next = NULL;
+
+ unlinked = 1;
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+
+ if (unlinked) {
+ nxt_mp_release(r->mem_pool);
+ }
+ }
+ }
+
+ if (app != NULL) {
+ nxt_router_app_use(task, app, -1);
+
+ req_rpc_data->app = NULL;
}
if (req_rpc_data->msg_info.body_fd != -1) {
@@ -1492,6 +1516,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_queue_init(&app->ports);
nxt_queue_init(&app->spare_ports);
nxt_queue_init(&app->idle_ports);
+ nxt_queue_init(&app->ack_waiting_req);
app->name.length = name.length;
nxt_memcpy(app->name.start, name.start, name.length);
@@ -3784,7 +3809,7 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
{
int res;
nxt_app_t *app;
- nxt_bool_t start_process;
+ nxt_bool_t start_process, unlinked;
nxt_port_t *app_port, *main_app_port, *idle_port;
nxt_queue_link_t *idle_lnk;
nxt_http_request_t *r;
@@ -3797,19 +3822,31 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
msg->port_msg.pid);
app = req_rpc_data->app;
+ r = req_rpc_data->request;
start_process = 0;
+ unlinked = 0;
nxt_thread_mutex_lock(&app->mutex);
+ if (r->app_link.next != NULL) {
+ nxt_queue_remove(&r->app_link);
+ r->app_link.next = NULL;
+
+ unlinked = 1;
+ }
+
app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
msg->port_msg.reply_port);
if (nxt_slow_path(app_port == NULL)) {
nxt_thread_mutex_unlock(&app->mutex);
- r = req_rpc_data->request;
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
+ if (unlinked) {
+ nxt_mp_release(r->mem_pool);
+ }
+
return;
}
@@ -3857,6 +3894,10 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
nxt_thread_mutex_unlock(&app->mutex);
+ if (unlinked) {
+ nxt_mp_release(r->mem_pool);
+ }
+
if (start_process) {
nxt_router_start_app_process(task, app);
}
@@ -3877,15 +3918,11 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
task->thread->engine->port->id, NULL);
if (nxt_slow_path(res != NXT_OK)) {
- r = req_rpc_data->request;
-
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
}
}
if (app->timeout != 0) {
- r = req_rpc_data->request;
-
r->timer.handler = nxt_router_app_timeout;
r->timer_data = req_rpc_data;
nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
@@ -4028,8 +4065,10 @@ static void
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
- nxt_app_t *app;
- nxt_app_joint_t *app_joint;
+ nxt_app_t *app;
+ nxt_app_joint_t *app_joint;
+ nxt_queue_link_t *link;
+ nxt_http_request_t *r;
app_joint = data;
@@ -4047,15 +4086,43 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_debug(task, "app '%V' %p start error", &app->name, app);
+ link = NULL;
+
nxt_thread_mutex_lock(&app->mutex);
nxt_assert(app->pending_processes != 0);
app->pending_processes--;
+ if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
+ link = nxt_queue_first(&app->ack_waiting_req);
+
+ nxt_queue_remove(link);
+ link->next = NULL;
+ }
+
nxt_thread_mutex_unlock(&app->mutex);
- /* TODO req_app_link to cancel first pending message */
+ while (link != NULL) {
+ r = nxt_container_of(link, nxt_http_request_t, app_link);
+
+ nxt_event_engine_post(r->engine, &r->err_work);
+
+ link = NULL;
+
+ nxt_thread_mutex_lock(&app->mutex);
+
+ if (app->processes == 0 && app->pending_processes == 0
+ && !nxt_queue_is_empty(&app->ack_waiting_req))
+ {
+ link = nxt_queue_first(&app->ack_waiting_req);
+
+ nxt_queue_remove(link);
+ link->next = NULL;
+ }
+
+ nxt_thread_mutex_unlock(&app->mutex);
+ }
}
@@ -4541,8 +4608,9 @@ static void
nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
nxt_request_rpc_data_t *req_rpc_data)
{
- nxt_bool_t start_process;
- nxt_port_t *port;
+ nxt_bool_t start_process;
+ nxt_port_t *port;
+ nxt_http_request_t *r;
start_process = 0;
@@ -4558,8 +4626,22 @@ nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
start_process = 1;
}
+ r = req_rpc_data->request;
+
+ /*
+ * Put request into application-wide list to be able to cancel request
+ * if something goes wrong with application processes.
+ */
+ nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
+
nxt_thread_mutex_unlock(&app->mutex);
+ /*
+ * Retain request memory pool while request is linked in ack_waiting_req
+ * to guarantee request structure memory is accessble.
+ */
+ nxt_mp_retain(r->mem_pool);
+
req_rpc_data->app_port = port;
req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
@@ -4602,6 +4684,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
r->timer.log = engine->task.log;
r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
+ r->engine = engine;
+ r->err_work.handler = nxt_router_http_request_error;
+ r->err_work.task = task;
+ r->err_work.obj = r;
+
req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
req_rpc_data->app = app;
req_rpc_data->msg_info.body_fd = -1;
@@ -4622,6 +4709,25 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
static void
+nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_http_request_t *r;
+
+ r = obj;
+
+ nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
+
+ nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
+
+ if (r->req_rpc_data != NULL) {
+ nxt_request_rpc_data_unlink(task, r->req_rpc_data);
+ }
+
+ nxt_mp_release(r->mem_pool);
+}
+
+
+static void
nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
@@ -4630,7 +4736,7 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
- if (r->req_rpc_data) {
+ if (r->req_rpc_data != NULL) {
nxt_request_rpc_data_unlink(task, r->req_rpc_data);
}