diff options
-rw-r--r-- | src/nxt_router.c | 134 |
1 files changed, 94 insertions, 40 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 5eb95b59..63d11055 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -49,6 +49,9 @@ struct nxt_req_app_link_s { nxt_mp_t *mem_pool; nxt_work_t work; + + int err_code; + const char *err_str; }; @@ -67,6 +70,9 @@ typedef struct { static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); +static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, + int code, const char* str); + static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data); static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, @@ -178,7 +184,7 @@ static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, - const char* fmt, ...); + const char* str); static nxt_router_t *nxt_router; @@ -444,6 +450,65 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) } +static void +nxt_router_ra_error_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_req_app_link_t *ra; + + ra = obj; + + nxt_router_ra_error(task, ra, ra->err_code, ra->err_str); +} + + +static void +nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code, + const char* str) +{ + nxt_conn_t *c; + nxt_req_conn_link_t *rc; + nxt_event_engine_t *engine; + + engine = ra->work.data; + + if (task->thread->engine != engine) { + ra->err_code = code; + ra->err_str = str; + + ra->work.handler = nxt_router_ra_error_handler; + ra->work.task = &engine->task; + ra->work.next = NULL; + + nxt_debug(task, "ra stream #%uD post error to %p", ra->stream, engine); + + nxt_event_engine_post(engine, &ra->work); + + return; + } + + nxt_debug(task, "ra stream #%uD error", ra->stream); + + rc = ra->rc; + + if (rc != NULL) { + c = rc->conn; + + nxt_router_gen_error(task, c, code, str); + + rc->ra = NULL; + ra->rc = NULL; + } + + if (ra->app_port != NULL) { + nxt_router_app_port_release(task, ra->app_port, 0, 1); + + ra->app_port = NULL; + } + + nxt_mp_release(ra->mem_pool, ra); +} + + nxt_inline void nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) { @@ -2382,12 +2447,11 @@ nxt_router_text_by_code(int code) static nxt_buf_t * nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, - const char* fmt, va_list args) + const char* str) { - nxt_buf_t *b, *last; - const char *msg; + nxt_buf_t *b, *last; - b = nxt_buf_mem_ts_alloc(task, mp, 16384); + b = nxt_buf_mem_alloc(mp, 16384, 0); if (nxt_slow_path(b == NULL)) { return NULL; } @@ -2398,23 +2462,17 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, "Connection: close\r\n\r\n", code, nxt_router_text_by_code(code)); - msg = (const char *) b->mem.free; + b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str)); - b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args); - b->mem.free[0] = '\0'; + nxt_log_alert(task->log, "error %d: %s", code, str); - nxt_log_alert(task->log, "error %d: %s", code, msg); - - last = nxt_buf_mem_ts_alloc(task, mp, 0); + last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST); if (nxt_slow_path(last == NULL)) { - nxt_mp_release(mp, b); + nxt_mp_free(mp, b); return NULL; } - nxt_buf_set_sync(last); - nxt_buf_set_last(last); - nxt_buf_chain_add(&b, last); return b; @@ -2424,24 +2482,20 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, - const char* fmt, ...) + const char* str) { - va_list args; nxt_mp_t *mp; nxt_buf_t *b; - /* TODO: fix when called from main thread */ /* TODO: fix when called in the middle of response */ - mp = nxt_mp_create(1024, 128, 256, 32); + mp = c->mem_pool; - va_start(args, fmt); - b = nxt_router_get_error_buf(task, mp, code, fmt, args); - va_end(args); + b = nxt_router_get_error_buf(task, mp, code, str); if (c->socket.fd == -1) { - nxt_mp_release(mp, b->next); - nxt_mp_release(mp, b); + nxt_mp_free(mp, b->next); + nxt_mp_free(mp, b); return; } @@ -3115,19 +3169,14 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) uint32_t request_failed; nxt_int_t res; nxt_port_t *port, *c_port, *reply_port; - nxt_conn_t *c; nxt_app_wmsg_t wmsg; nxt_app_parse_ctx_t *ap; - /* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */ - - nxt_assert(ra->rc != NULL); nxt_assert(ra->app_port != NULL); port = ra->app_port; reply_port = ra->reply_port; ap = ra->ap; - c = ra->rc->conn; request_failed = 1; @@ -3137,8 +3186,9 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) res = nxt_port_send_port(task, port, reply_port, 0); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_gen_error(task, c, 500, - "Failed to send reply port to application"); + nxt_router_ra_error(task, ra, 500, + "Failed to send reply port to application"); + ra = NULL; goto release_port; } @@ -3153,8 +3203,9 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) res = port->app->prepare_msg(task, &ap->r, &wmsg); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_gen_error(task, c, 500, - "Failed to prepare message for application"); + nxt_router_ra_error(task, ra, 500, + "Failed to prepare message for application"); + ra = NULL; goto release_port; } @@ -3168,20 +3219,23 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) -1, ra->stream, reply_port->id, wmsg.write); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_gen_error(task, c, 500, - "Failed to send message to application"); + nxt_router_ra_error(task, ra, 500, + "Failed to send message to application"); + ra = NULL; goto release_port; } release_port: - if (request_failed != 0) { - ra->app_port = 0; - } - nxt_router_app_port_release(task, port, request_failed, 0); - nxt_router_ra_release(task, ra, ra->work.data); + if (ra != NULL) { + if (request_failed != 0) { + ra->app_port = 0; + } + + nxt_router_ra_release(task, ra, ra->work.data); + } } |