summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_event_engine.c
blob: c40e8fa63d4f35085a393f2f725d3b9c96d1365a (plain) (tree)
1
2
3
4
5
6
7
8
9








                            

                                                                        
                                
                                                                           
                
                                                                     
                
                                                                      
                
                                                                           
                
                                                                        
                

                                                                                













                                                                                






                                               








                                                       
                                                          
 












                                                                  






                                                                  
                                                            





























                                                                               
                                                       


                       
                                                                 










                                                                  
                         



















                                        
                                                            









                             
                                                      















                                                                         
                                                                







                         
                                                               




















                                                          
                                                           
                                                            
                                                            




























                                                                   
                                                                           










                                   
                                                                   


                                              
                                                                



























                                                                     
                                                                     
 




                             


             
                                          








                                                  
                                                           





                                                           

                                                                               





                               
                                                        




           
                                                                      
 

                                

                          
                            
 

                                                                  



           
                                                                           
 
                                
 
                                  
 

                                                                   
 

                                                     



           
                                                                        





                                  



                                                      
                                                 

                                                                       


         
                                                                       



         

                                                            














                                                                      
                                                                   

                                                          
                                                                       




                                                                 
                                                                   
           


                                                              














                                                                               
                                                       



























                                                               
                                                            








                                           




























                                                                         



                                                  
                              















                                                                        
                             
 
                

                    
                                                                             
 



                                  
                                     



                                                                      
                                         
 
                                                                       










                                                                       
                                       


         

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


static nxt_int_t nxt_event_engine_post_init(nxt_event_engine_t *engine);
static nxt_int_t nxt_event_engine_signal_pipe_create(
    nxt_event_engine_t *engine);
static void nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_post_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
    void *data);
static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
    nxt_task_t **task, void **obj, void **data);


nxt_event_engine_t *
nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
    const nxt_event_sig_t *signals, nxt_uint_t flags, nxt_uint_t batch)
{
    nxt_uint_t          events;
    nxt_event_engine_t  *engine;

    engine = nxt_zalloc(sizeof(nxt_event_engine_t));
    if (engine == NULL) {
        return NULL;
    }

    engine->task.thread = thr;
    engine->task.log = thr->log;
    engine->task.ident = nxt_task_next_ident();

    thr->engine = engine;
    thr->fiber = &engine->fibers->fiber;

    engine->batch = batch;

    if (flags & NXT_ENGINE_FIBERS) {
        engine->fibers = nxt_fiber_main_create(engine);
        if (engine->fibers == NULL) {
            goto fibers_fail;
        }
    }

    engine->current_work_queue = &engine->fast_work_queue;

    nxt_work_queue_cache_create(&engine->work_queue_cache, 0);

    engine->fast_work_queue.cache = &engine->work_queue_cache;
    engine->accept_work_queue.cache = &engine->work_queue_cache;
    engine->read_work_queue.cache = &engine->work_queue_cache;
    engine->socket_work_queue.cache = &engine->work_queue_cache;
    engine->connect_work_queue.cache = &engine->work_queue_cache;
    engine->write_work_queue.cache = &engine->work_queue_cache;
    engine->shutdown_work_queue.cache = &engine->work_queue_cache;
    engine->close_work_queue.cache = &engine->work_queue_cache;
    engine->final_work_queue.cache = &engine->work_queue_cache;

    nxt_work_queue_name(&engine->fast_work_queue, "fast");
    nxt_work_queue_name(&engine->accept_work_queue, "accept");
    nxt_work_queue_name(&engine->read_work_queue, "read");
    nxt_work_queue_name(&engine->socket_work_queue, "socket");
    nxt_work_queue_name(&engine->connect_work_queue, "connect");
    nxt_work_queue_name(&engine->write_work_queue, "write");
    nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
    nxt_work_queue_name(&engine->close_work_queue, "close");
    nxt_work_queue_name(&engine->final_work_queue, "final");

    if (signals != NULL) {
        engine->signals = nxt_event_engine_signals(signals);
        if (engine->signals == NULL) {
            goto signals_fail;
        }

        engine->signals->handler = nxt_event_engine_signal_handler;

        if (!event_set->signal_support) {
            if (nxt_event_engine_signals_start(engine) != NXT_OK) {
                goto signals_fail;
            }
        }
    }

    /*
     * Number of event set and timers changes should be at least twice
     * more than number of events to avoid premature flushes of the changes.
     * Fourfold is for sure.
     */
    events = (batch != 0) ? batch : 32;

    engine->event_set = event_set->create(engine->signals, 4 * events, events);
    if (engine->event_set == NULL) {
        goto event_set_fail;
    }

    engine->event = event_set;

    if (nxt_event_engine_post_init(engine) != NXT_OK) {
        goto post_fail;
    }

    if (nxt_timers_init(&engine->timers, 4 * events) != NXT_OK) {
        goto timers_fail;
    }

    nxt_thread_time_update(thr);
    engine->timers.now = nxt_thread_monotonic_time(thr) / 1000000;

    engine->max_connections = 0xffffffff;

    nxt_queue_init(&engine->listen_connections);
    nxt_queue_init(&engine->idle_connections);

    engine->thread = thr;

#if !(NXT_THREADS)

    if (engine->event->signal_support) {
        thr->time.signal = -1;
    }

#endif

    return engine;

timers_fail:
post_fail:

    event_set->free(engine->event_set);

event_set_fail:
signals_fail:

    nxt_free(engine->signals);
    nxt_work_queue_cache_destroy(&engine->work_queue_cache);
    nxt_free(engine->fibers);

fibers_fail:

    nxt_free(engine);
    return NULL;
}


static nxt_int_t
nxt_event_engine_post_init(nxt_event_engine_t *engine)
{
    if (engine->event->enable_post != NULL) {
        return engine->event->enable_post(engine->event_set,
                                          nxt_event_engine_post_handler);
    }

#if !(NXT_THREADS)

    /* Only signals may are posted in single-threaded mode. */

    if (engine->event->signal_support) {
        return NXT_OK;
    }

#endif

    if (nxt_event_engine_signal_pipe_create(engine) != NXT_OK) {
        return NXT_ERROR;
    }

    return NXT_OK;
}


static nxt_int_t
nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
{
    nxt_event_engine_pipe_t  *pipe;

    pipe = nxt_zalloc(sizeof(nxt_event_engine_pipe_t));
    if (pipe == NULL) {
        return NXT_ERROR;
    }

    engine->pipe = pipe;

    /*
     * An event engine pipe is in blocking mode for writer
     * and in non-blocking node for reader.
     */

    if (nxt_pipe_create(pipe->fds, 1, 0) != NXT_OK) {
        nxt_free(pipe);
        return NXT_ERROR;
    }

    pipe->event.fd = pipe->fds[0];
    pipe->event.read_work_queue = &engine->fast_work_queue;
    pipe->event.read_handler = nxt_event_engine_signal_pipe;
    pipe->event.write_work_queue = &engine->fast_work_queue;
    pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
    pipe->event.log = &nxt_main_log;

    nxt_event_fd_enable_read(engine, &pipe->event);

    return NXT_OK;
}


static void
nxt_event_engine_signal_pipe_free(nxt_event_engine_t *engine)
{
    nxt_event_engine_pipe_t  *pipe;

    pipe = engine->pipe;

    if (pipe != NULL) {

        if (pipe->event.read_work_queue != NULL) {
            nxt_event_fd_close(engine, &pipe->event);
            nxt_pipe_close(pipe->fds);
        }

        nxt_free(pipe);
    }
}


static void
nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_engine_pipe_t  *pipe;

    pipe = obj;

    nxt_pipe_close(pipe->fds);
    nxt_free(pipe);
}


void
nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
{
    nxt_thread_log_debug("event engine post");

    nxt_locked_work_queue_add(&engine->locked_work_queue, work);

    nxt_event_engine_signal(engine, 0);
}


void
nxt_event_engine_signal(nxt_event_engine_t *engine, nxt_uint_t signo)
{
    u_char  buf;

    nxt_thread_log_debug("event engine signal:%ui", signo);

    /*
     * A signal number may be sent in a signal context, so the signal
     * information cannot be passed via a locked work queue.
     */

    if (engine->event->signal != NULL) {
        engine->event->signal(engine->event_set, signo);
        return;
    }

    buf = (u_char) signo;
    (void) nxt_fd_write(engine->pipe->fds[1], &buf, 1);
}


static void
nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
{
    int             i, n;
    u_char          signo;
    nxt_bool_t      post;
    nxt_event_fd_t  *ev;
    u_char          buf[128];

    ev = obj;

    nxt_debug(task, "engine signal pipe");

    post = 0;

    do {
        n = nxt_fd_read(ev->fd, buf, sizeof(buf));

        for (i = 0; i < n; i++) {
            signo = buf[i];

            nxt_debug(task, "engine pipe signo:%d", signo);

            if (signo == 0) {
                /* A post should be processed only once. */
                post = 1;

            } else {
                nxt_event_engine_signal_handler(task,
                                             (void *) (uintptr_t) signo, NULL);
            }
        }

    } while (n == sizeof(buf));

    if (post) {
        nxt_event_engine_post_handler(task, NULL, NULL);
    }
}


static void
nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_thread_t        *thread;
    nxt_event_engine_t  *engine;

    thread = task->thread;
    engine = thread->engine;

    nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
                               &engine->fast_work_queue);
}


static void
nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_engine_t  *engine;

    engine = task->thread->engine;

    nxt_log(task, NXT_LOG_CRIT, "engine pipe(%FD:%FD) event error",
            engine->pipe->fds[0], engine->pipe->fds[1]);

    nxt_event_fd_close(engine, &engine->pipe->event);
    nxt_pipe_close(engine->pipe->fds);
}


static void
nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
{
    uintptr_t              signo;
    const nxt_event_sig_t  *sigev;

    signo = (uintptr_t) obj;

    for (sigev = task->thread->engine->signals->sigev;
         sigev->signo != 0;
         sigev++)
    {
        if (signo == (nxt_uint_t) sigev->signo) {
            sigev->handler(task, (void *) signo, (void *) sigev->name);
            return;
        }
    }

    nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
}


nxt_int_t
nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
    const nxt_event_set_ops_t *event_set, nxt_uint_t batch)
{
    nxt_uint_t          events;
    nxt_event_engine_t  *engine;

    engine = thr->engine;
    engine->batch = batch;

    if (!engine->event->signal_support && event_set->signal_support) {
        /*
         * Block signal processing if the current event
         * facility does not support signal processing.
         */
        nxt_event_engine_signals_stop(engine);

        /*
         * Add to engine fast work queue the signal events possibly
         * received before the blocking signal processing.
         */
        nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL);
    }

    if (engine->pipe != NULL && event_set->enable_post != NULL) {
        /*
         * An engine pipe must be closed after all signal events
         * added above to engine fast work queue will be processed.
         */
        nxt_work_queue_add(&engine->final_work_queue,
                           nxt_event_engine_signal_pipe_close,
                           &engine->task, engine->pipe, NULL);

        engine->pipe = NULL;
    }

    engine->event->free(engine->event_set);

    events = (batch != 0) ? batch : 32;

    engine->event_set = event_set->create(engine->signals, 4 * events, events);
    if (engine->event_set == NULL) {
        return NXT_ERROR;
    }

    engine->event = event_set;

    if (nxt_event_engine_post_init(engine) != NXT_OK) {
        return NXT_ERROR;
    }

    if (engine->signals != NULL) {

        if (!engine->event->signal_support) {
            return nxt_event_engine_signals_start(engine);
        }

#if (NXT_THREADS)
        /*
         * Reset the PID flag to start the signal thread if
         * some future event facility will not support signals.
         */
        engine->signals->process = 0;
#endif
    }

    return NXT_OK;
}


void
nxt_event_engine_free(nxt_event_engine_t *engine)
{
    nxt_event_engine_signal_pipe_free(engine);
    nxt_free(engine->signals);

    nxt_work_queue_cache_destroy(&engine->work_queue_cache);

    engine->event->free(engine->event_set);

    /* TODO: free timers */

    nxt_free(engine);
}


static nxt_work_handler_t
nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
    void **obj, void **data)
{
    nxt_work_queue_t  *wq;

    wq = engine->current_work_queue;

    if (wq->head == NULL) {
        wq = &engine->fast_work_queue;

        while (wq->head == NULL) {
            engine->current_work_queue++;
            wq = engine->current_work_queue;

            if (wq > &engine->final_work_queue) {
                engine->current_work_queue = &engine->fast_work_queue;

                return NULL;
            }
        }
    }

    nxt_debug(&engine->task, "work queue: %s", wq->name);

    return nxt_work_queue_pop(wq, task, obj, data);
}


void
nxt_event_engine_start(nxt_event_engine_t *engine)
{
    void                *obj, *data;
    nxt_task_t          *task;
    nxt_msec_t          timeout, now;
    nxt_thread_t        *thr;
    nxt_work_handler_t  handler;

    thr = nxt_thread();

    if (engine->fibers) {
        /*
         * _setjmp() cannot be wrapped in a function since return from
         * the function clobbers stack used by future _setjmp() returns.
         */
        _setjmp(engine->fibers->fiber.jmp);

        /* A return point from fibers. */
    }

    thr->log = &nxt_main_log;

    for ( ;; ) {

        for ( ;; ) {
            handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);

            if (handler == NULL) {
                break;
            }

            handler(task, obj, data);
        }

        /* Attach some event engine work queues in preferred order. */

        timeout = nxt_timer_find(engine);

        engine->event->poll(&engine->task, engine->event_set, timeout);

        /*
         * Look up expired timers only if a new zero timer has been
         * just added before the event poll or if the event poll slept
         * at least 1 millisecond, because all old eligible timers were
         * processed in the previous iterations.
         */

        now = nxt_thread_monotonic_time(thr) / 1000000;

        if (timeout == 0 || now != engine->timers.now) {
            nxt_timer_expire(thr, now);
        }
    }
}