diff options
-rw-r--r-- | src/nxt_buf.c | 32 | ||||
-rw-r--r-- | src/nxt_event_engine.c | 14 | ||||
-rw-r--r-- | src/nxt_h1proto.c | 1 | ||||
-rw-r--r-- | src/nxt_h1proto_websocket.c | 1 | ||||
-rw-r--r-- | src/nxt_http_request.c | 11 | ||||
-rw-r--r-- | src/nxt_http_websocket.c | 1 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 22 | ||||
-rw-r--r-- | src/nxt_router.c | 9 | ||||
-rw-r--r-- | src/nxt_sendbuf.c | 53 |
9 files changed, 113 insertions, 31 deletions
diff --git a/src/nxt_buf.c b/src/nxt_buf.c index 2d52efca..af3f1243 100644 --- a/src/nxt_buf.c +++ b/src/nxt_buf.c @@ -195,7 +195,7 @@ static void nxt_buf_completion(nxt_task_t *task, void *obj, void *data) { nxt_mp_t *mp; - nxt_buf_t *b, *parent; + nxt_buf_t *b, *next, *parent; b = obj; parent = data; @@ -204,10 +204,17 @@ nxt_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_assert(data == b->parent); - mp = b->data; - nxt_mp_free(mp, b); + do { + next = b->next; + parent = b->parent; + mp = b->data; - nxt_buf_parent_completion(task, parent); + nxt_mp_free(mp, b); + + nxt_buf_parent_completion(task, parent); + + b = next; + } while (b != NULL); } @@ -262,7 +269,7 @@ static void nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data) { nxt_mp_t *mp; - nxt_buf_t *b, *parent; + nxt_buf_t *b, *next, *parent; b = obj; parent = data; @@ -275,11 +282,18 @@ nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data) nxt_assert(data == b->parent); - mp = b->data; - nxt_mp_free(mp, b); - nxt_mp_release(mp); + do { + next = b->next; + parent = b->parent; + mp = b->data; + + nxt_mp_free(mp, b); + nxt_mp_release(mp); + + nxt_buf_parent_completion(task, parent); - nxt_buf_parent_completion(task, parent); + b = next; + } while (b != NULL); } diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index c46158b7..6f051067 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -721,7 +721,7 @@ void nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, void *data) { nxt_event_engine_t *engine; - nxt_buf_t *b, *parent; + nxt_buf_t *b, *next, *parent; b = obj; parent = data; @@ -729,9 +729,17 @@ nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "buf completion: %p %p", b, b->mem.start); engine = b->data; - nxt_event_engine_buf_mem_free(engine, b); - nxt_buf_parent_completion(task, parent); + do { + next = b->next; + parent = b->parent; + + nxt_event_engine_buf_mem_free(engine, b); + + nxt_buf_parent_completion(task, parent); + + b = next; + } while (b != NULL); } diff --git a/src/nxt_h1proto.c b/src/nxt_h1proto.c index 0e70a670..c2866ccf 100644 --- a/src/nxt_h1proto.c +++ b/src/nxt_h1proto.c @@ -1219,6 +1219,7 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p) while (b != NULL) { next = b->next; + b->next = NULL; nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); diff --git a/src/nxt_h1proto_websocket.c b/src/nxt_h1proto_websocket.c index cd67a8a6..c9ff899c 100644 --- a/src/nxt_h1proto_websocket.c +++ b/src/nxt_h1proto_websocket.c @@ -697,6 +697,7 @@ nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data) for (i = 0; i < payload_len; i++) { while (nxt_buf_mem_used_size(&b->mem) == 0) { next = b->next; + b->next = NULL; nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); diff --git a/src/nxt_http_request.c b/src/nxt_http_request.c index 394411a9..d40393b7 100644 --- a/src/nxt_http_request.c +++ b/src/nxt_http_request.c @@ -483,15 +483,20 @@ nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r, size_t size) static void nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b; + nxt_buf_t *b, *next; nxt_http_request_t *r; b = obj; r = data; - nxt_mp_free(r->mem_pool, b); + do { + next = b->next; - nxt_mp_release(r->mem_pool); + nxt_mp_free(r->mem_pool, b); + nxt_mp_release(r->mem_pool); + + b = next; + } while (b != NULL); } diff --git a/src/nxt_http_websocket.c b/src/nxt_http_websocket.c index d58d615c..fb888f5d 100644 --- a/src/nxt_http_websocket.c +++ b/src/nxt_http_websocket.c @@ -88,6 +88,7 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) frame_size -= copy_size; next = b->next; + b->next = NULL; if (nxt_buf_mem_used_size(&b->mem) == 0) { nxt_work_queue_add(&task->thread->engine->fast_work_queue, diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 4edc423a..9c7da970 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -478,7 +478,8 @@ static nxt_buf_t * nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode) { - size_t size; + size_t size; + nxt_buf_t *next; while (b != NULL) { @@ -528,7 +529,9 @@ nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); - b = b->next; + next = b->next; + b->next = NULL; + b = next; } return b; @@ -796,7 +799,7 @@ static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) { - nxt_buf_t *b, *orig_b; + nxt_buf_t *b, *orig_b, *next; nxt_port_recv_msg_t *fmsg; if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { @@ -915,11 +918,15 @@ fmsg_failed: */ if (msg->buf == b) { /* complete mmap buffers */ - for (; b != NULL; b = b->next) { + while (b != NULL) { nxt_debug(task, "complete buffer %p", b); nxt_work_queue_add(port->socket.read_work_queue, b->completion_handler, task, b, b->parent); + + next = b->next; + b->next = NULL; + b = next; } } @@ -964,7 +971,7 @@ static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) { int use_delta; - nxt_buf_t *b; + nxt_buf_t *b, *next; nxt_port_t *port; nxt_work_queue_t *wq; nxt_port_send_msg_t *msg; @@ -986,7 +993,10 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { - for (b = msg->buf; b != NULL; b = b->next) { + for (b = msg->buf; b != NULL; b = next) { + next = b->next; + b->next = NULL; + if (nxt_buf_is_sync(b)) { continue; } diff --git a/src/nxt_router.c b/src/nxt_router.c index 9025c813..60ee77e5 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -537,6 +537,7 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info, for (b = msg_info->buf; b != NULL; b = next) { next = b->next; + b->next = NULL; b->completion_handler = msg_info->completion_handler; @@ -3498,7 +3499,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) { nxt_int_t ret; - nxt_buf_t *b; + nxt_buf_t *b, *next; nxt_port_t *app_port; nxt_unit_field_t *f; nxt_http_field_t *field; @@ -3613,10 +3614,13 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, } if (nxt_buf_mem_used_size(&b->mem) == 0) { + next = b->next; + b->next = NULL; + nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); - b = b->next; + b = next; } if (b != NULL) { @@ -5057,6 +5061,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r, if (nxt_slow_path(buf == NULL)) { while (out != NULL) { buf = out->next; + out->next = NULL; out->completion_handler(task, out, out->parent); out = buf; } diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index 16fe4724..94f8e9eb 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -9,6 +9,8 @@ static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied); +static nxt_buf_t *nxt_sendbuf_coalesce_completion(nxt_task_t *task, + nxt_work_queue_t *wq, nxt_buf_t *start); nxt_uint_t @@ -380,15 +382,11 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) { while (b != NULL) { - nxt_prefetch(b->next); - if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) { break; } - nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); - - b = b->next; + b = nxt_sendbuf_coalesce_completion(task, wq, b); } return b; @@ -399,10 +397,49 @@ void nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) { while (b != NULL) { - nxt_prefetch(b->next); + b = nxt_sendbuf_coalesce_completion(task, wq, b); + } +} - nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); - b = b->next; +static nxt_buf_t * +nxt_sendbuf_coalesce_completion(nxt_task_t *task, nxt_work_queue_t *wq, + nxt_buf_t *start) +{ + nxt_buf_t *b, *next, **last, *rest, **last_rest; + nxt_work_handler_t handler; + + rest = NULL; + last_rest = &rest; + last = &start->next; + b = start; + handler = b->completion_handler; + + for ( ;; ) { + next = b->next; + if (next == NULL) { + break; + } + + b->next = NULL; + b = next; + + if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) { + *last_rest = b; + break; + } + + if (handler == b->completion_handler) { + *last = b; + last = &b->next; + + } else { + *last_rest = b; + last_rest = &b->next; + } } + + nxt_work_queue_add(wq, handler, task, start, start->parent); + + return rest; } |