summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_sendbuf.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_sendbuf.c')
-rw-r--r--src/nxt_sendbuf.c353
1 files changed, 353 insertions, 0 deletions
diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c
new file mode 100644
index 00000000..d473a29b
--- /dev/null
+++ b/src/nxt_sendbuf.c
@@ -0,0 +1,353 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
+ size_t *copied);
+
+
+nxt_uint_t
+nxt_sendbuf_mem_coalesce(nxt_sendbuf_coalesce_t *sb)
+{
+ u_char *last;
+ size_t size, total;
+ nxt_buf_t *b;
+ nxt_uint_t n;
+
+ total = sb->size;
+ last = NULL;
+ n = (nxt_uint_t) -1;
+
+ for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
+
+ nxt_prefetch(b->next);
+
+ if (nxt_buf_is_file(b)) {
+ break;
+ }
+
+ if (nxt_buf_is_mem(b)) {
+
+ size = b->mem.free - b->mem.pos;
+
+ if (size != 0) {
+
+ if (total + size > sb->limit) {
+ size = sb->limit - total;
+
+ if (size == 0) {
+ break;
+ }
+ }
+
+ if (b->mem.pos != last) {
+
+ if (++n >= sb->nmax) {
+ goto done;
+ }
+
+ nxt_iobuf_set(&sb->iobuf[n], b->mem.pos, size);
+
+ } else {
+ nxt_iobuf_add(&sb->iobuf[n], size);
+ }
+
+ nxt_thread_log_debug("sendbuf: %ui, %p, %uz", n,
+ nxt_iobuf_data(&sb->iobuf[n]),
+ nxt_iobuf_size(&sb->iobuf[n]));
+
+ total += size;
+ last = b->mem.pos + size;
+ }
+
+ } else {
+ sb->sync = 1;
+ sb->last |= nxt_buf_is_last(b);
+ }
+ }
+
+ n++;
+
+done:
+
+ sb->buf = b;
+ sb->size = total;
+
+ return n;
+}
+
+
+size_t
+nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb)
+{
+ size_t file_start, total;
+ nxt_fd_t fd;
+ nxt_off_t size, last;
+ nxt_buf_t *b;
+
+ b = sb->buf;
+ fd = b->file->fd;
+
+ total = sb->size;
+
+ for ( ;; ) {
+
+ nxt_prefetch(b->next);
+
+ size = b->file_end - b->file_pos;
+
+ if (total + size >= sb->limit) {
+ total = sb->limit;
+ break;
+ }
+
+ total += size;
+ last = b->file_pos + size;
+
+ b = b->next;
+
+ if (b == NULL || !nxt_buf_is_file(b)) {
+ break;
+ }
+
+ if (b->file_pos != last || b->file->fd != fd) {
+ break;
+ }
+ }
+
+ sb->buf = b;
+
+ file_start = sb->size;
+ sb->size = total;
+
+ return total - file_start;
+}
+
+
+ssize_t
+nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm,
+ nxt_buf_t *b, size_t limit)
+{
+ size_t size, bsize, copied;
+ ssize_t n;
+ nxt_bool_t flush;
+
+ size = nxt_buf_mem_used_size(&b->mem);
+ bsize = nxt_buf_mem_size(bm);
+
+ if (bsize != 0) {
+
+ if (size > bsize && bm->pos == bm->free) {
+ /*
+ * A data buffer size is larger than the internal
+ * buffer size and the internal buffer is empty.
+ */
+ goto no_buffer;
+ }
+
+ if (bm->pos == NULL) {
+ bm->pos = nxt_malloc(bsize);
+ if (nxt_slow_path(bm->pos == NULL)) {
+ return NXT_ERROR;
+ }
+
+ bm->start = bm->pos;
+ bm->free = bm->pos;
+ bm->end += (uintptr_t) bm->pos;
+ }
+
+ copied = 0;
+
+ flush = nxt_sendbuf_copy(bm, b, &copied);
+
+ nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush);
+
+ if (flush == 0) {
+ return copied;
+ }
+
+ size = nxt_buf_mem_used_size(bm);
+
+ if (size == 0 && nxt_buf_is_sync(b)) {
+ goto done;
+ }
+
+ n = c->io->send(c, bm->pos, nxt_min(size, limit));
+
+ nxt_log_debug(c->socket.log, "sendbuf sent:%z", n);
+
+ if (n > 0) {
+ bm->pos += n;
+
+ if (bm->pos == bm->free) {
+ bm->pos = bm->start;
+ bm->free = bm->start;
+ }
+
+ n = 0;
+ }
+
+ return (copied != 0) ? (ssize_t) copied : n;
+ }
+
+ /* No internal buffering. */
+
+ if (size == 0 && nxt_buf_is_sync(b)) {
+ goto done;
+ }
+
+no_buffer:
+
+ return c->io->send(c, b->mem.pos, nxt_min(size, limit));
+
+done:
+
+ nxt_log_debug(c->socket.log, "sendbuf done");
+
+ return 0;
+}
+
+
+static nxt_bool_t
+nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied)
+{
+ size_t size, bsize;
+ nxt_bool_t flush;
+
+ flush = 0;
+
+ do {
+ nxt_prefetch(b->next);
+
+ if (nxt_buf_is_mem(b)) {
+ bsize = bm->end - bm->free;
+ size = b->mem.free - b->mem.pos;
+ size = nxt_min(size, bsize);
+
+ nxt_memcpy(bm->free, b->mem.pos, size);
+
+ *copied += size;
+ bm->free += size;
+
+ if (bm->free == bm->end) {
+ return 1;
+ }
+ }
+
+ flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b);
+
+ b = b->next;
+
+ } while (b != NULL);
+
+ return flush;
+}
+
+
+nxt_buf_t *
+nxt_sendbuf_update(nxt_buf_t *b, size_t sent)
+{
+ size_t size;
+
+ while (b != NULL) {
+
+ nxt_prefetch(b->next);
+
+ if (!nxt_buf_is_sync(b)) {
+
+ size = nxt_buf_used_size(b);
+
+ if (size != 0) {
+
+ if (sent == 0) {
+ break;
+ }
+
+ if (sent < size) {
+
+ if (nxt_buf_is_mem(b)) {
+ b->mem.pos += sent;
+ }
+
+ if (nxt_buf_is_file(b)) {
+ b->file_pos += sent;
+ }
+
+ break;
+ }
+
+ /* b->mem.free is NULL in file-only buffer. */
+ b->mem.pos = b->mem.free;
+
+ if (nxt_buf_is_file(b)) {
+ b->file_pos = b->file_end;
+ }
+
+ sent -= size;
+ }
+ }
+
+ b = b->next;
+ }
+
+ return b;
+}
+
+
+nxt_buf_t *
+nxt_sendbuf_completion(nxt_thread_t *thr, nxt_work_queue_t *wq, nxt_buf_t *b,
+ size_t sent)
+{
+ size_t size;
+
+ while (b != NULL) {
+
+ nxt_prefetch(b->next);
+
+ if (!nxt_buf_is_sync(b)) {
+
+ size = nxt_buf_used_size(b);
+
+ if (size != 0) {
+
+ if (sent == 0) {
+ break;
+ }
+
+ if (sent < size) {
+
+ if (nxt_buf_is_mem(b)) {
+ b->mem.pos += sent;
+ }
+
+ if (nxt_buf_is_file(b)) {
+ b->file_pos += sent;
+ }
+
+ break;
+ }
+
+ /* b->mem.free is NULL in file-only buffer. */
+ b->mem.pos = b->mem.free;
+
+ if (nxt_buf_is_file(b)) {
+ b->file_pos = b->file_end;
+ }
+
+ sent -= size;
+ }
+ }
+
+ nxt_thread_work_queue_add(thr, wq, b->completion_handler,
+ b, b->parent, thr->log);
+
+ b = b->next;
+ }
+
+ return b;
+}