diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-08-12 15:25:29 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-08-12 15:25:29 +0300 |
commit | 09685e2b4143ec19afef7673a455cf7e4d1414b7 (patch) | |
tree | 411475465b305cce6307fa6687398251fb916744 /src/nxt_router.c | |
parent | 2136eb411c9b99ffd65751bd13e10ce426be2492 (diff) | |
download | unit-09685e2b4143ec19afef7673a455cf7e4d1414b7.tar.gz unit-09685e2b4143ec19afef7673a455cf7e4d1414b7.tar.bz2 |
Responding with error in case of first process start failure.
After shared application port introducing, request queue in router was
removed and requests may stuck forever waiting for another process start.
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 142 |
1 files changed, 124 insertions, 18 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index df0d96ad..0ccf6593 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -204,6 +204,8 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_apr_action_t action); static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, nxt_request_rpc_data_t *req_rpc_data); +static void nxt_router_http_request_error(nxt_task_t *task, void *obj, + void *data); static void nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data); @@ -539,6 +541,8 @@ nxt_inline void nxt_request_rpc_data_unlink(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) { + nxt_app_t *app; + nxt_bool_t unlinked; nxt_http_request_t *r; nxt_router_msg_cancel(task, req_rpc_data); @@ -550,12 +554,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, req_rpc_data->app_port = NULL; } - if (req_rpc_data->app != NULL) { - nxt_router_app_use(task, req_rpc_data->app, -1); - - req_rpc_data->app = NULL; - } - + app = req_rpc_data->app; r = req_rpc_data->request; if (r != NULL) { @@ -565,6 +564,31 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, r->req_rpc_data = NULL; req_rpc_data->request = NULL; + + if (app != NULL) { + unlinked = 0; + + nxt_thread_mutex_lock(&app->mutex); + + if (r->app_link.next != NULL) { + nxt_queue_remove(&r->app_link); + r->app_link.next = NULL; + + unlinked = 1; + } + + nxt_thread_mutex_unlock(&app->mutex); + + if (unlinked) { + nxt_mp_release(r->mem_pool); + } + } + } + + if (app != NULL) { + nxt_router_app_use(task, app, -1); + + req_rpc_data->app = NULL; } if (req_rpc_data->msg_info.body_fd != -1) { @@ -1492,6 +1516,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_queue_init(&app->ports); nxt_queue_init(&app->spare_ports); nxt_queue_init(&app->idle_ports); + nxt_queue_init(&app->ack_waiting_req); app->name.length = name.length; nxt_memcpy(app->name.start, name.start, name.length); @@ -3784,7 +3809,7 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, { int res; nxt_app_t *app; - nxt_bool_t start_process; + nxt_bool_t start_process, unlinked; nxt_port_t *app_port, *main_app_port, *idle_port; nxt_queue_link_t *idle_lnk; nxt_http_request_t *r; @@ -3797,19 +3822,31 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, msg->port_msg.pid); app = req_rpc_data->app; + r = req_rpc_data->request; start_process = 0; + unlinked = 0; nxt_thread_mutex_lock(&app->mutex); + if (r->app_link.next != NULL) { + nxt_queue_remove(&r->app_link); + r->app_link.next = NULL; + + unlinked = 1; + } + app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, msg->port_msg.reply_port); if (nxt_slow_path(app_port == NULL)) { nxt_thread_mutex_unlock(&app->mutex); - r = req_rpc_data->request; nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + if (unlinked) { + nxt_mp_release(r->mem_pool); + } + return; } @@ -3857,6 +3894,10 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, nxt_thread_mutex_unlock(&app->mutex); + if (unlinked) { + nxt_mp_release(r->mem_pool); + } + if (start_process) { nxt_router_start_app_process(task, app); } @@ -3877,15 +3918,11 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, task->thread->engine->port->id, NULL); if (nxt_slow_path(res != NXT_OK)) { - r = req_rpc_data->request; - nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); } } if (app->timeout != 0) { - r = req_rpc_data->request; - r->timer.handler = nxt_router_app_timeout; r->timer_data = req_rpc_data; nxt_timer_add(task->thread->engine, &r->timer, app->timeout); @@ -4028,8 +4065,10 @@ static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_app_joint_t *app_joint; + nxt_app_t *app; + nxt_app_joint_t *app_joint; + nxt_queue_link_t *link; + nxt_http_request_t *r; app_joint = data; @@ -4047,15 +4086,43 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' %p start error", &app->name, app); + link = NULL; + nxt_thread_mutex_lock(&app->mutex); nxt_assert(app->pending_processes != 0); app->pending_processes--; + if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) { + link = nxt_queue_first(&app->ack_waiting_req); + + nxt_queue_remove(link); + link->next = NULL; + } + nxt_thread_mutex_unlock(&app->mutex); - /* TODO req_app_link to cancel first pending message */ + while (link != NULL) { + r = nxt_container_of(link, nxt_http_request_t, app_link); + + nxt_event_engine_post(r->engine, &r->err_work); + + link = NULL; + + nxt_thread_mutex_lock(&app->mutex); + + if (app->processes == 0 && app->pending_processes == 0 + && !nxt_queue_is_empty(&app->ack_waiting_req)) + { + link = nxt_queue_first(&app->ack_waiting_req); + + nxt_queue_remove(link); + link->next = NULL; + } + + nxt_thread_mutex_unlock(&app->mutex); + } } @@ -4541,8 +4608,9 @@ static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, nxt_request_rpc_data_t *req_rpc_data) { - nxt_bool_t start_process; - nxt_port_t *port; + nxt_bool_t start_process; + nxt_port_t *port; + nxt_http_request_t *r; start_process = 0; @@ -4558,8 +4626,22 @@ nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, start_process = 1; } + r = req_rpc_data->request; + + /* + * Put request into application-wide list to be able to cancel request + * if something goes wrong with application processes. + */ + nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); + nxt_thread_mutex_unlock(&app->mutex); + /* + * Retain request memory pool while request is linked in ack_waiting_req + * to guarantee request structure memory is accessble. + */ + nxt_mp_retain(r->mem_pool); + req_rpc_data->app_port = port; req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; @@ -4602,6 +4684,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, r->timer.log = engine->task.log; r->timer.bias = NXT_TIMER_DEFAULT_BIAS; + r->engine = engine; + r->err_work.handler = nxt_router_http_request_error; + r->err_work.task = task; + r->err_work.obj = r; + req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); req_rpc_data->app = app; req_rpc_data->msg_info.body_fd = -1; @@ -4622,6 +4709,25 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, static void +nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + + r = obj; + + nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); + + nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); + + if (r->req_rpc_data != NULL) { + nxt_request_rpc_data_unlink(task, r->req_rpc_data); + } + + nxt_mp_release(r->mem_pool); +} + + +static void nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) { nxt_http_request_t *r; @@ -4630,7 +4736,7 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); - if (r->req_rpc_data) { + if (r->req_rpc_data != NULL) { nxt_request_rpc_data_unlink(task, r->req_rpc_data); } |