summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_work_queue.c
blob: 1b989eab4fb9fed7fcf7882be2536a4df736c342 (plain) (tree)


























                                                                        
                                                                   





                                                                         

























                                                                           



















                                                           






                                        
    
                                                                             
 
                                                       





                                                           
                                       
 

                                       




     
                                                           


                                          
                                                     






                           
                                                      




























                                                                   







                               


                                      

                                                                    


                      

                                     
                
                               

                                          
                                         


                                    
                              

                              












                                      
                                           



     
                  
                                                                       
                
 

                      

                                     
                    
 
                          
 

                             

     
                       
 

                       
 

                        
 

                                 
 
                         





                                             
                                                                         
 

                                     

                               
 

                         

     

                     






                                               

                                                                          
 
                              

                                
                   
 
                                     
 
                     
 

                           
 

                           
 

                            
 
                               
 


                                 
 
                                

     
                                       
 
                   








                                                                           
                      
 
                                     
 
                     
 

                     
 
                                       
 

                                 
 

                                                         
 
                          
     
 

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * Available work items are crucial for overall engine operation, so
 * the items are preallocated in two chunks: cache and spare chunks.
 * By default each chunk preallocates 409 work items on two or four
 * CPU pages depending on platform.  If all items in a cache chunk are
 * exhausted then a spare chunk becomes a cache chunk, and a new spare
 * chunk is allocated.  This two-step allocation mitigates low memory
 * condition impact on work queue operation.  However, if both chunks
 * are exhausted then a thread will sleep in reliance on another thread
 * frees some memory.  However, this may lead to deadlock and probably
 * a process should be aborted.  This behaviour should be considered as
 * abort on program stack exhaustion.
 *
 * The cache and spare chunks initially are also allocated in two steps:
 * a spare chunk is allocated first, then it becomes the cache chunk and
 * a new spare chunk is allocated again.
 */

static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache);


/* It should be adjusted with the "work_queue_bucket_items" directive. */
static nxt_uint_t  nxt_work_queue_bucket_items = 409;


#if (NXT_DEBUG)

nxt_inline void
nxt_work_queue_thread_assert(nxt_work_queue_t *wq)
{
    nxt_tid_t     tid;
    nxt_thread_t  *thread;

    thread = nxt_thread();
    tid = nxt_thread_tid(thread);

    if (nxt_fast_path(wq->tid == tid)) {
        return;
    }

    if (nxt_slow_path(nxt_pid != wq->pid)) {
        wq->pid = nxt_pid;
        wq->tid = tid;

        return;
    }

    nxt_log_alert(thread->log, "work queue locked by thread %PT", wq->tid);
    nxt_abort();
}


void nxt_work_queue_thread_adopt(nxt_work_queue_t *wq)
{
    nxt_thread_t  *thread;

    thread = nxt_thread();

    wq->pid = nxt_pid;
    wq->tid = nxt_thread_tid(thread);
}


void
nxt_work_queue_name(nxt_work_queue_t *wq, const char* name)
{
    nxt_work_queue_thread_assert(wq);

    wq->name = name;
}

#else

#define nxt_work_queue_thread_assert(wq)

#endif


void
nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size)
{
    nxt_memzero(cache, sizeof(nxt_work_queue_cache_t));

    if (chunk_size == 0) {
        chunk_size = nxt_work_queue_bucket_items;
    }

    /* nxt_work_queue_chunk_t already has one work item. */
    cache->chunk_size = chunk_size - 1;

    while (cache->next == NULL) {
        nxt_work_queue_allocate(cache);
    }
}


void
nxt_work_queue_cache_destroy(nxt_work_queue_cache_t *cache)
{
    nxt_work_queue_chunk_t  *chunk, *next;

    for (chunk = cache->chunk; chunk; chunk = next) {
        next = chunk->next;
        nxt_free(chunk);
    }
}


static void
nxt_work_queue_allocate(nxt_work_queue_cache_t *cache)
{
    size_t                  size;
    nxt_uint_t              i, n;
    nxt_work_t              *work;
    nxt_work_queue_chunk_t  *chunk;

    n = cache->chunk_size;
    size = sizeof(nxt_work_queue_chunk_t) + n * sizeof(nxt_work_t);

    chunk = nxt_malloc(size);

    if (nxt_fast_path(chunk != NULL)) {

        chunk->next = cache->chunk;
        cache->chunk = chunk;
        work = &chunk->work;

        for (i = 0; i < n; i++) {
            work[i].next = &work[i + 1];
        }

        work[i].next = NULL;
        work++;

    } else if (cache->spare != NULL) {

        work = NULL;

    } else {
        return;
    }

    cache->next = cache->spare;
    cache->spare = work;
}


/* Add a work to a work queue tail. */

void
nxt_work_queue_add(nxt_work_queue_t *wq, nxt_work_handler_t handler,
    nxt_task_t *task, void *obj, void *data)
{
    nxt_work_t  *work;

    nxt_work_queue_thread_assert(wq);

    for ( ;; ) {
        work = wq->cache->next;

        if (nxt_fast_path(work != NULL)) {
            wq->cache->next = work->next;
            work->next = NULL;

            work->handler = handler;
            work->task = task;
            work->obj = obj;
            work->data = data;

            if (wq->tail != NULL) {
                wq->tail->next = work;

            } else {
                wq->head = work;
            }

            wq->tail = work;

            return;
        }

        nxt_work_queue_allocate(wq->cache);
    }
}


nxt_work_handler_t
nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj,
    void **data)
{
    nxt_work_t  *work;

    nxt_work_queue_thread_assert(wq);

    work = wq->head;

    wq->head = work->next;

    if (work->next == NULL) {
        wq->tail = NULL;
    }

    *task = work->task;

    *obj = work->obj;
    nxt_prefetch(*obj);

    *data = work->data;
    nxt_prefetch(*data);

    work->next = wq->cache->next;
    wq->cache->next = work;

    return work->handler;
}


/* Add a work to a locked work queue tail. */

void
nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, nxt_work_t *work)
{
    nxt_thread_spin_lock(&lwq->lock);

    if (lwq->tail != NULL) {
        lwq->tail->next = work;

    } else {
        lwq->head = work;
    }

    lwq->tail = work;

    nxt_thread_spin_unlock(&lwq->lock);
}


/* Pop a work from a locked work queue head. */

nxt_work_handler_t
nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
    void **obj, void **data)
{
    nxt_work_t          *work;
    nxt_work_handler_t  handler;

    handler = NULL;

    nxt_thread_spin_lock(&lwq->lock);

    work = lwq->head;

    if (work != NULL) {
        *task = work->task;

        *obj = work->obj;
        nxt_prefetch(*obj);

        *data = work->data;
        nxt_prefetch(*data);

        lwq->head = work->next;

        if (work->next == NULL) {
            lwq->tail = NULL;
        }

        handler = work->handler;
    }

    nxt_thread_spin_unlock(&lwq->lock);

    return handler;
}


/* Move all works from a locked work queue to a usual work queue. */

void
nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
    nxt_work_queue_t *wq)
{
    nxt_work_t  *work;

    nxt_thread_spin_lock(&lwq->lock);

    work = lwq->head;

    lwq->head = NULL;
    lwq->tail = NULL;

    nxt_thread_spin_unlock(&lwq->lock);

    while (work != NULL) {
        work->task->thread = thr;

        nxt_work_queue_add(wq, work->handler, work->task,
                           work->obj, work->data);

        work = work->next;
    }
}