summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--auto/sources2
-rw-r--r--src/nxt_application.c8
-rw-r--r--src/nxt_clang.h4
-rw-r--r--src/nxt_conn_close.c159
-rw-r--r--src/nxt_epoll_engine.c2
-rw-r--r--src/nxt_event_conn.c128
-rw-r--r--src/nxt_event_conn.h58
-rw-r--r--src/nxt_event_conn_accept.c72
-rw-r--r--src/nxt_event_engine.c4
-rw-r--r--src/nxt_event_engine.h10
-rw-r--r--src/nxt_kqueue_engine.c6
-rw-r--r--src/nxt_listen_socket.h3
-rw-r--r--src/nxt_router.c1076
-rw-r--r--src/nxt_router.h78
-rw-r--r--src/nxt_runtime.c31
-rw-r--r--src/nxt_runtime.h4
-rw-r--r--src/nxt_signal.c2
-rw-r--r--src/nxt_thread.c14
-rw-r--r--src/nxt_thread.h2
-rw-r--r--src/nxt_thread_pool.c43
-rw-r--r--src/nxt_thread_time.c2
21 files changed, 1447 insertions, 261 deletions
diff --git a/auto/sources b/auto/sources
index efe748d1..d350804c 100644
--- a/auto/sources
+++ b/auto/sources
@@ -71,6 +71,7 @@ NXT_LIB_DEPS=" \
src/nxt_conf.h \
src/nxt_application.h \
src/nxt_master_process.h \
+ src/nxt_router.h \
"
NXT_LIB_SRCS=" \
@@ -128,6 +129,7 @@ NXT_LIB_SRCS=" \
src/nxt_event_conn_accept.c \
src/nxt_event_conn_read.c \
src/nxt_event_conn_write.c \
+ src/nxt_conn_close.c \
src/nxt_event_conn_job_sendfile.c \
src/nxt_event_conn_proxy.c \
src/nxt_job.c \
diff --git a/src/nxt_application.c b/src/nxt_application.c
index f45d35c9..eedfd0c9 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -99,7 +99,7 @@ nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt)
if (nxt_fast_path(link != NULL)) {
link->start = nxt_app_thread;
- link->data = rt;
+ link->work.data = rt;
return nxt_thread_create(&handle, link);
}
@@ -151,8 +151,8 @@ nxt_app_thread(void *ctx)
nxt_socket_t s;
nxt_thread_t *thr;
nxt_runtime_t *rt;
+ nxt_queue_link_t *link;
nxt_app_request_t *r;
- nxt_event_engine_t **engines;
nxt_listen_socket_t *ls;
u_char buf[SIZE];
const size_t size = SIZE;
@@ -163,9 +163,9 @@ nxt_app_thread(void *ctx)
nxt_log_debug(thr->log, "app thread");
rt = ctx;
- engines = rt->engines->elts;
- nxt_app_engine = engines[0];
+ link = nxt_queue_first(&rt->engines);
+ nxt_app_engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
nxt_app_mem_pool = nxt_mem_pool_create(512);
if (nxt_slow_path(nxt_app_mem_pool == NULL)) {
diff --git a/src/nxt_clang.h b/src/nxt_clang.h
index 6cb752dd..31bc6fda 100644
--- a/src/nxt_clang.h
+++ b/src/nxt_clang.h
@@ -173,6 +173,10 @@ nxt_container_of(p, type, field) \
(type *) ((u_char *) (p) - offsetof(type, field))
+#define nxt_value_at(type, p, offset) \
+ *(type *) ((u_char *) p + offset)
+
+
#define \
nxt_nitems(x) \
(sizeof(x) / sizeof((x)[0]))
diff --git a/src/nxt_conn_close.c b/src/nxt_conn_close.c
new file mode 100644
index 00000000..18b2450c
--- /dev/null
+++ b/src/nxt_conn_close.c
@@ -0,0 +1,159 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+static void nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_conn_close_timer_handler(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_conn_close_error_ignore(nxt_task_t *task, void *obj,
+ void *data);
+
+
+void
+nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c)
+{
+ int ret;
+ socklen_t len;
+ struct linger linger;
+ nxt_work_queue_t *wq;
+ nxt_work_handler_t handler;
+
+ nxt_debug(c->socket.task, "conn close fd:%d, to:%d",
+ c->socket.fd, c->socket.timedout);
+
+ if (c->socket.timedout) {
+ /*
+ * Resetting of timed out connection on close
+ * releases kernel memory associated with socket.
+ * This also causes sending TCP/IP RST to a peer.
+ */
+ linger.l_onoff = 1;
+ linger.l_linger = 0;
+ len = sizeof(struct linger);
+
+ ret = setsockopt(c->socket.fd, SOL_SOCKET, SO_LINGER, &linger, len);
+
+ if (nxt_slow_path(ret != 0)) {
+ nxt_log(c->socket.task, NXT_LOG_CRIT,
+ "setsockopt(%d, SO_LINGER) failed %E",
+ c->socket.fd, nxt_socket_errno);
+ }
+ }
+
+ /*
+ * Event errors should be ignored here to avoid repeated nxt_conn_close()
+ * calls. nxt_conn_close_handler() or nxt_conn_close_timer_handler()
+ * will eventually close socket.
+ */
+ c->socket.error_handler = nxt_conn_close_error_ignore;
+
+ if (c->socket.error == 0 && !c->socket.closed && !c->socket.shutdown) {
+ wq = &engine->shutdown_work_queue;
+ handler = nxt_conn_shutdown_handler;
+
+ } else{
+ wq = &engine->close_work_queue;
+ handler = nxt_conn_close_handler;
+ }
+
+ nxt_work_queue_add(wq, handler, c->socket.task, c, engine);
+}
+
+
+static void
+nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_conn_t *c;
+ nxt_event_engine_t *engine;
+
+ c = obj;
+ engine = data;
+
+ nxt_debug(task, "conn shutdown handler fd:%d", c->socket.fd);
+
+ c->socket.shutdown = 1;
+
+ nxt_socket_shutdown(task, c->socket.fd, SHUT_RDWR);
+
+ nxt_work_queue_add(&engine->close_work_queue, nxt_conn_close_handler,
+ task, c, engine);
+}
+
+
+static void
+nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_uint_t events_pending, timers_pending;
+ nxt_event_conn_t *c;
+ nxt_event_engine_t *engine;
+
+ c = obj;
+ engine = data;
+
+ nxt_debug(task, "conn close handler fd:%d", c->socket.fd);
+
+ /*
+ * Socket should be closed only after all pending socket event operations
+ * will be processed by the kernel. This could be achieved with zero-timer
+ * handler. Pending timer operations associated with the socket are
+ * processed before going to the kernel.
+ */
+
+ timers_pending = nxt_timer_delete(engine, &c->read_timer);
+ timers_pending += nxt_timer_delete(engine, &c->write_timer);
+
+ events_pending = nxt_fd_event_close(engine, &c->socket);
+
+ if (events_pending == 0) {
+ nxt_socket_close(task, c->socket.fd);
+ c->socket.fd = -1;
+
+ if (timers_pending == 0) {
+ nxt_work_queue_add(&engine->fast_work_queue,
+ c->write_state->ready_handler,
+ task, c, c->socket.data);
+ return;
+ }
+ }
+
+ c->write_timer.handler = nxt_conn_close_timer_handler;
+ c->write_timer.work_queue = &engine->fast_work_queue;
+
+ nxt_timer_add(engine, &c->write_timer, 0);
+}
+
+
+static void
+nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_timer_t *timer;
+ nxt_event_conn_t *c;
+
+ timer = obj;
+
+ c = nxt_event_write_timer_conn(timer);
+
+ nxt_debug(task, "conn close timer handler fd:%d", c->socket.fd);
+
+ if (c->socket.fd != -1) {
+ nxt_socket_close(task, c->socket.fd);
+ c->socket.fd = -1;
+ }
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ c->write_state->ready_handler,
+ task, c, c->socket.data);
+}
+
+
+static void
+nxt_conn_close_error_ignore(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_debug(task, "conn close error ignore");
+}
diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c
index 149f5cb8..d46ee007 100644
--- a/src/nxt_epoll_engine.c
+++ b/src/nxt_epoll_engine.c
@@ -994,7 +994,7 @@ nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
nxt_event_conn_listen_t *cls;
cls = obj;
- c = data;
+ c = cls->next;
cls->ready--;
cls->socket.read_ready = (cls->ready != 0);
diff --git a/src/nxt_event_conn.c b/src/nxt_event_conn.c
index 079901f3..b8adb00a 100644
--- a/src/nxt_event_conn.c
+++ b/src/nxt_event_conn.c
@@ -7,12 +7,6 @@
#include <nxt_main.h>
-static void nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data);
-static void nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data);
-static void nxt_conn_close_timer_handler(nxt_task_t *task, void *obj,
- void *data);
-
-
nxt_event_conn_io_t nxt_unix_event_conn_io = {
nxt_event_conn_io_connect,
nxt_event_conn_io_accept,
@@ -132,128 +126,6 @@ nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data)
void
-nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c)
-{
- int ret;
- socklen_t len;
- struct linger linger;
- nxt_work_queue_t *wq;
- nxt_work_handler_t handler;
-
- if (c->socket.timedout) {
- /*
- * Resetting of timed out connection on close
- * releases kernel memory associated with socket.
- * This also causes sending TCP/IP RST to a peer.
- */
- linger.l_onoff = 1;
- linger.l_linger = 0;
- len = sizeof(struct linger);
-
- ret = setsockopt(c->socket.fd, SOL_SOCKET, SO_LINGER, &linger, len);
-
- if (nxt_slow_path(ret != 0)) {
- nxt_log(c->socket.task, NXT_LOG_CRIT,
- "setsockopt(%d, SO_LINGER) failed %E",
- c->socket.fd, nxt_socket_errno);
- }
- }
-
- if (c->socket.error == 0 && !c->socket.closed && !c->socket.shutdown) {
- wq = &engine->shutdown_work_queue;
- handler = nxt_conn_shutdown_handler;
-
- } else{
- wq = &engine->close_work_queue;
- handler = nxt_conn_close_handler;
- }
-
- nxt_work_queue_add(wq, handler, c->socket.task, c, engine);
-}
-
-
-static void
-nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_t *c;
- nxt_event_engine_t *engine;
-
- c = obj;
- engine = data;
-
- nxt_debug(task, "event conn shutdown fd:%d", c->socket.fd);
-
- c->socket.shutdown = 1;
-
- nxt_socket_shutdown(task, c->socket.fd, SHUT_RDWR);
-
- nxt_work_queue_add(&engine->close_work_queue, nxt_conn_close_handler,
- task, c, engine);
-}
-
-
-static void
-nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data)
-{
- nxt_uint_t events_pending, timers_pending;
- nxt_event_conn_t *c;
- nxt_event_engine_t *engine;
-
- c = obj;
- engine = data;
-
- nxt_debug(task, "event conn close fd:%d", c->socket.fd);
-
- timers_pending = nxt_timer_delete(engine, &c->read_timer);
- timers_pending += nxt_timer_delete(engine, &c->write_timer);
-
- events_pending = nxt_fd_event_close(engine, &c->socket);
-
- if (events_pending == 0) {
- nxt_socket_close(task, c->socket.fd);
- c->socket.fd = -1;
-
- if (timers_pending == 0) {
- nxt_work_queue_add(&engine->fast_work_queue,
- c->write_state->ready_handler,
- task, c, c->socket.data);
- return;
- }
- }
-
- c->write_timer.handler = nxt_conn_close_timer_handler;
- c->write_timer.work_queue = &engine->fast_work_queue;
-
- nxt_timer_add(engine, &c->write_timer, 0);
-}
-
-
-static void
-nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data)
-{
- nxt_timer_t *ev;
- nxt_event_conn_t *c;
- nxt_event_engine_t *engine;
-
- ev = obj;
-
- c = nxt_event_write_timer_conn(ev);
-
- nxt_debug(task, "event conn close handler fd:%d", c->socket.fd);
-
- if (c->socket.fd != -1) {
- nxt_socket_close(task, c->socket.fd);
- c->socket.fd = -1;
- }
-
- engine = task->thread->engine;
-
- nxt_work_queue_add(&engine->fast_work_queue, c->write_state->ready_handler,
- task, c, c->socket.data);
-}
-
-
-void
nxt_event_conn_timer(nxt_event_engine_t *engine, nxt_event_conn_t *c,
const nxt_event_conn_state_t *state, nxt_timer_t *tev)
{
diff --git a/src/nxt_event_conn.h b/src/nxt_event_conn.h
index 30d4fedc..902dff9f 100644
--- a/src/nxt_event_conn.h
+++ b/src/nxt_event_conn.h
@@ -100,6 +100,35 @@ typedef struct {
} nxt_event_conn_io_t;
+/*
+ * The nxt_event_conn_listen_t is separated from nxt_listen_socket_t
+ * because nxt_listen_socket_t is one per process whilst each worker
+ * thread uses own nxt_event_conn_listen_t.
+ */
+typedef struct {
+ /* Must be the first field. */
+ nxt_fd_event_t socket;
+
+ nxt_task_t task;
+
+ uint32_t ready;
+ uint32_t batch;
+
+ /* An accept() interface is cached to minimize memory accesses. */
+ nxt_work_handler_t accept;
+
+ nxt_listen_socket_t *listen;
+ nxt_event_conn_t *next; /* STUB */;
+ nxt_work_queue_t *work_queue;
+
+ nxt_timer_t timer;
+
+ nxt_queue_link_t link;
+} nxt_event_conn_listen_t;
+
+typedef nxt_event_conn_listen_t nxt_listen_event_t;
+
+
struct nxt_event_conn_s {
/*
* Must be the first field, since nxt_fd_event_t
@@ -143,7 +172,7 @@ struct nxt_event_conn_s {
nxt_task_t task;
nxt_log_t log;
- nxt_listen_socket_t *listen;
+ nxt_event_conn_listen_t *listen;
nxt_sockaddr_t *remote;
nxt_sockaddr_t *local;
const char *action;
@@ -163,31 +192,6 @@ struct nxt_event_conn_s {
};
-/*
- * The nxt_event_conn_listen_t is separated from nxt_listen_socket_t
- * because nxt_listen_socket_t is one per process whilst each worker
- * thread uses own nxt_event_conn_listen_t.
- */
-typedef struct {
- /* Must be the first field. */
- nxt_fd_event_t socket;
-
- nxt_task_t task;
-
- uint32_t ready;
- uint32_t batch;
-
- /* An accept() interface is cached to minimize memory accesses. */
- nxt_work_handler_t accept;
-
- nxt_listen_socket_t *listen;
-
- nxt_timer_t timer;
-
- nxt_queue_link_t link;
-} nxt_event_conn_listen_t;
-
-
#define \
nxt_event_conn_timer_init(ev, c, wq) \
do { \
@@ -256,6 +260,8 @@ nxt_int_t nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c);
void nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data);
void nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data);
+NXT_EXPORT nxt_event_conn_listen_t *nxt_listen_event(nxt_task_t *task,
+ nxt_listen_socket_t *ls);
NXT_EXPORT nxt_int_t nxt_event_conn_listen(nxt_task_t *task,
nxt_listen_socket_t *ls);
void nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data);
diff --git a/src/nxt_event_conn_accept.c b/src/nxt_event_conn_accept.c
index 5f0a72f0..ced2a3d8 100644
--- a/src/nxt_event_conn_accept.c
+++ b/src/nxt_event_conn_accept.c
@@ -32,6 +32,53 @@ static void nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj,
void *data);
+nxt_event_conn_listen_t *
+nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
+{
+ nxt_event_engine_t *engine;
+ nxt_event_conn_listen_t *cls;
+
+ cls = nxt_zalloc(sizeof(nxt_event_conn_listen_t));
+
+ if (nxt_fast_path(cls != NULL)) {
+ cls->socket.fd = ls->socket;
+
+ engine = task->thread->engine;
+ cls->batch = engine->batch;
+
+ cls->socket.read_work_queue = &engine->accept_work_queue;
+ cls->socket.read_handler = nxt_event_conn_listen_handler;
+ cls->socket.error_handler = nxt_event_conn_listen_event_error;
+ cls->socket.log = &nxt_main_log;
+
+ cls->accept = engine->event.io->accept;
+
+ cls->listen = ls;
+ cls->work_queue = &engine->read_work_queue;
+
+ cls->timer.work_queue = &engine->fast_work_queue;
+ cls->timer.handler = nxt_event_conn_listen_timer_handler;
+ cls->timer.log = &nxt_main_log;
+
+ cls->task.thread = task->thread;
+ cls->task.log = &nxt_main_log;
+ cls->task.ident = nxt_task_next_ident();
+ cls->socket.task = &cls->task;
+ cls->timer.task = &cls->task;
+
+ if (nxt_event_conn_accept_alloc(task, cls) != NULL) {
+ nxt_fd_event_enable_accept(engine, &cls->socket);
+
+ nxt_queue_insert_head(&engine->listen_connections, &cls->link);
+ }
+
+ return cls;
+ }
+
+ return NULL;
+}
+
+
nxt_int_t
nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls)
{
@@ -97,13 +144,12 @@ nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls)
/* This allocation cannot fail. */
c = nxt_event_conn_create(mp, cls->socket.task);
- cls->socket.data = c;
+ cls->next = c;
c->socket.read_work_queue = cls->socket.read_work_queue;
c->socket.write_ready = 1;
+ c->listen = cls;
ls = cls->listen;
- c->listen = ls;
-
/* This allocation cannot fail. */
remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length);
c->remote = remote;
@@ -146,7 +192,7 @@ nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
nxt_event_conn_listen_t *cls;
cls = obj;
- c = data;
+ c = cls->next;
cls->ready--;
cls->socket.read_ready = (cls->ready != 0);
@@ -200,17 +246,19 @@ nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls,
nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
- c->read_work_queue = c->listen->work_queue;
- c->write_work_queue = c->listen->work_queue;
+ c->read_work_queue = cls->work_queue;
+ c->write_work_queue = cls->work_queue;
- if (c->listen->read_after_accept) {
+ if (cls->listen->read_after_accept) {
//c->socket.read_ready = 1;
- c->listen->handler(task, c, NULL);
+// cls->listen->handler(task, c, cls->socket.data);
+ nxt_work_queue_add(c->read_work_queue, cls->listen->handler,
+ task, c, cls->socket.data);
} else {
- nxt_work_queue_add(c->write_work_queue, c->listen->handler,
- task, c, NULL);
+ nxt_work_queue_add(c->write_work_queue, cls->listen->handler,
+ task, c, cls->socket.data);
}
next = nxt_event_conn_accept_next(task, cls);
@@ -227,7 +275,7 @@ nxt_event_conn_accept_next(nxt_task_t *task, nxt_event_conn_listen_t *cls)
{
nxt_event_conn_t *c;
- cls->socket.data = NULL;
+ cls->next = NULL;
do {
c = nxt_event_conn_accept_alloc(task, cls);
@@ -338,7 +386,7 @@ nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
ev = obj;
cls = nxt_timer_data(ev, nxt_event_conn_listen_t, timer);
- c = cls->socket.data;
+ c = cls->next;
if (c == NULL) {
c = nxt_event_conn_accept_next(task, cls);
diff --git a/src/nxt_event_engine.c b/src/nxt_event_engine.c
index bd10a270..52bfa39d 100644
--- a/src/nxt_event_engine.c
+++ b/src/nxt_event_engine.c
@@ -44,9 +44,6 @@ nxt_event_engine_create(nxt_task_t *task,
engine->task.log = thread->log;
engine->task.ident = nxt_task_next_ident();
- thread->engine = engine;
- thread->fiber = &engine->fibers->fiber;
-
engine->batch = batch;
if (flags & NXT_ENGINE_FIBERS) {
@@ -121,6 +118,7 @@ nxt_event_engine_create(nxt_task_t *task,
engine->max_connections = 0xffffffff;
+ nxt_queue_init(&engine->joints);
nxt_queue_init(&engine->listen_connections);
nxt_queue_init(&engine->idle_connections);
diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h
index 30d1fb21..6947cc8d 100644
--- a/src/nxt_event_engine.h
+++ b/src/nxt_event_engine.h
@@ -355,6 +355,11 @@ nxt_fd_event_disable(engine, ev) \
#define \
+nxt_fd_event_delete(engine, ev) \
+ (engine)->event.delete(engine, ev)
+
+
+#define \
nxt_fd_event_close(engine, ev) \
(engine)->event.close(engine, ev)
@@ -481,8 +486,13 @@ struct nxt_event_engine_s {
uint32_t connections;
uint32_t max_connections;
+ nxt_port_t *port;
+ nxt_mem_cache_pool_t *mem_pool;
+ nxt_queue_t joints;
nxt_queue_t listen_connections;
nxt_queue_t idle_connections;
+
+ nxt_queue_link_t link;
};
diff --git a/src/nxt_kqueue_engine.c b/src/nxt_kqueue_engine.c
index 559f0c9e..a216e066 100644
--- a/src/nxt_kqueue_engine.c
+++ b/src/nxt_kqueue_engine.c
@@ -522,6 +522,8 @@ nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data)
ev = obj;
+ nxt_debug(task, "kqueue fd error handler fd:%d", ev->fd);
+
if (ev->kq_eof && ev->kq_errno != 0) {
ev->error = ev->kq_errno;
nxt_log(task, nxt_socket_error_level(ev->kq_errno),
@@ -544,6 +546,8 @@ nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data)
ev = obj;
+ nxt_debug(task, "kqueue file error handler fd:%d", ev->file->fd);
+
ev->handler(task, ev, data);
}
@@ -924,7 +928,7 @@ nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
nxt_event_conn_listen_t *cls;
cls = obj;
- c = data;
+ c = cls->next;
cls->ready--;
cls->socket.read_ready = (cls->ready != 0);
diff --git a/src/nxt_listen_socket.h b/src/nxt_listen_socket.h
index 1e62ae80..ce7a8aeb 100644
--- a/src/nxt_listen_socket.h
+++ b/src/nxt_listen_socket.h
@@ -31,9 +31,8 @@ typedef struct {
uint8_t socklen;
uint8_t address_length;
+ uint32_t count;
uint32_t mem_pool_size;
-
- void *servers;
} nxt_listen_socket_t;
diff --git a/src/nxt_router.c b/src/nxt_router.c
index caef8503..1f96a1df 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -5,81 +5,1083 @@
* Copyright (C) NGINX, Inc.
*/
-#include <nxt_main.h>
-#include <nxt_runtime.h>
-#include <nxt_master_process.h>
+#include <nxt_router.h>
-static nxt_int_t nxt_router_listen_socket(nxt_task_t *task, nxt_runtime_t *rt);
+static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
+ nxt_router_t *router);
+static void nxt_router_listen_sockets_sort(nxt_router_t *router,
+ nxt_router_temp_conf_t *tmcf);
+
+static nxt_int_t nxt_router_stub_conf(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf);
+static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf);
+static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
+ nxt_mem_pool_t *mp, nxt_sockaddr_t *sa);
+static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task,
+ nxt_mem_pool_t *mp, uint32_t port);
+
+static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
+ nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
+ const nxt_event_interface_t *interface);
+static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task,
+ nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf,
+ nxt_router_engine_conf_t *recf);
+static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task,
+ nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf,
+ nxt_router_engine_conf_t *recf);
+static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task,
+ nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf,
+ nxt_router_engine_conf_t *recf);
+static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task,
+ nxt_mem_pool_t *mp, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
+ nxt_array_t *array, nxt_work_handler_t handler);
+static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task,
+ nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array);
+
+static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
+ nxt_router_temp_conf_t *tmcf);
+static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
+ nxt_event_engine_t *engine);
+
+static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
+static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);
+
+static void nxt_router_thread_start(void *data);
+static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_router_listen_socket_release(nxt_task_t *task,
+ nxt_socket_conf_joint_t *joint);
+static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_router_conf_release(nxt_task_t *task,
+ nxt_socket_conf_joint_t *joint);
+
+static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
+static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
+static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
+static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
+static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
+static nxt_msec_t nxt_router_conn_timeout_value(nxt_event_conn_t *c,
+ uintptr_t data);
nxt_int_t
nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
{
- if (nxt_router_listen_socket(task, rt) != NXT_OK) {
+ nxt_int_t ret;
+ nxt_router_t *router;
+ nxt_router_temp_conf_t *tmcf;
+ const nxt_event_interface_t *interface;
+
+ router = nxt_zalloc(sizeof(nxt_router_t));
+ if (nxt_slow_path(router == NULL)) {
+ return NXT_ERROR;
+ }
+
+ nxt_queue_init(&router->engines);
+ nxt_queue_init(&router->sockets);
+
+ /**/
+
+ tmcf = nxt_router_temp_conf(task, router);
+ if (nxt_slow_path(tmcf == NULL)) {
return NXT_ERROR;
}
+ ret = nxt_router_stub_conf(task, tmcf);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ nxt_router_listen_sockets_sort(router, tmcf);
+
+ ret = nxt_router_listen_sockets_stub_create(task, tmcf);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ interface = nxt_service_get(rt->services, "engine", NULL);
+
+ ret = nxt_router_engines_create(task, router, tmcf, interface);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ ret = nxt_router_threads_create(task, rt, tmcf);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ nxt_router_engines_post(tmcf);
+
+ nxt_queue_add(&router->sockets, &tmcf->updating);
+ nxt_queue_add(&router->sockets, &tmcf->creating);
+
return NXT_OK;
}
+static nxt_router_temp_conf_t *
+nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router)
+{
+ nxt_mem_pool_t *mp, *tmp;
+ nxt_router_conf_t *rtcf;
+ nxt_router_temp_conf_t *tmcf;
+
+ mp = nxt_mem_pool_create(1024);
+ if (nxt_slow_path(mp == NULL)) {
+ return NULL;
+ }
+
+ rtcf = nxt_mem_zalloc(mp, sizeof(nxt_router_conf_t));
+ if (nxt_slow_path(rtcf == NULL)) {
+ goto fail;
+ }
+
+ rtcf->mem_pool = mp;
+ rtcf->router = router;
+ rtcf->count = 1;
+
+ tmp = nxt_mem_pool_create(1024);
+ if (nxt_slow_path(tmp == NULL)) {
+ goto fail;
+ }
+
+ tmcf = nxt_mem_zalloc(tmp, sizeof(nxt_router_temp_conf_t));
+ if (nxt_slow_path(tmcf == NULL)) {
+ goto temp_fail;
+ }
+
+ tmcf->mem_pool = tmp;
+ tmcf->conf = rtcf;
+
+ tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
+ sizeof(nxt_router_engine_conf_t));
+ if (nxt_slow_path(tmcf->engines == NULL)) {
+ goto temp_fail;
+ }
+
+ nxt_queue_init(&tmcf->deleting);
+ nxt_queue_init(&tmcf->keeping);
+ nxt_queue_init(&tmcf->updating);
+ nxt_queue_init(&tmcf->pending);
+ nxt_queue_init(&tmcf->creating);
+
+ return tmcf;
+
+temp_fail:
+
+ nxt_mem_pool_destroy(tmp);
+
+fail:
+
+ nxt_mem_pool_destroy(mp);
+
+ return NULL;
+}
+
+
static nxt_int_t
-nxt_router_listen_socket(nxt_task_t *task, nxt_runtime_t *rt)
+nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
{
- nxt_sockaddr_t *sa;
- nxt_listen_socket_t *ls;
+ nxt_sockaddr_t *sa;
+ nxt_mem_pool_t *mp;
+ nxt_socket_conf_t *skcf;
- sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in),
- NXT_INET_ADDR_STR_LEN);
- if (sa == NULL) {
- return NXT_ERROR;
+ tmcf->conf->threads = 1;
+
+ mp = tmcf->conf->mem_pool;
+
+ sa = nxt_router_listen_sockaddr_stub(task, mp, 8000);
+ skcf = nxt_router_socket_conf(task, mp, sa);
+
+ skcf->listen.handler = nxt_router_conn_init;
+ skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen)
+ + sizeof(nxt_event_conn_proxy_t)
+ + sizeof(nxt_event_conn_t)
+ + 4 * sizeof(nxt_buf_t);
+
+ skcf->header_buffer_size = 2048;
+ skcf->large_header_buffer_size = 8192;
+ skcf->header_read_timeout = 5000;
+
+ nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
+
+ sa = nxt_router_listen_sockaddr_stub(task, mp, 8001);
+ skcf = nxt_router_socket_conf(task, mp, sa);
+
+ skcf->listen.handler = nxt_stream_connection_init;
+ skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen)
+ + sizeof(nxt_event_conn_proxy_t)
+ + sizeof(nxt_event_conn_t)
+ + 4 * sizeof(nxt_buf_t);
+
+ skcf->header_read_timeout = 5000;
+
+ nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
+
+ return NXT_OK;
+}
+
+
+static nxt_socket_conf_t *
+nxt_router_socket_conf(nxt_task_t *task, nxt_mem_pool_t *mp, nxt_sockaddr_t *sa)
+{
+ nxt_socket_conf_t *conf;
+
+ conf = nxt_mem_zalloc(mp, sizeof(nxt_socket_conf_t));
+ if (nxt_slow_path(conf == NULL)) {
+ return NULL;
+ }
+
+ conf->listen.sockaddr = sa;
+
+ conf->listen.socket = -1;
+ conf->listen.backlog = NXT_LISTEN_BACKLOG;
+ conf->listen.flags = NXT_NONBLOCK;
+ conf->listen.read_after_accept = 1;
+
+ return conf;
+}
+
+
+static nxt_sockaddr_t *
+nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mem_pool_t *mp,
+ uint32_t port)
+{
+ nxt_sockaddr_t *sa;
+ struct sockaddr_in sin;
+
+ nxt_memzero(&sin, sizeof(struct sockaddr_in));
+
+ sin.sin_family = AF_INET;
+ sin.sin_port = htons(port);
+
+ sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin,
+ sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN);
+ if (nxt_slow_path(sa == NULL)) {
+ return NULL;
}
sa->type = SOCK_STREAM;
- sa->u.sockaddr_in.sin_family = AF_INET;
- sa->u.sockaddr_in.sin_port = htons(8000);
nxt_sockaddr_text(sa);
- ls = nxt_runtime_listen_socket_add(rt, sa);
- if (ls == NULL) {
+ return sa;
+}
+
+
+static void
+nxt_router_listen_sockets_sort(nxt_router_t *router,
+ nxt_router_temp_conf_t *tmcf)
+{
+ nxt_queue_link_t *nqlk, *oqlk, *next;
+ nxt_socket_conf_t *nskcf, *oskcf;
+
+ for (nqlk = nxt_queue_first(&tmcf->pending);
+ nqlk != nxt_queue_tail(&tmcf->pending);
+ nqlk = next)
+ {
+ next = nxt_queue_next(nqlk);
+ nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);
+
+ for (oqlk = nxt_queue_first(&router->sockets);
+ oqlk != nxt_queue_tail(&router->sockets);
+ oqlk = nxt_queue_next(oqlk))
+ {
+ oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
+
+ if (nxt_sockaddr_cmp(nskcf->listen.sockaddr,
+ oskcf->listen.sockaddr))
+ {
+ nxt_queue_remove(oqlk);
+ nxt_queue_insert_tail(&tmcf->keeping, oqlk);
+
+ nxt_queue_remove(nqlk);
+ nxt_queue_insert_tail(&tmcf->updating, nqlk);
+
+ break;
+ }
+ }
+ }
+
+ nxt_queue_add(&tmcf->deleting, &router->sockets);
+}
+
+
+static nxt_int_t
+nxt_router_listen_sockets_stub_create(nxt_task_t *task,
+ nxt_router_temp_conf_t *tmcf)
+{
+ nxt_queue_link_t *qlk, *nqlk;
+ nxt_socket_conf_t *skcf;
+
+ for (qlk = nxt_queue_first(&tmcf->pending);
+ qlk != nxt_queue_tail(&tmcf->pending);
+ qlk = nqlk)
+ {
+ skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
+
+ if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) {
+ return NXT_ERROR;
+ }
+
+ nqlk = nxt_queue_next(qlk);
+ nxt_queue_remove(qlk);
+ nxt_queue_insert_tail(&tmcf->creating, qlk);
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
+ nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
+{
+ nxt_int_t ret;
+ nxt_uint_t n, threads;
+ nxt_mem_pool_t *mp;
+ nxt_queue_link_t *qlk;
+ nxt_router_engine_conf_t *recf;
+
+ mp = tmcf->conf->mem_pool;
+ threads = tmcf->conf->threads;
+
+ tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
+ sizeof(nxt_router_engine_conf_t));
+ if (nxt_slow_path(tmcf->engines == NULL)) {
return NXT_ERROR;
}
- ls->read_after_accept = 1;
+ n = 0;
- ls->flags = NXT_NONBLOCK;
+ for (qlk = nxt_queue_first(&router->engines);
+ qlk != nxt_queue_tail(&router->engines);
+ qlk = nxt_queue_next(qlk))
+ {
+ recf = nxt_array_zero_add(tmcf->engines);
+ if (nxt_slow_path(recf == NULL)) {
+ return NXT_ERROR;
+ }
-#if 0
- /* STUB */
- wq = nxt_mem_zalloc(cf->mem_pool, sizeof(nxt_work_queue_t));
- if (wq == NULL) {
+ recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link);
+ // STUB
+ recf->task = recf->engine->task;
+
+ if (n < threads) {
+ ret = nxt_router_engine_conf_update(task, mp, tmcf, recf);
+
+ } else {
+ ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf);
+ }
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ n++;
+ }
+
+ tmcf->new_threads = n;
+
+ while (n < threads) {
+ recf = nxt_array_zero_add(tmcf->engines);
+ if (nxt_slow_path(recf == NULL)) {
+ return NXT_ERROR;
+ }
+
+ recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
+ if (nxt_slow_path(recf->engine == NULL)) {
+ return NXT_ERROR;
+ }
+ // STUB
+ recf->task = recf->engine->task;
+
+ ret = nxt_router_engine_conf_create(task, mp, tmcf, recf);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ n++;
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_engine_conf_create(nxt_task_t *task, nxt_mem_pool_t *mp,
+ nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
+{
+ nxt_int_t ret;
+
+ recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
+ if (nxt_slow_path(recf->creating == NULL)) {
return NXT_ERROR;
}
- nxt_work_queue_name(wq, "listen");
- /**/
- ls->work_queue = wq;
-#endif
- ls->handler = nxt_stream_connection_init;
+ ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
+ recf->creating, nxt_router_listen_socket_create);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
- /*
- * Connection memory pool chunk size is tunned to
- * allocate the most data in one mem_pool chunk.
- */
- ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls)
- + sizeof(nxt_event_conn_proxy_t)
- + sizeof(nxt_event_conn_t)
- + 4 * sizeof(nxt_buf_t);
+ return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
+ recf->creating, nxt_router_listen_socket_create);
+}
+
+
+static nxt_int_t
+nxt_router_engine_conf_update(nxt_task_t *task, nxt_mem_pool_t *mp,
+ nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
+{
+ nxt_int_t ret;
+
+ recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
+ if (nxt_slow_path(recf->creating == NULL)) {
+ return NXT_ERROR;
+ }
+
+ ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
+ recf->creating, nxt_router_listen_socket_create);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
+ if (nxt_slow_path(recf->updating == NULL)) {
+ return NXT_ERROR;
+ }
+
+ ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
+ recf->updating, nxt_router_listen_socket_update);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
- if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
+ recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
+ if (nxt_slow_path(recf->deleting == NULL)) {
return NXT_ERROR;
}
- if (nxt_event_conn_listen(task, ls) != NXT_OK) {
+ return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
+ recf->deleting);
+}
+
+
+static nxt_int_t
+nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mem_pool_t *mp,
+ nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
+{
+ nxt_int_t ret;
+
+ recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
+ if (nxt_slow_path(recf->deleting == NULL)) {
return NXT_ERROR;
}
+ ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating,
+ recf->deleting);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+
+ return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
+ recf->deleting);
+}
+
+
+static nxt_int_t
+nxt_router_engine_joints_create(nxt_task_t *task, nxt_mem_pool_t *mp,
+ nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
+ nxt_work_handler_t handler)
+{
+ nxt_work_t *work;
+ nxt_queue_link_t *qlk;
+ nxt_socket_conf_joint_t *joint;
+
+ for (qlk = nxt_queue_first(sockets);
+ qlk != nxt_queue_tail(sockets);
+ qlk = nxt_queue_next(qlk))
+ {
+ work = nxt_array_add(array);
+ if (nxt_slow_path(work == NULL)) {
+ return NXT_ERROR;
+ }
+
+ work->next = NULL;
+ work->handler = handler;
+ work->task = &recf->task;
+ work->obj = recf->engine;
+
+ joint = nxt_mem_alloc(mp, sizeof(nxt_socket_conf_joint_t));
+ if (nxt_slow_path(joint == NULL)) {
+ return NXT_ERROR;
+ }
+
+ work->data = joint;
+
+ joint->count = 1;
+ joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_engine_joints_delete(nxt_task_t *task,
+ nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array)
+{
+ nxt_work_t *work;
+ nxt_queue_link_t *qlk;
+
+ for (qlk = nxt_queue_first(sockets);
+ qlk != nxt_queue_tail(sockets);
+ qlk = nxt_queue_next(qlk))
+ {
+ work = nxt_array_add(array);
+ if (nxt_slow_path(work == NULL)) {
+ return NXT_ERROR;
+ }
+
+ work->next = NULL;
+ work->handler = nxt_router_listen_socket_delete;
+ work->task = &recf->task;
+ work->obj = recf->engine;
+ work->data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
+ }
+
return NXT_OK;
}
+
+
+static nxt_int_t
+nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
+ nxt_router_temp_conf_t *tmcf)
+{
+ nxt_int_t ret;
+ nxt_uint_t i, threads;
+ nxt_router_engine_conf_t *recf;
+
+ recf = tmcf->engines->elts;
+ threads = tmcf->conf->threads;
+
+ for (i = tmcf->new_threads; i < threads; i++) {
+ ret = nxt_router_thread_create(task, rt, recf[i].engine);
+ if (nxt_slow_path(ret != NXT_OK)) {
+ return ret;
+ }
+ }
+
+ return NXT_OK;
+}
+
+
+static nxt_int_t
+nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
+ nxt_event_engine_t *engine)
+{
+ nxt_int_t ret;
+ nxt_thread_link_t *link;
+ nxt_thread_handle_t handle;
+
+ link = nxt_zalloc(sizeof(nxt_thread_link_t));
+
+ if (nxt_slow_path(link == NULL)) {
+ return NXT_ERROR;
+ }
+
+ link->start = nxt_router_thread_start;
+ link->engine = engine;
+ link->work.handler = nxt_router_thread_exit_handler;
+ link->work.task = task;
+ link->work.data = link;
+
+ nxt_queue_insert_tail(&rt->engines, &engine->link);
+
+ ret = nxt_thread_create(&handle, link);
+
+ if (nxt_slow_path(ret != NXT_OK)) {
+ nxt_queue_remove(&engine->link);
+ }
+
+ return ret;
+}
+
+
+static void
+nxt_router_engines_post(nxt_router_temp_conf_t *tmcf)
+{
+ nxt_uint_t n;
+ nxt_router_engine_conf_t *recf;
+
+ recf = tmcf->engines->elts;
+
+ for (n = tmcf->engines->nelts; n != 0; n--) {
+ nxt_router_engine_post(recf);
+ recf++;
+ }
+}
+
+
+static void
+nxt_router_engine_post(nxt_router_engine_conf_t *recf)
+{
+ nxt_uint_t n;
+ nxt_work_t *work;
+
+ work = recf->creating->elts;
+
+ for (n = recf->creating->nelts; n != 0; n--) {
+ nxt_event_engine_post(recf->engine, work);
+ work++;
+ }
+}
+
+
+static void
+nxt_router_thread_start(void *data)
+{
+ nxt_thread_t *thread;
+ nxt_thread_link_t *link;
+ nxt_event_engine_t *engine;
+
+ link = data;
+ engine = link->engine;
+
+ thread = nxt_thread();
+
+ /* STUB */
+ thread->runtime = engine->task.thread->runtime;
+
+ engine->task.thread = thread;
+ engine->task.log = thread->log;
+ thread->engine = engine;
+ thread->fiber = &engine->fibers->fiber;
+
+ engine->mem_pool = nxt_mem_cache_pool_create(4096, 1024, 1024, 64);
+
+ nxt_event_engine_start(engine);
+}
+
+
+static void
+nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_listen_event_t *listen;
+ nxt_event_engine_t *engine;
+ nxt_listen_socket_t *ls;
+ nxt_socket_conf_joint_t *joint;
+
+ engine = obj;
+ joint = data;
+
+ ls = &joint->socket_conf->listen;
+
+ listen = nxt_listen_event(task, ls);
+ if (nxt_slow_path(listen == NULL)) {
+ nxt_router_listen_socket_release(task, joint);
+ return;
+ }
+
+ listen->socket.data = joint;
+}
+
+
+nxt_inline nxt_listen_event_t *
+nxt_router_listen_event(nxt_queue_t *listen_connections,
+ nxt_socket_conf_t *skcf)
+{
+ nxt_socket_t socket;
+ nxt_queue_link_t *link;
+ nxt_listen_event_t *listen;
+
+ socket = skcf->listen.socket;
+
+ for (link = nxt_queue_first(listen_connections);
+ link != nxt_queue_tail(listen_connections);
+ link = nxt_queue_next(link))
+ {
+ listen = nxt_queue_link_data(link, nxt_listen_event_t, link);
+
+ if (socket == listen->socket.fd) {
+ return listen;
+ }
+ }
+
+ return NULL;
+}
+
+
+static void
+nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_engine_t *engine;
+ nxt_listen_event_t *listen;
+ nxt_socket_conf_joint_t *joint, *old;
+
+ engine = obj;
+ joint = data;
+
+ listen = nxt_router_listen_event(&engine->listen_connections,
+ joint->socket_conf);
+
+ old = listen->socket.data;
+ listen->socket.data = joint;
+
+ nxt_router_conf_release(task, old);
+}
+
+
+static void
+nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_socket_conf_t *skcf;
+ nxt_listen_event_t *listen;
+ nxt_event_engine_t *engine;
+
+ engine = obj;
+ skcf = data;
+
+ listen = nxt_router_listen_event(&engine->listen_connections, skcf);
+
+ nxt_fd_event_delete(engine, &listen->socket);
+
+ listen->timer.handler = nxt_router_listen_socket_close;
+ listen->timer.work_queue = &engine->fast_work_queue;
+
+ nxt_timer_add(engine, &listen->timer, 0);
+}
+
+
+static void
+nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_timer_t *timer;
+ nxt_listen_event_t *listen;
+ nxt_socket_conf_joint_t *joint;
+
+ timer = obj;
+ listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
+ joint = listen->socket.data;
+
+ nxt_queue_remove(&listen->link);
+ nxt_free(listen);
+
+ nxt_router_listen_socket_release(task, joint);
+}
+
+
+static void
+nxt_router_listen_socket_release(nxt_task_t *task,
+ nxt_socket_conf_joint_t *joint)
+{
+ nxt_socket_t s;
+ nxt_listen_socket_t *ls;
+ nxt_thread_spinlock_t *lock;
+
+ s = -1;
+ ls = &joint->socket_conf->listen;
+ lock = &joint->socket_conf->router_conf->router->lock;
+
+ nxt_thread_spin_lock(lock);
+
+ if (--ls->count == 0) {
+ s = ls->socket;
+ ls->socket = -1;
+ }
+
+ nxt_thread_spin_unlock(lock);
+
+ if (s != -1) {
+ nxt_socket_close(task, s);
+ }
+
+ nxt_router_conf_release(task, joint);
+}
+
+
+static void
+nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
+{
+ nxt_socket_conf_t *skcf;
+ nxt_router_conf_t *rtcf;
+ nxt_thread_spinlock_t *lock;
+
+ nxt_debug(task, "conf joint count: %D", joint->count);
+
+ if (--joint->count != 0) {
+ return;
+ }
+
+ nxt_queue_remove(&joint->link);
+
+ skcf = joint->socket_conf;
+ rtcf = skcf->router_conf;
+ lock = &rtcf->router->lock;
+
+ nxt_thread_spin_lock(lock);
+
+ if (--skcf->count != 0) {
+ rtcf = NULL;
+
+ } else {
+ nxt_queue_remove(&skcf->link);
+
+ if (--rtcf->count != 0) {
+ rtcf = NULL;
+ }
+ }
+
+ nxt_thread_spin_unlock(lock);
+
+ if (rtcf != NULL) {
+ nxt_mem_pool_destroy(rtcf->mem_pool);
+ }
+
+ if (nxt_queue_is_empty(&joint->engine->joints)) {
+ nxt_thread_exit(task->thread);
+ }
+}
+
+
+static void
+nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_thread_link_t *link;
+ nxt_event_engine_t *engine;
+ nxt_thread_handle_t handle;
+
+ handle = obj;
+ link = data;
+
+ nxt_thread_wait(handle);
+
+ engine = link->engine;
+
+ nxt_queue_remove(&engine->link);
+
+ nxt_mem_cache_pool_destroy(engine->mem_pool);
+
+ nxt_event_engine_free(engine);
+
+ nxt_free(link);
+
+ // TODO: free port
+}
+
+
+static const nxt_event_conn_state_t nxt_router_conn_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_router_conn_http_header_parse,
+ .close_handler = nxt_router_conn_close,
+ .error_handler = nxt_router_conn_error,
+
+ .timer_handler = nxt_router_conn_timeout,
+ .timer_value = nxt_router_conn_timeout_value,
+ .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
+};
+
+
+static void
+nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
+{
+ size_t size;
+ nxt_event_conn_t *c;
+ nxt_event_engine_t *engine;
+ nxt_socket_conf_joint_t *joint;
+
+ c = obj;
+ joint = data;
+
+ nxt_debug(task, "router conn init");
+
+ joint->count++;
+
+ size = joint->socket_conf->header_buffer_size;
+ c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+
+ c->socket.data = NULL;
+
+ engine = task->thread->engine;
+ c->read_work_queue = &engine->fast_work_queue;
+ c->write_work_queue = &engine->fast_work_queue;
+
+ c->read_state = &nxt_router_conn_read_state;
+
+ nxt_event_conn_read(engine, c);
+}
+
+
+static const nxt_event_conn_state_t nxt_router_conn_write_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_router_conn_close,
+ .close_handler = nxt_router_conn_close,
+ .error_handler = nxt_router_conn_error,
+};
+
+
+static void
+nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
+{
+ size_t size;
+ nxt_int_t ret;
+ nxt_buf_t *b;
+ nxt_event_conn_t *c;
+ nxt_socket_conf_joint_t *joint;
+ nxt_http_request_parse_t *rp;
+
+ c = obj;
+ rp = data;
+
+ nxt_debug(task, "router conn http header parse");
+
+ if (rp == NULL) {
+ rp = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_http_request_parse_t));
+ if (nxt_slow_path(rp == NULL)) {
+ nxt_router_conn_close(task, c, data);
+ return;
+ }
+
+ c->socket.data = rp;
+ }
+
+ ret = nxt_http_parse_request(rp, &c->read->mem);
+
+ nxt_debug(task, "http parse request: %d", ret);
+
+ switch (nxt_expect(NXT_DONE, ret)) {
+
+ case NXT_DONE:
+ break;
+
+ case NXT_ERROR:
+ nxt_router_conn_close(task, c, data);
+ return;
+
+ default: /* NXT_AGAIN */
+
+ if (c->read->mem.free == c->read->mem.end) {
+ joint = c->listen->socket.data;
+ size = joint->socket_conf->large_header_buffer_size,
+
+ b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
+ if (nxt_slow_path(b == NULL)) {
+ nxt_router_conn_close(task, c, data);
+ return;
+ }
+
+ size = c->read->mem.free - c->read->mem.pos;
+ nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
+
+ b->mem.free += size;
+ c->read = b;
+ }
+
+ nxt_event_conn_read(task->thread->engine, c);
+ return;
+ }
+
+ c->write = c->read;
+ c->write->mem.pos = c->write->mem.start;
+ c->write_state = &nxt_router_conn_write_state;
+
+ nxt_event_conn_write(task->thread->engine, c);
+}
+
+
+static const nxt_event_conn_state_t nxt_router_conn_close_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_router_conn_free,
+};
+
+
+static void
+nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_conn_t *c;
+
+ c = obj;
+
+ nxt_debug(task, "router conn close");
+
+ c->write_state = &nxt_router_conn_close_state;
+
+ nxt_event_conn_close(task->thread->engine, c);
+}
+
+
+static void
+nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_conn_t *c;
+ nxt_socket_conf_joint_t *joint;
+
+ c = obj;
+
+ nxt_debug(task, "router conn close done");
+
+ joint = c->listen->socket.data;
+ nxt_router_conf_release(task, joint);
+
+ nxt_mem_pool_destroy(c->mem_pool);
+}
+
+
+static void
+nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_conn_t *c;
+
+ c = obj;
+
+ nxt_debug(task, "router conn error");
+
+ c->write_state = &nxt_router_conn_close_state;
+
+ nxt_event_conn_close(task->thread->engine, c);
+}
+
+
+static void
+nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_timer_t *timer;
+ nxt_event_conn_t *c;
+
+ timer = obj;
+
+ nxt_debug(task, "router conn timeout");
+
+ c = nxt_event_read_timer_conn(timer);
+
+ c->write_state = &nxt_router_conn_close_state;
+
+ nxt_event_conn_close(task->thread->engine, c);
+}
+
+
+static nxt_msec_t
+nxt_router_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data)
+{
+ nxt_socket_conf_joint_t *joint;
+
+ joint = c->listen->socket.data;
+
+ return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
+}
diff --git a/src/nxt_router.h b/src/nxt_router.h
new file mode 100644
index 00000000..d31a5964
--- /dev/null
+++ b/src/nxt_router.h
@@ -0,0 +1,78 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_ROUTER_H_INCLUDED_
+#define _NXT_ROUTER_H_INCLUDED_
+
+
+#include <nxt_main.h>
+#include <nxt_runtime.h>
+#include <nxt_master_process.h>
+
+
+typedef struct {
+ nxt_thread_spinlock_t lock;
+ nxt_queue_t engines;
+
+ nxt_queue_t sockets;
+} nxt_router_t;
+
+
+typedef struct {
+ uint32_t count;
+ uint32_t threads;
+ nxt_router_t *router;
+ nxt_mem_pool_t *mem_pool;
+} nxt_router_conf_t;
+
+
+typedef struct {
+ nxt_event_engine_t *engine;
+ nxt_task_t task;
+ nxt_array_t *creating; /* of nxt_work_t */
+ nxt_array_t *updating; /* of nxt_work_t */
+ nxt_array_t *deleting; /* of nxt_work_t */
+} nxt_router_engine_conf_t;
+
+
+typedef struct {
+ nxt_queue_t creating;
+ nxt_queue_t pending;
+ nxt_queue_t updating;
+ nxt_queue_t keeping;
+ nxt_queue_t deleting;
+
+ uint32_t new_threads;
+
+ nxt_array_t *engines;
+ nxt_router_conf_t *conf;
+ nxt_mem_pool_t *mem_pool;
+} nxt_router_temp_conf_t;
+
+
+typedef struct {
+ uint32_t count;
+ nxt_listen_socket_t listen;
+ nxt_queue_link_t link;
+ nxt_router_conf_t *router_conf;
+
+ size_t header_buffer_size;
+ size_t large_header_buffer_size;
+ nxt_msec_t header_read_timeout;
+} nxt_socket_conf_t;
+
+
+typedef struct {
+ uint32_t count;
+ nxt_queue_link_t link;
+ nxt_event_engine_t *engine;
+ nxt_socket_conf_t *socket_conf;
+
+ /* Modules configuraitons. */
+} nxt_socket_conf_joint_t;
+
+
+#endif /* _NXT_ROUTER_H_INCLUDED_ */
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index 8337712a..5678f522 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -253,20 +253,10 @@ nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
static nxt_int_t
nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
{
- nxt_event_engine_t *engine, **e;
+ nxt_thread_t *thread;
+ nxt_event_engine_t *engine;
const nxt_event_interface_t *interface;
- rt->engines = nxt_array_create(rt->mem_pool, 1,
- sizeof(nxt_event_engine_t *));
- if (nxt_slow_path(rt->engines == NULL)) {
- return NXT_ERROR;
- }
-
- e = nxt_array_add(rt->engines);
- if (nxt_slow_path(e == NULL)) {
- return NXT_ERROR;
- }
-
interface = nxt_service_get(rt->services, "engine", NULL);
if (nxt_slow_path(interface == NULL)) {
@@ -281,8 +271,14 @@ nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
return NXT_ERROR;
}
+ thread = task->thread;
+ thread->engine = engine;
+ thread->fiber = &engine->fibers->fiber;
+
engine->id = rt->last_engine_id++;
- *e = engine;
+
+ nxt_queue_init(&rt->engines);
+ nxt_queue_insert_tail(&rt->engines, &engine->link);
return NXT_OK;
}
@@ -587,12 +583,13 @@ nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
void
nxt_runtime_event_engine_free(nxt_runtime_t *rt)
{
- nxt_event_engine_t *engine, **engines;
+ nxt_queue_link_t *link;
+ nxt_event_engine_t *engine;
- engines = rt->engines->elts;
- engine = engines[0];
- nxt_array_remove(rt->engines, &engines[0]);
+ link = nxt_queue_first(&rt->engines);
+ nxt_queue_remove(link);
+ engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
nxt_event_engine_free(engine);
}
diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h
index 5cce5703..c24618b7 100644
--- a/src/nxt_runtime.h
+++ b/src/nxt_runtime.h
@@ -19,7 +19,7 @@ struct nxt_runtime_s {
nxt_array_t *listen_sockets; /* of nxt_listen_socket_t */
nxt_array_t *services; /* of nxt_service_t */
- nxt_array_t *engines; /* of nxt_event_engine_t */
+ void *data;
nxt_runtime_cont_t start;
@@ -61,6 +61,8 @@ struct nxt_runtime_s {
const char *pid;
const char *error_log;
+ nxt_queue_t engines; /* of nxt_event_engine_t */
+
nxt_sockaddr_t *controller_listen;
nxt_listen_socket_t *controller_socket;
nxt_str_t upstream;
diff --git a/src/nxt_signal.c b/src/nxt_signal.c
index 492904d1..3ccd0774 100644
--- a/src/nxt_signal.c
+++ b/src/nxt_signal.c
@@ -141,7 +141,7 @@ nxt_signal_thread_start(nxt_event_engine_t *engine)
if (nxt_fast_path(link != NULL)) {
link->start = nxt_signal_thread;
- link->data = engine;
+ link->work.data = engine;
if (nxt_thread_create(&engine->signals->thread, link) == NXT_OK) {
engine->signals->process = nxt_pid;
diff --git a/src/nxt_thread.c b/src/nxt_thread.c
index 78c6f50c..851ebb4f 100644
--- a/src/nxt_thread.c
+++ b/src/nxt_thread.c
@@ -114,9 +114,9 @@ nxt_thread_trampoline(void *data)
pthread_cleanup_push(nxt_thread_time_cleanup, thr);
start = link->start;
- data = link->data;
+ data = link->work.data;
- if (link->engine != NULL) {
+ if (link->work.handler != NULL) {
thr->link = link;
} else {
@@ -180,7 +180,8 @@ nxt_thread_time_cleanup(void *data)
void
nxt_thread_exit(nxt_thread_t *thr)
{
- nxt_thread_link_t *link;
+ nxt_thread_link_t *link;
+ nxt_event_engine_t *engine;
nxt_log_debug(thr->log, "thread exit");
@@ -189,13 +190,14 @@ nxt_thread_exit(nxt_thread_t *thr)
if (link != NULL) {
/*
- * link->handler is already set to an exit handler,
- * and link->task is already set to engine->task.
+ * link->work.handler is already set to an exit handler,
+ * and link->work.task is already set to the correct engine->task.
* The link should be freed by the exit handler.
*/
link->work.obj = (void *) (uintptr_t) thr->handle;
+ engine = nxt_container_of(link->work.task, nxt_event_engine_t, task);
- nxt_event_engine_post(link->engine, &link->work);
+ nxt_event_engine_post(engine, &link->work);
}
nxt_thread_time_free(thr);
diff --git a/src/nxt_thread.h b/src/nxt_thread.h
index a8e04749..c1f3aaee 100644
--- a/src/nxt_thread.h
+++ b/src/nxt_thread.h
@@ -90,7 +90,6 @@ typedef void (*nxt_thread_start_t)(void *data);
typedef struct {
nxt_thread_start_t start;
- void *data;
nxt_event_engine_t *engine;
nxt_work_t work;
} nxt_thread_link_t;
@@ -180,6 +179,7 @@ struct nxt_thread_s {
nxt_runtime_t *runtime;
nxt_event_engine_t *engine;
+ void *data;
/*
* Although pointer to a current fiber should be a property of
diff --git a/src/nxt_thread_pool.c b/src/nxt_thread_pool.c
index fd7246c8..c8959dc6 100644
--- a/src/nxt_thread_pool.c
+++ b/src/nxt_thread_pool.c
@@ -10,6 +10,7 @@
static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data);
static void nxt_thread_pool_start(void *ctx);
+static void nxt_thread_pool_loop(void *ctx);
static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);
@@ -86,13 +87,8 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp)
if (nxt_fast_path(link != NULL)) {
link->start = nxt_thread_pool_start;
- link->data = tp;
- link->engine = tp->engine;
- /*
- * link->exit is not used. link->engine is used just to
- * set thr->link by nxt_thread_trampoline() and the link
- * is a mark of the first thread of pool.
- */
+ link->work.data = tp;
+
if (nxt_thread_create(&handle, link) == NXT_OK) {
tp->ready = 1;
goto done;
@@ -118,6 +114,22 @@ done:
static void
nxt_thread_pool_start(void *ctx)
{
+ nxt_thread_t *thr;
+ nxt_thread_pool_t *tp;
+
+ tp = ctx;
+ thr = nxt_thread();
+
+ tp->main = thr->handle;
+ tp->task.thread = thr;
+
+ nxt_thread_pool_loop(ctx);
+}
+
+
+static void
+nxt_thread_pool_loop(void *ctx)
+{
void *obj, *data;
nxt_task_t *task;
nxt_thread_t *thr;
@@ -127,17 +139,6 @@ nxt_thread_pool_start(void *ctx)
tp = ctx;
thr = nxt_thread();
- if (thr->link != NULL) {
- /* Only the first thread has a link. */
- tp->main = thr->handle;
- nxt_free(thr->link);
- thr->link = NULL;
-
- tp->task.thread = thr;
- }
-
- thr->thread_pool = tp;
-
if (tp->init != NULL) {
tp->init();
}
@@ -215,8 +216,8 @@ nxt_thread_pool_wait(nxt_thread_pool_t *tp)
link = nxt_zalloc(sizeof(nxt_thread_link_t));
if (nxt_fast_path(link != NULL)) {
- link->start = nxt_thread_pool_start;
- link->data = tp;
+ link->start = nxt_thread_pool_loop;
+ link->work.data = tp;
if (nxt_thread_create(&handle, link) != NXT_OK) {
(void) nxt_atomic_fetch_add(&tp->threads, -1);
@@ -232,6 +233,8 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
thr = nxt_thread();
+ nxt_log_debug(thr->log, "thread pool destroy: %d", tp->ready);
+
if (!tp->ready) {
nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit,
&tp->task, tp, NULL);
diff --git a/src/nxt_thread_time.c b/src/nxt_thread_time.c
index 916aa564..95c88fa3 100644
--- a/src/nxt_thread_time.c
+++ b/src/nxt_thread_time.c
@@ -123,7 +123,7 @@ nxt_time_thread_start(nxt_msec_t interval)
if (nxt_fast_path(link != NULL)) {
link->start = nxt_time_thread;
- link->data = (void *) (uintptr_t) interval;
+ link->work.data = (void *) (uintptr_t) interval;
(void) nxt_thread_create(&handle, link);
}