diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-10-04 15:00:05 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-10-04 15:00:05 +0300 |
commit | a4b5b5d45d26077ac822b25d5d26f58661c02dcf (patch) | |
tree | 4784bc865c01499b3e073972d2f2fe0fbfb75fa3 /src/nxt_router.c | |
parent | ebbe89bd5c22ef903e5cd1a898ad75c6739ae4bc (diff) | |
download | unit-a4b5b5d45d26077ac822b25d5d26f58661c02dcf.tar.gz unit-a4b5b5d45d26077ac822b25d5d26f58661c02dcf.tar.bz2 |
Fixed error generation during request processing.
Request can be processed in thread different from the thread where the
connection originally handled.
Because of possible racing conditions, using original connection structures
is unsafe. To solve this, error condition is registered in 'ra' (request <->
application link) and traversed back to original connection thread where
the error message can be generated and send back to client.
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 134 |
1 files changed, 94 insertions, 40 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 5eb95b59..63d11055 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -49,6 +49,9 @@ struct nxt_req_app_link_s { nxt_mp_t *mem_pool; nxt_work_t work; + + int err_code; + const char *err_str; }; @@ -67,6 +70,9 @@ typedef struct { static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); +static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, + int code, const char* str); + static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data); static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, @@ -178,7 +184,7 @@ static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, - const char* fmt, ...); + const char* str); static nxt_router_t *nxt_router; @@ -444,6 +450,65 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) } +static void +nxt_router_ra_error_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_req_app_link_t *ra; + + ra = obj; + + nxt_router_ra_error(task, ra, ra->err_code, ra->err_str); +} + + +static void +nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code, + const char* str) +{ + nxt_conn_t *c; + nxt_req_conn_link_t *rc; + nxt_event_engine_t *engine; + + engine = ra->work.data; + + if (task->thread->engine != engine) { + ra->err_code = code; + ra->err_str = str; + + ra->work.handler = nxt_router_ra_error_handler; + ra->work.task = &engine->task; + ra->work.next = NULL; + + nxt_debug(task, "ra stream #%uD post error to %p", ra->stream, engine); + + nxt_event_engine_post(engine, &ra->work); + + return; + } + + nxt_debug(task, "ra stream #%uD error", ra->stream); + + rc = ra->rc; + + if (rc != NULL) { + c = rc->conn; + + nxt_router_gen_error(task, c, code, str); + + rc->ra = NULL; + ra->rc = NULL; + } + + if (ra->app_port != NULL) { + nxt_router_app_port_release(task, ra->app_port, 0, 1); + + ra->app_port = NULL; + } + + nxt_mp_release(ra->mem_pool, ra); +} + + nxt_inline void nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) { @@ -2382,12 +2447,11 @@ nxt_router_text_by_code(int code) static nxt_buf_t * nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, - const char* fmt, va_list args) + const char* str) { - nxt_buf_t *b, *last; - const char *msg; + nxt_buf_t *b, *last; - b = nxt_buf_mem_ts_alloc(task, mp, 16384); + b = nxt_buf_mem_alloc(mp, 16384, 0); if (nxt_slow_path(b == NULL)) { return NULL; } @@ -2398,23 +2462,17 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, "Connection: close\r\n\r\n", code, nxt_router_text_by_code(code)); - msg = (const char *) b->mem.free; + b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str)); - b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args); - b->mem.free[0] = '\0'; + nxt_log_alert(task->log, "error %d: %s", code, str); - nxt_log_alert(task->log, "error %d: %s", code, msg); - - last = nxt_buf_mem_ts_alloc(task, mp, 0); + last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST); if (nxt_slow_path(last == NULL)) { - nxt_mp_release(mp, b); + nxt_mp_free(mp, b); return NULL; } - nxt_buf_set_sync(last); - nxt_buf_set_last(last); - nxt_buf_chain_add(&b, last); return b; @@ -2424,24 +2482,20 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, - const char* fmt, ...) + const char* str) { - va_list args; nxt_mp_t *mp; nxt_buf_t *b; - /* TODO: fix when called from main thread */ /* TODO: fix when called in the middle of response */ - mp = nxt_mp_create(1024, 128, 256, 32); + mp = c->mem_pool; - va_start(args, fmt); - b = nxt_router_get_error_buf(task, mp, code, fmt, args); - va_end(args); + b = nxt_router_get_error_buf(task, mp, code, str); if (c->socket.fd == -1) { - nxt_mp_release(mp, b->next); - nxt_mp_release(mp, b); + nxt_mp_free(mp, b->next); + nxt_mp_free(mp, b); return; } @@ -3115,19 +3169,14 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) uint32_t request_failed; nxt_int_t res; nxt_port_t *port, *c_port, *reply_port; - nxt_conn_t *c; nxt_app_wmsg_t wmsg; nxt_app_parse_ctx_t *ap; - /* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */ - - nxt_assert(ra->rc != NULL); nxt_assert(ra->app_port != NULL); port = ra->app_port; reply_port = ra->reply_port; ap = ra->ap; - c = ra->rc->conn; request_failed = 1; @@ -3137,8 +3186,9 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) res = nxt_port_send_port(task, port, reply_port, 0); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_gen_error(task, c, 500, - "Failed to send reply port to application"); + nxt_router_ra_error(task, ra, 500, + "Failed to send reply port to application"); + ra = NULL; goto release_port; } @@ -3153,8 +3203,9 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) res = port->app->prepare_msg(task, &ap->r, &wmsg); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_gen_error(task, c, 500, - "Failed to prepare message for application"); + nxt_router_ra_error(task, ra, 500, + "Failed to prepare message for application"); + ra = NULL; goto release_port; } @@ -3168,20 +3219,23 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) -1, ra->stream, reply_port->id, wmsg.write); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_gen_error(task, c, 500, - "Failed to send message to application"); + nxt_router_ra_error(task, ra, 500, + "Failed to send message to application"); + ra = NULL; goto release_port; } release_port: - if (request_failed != 0) { - ra->app_port = 0; - } - nxt_router_app_port_release(task, port, request_failed, 0); - nxt_router_ra_release(task, ra, ra->work.data); + if (ra != NULL) { + if (request_failed != 0) { + ra->app_port = 0; + } + + nxt_router_ra_release(task, ra, ra->work.data); + } } |