summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_sendbuf.c
blob: 94f8e9eb9706f78348239550167a9106675a8672 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11










                                                                   

                                                                   


          







































                                                                    
                                                 





























                                                              
                                                                      


























                                                                    


                                                   






                                          

                                             


                                  
                                                       
                                                

                        
                                                 

                 

                                                                          
















                                           
                 




















































                                                       

                                                                         







































































































































































                                                                              
                                                                            


                       
                                                               


                  
                                                         



             





                                                                       


                                                         
 
 


































                                                                       
     



                                                                
 

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
    size_t *copied);
static nxt_buf_t *nxt_sendbuf_coalesce_completion(nxt_task_t *task,
    nxt_work_queue_t *wq, nxt_buf_t *start);


nxt_uint_t
nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb,
    struct iovec *iov, nxt_uint_t niov_max)
{
    u_char      *last;
    size_t      size, total;
    nxt_buf_t   *b;
    nxt_uint_t  n;

    total = sb->size;
    last = NULL;
    n = (nxt_uint_t) -1;

    for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {

        nxt_prefetch(b->next);

        if (nxt_buf_is_file(b)) {
            break;
        }

        if (nxt_buf_is_mem(b)) {

            size = b->mem.free - b->mem.pos;

            if (size != 0) {

                if (total + size > sb->limit) {
                    size = sb->limit - total;

                    if (size == 0) {
                        break;
                    }
                }

                if (b->mem.pos != last) {

                    if (++n >= niov_max) {
                        goto done;
                    }

                    iov[n].iov_base = b->mem.pos;
                    iov[n].iov_len = size;

                } else {
                    iov[n].iov_len += size;
                }

                nxt_debug(task, "sendbuf: %ui, %p, %uz",
                          n, iov[n].iov_base, iov[n].iov_len);

                total += size;
                last = b->mem.pos + size;
            }

        } else {
            sb->sync = 1;
            sb->last |= nxt_buf_is_last(b);
        }
    }

    n++;

done:

    sb->buf = b;

    return n;
}


nxt_uint_t
nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
{
    u_char      *last;
    size_t      size, total;
    nxt_buf_t   *b;
    nxt_uint_t  n;

    total = sb->size;
    last = NULL;
    n = (nxt_uint_t) -1;

    for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {

        nxt_prefetch(b->next);

        if (nxt_buf_is_file(b)) {
            break;
        }

        if (nxt_buf_is_mem(b)) {

            size = b->mem.free - b->mem.pos;

            if (size != 0) {

                if (total + size > sb->limit) {
                    size = sb->limit - total;

                    sb->limit_reached = 1;

                    if (nxt_slow_path(size == 0)) {
                        break;
                    }
                }

                if (b->mem.pos != last) {

                    if (++n >= sb->nmax) {
                        sb->nmax_reached = 1;

                        goto done;
                    }

                    sb->iobuf[n].iov_base = b->mem.pos;
                    sb->iobuf[n].iov_len = size;

                } else {
                    sb->iobuf[n].iov_len += size;
                }

                nxt_debug(task, "sendbuf: %ui, %p, %uz",
                          n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len);

                total += size;
                last = b->mem.pos + size;
            }

        } else {
            sb->sync = 1;
            sb->last |= nxt_buf_is_last(b);
        }
    }

    n++;

done:

    sb->buf = b;
    sb->size = total;
    sb->niov = n;

    return n;
}


size_t
nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb)
{
    size_t     file_start, total;
    nxt_fd_t   fd;
    nxt_off_t  size, last;
    nxt_buf_t  *b;

    b = sb->buf;
    fd = b->file->fd;

    total = sb->size;

    for ( ;; ) {

        nxt_prefetch(b->next);

        size = b->file_end - b->file_pos;

        if (total + size >= sb->limit) {
            total = sb->limit;
            break;
        }

        total += size;
        last = b->file_pos + size;

        b = b->next;

        if (b == NULL || !nxt_buf_is_file(b)) {
            break;
        }

        if (b->file_pos != last || b->file->fd != fd) {
            break;
        }
    }

    sb->buf = b;

    file_start = sb->size;
    sb->size = total;

    return total - file_start;
}


ssize_t
nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b,
    size_t limit)
{
    size_t      size, bsize, copied;
    ssize_t     n;
    nxt_bool_t  flush;

    size = nxt_buf_mem_used_size(&b->mem);
    bsize = nxt_buf_mem_size(bm);

    if (bsize != 0) {

        if (size > bsize && bm->pos == bm->free) {
            /*
             * A data buffer size is larger than the internal
             * buffer size and the internal buffer is empty.
             */
            goto no_buffer;
        }

        if (bm->pos == NULL) {
            bm->pos = nxt_malloc(bsize);
            if (nxt_slow_path(bm->pos == NULL)) {
                return NXT_ERROR;
            }

            bm->start = bm->pos;
            bm->free = bm->pos;
            bm->end += (uintptr_t) bm->pos;
        }

        copied = 0;

        flush = nxt_sendbuf_copy(bm, b, &copied);

        nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush);

        if (flush == 0) {
            return copied;
        }

        size = nxt_buf_mem_used_size(bm);

        if (size == 0 && nxt_buf_is_sync(b)) {
            goto done;
        }

        n = c->io->send(c, bm->pos, nxt_min(size, limit));

        nxt_log_debug(c->socket.log, "sendbuf sent:%z", n);

        if (n > 0) {
            bm->pos += n;

            if (bm->pos == bm->free) {
                bm->pos = bm->start;
                bm->free = bm->start;
            }

            n = 0;
        }

        return (copied != 0) ? (ssize_t) copied : n;
    }

    /* No internal buffering. */

    if (size == 0 && nxt_buf_is_sync(b)) {
        goto done;
    }

no_buffer:

    return c->io->send(c, b->mem.pos, nxt_min(size, limit));

done:

    nxt_log_debug(c->socket.log, "sendbuf done");

    return 0;
}


static nxt_bool_t
nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied)
{
    size_t      size, bsize;
    nxt_bool_t  flush;

    flush = 0;

    do {
        nxt_prefetch(b->next);

        if (nxt_buf_is_mem(b)) {
            bsize = bm->end - bm->free;
            size = b->mem.free - b->mem.pos;
            size = nxt_min(size, bsize);

            nxt_memcpy(bm->free, b->mem.pos, size);

            *copied += size;
            bm->free += size;

            if (bm->free == bm->end) {
                return 1;
            }
        }

        flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b);

        b = b->next;

    } while (b != NULL);

    return flush;
}


nxt_buf_t *
nxt_sendbuf_update(nxt_buf_t *b, size_t sent)
{
    size_t  size;

    while (b != NULL) {

        nxt_prefetch(b->next);

        if (!nxt_buf_is_sync(b)) {

            size = nxt_buf_used_size(b);

            if (size != 0) {

                if (sent == 0) {
                    break;
                }

                if (sent < size) {

                    if (nxt_buf_is_mem(b)) {
                        b->mem.pos += sent;
                    }

                    if (nxt_buf_is_file(b)) {
                        b->file_pos += sent;
                    }

                    break;
                }

                /* b->mem.free is NULL in file-only buffer. */
                b->mem.pos = b->mem.free;

                if (nxt_buf_is_file(b)) {
                    b->file_pos = b->file_end;
                }

                sent -= size;
            }
        }

        b = b->next;
    }

    return b;
}


nxt_buf_t *
nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
{
    while (b != NULL) {

        if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
            break;
        }

        b = nxt_sendbuf_coalesce_completion(task, wq, b);
    }

    return b;
}


void
nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
{
    while (b != NULL) {
        b = nxt_sendbuf_coalesce_completion(task, wq, b);
    }
}


static nxt_buf_t *
nxt_sendbuf_coalesce_completion(nxt_task_t *task, nxt_work_queue_t *wq,
    nxt_buf_t *start)
{
    nxt_buf_t           *b, *next, **last, *rest, **last_rest;
    nxt_work_handler_t  handler;

    rest = NULL;
    last_rest = &rest;
    last = &start->next;
    b = start;
    handler = b->completion_handler;

    for ( ;; ) {
        next = b->next;
        if (next == NULL) {
            break;
        }

        b->next = NULL;
        b = next;

        if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
            *last_rest = b;
            break;
        }

        if (handler == b->completion_handler) {
            *last = b;
            last = &b->next;

        } else {
            *last_rest = b;
            last_rest = &b->next;
        }
    }

    nxt_work_queue_add(wq, handler, task, start, start->parent);

    return rest;
}