diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-08-12 15:25:29 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-08-12 15:25:29 +0300 |
commit | 09685e2b4143ec19afef7673a455cf7e4d1414b7 (patch) | |
tree | 411475465b305cce6307fa6687398251fb916744 | |
parent | 2136eb411c9b99ffd65751bd13e10ce426be2492 (diff) | |
download | unit-09685e2b4143ec19afef7673a455cf7e4d1414b7.tar.gz unit-09685e2b4143ec19afef7673a455cf7e4d1414b7.tar.bz2 |
Responding with error in case of first process start failure.
After shared application port introducing, request queue in router was
removed and requests may stuck forever waiting for another process start.
-rw-r--r-- | src/nxt_http.h | 4 | ||||
-rw-r--r-- | src/nxt_router.c | 142 | ||||
-rw-r--r-- | src/nxt_router.h | 1 |
3 files changed, 129 insertions, 18 deletions
diff --git a/src/nxt_http.h b/src/nxt_http.h index 67ac00d8..6c84843f 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -169,6 +169,10 @@ struct nxt_http_request_s { nxt_http_peer_t *peer; nxt_buf_t *last; + nxt_queue_link_t app_link; /* nxt_app_t.ack_waiting_req */ + nxt_event_engine_t *engine; + nxt_work_t err_work; + nxt_http_response_t resp; nxt_http_status_t status:16; diff --git a/src/nxt_router.c b/src/nxt_router.c index df0d96ad..0ccf6593 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -204,6 +204,8 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_apr_action_t action); static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, nxt_request_rpc_data_t *req_rpc_data); +static void nxt_router_http_request_error(nxt_task_t *task, void *obj, + void *data); static void nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data); @@ -539,6 +541,8 @@ nxt_inline void nxt_request_rpc_data_unlink(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) { + nxt_app_t *app; + nxt_bool_t unlinked; nxt_http_request_t *r; nxt_router_msg_cancel(task, req_rpc_data); @@ -550,12 +554,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, req_rpc_data->app_port = NULL; } - if (req_rpc_data->app != NULL) { - nxt_router_app_use(task, req_rpc_data->app, -1); - - req_rpc_data->app = NULL; - } - + app = req_rpc_data->app; r = req_rpc_data->request; if (r != NULL) { @@ -565,6 +564,31 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, r->req_rpc_data = NULL; req_rpc_data->request = NULL; + + if (app != NULL) { + unlinked = 0; + + nxt_thread_mutex_lock(&app->mutex); + + if (r->app_link.next != NULL) { + nxt_queue_remove(&r->app_link); + r->app_link.next = NULL; + + unlinked = 1; + } + + nxt_thread_mutex_unlock(&app->mutex); + + if (unlinked) { + nxt_mp_release(r->mem_pool); + } + } + } + + if (app != NULL) { + nxt_router_app_use(task, app, -1); + + req_rpc_data->app = NULL; } if (req_rpc_data->msg_info.body_fd != -1) { @@ -1492,6 +1516,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_queue_init(&app->ports); nxt_queue_init(&app->spare_ports); nxt_queue_init(&app->idle_ports); + nxt_queue_init(&app->ack_waiting_req); app->name.length = name.length; nxt_memcpy(app->name.start, name.start, name.length); @@ -3784,7 +3809,7 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, { int res; nxt_app_t *app; - nxt_bool_t start_process; + nxt_bool_t start_process, unlinked; nxt_port_t *app_port, *main_app_port, *idle_port; nxt_queue_link_t *idle_lnk; nxt_http_request_t *r; @@ -3797,19 +3822,31 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, msg->port_msg.pid); app = req_rpc_data->app; + r = req_rpc_data->request; start_process = 0; + unlinked = 0; nxt_thread_mutex_lock(&app->mutex); + if (r->app_link.next != NULL) { + nxt_queue_remove(&r->app_link); + r->app_link.next = NULL; + + unlinked = 1; + } + app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, msg->port_msg.reply_port); if (nxt_slow_path(app_port == NULL)) { nxt_thread_mutex_unlock(&app->mutex); - r = req_rpc_data->request; nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + if (unlinked) { + nxt_mp_release(r->mem_pool); + } + return; } @@ -3857,6 +3894,10 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, nxt_thread_mutex_unlock(&app->mutex); + if (unlinked) { + nxt_mp_release(r->mem_pool); + } + if (start_process) { nxt_router_start_app_process(task, app); } @@ -3877,15 +3918,11 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, task->thread->engine->port->id, NULL); if (nxt_slow_path(res != NXT_OK)) { - r = req_rpc_data->request; - nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); } } if (app->timeout != 0) { - r = req_rpc_data->request; - r->timer.handler = nxt_router_app_timeout; r->timer_data = req_rpc_data; nxt_timer_add(task->thread->engine, &r->timer, app->timeout); @@ -4028,8 +4065,10 @@ static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_app_joint_t *app_joint; + nxt_app_t *app; + nxt_app_joint_t *app_joint; + nxt_queue_link_t *link; + nxt_http_request_t *r; app_joint = data; @@ -4047,15 +4086,43 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' %p start error", &app->name, app); + link = NULL; + nxt_thread_mutex_lock(&app->mutex); nxt_assert(app->pending_processes != 0); app->pending_processes--; + if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) { + link = nxt_queue_first(&app->ack_waiting_req); + + nxt_queue_remove(link); + link->next = NULL; + } + nxt_thread_mutex_unlock(&app->mutex); - /* TODO req_app_link to cancel first pending message */ + while (link != NULL) { + r = nxt_container_of(link, nxt_http_request_t, app_link); + + nxt_event_engine_post(r->engine, &r->err_work); + + link = NULL; + + nxt_thread_mutex_lock(&app->mutex); + + if (app->processes == 0 && app->pending_processes == 0 + && !nxt_queue_is_empty(&app->ack_waiting_req)) + { + link = nxt_queue_first(&app->ack_waiting_req); + + nxt_queue_remove(link); + link->next = NULL; + } + + nxt_thread_mutex_unlock(&app->mutex); + } } @@ -4541,8 +4608,9 @@ static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, nxt_request_rpc_data_t *req_rpc_data) { - nxt_bool_t start_process; - nxt_port_t *port; + nxt_bool_t start_process; + nxt_port_t *port; + nxt_http_request_t *r; start_process = 0; @@ -4558,8 +4626,22 @@ nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, start_process = 1; } + r = req_rpc_data->request; + + /* + * Put request into application-wide list to be able to cancel request + * if something goes wrong with application processes. + */ + nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); + nxt_thread_mutex_unlock(&app->mutex); + /* + * Retain request memory pool while request is linked in ack_waiting_req + * to guarantee request structure memory is accessble. + */ + nxt_mp_retain(r->mem_pool); + req_rpc_data->app_port = port; req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; @@ -4602,6 +4684,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, r->timer.log = engine->task.log; r->timer.bias = NXT_TIMER_DEFAULT_BIAS; + r->engine = engine; + r->err_work.handler = nxt_router_http_request_error; + r->err_work.task = task; + r->err_work.obj = r; + req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); req_rpc_data->app = app; req_rpc_data->msg_info.body_fd = -1; @@ -4622,6 +4709,25 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, static void +nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + + r = obj; + + nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); + + nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); + + if (r->req_rpc_data != NULL) { + nxt_request_rpc_data_unlink(task, r->req_rpc_data); + } + + nxt_mp_release(r->mem_pool); +} + + +static void nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) { nxt_http_request_t *r; @@ -4630,7 +4736,7 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); - if (r->req_rpc_data) { + if (r->req_rpc_data != NULL) { nxt_request_rpc_data_unlink(task, r->req_rpc_data); } diff --git a/src/nxt_router.h b/src/nxt_router.h index ead8f292..0b1147f8 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -138,6 +138,7 @@ struct nxt_app_s { nxt_str_t conf; nxt_atomic_t use_count; + nxt_queue_t ack_waiting_req; /* of nxt_http_request_t.app_link */ nxt_app_joint_t *joint; nxt_port_t *shared_port; |