diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-05-31 15:26:45 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-05-31 15:26:45 +0300 |
commit | 5bc011afad76fe7576ef0e0ed7ca7eb5417cf338 (patch) | |
tree | 0db06be61c62ecabb5225f69688da8ab4424aa36 /src/nxt_router.c | |
parent | dee819daabe6e7ab462beac779ab0dfd7d648844 (diff) | |
download | unit-5bc011afad76fe7576ef0e0ed7ca7eb5417cf338.tar.gz unit-5bc011afad76fe7576ef0e0ed7ca7eb5417cf338.tar.bz2 |
Skeleton of router configuration and request processing.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_router.c | 1076 |
1 files changed, 1039 insertions, 37 deletions
diff --git a/src/nxt_router.c b/src/nxt_router.c index caef8503..1f96a1df 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -5,81 +5,1083 @@ * Copyright (C) NGINX, Inc. */ -#include <nxt_main.h> -#include <nxt_runtime.h> -#include <nxt_master_process.h> +#include <nxt_router.h> -static nxt_int_t nxt_router_listen_socket(nxt_task_t *task, nxt_runtime_t *rt); +static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task, + nxt_router_t *router); +static void nxt_router_listen_sockets_sort(nxt_router_t *router, + nxt_router_temp_conf_t *tmcf); + +static nxt_int_t nxt_router_stub_conf(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf); +static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf); +static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_sockaddr_t *sa); +static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task, + nxt_mem_pool_t *mp, uint32_t port); + +static nxt_int_t nxt_router_engines_create(nxt_task_t *task, + nxt_router_t *router, nxt_router_temp_conf_t *tmcf, + const nxt_event_interface_t *interface); +static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf); +static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf); +static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf); +static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, + nxt_array_t *array, nxt_work_handler_t handler); +static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task, + nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array); + +static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_router_temp_conf_t *tmcf); +static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_event_engine_t *engine); + +static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf); +static void nxt_router_engine_post(nxt_router_engine_conf_t *recf); + +static void nxt_router_thread_start(void *data); +static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_listen_socket_release(nxt_task_t *task, + nxt_socket_conf_joint_t *joint); +static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_conf_release(nxt_task_t *task, + nxt_socket_conf_joint_t *joint); + +static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); +static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); +static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); +static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); +static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); +static nxt_msec_t nxt_router_conn_timeout_value(nxt_event_conn_t *c, + uintptr_t data); nxt_int_t nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) { - if (nxt_router_listen_socket(task, rt) != NXT_OK) { + nxt_int_t ret; + nxt_router_t *router; + nxt_router_temp_conf_t *tmcf; + const nxt_event_interface_t *interface; + + router = nxt_zalloc(sizeof(nxt_router_t)); + if (nxt_slow_path(router == NULL)) { + return NXT_ERROR; + } + + nxt_queue_init(&router->engines); + nxt_queue_init(&router->sockets); + + /**/ + + tmcf = nxt_router_temp_conf(task, router); + if (nxt_slow_path(tmcf == NULL)) { return NXT_ERROR; } + ret = nxt_router_stub_conf(task, tmcf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + nxt_router_listen_sockets_sort(router, tmcf); + + ret = nxt_router_listen_sockets_stub_create(task, tmcf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + interface = nxt_service_get(rt->services, "engine", NULL); + + ret = nxt_router_engines_create(task, router, tmcf, interface); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + ret = nxt_router_threads_create(task, rt, tmcf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + nxt_router_engines_post(tmcf); + + nxt_queue_add(&router->sockets, &tmcf->updating); + nxt_queue_add(&router->sockets, &tmcf->creating); + return NXT_OK; } +static nxt_router_temp_conf_t * +nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router) +{ + nxt_mem_pool_t *mp, *tmp; + nxt_router_conf_t *rtcf; + nxt_router_temp_conf_t *tmcf; + + mp = nxt_mem_pool_create(1024); + if (nxt_slow_path(mp == NULL)) { + return NULL; + } + + rtcf = nxt_mem_zalloc(mp, sizeof(nxt_router_conf_t)); + if (nxt_slow_path(rtcf == NULL)) { + goto fail; + } + + rtcf->mem_pool = mp; + rtcf->router = router; + rtcf->count = 1; + + tmp = nxt_mem_pool_create(1024); + if (nxt_slow_path(tmp == NULL)) { + goto fail; + } + + tmcf = nxt_mem_zalloc(tmp, sizeof(nxt_router_temp_conf_t)); + if (nxt_slow_path(tmcf == NULL)) { + goto temp_fail; + } + + tmcf->mem_pool = tmp; + tmcf->conf = rtcf; + + tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, + sizeof(nxt_router_engine_conf_t)); + if (nxt_slow_path(tmcf->engines == NULL)) { + goto temp_fail; + } + + nxt_queue_init(&tmcf->deleting); + nxt_queue_init(&tmcf->keeping); + nxt_queue_init(&tmcf->updating); + nxt_queue_init(&tmcf->pending); + nxt_queue_init(&tmcf->creating); + + return tmcf; + +temp_fail: + + nxt_mem_pool_destroy(tmp); + +fail: + + nxt_mem_pool_destroy(mp); + + return NULL; +} + + static nxt_int_t -nxt_router_listen_socket(nxt_task_t *task, nxt_runtime_t *rt) +nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) { - nxt_sockaddr_t *sa; - nxt_listen_socket_t *ls; + nxt_sockaddr_t *sa; + nxt_mem_pool_t *mp; + nxt_socket_conf_t *skcf; - sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in), - NXT_INET_ADDR_STR_LEN); - if (sa == NULL) { - return NXT_ERROR; + tmcf->conf->threads = 1; + + mp = tmcf->conf->mem_pool; + + sa = nxt_router_listen_sockaddr_stub(task, mp, 8000); + skcf = nxt_router_socket_conf(task, mp, sa); + + skcf->listen.handler = nxt_router_conn_init; + skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen) + + sizeof(nxt_event_conn_proxy_t) + + sizeof(nxt_event_conn_t) + + 4 * sizeof(nxt_buf_t); + + skcf->header_buffer_size = 2048; + skcf->large_header_buffer_size = 8192; + skcf->header_read_timeout = 5000; + + nxt_queue_insert_tail(&tmcf->pending, &skcf->link); + + sa = nxt_router_listen_sockaddr_stub(task, mp, 8001); + skcf = nxt_router_socket_conf(task, mp, sa); + + skcf->listen.handler = nxt_stream_connection_init; + skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen) + + sizeof(nxt_event_conn_proxy_t) + + sizeof(nxt_event_conn_t) + + 4 * sizeof(nxt_buf_t); + + skcf->header_read_timeout = 5000; + + nxt_queue_insert_tail(&tmcf->pending, &skcf->link); + + return NXT_OK; +} + + +static nxt_socket_conf_t * +nxt_router_socket_conf(nxt_task_t *task, nxt_mem_pool_t *mp, nxt_sockaddr_t *sa) +{ + nxt_socket_conf_t *conf; + + conf = nxt_mem_zalloc(mp, sizeof(nxt_socket_conf_t)); + if (nxt_slow_path(conf == NULL)) { + return NULL; + } + + conf->listen.sockaddr = sa; + + conf->listen.socket = -1; + conf->listen.backlog = NXT_LISTEN_BACKLOG; + conf->listen.flags = NXT_NONBLOCK; + conf->listen.read_after_accept = 1; + + return conf; +} + + +static nxt_sockaddr_t * +nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mem_pool_t *mp, + uint32_t port) +{ + nxt_sockaddr_t *sa; + struct sockaddr_in sin; + + nxt_memzero(&sin, sizeof(struct sockaddr_in)); + + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + + sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin, + sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN); + if (nxt_slow_path(sa == NULL)) { + return NULL; } sa->type = SOCK_STREAM; - sa->u.sockaddr_in.sin_family = AF_INET; - sa->u.sockaddr_in.sin_port = htons(8000); nxt_sockaddr_text(sa); - ls = nxt_runtime_listen_socket_add(rt, sa); - if (ls == NULL) { + return sa; +} + + +static void +nxt_router_listen_sockets_sort(nxt_router_t *router, + nxt_router_temp_conf_t *tmcf) +{ + nxt_queue_link_t *nqlk, *oqlk, *next; + nxt_socket_conf_t *nskcf, *oskcf; + + for (nqlk = nxt_queue_first(&tmcf->pending); + nqlk != nxt_queue_tail(&tmcf->pending); + nqlk = next) + { + next = nxt_queue_next(nqlk); + nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link); + + for (oqlk = nxt_queue_first(&router->sockets); + oqlk != nxt_queue_tail(&router->sockets); + oqlk = nxt_queue_next(oqlk)) + { + oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link); + + if (nxt_sockaddr_cmp(nskcf->listen.sockaddr, + oskcf->listen.sockaddr)) + { + nxt_queue_remove(oqlk); + nxt_queue_insert_tail(&tmcf->keeping, oqlk); + + nxt_queue_remove(nqlk); + nxt_queue_insert_tail(&tmcf->updating, nqlk); + + break; + } + } + } + + nxt_queue_add(&tmcf->deleting, &router->sockets); +} + + +static nxt_int_t +nxt_router_listen_sockets_stub_create(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf) +{ + nxt_queue_link_t *qlk, *nqlk; + nxt_socket_conf_t *skcf; + + for (qlk = nxt_queue_first(&tmcf->pending); + qlk != nxt_queue_tail(&tmcf->pending); + qlk = nqlk) + { + skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); + + if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) { + return NXT_ERROR; + } + + nqlk = nxt_queue_next(qlk); + nxt_queue_remove(qlk); + nxt_queue_insert_tail(&tmcf->creating, qlk); + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, + nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) +{ + nxt_int_t ret; + nxt_uint_t n, threads; + nxt_mem_pool_t *mp; + nxt_queue_link_t *qlk; + nxt_router_engine_conf_t *recf; + + mp = tmcf->conf->mem_pool; + threads = tmcf->conf->threads; + + tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, + sizeof(nxt_router_engine_conf_t)); + if (nxt_slow_path(tmcf->engines == NULL)) { return NXT_ERROR; } - ls->read_after_accept = 1; + n = 0; - ls->flags = NXT_NONBLOCK; + for (qlk = nxt_queue_first(&router->engines); + qlk != nxt_queue_tail(&router->engines); + qlk = nxt_queue_next(qlk)) + { + recf = nxt_array_zero_add(tmcf->engines); + if (nxt_slow_path(recf == NULL)) { + return NXT_ERROR; + } -#if 0 - /* STUB */ - wq = nxt_mem_zalloc(cf->mem_pool, sizeof(nxt_work_queue_t)); - if (wq == NULL) { + recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link); + // STUB + recf->task = recf->engine->task; + + if (n < threads) { + ret = nxt_router_engine_conf_update(task, mp, tmcf, recf); + + } else { + ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf); + } + + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + n++; + } + + tmcf->new_threads = n; + + while (n < threads) { + recf = nxt_array_zero_add(tmcf->engines); + if (nxt_slow_path(recf == NULL)) { + return NXT_ERROR; + } + + recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); + if (nxt_slow_path(recf->engine == NULL)) { + return NXT_ERROR; + } + // STUB + recf->task = recf->engine->task; + + ret = nxt_router_engine_conf_create(task, mp, tmcf, recf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + n++; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_engine_conf_create(nxt_task_t *task, nxt_mem_pool_t *mp, + nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) +{ + nxt_int_t ret; + + recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + if (nxt_slow_path(recf->creating == NULL)) { return NXT_ERROR; } - nxt_work_queue_name(wq, "listen"); - /**/ - ls->work_queue = wq; -#endif - ls->handler = nxt_stream_connection_init; + ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating, + recf->creating, nxt_router_listen_socket_create); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } - /* - * Connection memory pool chunk size is tunned to - * allocate the most data in one mem_pool chunk. - */ - ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls) - + sizeof(nxt_event_conn_proxy_t) - + sizeof(nxt_event_conn_t) - + 4 * sizeof(nxt_buf_t); + return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating, + recf->creating, nxt_router_listen_socket_create); +} + + +static nxt_int_t +nxt_router_engine_conf_update(nxt_task_t *task, nxt_mem_pool_t *mp, + nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) +{ + nxt_int_t ret; + + recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + if (nxt_slow_path(recf->creating == NULL)) { + return NXT_ERROR; + } + + ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating, + recf->creating, nxt_router_listen_socket_create); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + if (nxt_slow_path(recf->updating == NULL)) { + return NXT_ERROR; + } + + ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating, + recf->updating, nxt_router_listen_socket_update); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } - if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { + recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + if (nxt_slow_path(recf->deleting == NULL)) { return NXT_ERROR; } - if (nxt_event_conn_listen(task, ls) != NXT_OK) { + return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting, + recf->deleting); +} + + +static nxt_int_t +nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mem_pool_t *mp, + nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) +{ + nxt_int_t ret; + + recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + if (nxt_slow_path(recf->deleting == NULL)) { return NXT_ERROR; } + ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating, + recf->deleting); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting, + recf->deleting); +} + + +static nxt_int_t +nxt_router_engine_joints_create(nxt_task_t *task, nxt_mem_pool_t *mp, + nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, + nxt_work_handler_t handler) +{ + nxt_work_t *work; + nxt_queue_link_t *qlk; + nxt_socket_conf_joint_t *joint; + + for (qlk = nxt_queue_first(sockets); + qlk != nxt_queue_tail(sockets); + qlk = nxt_queue_next(qlk)) + { + work = nxt_array_add(array); + if (nxt_slow_path(work == NULL)) { + return NXT_ERROR; + } + + work->next = NULL; + work->handler = handler; + work->task = &recf->task; + work->obj = recf->engine; + + joint = nxt_mem_alloc(mp, sizeof(nxt_socket_conf_joint_t)); + if (nxt_slow_path(joint == NULL)) { + return NXT_ERROR; + } + + work->data = joint; + + joint->count = 1; + joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_engine_joints_delete(nxt_task_t *task, + nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array) +{ + nxt_work_t *work; + nxt_queue_link_t *qlk; + + for (qlk = nxt_queue_first(sockets); + qlk != nxt_queue_tail(sockets); + qlk = nxt_queue_next(qlk)) + { + work = nxt_array_add(array); + if (nxt_slow_path(work == NULL)) { + return NXT_ERROR; + } + + work->next = NULL; + work->handler = nxt_router_listen_socket_delete; + work->task = &recf->task; + work->obj = recf->engine; + work->data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); + } + return NXT_OK; } + + +static nxt_int_t +nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_router_temp_conf_t *tmcf) +{ + nxt_int_t ret; + nxt_uint_t i, threads; + nxt_router_engine_conf_t *recf; + + recf = tmcf->engines->elts; + threads = tmcf->conf->threads; + + for (i = tmcf->new_threads; i < threads; i++) { + ret = nxt_router_thread_create(task, rt, recf[i].engine); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_event_engine_t *engine) +{ + nxt_int_t ret; + nxt_thread_link_t *link; + nxt_thread_handle_t handle; + + link = nxt_zalloc(sizeof(nxt_thread_link_t)); + + if (nxt_slow_path(link == NULL)) { + return NXT_ERROR; + } + + link->start = nxt_router_thread_start; + link->engine = engine; + link->work.handler = nxt_router_thread_exit_handler; + link->work.task = task; + link->work.data = link; + + nxt_queue_insert_tail(&rt->engines, &engine->link); + + ret = nxt_thread_create(&handle, link); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_queue_remove(&engine->link); + } + + return ret; +} + + +static void +nxt_router_engines_post(nxt_router_temp_conf_t *tmcf) +{ + nxt_uint_t n; + nxt_router_engine_conf_t *recf; + + recf = tmcf->engines->elts; + + for (n = tmcf->engines->nelts; n != 0; n--) { + nxt_router_engine_post(recf); + recf++; + } +} + + +static void +nxt_router_engine_post(nxt_router_engine_conf_t *recf) +{ + nxt_uint_t n; + nxt_work_t *work; + + work = recf->creating->elts; + + for (n = recf->creating->nelts; n != 0; n--) { + nxt_event_engine_post(recf->engine, work); + work++; + } +} + + +static void +nxt_router_thread_start(void *data) +{ + nxt_thread_t *thread; + nxt_thread_link_t *link; + nxt_event_engine_t *engine; + + link = data; + engine = link->engine; + + thread = nxt_thread(); + + /* STUB */ + thread->runtime = engine->task.thread->runtime; + + engine->task.thread = thread; + engine->task.log = thread->log; + thread->engine = engine; + thread->fiber = &engine->fibers->fiber; + + engine->mem_pool = nxt_mem_cache_pool_create(4096, 1024, 1024, 64); + + nxt_event_engine_start(engine); +} + + +static void +nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) +{ + nxt_listen_event_t *listen; + nxt_event_engine_t *engine; + nxt_listen_socket_t *ls; + nxt_socket_conf_joint_t *joint; + + engine = obj; + joint = data; + + ls = &joint->socket_conf->listen; + + listen = nxt_listen_event(task, ls); + if (nxt_slow_path(listen == NULL)) { + nxt_router_listen_socket_release(task, joint); + return; + } + + listen->socket.data = joint; +} + + +nxt_inline nxt_listen_event_t * +nxt_router_listen_event(nxt_queue_t *listen_connections, + nxt_socket_conf_t *skcf) +{ + nxt_socket_t socket; + nxt_queue_link_t *link; + nxt_listen_event_t *listen; + + socket = skcf->listen.socket; + + for (link = nxt_queue_first(listen_connections); + link != nxt_queue_tail(listen_connections); + link = nxt_queue_next(link)) + { + listen = nxt_queue_link_data(link, nxt_listen_event_t, link); + + if (socket == listen->socket.fd) { + return listen; + } + } + + return NULL; +} + + +static void +nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_engine_t *engine; + nxt_listen_event_t *listen; + nxt_socket_conf_joint_t *joint, *old; + + engine = obj; + joint = data; + + listen = nxt_router_listen_event(&engine->listen_connections, + joint->socket_conf); + + old = listen->socket.data; + listen->socket.data = joint; + + nxt_router_conf_release(task, old); +} + + +static void +nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) +{ + nxt_socket_conf_t *skcf; + nxt_listen_event_t *listen; + nxt_event_engine_t *engine; + + engine = obj; + skcf = data; + + listen = nxt_router_listen_event(&engine->listen_connections, skcf); + + nxt_fd_event_delete(engine, &listen->socket); + + listen->timer.handler = nxt_router_listen_socket_close; + listen->timer.work_queue = &engine->fast_work_queue; + + nxt_timer_add(engine, &listen->timer, 0); +} + + +static void +nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) +{ + nxt_timer_t *timer; + nxt_listen_event_t *listen; + nxt_socket_conf_joint_t *joint; + + timer = obj; + listen = nxt_timer_data(timer, nxt_listen_event_t, timer); + joint = listen->socket.data; + + nxt_queue_remove(&listen->link); + nxt_free(listen); + + nxt_router_listen_socket_release(task, joint); +} + + +static void +nxt_router_listen_socket_release(nxt_task_t *task, + nxt_socket_conf_joint_t *joint) +{ + nxt_socket_t s; + nxt_listen_socket_t *ls; + nxt_thread_spinlock_t *lock; + + s = -1; + ls = &joint->socket_conf->listen; + lock = &joint->socket_conf->router_conf->router->lock; + + nxt_thread_spin_lock(lock); + + if (--ls->count == 0) { + s = ls->socket; + ls->socket = -1; + } + + nxt_thread_spin_unlock(lock); + + if (s != -1) { + nxt_socket_close(task, s); + } + + nxt_router_conf_release(task, joint); +} + + +static void +nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) +{ + nxt_socket_conf_t *skcf; + nxt_router_conf_t *rtcf; + nxt_thread_spinlock_t *lock; + + nxt_debug(task, "conf joint count: %D", joint->count); + + if (--joint->count != 0) { + return; + } + + nxt_queue_remove(&joint->link); + + skcf = joint->socket_conf; + rtcf = skcf->router_conf; + lock = &rtcf->router->lock; + + nxt_thread_spin_lock(lock); + + if (--skcf->count != 0) { + rtcf = NULL; + + } else { + nxt_queue_remove(&skcf->link); + + if (--rtcf->count != 0) { + rtcf = NULL; + } + } + + nxt_thread_spin_unlock(lock); + + if (rtcf != NULL) { + nxt_mem_pool_destroy(rtcf->mem_pool); + } + + if (nxt_queue_is_empty(&joint->engine->joints)) { + nxt_thread_exit(task->thread); + } +} + + +static void +nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_thread_link_t *link; + nxt_event_engine_t *engine; + nxt_thread_handle_t handle; + + handle = obj; + link = data; + + nxt_thread_wait(handle); + + engine = link->engine; + + nxt_queue_remove(&engine->link); + + nxt_mem_cache_pool_destroy(engine->mem_pool); + + nxt_event_engine_free(engine); + + nxt_free(link); + + // TODO: free port +} + + +static const nxt_event_conn_state_t nxt_router_conn_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_router_conn_http_header_parse, + .close_handler = nxt_router_conn_close, + .error_handler = nxt_router_conn_error, + + .timer_handler = nxt_router_conn_timeout, + .timer_value = nxt_router_conn_timeout_value, + .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), +}; + + +static void +nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) +{ + size_t size; + nxt_event_conn_t *c; + nxt_event_engine_t *engine; + nxt_socket_conf_joint_t *joint; + + c = obj; + joint = data; + + nxt_debug(task, "router conn init"); + + joint->count++; + + size = joint->socket_conf->header_buffer_size; + c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0); + + c->socket.data = NULL; + + engine = task->thread->engine; + c->read_work_queue = &engine->fast_work_queue; + c->write_work_queue = &engine->fast_work_queue; + + c->read_state = &nxt_router_conn_read_state; + + nxt_event_conn_read(engine, c); +} + + +static const nxt_event_conn_state_t nxt_router_conn_write_state + nxt_aligned(64) = +{ + .ready_handler = nxt_router_conn_close, + .close_handler = nxt_router_conn_close, + .error_handler = nxt_router_conn_error, +}; + + +static void +nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) +{ + size_t size; + nxt_int_t ret; + nxt_buf_t *b; + nxt_event_conn_t *c; + nxt_socket_conf_joint_t *joint; + nxt_http_request_parse_t *rp; + + c = obj; + rp = data; + + nxt_debug(task, "router conn http header parse"); + + if (rp == NULL) { + rp = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_http_request_parse_t)); + if (nxt_slow_path(rp == NULL)) { + nxt_router_conn_close(task, c, data); + return; + } + + c->socket.data = rp; + } + + ret = nxt_http_parse_request(rp, &c->read->mem); + + nxt_debug(task, "http parse request: %d", ret); + + switch (nxt_expect(NXT_DONE, ret)) { + + case NXT_DONE: + break; + + case NXT_ERROR: + nxt_router_conn_close(task, c, data); + return; + + default: /* NXT_AGAIN */ + + if (c->read->mem.free == c->read->mem.end) { + joint = c->listen->socket.data; + size = joint->socket_conf->large_header_buffer_size, + + b = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + nxt_router_conn_close(task, c, data); + return; + } + + size = c->read->mem.free - c->read->mem.pos; + nxt_memcpy(b->mem.pos, c->read->mem.pos, size); + + b->mem.free += size; + c->read = b; + } + + nxt_event_conn_read(task->thread->engine, c); + return; + } + + c->write = c->read; + c->write->mem.pos = c->write->mem.start; + c->write_state = &nxt_router_conn_write_state; + + nxt_event_conn_write(task->thread->engine, c); +} + + +static const nxt_event_conn_state_t nxt_router_conn_close_state + nxt_aligned(64) = +{ + .ready_handler = nxt_router_conn_free, +}; + + +static void +nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_debug(task, "router conn close"); + + c->write_state = &nxt_router_conn_close_state; + + nxt_event_conn_close(task->thread->engine, c); +} + + +static void +nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + nxt_socket_conf_joint_t *joint; + + c = obj; + + nxt_debug(task, "router conn close done"); + + joint = c->listen->socket.data; + nxt_router_conf_release(task, joint); + + nxt_mem_pool_destroy(c->mem_pool); +} + + +static void +nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_debug(task, "router conn error"); + + c->write_state = &nxt_router_conn_close_state; + + nxt_event_conn_close(task->thread->engine, c); +} + + +static void +nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_timer_t *timer; + nxt_event_conn_t *c; + + timer = obj; + + nxt_debug(task, "router conn timeout"); + + c = nxt_event_read_timer_conn(timer); + + c->write_state = &nxt_router_conn_close_state; + + nxt_event_conn_close(task->thread->engine, c); +} + + +static nxt_msec_t +nxt_router_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data) +{ + nxt_socket_conf_joint_t *joint; + + joint = c->listen->socket.data; + + return nxt_value_at(nxt_msec_t, joint->socket_conf, data); +} |