diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_application.c | 8 | ||||
-rw-r--r-- | src/nxt_clang.h | 4 | ||||
-rw-r--r-- | src/nxt_conn_close.c | 159 | ||||
-rw-r--r-- | src/nxt_epoll_engine.c | 2 | ||||
-rw-r--r-- | src/nxt_event_conn.c | 128 | ||||
-rw-r--r-- | src/nxt_event_conn.h | 58 | ||||
-rw-r--r-- | src/nxt_event_conn_accept.c | 72 | ||||
-rw-r--r-- | src/nxt_event_engine.c | 4 | ||||
-rw-r--r-- | src/nxt_event_engine.h | 10 | ||||
-rw-r--r-- | src/nxt_kqueue_engine.c | 6 | ||||
-rw-r--r-- | src/nxt_listen_socket.h | 3 | ||||
-rw-r--r-- | src/nxt_router.c | 1076 | ||||
-rw-r--r-- | src/nxt_router.h | 78 | ||||
-rw-r--r-- | src/nxt_runtime.c | 31 | ||||
-rw-r--r-- | src/nxt_runtime.h | 4 | ||||
-rw-r--r-- | src/nxt_signal.c | 2 | ||||
-rw-r--r-- | src/nxt_thread.c | 14 | ||||
-rw-r--r-- | src/nxt_thread.h | 2 | ||||
-rw-r--r-- | src/nxt_thread_pool.c | 43 | ||||
-rw-r--r-- | src/nxt_thread_time.c | 2 |
20 files changed, 1445 insertions, 261 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index f45d35c9..eedfd0c9 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -99,7 +99,7 @@ nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt) if (nxt_fast_path(link != NULL)) { link->start = nxt_app_thread; - link->data = rt; + link->work.data = rt; return nxt_thread_create(&handle, link); } @@ -151,8 +151,8 @@ nxt_app_thread(void *ctx) nxt_socket_t s; nxt_thread_t *thr; nxt_runtime_t *rt; + nxt_queue_link_t *link; nxt_app_request_t *r; - nxt_event_engine_t **engines; nxt_listen_socket_t *ls; u_char buf[SIZE]; const size_t size = SIZE; @@ -163,9 +163,9 @@ nxt_app_thread(void *ctx) nxt_log_debug(thr->log, "app thread"); rt = ctx; - engines = rt->engines->elts; - nxt_app_engine = engines[0]; + link = nxt_queue_first(&rt->engines); + nxt_app_engine = nxt_queue_link_data(link, nxt_event_engine_t, link); nxt_app_mem_pool = nxt_mem_pool_create(512); if (nxt_slow_path(nxt_app_mem_pool == NULL)) { diff --git a/src/nxt_clang.h b/src/nxt_clang.h index 6cb752dd..31bc6fda 100644 --- a/src/nxt_clang.h +++ b/src/nxt_clang.h @@ -173,6 +173,10 @@ nxt_container_of(p, type, field) \ (type *) ((u_char *) (p) - offsetof(type, field)) +#define nxt_value_at(type, p, offset) \ + *(type *) ((u_char *) p + offset) + + #define \ nxt_nitems(x) \ (sizeof(x) / sizeof((x)[0])) diff --git a/src/nxt_conn_close.c b/src/nxt_conn_close.c new file mode 100644 index 00000000..18b2450c --- /dev/null +++ b/src/nxt_conn_close.c @@ -0,0 +1,159 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +static void nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_close_error_ignore(nxt_task_t *task, void *obj, + void *data); + + +void +nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c) +{ + int ret; + socklen_t len; + struct linger linger; + nxt_work_queue_t *wq; + nxt_work_handler_t handler; + + nxt_debug(c->socket.task, "conn close fd:%d, to:%d", + c->socket.fd, c->socket.timedout); + + if (c->socket.timedout) { + /* + * Resetting of timed out connection on close + * releases kernel memory associated with socket. + * This also causes sending TCP/IP RST to a peer. + */ + linger.l_onoff = 1; + linger.l_linger = 0; + len = sizeof(struct linger); + + ret = setsockopt(c->socket.fd, SOL_SOCKET, SO_LINGER, &linger, len); + + if (nxt_slow_path(ret != 0)) { + nxt_log(c->socket.task, NXT_LOG_CRIT, + "setsockopt(%d, SO_LINGER) failed %E", + c->socket.fd, nxt_socket_errno); + } + } + + /* + * Event errors should be ignored here to avoid repeated nxt_conn_close() + * calls. nxt_conn_close_handler() or nxt_conn_close_timer_handler() + * will eventually close socket. + */ + c->socket.error_handler = nxt_conn_close_error_ignore; + + if (c->socket.error == 0 && !c->socket.closed && !c->socket.shutdown) { + wq = &engine->shutdown_work_queue; + handler = nxt_conn_shutdown_handler; + + } else{ + wq = &engine->close_work_queue; + handler = nxt_conn_close_handler; + } + + nxt_work_queue_add(wq, handler, c->socket.task, c, engine); +} + + +static void +nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + nxt_event_engine_t *engine; + + c = obj; + engine = data; + + nxt_debug(task, "conn shutdown handler fd:%d", c->socket.fd); + + c->socket.shutdown = 1; + + nxt_socket_shutdown(task, c->socket.fd, SHUT_RDWR); + + nxt_work_queue_add(&engine->close_work_queue, nxt_conn_close_handler, + task, c, engine); +} + + +static void +nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_uint_t events_pending, timers_pending; + nxt_event_conn_t *c; + nxt_event_engine_t *engine; + + c = obj; + engine = data; + + nxt_debug(task, "conn close handler fd:%d", c->socket.fd); + + /* + * Socket should be closed only after all pending socket event operations + * will be processed by the kernel. This could be achieved with zero-timer + * handler. Pending timer operations associated with the socket are + * processed before going to the kernel. + */ + + timers_pending = nxt_timer_delete(engine, &c->read_timer); + timers_pending += nxt_timer_delete(engine, &c->write_timer); + + events_pending = nxt_fd_event_close(engine, &c->socket); + + if (events_pending == 0) { + nxt_socket_close(task, c->socket.fd); + c->socket.fd = -1; + + if (timers_pending == 0) { + nxt_work_queue_add(&engine->fast_work_queue, + c->write_state->ready_handler, + task, c, c->socket.data); + return; + } + } + + c->write_timer.handler = nxt_conn_close_timer_handler; + c->write_timer.work_queue = &engine->fast_work_queue; + + nxt_timer_add(engine, &c->write_timer, 0); +} + + +static void +nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_timer_t *timer; + nxt_event_conn_t *c; + + timer = obj; + + c = nxt_event_write_timer_conn(timer); + + nxt_debug(task, "conn close timer handler fd:%d", c->socket.fd); + + if (c->socket.fd != -1) { + nxt_socket_close(task, c->socket.fd); + c->socket.fd = -1; + } + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + c->write_state->ready_handler, + task, c, c->socket.data); +} + + +static void +nxt_conn_close_error_ignore(nxt_task_t *task, void *obj, void *data) +{ + nxt_debug(task, "conn close error ignore"); +} diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c index 149f5cb8..d46ee007 100644 --- a/src/nxt_epoll_engine.c +++ b/src/nxt_epoll_engine.c @@ -994,7 +994,7 @@ nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data) nxt_event_conn_listen_t *cls; cls = obj; - c = data; + c = cls->next; cls->ready--; cls->socket.read_ready = (cls->ready != 0); diff --git a/src/nxt_event_conn.c b/src/nxt_event_conn.c index 079901f3..b8adb00a 100644 --- a/src/nxt_event_conn.c +++ b/src/nxt_event_conn.c @@ -7,12 +7,6 @@ #include <nxt_main.h> -static void nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, - void *data); - - nxt_event_conn_io_t nxt_unix_event_conn_io = { nxt_event_conn_io_connect, nxt_event_conn_io_accept, @@ -132,128 +126,6 @@ nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data) void -nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c) -{ - int ret; - socklen_t len; - struct linger linger; - nxt_work_queue_t *wq; - nxt_work_handler_t handler; - - if (c->socket.timedout) { - /* - * Resetting of timed out connection on close - * releases kernel memory associated with socket. - * This also causes sending TCP/IP RST to a peer. - */ - linger.l_onoff = 1; - linger.l_linger = 0; - len = sizeof(struct linger); - - ret = setsockopt(c->socket.fd, SOL_SOCKET, SO_LINGER, &linger, len); - - if (nxt_slow_path(ret != 0)) { - nxt_log(c->socket.task, NXT_LOG_CRIT, - "setsockopt(%d, SO_LINGER) failed %E", - c->socket.fd, nxt_socket_errno); - } - } - - if (c->socket.error == 0 && !c->socket.closed && !c->socket.shutdown) { - wq = &engine->shutdown_work_queue; - handler = nxt_conn_shutdown_handler; - - } else{ - wq = &engine->close_work_queue; - handler = nxt_conn_close_handler; - } - - nxt_work_queue_add(wq, handler, c->socket.task, c, engine); -} - - -static void -nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *c; - nxt_event_engine_t *engine; - - c = obj; - engine = data; - - nxt_debug(task, "event conn shutdown fd:%d", c->socket.fd); - - c->socket.shutdown = 1; - - nxt_socket_shutdown(task, c->socket.fd, SHUT_RDWR); - - nxt_work_queue_add(&engine->close_work_queue, nxt_conn_close_handler, - task, c, engine); -} - - -static void -nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_uint_t events_pending, timers_pending; - nxt_event_conn_t *c; - nxt_event_engine_t *engine; - - c = obj; - engine = data; - - nxt_debug(task, "event conn close fd:%d", c->socket.fd); - - timers_pending = nxt_timer_delete(engine, &c->read_timer); - timers_pending += nxt_timer_delete(engine, &c->write_timer); - - events_pending = nxt_fd_event_close(engine, &c->socket); - - if (events_pending == 0) { - nxt_socket_close(task, c->socket.fd); - c->socket.fd = -1; - - if (timers_pending == 0) { - nxt_work_queue_add(&engine->fast_work_queue, - c->write_state->ready_handler, - task, c, c->socket.data); - return; - } - } - - c->write_timer.handler = nxt_conn_close_timer_handler; - c->write_timer.work_queue = &engine->fast_work_queue; - - nxt_timer_add(engine, &c->write_timer, 0); -} - - -static void -nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *ev; - nxt_event_conn_t *c; - nxt_event_engine_t *engine; - - ev = obj; - - c = nxt_event_write_timer_conn(ev); - - nxt_debug(task, "event conn close handler fd:%d", c->socket.fd); - - if (c->socket.fd != -1) { - nxt_socket_close(task, c->socket.fd); - c->socket.fd = -1; - } - - engine = task->thread->engine; - - nxt_work_queue_add(&engine->fast_work_queue, c->write_state->ready_handler, - task, c, c->socket.data); -} - - -void nxt_event_conn_timer(nxt_event_engine_t *engine, nxt_event_conn_t *c, const nxt_event_conn_state_t *state, nxt_timer_t *tev) { diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h index 30d4fedc..902dff9f 100644 --- a/src/nxt_event_conn.h +++ b/src/nxt_event_conn.h @@ -100,6 +100,35 @@ typedef struct { } nxt_event_conn_io_t; +/* + * The nxt_event_conn_listen_t is separated from nxt_listen_socket_t + * because nxt_listen_socket_t is one per process whilst each worker + * thread uses own nxt_event_conn_listen_t. + */ +typedef struct { + /* Must be the first field. */ + nxt_fd_event_t socket; + + nxt_task_t task; + + uint32_t ready; + uint32_t batch; + + /* An accept() interface is cached to minimize memory accesses. */ + nxt_work_handler_t accept; + + nxt_listen_socket_t *listen; + nxt_event_conn_t *next; /* STUB */; + nxt_work_queue_t *work_queue; + + nxt_timer_t timer; + + nxt_queue_link_t link; +} nxt_event_conn_listen_t; + +typedef nxt_event_conn_listen_t nxt_listen_event_t; + + struct nxt_event_conn_s { /* * Must be the first field, since nxt_fd_event_t @@ -143,7 +172,7 @@ struct nxt_event_conn_s { nxt_task_t task; nxt_log_t log; - nxt_listen_socket_t *listen; + nxt_event_conn_listen_t *listen; nxt_sockaddr_t *remote; nxt_sockaddr_t *local; const char *action; @@ -163,31 +192,6 @@ struct nxt_event_conn_s { }; -/* - * The nxt_event_conn_listen_t is separated from nxt_listen_socket_t - * because nxt_listen_socket_t is one per process whilst each worker - * thread uses own nxt_event_conn_listen_t. - */ -typedef struct { - /* Must be the first field. */ - nxt_fd_event_t socket; - - nxt_task_t task; - - uint32_t ready; - uint32_t batch; - - /* An accept() interface is cached to minimize memory accesses. */ - nxt_work_handler_t accept; - - nxt_listen_socket_t *listen; - - nxt_timer_t timer; - - nxt_queue_link_t link; -} nxt_event_conn_listen_t; - - #define \ nxt_event_conn_timer_init(ev, c, wq) \ do { \ @@ -256,6 +260,8 @@ nxt_int_t nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c); void nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data); void nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data); +NXT_EXPORT nxt_event_conn_listen_t *nxt_listen_event(nxt_task_t *task, + nxt_listen_socket_t *ls); NXT_EXPORT nxt_int_t nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls); void nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data); diff --git a/src/nxt_event_conn_accept.c b/src/nxt_event_conn_accept.c index 5f0a72f0..ced2a3d8 100644 --- a/src/nxt_event_conn_accept.c +++ b/src/nxt_event_conn_accept.c @@ -32,6 +32,53 @@ static void nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data); +nxt_event_conn_listen_t * +nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls) +{ + nxt_event_engine_t *engine; + nxt_event_conn_listen_t *cls; + + cls = nxt_zalloc(sizeof(nxt_event_conn_listen_t)); + + if (nxt_fast_path(cls != NULL)) { + cls->socket.fd = ls->socket; + + engine = task->thread->engine; + cls->batch = engine->batch; + + cls->socket.read_work_queue = &engine->accept_work_queue; + cls->socket.read_handler = nxt_event_conn_listen_handler; + cls->socket.error_handler = nxt_event_conn_listen_event_error; + cls->socket.log = &nxt_main_log; + + cls->accept = engine->event.io->accept; + + cls->listen = ls; + cls->work_queue = &engine->read_work_queue; + + cls->timer.work_queue = &engine->fast_work_queue; + cls->timer.handler = nxt_event_conn_listen_timer_handler; + cls->timer.log = &nxt_main_log; + + cls->task.thread = task->thread; + cls->task.log = &nxt_main_log; + cls->task.ident = nxt_task_next_ident(); + cls->socket.task = &cls->task; + cls->timer.task = &cls->task; + + if (nxt_event_conn_accept_alloc(task, cls) != NULL) { + nxt_fd_event_enable_accept(engine, &cls->socket); + + nxt_queue_insert_head(&engine->listen_connections, &cls->link); + } + + return cls; + } + + return NULL; +} + + nxt_int_t nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls) { @@ -97,13 +144,12 @@ nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls) /* This allocation cannot fail. */ c = nxt_event_conn_create(mp, cls->socket.task); - cls->socket.data = c; + cls->next = c; c->socket.read_work_queue = cls->socket.read_work_queue; c->socket.write_ready = 1; + c->listen = cls; ls = cls->listen; - c->listen = ls; - /* This allocation cannot fail. */ remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length); c->remote = remote; @@ -146,7 +192,7 @@ nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) nxt_event_conn_listen_t *cls; cls = obj; - c = data; + c = cls->next; cls->ready--; cls->socket.read_ready = (cls->ready != 0); @@ -200,17 +246,19 @@ nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls, nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); - c->read_work_queue = c->listen->work_queue; - c->write_work_queue = c->listen->work_queue; + c->read_work_queue = cls->work_queue; + c->write_work_queue = cls->work_queue; - if (c->listen->read_after_accept) { + if (cls->listen->read_after_accept) { //c->socket.read_ready = 1; - c->listen->handler(task, c, NULL); +// cls->listen->handler(task, c, cls->socket.data); + nxt_work_queue_add(c->read_work_queue, cls->listen->handler, + task, c, cls->socket.data); } else { - nxt_work_queue_add(c->write_work_queue, c->listen->handler, - task, c, NULL); + nxt_work_queue_add(c->write_work_queue, cls->listen->handler, + task, c, cls->socket.data); } next = nxt_event_conn_accept_next(task, cls); @@ -227,7 +275,7 @@ nxt_event_conn_accept_next(nxt_task_t *task, nxt_event_conn_listen_t *cls) { nxt_event_conn_t *c; - cls->socket.data = NULL; + cls->next = NULL; do { c = nxt_event_conn_accept_alloc(task, cls); @@ -338,7 +386,7 @@ nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) ev = obj; cls = nxt_timer_data(ev, nxt_event_conn_listen_t, timer); - c = cls->socket.data; + c = cls->next; if (c == NULL) { c = nxt_event_conn_accept_next(task, cls); diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c index bd10a270..52bfa39d 100644 --- a/src/nxt_event_engine.c +++ b/src/nxt_event_engine.c @@ -44,9 +44,6 @@ nxt_event_engine_create(nxt_task_t *task, engine->task.log = thread->log; engine->task.ident = nxt_task_next_ident(); - thread->engine = engine; - thread->fiber = &engine->fibers->fiber; - engine->batch = batch; if (flags & NXT_ENGINE_FIBERS) { @@ -121,6 +118,7 @@ nxt_event_engine_create(nxt_task_t *task, engine->max_connections = 0xffffffff; + nxt_queue_init(&engine->joints); nxt_queue_init(&engine->listen_connections); nxt_queue_init(&engine->idle_connections); diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index 30d1fb21..6947cc8d 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -355,6 +355,11 @@ nxt_fd_event_disable(engine, ev) \ #define \ +nxt_fd_event_delete(engine, ev) \ + (engine)->event.delete(engine, ev) + + +#define \ nxt_fd_event_close(engine, ev) \ (engine)->event.close(engine, ev) @@ -481,8 +486,13 @@ struct nxt_event_engine_s { uint32_t connections; uint32_t max_connections; + nxt_port_t *port; + nxt_mem_cache_pool_t *mem_pool; + nxt_queue_t joints; nxt_queue_t listen_connections; nxt_queue_t idle_connections; + + nxt_queue_link_t link; }; diff --git a/src/nxt_kqueue_engine.c b/src/nxt_kqueue_engine.c index 559f0c9e..a216e066 100644 --- a/src/nxt_kqueue_engine.c +++ b/src/nxt_kqueue_engine.c @@ -522,6 +522,8 @@ nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data) ev = obj; + nxt_debug(task, "kqueue fd error handler fd:%d", ev->fd); + if (ev->kq_eof && ev->kq_errno != 0) { ev->error = ev->kq_errno; nxt_log(task, nxt_socket_error_level(ev->kq_errno), @@ -544,6 +546,8 @@ nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data) ev = obj; + nxt_debug(task, "kqueue file error handler fd:%d", ev->file->fd); + ev->handler(task, ev, data); } @@ -924,7 +928,7 @@ nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) nxt_event_conn_listen_t *cls; cls = obj; - c = data; + c = cls->next; cls->ready--; cls->socket.read_ready = (cls->ready != 0); diff --git a/src/nxt_listen_socket.h b/src/nxt_listen_socket.h index 1e62ae80..ce7a8aeb 100644 --- a/src/nxt_listen_socket.h +++ b/src/nxt_listen_socket.h @@ -31,9 +31,8 @@ typedef struct { uint8_t socklen; uint8_t address_length; + uint32_t count; uint32_t mem_pool_size; - - void *servers; } nxt_listen_socket_t; diff --git a/src/nxt_router.c b/src/nxt_router.c index caef8503..1f96a1df 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -5,81 +5,1083 @@ * Copyright (C) NGINX, Inc. */ -#include <nxt_main.h> -#include <nxt_runtime.h> -#include <nxt_master_process.h> +#include <nxt_router.h> -static nxt_int_t nxt_router_listen_socket(nxt_task_t *task, nxt_runtime_t *rt); +static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task, + nxt_router_t *router); +static void nxt_router_listen_sockets_sort(nxt_router_t *router, + nxt_router_temp_conf_t *tmcf); + +static nxt_int_t nxt_router_stub_conf(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf); +static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf); +static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_sockaddr_t *sa); +static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task, + nxt_mem_pool_t *mp, uint32_t port); + +static nxt_int_t nxt_router_engines_create(nxt_task_t *task, + nxt_router_t *router, nxt_router_temp_conf_t *tmcf, + const nxt_event_interface_t *interface); +static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf); +static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf); +static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf, + nxt_router_engine_conf_t *recf); +static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task, + nxt_mem_pool_t *mp, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, + nxt_array_t *array, nxt_work_handler_t handler); +static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task, + nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array); + +static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_router_temp_conf_t *tmcf); +static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_event_engine_t *engine); + +static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf); +static void nxt_router_engine_post(nxt_router_engine_conf_t *recf); + +static void nxt_router_thread_start(void *data); +static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_listen_socket_release(nxt_task_t *task, + nxt_socket_conf_joint_t *joint); +static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_conf_release(nxt_task_t *task, + nxt_socket_conf_joint_t *joint); + +static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); +static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, + void *data); +static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); +static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); +static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); +static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); +static nxt_msec_t nxt_router_conn_timeout_value(nxt_event_conn_t *c, + uintptr_t data); nxt_int_t nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) { - if (nxt_router_listen_socket(task, rt) != NXT_OK) { + nxt_int_t ret; + nxt_router_t *router; + nxt_router_temp_conf_t *tmcf; + const nxt_event_interface_t *interface; + + router = nxt_zalloc(sizeof(nxt_router_t)); + if (nxt_slow_path(router == NULL)) { + return NXT_ERROR; + } + + nxt_queue_init(&router->engines); + nxt_queue_init(&router->sockets); + + /**/ + + tmcf = nxt_router_temp_conf(task, router); + if (nxt_slow_path(tmcf == NULL)) { return NXT_ERROR; } + ret = nxt_router_stub_conf(task, tmcf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + nxt_router_listen_sockets_sort(router, tmcf); + + ret = nxt_router_listen_sockets_stub_create(task, tmcf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + interface = nxt_service_get(rt->services, "engine", NULL); + + ret = nxt_router_engines_create(task, router, tmcf, interface); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + ret = nxt_router_threads_create(task, rt, tmcf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + nxt_router_engines_post(tmcf); + + nxt_queue_add(&router->sockets, &tmcf->updating); + nxt_queue_add(&router->sockets, &tmcf->creating); + return NXT_OK; } +static nxt_router_temp_conf_t * +nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router) +{ + nxt_mem_pool_t *mp, *tmp; + nxt_router_conf_t *rtcf; + nxt_router_temp_conf_t *tmcf; + + mp = nxt_mem_pool_create(1024); + if (nxt_slow_path(mp == NULL)) { + return NULL; + } + + rtcf = nxt_mem_zalloc(mp, sizeof(nxt_router_conf_t)); + if (nxt_slow_path(rtcf == NULL)) { + goto fail; + } + + rtcf->mem_pool = mp; + rtcf->router = router; + rtcf->count = 1; + + tmp = nxt_mem_pool_create(1024); + if (nxt_slow_path(tmp == NULL)) { + goto fail; + } + + tmcf = nxt_mem_zalloc(tmp, sizeof(nxt_router_temp_conf_t)); + if (nxt_slow_path(tmcf == NULL)) { + goto temp_fail; + } + + tmcf->mem_pool = tmp; + tmcf->conf = rtcf; + + tmcf->engines = nxt_array_create(tmcf->mem_pool, 4, + sizeof(nxt_router_engine_conf_t)); + if (nxt_slow_path(tmcf->engines == NULL)) { + goto temp_fail; + } + + nxt_queue_init(&tmcf->deleting); + nxt_queue_init(&tmcf->keeping); + nxt_queue_init(&tmcf->updating); + nxt_queue_init(&tmcf->pending); + nxt_queue_init(&tmcf->creating); + + return tmcf; + +temp_fail: + + nxt_mem_pool_destroy(tmp); + +fail: + + nxt_mem_pool_destroy(mp); + + return NULL; +} + + static nxt_int_t -nxt_router_listen_socket(nxt_task_t *task, nxt_runtime_t *rt) +nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) { - nxt_sockaddr_t *sa; - nxt_listen_socket_t *ls; + nxt_sockaddr_t *sa; + nxt_mem_pool_t *mp; + nxt_socket_conf_t *skcf; - sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in), - NXT_INET_ADDR_STR_LEN); - if (sa == NULL) { - return NXT_ERROR; + tmcf->conf->threads = 1; + + mp = tmcf->conf->mem_pool; + + sa = nxt_router_listen_sockaddr_stub(task, mp, 8000); + skcf = nxt_router_socket_conf(task, mp, sa); + + skcf->listen.handler = nxt_router_conn_init; + skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen) + + sizeof(nxt_event_conn_proxy_t) + + sizeof(nxt_event_conn_t) + + 4 * sizeof(nxt_buf_t); + + skcf->header_buffer_size = 2048; + skcf->large_header_buffer_size = 8192; + skcf->header_read_timeout = 5000; + + nxt_queue_insert_tail(&tmcf->pending, &skcf->link); + + sa = nxt_router_listen_sockaddr_stub(task, mp, 8001); + skcf = nxt_router_socket_conf(task, mp, sa); + + skcf->listen.handler = nxt_stream_connection_init; + skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen) + + sizeof(nxt_event_conn_proxy_t) + + sizeof(nxt_event_conn_t) + + 4 * sizeof(nxt_buf_t); + + skcf->header_read_timeout = 5000; + + nxt_queue_insert_tail(&tmcf->pending, &skcf->link); + + return NXT_OK; +} + + +static nxt_socket_conf_t * +nxt_router_socket_conf(nxt_task_t *task, nxt_mem_pool_t *mp, nxt_sockaddr_t *sa) +{ + nxt_socket_conf_t *conf; + + conf = nxt_mem_zalloc(mp, sizeof(nxt_socket_conf_t)); + if (nxt_slow_path(conf == NULL)) { + return NULL; + } + + conf->listen.sockaddr = sa; + + conf->listen.socket = -1; + conf->listen.backlog = NXT_LISTEN_BACKLOG; + conf->listen.flags = NXT_NONBLOCK; + conf->listen.read_after_accept = 1; + + return conf; +} + + +static nxt_sockaddr_t * +nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mem_pool_t *mp, + uint32_t port) +{ + nxt_sockaddr_t *sa; + struct sockaddr_in sin; + + nxt_memzero(&sin, sizeof(struct sockaddr_in)); + + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + + sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin, + sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN); + if (nxt_slow_path(sa == NULL)) { + return NULL; } sa->type = SOCK_STREAM; - sa->u.sockaddr_in.sin_family = AF_INET; - sa->u.sockaddr_in.sin_port = htons(8000); nxt_sockaddr_text(sa); - ls = nxt_runtime_listen_socket_add(rt, sa); - if (ls == NULL) { + return sa; +} + + +static void +nxt_router_listen_sockets_sort(nxt_router_t *router, + nxt_router_temp_conf_t *tmcf) +{ + nxt_queue_link_t *nqlk, *oqlk, *next; + nxt_socket_conf_t *nskcf, *oskcf; + + for (nqlk = nxt_queue_first(&tmcf->pending); + nqlk != nxt_queue_tail(&tmcf->pending); + nqlk = next) + { + next = nxt_queue_next(nqlk); + nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link); + + for (oqlk = nxt_queue_first(&router->sockets); + oqlk != nxt_queue_tail(&router->sockets); + oqlk = nxt_queue_next(oqlk)) + { + oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link); + + if (nxt_sockaddr_cmp(nskcf->listen.sockaddr, + oskcf->listen.sockaddr)) + { + nxt_queue_remove(oqlk); + nxt_queue_insert_tail(&tmcf->keeping, oqlk); + + nxt_queue_remove(nqlk); + nxt_queue_insert_tail(&tmcf->updating, nqlk); + + break; + } + } + } + + nxt_queue_add(&tmcf->deleting, &router->sockets); +} + + +static nxt_int_t +nxt_router_listen_sockets_stub_create(nxt_task_t *task, + nxt_router_temp_conf_t *tmcf) +{ + nxt_queue_link_t *qlk, *nqlk; + nxt_socket_conf_t *skcf; + + for (qlk = nxt_queue_first(&tmcf->pending); + qlk != nxt_queue_tail(&tmcf->pending); + qlk = nqlk) + { + skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); + + if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) { + return NXT_ERROR; + } + + nqlk = nxt_queue_next(qlk); + nxt_queue_remove(qlk); + nxt_queue_insert_tail(&tmcf->creating, qlk); + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router, + nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface) +{ + nxt_int_t ret; + nxt_uint_t n, threads; + nxt_mem_pool_t *mp; + nxt_queue_link_t *qlk; + nxt_router_engine_conf_t *recf; + + mp = tmcf->conf->mem_pool; + threads = tmcf->conf->threads; + + tmcf->engines = nxt_array_create(tmcf->mem_pool, threads, + sizeof(nxt_router_engine_conf_t)); + if (nxt_slow_path(tmcf->engines == NULL)) { return NXT_ERROR; } - ls->read_after_accept = 1; + n = 0; - ls->flags = NXT_NONBLOCK; + for (qlk = nxt_queue_first(&router->engines); + qlk != nxt_queue_tail(&router->engines); + qlk = nxt_queue_next(qlk)) + { + recf = nxt_array_zero_add(tmcf->engines); + if (nxt_slow_path(recf == NULL)) { + return NXT_ERROR; + } -#if 0 - /* STUB */ - wq = nxt_mem_zalloc(cf->mem_pool, sizeof(nxt_work_queue_t)); - if (wq == NULL) { + recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link); + // STUB + recf->task = recf->engine->task; + + if (n < threads) { + ret = nxt_router_engine_conf_update(task, mp, tmcf, recf); + + } else { + ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf); + } + + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + n++; + } + + tmcf->new_threads = n; + + while (n < threads) { + recf = nxt_array_zero_add(tmcf->engines); + if (nxt_slow_path(recf == NULL)) { + return NXT_ERROR; + } + + recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0); + if (nxt_slow_path(recf->engine == NULL)) { + return NXT_ERROR; + } + // STUB + recf->task = recf->engine->task; + + ret = nxt_router_engine_conf_create(task, mp, tmcf, recf); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + n++; + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_engine_conf_create(nxt_task_t *task, nxt_mem_pool_t *mp, + nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) +{ + nxt_int_t ret; + + recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + if (nxt_slow_path(recf->creating == NULL)) { return NXT_ERROR; } - nxt_work_queue_name(wq, "listen"); - /**/ - ls->work_queue = wq; -#endif - ls->handler = nxt_stream_connection_init; + ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating, + recf->creating, nxt_router_listen_socket_create); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } - /* - * Connection memory pool chunk size is tunned to - * allocate the most data in one mem_pool chunk. - */ - ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls) - + sizeof(nxt_event_conn_proxy_t) - + sizeof(nxt_event_conn_t) - + 4 * sizeof(nxt_buf_t); + return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating, + recf->creating, nxt_router_listen_socket_create); +} + + +static nxt_int_t +nxt_router_engine_conf_update(nxt_task_t *task, nxt_mem_pool_t *mp, + nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) +{ + nxt_int_t ret; + + recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + if (nxt_slow_path(recf->creating == NULL)) { + return NXT_ERROR; + } + + ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating, + recf->creating, nxt_router_listen_socket_create); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + if (nxt_slow_path(recf->updating == NULL)) { + return NXT_ERROR; + } + + ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating, + recf->updating, nxt_router_listen_socket_update); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } - if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { + recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + if (nxt_slow_path(recf->deleting == NULL)) { return NXT_ERROR; } - if (nxt_event_conn_listen(task, ls) != NXT_OK) { + return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting, + recf->deleting); +} + + +static nxt_int_t +nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mem_pool_t *mp, + nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf) +{ + nxt_int_t ret; + + recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t)); + if (nxt_slow_path(recf->deleting == NULL)) { return NXT_ERROR; } + ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating, + recf->deleting); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + + return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting, + recf->deleting); +} + + +static nxt_int_t +nxt_router_engine_joints_create(nxt_task_t *task, nxt_mem_pool_t *mp, + nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, + nxt_work_handler_t handler) +{ + nxt_work_t *work; + nxt_queue_link_t *qlk; + nxt_socket_conf_joint_t *joint; + + for (qlk = nxt_queue_first(sockets); + qlk != nxt_queue_tail(sockets); + qlk = nxt_queue_next(qlk)) + { + work = nxt_array_add(array); + if (nxt_slow_path(work == NULL)) { + return NXT_ERROR; + } + + work->next = NULL; + work->handler = handler; + work->task = &recf->task; + work->obj = recf->engine; + + joint = nxt_mem_alloc(mp, sizeof(nxt_socket_conf_joint_t)); + if (nxt_slow_path(joint == NULL)) { + return NXT_ERROR; + } + + work->data = joint; + + joint->count = 1; + joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_engine_joints_delete(nxt_task_t *task, + nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array) +{ + nxt_work_t *work; + nxt_queue_link_t *qlk; + + for (qlk = nxt_queue_first(sockets); + qlk != nxt_queue_tail(sockets); + qlk = nxt_queue_next(qlk)) + { + work = nxt_array_add(array); + if (nxt_slow_path(work == NULL)) { + return NXT_ERROR; + } + + work->next = NULL; + work->handler = nxt_router_listen_socket_delete; + work->task = &recf->task; + work->obj = recf->engine; + work->data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); + } + return NXT_OK; } + + +static nxt_int_t +nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_router_temp_conf_t *tmcf) +{ + nxt_int_t ret; + nxt_uint_t i, threads; + nxt_router_engine_conf_t *recf; + + recf = tmcf->engines->elts; + threads = tmcf->conf->threads; + + for (i = tmcf->new_threads; i < threads; i++) { + ret = nxt_router_thread_create(task, rt, recf[i].engine); + if (nxt_slow_path(ret != NXT_OK)) { + return ret; + } + } + + return NXT_OK; +} + + +static nxt_int_t +nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, + nxt_event_engine_t *engine) +{ + nxt_int_t ret; + nxt_thread_link_t *link; + nxt_thread_handle_t handle; + + link = nxt_zalloc(sizeof(nxt_thread_link_t)); + + if (nxt_slow_path(link == NULL)) { + return NXT_ERROR; + } + + link->start = nxt_router_thread_start; + link->engine = engine; + link->work.handler = nxt_router_thread_exit_handler; + link->work.task = task; + link->work.data = link; + + nxt_queue_insert_tail(&rt->engines, &engine->link); + + ret = nxt_thread_create(&handle, link); + + if (nxt_slow_path(ret != NXT_OK)) { + nxt_queue_remove(&engine->link); + } + + return ret; +} + + +static void +nxt_router_engines_post(nxt_router_temp_conf_t *tmcf) +{ + nxt_uint_t n; + nxt_router_engine_conf_t *recf; + + recf = tmcf->engines->elts; + + for (n = tmcf->engines->nelts; n != 0; n--) { + nxt_router_engine_post(recf); + recf++; + } +} + + +static void +nxt_router_engine_post(nxt_router_engine_conf_t *recf) +{ + nxt_uint_t n; + nxt_work_t *work; + + work = recf->creating->elts; + + for (n = recf->creating->nelts; n != 0; n--) { + nxt_event_engine_post(recf->engine, work); + work++; + } +} + + +static void +nxt_router_thread_start(void *data) +{ + nxt_thread_t *thread; + nxt_thread_link_t *link; + nxt_event_engine_t *engine; + + link = data; + engine = link->engine; + + thread = nxt_thread(); + + /* STUB */ + thread->runtime = engine->task.thread->runtime; + + engine->task.thread = thread; + engine->task.log = thread->log; + thread->engine = engine; + thread->fiber = &engine->fibers->fiber; + + engine->mem_pool = nxt_mem_cache_pool_create(4096, 1024, 1024, 64); + + nxt_event_engine_start(engine); +} + + +static void +nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data) +{ + nxt_listen_event_t *listen; + nxt_event_engine_t *engine; + nxt_listen_socket_t *ls; + nxt_socket_conf_joint_t *joint; + + engine = obj; + joint = data; + + ls = &joint->socket_conf->listen; + + listen = nxt_listen_event(task, ls); + if (nxt_slow_path(listen == NULL)) { + nxt_router_listen_socket_release(task, joint); + return; + } + + listen->socket.data = joint; +} + + +nxt_inline nxt_listen_event_t * +nxt_router_listen_event(nxt_queue_t *listen_connections, + nxt_socket_conf_t *skcf) +{ + nxt_socket_t socket; + nxt_queue_link_t *link; + nxt_listen_event_t *listen; + + socket = skcf->listen.socket; + + for (link = nxt_queue_first(listen_connections); + link != nxt_queue_tail(listen_connections); + link = nxt_queue_next(link)) + { + listen = nxt_queue_link_data(link, nxt_listen_event_t, link); + + if (socket == listen->socket.fd) { + return listen; + } + } + + return NULL; +} + + +static void +nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_engine_t *engine; + nxt_listen_event_t *listen; + nxt_socket_conf_joint_t *joint, *old; + + engine = obj; + joint = data; + + listen = nxt_router_listen_event(&engine->listen_connections, + joint->socket_conf); + + old = listen->socket.data; + listen->socket.data = joint; + + nxt_router_conf_release(task, old); +} + + +static void +nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data) +{ + nxt_socket_conf_t *skcf; + nxt_listen_event_t *listen; + nxt_event_engine_t *engine; + + engine = obj; + skcf = data; + + listen = nxt_router_listen_event(&engine->listen_connections, skcf); + + nxt_fd_event_delete(engine, &listen->socket); + + listen->timer.handler = nxt_router_listen_socket_close; + listen->timer.work_queue = &engine->fast_work_queue; + + nxt_timer_add(engine, &listen->timer, 0); +} + + +static void +nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data) +{ + nxt_timer_t *timer; + nxt_listen_event_t *listen; + nxt_socket_conf_joint_t *joint; + + timer = obj; + listen = nxt_timer_data(timer, nxt_listen_event_t, timer); + joint = listen->socket.data; + + nxt_queue_remove(&listen->link); + nxt_free(listen); + + nxt_router_listen_socket_release(task, joint); +} + + +static void +nxt_router_listen_socket_release(nxt_task_t *task, + nxt_socket_conf_joint_t *joint) +{ + nxt_socket_t s; + nxt_listen_socket_t *ls; + nxt_thread_spinlock_t *lock; + + s = -1; + ls = &joint->socket_conf->listen; + lock = &joint->socket_conf->router_conf->router->lock; + + nxt_thread_spin_lock(lock); + + if (--ls->count == 0) { + s = ls->socket; + ls->socket = -1; + } + + nxt_thread_spin_unlock(lock); + + if (s != -1) { + nxt_socket_close(task, s); + } + + nxt_router_conf_release(task, joint); +} + + +static void +nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) +{ + nxt_socket_conf_t *skcf; + nxt_router_conf_t *rtcf; + nxt_thread_spinlock_t *lock; + + nxt_debug(task, "conf joint count: %D", joint->count); + + if (--joint->count != 0) { + return; + } + + nxt_queue_remove(&joint->link); + + skcf = joint->socket_conf; + rtcf = skcf->router_conf; + lock = &rtcf->router->lock; + + nxt_thread_spin_lock(lock); + + if (--skcf->count != 0) { + rtcf = NULL; + + } else { + nxt_queue_remove(&skcf->link); + + if (--rtcf->count != 0) { + rtcf = NULL; + } + } + + nxt_thread_spin_unlock(lock); + + if (rtcf != NULL) { + nxt_mem_pool_destroy(rtcf->mem_pool); + } + + if (nxt_queue_is_empty(&joint->engine->joints)) { + nxt_thread_exit(task->thread); + } +} + + +static void +nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_thread_link_t *link; + nxt_event_engine_t *engine; + nxt_thread_handle_t handle; + + handle = obj; + link = data; + + nxt_thread_wait(handle); + + engine = link->engine; + + nxt_queue_remove(&engine->link); + + nxt_mem_cache_pool_destroy(engine->mem_pool); + + nxt_event_engine_free(engine); + + nxt_free(link); + + // TODO: free port +} + + +static const nxt_event_conn_state_t nxt_router_conn_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_router_conn_http_header_parse, + .close_handler = nxt_router_conn_close, + .error_handler = nxt_router_conn_error, + + .timer_handler = nxt_router_conn_timeout, + .timer_value = nxt_router_conn_timeout_value, + .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), +}; + + +static void +nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) +{ + size_t size; + nxt_event_conn_t *c; + nxt_event_engine_t *engine; + nxt_socket_conf_joint_t *joint; + + c = obj; + joint = data; + + nxt_debug(task, "router conn init"); + + joint->count++; + + size = joint->socket_conf->header_buffer_size; + c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0); + + c->socket.data = NULL; + + engine = task->thread->engine; + c->read_work_queue = &engine->fast_work_queue; + c->write_work_queue = &engine->fast_work_queue; + + c->read_state = &nxt_router_conn_read_state; + + nxt_event_conn_read(engine, c); +} + + +static const nxt_event_conn_state_t nxt_router_conn_write_state + nxt_aligned(64) = +{ + .ready_handler = nxt_router_conn_close, + .close_handler = nxt_router_conn_close, + .error_handler = nxt_router_conn_error, +}; + + +static void +nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) +{ + size_t size; + nxt_int_t ret; + nxt_buf_t *b; + nxt_event_conn_t *c; + nxt_socket_conf_joint_t *joint; + nxt_http_request_parse_t *rp; + + c = obj; + rp = data; + + nxt_debug(task, "router conn http header parse"); + + if (rp == NULL) { + rp = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_http_request_parse_t)); + if (nxt_slow_path(rp == NULL)) { + nxt_router_conn_close(task, c, data); + return; + } + + c->socket.data = rp; + } + + ret = nxt_http_parse_request(rp, &c->read->mem); + + nxt_debug(task, "http parse request: %d", ret); + + switch (nxt_expect(NXT_DONE, ret)) { + + case NXT_DONE: + break; + + case NXT_ERROR: + nxt_router_conn_close(task, c, data); + return; + + default: /* NXT_AGAIN */ + + if (c->read->mem.free == c->read->mem.end) { + joint = c->listen->socket.data; + size = joint->socket_conf->large_header_buffer_size, + + b = nxt_buf_mem_alloc(c->mem_pool, size, 0); + if (nxt_slow_path(b == NULL)) { + nxt_router_conn_close(task, c, data); + return; + } + + size = c->read->mem.free - c->read->mem.pos; + nxt_memcpy(b->mem.pos, c->read->mem.pos, size); + + b->mem.free += size; + c->read = b; + } + + nxt_event_conn_read(task->thread->engine, c); + return; + } + + c->write = c->read; + c->write->mem.pos = c->write->mem.start; + c->write_state = &nxt_router_conn_write_state; + + nxt_event_conn_write(task->thread->engine, c); +} + + +static const nxt_event_conn_state_t nxt_router_conn_close_state + nxt_aligned(64) = +{ + .ready_handler = nxt_router_conn_free, +}; + + +static void +nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_debug(task, "router conn close"); + + c->write_state = &nxt_router_conn_close_state; + + nxt_event_conn_close(task->thread->engine, c); +} + + +static void +nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + nxt_socket_conf_joint_t *joint; + + c = obj; + + nxt_debug(task, "router conn close done"); + + joint = c->listen->socket.data; + nxt_router_conf_release(task, joint); + + nxt_mem_pool_destroy(c->mem_pool); +} + + +static void +nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + + c = obj; + + nxt_debug(task, "router conn error"); + + c->write_state = &nxt_router_conn_close_state; + + nxt_event_conn_close(task->thread->engine, c); +} + + +static void +nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_timer_t *timer; + nxt_event_conn_t *c; + + timer = obj; + + nxt_debug(task, "router conn timeout"); + + c = nxt_event_read_timer_conn(timer); + + c->write_state = &nxt_router_conn_close_state; + + nxt_event_conn_close(task->thread->engine, c); +} + + +static nxt_msec_t +nxt_router_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data) +{ + nxt_socket_conf_joint_t *joint; + + joint = c->listen->socket.data; + + return nxt_value_at(nxt_msec_t, joint->socket_conf, data); +} diff --git a/src/nxt_router.h b/src/nxt_router.h new file mode 100644 index 00000000..d31a5964 --- /dev/null +++ b/src/nxt_router.h @@ -0,0 +1,78 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_ROUTER_H_INCLUDED_ +#define _NXT_ROUTER_H_INCLUDED_ + + +#include <nxt_main.h> +#include <nxt_runtime.h> +#include <nxt_master_process.h> + + +typedef struct { + nxt_thread_spinlock_t lock; + nxt_queue_t engines; + + nxt_queue_t sockets; +} nxt_router_t; + + +typedef struct { + uint32_t count; + uint32_t threads; + nxt_router_t *router; + nxt_mem_pool_t *mem_pool; +} nxt_router_conf_t; + + +typedef struct { + nxt_event_engine_t *engine; + nxt_task_t task; + nxt_array_t *creating; /* of nxt_work_t */ + nxt_array_t *updating; /* of nxt_work_t */ + nxt_array_t *deleting; /* of nxt_work_t */ +} nxt_router_engine_conf_t; + + +typedef struct { + nxt_queue_t creating; + nxt_queue_t pending; + nxt_queue_t updating; + nxt_queue_t keeping; + nxt_queue_t deleting; + + uint32_t new_threads; + + nxt_array_t *engines; + nxt_router_conf_t *conf; + nxt_mem_pool_t *mem_pool; +} nxt_router_temp_conf_t; + + +typedef struct { + uint32_t count; + nxt_listen_socket_t listen; + nxt_queue_link_t link; + nxt_router_conf_t *router_conf; + + size_t header_buffer_size; + size_t large_header_buffer_size; + nxt_msec_t header_read_timeout; +} nxt_socket_conf_t; + + +typedef struct { + uint32_t count; + nxt_queue_link_t link; + nxt_event_engine_t *engine; + nxt_socket_conf_t *socket_conf; + + /* Modules configuraitons. */ +} nxt_socket_conf_joint_t; + + +#endif /* _NXT_ROUTER_H_INCLUDED_ */ diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 8337712a..5678f522 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -253,20 +253,10 @@ nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt) static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) { - nxt_event_engine_t *engine, **e; + nxt_thread_t *thread; + nxt_event_engine_t *engine; const nxt_event_interface_t *interface; - rt->engines = nxt_array_create(rt->mem_pool, 1, - sizeof(nxt_event_engine_t *)); - if (nxt_slow_path(rt->engines == NULL)) { - return NXT_ERROR; - } - - e = nxt_array_add(rt->engines); - if (nxt_slow_path(e == NULL)) { - return NXT_ERROR; - } - interface = nxt_service_get(rt->services, "engine", NULL); if (nxt_slow_path(interface == NULL)) { @@ -281,8 +271,14 @@ nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) return NXT_ERROR; } + thread = task->thread; + thread->engine = engine; + thread->fiber = &engine->fibers->fiber; + engine->id = rt->last_engine_id++; - *e = engine; + + nxt_queue_init(&rt->engines); + nxt_queue_insert_tail(&rt->engines, &engine->link); return NXT_OK; } @@ -587,12 +583,13 @@ nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt) void nxt_runtime_event_engine_free(nxt_runtime_t *rt) { - nxt_event_engine_t *engine, **engines; + nxt_queue_link_t *link; + nxt_event_engine_t *engine; - engines = rt->engines->elts; - engine = engines[0]; - nxt_array_remove(rt->engines, &engines[0]); + link = nxt_queue_first(&rt->engines); + nxt_queue_remove(link); + engine = nxt_queue_link_data(link, nxt_event_engine_t, link); nxt_event_engine_free(engine); } diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 5cce5703..c24618b7 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -19,7 +19,7 @@ struct nxt_runtime_s { nxt_array_t *listen_sockets; /* of nxt_listen_socket_t */ nxt_array_t *services; /* of nxt_service_t */ - nxt_array_t *engines; /* of nxt_event_engine_t */ + void *data; nxt_runtime_cont_t start; @@ -61,6 +61,8 @@ struct nxt_runtime_s { const char *pid; const char *error_log; + nxt_queue_t engines; /* of nxt_event_engine_t */ + nxt_sockaddr_t *controller_listen; nxt_listen_socket_t *controller_socket; nxt_str_t upstream; diff --git a/src/nxt_signal.c b/src/nxt_signal.c index 492904d1..3ccd0774 100644 --- a/src/nxt_signal.c +++ b/src/nxt_signal.c @@ -141,7 +141,7 @@ nxt_signal_thread_start(nxt_event_engine_t *engine) if (nxt_fast_path(link != NULL)) { link->start = nxt_signal_thread; - link->data = engine; + link->work.data = engine; if (nxt_thread_create(&engine->signals->thread, link) == NXT_OK) { engine->signals->process = nxt_pid; diff --git a/src/nxt_thread.c b/src/nxt_thread.c index 78c6f50c..851ebb4f 100644 --- a/src/nxt_thread.c +++ b/src/nxt_thread.c @@ -114,9 +114,9 @@ nxt_thread_trampoline(void *data) pthread_cleanup_push(nxt_thread_time_cleanup, thr); start = link->start; - data = link->data; + data = link->work.data; - if (link->engine != NULL) { + if (link->work.handler != NULL) { thr->link = link; } else { @@ -180,7 +180,8 @@ nxt_thread_time_cleanup(void *data) void nxt_thread_exit(nxt_thread_t *thr) { - nxt_thread_link_t *link; + nxt_thread_link_t *link; + nxt_event_engine_t *engine; nxt_log_debug(thr->log, "thread exit"); @@ -189,13 +190,14 @@ nxt_thread_exit(nxt_thread_t *thr) if (link != NULL) { /* - * link->handler is already set to an exit handler, - * and link->task is already set to engine->task. + * link->work.handler is already set to an exit handler, + * and link->work.task is already set to the correct engine->task. * The link should be freed by the exit handler. */ link->work.obj = (void *) (uintptr_t) thr->handle; + engine = nxt_container_of(link->work.task, nxt_event_engine_t, task); - nxt_event_engine_post(link->engine, &link->work); + nxt_event_engine_post(engine, &link->work); } nxt_thread_time_free(thr); diff --git a/src/nxt_thread.h b/src/nxt_thread.h index a8e04749..c1f3aaee 100644 --- a/src/nxt_thread.h +++ b/src/nxt_thread.h @@ -90,7 +90,6 @@ typedef void (*nxt_thread_start_t)(void *data); typedef struct { nxt_thread_start_t start; - void *data; nxt_event_engine_t *engine; nxt_work_t work; } nxt_thread_link_t; @@ -180,6 +179,7 @@ struct nxt_thread_s { nxt_runtime_t *runtime; nxt_event_engine_t *engine; + void *data; /* * Although pointer to a current fiber should be a property of diff --git a/src/nxt_thread_pool.c b/src/nxt_thread_pool.c index fd7246c8..c8959dc6 100644 --- a/src/nxt_thread_pool.c +++ b/src/nxt_thread_pool.c @@ -10,6 +10,7 @@ static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp); static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data); static void nxt_thread_pool_start(void *ctx); +static void nxt_thread_pool_loop(void *ctx); static void nxt_thread_pool_wait(nxt_thread_pool_t *tp); @@ -86,13 +87,8 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp) if (nxt_fast_path(link != NULL)) { link->start = nxt_thread_pool_start; - link->data = tp; - link->engine = tp->engine; - /* - * link->exit is not used. link->engine is used just to - * set thr->link by nxt_thread_trampoline() and the link - * is a mark of the first thread of pool. - */ + link->work.data = tp; + if (nxt_thread_create(&handle, link) == NXT_OK) { tp->ready = 1; goto done; @@ -118,6 +114,22 @@ done: static void nxt_thread_pool_start(void *ctx) { + nxt_thread_t *thr; + nxt_thread_pool_t *tp; + + tp = ctx; + thr = nxt_thread(); + + tp->main = thr->handle; + tp->task.thread = thr; + + nxt_thread_pool_loop(ctx); +} + + +static void +nxt_thread_pool_loop(void *ctx) +{ void *obj, *data; nxt_task_t *task; nxt_thread_t *thr; @@ -127,17 +139,6 @@ nxt_thread_pool_start(void *ctx) tp = ctx; thr = nxt_thread(); - if (thr->link != NULL) { - /* Only the first thread has a link. */ - tp->main = thr->handle; - nxt_free(thr->link); - thr->link = NULL; - - tp->task.thread = thr; - } - - thr->thread_pool = tp; - if (tp->init != NULL) { tp->init(); } @@ -215,8 +216,8 @@ nxt_thread_pool_wait(nxt_thread_pool_t *tp) link = nxt_zalloc(sizeof(nxt_thread_link_t)); if (nxt_fast_path(link != NULL)) { - link->start = nxt_thread_pool_start; - link->data = tp; + link->start = nxt_thread_pool_loop; + link->work.data = tp; if (nxt_thread_create(&handle, link) != NXT_OK) { (void) nxt_atomic_fetch_add(&tp->threads, -1); @@ -232,6 +233,8 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp) thr = nxt_thread(); + nxt_log_debug(thr->log, "thread pool destroy: %d", tp->ready); + if (!tp->ready) { nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit, &tp->task, tp, NULL); diff --git a/src/nxt_thread_time.c b/src/nxt_thread_time.c index 916aa564..95c88fa3 100644 --- a/src/nxt_thread_time.c +++ b/src/nxt_thread_time.c @@ -123,7 +123,7 @@ nxt_time_thread_start(nxt_msec_t interval) if (nxt_fast_path(link != NULL)) { link->start = nxt_time_thread; - link->data = (void *) (uintptr_t) interval; + link->work.data = (void *) (uintptr_t) interval; (void) nxt_thread_create(&handle, link); } |