summaryrefslogblamecommitdiffhomepage
path: root/src/nxt_router.c
blob: c87b48b8e2ccd2480f792fa8dcca0daea368eba9 (plain) (tree)
1
2
3
4
5
6
7
8
9
10






                                     
                       

 
































































                                                                               




                                                     
















                                              


                         




























                                                                   



                  



























































                                                                       
                
                                                                    
 


                             
 








































































                                                                                


                           


                          



















































































                                                                         


                         
          
 







                                                 
 




















































                                                                             

                         
 




                                                                             
 































                                                                             
 

                                                                             


                         












                                                                             


                         







































































                                                                               

                  



















































































































                                                                        


                                    

















































































































































































                                                                         
                                       




























































































                                                                           





                                                           






























































































































                                                                  

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) Valentin V. Bartenev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_router.h>


/* Temporary configuration set-up and classification of listen sockets. */
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
    nxt_router_t *router);
static void nxt_router_listen_sockets_sort(nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);

/* Hard-coded stub configuration helpers. */
static nxt_int_t nxt_router_stub_conf(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
    nxt_mem_pool_t *mp, nxt_sockaddr_t *sa);
static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task,
    nxt_mem_pool_t *mp, uint32_t port);

/* Per-engine configuration: work items that create/update/delete sockets. */
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
    nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
    const nxt_event_interface_t *interface);
static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task,
    nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task,
    nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task,
    nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task,
    nxt_mem_pool_t *mp, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
    nxt_array_t *array, nxt_work_handler_t handler);
static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array);

/* Worker thread creation. */
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_event_engine_t *engine);

/* Posting of the prepared work items to the engines. */
static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);

/* Handlers executed in the context of a router thread/engine. */
static void nxt_router_thread_start(void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
    nxt_socket_conf_joint_t *joint);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_conf_release(nxt_task_t *task,
    nxt_socket_conf_joint_t *joint);

/* Connection state handlers. */
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_router_conn_timeout_value(nxt_event_conn_t *c,
    uintptr_t data);


/*
 * Builds the initial router state.
 *
 * Allocates the router object, prepares a temporary configuration with the
 * stub listen sockets, creates the engines and their threads, posts the
 * prepared work to every engine and finally records the new sockets in
 * router->sockets.
 *
 * Returns NXT_OK on success; otherwise NXT_ERROR or the error code of the
 * step that failed.
 *
 * NOTE(review): failures after the temporary configuration has been created
 * still return without releasing the router or the configuration pools.
 */
nxt_int_t
nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
{
    nxt_int_t                    ret;
    nxt_router_t                 *router;
    nxt_router_temp_conf_t       *tmcf;
    const nxt_event_interface_t  *interface;

    router = nxt_zalloc(sizeof(nxt_router_t));
    if (nxt_slow_path(router == NULL)) {
        return NXT_ERROR;
    }

    nxt_queue_init(&router->engines);
    nxt_queue_init(&router->sockets);

    tmcf = nxt_router_temp_conf(task, router);
    if (nxt_slow_path(tmcf == NULL)) {
        /*
         * nxt_router_temp_conf() destroys its own pools on failure, so
         * nothing references the router any more and it can be freed.
         */
        nxt_free(router);
        return NXT_ERROR;
    }

    ret = nxt_router_stub_conf(task, tmcf);
    if (nxt_slow_path(ret != NXT_OK)) {
        return ret;
    }

    nxt_router_listen_sockets_sort(router, tmcf);

    ret = nxt_router_listen_sockets_stub_create(task, tmcf);
    if (nxt_slow_path(ret != NXT_OK)) {
        return ret;
    }

    /*
     * The interface is handed to nxt_event_engine_create() later on;
     * presumably nxt_service_get() yields NULL when no "engine" service
     * is registered, so do not pass that on.
     */
    interface = nxt_service_get(rt->services, "engine", NULL);
    if (nxt_slow_path(interface == NULL)) {
        return NXT_ERROR;
    }

    ret = nxt_router_engines_create(task, router, tmcf, interface);
    if (nxt_slow_path(ret != NXT_OK)) {
        return ret;
    }

    ret = nxt_router_threads_create(task, rt, tmcf);
    if (nxt_slow_path(ret != NXT_OK)) {
        return ret;
    }

    nxt_router_engines_post(tmcf);

    /* The updated and the newly created sockets become the router's set. */
    nxt_queue_add(&router->sockets, &tmcf->updating);
    nxt_queue_add(&router->sockets, &tmcf->creating);

    return NXT_OK;
}


/*
 * Allocates a router configuration and a temporary configuration, each in
 * its own memory pool, and links them together.
 *
 * The router configuration starts with a reference count of 1 and points
 * back to "router".  The temporary configuration gets an engines array and
 * five empty socket queues (deleting, keeping, updating, pending, creating).
 *
 * Returns the temporary configuration, or NULL if any allocation failed;
 * in that case both pools created so far are destroyed.
 */
static nxt_router_temp_conf_t *
nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router)
{
    nxt_mem_pool_t          *conf_pool, *temp_pool;
    nxt_router_conf_t       *conf;
    nxt_router_temp_conf_t  *temp;

    conf_pool = nxt_mem_pool_create(1024);
    if (conf_pool == NULL) {
        return NULL;
    }

    conf = nxt_mem_zalloc(conf_pool, sizeof(nxt_router_conf_t));
    if (conf == NULL) {
        nxt_mem_pool_destroy(conf_pool);
        return NULL;
    }

    conf->mem_pool = conf_pool;
    conf->router = router;
    conf->count = 1;

    temp_pool = nxt_mem_pool_create(1024);
    if (temp_pool == NULL) {
        nxt_mem_pool_destroy(conf_pool);
        return NULL;
    }

    temp = nxt_mem_zalloc(temp_pool, sizeof(nxt_router_temp_conf_t));

    if (temp != NULL) {
        temp->mem_pool = temp_pool;
        temp->conf = conf;

        temp->engines = nxt_array_create(temp->mem_pool, 4,
                                         sizeof(nxt_router_engine_conf_t));

        if (temp->engines != NULL) {
            nxt_queue_init(&temp->deleting);
            nxt_queue_init(&temp->keeping);
            nxt_queue_init(&temp->updating);
            nxt_queue_init(&temp->pending);
            nxt_queue_init(&temp->creating);

            return temp;
        }
    }

    /* The temporary pool goes first, then the configuration pool. */
    nxt_mem_pool_destroy(temp_pool);
    nxt_mem_pool_destroy(conf_pool);

    return NULL;
}


/*
 * Fills the temporary configuration with a hard-coded stub setup: one
 * thread and two listen sockets queued on tmcf->pending.
 *
 *   port 8000 - handled by nxt_router_conn_init, with header buffer sizes
 *               of 2048/8192 bytes and a header read timeout of 5000;
 *   port 8001 - handled by nxt_stream_connection_init, with a header read
 *               timeout of 5000.
 *
 * Returns NXT_OK, or NXT_ERROR if a socket address or a socket
 * configuration could not be allocated.
 */
static nxt_int_t
nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
{
    nxt_sockaddr_t     *sa;
    nxt_mem_pool_t     *mp;
    nxt_socket_conf_t  *skcf;

    tmcf->conf->threads = 1;

    mp = tmcf->conf->mem_pool;

    sa = nxt_router_listen_sockaddr_stub(task, mp, 8000);
    if (nxt_slow_path(sa == NULL)) {
        return NXT_ERROR;
    }

    /* Both helpers return NULL on allocation failure. */
    skcf = nxt_router_socket_conf(task, mp, sa);
    if (nxt_slow_path(skcf == NULL)) {
        return NXT_ERROR;
    }

    skcf->listen.handler = nxt_router_conn_init;
    skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen)
                        + sizeof(nxt_event_conn_proxy_t)
                        + sizeof(nxt_event_conn_t)
                        + 4 * sizeof(nxt_buf_t);

    skcf->header_buffer_size = 2048;
    skcf->large_header_buffer_size = 8192;
    skcf->header_read_timeout = 5000;

    nxt_queue_insert_tail(&tmcf->pending, &skcf->link);

    sa = nxt_router_listen_sockaddr_stub(task, mp, 8001);
    if (nxt_slow_path(sa == NULL)) {
        return NXT_ERROR;
    }

    skcf = nxt_router_socket_conf(task, mp, sa);
    if (nxt_slow_path(skcf == NULL)) {
        return NXT_ERROR;
    }

    skcf->listen.handler = nxt_stream_connection_init;
    skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen)
                        + sizeof(nxt_event_conn_proxy_t)
                        + sizeof(nxt_event_conn_t)
                        + 4 * sizeof(nxt_buf_t);

    skcf->header_read_timeout = 5000;

    nxt_queue_insert_tail(&tmcf->pending, &skcf->link);

    return NXT_OK;
}


/*
 * Allocates a zeroed socket configuration from "mp" and presets its listen
 * socket: address "sa", no descriptor yet (-1), the default backlog,
 * non-blocking mode and read-after-accept enabled.
 *
 * Returns the new configuration, or NULL if the allocation failed.
 */
static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t *task, nxt_mem_pool_t *mp, nxt_sockaddr_t *sa)
{
    nxt_socket_conf_t    *skcf;
    nxt_listen_socket_t  *ls;

    skcf = nxt_mem_zalloc(mp, sizeof(nxt_socket_conf_t));

    if (skcf != NULL) {
        ls = &skcf->listen;

        ls->sockaddr = sa;
        ls->socket = -1;
        ls->backlog = NXT_LISTEN_BACKLOG;
        ls->flags = NXT_NONBLOCK;
        ls->read_after_accept = 1;
    }

    return skcf;
}


/*
 * Creates a stream socket address in "mp" for the given port.
 *
 * Only the family (AF_INET) and the port are set; every other byte of the
 * sockaddr_in is zero.  The textual form of the address is generated with
 * nxt_sockaddr_text() before returning.
 *
 * Returns the address, or NULL if nxt_sockaddr_create() failed.
 */
static nxt_sockaddr_t *
nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mem_pool_t *mp,
    uint32_t port)
{
    nxt_sockaddr_t      *addr;
    struct sockaddr_in  addr4;

    nxt_memzero(&addr4, sizeof(struct sockaddr_in));

    addr4.sin_port = htons(port);
    addr4.sin_family = AF_INET;

    addr = nxt_sockaddr_create(mp, (struct sockaddr *) &addr4,
                               sizeof(struct sockaddr_in),
                               NXT_INET_ADDR_STR_LEN);

    if (addr != NULL) {
        addr->type = SOCK_STREAM;

        nxt_sockaddr_text(addr);
    }

    return addr;
}


/*
 * Classifies listen sockets of a new configuration against the sockets the
 * router currently owns.
 *
 * For every socket in tmcf->pending whose address matches a socket in
 * router->sockets, the existing socket is moved to tmcf->keeping and the
 * new one to tmcf->updating.  Pending sockets without a match stay in
 * tmcf->pending.  Whatever remains in router->sockets afterwards is
 * appended to tmcf->deleting, and router->sockets is left empty.
 *
 * nxt_sockaddr_cmp() is treated as returning non-zero on equality.
 */
static void
nxt_router_listen_sockets_sort(nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf)
{
    nxt_queue_link_t   *nqlk, *oqlk, *next;
    nxt_socket_conf_t  *nskcf, *oskcf;

    for (nqlk = nxt_queue_first(&tmcf->pending);
         nqlk != nxt_queue_tail(&tmcf->pending);
         nqlk = next)
    {
        /* nqlk may be unlinked below, so fetch its successor first. */
        next = nxt_queue_next(nqlk);
        nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);

        for (oqlk = nxt_queue_first(&router->sockets);
             oqlk != nxt_queue_tail(&router->sockets);
             oqlk = nxt_queue_next(oqlk))
        {
            oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);

            if (nxt_sockaddr_cmp(nskcf->listen.sockaddr,
                                 oskcf->listen.sockaddr))
            {
                nxt_queue_remove(oqlk);
                nxt_queue_insert_tail(&tmcf->keeping, oqlk);

                nxt_queue_remove(nqlk);
                nxt_queue_insert_tail(&tmcf->updating, nqlk);

                break;
            }
        }
    }

    nxt_queue_add(&tmcf->deleting, &router->sockets);

    /*
     * The links now belong to tmcf->deleting.  nxt_queue_add() is not
     * expected to reset the source head (NOTE(review): confirm against
     * nxt_queue.h), and nxt_router_start() appends to router->sockets
     * afterwards, so bring the head back to a valid empty state.
     */
    nxt_queue_init(&router->sockets);
}


/*
 * Opens every listen socket queued on tmcf->pending and moves each
 * successfully created socket configuration to tmcf->creating.
 *
 * Returns NXT_OK when all sockets were created.  On the first failure
 * NXT_ERROR is returned; the failing socket and all following ones stay
 * in tmcf->pending.
 */
static nxt_int_t
nxt_router_listen_sockets_stub_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf)
{
    nxt_queue_link_t   *cur, *following;
    nxt_socket_conf_t  *conf;

    cur = nxt_queue_first(&tmcf->pending);

    while (cur != nxt_queue_tail(&tmcf->pending)) {
        conf = nxt_queue_link_data(cur, nxt_socket_conf_t, link);

        if (nxt_listen_socket_create(task, &conf->listen, 0) != NXT_OK) {
            return NXT_ERROR;
        }

        /* Remember the successor before the link changes queues. */
        following = nxt_queue_next(cur);

        nxt_queue_remove(cur);
        nxt_queue_insert_tail(&tmcf->creating, cur);

        cur = following;
    }

    return NXT_OK;
}


/*
 * Prepares one engine configuration entry per engine in tmcf->engines.
 *
 * Existing engines (router->engines) are walked first: the first
 * "threads" of them get an update configuration, any surplus ones a
 * delete configuration.  The number of existing engines is stored in
 * tmcf->new_threads.  If fewer engines exist than configured threads, new
 * engines are created with "interface" until the count is reached, each
 * with a create configuration.
 *
 * Returns NXT_OK, NXT_ERROR on allocation or engine creation failure, or
 * the error returned by a per-engine configuration step.
 */
static nxt_int_t
nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
{
    nxt_int_t                 rc;
    nxt_uint_t                count, wanted;
    nxt_mem_pool_t            *conf_pool;
    nxt_queue_link_t          *lnk;
    nxt_router_engine_conf_t  *slot;

    conf_pool = tmcf->conf->mem_pool;
    wanted = tmcf->conf->threads;

    tmcf->engines = nxt_array_create(tmcf->mem_pool, wanted,
                                     sizeof(nxt_router_engine_conf_t));
    if (tmcf->engines == NULL) {
        return NXT_ERROR;
    }

    count = 0;
    lnk = nxt_queue_first(&router->engines);

    while (lnk != nxt_queue_tail(&router->engines)) {
        slot = nxt_array_zero_add(tmcf->engines);
        if (slot == NULL) {
            return NXT_ERROR;
        }

        slot->engine = nxt_queue_link_data(lnk, nxt_event_engine_t, link);
        /* STUB */
        slot->task = slot->engine->task;

        rc = (count < wanted)
             ? nxt_router_engine_conf_update(task, conf_pool, tmcf, slot)
             : nxt_router_engine_conf_delete(task, conf_pool, tmcf, slot);

        if (rc != NXT_OK) {
            return rc;
        }

        count++;
        lnk = nxt_queue_next(lnk);
    }

    tmcf->new_threads = count;

    for ( /* void */ ; count < wanted; count++) {
        slot = nxt_array_zero_add(tmcf->engines);
        if (slot == NULL) {
            return NXT_ERROR;
        }

        slot->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
        if (slot->engine == NULL) {
            return NXT_ERROR;
        }

        /* STUB */
        slot->task = slot->engine->task;

        rc = nxt_router_engine_conf_create(task, conf_pool, tmcf, slot);
        if (rc != NXT_OK) {
            return rc;
        }
    }

    return NXT_OK;
}


/*
 * Prepares the work list of a newly created engine.
 *
 * A new engine has no listen events yet, so sockets from both
 * tmcf->creating and tmcf->updating are scheduled through
 * nxt_router_listen_socket_create and collected in recf->creating.
 *
 * Returns NXT_OK, or NXT_ERROR / the joint creation error on failure.
 */
static nxt_int_t
nxt_router_engine_conf_create(nxt_task_t *task, nxt_mem_pool_t *mp,
    nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
{
    nxt_int_t  rc;

    recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));

    if (recf->creating == NULL) {
        return NXT_ERROR;
    }

    rc = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
                                         recf->creating,
                                         nxt_router_listen_socket_create);

    if (rc == NXT_OK) {
        rc = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
                                             recf->creating,
                                             nxt_router_listen_socket_create);
    }

    return rc;
}


/*
 * Prepares the work lists of an engine that already exists and is kept.
 *
 *   recf->creating - create work for sockets in tmcf->creating;
 *   recf->updating - update work for sockets in tmcf->updating;
 *   recf->deleting - delete work for sockets in tmcf->deleting.
 *
 * Returns NXT_OK, NXT_ERROR if an array could not be created, or the
 * error of the failing joint step.
 */
static nxt_int_t
nxt_router_engine_conf_update(nxt_task_t *task, nxt_mem_pool_t *mp,
    nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
{
    nxt_int_t  rc;

    /* Sockets that are new in this configuration. */

    recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
    if (recf->creating == NULL) {
        return NXT_ERROR;
    }

    rc = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
                                         recf->creating,
                                         nxt_router_listen_socket_create);
    if (rc != NXT_OK) {
        return rc;
    }

    /* Sockets that exist already and receive a new configuration. */

    recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
    if (recf->updating == NULL) {
        return NXT_ERROR;
    }

    rc = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
                                         recf->updating,
                                         nxt_router_listen_socket_update);
    if (rc != NXT_OK) {
        return rc;
    }

    /* Sockets that are gone in the new configuration. */

    recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
    if (recf->deleting == NULL) {
        return NXT_ERROR;
    }

    rc = nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
                                         recf->deleting);

    return rc;
}


/*
 * Prepares the work list of an engine that is no longer needed: delete
 * work is scheduled in recf->deleting for the sockets of tmcf->updating
 * and of tmcf->deleting.  Only recf->deleting is allocated here.
 *
 * Returns NXT_OK, NXT_ERROR if the array could not be created, or the
 * error of the failing joint step.
 */
static nxt_int_t
nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mem_pool_t *mp,
    nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
{
    nxt_int_t  rc;

    recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));

    if (recf->deleting == NULL) {
        return NXT_ERROR;
    }

    rc = nxt_router_engine_joints_delete(task, recf, &tmcf->updating,
                                         recf->deleting);

    if (rc == NXT_OK) {
        rc = nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
                                             recf->deleting);
    }

    return rc;
}


/*
 * Appends one work item to "array" for every socket configuration queued
 * on "sockets".
 *
 * Each work item runs "handler" with the engine of "recf" as its object
 * and a freshly allocated socket/engine joint as its data.  A joint is
 * allocated zeroed from "mp", starts with a reference count of 1, and
 * records both the socket configuration and the engine it is meant for.
 *
 * Returns NXT_OK, or NXT_ERROR if a work item or a joint could not be
 * allocated.
 */
static nxt_int_t
nxt_router_engine_joints_create(nxt_task_t *task, nxt_mem_pool_t *mp,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
    nxt_work_handler_t handler)
{
    nxt_work_t               *work;
    nxt_queue_link_t         *qlk;
    nxt_socket_conf_joint_t  *joint;

    for (qlk = nxt_queue_first(sockets);
         qlk != nxt_queue_tail(sockets);
         qlk = nxt_queue_next(qlk))
    {
        work = nxt_array_add(array);
        if (nxt_slow_path(work == NULL)) {
            return NXT_ERROR;
        }

        work->next = NULL;
        work->handler = handler;
        work->task = &recf->task;
        work->obj = recf->engine;

        /*
         * Zeroed allocation: no joint field is left with an indeterminate
         * value, in particular the queue link that is not set here.
         */
        joint = nxt_mem_zalloc(mp, sizeof(nxt_socket_conf_joint_t));
        if (nxt_slow_path(joint == NULL)) {
            return NXT_ERROR;
        }

        work->data = joint;

        joint->count = 1;
        joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);

        /* nxt_router_conf_release() reads joint->engine->joints. */
        joint->engine = recf->engine;
    }

    return NXT_OK;
}


/*
 * Appends one delete work item to "array" for every socket configuration
 * queued on "sockets".  Each item runs nxt_router_listen_socket_delete
 * with the engine of "recf" as its object and the socket configuration
 * itself (not a joint) as its data.
 *
 * Returns NXT_OK, or NXT_ERROR if a work item could not be allocated.
 */
static nxt_int_t
nxt_router_engine_joints_delete(nxt_task_t *task,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array)
{
    nxt_work_t        *item;
    nxt_queue_link_t  *cur;

    cur = nxt_queue_first(sockets);

    while (cur != nxt_queue_tail(sockets)) {
        item = nxt_array_add(array);

        if (item == NULL) {
            return NXT_ERROR;
        }

        item->next = NULL;
        item->handler = nxt_router_listen_socket_delete;
        item->task = &recf->task;
        item->obj = recf->engine;
        item->data = nxt_queue_link_data(cur, nxt_socket_conf_t, link);

        cur = nxt_queue_next(cur);
    }

    return NXT_OK;
}


/*
 * Starts a thread for every engine configuration entry whose index lies
 * in [tmcf->new_threads, tmcf->conf->threads), i.e. for the engines that
 * nxt_router_engines_create() created anew.
 *
 * Returns NXT_OK, or the error of the first thread that failed to start.
 */
static nxt_int_t
nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_router_temp_conf_t *tmcf)
{
    nxt_int_t                 rc;
    nxt_uint_t                idx, limit;
    nxt_router_engine_conf_t  *entries;

    entries = tmcf->engines->elts;
    limit = tmcf->conf->threads;
    idx = tmcf->new_threads;

    while (idx < limit) {
        rc = nxt_router_thread_create(task, rt, entries[idx].engine);

        if (rc != NXT_OK) {
            return rc;
        }

        idx++;
    }

    return NXT_OK;
}


/*
 * Starts a router thread for "engine".
 *
 * A thread link is allocated that starts nxt_router_thread_start and
 * carries nxt_router_thread_exit_handler as its work handler.  The engine
 * is added to rt->engines before the thread is created and removed again
 * if nxt_thread_create() fails.
 *
 * Returns NXT_OK, NXT_ERROR if the link could not be allocated, or the
 * error reported by nxt_thread_create().
 *
 * NOTE(review): the link is not freed here on failure; presumably
 * nxt_thread_create() takes care of it - confirm.
 */
static nxt_int_t
nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_event_engine_t *engine)
{
    nxt_int_t            rc;
    nxt_thread_link_t    *tl;
    nxt_thread_handle_t  handle;

    tl = nxt_zalloc(sizeof(nxt_thread_link_t));
    if (tl == NULL) {
        return NXT_ERROR;
    }

    tl->engine = engine;
    tl->start = nxt_router_thread_start;

    tl->work.task = task;
    tl->work.data = tl;
    tl->work.handler = nxt_router_thread_exit_handler;

    nxt_queue_insert_tail(&rt->engines, &engine->link);

    rc = nxt_thread_create(&handle, tl);
    if (rc == NXT_OK) {
        return NXT_OK;
    }

    /* The thread never started: take the engine off the runtime list. */
    nxt_queue_remove(&engine->link);

    return rc;
}


/*
 * Posts the prepared work of every engine configuration entry stored in
 * tmcf->engines, in array order.
 */
static void
nxt_router_engines_post(nxt_router_temp_conf_t *tmcf)
{
    nxt_uint_t                i, total;
    nxt_router_engine_conf_t  *entries;

    entries = tmcf->engines->elts;
    total = tmcf->engines->nelts;

    for (i = 0; i < total; i++) {
        nxt_router_engine_post(&entries[i]);
    }
}


/*
 * Posts every work item of recf->creating to the engine of "recf".
 *
 * An entry prepared by nxt_router_engine_conf_delete() has no "creating"
 * array (only "deleting" is allocated there), so a missing array is
 * treated as "nothing to post" instead of being dereferenced.
 *
 * NOTE(review): the "updating" and "deleting" work lists are not posted
 * by this function.
 */
static void
nxt_router_engine_post(nxt_router_engine_conf_t *recf)
{
    nxt_uint_t  n;
    nxt_work_t  *work;

    if (recf->creating == NULL) {
        return;
    }

    work = recf->creating->elts;

    for (n = recf->creating->nelts; n != 0; n--) {
        nxt_event_engine_post(recf->engine, work);
        work++;
    }
}


/*
 * Entry point of a router thread; "data" is the nxt_thread_link_t that
 * was handed to nxt_thread_create().
 *
 * Binds the current thread and the link's engine to each other, creates
 * the engine's memory cache pool and then runs the engine.
 *
 * NOTE(review): the result of nxt_mem_cache_pool_create() is not checked.
 */
static void
nxt_router_thread_start(void *data)
{
    nxt_thread_t        *self;
    nxt_thread_link_t   *tl;
    nxt_event_engine_t  *engine;

    tl = data;
    engine = tl->engine;
    self = nxt_thread();

    /*
     * STUB: inherit the runtime from the thread the engine task was bound
     * to so far.  This must happen before the task is re-bound below.
     */
    self->runtime = engine->task.thread->runtime;

    self->engine = engine;
    self->fiber = &engine->fibers->fiber;

    engine->task.thread = self;
    engine->task.log = self->log;

    engine->mem_pool = nxt_mem_cache_pool_create(4096, 1024, 1024, 64);

    nxt_event_engine_start(engine);
}


/*
 * Work handler: creates a listen event for the socket of the joint passed
 * in "data" and attaches the joint to it as socket.data.  If the event
 * cannot be created, the joint is released through
 * nxt_router_listen_socket_release().  "obj" is not used.
 */
static void
nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
{
    nxt_listen_event_t       *event;
    nxt_socket_conf_joint_t  *joint;

    joint = data;

    event = nxt_listen_event(task, &joint->socket_conf->listen);

    if (event != NULL) {
        event->socket.data = joint;
        return;
    }

    nxt_router_listen_socket_release(task, joint);
}


/*
 * Searches "listen_connections" for the listen event whose descriptor
 * equals the listen socket descriptor of "skcf".
 *
 * Returns the matching event, or NULL if there is none.
 */
nxt_inline nxt_listen_event_t *
nxt_router_listen_event(nxt_queue_t *listen_connections,
    nxt_socket_conf_t *skcf)
{
    nxt_socket_t        fd;
    nxt_queue_link_t    *cur;
    nxt_listen_event_t  *event;

    /* Read the descriptor once; it is the search key for the whole walk. */
    fd = skcf->listen.socket;

    cur = nxt_queue_first(listen_connections);

    while (cur != nxt_queue_tail(listen_connections)) {
        event = nxt_queue_link_data(cur, nxt_listen_event_t, link);

        if (event->socket.fd == fd) {
            return event;
        }

        cur = nxt_queue_next(cur);
    }

    return NULL;
}


/*
 * Work handler: replaces the joint attached to an existing listen event.
 *
 * "obj" is the engine, "data" the new joint.  The listen event is looked
 * up by the socket descriptor of the joint's socket configuration; the
 * new joint is attached and the reference held on the previous one is
 * dropped.
 *
 * If the engine has no listen event for that socket, the new joint can
 * not be attached anywhere, so its initial reference is dropped instead
 * of dereferencing the failed lookup.
 */
static void
nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_engine_t       *engine;
    nxt_listen_event_t       *listen;
    nxt_socket_conf_joint_t  *joint, *old;

    engine = obj;
    joint = data;

    listen = nxt_router_listen_event(&engine->listen_connections,
                                     joint->socket_conf);

    if (nxt_slow_path(listen == NULL)) {
        nxt_router_conf_release(task, joint);
        return;
    }

    old = listen->socket.data;
    listen->socket.data = joint;

    nxt_router_conf_release(task, old);
}


/*
 * Work handler: stops accepting on a listen socket.
 *
 * "obj" is the engine, "data" the socket configuration.  The matching
 * listen event is removed from the engine's fd events and its timer is
 * armed with a zero timeout so that nxt_router_listen_socket_close() runs
 * from the engine's fast work queue.
 *
 * If the engine has no listen event for the socket there is nothing to
 * delete, and the handler returns instead of dereferencing the failed
 * lookup.
 */
static void
nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
{
    nxt_socket_conf_t   *skcf;
    nxt_listen_event_t  *listen;
    nxt_event_engine_t  *engine;

    engine = obj;
    skcf = data;

    listen = nxt_router_listen_event(&engine->listen_connections, skcf);

    if (nxt_slow_path(listen == NULL)) {
        return;
    }

    nxt_fd_event_delete(engine, &listen->socket);

    listen->timer.handler = nxt_router_listen_socket_close;
    listen->timer.work_queue = &engine->fast_work_queue;

    nxt_timer_add(engine, &listen->timer, 0);
}


/*
 * Timer handler armed by nxt_router_listen_socket_delete(): "obj" is the
 * timer embedded in the listen event.  The event is unlinked and freed,
 * then the joint that was attached to it is released.  "data" is not
 * used.
 */
static void
nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
{
    nxt_timer_t              *timer;
    nxt_listen_event_t       *event;
    nxt_socket_conf_joint_t  *attached;

    timer = obj;
    event = nxt_timer_data(timer, nxt_listen_event_t, timer);

    nxt_queue_remove(&event->link);

    /* Fetch the joint before the event memory goes away. */
    attached = event->socket.data;

    nxt_free(event);

    nxt_router_listen_socket_release(task, attached);
}


/*
 * Drops one use of the listen socket behind "joint" and then releases the
 * joint itself.
 *
 * The listen socket's use count is decremented under the router spinlock;
 * when it reaches zero the descriptor is detached (set to -1) inside the
 * lock and closed after the lock has been released.
 *
 * NOTE(review): the lock is reached through socket_conf->router_conf; no
 * assignment of router_conf is visible in this file - confirm it is set.
 */
static void
nxt_router_listen_socket_release(nxt_task_t *task,
    nxt_socket_conf_joint_t *joint)
{
    nxt_socket_t           closing;
    nxt_listen_socket_t    *ls;
    nxt_thread_spinlock_t  *lock;

    ls = &joint->socket_conf->listen;
    lock = &joint->socket_conf->router_conf->router->lock;
    closing = -1;

    nxt_thread_spin_lock(lock);

    ls->count--;

    if (ls->count == 0) {
        closing = ls->socket;
        ls->socket = -1;
    }

    nxt_thread_spin_unlock(lock);

    if (closing != -1) {
        nxt_socket_close(task, closing);
    }

    nxt_router_conf_release(task, joint);
}


/*
 * Drops one reference to "joint".
 *
 * When the last reference is gone the joint is unlinked and, under the
 * router spinlock, the reference counts of its socket configuration and,
 * if that reaches zero too, of the router configuration are decremented.
 * The router configuration's memory pool is destroyed once its count
 * reaches zero.  Finally, if the engine of the joint has no joints left,
 * the current thread exits.
 *
 * The joint is allocated from the router configuration's pool (see
 * nxt_router_engine_joints_create()), so everything needed from it is
 * read before that pool may be destroyed.
 */
static void
nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
{
    nxt_queue_t            *joints;
    nxt_socket_conf_t      *skcf;
    nxt_router_conf_t      *rtcf;
    nxt_thread_spinlock_t  *lock;

    nxt_debug(task, "conf joint count: %D", joint->count);

    if (--joint->count != 0) {
        return;
    }

    nxt_queue_remove(&joint->link);

    skcf = joint->socket_conf;
    rtcf = skcf->router_conf;
    lock = &rtcf->router->lock;

    /* The engine outlives the configuration pool; the joint may not. */
    joints = &joint->engine->joints;

    nxt_thread_spin_lock(lock);

    if (--skcf->count != 0) {
        rtcf = NULL;

    } else {
        nxt_queue_remove(&skcf->link);

        if (--rtcf->count != 0) {
            rtcf = NULL;
        }
    }

    nxt_thread_spin_unlock(lock);

    /* rtcf is non-NULL only if this was the last reference to it. */
    if (rtcf != NULL) {
        nxt_mem_pool_destroy(rtcf->mem_pool);
    }

    if (nxt_queue_is_empty(joints)) {
        nxt_thread_exit(task->thread);
    }
}


/*
 * Work handler installed in the thread link by nxt_router_thread_create():
 * "obj" carries the thread handle, "data" the thread link.
 *
 * Waits for the thread, then removes its engine from the engine queue,
 * destroys the engine's memory pool, frees the engine and the link.
 */
static void
nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_thread_link_t    *tl;
    nxt_event_engine_t   *engine;
    nxt_thread_handle_t  handle;

    tl = data;
    handle = (nxt_thread_handle_t) obj;

    nxt_thread_wait(handle);

    engine = tl->engine;

    nxt_queue_remove(&engine->link);
    nxt_mem_cache_pool_destroy(engine->mem_pool);
    nxt_event_engine_free(engine);

    nxt_free(tl);

    /* TODO: free port */
}


/*
 * Read state of a router connection: received data is passed to the HTTP
 * header parser; close and error events close the connection.
 */
static const nxt_event_conn_state_t  nxt_router_conn_read_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_router_conn_http_header_parse,
    .close_handler = nxt_router_conn_close,
    .error_handler = nxt_router_conn_error,

    /*
     * The timeout is not stored here: timer_data is the offset of
     * header_read_timeout inside nxt_socket_conf_t, which
     * nxt_router_conn_timeout_value() resolves at run time.
     */
    .timer_handler = nxt_router_conn_timeout,
    .timer_value = nxt_router_conn_timeout_value,
    .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
};


/*
 * Listen handler of the router socket: prepares a freshly accepted
 * connection for reading the request header.
 *
 * "obj" is the connection; "data" is used as the socket/engine joint of
 * the listen socket (presumably the listen event's socket.data - confirm
 * against the accept code).  The connection takes a reference on the
 * joint, gets a read buffer of header_buffer_size bytes, uses the
 * engine's fast work queue for reads and writes, and is switched to the
 * read state.
 *
 * If the read buffer cannot be allocated the connection is closed; the
 * joint reference taken here is dropped by nxt_router_conn_free().
 */
static void
nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
{
    size_t                   size;
    nxt_event_conn_t         *c;
    nxt_event_engine_t       *engine;
    nxt_socket_conf_joint_t  *joint;

    c = obj;
    joint = data;

    nxt_debug(task, "router conn init");

    joint->count++;

    /* No parser state yet; the header parser allocates it on first use. */
    c->socket.data = NULL;

    /* Set up the work queues first so that the close path can use them. */
    engine = task->thread->engine;
    c->read_work_queue = &engine->fast_work_queue;
    c->write_work_queue = &engine->fast_work_queue;

    size = joint->socket_conf->header_buffer_size;
    c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);

    if (nxt_slow_path(c->read == NULL)) {
        /* The parser handler dereferences c->read unconditionally. */
        nxt_router_conn_close(task, c, NULL);
        return;
    }

    c->read_state = &nxt_router_conn_read_state;

    nxt_event_conn_read(engine, c);
}


/*
 * Write state of a router connection: the connection is closed when the
 * write is ready/complete, on close, and on error.
 */
static const nxt_event_conn_state_t  nxt_router_conn_write_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_router_conn_close,
    .close_handler = nxt_router_conn_close,
    .error_handler = nxt_router_conn_error,
};


/*
 * Read-ready handler: feeds the bytes collected in c->read to the HTTP
 * request parser.
 *
 * "obj" is the connection.  "data" is the parser state, or NULL on the
 * first call, in which case the state is allocated from the connection
 * pool and stored in c->socket.data (presumably the framework passes
 * c->socket.data back as "data" on later calls - confirm).
 *
 * Parser results:
 *   NXT_DONE   - the read buffer is written back to the client as is
 *                (stub behaviour), after which the connection is closed
 *                by the write state;
 *   NXT_ERROR  - the connection is closed;
 *   otherwise  - more data is needed.  If the read buffer is full, the
 *                unparsed bytes are moved to a new buffer of
 *                large_header_buffer_size bytes; if they would not leave
 *                any room in such a buffer, the header is too large and
 *                the connection is closed.
 *
 * NOTE(review): the parser state may keep pointers into the old buffer
 * when the data is moved to the large buffer - confirm with the parser.
 */
static void
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
{
    size_t                    size, used;
    nxt_int_t                 ret;
    nxt_buf_t                 *b;
    nxt_event_conn_t          *c;
    nxt_socket_conf_joint_t   *joint;
    nxt_http_request_parse_t  *rp;

    c = obj;
    rp = data;

    nxt_debug(task, "router conn http header parse");

    if (rp == NULL) {
        rp = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_http_request_parse_t));
        if (nxt_slow_path(rp == NULL)) {
            nxt_router_conn_close(task, c, data);
            return;
        }

        c->socket.data = rp;

        ret = nxt_http_parse_request_init(rp, c->mem_pool);
        if (nxt_slow_path(ret != NXT_OK)) {
            nxt_router_conn_close(task, c, data);
            return;
        }
    }

    ret = nxt_http_parse_request(rp, &c->read->mem);

    nxt_debug(task, "http parse request: %d", ret);

    switch (nxt_expect(NXT_DONE, ret)) {

    case NXT_DONE:
        break;

    case NXT_ERROR:
        nxt_router_conn_close(task, c, data);
        return;

    default:  /* NXT_AGAIN */

        if (c->read->mem.free == c->read->mem.end) {
            joint = c->listen->socket.data;
            size = joint->socket_conf->large_header_buffer_size;

            used = c->read->mem.free - c->read->mem.pos;

            /*
             * The pending bytes must fit into the large buffer and leave
             * room for at least one more byte; otherwise the copy below
             * would overflow it or the connection would re-buffer forever.
             */
            if (nxt_slow_path(used >= size)) {
                nxt_router_conn_close(task, c, data);
                return;
            }

            b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
            if (nxt_slow_path(b == NULL)) {
                nxt_router_conn_close(task, c, data);
                return;
            }

            nxt_memcpy(b->mem.pos, c->read->mem.pos, used);

            b->mem.free += used;
            c->read = b;
        }

        nxt_event_conn_read(task->thread->engine, c);
        return;
    }

    /* STUB: echo the received request back and close afterwards. */
    c->write = c->read;
    c->write->mem.pos = c->write->mem.start;
    c->write_state = &nxt_router_conn_write_state;

    nxt_event_conn_write(task->thread->engine, c);
}


/*
 * State installed as write_state before nxt_event_conn_close() is called:
 * its ready handler frees the connection.
 */
static const nxt_event_conn_state_t  nxt_router_conn_close_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_router_conn_free,
};


/*
 * Starts closing the connection passed in "obj": the close state is
 * installed as write state and the close is requested from the current
 * thread's engine.  "data" is not used.
 */
static void
nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_conn_t  *conn = obj;

    nxt_debug(task, "router conn close");

    conn->write_state = &nxt_router_conn_close_state;
    nxt_event_conn_close(task->thread->engine, conn);
}


/*
 * Final step of closing a connection: releases one reference on the joint
 * currently attached to the connection's listen event, then destroys the
 * connection's memory pool.  "data" is not used.
 *
 * NOTE(review): the joint released here is the one attached to the listen
 * event at this moment, which need not be the joint that
 * nxt_router_conn_init() took its reference on if the listen socket has
 * been updated in between - confirm.
 */
static void
nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_conn_t  *conn = obj;

    nxt_debug(task, "router conn close done");

    nxt_router_conf_release(task, conn->listen->socket.data);

    nxt_mem_pool_destroy(conn->mem_pool);
}


/*
 * Error handler of a router connection: closes the connection in the same
 * way as nxt_router_conn_close(), with its own debug message.  "data" is
 * not used.
 */
static void
nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
{
    nxt_event_conn_t  *conn = obj;

    nxt_debug(task, "router conn error");

    conn->write_state = &nxt_router_conn_close_state;
    nxt_event_conn_close(task->thread->engine, conn);
}


/*
 * Read timer handler: "obj" is the read timer of a connection.  The
 * owning connection is derived from the timer and closed.  "data" is not
 * used.
 */
static void
nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
{
    nxt_timer_t       *timer = obj;
    nxt_event_conn_t  *conn;

    nxt_debug(task, "router conn timeout");

    conn = nxt_event_read_timer_conn(timer);

    conn->write_state = &nxt_router_conn_close_state;
    nxt_event_conn_close(task->thread->engine, conn);
}


/*
 * Returns the timeout for connection "c".  "data" is the byte offset of
 * an nxt_msec_t field inside the socket configuration that belongs to the
 * joint attached to the connection's listen event.
 */
static nxt_msec_t
nxt_router_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data)
{
    nxt_socket_conf_t        *skcf;
    nxt_socket_conf_joint_t  *attached;

    attached = c->listen->socket.data;
    skcf = attached->socket_conf;

    return nxt_value_at(nxt_msec_t, skcf, data);
}