diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
commit | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch) | |
tree | e6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_sendbuf.c | |
download | unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2 |
Initial version.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_sendbuf.c | 353 |
1 files changed, 353 insertions, 0 deletions
diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c new file mode 100644 index 00000000..d473a29b --- /dev/null +++ b/src/nxt_sendbuf.c @@ -0,0 +1,353 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, + size_t *copied); + + +nxt_uint_t +nxt_sendbuf_mem_coalesce(nxt_sendbuf_coalesce_t *sb) +{ + u_char *last; + size_t size, total; + nxt_buf_t *b; + nxt_uint_t n; + + total = sb->size; + last = NULL; + n = (nxt_uint_t) -1; + + for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) { + + nxt_prefetch(b->next); + + if (nxt_buf_is_file(b)) { + break; + } + + if (nxt_buf_is_mem(b)) { + + size = b->mem.free - b->mem.pos; + + if (size != 0) { + + if (total + size > sb->limit) { + size = sb->limit - total; + + if (size == 0) { + break; + } + } + + if (b->mem.pos != last) { + + if (++n >= sb->nmax) { + goto done; + } + + nxt_iobuf_set(&sb->iobuf[n], b->mem.pos, size); + + } else { + nxt_iobuf_add(&sb->iobuf[n], size); + } + + nxt_thread_log_debug("sendbuf: %ui, %p, %uz", n, + nxt_iobuf_data(&sb->iobuf[n]), + nxt_iobuf_size(&sb->iobuf[n])); + + total += size; + last = b->mem.pos + size; + } + + } else { + sb->sync = 1; + sb->last |= nxt_buf_is_last(b); + } + } + + n++; + +done: + + sb->buf = b; + sb->size = total; + + return n; +} + + +size_t +nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb) +{ + size_t file_start, total; + nxt_fd_t fd; + nxt_off_t size, last; + nxt_buf_t *b; + + b = sb->buf; + fd = b->file->fd; + + total = sb->size; + + for ( ;; ) { + + nxt_prefetch(b->next); + + size = b->file_end - b->file_pos; + + if (total + size >= sb->limit) { + total = sb->limit; + break; + } + + total += size; + last = b->file_pos + size; + + b = b->next; + + if (b == NULL || !nxt_buf_is_file(b)) { + break; + } + + if (b->file_pos != last || b->file->fd != fd) { + break; + } + } + + sb->buf = b; + + file_start = sb->size; + sb->size = total; + + return total - file_start; +} + + +ssize_t +nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm, + nxt_buf_t *b, size_t limit) +{ + size_t size, bsize, copied; + ssize_t n; + nxt_bool_t flush; + + size = nxt_buf_mem_used_size(&b->mem); + bsize = nxt_buf_mem_size(bm); + + if (bsize != 0) { + + if (size > bsize && bm->pos == bm->free) { + /* + * A data buffer size is larger than the internal + * buffer size and the internal buffer is empty. + */ + goto no_buffer; + } + + if (bm->pos == NULL) { + bm->pos = nxt_malloc(bsize); + if (nxt_slow_path(bm->pos == NULL)) { + return NXT_ERROR; + } + + bm->start = bm->pos; + bm->free = bm->pos; + bm->end += (uintptr_t) bm->pos; + } + + copied = 0; + + flush = nxt_sendbuf_copy(bm, b, &copied); + + nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush); + + if (flush == 0) { + return copied; + } + + size = nxt_buf_mem_used_size(bm); + + if (size == 0 && nxt_buf_is_sync(b)) { + goto done; + } + + n = c->io->send(c, bm->pos, nxt_min(size, limit)); + + nxt_log_debug(c->socket.log, "sendbuf sent:%z", n); + + if (n > 0) { + bm->pos += n; + + if (bm->pos == bm->free) { + bm->pos = bm->start; + bm->free = bm->start; + } + + n = 0; + } + + return (copied != 0) ? (ssize_t) copied : n; + } + + /* No internal buffering. */ + + if (size == 0 && nxt_buf_is_sync(b)) { + goto done; + } + +no_buffer: + + return c->io->send(c, b->mem.pos, nxt_min(size, limit)); + +done: + + nxt_log_debug(c->socket.log, "sendbuf done"); + + return 0; +} + + +static nxt_bool_t +nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied) +{ + size_t size, bsize; + nxt_bool_t flush; + + flush = 0; + + do { + nxt_prefetch(b->next); + + if (nxt_buf_is_mem(b)) { + bsize = bm->end - bm->free; + size = b->mem.free - b->mem.pos; + size = nxt_min(size, bsize); + + nxt_memcpy(bm->free, b->mem.pos, size); + + *copied += size; + bm->free += size; + + if (bm->free == bm->end) { + return 1; + } + } + + flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b); + + b = b->next; + + } while (b != NULL); + + return flush; +} + + +nxt_buf_t * +nxt_sendbuf_update(nxt_buf_t *b, size_t sent) +{ + size_t size; + + while (b != NULL) { + + nxt_prefetch(b->next); + + if (!nxt_buf_is_sync(b)) { + + size = nxt_buf_used_size(b); + + if (size != 0) { + + if (sent == 0) { + break; + } + + if (sent < size) { + + if (nxt_buf_is_mem(b)) { + b->mem.pos += sent; + } + + if (nxt_buf_is_file(b)) { + b->file_pos += sent; + } + + break; + } + + /* b->mem.free is NULL in file-only buffer. */ + b->mem.pos = b->mem.free; + + if (nxt_buf_is_file(b)) { + b->file_pos = b->file_end; + } + + sent -= size; + } + } + + b = b->next; + } + + return b; +} + + +nxt_buf_t * +nxt_sendbuf_completion(nxt_thread_t *thr, nxt_work_queue_t *wq, nxt_buf_t *b, + size_t sent) +{ + size_t size; + + while (b != NULL) { + + nxt_prefetch(b->next); + + if (!nxt_buf_is_sync(b)) { + + size = nxt_buf_used_size(b); + + if (size != 0) { + + if (sent == 0) { + break; + } + + if (sent < size) { + + if (nxt_buf_is_mem(b)) { + b->mem.pos += sent; + } + + if (nxt_buf_is_file(b)) { + b->file_pos += sent; + } + + break; + } + + /* b->mem.free is NULL in file-only buffer. */ + b->mem.pos = b->mem.free; + + if (nxt_buf_is_file(b)) { + b->file_pos = b->file_end; + } + + sent -= size; + } + } + + nxt_thread_work_queue_add(thr, wq, b->completion_handler, + b, b->parent, thr->log); + + b = b->next; + } + + return b; +} |