summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_conn_read.c
blob: 3285abcd1ef35b3b4a1edfc3d6875ec5ebb546e7 (plain) (tree)
1
2
3
4
5
6
7
8
9
10









                            
                            
 

                                    
 

                                                       
 

                                            
 


                                                                          


               


                                                   
                                                     

                                                 



    
                                                         
 
                              
                               


                                    


            


                                                                   
 
                                                


               

                                  




                                                                          
                          
                                                   


                               

                                             

                
                                                

                                                              




                          
                                           


                                                        
                                         
                                                          

             

                                                                    
                   


                             


                                                                             
                                                        
                                                      
 



                                                        
 
                             
 





                                                                    
                                         

                                                                 

                   


         

                                                     

     
                                                           

                                                         



       
                                                















                                                                 
                                                                   






                                               
                                                                               



















                                         
                                                         



                                     
                                                         



                                  


                                                                         






                             
                                                                         








                                                 

                                                                 

                    
                                                               







                                         



                                          
 







                        
                                                        
                                     
 


                             
                                                        



                                  



                                                                



                             

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * Wait for a connection to become readable.
 *
 * When the socket is already flagged as read-ready, the read state's
 * ready handler is posted to the engine's fast work queue immediately.
 * Otherwise the state's ready and error handlers are installed on the
 * socket, nxt_conn_timer() is called for the read timer, and the read
 * event is enabled on the engine.
 */
void
nxt_conn_wait(nxt_conn_t *c)
{
    nxt_event_engine_t      *eng;
    const nxt_conn_state_t  *rs;

    nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
              c->socket.fd, c->socket.read_ready);

    rs = c->read_state;
    eng = c->socket.task->thread->engine;

    if (!c->socket.read_ready) {
        /* Nothing to read yet: install handlers, then timer and event. */
        c->socket.error_handler = rs->error_handler;
        c->socket.read_handler = rs->ready_handler;

        nxt_conn_timer(eng, c, rs, &c->read_timer);

        nxt_fd_event_enable_read(eng, &c->socket);

        return;
    }

    /* Already readable: run the ready handler without waiting. */
    nxt_work_queue_add(&eng->fast_work_queue, rs->ready_handler,
                       c->socket.task, c, c->socket.data);
}


/*
 * Read event handler for a connection.  "obj" is the nxt_conn_t; "data"
 * is passed through unchanged to whichever handler gets queued.
 *
 * If the socket is flagged read-ready, a single read is attempted through
 * the read state's io_read_handler or, when that is NULL, through
 * c->io->recvbuf().  The outcome selects what happens next:
 *
 *   n > 0           the state's ready_handler is queued on
 *                   c->read_work_queue;
 *   n == 0          the state's close_handler is queued on the engine's
 *                   fast work queue;
 *   n == NXT_ERROR  the state's error_handler is queued on the engine's
 *                   fast work queue;
 *   n == NXT_AGAIN  nothing is queued; the read event and/or the read
 *                   timer are armed so the handler runs again later.
 *
 * The function returns without doing anything if an error is already
 * recorded on the socket or c->block_read is set.
 */
void
nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
{
    ssize_t                 n;
    nxt_conn_t              *c;
    nxt_event_engine_t      *engine;
    nxt_work_handler_t      handler;
    const nxt_conn_state_t  *state;

    c = obj;

    nxt_debug(task, "conn read fd:%d rdy:%d cl:%d er:%d bl:%d",
              c->socket.fd, c->socket.read_ready, c->socket.closed,
              c->socket.error, c->block_read);

    /* Skip reading on a socket with a recorded error or blocked reads. */
    if (c->socket.error != 0 || c->block_read) {
        return;
    }

    engine = task->thread->engine;

    /*
     * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
     * because the function can be called by nxt_kqueue_conn_io_read().
     */
    c->socket.read_handler = c->io->read;
    state = c->read_state;
    c->socket.error_handler = state->error_handler;

    if (c->socket.read_ready) {

        /* A state-specific reader takes precedence over the default one. */
        if (state->io_read_handler == NULL) {
            n = c->io->recvbuf(c, c->read);

        } else {
            n = state->io_read_handler(task, c);
            /* The state can be changed by io_read_handler. */
            state = c->read_state;
        }

        if (n > 0) {
            /*
             * Data was read: record the byte count, update the receive
             * buffer, block the read event, and hand the connection to
             * the ready handler.
             */
            c->nbytes = n;

            nxt_recvbuf_update(c->read, n);

            nxt_fd_event_block_read(engine, &c->socket);

            if (state->timer_autoreset) {
                nxt_timer_disable(engine, &c->read_timer);
            }

            nxt_work_queue_add(c->read_work_queue,
                               state->ready_handler, task, c, data);
            return;
        }

        if (n != NXT_AGAIN) {
            /* n == 0 or n == NXT_ERROR. */
            handler = (n == 0) ? state->close_handler : state->error_handler;

            /* No more reading: block the event and stop the read timer. */
            nxt_fd_event_block_read(engine, &c->socket);
            nxt_timer_disable(engine, &c->read_timer);

            nxt_work_queue_add(&engine->fast_work_queue,
                               handler, task, c, data);
            return;
        }

        /* n == NXT_AGAIN. */

        /* read_ready is tested again: the read call above may clear it. */
        if (c->socket.read_ready) {
            /*
             * SSL/TLS library can return NXT_AGAIN if renegotiation
             * occurred during read operation, it toggled write event
             * internally so only read timer should be set.
             */
            if (!c->read_timer.enabled) {
                nxt_conn_timer(engine, c, state, &c->read_timer);
            }

            return;
        }
    }

    /*
     * The socket is not read-ready (either on entry or after the read
     * returned NXT_AGAIN): make sure the read event is enabled and arm
     * the read timer if it autoresets or is not yet enabled.
     */
    if (nxt_fd_event_is_disabled(c->socket.read)) {
        nxt_fd_event_enable_read(engine, &c->socket);
    }

    if (state->timer_autoreset || !c->read_timer.enabled) {
        nxt_conn_timer(engine, c, state, &c->read_timer);
    }
}


/*
 * Read from the connection socket into the buffer chain "b".
 *
 * The chain is coalesced into at most NXT_IOBUF_MAX iovec entries by
 * nxt_recvbuf_mem_coalesce().  A single entry is read with
 * nxt_conn_io_recv(); anything else goes through readv(), which is
 * retried on EINTR.
 *
 * Returns:
 *   > 0        number of bytes read; read_ready is cleared when fewer
 *              bytes than rb.size arrived (rb.size is presumably the
 *              total coalesced capacity - confirm against the coalesce
 *              helper);
 *   0          readv() returned 0; c->socket.closed is set and
 *              read_ready is cleared;
 *   NXT_AGAIN  readv() failed with EAGAIN; read_ready is cleared;
 *   NXT_ERROR  any other failure; the errno value is stored in
 *              c->socket.error and logged.
 */
ssize_t
nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
{
    ssize_t                 received;
    nxt_err_t               err;
    nxt_uint_t              count;
    struct iovec            vec[NXT_IOBUF_MAX];
    nxt_recvbuf_coalesce_t  rb;

    rb.size = 0;
    rb.nmax = NXT_IOBUF_MAX;
    rb.iobuf = vec;
    rb.buf = b;

    count = nxt_recvbuf_mem_coalesce(&rb);

    if (count == 1) {
        /* One segment only: plain recv() spares the kernel an iovec copy-in. */
        return nxt_conn_io_recv(c, vec[0].iov_base, vec[0].iov_len, 0);
    }

    for ( ;; ) {
        received = readv(c->socket.fd, vec, count);

        /* Capture errno right away, before logging can disturb it. */
        if (received == -1) {
            err = nxt_socket_errno;

        } else {
            err = 0;
        }

        nxt_debug(c->socket.task, "readv(%d, %ui): %z",
                  c->socket.fd, count, received);

        if (received == 0) {
            c->socket.closed = 1;
            c->socket.read_ready = 0;

            return received;
        }

        if (received > 0) {
            if (rb.size > (size_t) received) {
                c->socket.read_ready = 0;
            }

            return received;
        }

        /* readv() failed. */

        if (err == NXT_EINTR) {
            nxt_debug(c->socket.task, "readv() %E", err);
            continue;
        }

        if (err == NXT_EAGAIN) {
            nxt_debug(c->socket.task, "readv() %E", err);
            c->socket.read_ready = 0;

            return NXT_AGAIN;
        }

        c->socket.error = err;
        nxt_log(c->socket.task, nxt_socket_error_level(err),
                "readv(%d, %ui) failed %E", c->socket.fd, count, err);

        return NXT_ERROR;
    }
}


/*
 * recv() wrapper for a connection socket; the call is retried on EINTR.
 *
 * Returns:
 *   > 0        number of bytes received; unless MSG_PEEK is set,
 *              read_ready is cleared when fewer than "size" bytes arrived;
 *   0          recv() returned 0; c->socket.closed is set and, unless
 *              MSG_PEEK is set, read_ready is cleared;
 *   NXT_AGAIN  recv() failed with EAGAIN; read_ready is cleared;
 *   NXT_ERROR  any other failure; the errno value is stored in
 *              c->socket.error and logged.
 *
 * NOTE(review): the debug message prints flags as "0x%ui" while the error
 * message uses plain "%ui" - confirm whether a hex conversion was intended.
 */
ssize_t
nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
{
    ssize_t     received;
    nxt_err_t   err;
    nxt_uint_t  peek;

    /* Non-zero when the caller only peeks and the data stays queued. */
    peek = flags & MSG_PEEK;

    for ( ;; ) {
        received = recv(c->socket.fd, buf, size, flags);

        /* Capture errno right away, before logging can disturb it. */
        if (received == -1) {
            err = nxt_socket_errno;

        } else {
            err = 0;
        }

        nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
                  c->socket.fd, buf, size, flags, received);

        if (received == 0) {
            c->socket.closed = 1;

            if (peek == 0) {
                c->socket.read_ready = 0;
            }

            return received;
        }

        if (received > 0) {
            if (peek == 0 && (size_t) received < size) {
                c->socket.read_ready = 0;
            }

            return received;
        }

        /* recv() failed. */

        if (err == NXT_EINTR) {
            nxt_debug(c->socket.task, "recv() %E", err);
            continue;
        }

        if (err == NXT_EAGAIN) {
            nxt_debug(c->socket.task, "recv() %E", err);
            c->socket.read_ready = 0;

            return NXT_AGAIN;
        }

        c->socket.error = err;
        nxt_log(c->socket.task, nxt_socket_error_level(err),
                "recv(%d, %p, %uz, %ui) failed %E",
                c->socket.fd, buf, size, flags, err);

        return NXT_ERROR;
    }
}