summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2018-04-03 16:28:26 +0300
committerIgor Sysoev <igor@sysoev.ru>2018-04-03 16:28:26 +0300
commitfa04c05aa0e67e6964ab006935f939c6b27754a3 (patch)
tree969d3dac0bda996a487dc6e58e3b1e2ef7f91946
parent151160c1103166810daddf1e9cc7135cb6ccc321 (diff)
downloadunit-fa04c05aa0e67e6964ab006935f939c6b27754a3.tar.gz
unit-fa04c05aa0e67e6964ab006935f939c6b27754a3.tar.bz2
HTTP: using r->mem_pool retention counter for response buffers.
Diffstat (limited to '')
-rw-r--r--src/nxt_buf.c2
-rw-r--r--src/nxt_buf.h2
-rw-r--r--src/nxt_h1proto.c56
-rw-r--r--src/nxt_http.h13
-rw-r--r--src/nxt_http_error.c29
-rw-r--r--src/nxt_http_request.c69
-rw-r--r--src/nxt_router.c17
-rw-r--r--src/nxt_sendbuf.c15
-rw-r--r--src/nxt_sendbuf.h1
9 files changed, 154 insertions, 50 deletions
diff --git a/src/nxt_buf.c b/src/nxt_buf.c
index c764fecd..e2f1f047 100644
--- a/src/nxt_buf.c
+++ b/src/nxt_buf.c
@@ -144,7 +144,7 @@ nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags)
{
nxt_buf_t *b;
- b = nxt_mp_zalloc(mp, NXT_BUF_SYNC_SIZE);
+ b = nxt_mp_zalloc(mp, NXT_BUF_MEM_SIZE);
if (nxt_fast_path(b != NULL)) {
b->data = mp;
diff --git a/src/nxt_buf.h b/src/nxt_buf.h
index 7a894d0f..d9d4ee1b 100644
--- a/src/nxt_buf.h
+++ b/src/nxt_buf.h
@@ -101,8 +101,8 @@ struct nxt_buf_s {
};
+#define NXT_BUF_SYNC_SIZE offsetof(nxt_buf_t, mem.free)
#define NXT_BUF_MEM_SIZE offsetof(nxt_buf_t, file)
-#define NXT_BUF_SYNC_SIZE NXT_BUF_MEM_SIZE
#define NXT_BUF_FILE_SIZE sizeof(nxt_buf_t)
#define NXT_BUF_MMAP_SIZE NXT_BUF_FILE_SIZE
#define NXT_BUF_PORT_MMAP_SIZE NXT_BUF_MEM_SIZE
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c
index a216dafb..e4993f79 100644
--- a/src/nxt_h1proto.c
+++ b/src/nxt_h1proto.c
@@ -21,9 +21,12 @@ static void nxt_h1p_request_header_send(nxt_task_t *task,
nxt_http_request_t *r);
static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out);
+nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r);
static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out);
static void nxt_h1p_sent(nxt_task_t *task, void *obj, void *data);
+static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *last);
static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto);
static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p,
nxt_conn_t *c);
@@ -68,6 +71,13 @@ const nxt_http_proto_send_t nxt_http_proto_send[3] = {
};
+const nxt_http_proto_discard_t nxt_http_proto_discard[3] = {
+ nxt_h1p_request_discard,
+ NULL,
+ NULL,
+};
+
+
const nxt_http_proto_close_t nxt_http_proto_close[3] = {
nxt_h1p_request_close,
NULL,
@@ -687,11 +697,9 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r)
} nxt_list_loop;
- header = nxt_buf_mem_alloc(r->mem_pool, size, 0);
+ header = nxt_http_buf_mem(task, r, size);
if (nxt_slow_path(header == NULL)) {
- /* The internal server error is set just for logging. */
- r->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
- nxt_h1p_conn_close(task, h1p->conn, h1p);
+ nxt_h1p_request_error(task, r);
return;
}
@@ -738,6 +746,13 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r)
}
+nxt_inline void
+nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r)
+{
+ r->state->error_handler(task, r, r->proto.h1);
+}
+
+
static const nxt_conn_state_t nxt_h1p_send_state
nxt_aligned(64) =
{
@@ -764,7 +779,7 @@ nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
if (r->proto.h1->chunked) {
out = nxt_h1p_chunk_create(task, r, out);
if (nxt_slow_path(out == NULL)) {
- nxt_h1p_conn_error(task, c, c->socket.data);
+ nxt_h1p_request_error(task, r);
return;
}
}
@@ -796,7 +811,7 @@ nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
for (b = out; b != NULL; b = b->next) {
if (nxt_buf_is_last(b)) {
- tail = nxt_buf_mem_alloc(r->mem_pool, chunk_size, 0);
+ tail = nxt_http_buf_mem(task, r, chunk_size);
if (nxt_slow_path(tail == NULL)) {
return NULL;
}
@@ -821,7 +836,7 @@ nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
return out;
}
- header = nxt_buf_mem_alloc(r->mem_pool, chunk_size, 0);
+ header = nxt_http_buf_mem(task, r, chunk_size);
if (nxt_slow_path(header == NULL)) {
return NULL;
}
@@ -854,6 +869,31 @@ nxt_h1p_sent(nxt_task_t *task, void *obj, void *data)
static void
+nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
+ nxt_buf_t *last)
+{
+ nxt_buf_t *b;
+ nxt_conn_t *c;
+ nxt_h1proto_t *h1p;
+ nxt_work_queue_t *wq;
+
+ nxt_debug(task, "h1p request discard");
+
+ h1p = r->proto.h1;
+ h1p->keepalive = 0;
+
+ c = h1p->conn;
+ b = c->write;
+ c->write = NULL;
+
+ wq = &task->thread->engine->fast_work_queue;
+
+ nxt_sendbuf_drain(task, wq, b);
+ nxt_sendbuf_drain(task, wq, last);
+}
+
+
+static void
nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto)
{
nxt_conn_t *c;
@@ -967,7 +1007,7 @@ nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data)
r = h1p->request;
if (r != NULL) {
- r->state->error_handler(task, r, r->proto.h1);
+ nxt_h1p_request_error(task, r);
return;
}
}
diff --git a/src/nxt_http.h b/src/nxt_http.h
index 604c2883..be751994 100644
--- a/src/nxt_http.h
+++ b/src/nxt_http.h
@@ -122,6 +122,8 @@ struct nxt_http_request_s {
nxt_sockaddr_t *remote;
nxt_sockaddr_t *local;
+ nxt_buf_t *last;
+
nxt_http_response_t resp;
nxt_http_status_t status:16;
@@ -129,6 +131,7 @@ struct nxt_http_request_s {
uint8_t protocol; /* 2 bits */
uint8_t logged; /* 1 bit */
uint8_t header_sent; /* 1 bit */
+ uint8_t error; /* 1 bit */
};
@@ -140,6 +143,8 @@ typedef void (*nxt_http_proto_header_send_t)(nxt_task_t *task,
nxt_http_request_t *r);
typedef void (*nxt_http_proto_send_t)(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out);
+typedef void (*nxt_http_proto_discard_t)(nxt_task_t *task,
+ nxt_http_request_t *r, nxt_buf_t *last);
typedef void (*nxt_http_proto_close_t)(nxt_task_t *task,
nxt_http_proto_t proto);
@@ -157,9 +162,10 @@ void nxt_http_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r);
void nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out);
-void nxt_http_request_release(nxt_task_t *task, nxt_http_request_t *r);
-nxt_buf_t *nxt_http_request_last_buffer(nxt_task_t *task,
- nxt_http_request_t *r);
+nxt_buf_t *nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r,
+ size_t size);
+nxt_buf_t *nxt_http_buf_last(nxt_http_request_t *r);
+void nxt_http_request_error_handler(nxt_task_t *task, void *obj, void *data);
void nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data);
nxt_int_t nxt_http_request_host(void *ctx, nxt_http_field_t *field,
@@ -177,6 +183,7 @@ extern const nxt_http_proto_body_read_t nxt_http_proto_body_read[];
extern const nxt_http_proto_local_addr_t nxt_http_proto_local_addr[];
extern const nxt_http_proto_header_send_t nxt_http_proto_header_send[];
extern const nxt_http_proto_send_t nxt_http_proto_send[];
+extern const nxt_http_proto_discard_t nxt_http_proto_discard[];
extern const nxt_http_proto_close_t nxt_http_proto_close[];
diff --git a/src/nxt_http_error.c b/src/nxt_http_error.c
index 5030264b..8b10f508 100644
--- a/src/nxt_http_error.c
+++ b/src/nxt_http_error.c
@@ -12,7 +12,7 @@ static void nxt_http_request_send_error_body(nxt_task_t *task, void *r,
void *data);
-static const nxt_http_request_state_t nxt_http_request_send_state;
+static const nxt_http_request_state_t nxt_http_request_send_error_body_state;
static const char error[] =
@@ -28,10 +28,12 @@ nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r,
nxt_debug(task, "http request error: %d", status);
- if (r->header_sent) {
+ if (r->header_sent || r->error) {
goto fail;
}
+ r->error = (status == NXT_HTTP_INTERNAL_SERVER_ERROR);
+
r->status = status;
r->resp.fields = nxt_list_create(r->mem_pool, 8, sizeof(nxt_http_field_t));
@@ -49,36 +51,36 @@ nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r,
r->resp.content_length = NULL;
r->resp.content_length_n = sizeof(error) - 1;
- r->state = &nxt_http_request_send_state;
+ r->state = &nxt_http_request_send_error_body_state;
nxt_http_request_header_send(task, r);
return;
fail:
- nxt_http_request_release(task, r);
+ nxt_http_request_error_handler(task, r, r->proto.any);
}
-static const nxt_http_request_state_t nxt_http_request_send_state
+static const nxt_http_request_state_t nxt_http_request_send_error_body_state
nxt_aligned(64) =
{
.ready_handler = nxt_http_request_send_error_body,
- .error_handler = nxt_http_request_close_handler,
+ .error_handler = nxt_http_request_error_handler,
};
static void
nxt_http_request_send_error_body(nxt_task_t *task, void *obj, void *data)
{
- nxt_buf_t *out, *last;
+ nxt_buf_t *out;
nxt_http_request_t *r;
r = obj;
nxt_debug(task, "http request send error body");
- out = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
+ out = nxt_http_buf_mem(task, r, 0);
if (nxt_slow_path(out == NULL)) {
goto fail;
}
@@ -88,18 +90,13 @@ nxt_http_request_send_error_body(nxt_task_t *task, void *obj, void *data)
out->mem.free = out->mem.start + sizeof(error) - 1;
out->mem.end = out->mem.free;
- last = nxt_http_request_last_buffer(task, r);
- if (nxt_slow_path(last == NULL)) {
- goto fail;
- }
-
- out->next = last;
+ out->next = nxt_http_buf_last(r);
nxt_http_request_send(task, r, out);
return;
fail:
- // TODO
- nxt_http_request_release(task, r);
+
+ nxt_http_request_error_handler(task, r, r->proto.any);
}
diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c
index 40cbaf40..b1d52dd5 100644
--- a/src/nxt_http_request.c
+++ b/src/nxt_http_request.c
@@ -10,6 +10,8 @@
static void nxt_http_request_start(nxt_task_t *task, void *obj, void *data);
static void nxt_http_app_request(nxt_task_t *task, void *obj, void *data);
+static void nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_http_request_done(nxt_task_t *task, void *obj, void *data);
static u_char *nxt_http_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
@@ -82,6 +84,7 @@ nxt_http_request_t *
nxt_http_request_create(nxt_task_t *task)
{
nxt_mp_t *mp;
+ nxt_buf_t *last;
nxt_http_request_t *r;
mp = nxt_mp_create(1024, 128, 256, 32);
@@ -99,6 +102,17 @@ nxt_http_request_create(nxt_task_t *task)
goto fail;
}
+ last = nxt_mp_zget(mp, NXT_BUF_SYNC_SIZE);
+ if (nxt_slow_path(last == NULL)) {
+ goto fail;
+ }
+
+ nxt_buf_set_sync(last);
+ nxt_buf_set_last(last);
+ last->completion_handler = nxt_http_request_done;
+ last->parent = r;
+ r->last = last;
+
r->mem_pool = mp;
r->content_length_n = -1;
r->resp.content_length_n = -1;
@@ -109,6 +123,7 @@ nxt_http_request_create(nxt_task_t *task)
fail:
nxt_mp_release(mp);
+
return NULL;
}
@@ -349,20 +364,18 @@ nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
nxt_buf_t *
-nxt_http_request_last_buffer(nxt_task_t *task, nxt_http_request_t *r)
+nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r, size_t size)
{
nxt_buf_t *b;
- b = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
-
+ b = nxt_buf_mem_alloc(r->mem_pool, size, 0);
if (nxt_fast_path(b != NULL)) {
- nxt_buf_set_sync(b);
- nxt_buf_set_last(b);
- b->completion_handler = nxt_http_request_done;
+ b->completion_handler = nxt_http_request_mem_buf_completion;
b->parent = r;
+ nxt_mp_retain(r->mem_pool);
} else {
- nxt_http_request_release(task, r);
+ nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
}
return b;
@@ -370,6 +383,33 @@ nxt_http_request_last_buffer(nxt_task_t *task, nxt_http_request_t *r)
static void
+nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_http_request_t *r;
+
+ b = obj;
+ r = data;
+
+ nxt_mp_free(r->mem_pool, b);
+
+ nxt_mp_release(r->mem_pool);
+}
+
+
+nxt_buf_t *
+nxt_http_buf_last(nxt_http_request_t *r)
+{
+ nxt_buf_t *last;
+
+ last = r->last;
+ r->last = NULL;
+
+ return last;
+}
+
+
+static void
nxt_http_request_done(nxt_task_t *task, void *obj, void *data)
{
nxt_http_request_t *r;
@@ -383,12 +423,19 @@ nxt_http_request_done(nxt_task_t *task, void *obj, void *data)
void
-nxt_http_request_release(nxt_task_t *task, nxt_http_request_t *r)
+nxt_http_request_error_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_debug(task, "http request release");
+ nxt_http_proto_t proto;
+ nxt_http_request_t *r;
+
+ r = obj;
+ proto.any = data;
+
+ nxt_debug(task, "http request error handler");
- nxt_work_queue_add(&task->thread->engine->fast_work_queue,
- nxt_http_request_close_handler, task, r, r->proto.any);
+ if (proto.any != NULL) {
+ nxt_http_proto_discard[r->protocol](task, r, nxt_http_buf_last(r));
+ }
}
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 57e3cff1..994ba4af 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -2636,7 +2636,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
{
size_t dump_size;
nxt_int_t ret;
- nxt_buf_t *b, *last;
+ nxt_buf_t *b;
nxt_http_request_t *r;
nxt_req_conn_link_t *rc;
nxt_app_parse_ctx_t *ar;
@@ -2663,17 +2663,16 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
return;
}
+ if (ar->request->error) {
+ nxt_app_http_req_done(task, ar);
+ nxt_router_rc_unlink(task, rc);
+ return;
+ }
+
if (msg->port_msg.last != 0) {
nxt_debug(task, "router data create last buf");
- last = nxt_http_request_last_buffer(task, ar->request);
- if (nxt_slow_path(last == NULL)) {
- nxt_app_http_req_done(task, ar);
- nxt_router_rc_unlink(task, rc);
- return;
- }
-
- nxt_buf_chain_add(&b, last);
+ nxt_buf_chain_add(&b, nxt_http_buf_last(ar->request));
nxt_router_rc_unlink(task, rc);
diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c
index ed6fca36..16fe4724 100644
--- a/src/nxt_sendbuf.c
+++ b/src/nxt_sendbuf.c
@@ -382,7 +382,7 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
nxt_prefetch(b->next);
- if (nxt_buf_used_size(b) != 0) {
+ if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
break;
}
@@ -393,3 +393,16 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
return b;
}
+
+
+void
+nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
+{
+ while (b != NULL) {
+ nxt_prefetch(b->next);
+
+ nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
+
+ b = b->next;
+ }
+}
diff --git a/src/nxt_sendbuf.h b/src/nxt_sendbuf.h
index 46d2a0fe..b76ee06a 100644
--- a/src/nxt_sendbuf.h
+++ b/src/nxt_sendbuf.h
@@ -124,6 +124,7 @@ ssize_t nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm,
nxt_buf_t *nxt_sendbuf_update(nxt_buf_t *b, size_t sent);
nxt_buf_t *nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq,
nxt_buf_t *b);
+void nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b);
#endif /* _NXT_SENDBUF_H_INCLUDED_ */