summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/nxt_router.c134
1 files changed, 94 insertions, 40 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 5eb95b59..63d11055 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -49,6 +49,9 @@ struct nxt_req_app_link_s {
nxt_mp_t *mem_pool;
nxt_work_t work;
+
+ int err_code;
+ const char *err_str;
};
@@ -67,6 +70,9 @@ typedef struct {
static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
+static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra,
+ int code, const char* str);
+
static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj,
@@ -178,7 +184,7 @@ static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
- const char* fmt, ...);
+ const char* str);
static nxt_router_t *nxt_router;
@@ -444,6 +450,65 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
}
+static void
+nxt_router_ra_error_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_req_app_link_t *ra;
+
+ ra = obj;
+
+ nxt_router_ra_error(task, ra, ra->err_code, ra->err_str);
+}
+
+
+static void
+nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code,
+ const char* str)
+{
+ nxt_conn_t *c;
+ nxt_req_conn_link_t *rc;
+ nxt_event_engine_t *engine;
+
+ engine = ra->work.data;
+
+ if (task->thread->engine != engine) {
+ ra->err_code = code;
+ ra->err_str = str;
+
+ ra->work.handler = nxt_router_ra_error_handler;
+ ra->work.task = &engine->task;
+ ra->work.next = NULL;
+
+ nxt_debug(task, "ra stream #%uD post error to %p", ra->stream, engine);
+
+ nxt_event_engine_post(engine, &ra->work);
+
+ return;
+ }
+
+ nxt_debug(task, "ra stream #%uD error", ra->stream);
+
+ rc = ra->rc;
+
+ if (rc != NULL) {
+ c = rc->conn;
+
+ nxt_router_gen_error(task, c, code, str);
+
+ rc->ra = NULL;
+ ra->rc = NULL;
+ }
+
+ if (ra->app_port != NULL) {
+ nxt_router_app_port_release(task, ra->app_port, 0, 1);
+
+ ra->app_port = NULL;
+ }
+
+ nxt_mp_release(ra->mem_pool, ra);
+}
+
+
nxt_inline void
nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
{
@@ -2382,12 +2447,11 @@ nxt_router_text_by_code(int code)
static nxt_buf_t *
nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
- const char* fmt, va_list args)
+ const char* str)
{
- nxt_buf_t *b, *last;
- const char *msg;
+ nxt_buf_t *b, *last;
- b = nxt_buf_mem_ts_alloc(task, mp, 16384);
+ b = nxt_buf_mem_alloc(mp, 16384, 0);
if (nxt_slow_path(b == NULL)) {
return NULL;
}
@@ -2398,23 +2462,17 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
"Connection: close\r\n\r\n",
code, nxt_router_text_by_code(code));
- msg = (const char *) b->mem.free;
+ b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str));
- b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
- b->mem.free[0] = '\0';
+ nxt_log_alert(task->log, "error %d: %s", code, str);
- nxt_log_alert(task->log, "error %d: %s", code, msg);
-
- last = nxt_buf_mem_ts_alloc(task, mp, 0);
+ last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST);
if (nxt_slow_path(last == NULL)) {
- nxt_mp_release(mp, b);
+ nxt_mp_free(mp, b);
return NULL;
}
- nxt_buf_set_sync(last);
- nxt_buf_set_last(last);
-
nxt_buf_chain_add(&b, last);
return b;
@@ -2424,24 +2482,20 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
static void
nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
- const char* fmt, ...)
+ const char* str)
{
- va_list args;
nxt_mp_t *mp;
nxt_buf_t *b;
- /* TODO: fix when called from main thread */
/* TODO: fix when called in the middle of response */
- mp = nxt_mp_create(1024, 128, 256, 32);
+ mp = c->mem_pool;
- va_start(args, fmt);
- b = nxt_router_get_error_buf(task, mp, code, fmt, args);
- va_end(args);
+ b = nxt_router_get_error_buf(task, mp, code, str);
if (c->socket.fd == -1) {
- nxt_mp_release(mp, b->next);
- nxt_mp_release(mp, b);
+ nxt_mp_free(mp, b->next);
+ nxt_mp_free(mp, b);
return;
}
@@ -3115,19 +3169,14 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
uint32_t request_failed;
nxt_int_t res;
nxt_port_t *port, *c_port, *reply_port;
- nxt_conn_t *c;
nxt_app_wmsg_t wmsg;
nxt_app_parse_ctx_t *ap;
- /* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */
-
- nxt_assert(ra->rc != NULL);
nxt_assert(ra->app_port != NULL);
port = ra->app_port;
reply_port = ra->reply_port;
ap = ra->ap;
- c = ra->rc->conn;
request_failed = 1;
@@ -3137,8 +3186,9 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
res = nxt_port_send_port(task, port, reply_port, 0);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_gen_error(task, c, 500,
- "Failed to send reply port to application");
+ nxt_router_ra_error(task, ra, 500,
+ "Failed to send reply port to application");
+ ra = NULL;
goto release_port;
}
@@ -3153,8 +3203,9 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
res = port->app->prepare_msg(task, &ap->r, &wmsg);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_gen_error(task, c, 500,
- "Failed to prepare message for application");
+ nxt_router_ra_error(task, ra, 500,
+ "Failed to prepare message for application");
+ ra = NULL;
goto release_port;
}
@@ -3168,20 +3219,23 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
-1, ra->stream, reply_port->id, wmsg.write);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_gen_error(task, c, 500,
- "Failed to send message to application");
+ nxt_router_ra_error(task, ra, 500,
+ "Failed to send message to application");
+ ra = NULL;
goto release_port;
}
release_port:
- if (request_failed != 0) {
- ra->app_port = 0;
- }
-
nxt_router_app_port_release(task, port, request_failed, 0);
- nxt_router_ra_release(task, ra, ra->work.data);
+ if (ra != NULL) {
+ if (request_failed != 0) {
+ ra->app_port = 0;
+ }
+
+ nxt_router_ra_release(task, ra, ra->work.data);
+ }
}