summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_application.c
blob: eedfd0c94565fe67e13ac7ed30a5a52866c4ae10 (plain) (tree)
1
2
3
4
5
6
7
8
9







                                     
                        





                                      
                                                                            





                                                                       
                                                                  


                                                                              
                

                                                                              

                                                                   
                                                                         
                                                                           









                                                                          






                      













                                                                              
                                          














                                                             
                                                  



                                



                                                    







                                                                           
                                                 


                                      
                             







                                                
































                                                                     






                                

                                 
                                
                                  
                               








                                          
             
 

                                                                         









                                                           
                                  

                

                                                  

                                           

                                    



















































                                                                             

                                                    






































                                                                  






                                   




























                                                                          

                                                     














































                                                                             

                                 








                                            

                               






                                                                           

                                               



















































                                                                         

                                 






































                                                                          

                                  
 
                                         




























                                                              

                                                              


                                                           
                                                                                


                     

                                                            













                                                                           







                                                          
                                             



                               



                                 
                                  















                                                                               
                                                                          




                       

                                        



                

                                                          
 
                                                          
 

                                          


                                
                          

     
                      


                      
                                                                   
 
                                              



                                                                         
                                                                 
















                                                              
                                                                      
 





                                    















                                              

                                                                





                                                          
                       



                        
                          







                                             
                                                 


































                                                                             






                                                   


                                 



                                             
                                         











                                         
                                             








                                          
                        





                                                           
                                 











                                
                                         





                  












                                                                        
                                                                 







                          
                                            













                                        
                                                            



                                                     

                                                                            





                                                   
                                                  





                                                                 
                             












                                 
                                                               
 
                                



                         

                                          












                                             

                                                                   


 















                                                                 
           
                                                                    

                                      
                          

                          
                                               

















                                                
 



                                                           
















                                                                         
                                         





                                                 
                                                               




                         
                                          
 
                                   



           
                                                                 




                         
                                            
 
                                   











                                                                 
                                                            




                           

                                         



                                                   

                                                                   



           
                                                              
 

                          
 
            
 

                                            
                       



                                      

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) Valentin V. Bartenev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>
#include <nxt_runtime.h>
#include <nxt_application.h>


/*
 * Sentinel pointer value (all bits set).  The expansion is fully
 * parenthesized so that it behaves as a single operand, e.g. under sizeof
 * or next to other operators.  It is not referenced in the visible part of
 * this file.
 */
#define NXT_PARSE_AGAIN  ((u_char *) -1)


/*
 * Forward declarations of the file-local helpers.
 *
 * nxt_app_listen_socket() .. nxt_app_write_finish() are called from
 * nxt_app_start() and the application thread; nxt_app_buf_send() posts
 * output to the event engine, where the nxt_app_delivery_*() and
 * nxt_app_close_request() handlers are invoked as work/state callbacks.
 */
static nxt_int_t nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt);
static void nxt_app_thread(void *ctx);
static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s,
    nxt_log_t *log);
static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c,
    nxt_log_t *log);
static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r);
static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out);
static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_completion(nxt_task_t *task, void *obj,
    void *data);
static void nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c,
    uintptr_t data);
static void nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c);
static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data);


/*
 * Resumable state of the HTTP header parser.
 *
 * nxt_app_http_parse_request() repeatedly calls "handler" with "pos" as the
 * start of the unparsed input.  A handler returns NXT_OK (continue with the
 * updated state), NXT_DONE (header finished, "pos" points past it),
 * NXT_AGAIN (more input is needed) or another value on error.
 */
typedef struct nxt_app_http_parse_state_s  nxt_app_http_parse_state_t;

struct nxt_app_http_parse_state_s {
    /* Where the next handler invocation starts reading. */
    u_char     *pos;
    /* Parsing step to run next. */
    nxt_int_t  (*handler)(nxt_app_request_header_t *h, u_char *start,
                          u_char *end, nxt_app_http_parse_state_t *state);
};


/*
 * Output buffer paired with the work item used to post it to the event
 * engine.  nxt_app_buf_send() recovers this wrapper from a buffer pointer
 * with nxt_container_of(), so a buffer passed there must be the "buf"
 * member of an nxt_app_buf_t.
 */
typedef struct {
     nxt_work_t  work;
     nxt_buf_t   buf;
} nxt_app_buf_t;


/*
 * Forward declarations of the HTTP request parser: the driver loop, the
 * three parsing steps used as nxt_app_http_parse_state_t handlers, and the
 * post-processing of the collected header fields.
 */
static nxt_int_t nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf,
    size_t size);
static nxt_int_t nxt_app_http_parse_request_line(nxt_app_request_header_t *h,
    u_char *start, u_char *end, nxt_app_http_parse_state_t *state);
static nxt_int_t nxt_app_http_parse_field_value(nxt_app_request_header_t *h,
    u_char *start, u_char *end, nxt_app_http_parse_state_t *state);
static nxt_int_t nxt_app_http_parse_field_name(nxt_app_request_header_t *h,
    u_char *start, u_char *end, nxt_app_http_parse_state_t *state);

static nxt_int_t nxt_app_http_process_headers(nxt_app_request_t *r);


/* Tentative declaration; the initialized definition follows further below. */
static const nxt_event_conn_state_t  nxt_app_delivery_write_state;

/* Application module whose init() and run() are called by nxt_app_thread(). */
nxt_application_module_t         *nxt_app;

/*
 * The mutex guards nxt_app_buf_free; the condition is signalled by
 * nxt_app_delivery_completion() after buffers are returned and is waited
 * on by nxt_app_write() when no buffer is available.
 */
static nxt_thread_mutex_t        nxt_app_mutex;
static nxt_thread_cond_t         nxt_app_cond;

/* Reusable output buffers, linked through ->next (taken under the mutex). */
static nxt_buf_t                 *nxt_app_buf_free;
/*
 * Buffers the delivery handlers are finished with; recycled by
 * nxt_app_delivery_completion().  Accessed without the mutex, only from
 * the nxt_app_delivery_*() handlers.
 */
static nxt_buf_t                 *nxt_app_buf_done;

/* First engine of the runtime; output work is posted to it. */
static nxt_event_engine_t        *nxt_app_engine;
/* Created by nxt_app_thread(); not otherwise used in the visible code. */
static nxt_mem_pool_t            *nxt_app_mem_pool;

/* Number of output buffers allocated so far by nxt_app_write() and its cap. */
static nxt_uint_t                nxt_app_buf_current_number;
static nxt_uint_t                nxt_app_buf_max_number = 16;


/*
 * Sets up the application listener, creates the mutex/condition pair used
 * for output buffer hand-off and starts the application thread.
 *
 * Returns the result of nxt_thread_create() on the success path and
 * NXT_ERROR if any preparation step fails.
 */
nxt_int_t
nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt)
{
    nxt_thread_link_t    *thread_link;
    nxt_thread_handle_t  thread_handle;

    if (nxt_app_listen_socket(task, rt) != NXT_OK) {
        return NXT_ERROR;
    }

    /* Synchronization objects are created in order: mutex, then condition. */
    if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)
        || nxt_slow_path(nxt_thread_cond_create(&nxt_app_cond) != NXT_OK))
    {
        return NXT_ERROR;
    }

    thread_link = nxt_zalloc(sizeof(nxt_thread_link_t));

    if (nxt_slow_path(thread_link == NULL)) {
        return NXT_ERROR;
    }

    /* The runtime is handed to nxt_app_thread() as its context. */
    thread_link->work.data = rt;
    thread_link->start = nxt_app_thread;

    return nxt_thread_create(&thread_handle, thread_link);
}


/*
 * Registers and creates the IPv4 stream listen socket on port 8080 used by
 * the application thread.  The port is hard-coded.
 *
 * Returns NXT_OK on success, NXT_ERROR if the address cannot be allocated,
 * the socket cannot be added to the runtime, or socket creation fails.
 */
static nxt_int_t
nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt)
{
    nxt_sockaddr_t       *addr;
    nxt_listen_socket_t  *listener;

    addr = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in),
                              NXT_INET_ADDR_STR_LEN);
    if (addr == NULL) {
        return NXT_ERROR;
    }

    addr->u.sockaddr_in.sin_family = AF_INET;
    addr->u.sockaddr_in.sin_port = htons(8080);
    addr->type = SOCK_STREAM;

    nxt_sockaddr_text(addr);

    listener = nxt_runtime_listen_socket_add(rt, addr);
    if (listener == NULL) {
        return NXT_ERROR;
    }

    listener->read_after_accept = 1;

    return (nxt_listen_socket_create(task, listener, 0) == NXT_OK)
           ? NXT_OK : NXT_ERROR;
}


/*
 * Size of the on-stack request buffer in nxt_app_thread(); it is also the
 * capacity nxt_app_http_parse_request() assumes when it reads more data.
 */
#define SIZE  4096

/*
 * Entry point of the application thread.
 *
 * ctx is the runtime passed through nxt_thread_link_t by nxt_app_start().
 * The thread initializes the application module and then serves connections
 * one at a time with blocking calls: accept(), recv() of the request head,
 * header parsing, nxt_app->run(), and finally nxt_app_write_finish() to
 * queue the terminating buffer.  Output is delivered by the event engine
 * stored in nxt_app_engine.
 *
 * The thread returns only if the memory pool cannot be created or accept()
 * fails with EBADF.
 */
static void
nxt_app_thread(void *ctx)
{
    ssize_t                 n;
    nxt_err_t               err;
    nxt_socket_t            s;
    nxt_thread_t            *thr;
    nxt_runtime_t           *rt;
    nxt_queue_link_t        *link;
    nxt_app_request_t       *r;
    nxt_listen_socket_t     *ls;
    u_char                  buf[SIZE];
    const size_t            size = SIZE;
    /*
     * Storage for parsed header fields, shared by consecutive requests.
     * NOTE(review): the parser does not check fields_num against this
     * capacity of 128 - confirm whether a bound is enforced elsewhere.
     */
    nxt_app_header_field_t  fields[128];

    thr = nxt_thread();

    nxt_log_debug(thr->log, "app thread");

    rt = ctx;

    /* Output buffers are posted to the first engine of the runtime. */
    link = nxt_queue_first(&rt->engines);
    nxt_app_engine = nxt_queue_link_data(link, nxt_event_engine_t, link);

    nxt_app_mem_pool = nxt_mem_pool_create(512);
    if (nxt_slow_path(nxt_app_mem_pool == NULL)) {
        return;
    }

    /*
     * NOTE(review): an init failure is only logged; the accept loop still
     * starts - confirm this is intended.
     */
    if (nxt_slow_path(nxt_app->init(thr) != NXT_OK)) {
        nxt_log_debug(thr->log, "application init failed");
    }

    /* Uses the first entry of the runtime's listen socket array. */
    ls = rt->listen_sockets->elts;

    for ( ;; ) {
        nxt_log_debug(thr->log, "wait on accept");

        s = accept(ls->socket, NULL, NULL);

        nxt_thread_time_update(thr);

        if (nxt_slow_path(s == -1)) {
            err = nxt_socket_errno;

            nxt_log_error(NXT_LOG_ERR, thr->log, "accept(%d) failed %E",
                          ls->socket, err);

            if (err == EBADF) {
                /* STUB: ls->socket has been closed on exit. */
                return;
            }

            continue;
        }

        nxt_log_debug(thr->log, "accept(%d): %d", ls->socket, s);

        /* Read the first part of the request; the parser reads more if needed. */
        n = recv(s, buf, size, 0);

        if (nxt_slow_path(n <= 0)) {
            /* n == 0 means the peer closed the connection; logged with err 0. */
            err = (n == 0) ? 0 : nxt_socket_errno;

            nxt_log_error(NXT_LOG_ERR, thr->log, "recv(%d, %uz) failed %E",
                          s, size, err);
            close(s);
            continue;
        }

        nxt_log_debug(thr->log, "recv(%d, %uz): %z", s, size, n);

        r = nxt_app_request_create(s, thr->log);
        if (nxt_slow_path(r == NULL)) {
            goto fail;
        }

        r->header.fields = fields;

        //nxt_app->start(r);

        if (nxt_app_http_parse_request(r, buf, n) != NXT_OK) {
            nxt_log_debug(thr->log, "nxt_app_http_parse_request() failed");
            nxt_mem_pool_destroy(r->mem_pool);
            goto fail;
        }

        if (nxt_app_http_process_headers(r) != NXT_OK) {
            nxt_log_debug(thr->log, "nxt_app_http_process_headers() failed");
            nxt_mem_pool_destroy(r->mem_pool);
            goto fail;
        }

        nxt_app->run(r);

        nxt_log_debug(thr->log, "app request done");

        /*
         * NOTE(review): on this failure path r->mem_pool is not destroyed
         * and the socket is closed although output may already have been
         * posted to the engine for this connection - verify ownership.
         */
        if (nxt_slow_path(nxt_app_write_finish(r) == NXT_ERROR)) {
            goto fail;
        }

        /* Success: the connection is now handled by the delivery handlers. */
        continue;

    fail:

        /* Drop the connection and pause before accepting the next one. */
        close(s);
        nxt_nanosleep(1000000000);  /* 1s */
    }
}


/*
 * Allocates a request object and its event connection from a fresh memory
 * pool and binds them to the accepted socket "s".
 *
 * The connection's task, socket and timers are wired to the calling thread
 * and to "log".  The pool is stored in r->mem_pool; the caller destroys it
 * to release both objects.
 *
 * Returns the request, or NULL on allocation failure, in which case the
 * pool created here is destroyed before returning so nothing is leaked.
 */
static nxt_app_request_t *
nxt_app_request_create(nxt_socket_t s, nxt_log_t *log)
{
    nxt_mem_pool_t     *mp;
    nxt_event_conn_t   *c;
    nxt_app_request_t  *r;

    mp = nxt_mem_pool_create(1024);
    if (nxt_slow_path(mp == NULL)) {
        return NULL;
    }

    r = nxt_mem_zalloc(mp, sizeof(nxt_app_request_t));
    if (nxt_slow_path(r == NULL)) {
        goto fail;
    }

    c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t));
    if (nxt_slow_path(c == NULL)) {
        goto fail;
    }

    c->socket.fd = s;
    /* Lets nxt_app_close_request() find the request from the connection. */
    c->socket.data = r;

    c->task.thread = nxt_thread();
    c->task.log = log;
    c->task.ident = log->ident;
    c->socket.task = &c->task;
    c->read_timer.task = &c->task;
    c->write_timer.task = &c->task;

    r->mem_pool = mp;
    r->event_conn = c;
    r->log = log;

    return r;

fail:

    /* The pool has no other owner yet; release it together with r. */
    nxt_mem_pool_destroy(mp);

    return NULL;
}


/*
 * Drives the HTTP header parser over "buf", reading more data from the
 * request socket whenever a parsing step needs it.
 *
 * buf holds "size" bytes already received and must have room for SIZE bytes
 * in total, because additional data is appended right after the current
 * end.  On success r->body_preread describes the bytes that follow the
 * header in buf.
 *
 * Returns NXT_OK when the header is complete, NXT_ERROR on a parse error,
 * a receive error, a closed connection, or when the header does not fit
 * into SIZE bytes.
 */
static nxt_int_t
nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf, size_t size)
{
    u_char                      *end;
    size_t                      room;
    ssize_t                     n;
    nxt_err_t                   err;
    nxt_socket_t                s;
    nxt_app_http_parse_state_t  state;

    end = buf + size;

    state.pos = buf;
    state.handler = nxt_app_http_parse_request_line;

    for ( ;; ) {
        switch (state.handler(&r->header, state.pos, end, &state)) {

        case NXT_OK:
            continue;

        case NXT_DONE:
            r->body_preread.length = end - state.pos;
            r->body_preread.start = state.pos;

            return NXT_OK;

        case NXT_AGAIN:
            /*
             * A full buffer would turn into a zero-length recv() that is
             * indistinguishable from a closed connection; report it as
             * what it is instead.
             */
            if (nxt_slow_path(size >= SIZE)) {
                nxt_log_error(NXT_LOG_ERR, r->log,
                              "request header is larger than %uz bytes",
                              (size_t) SIZE);

                return NXT_ERROR;
            }

            s = r->event_conn->socket.fd;
            room = SIZE - size;

            n = recv(s, end, room, 0);

            if (nxt_slow_path(n <= 0)) {
                err = (n == 0) ? 0 : nxt_socket_errno;

                /* Log the length actually requested, not the bytes held. */
                nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E",
                              s, room, err);

                return NXT_ERROR;
            }

            nxt_log_debug(r->log, "recv(%d, %uz): %z", s, room, n);

            size += n;
            end += n;

            continue;
        }

        /* Any other handler result is treated as a parse error. */
        return NXT_ERROR;
    }
}


/*
 * Parses the request line "METHOD SP PATH SP HTTP/x.y" followed by LF or
 * CRLF, storing the method, path and version slices in "h", and then
 * continues directly with the first header field name.
 *
 * Whenever the line is incomplete the function returns NXT_AGAIN with
 * state->pos left at the beginning of the line, so that the whole line is
 * parsed again once more data has been received.  The version token is
 * taken as 8 bytes and its contents are not validated.
 */
static nxt_int_t
nxt_app_http_parse_request_line(nxt_app_request_header_t *h, u_char *start,
    u_char *end, nxt_app_http_parse_state_t *state)
{
    u_char  *p;

    for (p = start; /* void */; p++) {

        if (nxt_slow_path(p == end)) {
            /*
             * Restart from the beginning of the line on the next call.
             * Resuming at "end" would cut the method off at the point
             * where the first chunk of data happened to stop.
             */
            state->pos = start;
            return NXT_AGAIN;
        }

        if (*p == ' ') {
            break;
        }
    }

    h->method.length = p - start;
    h->method.start = start;

    start = p + 1;

    p = nxt_memchr(start, ' ', end - start);

    if (nxt_slow_path(p == NULL)) {
        return NXT_AGAIN;
    }

    h->path.length = p - start;
    h->path.start = start;

    start = p + 1;

    /* Need the version token plus at least one terminating byte. */
    if (nxt_slow_path((size_t) (end - start) < sizeof("HTTP/1.1\n") - 1)) {
        return NXT_AGAIN;
    }

    h->version.length = sizeof("HTTP/1.1") - 1;
    h->version.start = start;

    p = start + sizeof("HTTP/1.1") - 1;

    /* Bare LF terminator. */
    if (nxt_slow_path(*p == '\n')) {
        return nxt_app_http_parse_field_name(h, p + 1, end, state);
    }

    /* Otherwise two terminator bytes (presumably CRLF) are skipped. */
    if (nxt_slow_path(end - p < 2)) {
        return NXT_AGAIN;
    }

    return nxt_app_http_parse_field_name(h, p + 2, end, state);
}


/*
 * Parses one header field name, or detects the empty line that ends the
 * header.
 *
 * Returns NXT_DONE with state->pos just past the terminating LF or CRLF,
 * NXT_ERROR if CR is not followed by LF, NXT_AGAIN (resuming at "start"
 * with this handler) when more input is needed, or the result of parsing
 * the field value that follows the colon.
 *
 * NOTE(review): the slot h->fields[h->fields_num] is written without a
 * capacity check; the array is supplied by the caller - confirm a bound is
 * enforced somewhere.
 */
static nxt_int_t
nxt_app_http_parse_field_name(nxt_app_request_header_t *h, u_char *start,
    u_char *end, nxt_app_http_parse_state_t *state)
{
    u_char                  *colon;
    nxt_app_header_field_t  *field;

    if (nxt_slow_path(start == end)) {
        goto again;
    }

    switch (*start) {

    case '\n':
        /* Empty line with a bare LF: end of the header. */
        state->pos = start + 1;
        return NXT_DONE;

    case '\r':
        /* Empty line with CRLF; both bytes must be present. */
        if (nxt_slow_path(end - start < 2)) {
            goto again;
        }

        if (nxt_slow_path(start[1] != '\n')) {
            return NXT_ERROR;
        }

        state->pos = start + 2;
        return NXT_DONE;

    default:
        break;
    }

    colon = nxt_memchr(start, ':', end - start);

    if (nxt_slow_path(colon == NULL)) {
        goto again;
    }

    /* The slot is committed (fields_num++) once the value is parsed. */
    field = &h->fields[h->fields_num];

    field->name.start = start;
    field->name.length = colon - start;

    return nxt_app_http_parse_field_value(h, colon + 1, end, state);

again:

    state->handler = nxt_app_http_parse_field_name;
    state->pos = start;

    return NXT_AGAIN;
}


/*
 * Parses the value of the header field whose name was just stored in
 * h->fields[h->fields_num].
 *
 * Leading spaces are skipped and the value extends to the next LF; a CR
 * directly before that LF is excluded from the length.  On success the
 * field is committed by incrementing h->fields_num and the next step is
 * set to the field name parser positioned after the LF.
 *
 * Returns NXT_OK on success, or NXT_AGAIN (resuming here, after the
 * spaces already skipped) when the line is not complete yet.
 */
static nxt_int_t
nxt_app_http_parse_field_value(nxt_app_request_header_t *h, u_char *start,
    u_char *end, nxt_app_http_parse_state_t *state)
{
    u_char                  *eol;
    nxt_app_header_field_t  *field;

    while (start != end && *start == ' ') {
        start++;
    }

    if (nxt_slow_path(start == end)) {
        goto again;
    }

    eol = nxt_memchr(start, '\n', end - start);

    if (nxt_slow_path(eol == NULL)) {
        goto again;
    }

    field = &h->fields[h->fields_num];

    field->value.start = start;
    field->value.length = eol - start;

    /* Drop the CR of a CRLF line ending. */
    if (eol[-1] == '\r') {
        field->value.length--;
    }

    h->fields_num++;

    state->handler = nxt_app_http_parse_field_name;
    state->pos = eol + 1;

    return NXT_OK;

again:

    state->handler = nxt_app_http_parse_field_value;
    state->pos = start;

    return NXT_AGAIN;
}


/*
 * Scans the parsed header fields for Content-Length and Content-Type
 * (compared case-insensitively) and records them in the request.
 *
 * For Content-Length the numeric value is stored in r->body_rest.  A value
 * that does not yield a non-negative number is rejected: a negative
 * body_rest would later be converted to a huge size_t when the body is
 * read.
 *
 * Returns NXT_OK, or NXT_ERROR for an unusable Content-Length.
 */
static nxt_int_t
nxt_app_http_process_headers(nxt_app_request_t *r)
{
    nxt_uint_t               i;
    nxt_app_header_field_t  *fld;

    /* Sized without the terminating NUL so sizeof() is the name length. */
    static const u_char content_length[14] = "Content-Length";
    static const u_char content_type[12] = "Content-Type";

    for (i = 0; i < r->header.fields_num; i++) {
        fld = &r->header.fields[i];

        if (fld->name.length == sizeof(content_length)
            && nxt_memcasecmp(fld->name.start, content_length,
                              sizeof(content_length)) == 0)
        {
            r->header.content_length = &fld->value;
            r->body_rest = nxt_off_t_parse(fld->value.start, fld->value.length);

            /* A valid length is never negative. */
            if (nxt_slow_path(r->body_rest < 0)) {
                nxt_log_error(NXT_LOG_ERR, r->log,
                              "invalid Content-Length header value");

                return NXT_ERROR;
            }

            continue;
        }

        if (fld->name.length == sizeof(content_type)
            && nxt_memcasecmp(fld->name.start, content_type,
                              sizeof(content_type)) == 0)
        {
            r->header.content_type = &fld->value;
            continue;
        }
    }

    return NXT_OK;
}


/*
 * Binds connection "c" to the thread "thr" and its engine before output is
 * written: gives the connection its own copy of "log", points the task at
 * the thread, selects the engine's I/O interface and work queues, and
 * initializes both timers.  Also increments the engine's connection
 * counter.
 */
static void
nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log)
{
    c->socket.write_ready = 1;

    /* The connection logs through its private copy of the given log. */
    c->log = *log;
    c->socket.log = &c->log;

    /*
     * An ident of zero is replaced; the loop repeats in case the generated
     * ident is zero again (the original comment attributes this to a
     * possible uint32_t overflow).
     */
    while (c->log.ident == 0) {
        c->log.ident = nxt_task_next_ident();
    }

    thr->engine->connections++;

    c->task.ident = c->log.ident;
    c->task.log = &c->log;
    c->task.thread = thr;

    c->sendfile = NXT_CONN_SENDFILE_UNSET;
    c->max_chunk = NXT_INT32_T_MAX;
    c->io = thr->engine->event.io;

    c->read_work_queue = &thr->engine->read_work_queue;
    c->write_work_queue = &thr->engine->write_work_queue;
    c->socket.read_work_queue = &thr->engine->read_work_queue;
    c->socket.write_work_queue = &thr->engine->write_work_queue;

    nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue);
    nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue);

    nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections);
}


/*
 * Reads up to "length" bytes of the request body into "start".
 *
 * The amount is capped by the remaining body size (r->body_rest).  Bytes
 * that were received together with the header (r->body_preread) are copied
 * first; the remainder is read from the socket.  Because recv() on a
 * stream socket may return fewer bytes than requested, reading is repeated
 * until the requested amount has arrived.
 *
 * r->body_rest and r->body_preread are advanced by the bytes consumed.
 *
 * Returns NXT_OK when the (capped) amount has been stored or nothing is
 * left to read, NXT_ERROR on a receive error or when the client closes the
 * connection before the body is complete.
 */
nxt_int_t
nxt_app_http_read_body(nxt_app_request_t *r, u_char *start, size_t length)
{
    size_t     preread;
    ssize_t    n;
    nxt_err_t  err;

    /*
     * Nothing left to read.  This also guards against a negative value,
     * which would otherwise become a huge size_t below and let the preread
     * copy exceed the caller's buffer.
     */
    if (r->body_rest <= 0) {
        return NXT_OK;
    }

    if ((off_t) length > r->body_rest) {
        length = (size_t) r->body_rest;
    }

    if (r->body_preread.length != 0) {
        preread = nxt_min(r->body_preread.length, length);

        nxt_memcpy(start, r->body_preread.start, preread);

        r->body_preread.length -= preread;
        r->body_preread.start += preread;

        r->body_rest -= preread;

        start += preread;
        length -= preread;
    }

    while (length != 0) {
        n = recv(r->event_conn->socket.fd, start, length, 0);

        if (nxt_slow_path(n < 0)) {
            err = nxt_socket_errno;

            nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E",
                          r->event_conn->socket.fd, length, err);

            return NXT_ERROR;
        }

        if (nxt_slow_path(n == 0)) {
            /* End of stream while body bytes are still outstanding. */
            nxt_log_error(NXT_LOG_ERR, r->log,
                          "client prematurely closed connection");

            return NXT_ERROR;
        }

        r->body_rest -= n;

        start += n;
        length -= n;
    }

    return NXT_OK;
}


/*
 * Appends "length" bytes of response data to the request's output.
 *
 * Data is copied into r->output_buf.  Every buffer that becomes full is
 * linked into a local chain; the chain is posted to the event engine with
 * nxt_app_buf_send() when it reaches nxt_app_buf_max_number buffers and
 * once more at the end.  A buffer that is only partly filled stays in
 * r->output_buf for the next call.
 *
 * New buffers are taken from nxt_app_buf_free under nxt_app_mutex.  If the
 * list is empty and fewer than nxt_app_buf_max_number buffers exist, a new
 * 4096-byte buffer is allocated; otherwise the caller blocks on
 * nxt_app_cond until the delivery side returns buffers.
 *
 * Returns NXT_OK, or NXT_ERROR on a locking, waiting or allocation
 * failure.
 *
 * NOTE(review): on the error returns, buffers already linked into the
 * local chain are neither sent nor given back - confirm whether callers
 * abandon the request in that case.
 */
nxt_int_t
nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t length)
{
    void           *start;
    size_t         avail;
    nxt_err_t      err;
    nxt_buf_t      *b, *out, **next;
    nxt_uint_t     bufs;
    nxt_app_buf_t  *ab;

    out = NULL;
    next = &out;

    b = r->output_buf;

    if (b == NULL) {
        bufs = 0;
        goto get_buf;
    }

    bufs = 1;

    for ( ;; ) {
        avail = nxt_buf_mem_free_size(&b->mem);

        /* Strictly greater: the data fits and the buffer stays open. */
        if (avail > length) {
            b->mem.free = nxt_cpymem(b->mem.free, data, length);
            break;
        }

        /* Fill the buffer completely and queue it for sending. */
        b->mem.free = nxt_cpymem(b->mem.free, data, avail);

        data += avail;
        length -= avail;

        *next = b;
        next = &b->next;

        if (length == 0) {
            /* Everything was consumed and no open buffer remains. */
            b = NULL;
            break;
        }

        if (bufs == nxt_app_buf_max_number) {
            /* Hand the full chain over before asking for more buffers. */
            bufs = 0;
            *next = NULL;

            nxt_app_buf_send(r->event_conn, out);

            out = NULL;
            next = &out;
        }

    get_buf:

        if (nxt_slow_path(nxt_thread_mutex_lock(&nxt_app_mutex) != NXT_OK)) {
            return NXT_ERROR;
        }

        for ( ;; ) {
            b = nxt_app_buf_free;

            if (b != NULL) {
                nxt_app_buf_free = b->next;
                break;
            }

            /* Below the cap: leave with b == NULL and allocate a buffer. */
            if (nxt_app_buf_current_number < nxt_app_buf_max_number) {
                break;
            }

            err = nxt_thread_cond_wait(&nxt_app_cond, &nxt_app_mutex,
                                       NXT_INFINITE_NSEC);

            if (nxt_slow_path(err != 0)) {
                (void) nxt_thread_mutex_unlock(&nxt_app_mutex);
                return NXT_ERROR;
            }
        }

        (void) nxt_thread_mutex_unlock(&nxt_app_mutex);

        if (b == NULL) {
            start = nxt_malloc(4096);
            if (nxt_slow_path(start == NULL)) {
                return NXT_ERROR;
            }

            ab = nxt_zalloc(sizeof(nxt_app_buf_t));
            if (nxt_slow_path(ab == NULL)) {
                /* Do not leak the storage allocated just above. */
                nxt_free(start);
                return NXT_ERROR;
            }

            b = &ab->buf;

            nxt_buf_mem_init(b, start, 4096);

            b->completion_handler = NULL;

            nxt_app_buf_current_number++;
        }

        bufs++;
    }

    r->output_buf = b;

    if (out != NULL) {
        *next = NULL;

        nxt_app_buf_send(r->event_conn, out);
    }

    return NXT_OK;
}


/*
 * Terminates the response: allocates a "last" sync buffer that carries the
 * request in ->parent, appends it to the pending output buffer if there is
 * one, and posts the result to the event engine.
 *
 * Returns NXT_OK, or NXT_ERROR if the sync buffer cannot be allocated.
 *
 * NOTE(review): when r->output_buf is NULL the sync buffer itself is passed
 * to nxt_app_buf_send(), which treats its argument as the "buf" member of
 * an nxt_app_buf_t.  The sync buffer comes from nxt_buf_sync_alloc(), so
 * this looks like it writes the work item outside the buffer - verify.
 */
static nxt_int_t
nxt_app_write_finish(nxt_app_request_t *r)
{
    nxt_buf_t  *last, *head;

    last = nxt_buf_sync_alloc(r->mem_pool, NXT_BUF_SYNC_LAST);
    if (nxt_slow_path(last == NULL)) {
        return NXT_ERROR;
    }

    /* nxt_app_delivery_completion() reads the request back from ->parent. */
    last->parent = (nxt_buf_t *) r;
    last->completion_handler = NULL;

    head = r->output_buf;

    if (head == NULL) {
        head = last;

    } else {
        head->next = last;
        r->output_buf = NULL;
    }

    nxt_app_buf_send(r->event_conn, head);

    return NXT_OK;
}


/*
 * Posts the buffer chain "out" for connection "c" to the event engine.
 * The work item embedded in the nxt_app_buf_t that contains the first
 * buffer is used, so "out" must be the "buf" member of an nxt_app_buf_t.
 * The engine later runs nxt_app_delivery_handler(c, out).
 */
static void
nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out)
{
    nxt_app_buf_t  *wrapper;

    wrapper = nxt_container_of(out, nxt_app_buf_t, buf);

    nxt_work_set(&wrapper->work, nxt_app_delivery_handler, &c->task, c, out);

    nxt_event_engine_post(nxt_app_engine, &wrapper->work);
}


/*
 * Work handler that receives an output chain posted by nxt_app_buf_send().
 * obj is the connection, data the buffer chain.
 *
 * - If a write is already in progress the chain is appended to it.
 * - On first use the connection gets a memory pool and is bound to the
 *   current thread's engine.
 * - If the connection has timed out or failed, the chain goes straight to
 *   the done list for recycling; otherwise writing is started.
 *
 * NOTE(review): when the pool cannot be created the socket is closed and
 * the chain is dropped without being recycled.
 */
static void
nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_buf_t         *chain;
    nxt_mem_pool_t    *pool;
    nxt_event_conn_t  *conn;

    conn = obj;
    chain = data;

    nxt_debug(task, "app delivery handler");

    if (conn->write != NULL) {
        nxt_buf_chain_add(&conn->write, chain);
        return;
    }

    if (conn->mem_pool == NULL) {
        pool = nxt_mem_pool_create(256);
        if (nxt_slow_path(pool == NULL)) {
            close(conn->socket.fd);
            return;
        }

        conn->mem_pool = pool;
        nxt_app_conn_update(task->thread, conn, &nxt_main_log);
    }

    if (!conn->socket.timedout && conn->socket.error == 0) {
        conn->write = chain;
        conn->write_state = &nxt_app_delivery_write_state;

        nxt_event_conn_write(task->thread->engine, conn);
        return;
    }

    /* Broken connection: skip writing and just recycle the buffers. */
    nxt_buf_chain_add(&nxt_app_buf_done, chain);
    nxt_work_queue_add(conn->write_work_queue, nxt_app_delivery_completion,
                       task, conn, NULL);
}


/*
 * Connection state installed by nxt_app_delivery_handler() before a write
 * is started.  The initializer is positional; the slot meanings noted below
 * are inferred from the handler names - TODO confirm against the
 * nxt_event_conn_state_t declaration.
 */
static const nxt_event_conn_state_t  nxt_app_delivery_write_state
    nxt_aligned(64) =
{
    NXT_EVENT_NO_BUF_PROCESS,
    NXT_EVENT_TIMER_AUTORESET,

    /* presumably: ready handler, close handler (unused), error handler */
    nxt_app_delivery_ready,
    NULL,
    nxt_app_delivery_error,

    /* presumably: timeout handler, timer value callback, timer data */
    nxt_app_delivery_timeout,
    nxt_app_delivery_timer_value,
    0,
};


/*
 * Write-state callback for connection "obj".  Walks c->write from its head
 * and pushes buffers onto the done list until it meets a memory buffer
 * whose pos differs from free (presumably data not yet written), then
 * schedules nxt_app_delivery_completion().
 */
static void
nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data)
{
    nxt_buf_t         *cur, *following;
    nxt_event_conn_t  *conn;

    conn = obj;

    nxt_debug(task, "app delivery ready");

    cur = conn->write;

    while (cur != NULL) {

        if (nxt_buf_is_mem(cur) && cur->mem.pos != cur->mem.free) {
            break;
        }

        following = cur->next;

        /* Push onto the head of the done list. */
        cur->next = nxt_app_buf_done;
        nxt_app_buf_done = cur;

        cur = following;
    }

    nxt_work_queue_add(conn->write_work_queue,
                       nxt_app_delivery_completion, task, conn, NULL);
}


/*
 * Connection state installed by nxt_app_delivery_completion() right before
 * the connection is closed; only nxt_app_close_request() is set.  The
 * initializer is positional - the slot it occupies is presumably the
 * "ready" handler, TODO confirm against nxt_event_conn_state_t.
 */
static const nxt_event_conn_state_t  nxt_app_delivery_close_state
    nxt_aligned(64) =
{
    NXT_EVENT_NO_BUF_PROCESS,
    NXT_EVENT_TIMER_NO_AUTORESET,

    nxt_app_close_request,
    NULL,
    NULL,

    NULL,
    NULL,
    0,
};


/*
 * Recycles everything on the done list.
 *
 * Memory buffers are reset to empty and collected; a "last" buffer marks
 * the end of a response, so the connection of the request stored in its
 * ->parent is switched to the close state and closed.  The collected
 * buffers are then added to the free list under nxt_app_mutex and the
 * application thread waiting in nxt_app_write() is signalled.
 *
 * obj and data are not used.
 */
static void
nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
{
    nxt_buf_t          *cur, *following, *recycled;
    nxt_event_conn_t   *conn;
    nxt_app_request_t  *req;

    nxt_debug(task, "app delivery completion");

    recycled = NULL;
    cur = nxt_app_buf_done;

    while (cur != NULL) {
        following = cur->next;

        if (nxt_buf_is_mem(cur)) {
            /* Make the buffer empty again and keep it for reuse. */
            cur->mem.pos = cur->mem.start;
            cur->mem.free = cur->mem.start;

            cur->next = recycled;
            recycled = cur;

        } else if (nxt_buf_is_last(cur)) {
            req = (nxt_app_request_t *) cur->parent;

            conn = req->event_conn;
            conn->write_state = &nxt_app_delivery_close_state;

            nxt_event_conn_close(task->thread->engine, conn);
        }

        cur = following;
    }

    nxt_app_buf_done = NULL;

    if (recycled == NULL) {
        return;
    }

    if (nxt_slow_path(nxt_thread_mutex_lock(&nxt_app_mutex) != NXT_OK)) {
        return;
    }

    nxt_buf_chain_add(&nxt_app_buf_free, recycled);

    (void) nxt_thread_mutex_unlock(&nxt_app_mutex);

    nxt_thread_time_update(task->thread);

    (void) nxt_thread_cond_signal(&nxt_app_cond);
}


/*
 * Error callback of the write state: logs the event and releases the
 * connection's pending output through nxt_app_delivery_done().
 * obj is the connection; data is not used.
 */
static void
nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_conn_t  *conn = obj;

    nxt_debug(task, "app delivery error");

    nxt_app_delivery_done(task, conn);
}


/*
 * Timeout callback of the write state: logs the event and releases the
 * connection's pending output through nxt_app_delivery_done().
 * obj is the connection; data is not used.
 */
static void
nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_conn_t  *conn = obj;

    nxt_debug(task, "app delivery timeout");

    nxt_app_delivery_done(task, conn);
}


/*
 * Timer value callback of the write state.  Returns a fixed 30000 ms
 * (30 seconds) regardless of the connection or data argument.
 */
static nxt_msec_t
nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data)
{
    return 30 * 1000;
}


/*
 * Gives up on the connection's pending output: moves the whole c->write
 * chain to the done list, clears c->write and schedules
 * nxt_app_delivery_completion().  Does nothing if no output is pending.
 */
static void
nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c)
{
    nxt_buf_t  *pending;

    pending = c->write;

    if (pending == NULL) {
        return;
    }

    nxt_debug(task, "app delivery done");

    nxt_buf_chain_add(&nxt_app_buf_done, pending);

    c->write = NULL;

    nxt_work_queue_add(c->write_work_queue,
                       nxt_app_delivery_completion, task, c, NULL);
}


/*
 * Callback of the close state: releases the memory of a finished request.
 * obj is the connection; its socket.data holds the request.  The
 * connection's own pool is destroyed first, then the request pool.
 */
static void
nxt_app_close_request(nxt_task_t *task, void *obj, void *data)
{
    nxt_mem_pool_t     *conn_pool, *request_pool;
    nxt_event_conn_t   *conn;
    nxt_app_request_t  *req;

    conn = obj;

    nxt_debug(task, "app close connection");

    req = conn->socket.data;

    /* Fetch both pool pointers before anything is destroyed. */
    conn_pool = conn->mem_pool;
    request_pool = req->mem_pool;

    nxt_mem_pool_destroy(conn_pool);
    nxt_mem_pool_destroy(request_pool);
}