summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_event_conn_accept.c
blob: ced2a3d8223bded7db73ab787b910ca64c7eb98c (plain) (tree)



















                                                                         
                                                                      
                                  
                                                                      
                
                                                                     
                                  
                                                                   
                                  
                                                                          
                
                                                                            


                














































                                                                           
         
                                                                
 
                                     






                                                      
                                      
                                   
 
                                                                 



                                                                      
                                               


                         
                                                         


                                                                 




                                                
 
                                                             
                                                             

                                                                           









                         
                                                                           



                                      
                                 

                             


                                                        




                                                             
                                                            
 
                          

                                                                    
                            

                             
                                              
                                                                             


















                                                                      
                                                                      



                                  
                            
 
                                 



    
                                                                 







                                  
                  



                                               
                             











                                         
                                                                           









                                                

                                                                   



      
                                                         
 
                                        



    
                                                                     



                            
                                 
 

                                                                          
 
                                                                             
 

                                          
 
                                         

                                   


                                                                    

            

                                                                     

     
                                                 

                                                 

                                                         




                         
                                                                          


                         
                     

        
                                                   




                                       
                                                                    
 
                                                            






                                                                
                                                                                
 



                                




                                                                      


                                     








                                                                          
                                                                    

                                                      
                                            




                          
                                             
 
                                                    





                        
                                                                           










                                                              
                                                                          


                      

                                                                  






                                                              



                                                                    




               

                                                       





               
                                                                            
 
                                 
                                



                                  
                                                             
                  

                    
                                                  





                        
                                                                   
 
                              



           
                                                                          
 
                        


             
                                                                  
 

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * A listen socket handler calls an event facility specific io_accept()
 * method.  The method accept()s a new connection and then calls
 * nxt_event_conn_accept() to handle the new connection and to prepare
 * for a next connection to avoid just dropping next accept()ed socket
 * if no more connections allowed.  If there are no available connections
 * an idle connection would be closed.  If there are no idle connections
 * then new connections will not be accept()ed for 1 second.
 */


static nxt_event_conn_t *nxt_event_conn_accept_alloc(nxt_task_t *task,
    nxt_event_conn_listen_t *cls);
static void nxt_event_conn_listen_handler(nxt_task_t *task, void *obj,
    void *data);
static nxt_event_conn_t *nxt_event_conn_accept_next(nxt_task_t *task,
    nxt_event_conn_listen_t *cls);
static nxt_int_t nxt_event_conn_accept_close_idle(nxt_task_t *task,
    nxt_event_conn_listen_t *cls);
static void nxt_event_conn_listen_event_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj,
    void *data);


nxt_event_conn_listen_t *
nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
{
    nxt_event_engine_t       *engine;
    nxt_event_conn_listen_t  *cls;

    cls = nxt_zalloc(sizeof(nxt_event_conn_listen_t));

    if (nxt_fast_path(cls != NULL)) {
        cls->socket.fd = ls->socket;

        engine = task->thread->engine;
        cls->batch = engine->batch;

        cls->socket.read_work_queue = &engine->accept_work_queue;
        cls->socket.read_handler = nxt_event_conn_listen_handler;
        cls->socket.error_handler = nxt_event_conn_listen_event_error;
        cls->socket.log = &nxt_main_log;

        cls->accept = engine->event.io->accept;

        cls->listen = ls;
        cls->work_queue = &engine->read_work_queue;

        cls->timer.work_queue = &engine->fast_work_queue;
        cls->timer.handler = nxt_event_conn_listen_timer_handler;
        cls->timer.log = &nxt_main_log;

        cls->task.thread = task->thread;
        cls->task.log = &nxt_main_log;
        cls->task.ident = nxt_task_next_ident();
        cls->socket.task = &cls->task;
        cls->timer.task = &cls->task;

        if (nxt_event_conn_accept_alloc(task, cls) != NULL) {
            nxt_fd_event_enable_accept(engine, &cls->socket);

            nxt_queue_insert_head(&engine->listen_connections, &cls->link);
        }

        return cls;
    }

    return NULL;
}


nxt_int_t
nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls)
{
    nxt_event_engine_t       *engine;
    nxt_event_conn_listen_t  *cls;

    cls = nxt_zalloc(sizeof(nxt_event_conn_listen_t));

    if (nxt_fast_path(cls != NULL)) {
        cls->socket.fd = ls->socket;

        engine = task->thread->engine;
        cls->batch = engine->batch;

        cls->socket.read_work_queue = &engine->accept_work_queue;
        cls->socket.read_handler = nxt_event_conn_listen_handler;
        cls->socket.error_handler = nxt_event_conn_listen_event_error;
        cls->socket.log = &nxt_main_log;

        cls->accept = engine->event.io->accept;

        cls->listen = ls;

        cls->timer.work_queue = &engine->fast_work_queue;
        cls->timer.handler = nxt_event_conn_listen_timer_handler;
        cls->timer.log = &nxt_main_log;

        cls->task.thread = task->thread;
        cls->task.log = &nxt_main_log;
        cls->task.ident = nxt_task_next_ident();
        cls->socket.task = &cls->task;
        cls->timer.task = &cls->task;

        if (nxt_event_conn_accept_alloc(task, cls) != NULL) {
            nxt_fd_event_enable_accept(engine, &cls->socket);

            nxt_queue_insert_head(&engine->listen_connections, &cls->link);
        }

        return NXT_OK;
    }

    return NXT_ERROR;
}


static nxt_event_conn_t *
nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls)
{
    nxt_sockaddr_t       *sa, *remote;
    nxt_mem_pool_t       *mp;
    nxt_event_conn_t     *c;
    nxt_event_engine_t   *engine;
    nxt_listen_socket_t  *ls;

    engine = task->thread->engine;

    if (engine->connections < engine->max_connections) {

        mp = nxt_mem_pool_create(cls->listen->mem_pool_size);

        if (nxt_fast_path(mp != NULL)) {
            /* This allocation cannot fail. */
            c = nxt_event_conn_create(mp, cls->socket.task);

            cls->next = c;
            c->socket.read_work_queue = cls->socket.read_work_queue;
            c->socket.write_ready = 1;
            c->listen = cls;

            ls = cls->listen;
            /* This allocation cannot fail. */
            remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length);
            c->remote = remote;

            sa = ls->sockaddr;
            remote->type = sa->type;
            /*
             * Set address family for unspecified Unix domain,
             * because these sockaddr's are not be passed to accept().
             */
            remote->u.sockaddr.sa_family = sa->u.sockaddr.sa_family;

            return c;
        }
    }

    return NULL;
}


static void
nxt_event_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_conn_listen_t  *cls;

    cls = obj;
    cls->ready = cls->batch;

    cls->accept(task, cls, data);
}


void
nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
{
    socklen_t                len;
    nxt_socket_t             s;
    struct sockaddr          *sa;
    nxt_event_conn_t         *c;
    nxt_event_conn_listen_t  *cls;

    cls = obj;
    c = cls->next;

    cls->ready--;
    cls->socket.read_ready = (cls->ready != 0);

    len = c->remote->socklen;

    if (len >= sizeof(struct sockaddr)) {
        sa = &c->remote->u.sockaddr;

    } else {
        sa = NULL;
        len = 0;
    }

    s = accept(cls->socket.fd, sa, &len);

    if (s == -1) {
        nxt_event_conn_accept_error(task, cls, "accept", nxt_socket_errno);
        return;
    }

    c->socket.fd = s;

#if (NXT_LINUX)
    /*
     * Linux does not inherit non-blocking mode
     * from listen socket for accept()ed socket.
     */
    if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
        nxt_socket_close(task, s);
    }

#endif

    nxt_debug(task, "accept(%d): %d", cls->socket.fd, s);

    nxt_event_conn_accept(task, cls, c);
}


void
nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls,
    nxt_event_conn_t *c)
{
    nxt_event_conn_t  *next;

    nxt_sockaddr_text(c->remote);

    nxt_debug(task, "client: %*s",
              c->remote->address_length, nxt_sockaddr_address(c->remote));

    nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);

    c->read_work_queue = cls->work_queue;
    c->write_work_queue = cls->work_queue;

    if (cls->listen->read_after_accept) {

        //c->socket.read_ready = 1;
//        cls->listen->handler(task, c, cls->socket.data);
        nxt_work_queue_add(c->read_work_queue, cls->listen->handler,
                           task, c, cls->socket.data);

    } else {
        nxt_work_queue_add(c->write_work_queue, cls->listen->handler,
                           task, c, cls->socket.data);
    }

    next = nxt_event_conn_accept_next(task, cls);

    if (next != NULL && cls->socket.read_ready) {
        nxt_work_queue_add(cls->socket.read_work_queue,
                           cls->accept, task, cls, next);
    }
}


static nxt_event_conn_t *
nxt_event_conn_accept_next(nxt_task_t *task, nxt_event_conn_listen_t *cls)
{
    nxt_event_conn_t  *c;

    cls->next = NULL;

    do {
        c = nxt_event_conn_accept_alloc(task, cls);

        if (nxt_fast_path(c != NULL)) {
            return c;
        }

    } while (nxt_event_conn_accept_close_idle(task, cls) == NXT_OK);

    nxt_log(task, NXT_LOG_CRIT, "no available connections, "
                  "new connections are not accepted within 1s");

    return NULL;
}


static nxt_int_t
nxt_event_conn_accept_close_idle(nxt_task_t *task, nxt_event_conn_listen_t *cls)
{
    nxt_queue_t         *idle;
    nxt_queue_link_t    *link;
    nxt_event_conn_t    *c;
    nxt_event_engine_t  *engine;

    static nxt_log_moderation_t  nxt_idle_close_log_moderation = {
        NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
    };

    engine = task->thread->engine;

    idle = &engine->idle_connections;

    for (link = nxt_queue_last(idle);
         link != nxt_queue_head(idle);
         link = nxt_queue_next(link))
    {
        c = nxt_queue_link_data(link, nxt_event_conn_t, link);

        if (!c->socket.read_ready) {
            nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
                             task->log, "no available connections, "
                             "close idle connection");
            nxt_queue_remove(link);
            nxt_event_conn_close(engine, c);

            return NXT_OK;
        }
    }

    nxt_timer_add(engine, &cls->timer, 1000);

    nxt_fd_event_disable_read(engine, &cls->socket);

    return NXT_DECLINED;
}


void
nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls,
    const char *accept_syscall, nxt_err_t err)
{
    static nxt_log_moderation_t  nxt_accept_log_moderation = {
        NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
    };

    cls->socket.read_ready = 0;

    switch (err) {

    case NXT_EAGAIN:
        nxt_debug(task, "%s(%d) %E", accept_syscall, cls->socket.fd, err);
        return;

    case ECONNABORTED:
        nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
                         task->log, "%s(%d) failed %E",
                         accept_syscall, cls->socket.fd, err);
        return;

    case EMFILE:
    case ENFILE:
    case ENOBUFS:
    case ENOMEM:
        if (nxt_event_conn_accept_close_idle(task, cls) != NXT_OK) {
            nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, "
                    "new connections are not accepted within 1s",
                    accept_syscall, cls->socket.fd, err);
        }

        return;

    default:
        nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E",
                accept_syscall, cls->socket.fd, err);
        return;
    }
}


static void
nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_timer_t              *ev;
    nxt_event_conn_t         *c;
    nxt_event_conn_listen_t  *cls;

    ev = obj;

    cls = nxt_timer_data(ev, nxt_event_conn_listen_t, timer);
    c = cls->next;

    if (c == NULL) {
        c = nxt_event_conn_accept_next(task, cls);

        if (c == NULL) {
            return;
        }
    }

    nxt_fd_event_enable_accept(task->thread->engine, &cls->socket);

    cls->accept(task, cls, c);
}


static void
nxt_event_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
{
    nxt_fd_event_t  *ev;

    ev = obj;

    nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd);
}