summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_router.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-12-27 17:46:17 +0300
committerMax Romanov <max.romanov@nginx.com>2017-12-27 17:46:17 +0300
commit89c0f7c5db5003b8fd8df3e1babb0c802004bf4c (patch)
tree7ac164d6fe41fd76beb3d328e6fde4020f744b63 /src/nxt_router.c
parent45d08d5145c63fd788f85d9e789314dcf093c99e (diff)
downloadunit-89c0f7c5db5003b8fd8df3e1babb0c802004bf4c.tar.gz
unit-89c0f7c5db5003b8fd8df3e1babb0c802004bf4c.tar.bz2
Implementing the ability to cancel request before worker starts processing it.
Diffstat (limited to 'src/nxt_router.c')
-rw-r--r--src/nxt_router.c234
1 files changed, 103 insertions, 131 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 103ba78c..f637c864 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -23,6 +23,13 @@ typedef struct {
} nxt_router_listener_conf_t;
+typedef struct nxt_msg_info_s {
+ nxt_buf_t *buf;
+ nxt_port_mmap_tracking_t tracking;
+ nxt_work_handler_t completion_handler;
+} nxt_msg_info_t;
+
+
typedef struct nxt_req_app_link_s nxt_req_app_link_t;
@@ -32,6 +39,7 @@ typedef struct {
nxt_app_t *app;
nxt_port_t *app_port;
nxt_app_parse_ctx_t *ap;
+ nxt_msg_info_t msg_info;
nxt_req_app_link_t *ra;
nxt_queue_link_t link; /* for nxt_conn_t.requests */
@@ -44,6 +52,7 @@ struct nxt_req_app_link_s {
nxt_pid_t app_pid;
nxt_port_t *reply_port;
nxt_app_parse_ctx_t *ap;
+ nxt_msg_info_t msg_info;
nxt_req_conn_link_t *rc;
nxt_queue_link_t link; /* for nxt_app_t.requests */
@@ -64,9 +73,6 @@ typedef struct {
static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
-static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra,
- int code, const char* str);
-
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conf_ready(nxt_task_t *task,
@@ -352,93 +358,87 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
}
-static void
-nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
+nxt_inline nxt_bool_t
+nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
+ uint32_t stream)
{
- nxt_req_app_link_t *ra;
- nxt_event_engine_t *engine;
- nxt_req_conn_link_t *rc;
+ nxt_buf_t *b, *next;
+ nxt_bool_t cancelled;
- ra = obj;
- engine = data;
-
- if (task->thread->engine != engine) {
- if (ra->app_port != NULL) {
- ra->app_pid = ra->app_port->pid;
- }
-
- ra->work.handler = nxt_router_ra_release;
- ra->work.task = &engine->task;
- ra->work.next = NULL;
+ if (msg_info->buf == NULL) {
+ return 0;
+ }
- nxt_debug(task, "ra stream #%uD post release to %p",
- ra->stream, engine);
+ cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
+ stream);
- nxt_event_engine_post(engine, &ra->work);
-
- return;
+ if (cancelled) {
+ nxt_debug(task, "stream #%uD: cancelled by router", stream);
}
- nxt_debug(task, "ra stream #%uD release", ra->stream);
+ for (b = msg_info->buf; b != NULL; b = next) {
+ next = b->next;
- rc = ra->rc;
+ b->completion_handler = msg_info->completion_handler;
- if (rc != NULL) {
- if (ra->app_pid != -1) {
- nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid);
+ if (b->is_port_mmap_sent) {
+ b->is_port_mmap_sent = cancelled == 0;
+ b->completion_handler(task, b, b->parent);
}
-
- rc->app_port = ra->app_port;
-
- ra->app_port = NULL;
- rc->ra = NULL;
- ra->rc = NULL;
}
- if (ra->app_port != NULL) {
- nxt_router_app_port_release(task, ra->app_port, 0, 1);
-
- ra->app_port = NULL;
- }
+ msg_info->buf = NULL;
- if (ra->mem_pool != NULL) {
- nxt_mp_release(ra->mem_pool, ra);
- }
+ return cancelled;
}
static void
-nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
+nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
{
- nxt_conn_t *c;
nxt_req_app_link_t *ra;
- nxt_req_conn_link_t *rc;
nxt_event_engine_t *engine;
+ nxt_req_conn_link_t *rc;
ra = obj;
engine = data;
if (task->thread->engine != engine) {
- ra->work.handler = nxt_router_ra_abort;
+ if (ra->app_port != NULL) {
+ ra->app_pid = ra->app_port->pid;
+ }
+
+ ra->work.handler = nxt_router_ra_release;
ra->work.task = &engine->task;
ra->work.next = NULL;
- nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine);
+ nxt_debug(task, "ra stream #%uD post release to %p",
+ ra->stream, engine);
nxt_event_engine_post(engine, &ra->work);
return;
}
- nxt_debug(task, "ra stream #%uD abort", ra->stream);
+ nxt_debug(task, "ra stream #%uD release", ra->stream);
rc = ra->rc;
if (rc != NULL) {
- c = rc->conn;
+ if (ra->app_pid != -1) {
+ nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid);
+ }
- nxt_router_gen_error(task, c, 500,
- "Failed to start application worker");
+ if (nxt_slow_path(ra->err_code != 0)) {
+ nxt_router_gen_error(task, rc->conn, ra->err_code, ra->err_str);
+
+ } else {
+ rc->app_port = ra->app_port;
+ rc->msg_info = ra->msg_info;
+
+ ra->app_port = NULL;
+ ra->msg_info.buf = NULL;
+ }
rc->ra = NULL;
ra->rc = NULL;
@@ -450,70 +450,20 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
ra->app_port = NULL;
}
+ nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
+
if (ra->mem_pool != NULL) {
nxt_mp_release(ra->mem_pool, ra);
}
}
-static void
-nxt_router_ra_error_handler(nxt_task_t *task, void *obj, void *data)
-{
- nxt_req_app_link_t *ra;
-
- ra = obj;
-
- nxt_router_ra_error(task, ra, ra->err_code, ra->err_str);
-}
-
-
-static void
-nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code,
- const char* str)
+nxt_inline void
+nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
{
- nxt_conn_t *c;
- nxt_req_conn_link_t *rc;
- nxt_event_engine_t *engine;
-
- engine = ra->work.data;
-
- if (task->thread->engine != engine) {
- ra->err_code = code;
- ra->err_str = str;
-
- ra->work.handler = nxt_router_ra_error_handler;
- ra->work.task = &engine->task;
- ra->work.next = NULL;
-
- nxt_debug(task, "ra stream #%uD post error to %p", ra->stream, engine);
-
- nxt_event_engine_post(engine, &ra->work);
-
- return;
- }
-
- nxt_debug(task, "ra stream #%uD error", ra->stream);
-
- rc = ra->rc;
-
- if (rc != NULL) {
- c = rc->conn;
-
- nxt_router_gen_error(task, c, code, str);
-
- rc->ra = NULL;
- ra->rc = NULL;
- }
-
- if (ra->app_port != NULL) {
- nxt_router_app_port_release(task, ra->app_port, 0, 1);
-
- ra->app_port = NULL;
- }
-
- if (ra->mem_pool != NULL) {
- nxt_mp_release(ra->mem_pool, ra);
- }
+ ra->app_port = NULL;
+ ra->err_code = code;
+ ra->err_str = str;
}
@@ -528,6 +478,8 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
rc->app_port = NULL;
}
+ nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);
+
ra = rc->ra;
if (ra != NULL) {
@@ -601,6 +553,10 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
+ nxt_debug(task, "nxt_router_conf_data_handler(%d): %*s",
+ nxt_buf_used_size(msg->buf),
+ nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
+
b = nxt_buf_chk_make_plain(tmcf->conf->mem_pool, msg->buf, msg->size);
nxt_assert(b != NULL);
@@ -2417,8 +2373,6 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str));
- nxt_log_alert(task->log, "error %d: %s", code, str);
-
last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST);
if (nxt_slow_path(last == NULL)) {
@@ -2442,13 +2396,16 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
/* TODO: fix when called in the middle of response */
+ nxt_log_alert(task->log, "error %d: %s", code, str);
+
+ if (c->socket.fd == -1) {
+ return;
+ }
+
mp = c->mem_pool;
b = nxt_router_get_error_buf(task, mp, code, str);
-
- if (c->socket.fd == -1) {
- nxt_mp_free(mp, b->next);
- nxt_mp_free(mp, b);
+ if (nxt_slow_path(b == NULL)) {
return;
}
@@ -2533,7 +2490,8 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_debug(task, "app '%V' %p abort next stream #%uD",
&app->name, app, ra->stream);
- nxt_router_ra_abort(task, ra, ra->work.data);
+ nxt_router_ra_error(ra, 500, "Failed to start application worker");
+ nxt_router_ra_release(task, ra, ra->work.data);
}
nxt_router_app_use(task, app, -1);
@@ -3198,9 +3156,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
static void
+nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
+{
+}
+
+
+static void
nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
{
uint32_t request_failed;
+ nxt_buf_t *b;
nxt_int_t res;
nxt_port_t *port, *c_port, *reply_port;
nxt_app_wmsg_t wmsg;
@@ -3220,9 +3185,8 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
res = nxt_port_send_port(task, port, reply_port, 0);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(task, ra, 500,
+ nxt_router_ra_error(ra, 500,
"Failed to send reply port to application");
- ra = NULL;
goto release_port;
}
@@ -3237,9 +3201,8 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
res = port->app->prepare_msg(task, &ap->r, &wmsg);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(task, ra, 500,
+ nxt_router_ra_error(ra, 500,
"Failed to prepare message for application");
- ra = NULL;
goto release_port;
}
@@ -3249,13 +3212,28 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra)
request_failed = 0;
- res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
- -1, ra->stream, reply_port->id, wmsg.write);
+ ra->msg_info.buf = wmsg.write;
+ ra->msg_info.completion_handler = wmsg.write->completion_handler;
+
+ for (b = wmsg.write; b != NULL; b = b->next) {
+ b->completion_handler = nxt_router_dummy_buf_completion;
+ }
+
+ res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking,
+ ra->stream);
+ if (nxt_slow_path(res != NXT_OK)) {
+ nxt_router_ra_error(ra, 500,
+ "Failed to get tracking area");
+ goto release_port;
+ }
+
+ res = nxt_port_socket_twrite(task, wmsg.port, NXT_PORT_MSG_DATA,
+ -1, ra->stream, reply_port->id, wmsg.write,
+ &ra->msg_info.tracking);
if (nxt_slow_path(res != NXT_OK)) {
- nxt_router_ra_error(task, ra, 500,
+ nxt_router_ra_error(ra, 500,
"Failed to send message to application");
- ra = NULL;
goto release_port;
}
@@ -3263,13 +3241,7 @@ release_port:
nxt_router_app_port_release(task, port, request_failed, 0);
- if (ra != NULL) {
- if (request_failed != 0) {
- ra->app_port = 0;
- }
-
- nxt_router_ra_release(task, ra, ra->work.data);
- }
+ nxt_router_ra_release(task, ra, ra->work.data);
}