summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2019-11-14 16:39:48 +0300
committerIgor Sysoev <igor@sysoev.ru>2019-11-14 16:39:48 +0300
commit57e326b4119863f737d8677adc69dc53c7e4ed27 (patch)
treef80e27a6ea8ed77bbfa8d94ce5c3b98b42e5188b /src
parent643c433f8eb72cfe7d29d4f624888df646480477 (diff)
downloadunit-57e326b4119863f737d8677adc69dc53c7e4ed27.tar.gz
unit-57e326b4119863f737d8677adc69dc53c7e4ed27.tar.bz2
Introduced chained buffer completion handlers.
Diffstat (limited to 'src')
-rw-r--r--src/nxt_buf.c32
-rw-r--r--src/nxt_event_engine.c14
-rw-r--r--src/nxt_h1proto.c1
-rw-r--r--src/nxt_h1proto_websocket.c1
-rw-r--r--src/nxt_http_request.c11
-rw-r--r--src/nxt_http_websocket.c1
-rw-r--r--src/nxt_port_socket.c22
-rw-r--r--src/nxt_router.c9
-rw-r--r--src/nxt_sendbuf.c53
9 files changed, 113 insertions, 31 deletions
diff --git a/src/nxt_buf.c b/src/nxt_buf.c
index 2d52efca..af3f1243 100644
--- a/src/nxt_buf.c
+++ b/src/nxt_buf.c
@@ -195,7 +195,7 @@ static void
nxt_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
- nxt_buf_t *b, *parent;
+ nxt_buf_t *b, *next, *parent;
b = obj;
parent = data;
@@ -204,10 +204,17 @@ nxt_buf_completion(nxt_task_t *task, void *obj, void *data)
nxt_assert(data == b->parent);
- mp = b->data;
- nxt_mp_free(mp, b);
+ do {
+ next = b->next;
+ parent = b->parent;
+ mp = b->data;
- nxt_buf_parent_completion(task, parent);
+ nxt_mp_free(mp, b);
+
+ nxt_buf_parent_completion(task, parent);
+
+ b = next;
+ } while (b != NULL);
}
@@ -262,7 +269,7 @@ static void
nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
- nxt_buf_t *b, *parent;
+ nxt_buf_t *b, *next, *parent;
b = obj;
parent = data;
@@ -275,11 +282,18 @@ nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data)
nxt_assert(data == b->parent);
- mp = b->data;
- nxt_mp_free(mp, b);
- nxt_mp_release(mp);
+ do {
+ next = b->next;
+ parent = b->parent;
+ mp = b->data;
+
+ nxt_mp_free(mp, b);
+ nxt_mp_release(mp);
+
+ nxt_buf_parent_completion(task, parent);
- nxt_buf_parent_completion(task, parent);
+ b = next;
+ } while (b != NULL);
}
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c
index c46158b7..6f051067 100644
--- a/src/nxt_event_engine.c
+++ b/src/nxt_event_engine.c
@@ -721,7 +721,7 @@ void
nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_event_engine_t *engine;
- nxt_buf_t *b, *parent;
+ nxt_buf_t *b, *next, *parent;
b = obj;
parent = data;
@@ -729,9 +729,17 @@ nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "buf completion: %p %p", b, b->mem.start);
engine = b->data;
- nxt_event_engine_buf_mem_free(engine, b);
- nxt_buf_parent_completion(task, parent);
+ do {
+ next = b->next;
+ parent = b->parent;
+
+ nxt_event_engine_buf_mem_free(engine, b);
+
+ nxt_buf_parent_completion(task, parent);
+
+ b = next;
+ } while (b != NULL);
}
diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c
index 0e70a670..c2866ccf 100644
--- a/src/nxt_h1proto.c
+++ b/src/nxt_h1proto.c
@@ -1219,6 +1219,7 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p)
while (b != NULL) {
next = b->next;
+ b->next = NULL;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
diff --git a/src/nxt_h1proto_websocket.c b/src/nxt_h1proto_websocket.c
index cd67a8a6..c9ff899c 100644
--- a/src/nxt_h1proto_websocket.c
+++ b/src/nxt_h1proto_websocket.c
@@ -697,6 +697,7 @@ nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data)
for (i = 0; i < payload_len; i++) {
while (nxt_buf_mem_used_size(&b->mem) == 0) {
next = b->next;
+ b->next = NULL;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c
index 394411a9..d40393b7 100644
--- a/src/nxt_http_request.c
+++ b/src/nxt_http_request.c
@@ -483,15 +483,20 @@ nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r, size_t size)
static void
nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, void *data)
{
- nxt_buf_t *b;
+ nxt_buf_t *b, *next;
nxt_http_request_t *r;
b = obj;
r = data;
- nxt_mp_free(r->mem_pool, b);
+ do {
+ next = b->next;
- nxt_mp_release(r->mem_pool);
+ nxt_mp_free(r->mem_pool, b);
+ nxt_mp_release(r->mem_pool);
+
+ b = next;
+ } while (b != NULL);
}
diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c
index d58d615c..fb888f5d 100644
--- a/src/nxt_http_websocket.c
+++ b/src/nxt_http_websocket.c
@@ -88,6 +88,7 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
frame_size -= copy_size;
next = b->next;
+ b->next = NULL;
if (nxt_buf_mem_used_size(&b->mem) == 0) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 4edc423a..9c7da970 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -478,7 +478,8 @@ static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
size_t sent, nxt_bool_t mmap_mode)
{
- size_t size;
+ size_t size;
+ nxt_buf_t *next;
while (b != NULL) {
@@ -528,7 +529,9 @@ nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
- b = b->next;
+ next = b->next;
+ b->next = NULL;
+ b = next;
}
return b;
@@ -796,7 +799,7 @@ static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_recv_msg_t *msg)
{
- nxt_buf_t *b, *orig_b;
+ nxt_buf_t *b, *orig_b, *next;
nxt_port_recv_msg_t *fmsg;
if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
@@ -915,11 +918,15 @@ fmsg_failed:
*/
if (msg->buf == b) {
/* complete mmap buffers */
- for (; b != NULL; b = b->next) {
+ while (b != NULL) {
nxt_debug(task, "complete buffer %p", b);
nxt_work_queue_add(port->socket.read_work_queue,
b->completion_handler, task, b, b->parent);
+
+ next = b->next;
+ b->next = NULL;
+ b = next;
}
}
@@ -964,7 +971,7 @@ static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
int use_delta;
- nxt_buf_t *b;
+ nxt_buf_t *b, *next;
nxt_port_t *port;
nxt_work_queue_t *wq;
nxt_port_send_msg_t *msg;
@@ -986,7 +993,10 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
- for (b = msg->buf; b != NULL; b = b->next) {
+ for (b = msg->buf; b != NULL; b = next) {
+ next = b->next;
+ b->next = NULL;
+
if (nxt_buf_is_sync(b)) {
continue;
}
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 9025c813..60ee77e5 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -537,6 +537,7 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
for (b = msg_info->buf; b != NULL; b = next) {
next = b->next;
+ b->next = NULL;
b->completion_handler = msg_info->completion_handler;
@@ -3498,7 +3499,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_int_t ret;
- nxt_buf_t *b;
+ nxt_buf_t *b, *next;
nxt_port_t *app_port;
nxt_unit_field_t *f;
nxt_http_field_t *field;
@@ -3613,10 +3614,13 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
if (nxt_buf_mem_used_size(&b->mem) == 0) {
+ next = b->next;
+ b->next = NULL;
+
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
- b = b->next;
+ b = next;
}
if (b != NULL) {
@@ -5057,6 +5061,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
if (nxt_slow_path(buf == NULL)) {
while (out != NULL) {
buf = out->next;
+ out->next = NULL;
out->completion_handler(task, out, out->parent);
out = buf;
}
diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c
index 16fe4724..94f8e9eb 100644
--- a/src/nxt_sendbuf.c
+++ b/src/nxt_sendbuf.c
@@ -9,6 +9,8 @@
static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
size_t *copied);
+static nxt_buf_t *nxt_sendbuf_coalesce_completion(nxt_task_t *task,
+ nxt_work_queue_t *wq, nxt_buf_t *start);
nxt_uint_t
@@ -380,15 +382,11 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
{
while (b != NULL) {
- nxt_prefetch(b->next);
-
if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
break;
}
- nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
-
- b = b->next;
+ b = nxt_sendbuf_coalesce_completion(task, wq, b);
}
return b;
@@ -399,10 +397,49 @@ void
nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
{
while (b != NULL) {
- nxt_prefetch(b->next);
+ b = nxt_sendbuf_coalesce_completion(task, wq, b);
+ }
+}
- nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
- b = b->next;
+static nxt_buf_t *
+nxt_sendbuf_coalesce_completion(nxt_task_t *task, nxt_work_queue_t *wq,
+ nxt_buf_t *start)
+{
+ nxt_buf_t *b, *next, **last, *rest, **last_rest;
+ nxt_work_handler_t handler;
+
+ rest = NULL;
+ last_rest = &rest;
+ last = &start->next;
+ b = start;
+ handler = b->completion_handler;
+
+ for ( ;; ) {
+ next = b->next;
+ if (next == NULL) {
+ break;
+ }
+
+ b->next = NULL;
+ b = next;
+
+ if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
+ *last_rest = b;
+ break;
+ }
+
+ if (handler == b->completion_handler) {
+ *last = b;
+ last = &b->next;
+
+ } else {
+ *last_rest = b;
+ last_rest = &b->next;
+ }
}
+
+ nxt_work_queue_add(wq, handler, task, start, start->parent);
+
+ return rest;
}