diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_http.h | 4 | ||||
-rw-r--r-- | src/nxt_router.c | 142 | ||||
-rw-r--r-- | src/nxt_router.h | 1 |
3 files changed, 129 insertions, 18 deletions
diff --git a/src/nxt_http.h b/src/nxt_http.h index 67ac00d8..6c84843f 100644 --- a/src/nxt_http.h +++ b/src/nxt_http.h @@ -169,6 +169,10 @@ struct nxt_http_request_s { nxt_http_peer_t *peer; nxt_buf_t *last; + nxt_queue_link_t app_link; /* nxt_app_t.ack_waiting_req */ + nxt_event_engine_t *engine; + nxt_work_t err_work; + nxt_http_response_t resp; nxt_http_status_t status:16; diff --git a/src/nxt_router.c b/src/nxt_router.c index df0d96ad..0ccf6593 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -204,6 +204,8 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_apr_action_t action); static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, nxt_request_rpc_data_t *req_rpc_data); +static void nxt_router_http_request_error(nxt_task_t *task, void *obj, + void *data); static void nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data); @@ -539,6 +541,8 @@ nxt_inline void nxt_request_rpc_data_unlink(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data) { + nxt_app_t *app; + nxt_bool_t unlinked; nxt_http_request_t *r; nxt_router_msg_cancel(task, req_rpc_data); @@ -550,12 +554,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, req_rpc_data->app_port = NULL; } - if (req_rpc_data->app != NULL) { - nxt_router_app_use(task, req_rpc_data->app, -1); - - req_rpc_data->app = NULL; - } - + app = req_rpc_data->app; r = req_rpc_data->request; if (r != NULL) { @@ -565,6 +564,31 @@ nxt_request_rpc_data_unlink(nxt_task_t *task, r->req_rpc_data = NULL; req_rpc_data->request = NULL; + + if (app != NULL) { + unlinked = 0; + + nxt_thread_mutex_lock(&app->mutex); + + if (r->app_link.next != NULL) { + nxt_queue_remove(&r->app_link); + r->app_link.next = NULL; + + unlinked = 1; + } + + nxt_thread_mutex_unlock(&app->mutex); + + if (unlinked) { + nxt_mp_release(r->mem_pool); + } + } + } + + if (app != NULL) { + nxt_router_app_use(task, app, -1); + + req_rpc_data->app = NULL; } if (req_rpc_data->msg_info.body_fd != -1) { @@ -1492,6 +1516,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf, nxt_queue_init(&app->ports); nxt_queue_init(&app->spare_ports); nxt_queue_init(&app->idle_ports); + nxt_queue_init(&app->ack_waiting_req); app->name.length = name.length; nxt_memcpy(app->name.start, name.start, name.length); @@ -3784,7 +3809,7 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, { int res; nxt_app_t *app; - nxt_bool_t start_process; + nxt_bool_t start_process, unlinked; nxt_port_t *app_port, *main_app_port, *idle_port; nxt_queue_link_t *idle_lnk; nxt_http_request_t *r; @@ -3797,19 +3822,31 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, msg->port_msg.pid); app = req_rpc_data->app; + r = req_rpc_data->request; start_process = 0; + unlinked = 0; nxt_thread_mutex_lock(&app->mutex); + if (r->app_link.next != NULL) { + nxt_queue_remove(&r->app_link); + r->app_link.next = NULL; + + unlinked = 1; + } + app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, msg->port_msg.reply_port); if (nxt_slow_path(app_port == NULL)) { nxt_thread_mutex_unlock(&app->mutex); - r = req_rpc_data->request; nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); + if (unlinked) { + nxt_mp_release(r->mem_pool); + } + return; } @@ -3857,6 +3894,10 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, nxt_thread_mutex_unlock(&app->mutex); + if (unlinked) { + nxt_mp_release(r->mem_pool); + } + if (start_process) { nxt_router_start_app_process(task, app); } @@ -3877,15 +3918,11 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task, task->thread->engine->port->id, NULL); if (nxt_slow_path(res != NXT_OK)) { - r = req_rpc_data->request; - nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); } } if (app->timeout != 0) { - r = req_rpc_data->request; - r->timer.handler = nxt_router_app_timeout; r->timer_data = req_rpc_data; nxt_timer_add(task->thread->engine, &r->timer, app->timeout); @@ -4028,8 +4065,10 @@ static void nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { - nxt_app_t *app; - nxt_app_joint_t *app_joint; + nxt_app_t *app; + nxt_app_joint_t *app_joint; + nxt_queue_link_t *link; + nxt_http_request_t *r; app_joint = data; @@ -4047,15 +4086,43 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' %p start error", &app->name, app); + link = NULL; + nxt_thread_mutex_lock(&app->mutex); nxt_assert(app->pending_processes != 0); app->pending_processes--; + if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) { + link = nxt_queue_first(&app->ack_waiting_req); + + nxt_queue_remove(link); + link->next = NULL; + } + nxt_thread_mutex_unlock(&app->mutex); - /* TODO req_app_link to cancel first pending message */ + while (link != NULL) { + r = nxt_container_of(link, nxt_http_request_t, app_link); + + nxt_event_engine_post(r->engine, &r->err_work); + + link = NULL; + + nxt_thread_mutex_lock(&app->mutex); + + if (app->processes == 0 && app->pending_processes == 0 + && !nxt_queue_is_empty(&app->ack_waiting_req)) + { + link = nxt_queue_first(&app->ack_waiting_req); + + nxt_queue_remove(link); + link->next = NULL; + } + + nxt_thread_mutex_unlock(&app->mutex); + } } @@ -4541,8 +4608,9 @@ static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, nxt_request_rpc_data_t *req_rpc_data) { - nxt_bool_t start_process; - nxt_port_t *port; + nxt_bool_t start_process; + nxt_port_t *port; + nxt_http_request_t *r; start_process = 0; @@ -4558,8 +4626,22 @@ nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, start_process = 1; } + r = req_rpc_data->request; + + /* + * Put request into application-wide list to be able to cancel request + * if something goes wrong with application processes. + */ + nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); + nxt_thread_mutex_unlock(&app->mutex); + /* + * Retain request memory pool while request is linked in ack_waiting_req + * to guarantee request structure memory is accessble. + */ + nxt_mp_retain(r->mem_pool); + req_rpc_data->app_port = port; req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; @@ -4602,6 +4684,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, r->timer.log = engine->task.log; r->timer.bias = NXT_TIMER_DEFAULT_BIAS; + r->engine = engine; + r->err_work.handler = nxt_router_http_request_error; + r->err_work.task = task; + r->err_work.obj = r; + req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); req_rpc_data->app = app; req_rpc_data->msg_info.body_fd = -1; @@ -4622,6 +4709,25 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r, static void +nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_http_request_t *r; + + r = obj; + + nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); + + nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); + + if (r->req_rpc_data != NULL) { + nxt_request_rpc_data_unlink(task, r->req_rpc_data); + } + + nxt_mp_release(r->mem_pool); +} + + +static void nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) { nxt_http_request_t *r; @@ -4630,7 +4736,7 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); - if (r->req_rpc_data) { + if (r->req_rpc_data != NULL) { nxt_request_rpc_data_unlink(task, r->req_rpc_data); } diff --git a/src/nxt_router.h b/src/nxt_router.h index ead8f292..0b1147f8 100644 --- a/src/nxt_router.h +++ b/src/nxt_router.h @@ -138,6 +138,7 @@ struct nxt_app_s { nxt_str_t conf; nxt_atomic_t use_count; + nxt_queue_t ack_waiting_req; /* of nxt_http_request_t.app_link */ nxt_app_joint_t *joint; nxt_port_t *shared_port; |