diff options
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r-- | src/nxt_router.c | 234 |
1 files changed, 103 insertions, 131 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index 103ba78c..f637c864 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -23,6 +23,13 @@ typedef struct { } nxt_router_listener_conf_t; +typedef struct nxt_msg_info_s { + nxt_buf_t *buf; + nxt_port_mmap_tracking_t tracking; + nxt_work_handler_t completion_handler; +} nxt_msg_info_t; + + typedef struct nxt_req_app_link_s nxt_req_app_link_t; @@ -32,6 +39,7 @@ typedef struct { nxt_app_t *app; nxt_port_t *app_port; nxt_app_parse_ctx_t *ap; + nxt_msg_info_t msg_info; nxt_req_app_link_t *ra; nxt_queue_link_t link; /* for nxt_conn_t.requests */ @@ -44,6 +52,7 @@ struct nxt_req_app_link_s { nxt_pid_t app_pid; nxt_port_t *reply_port; nxt_app_parse_ctx_t *ap; + nxt_msg_info_t msg_info; nxt_req_conn_link_t *rc; nxt_queue_link_t link; /* for nxt_app_t.requests */ @@ -64,9 +73,6 @@ typedef struct { static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); -static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, - int code, const char* str); - static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); static void nxt_router_conf_ready(nxt_task_t *task, @@ -352,93 +358,87 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src) } -static void -nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) +nxt_inline nxt_bool_t +nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, + uint32_t stream) { - nxt_req_app_link_t *ra; - nxt_event_engine_t *engine; - nxt_req_conn_link_t *rc; + nxt_buf_t *b, *next; + nxt_bool_t cancelled; - ra = obj; - engine = data; - - if (task->thread->engine != engine) { - if (ra->app_port != NULL) { - ra->app_pid = ra->app_port->pid; - } - - ra->work.handler = nxt_router_ra_release; - ra->work.task = &engine->task; - ra->work.next = NULL; + if (msg_info->buf == NULL) { + return 0; + } - nxt_debug(task, "ra stream #%uD post release to %p", - ra->stream, engine); + cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking, + stream); - nxt_event_engine_post(engine, &ra->work); - - return; + if (cancelled) { + nxt_debug(task, "stream #%uD: cancelled by router", stream); } - nxt_debug(task, "ra stream #%uD release", ra->stream); + for (b = msg_info->buf; b != NULL; b = next) { + next = b->next; - rc = ra->rc; + b->completion_handler = msg_info->completion_handler; - if (rc != NULL) { - if (ra->app_pid != -1) { - nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid); + if (b->is_port_mmap_sent) { + b->is_port_mmap_sent = cancelled == 0; + b->completion_handler(task, b, b->parent); } - - rc->app_port = ra->app_port; - - ra->app_port = NULL; - rc->ra = NULL; - ra->rc = NULL; } - if (ra->app_port != NULL) { - nxt_router_app_port_release(task, ra->app_port, 0, 1); - - ra->app_port = NULL; - } + msg_info->buf = NULL; - if (ra->mem_pool != NULL) { - nxt_mp_release(ra->mem_pool, ra); - } + return cancelled; } static void -nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) +nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) { - nxt_conn_t *c; nxt_req_app_link_t *ra; - nxt_req_conn_link_t *rc; nxt_event_engine_t *engine; + nxt_req_conn_link_t *rc; ra = obj; engine = data; if (task->thread->engine != engine) { - ra->work.handler = nxt_router_ra_abort; + if (ra->app_port != NULL) { + ra->app_pid = ra->app_port->pid; + } + + ra->work.handler = nxt_router_ra_release; ra->work.task = &engine->task; ra->work.next = NULL; - nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine); + nxt_debug(task, "ra stream #%uD post release to %p", + ra->stream, engine); nxt_event_engine_post(engine, &ra->work); return; } - nxt_debug(task, "ra stream #%uD abort", ra->stream); + nxt_debug(task, "ra stream #%uD release", ra->stream); rc = ra->rc; if (rc != NULL) { - c = rc->conn; + if (ra->app_pid != -1) { + nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid); + } - nxt_router_gen_error(task, c, 500, - "Failed to start application worker"); + if (nxt_slow_path(ra->err_code != 0)) { + nxt_router_gen_error(task, rc->conn, ra->err_code, ra->err_str); + + } else { + rc->app_port = ra->app_port; + rc->msg_info = ra->msg_info; + + ra->app_port = NULL; + ra->msg_info.buf = NULL; + } rc->ra = NULL; ra->rc = NULL; @@ -450,70 +450,20 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) ra->app_port = NULL; } + nxt_router_msg_cancel(task, &ra->msg_info, ra->stream); + if (ra->mem_pool != NULL) { nxt_mp_release(ra->mem_pool, ra); } } -static void -nxt_router_ra_error_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_req_app_link_t *ra; - - ra = obj; - - nxt_router_ra_error(task, ra, ra->err_code, ra->err_str); -} - - -static void -nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code, - const char* str) +nxt_inline void +nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str) { - nxt_conn_t *c; - nxt_req_conn_link_t *rc; - nxt_event_engine_t *engine; - - engine = ra->work.data; - - if (task->thread->engine != engine) { - ra->err_code = code; - ra->err_str = str; - - ra->work.handler = nxt_router_ra_error_handler; - ra->work.task = &engine->task; - ra->work.next = NULL; - - nxt_debug(task, "ra stream #%uD post error to %p", ra->stream, engine); - - nxt_event_engine_post(engine, &ra->work); - - return; - } - - nxt_debug(task, "ra stream #%uD error", ra->stream); - - rc = ra->rc; - - if (rc != NULL) { - c = rc->conn; - - nxt_router_gen_error(task, c, code, str); - - rc->ra = NULL; - ra->rc = NULL; - } - - if (ra->app_port != NULL) { - nxt_router_app_port_release(task, ra->app_port, 0, 1); - - ra->app_port = NULL; - } - - if (ra->mem_pool != NULL) { - nxt_mp_release(ra->mem_pool, ra); - } + ra->app_port = NULL; + ra->err_code = code; + ra->err_str = str; } @@ -528,6 +478,8 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) rc->app_port = NULL; } + nxt_router_msg_cancel(task, &rc->msg_info, rc->stream); + ra = rc->ra; if (ra != NULL) { @@ -601,6 +553,10 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) return; } + nxt_debug(task, "nxt_router_conf_data_handler(%d): %*s", + nxt_buf_used_size(msg->buf), + nxt_buf_used_size(msg->buf), msg->buf->mem.pos); + b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size); nxt_assert(b != NULL); @@ -2417,8 +2373,6 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str)); - nxt_log_alert(task->log, "error %d: %s", code, str); - last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST); if (nxt_slow_path(last == NULL)) { @@ -2442,13 +2396,16 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, /* TODO: fix when called in the middle of response */ + nxt_log_alert(task->log, "error %d: %s", code, str); + + if (c->socket.fd == -1) { + return; + } + mp = c->mem_pool; b = nxt_router_get_error_buf(task, mp, code, str); - - if (c->socket.fd == -1) { - nxt_mp_free(mp, b->next); - nxt_mp_free(mp, b); + if (nxt_slow_path(b == NULL)) { return; } @@ -2533,7 +2490,8 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_debug(task, "app '%V' %p abort next stream #%uD", &app->name, app, ra->stream); - nxt_router_ra_abort(task, ra, ra->work.data); + nxt_router_ra_error(ra, 500, "Failed to start application worker"); + nxt_router_ra_release(task, ra, ra->work.data); } nxt_router_app_use(task, app, -1); @@ -3198,9 +3156,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, static void +nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data) +{ +} + + +static void nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) { uint32_t request_failed; + nxt_buf_t *b; nxt_int_t res; nxt_port_t *port, *c_port, *reply_port; nxt_app_wmsg_t wmsg; @@ -3220,9 +3185,8 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) res = nxt_port_send_port(task, port, reply_port, 0); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(task, ra, 500, + nxt_router_ra_error(ra, 500, "Failed to send reply port to application"); - ra = NULL; goto release_port; } @@ -3237,9 +3201,8 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) res = port->app->prepare_msg(task, &ap->r, &wmsg); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(task, ra, 500, + nxt_router_ra_error(ra, 500, "Failed to prepare message for application"); - ra = NULL; goto release_port; } @@ -3249,13 +3212,28 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) request_failed = 0; - res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, - -1, ra->stream, reply_port->id, wmsg.write); + ra->msg_info.buf = wmsg.write; + ra->msg_info.completion_handler = wmsg.write->completion_handler; + + for (b = wmsg.write; b != NULL; b = b->next) { + b->completion_handler = nxt_router_dummy_buf_completion; + } + + res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking, + ra->stream); + if (nxt_slow_path(res != NXT_OK)) { + nxt_router_ra_error(ra, 500, + "Failed to get tracking area"); + goto release_port; + } + + res = nxt_port_socket_twrite(task, wmsg.port, NXT_PORT_MSG_DATA, + -1, ra->stream, reply_port->id, wmsg.write, + &ra->msg_info.tracking); if (nxt_slow_path(res != NXT_OK)) { - nxt_router_ra_error(task, ra, 500, + nxt_router_ra_error(ra, 500, "Failed to send message to application"); - ra = NULL; goto release_port; } @@ -3263,13 +3241,7 @@ release_port: nxt_router_app_port_release(task, port, request_failed, 0); - if (ra != NULL) { - if (request_failed != 0) { - ra->app_port = 0; - } - - nxt_router_ra_release(task, ra, ra->work.data); - } + nxt_router_ra_release(task, ra, ra->work.data); } |