summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-06-14 15:18:52 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-06-14 15:18:52 +0300
commit7574c64992b98d3dfbc3dd101bd0f7d78bad0823 (patch)
tree3a98c46e88d9023df34be3e6cce4f762d53aad36 /src
parent3e2632688f53c4cb08e7ac03c61e71facd038df4 (diff)
downloadunit-7574c64992b98d3dfbc3dd101bd0f7d78bad0823.tar.gz
unit-7574c64992b98d3dfbc3dd101bd0f7d78bad0823.tar.bz2
nxt_event_conn_... functions and structures have been renamed
to nxt_conn_...
Diffstat (limited to 'src')
-rw-r--r--src/nxt_application.c48
-rw-r--r--src/nxt_application.h2
-rw-r--r--src/nxt_conn.c (renamed from src/nxt_event_conn.c)56
-rw-r--r--src/nxt_conn.h (renamed from src/nxt_event_conn.h)161
-rw-r--r--src/nxt_conn_accept.c366
-rw-r--r--src/nxt_conn_close.c12
-rw-r--r--src/nxt_conn_connect.c (renamed from src/nxt_event_conn_connect.c)40
-rw-r--r--src/nxt_conn_proxy.c998
-rw-r--r--src/nxt_conn_read.c (renamed from src/nxt_event_conn_read.c)40
-rw-r--r--src/nxt_conn_write.c (renamed from src/nxt_event_conn_write.c)270
-rw-r--r--src/nxt_controller.c66
-rw-r--r--src/nxt_devpoll_engine.c2
-rw-r--r--src/nxt_epoll_engine.c97
-rw-r--r--src/nxt_event_conn_accept.c413
-rw-r--r--src/nxt_event_conn_job_sendfile.c16
-rw-r--r--src/nxt_event_conn_proxy.c1017
-rw-r--r--src/nxt_event_engine.h6
-rw-r--r--src/nxt_eventport_engine.c2
-rw-r--r--src/nxt_file_event.h (renamed from src/nxt_event_file.h)8
-rw-r--r--src/nxt_kqueue_engine.c117
-rw-r--r--src/nxt_listen_socket.c2
-rw-r--r--src/nxt_macosx_sendfile.c3
-rw-r--r--src/nxt_main.h9
-rw-r--r--src/nxt_openssl.c46
-rw-r--r--src/nxt_poll_engine.c2
-rw-r--r--src/nxt_pollset_engine.c2
-rw-r--r--src/nxt_router.c47
-rw-r--r--src/nxt_runtime.c6
-rw-r--r--src/nxt_select_engine.c2
-rw-r--r--src/nxt_sendbuf.c4
-rw-r--r--src/nxt_sendbuf.h16
-rw-r--r--src/nxt_ssltls.h3
-rw-r--r--src/nxt_stream_module.c10
-rw-r--r--src/nxt_stream_source.c10
-rw-r--r--src/nxt_stream_source.h2
-rw-r--r--src/nxt_worker_process.c16
36 files changed, 1915 insertions, 2002 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c
index 293807b0..a07a9038 100644
--- a/src/nxt_application.c
+++ b/src/nxt_application.c
@@ -17,19 +17,19 @@ static nxt_int_t nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt);
static void nxt_app_thread(void *ctx);
static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s,
nxt_log_t *log);
-static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c,
+static void nxt_app_conn_update(nxt_thread_t *thr, nxt_conn_t *c,
nxt_log_t *log);
static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r);
-static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out);
+static void nxt_app_buf_send(nxt_conn_t *c, nxt_buf_t *out);
static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_completion(nxt_task_t *task, void *obj,
void *data);
static void nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data);
-static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c,
+static nxt_msec_t nxt_app_delivery_timer_value(nxt_conn_t *c,
uintptr_t data);
-static void nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c);
+static void nxt_app_delivery_done(nxt_task_t *task, nxt_conn_t *c);
static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data);
@@ -256,8 +256,8 @@ nxt_app_thread(void *ctx)
static nxt_app_request_t *
nxt_app_request_create(nxt_socket_t s, nxt_log_t *log)
{
+ nxt_conn_t *c;
nxt_mem_pool_t *mp;
- nxt_event_conn_t *c;
nxt_app_request_t *r;
mp = nxt_mem_pool_create(1024);
@@ -270,7 +270,7 @@ nxt_app_request_create(nxt_socket_t s, nxt_log_t *log)
return NULL;
}
- c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t));
+ c = nxt_mem_zalloc(mp, sizeof(nxt_conn_t));
if (nxt_slow_path(c == NULL)) {
return NULL;
}
@@ -534,7 +534,7 @@ nxt_app_http_process_headers(nxt_app_request_t *r)
static void
-nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log)
+nxt_app_conn_update(nxt_thread_t *thr, nxt_conn_t *c, nxt_log_t *log)
{
c->socket.write_ready = 1;
@@ -562,8 +562,8 @@ nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log)
c->read_work_queue = &thr->engine->read_work_queue;
c->write_work_queue = &thr->engine->write_work_queue;
- nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue);
- nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue);
+ nxt_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue);
+ nxt_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue);
nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections);
}
@@ -770,7 +770,7 @@ nxt_app_write_finish(nxt_app_request_t *r)
static void
-nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out)
+nxt_app_buf_send(nxt_conn_t *c, nxt_buf_t *out)
{
nxt_app_buf_t *ab;
@@ -785,9 +785,9 @@ nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out)
static void
nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_buf_t *b;
- nxt_mem_pool_t *mp;
- nxt_event_conn_t *c;
+ nxt_buf_t *b;
+ nxt_conn_t *c;
+ nxt_mem_pool_t *mp;
c = obj;
b = data;
@@ -820,7 +820,7 @@ nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)
c->write = b;
c->write_state = &nxt_app_delivery_write_state;
- nxt_event_conn_write(task->thread->engine, c);
+ nxt_conn_write(task->thread->engine, c);
}
@@ -840,8 +840,8 @@ static const nxt_event_conn_state_t nxt_app_delivery_write_state
static void
nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data)
{
- nxt_buf_t *b, *next;
- nxt_event_conn_t *c;
+ nxt_buf_t *b, *next;
+ nxt_conn_t *c;
c = obj;
@@ -875,8 +875,8 @@ static const nxt_event_conn_state_t nxt_app_delivery_close_state
static void
nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
{
- nxt_buf_t *b, *bn, *free;
- nxt_event_conn_t *c;
+ nxt_buf_t *b, *bn, *free;
+ nxt_conn_t *c;
nxt_app_request_t *r;
nxt_debug(task, "app delivery completion");
@@ -902,7 +902,7 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
c = r->event_conn;
c->write_state = &nxt_app_delivery_close_state;
- nxt_event_conn_close(task->thread->engine, c);
+ nxt_conn_close(task->thread->engine, c);
}
}
@@ -929,7 +929,7 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
static void
nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
c = obj;
@@ -942,7 +942,7 @@ nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data)
static void
nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
c = obj;
@@ -953,7 +953,7 @@ nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data)
static nxt_msec_t
-nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data)
+nxt_app_delivery_timer_value(nxt_conn_t *c, uintptr_t data)
{
/* 30000 ms */
return 30000;
@@ -961,7 +961,7 @@ nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data)
static void
-nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c)
+nxt_app_delivery_done(nxt_task_t *task, nxt_conn_t *c)
{
if (c->write == NULL) {
return;
@@ -981,7 +981,7 @@ nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c)
static void
nxt_app_close_request(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_app_request_t *r;
c = obj;
diff --git a/src/nxt_application.h b/src/nxt_application.h
index c2619ded..5fd49667 100644
--- a/src/nxt_application.h
+++ b/src/nxt_application.h
@@ -29,7 +29,7 @@ typedef struct {
typedef struct {
nxt_event_engine_t *engine;
nxt_mem_pool_t *mem_pool;
- nxt_event_conn_t *event_conn;
+ nxt_conn_t *event_conn;
nxt_log_t *log;
nxt_buf_t *output_buf;
diff --git a/src/nxt_event_conn.c b/src/nxt_conn.c
index b8adb00a..3f21747e 100644
--- a/src/nxt_event_conn.c
+++ b/src/nxt_conn.c
@@ -7,13 +7,13 @@
#include <nxt_main.h>
-nxt_event_conn_io_t nxt_unix_event_conn_io = {
- nxt_event_conn_io_connect,
- nxt_event_conn_io_accept,
+nxt_conn_io_t nxt_unix_conn_io = {
+ nxt_conn_io_connect,
+ nxt_conn_io_accept,
- nxt_event_conn_io_read,
- nxt_event_conn_io_recvbuf,
- nxt_event_conn_io_recv,
+ nxt_conn_io_read,
+ nxt_conn_io_recvbuf,
+ nxt_conn_io_recv,
nxt_conn_io_write,
nxt_event_conn_io_write_chunk,
@@ -37,17 +37,17 @@ nxt_event_conn_io_t nxt_unix_event_conn_io = {
nxt_event_conn_io_writev,
nxt_event_conn_io_send,
- nxt_event_conn_io_shutdown,
+ nxt_conn_io_shutdown,
};
-nxt_event_conn_t *
-nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task)
+nxt_conn_t *
+nxt_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task)
{
- nxt_thread_t *thr;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
+ nxt_thread_t *thr;
- c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t));
+ c = nxt_mem_zalloc(mp, sizeof(nxt_conn_t));
if (nxt_slow_path(c == NULL)) {
return NULL;
}
@@ -82,22 +82,22 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task)
c->socket.read_work_queue = &thr->engine->fast_work_queue;
c->socket.write_work_queue = &thr->engine->fast_work_queue;
- nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue);
- nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue);
+ nxt_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue);
+ nxt_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue);
- nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections);
+ nxt_log_debug(&c->log, "connections: %uD", thr->engine->connections);
return c;
}
void
-nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data)
+nxt_conn_io_shutdown(nxt_task_t *task, void *obj, void *data)
{
- int ret;
- socklen_t len;
- struct linger linger;
- nxt_event_conn_t *c;
+ int ret;
+ socklen_t len;
+ nxt_conn_t *c;
+ struct linger linger;
c = obj;
@@ -126,24 +126,24 @@ nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data)
void
-nxt_event_conn_timer(nxt_event_engine_t *engine, nxt_event_conn_t *c,
- const nxt_event_conn_state_t *state, nxt_timer_t *tev)
+nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c,
+ const nxt_conn_state_t *state, nxt_timer_t *timer)
{
- nxt_msec_t timer;
+ nxt_msec_t value;
if (state->timer_value != NULL) {
- timer = state->timer_value(c, state->timer_data);
+ value = state->timer_value(c, state->timer_data);
- if (timer != 0) {
- tev->handler = state->timer_handler;
- nxt_timer_add(engine, tev, timer);
+ if (value != 0) {
+ timer->handler = state->timer_handler;
+ nxt_timer_add(engine, timer, value);
}
}
}
void
-nxt_event_conn_work_queue_set(nxt_event_conn_t *c, nxt_work_queue_t *wq)
+nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq)
{
c->read_work_queue = wq;
c->write_work_queue = wq;
diff --git a/src/nxt_event_conn.h b/src/nxt_conn.h
index 02c088b8..491531b0 100644
--- a/src/nxt_event_conn.h
+++ b/src/nxt_conn.h
@@ -4,12 +4,11 @@
* Copyright (C) NGINX, Inc.
*/
-#ifndef _NXT_EVENT_CONN_H_INCLUDED_
-#define _NXT_EVENT_CONN_H_INCLUDED_
+#ifndef _NXT_CONN_H_INCLUDED_
+#define _NXT_CONN_H_INCLUDED_
-typedef nxt_msec_t (*nxt_event_conn_timer_val_t)(nxt_event_conn_t *c,
- uintptr_t data);
+typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data);
typedef struct {
@@ -18,11 +17,11 @@ typedef struct {
nxt_work_handler_t error_handler;
nxt_work_handler_t timer_handler;
- nxt_event_conn_timer_val_t timer_value;
+ nxt_conn_timer_value_t timer_value;
uintptr_t timer_data;
uint8_t timer_autoreset;
-} nxt_event_conn_state_t;
+} nxt_conn_state_t;
typedef struct {
@@ -49,9 +48,9 @@ typedef struct {
*/
nxt_work_handler_t read;
- ssize_t (*recvbuf)(nxt_event_conn_t *c, nxt_buf_t *b);
+ ssize_t (*recvbuf)(nxt_conn_t *c, nxt_buf_t *b);
- ssize_t (*recv)(nxt_event_conn_t *c, void *buf,
+ ssize_t (*recv)(nxt_conn_t *c, void *buf,
size_t size, nxt_uint_t flags);
/*
@@ -66,35 +65,35 @@ typedef struct {
* buffers data and calls the library specific send() interface to write
* the buffered data eventually.
*/
- ssize_t (*write_chunk)(nxt_event_conn_t *c,
+ ssize_t (*write_chunk)(nxt_conn_t *c,
nxt_buf_t *b, size_t limit);
/*
* The sendbuf() is an interface for OS-specific sendfile
* implementations or simple writev().
*/
- ssize_t (*sendbuf)(nxt_event_conn_t *c, nxt_buf_t *b,
+ ssize_t (*sendbuf)(nxt_conn_t *c, nxt_buf_t *b,
size_t limit);
/*
* The writev() is an interface to write several nxt_iobuf_t buffers.
*/
- ssize_t (*writev)(nxt_event_conn_t *c,
+ ssize_t (*writev)(nxt_conn_t *c,
nxt_iobuf_t *iob, nxt_uint_t niob);
/*
* The send() is an interface to write a single buffer. SSL/TLS
* libraries' send() interface handles also the libraries' errors.
*/
- ssize_t (*send)(nxt_event_conn_t *c, void *buf,
+ ssize_t (*send)(nxt_conn_t *c, void *buf,
size_t size);
nxt_work_handler_t shutdown;
-} nxt_event_conn_io_t;
+} nxt_conn_io_t;
/*
- * The nxt_event_conn_listen_t is separated from nxt_listen_socket_t
+ * The nxt_listen_event_t is separated from nxt_listen_socket_t
* because nxt_listen_socket_t is one per process whilst each worker
- * thread uses own nxt_event_conn_listen_t.
+ * thread uses own nxt_listen_event_t.
*/
typedef struct {
/* Must be the first field. */
@@ -109,31 +108,29 @@ typedef struct {
nxt_work_handler_t accept;
nxt_listen_socket_t *listen;
- nxt_event_conn_t *next; /* STUB */;
+ nxt_conn_t *next; /* STUB */;
nxt_work_queue_t *work_queue;
nxt_timer_t timer;
nxt_queue_link_t link;
-} nxt_event_conn_listen_t;
+} nxt_listen_event_t;
-typedef nxt_event_conn_listen_t nxt_listen_event_t;
-
-struct nxt_event_conn_s {
+struct nxt_conn_s {
/*
* Must be the first field, since nxt_fd_event_t
- * and nxt_event_conn_t are used interchangeably.
+ * and nxt_conn_t are used interchangeably.
*/
nxt_fd_event_t socket;
nxt_buf_t *read;
- const nxt_event_conn_state_t *read_state;
+ const nxt_conn_state_t *read_state;
nxt_work_queue_t *read_work_queue;
nxt_timer_t read_timer;
nxt_buf_t *write;
- const nxt_event_conn_state_t *write_state;
+ const nxt_conn_state_t *write_state;
nxt_work_queue_t *write_work_queue;
nxt_event_write_rate_t *rate;
nxt_timer_t write_timer;
@@ -142,7 +139,7 @@ struct nxt_event_conn_s {
uint32_t max_chunk;
uint32_t nbytes;
- nxt_event_conn_io_t *io;
+ nxt_conn_io_t *io;
#if (NXT_SSLTLS || NXT_THREADS)
/* SunC does not support "zero-sized struct/union". */
@@ -163,7 +160,7 @@ struct nxt_event_conn_s {
nxt_task_t task;
nxt_log_t log;
- nxt_event_conn_listen_t *listen;
+ nxt_listen_event_t *listen;
nxt_sockaddr_t *remote;
nxt_sockaddr_t *local;
const char *action;
@@ -183,8 +180,7 @@ struct nxt_event_conn_s {
};
-#define \
-nxt_event_conn_timer_init(ev, c, wq) \
+#define nxt_conn_timer_init(ev, c, wq) \
do { \
(ev)->work_queue = (wq); \
(ev)->log = &(c)->log; \
@@ -192,20 +188,17 @@ nxt_event_conn_timer_init(ev, c, wq) \
} while (0)
-#define \
-nxt_event_read_timer_conn(ev) \
- nxt_timer_data(ev, nxt_event_conn_t, read_timer)
+#define nxt_read_timer_conn(ev) \
+ nxt_timer_data(ev, nxt_conn_t, read_timer)
-#define \
-nxt_event_write_timer_conn(ev) \
- nxt_timer_data(ev, nxt_event_conn_t, write_timer)
+#define nxt_write_timer_conn(ev) \
+ nxt_timer_data(ev, nxt_conn_t, write_timer)
#if (NXT_HAVE_UNIX_DOMAIN)
-#define \
-nxt_event_conn_tcp_nodelay_on(task, c) \
+#define nxt_conn_tcp_nodelay_on(task, c) \
do { \
nxt_int_t ret; \
\
@@ -220,8 +213,7 @@ nxt_event_conn_tcp_nodelay_on(task, c) \
#else
-#define \
-nxt_event_conn_tcp_nodelay_on(task, c) \
+#define nxt_conn_tcp_nodelay_on(task, c) \
do { \
nxt_int_t ret; \
\
@@ -234,39 +226,34 @@ nxt_event_conn_tcp_nodelay_on(task, c) \
#endif
-NXT_EXPORT nxt_event_conn_t *nxt_event_conn_create(nxt_mem_pool_t *mp,
- nxt_task_t *task);
-void nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data);
-NXT_EXPORT void nxt_event_conn_close(nxt_event_engine_t *engine,
- nxt_event_conn_t *c);
+NXT_EXPORT nxt_conn_t *nxt_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task);
+void nxt_conn_io_shutdown(nxt_task_t *task, void *obj, void *data);
+NXT_EXPORT void nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c);
-NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine,
- nxt_event_conn_t *c, const nxt_event_conn_state_t *state, nxt_timer_t *tev);
-NXT_EXPORT void nxt_event_conn_work_queue_set(nxt_event_conn_t *c,
- nxt_work_queue_t *wq);
+NXT_EXPORT void nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c,
+ const nxt_conn_state_t *state, nxt_timer_t *tev);
+NXT_EXPORT void nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq);
-void nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data);
-void nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data);
-nxt_int_t nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c);
-void nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data);
-void nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data);
+void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data);
+void nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data);
+nxt_int_t nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c);
+void nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data);
+void nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data);
-NXT_EXPORT nxt_event_conn_listen_t *nxt_listen_event(nxt_task_t *task,
- nxt_listen_socket_t *ls);
-NXT_EXPORT nxt_int_t nxt_event_conn_listen(nxt_task_t *task,
+NXT_EXPORT nxt_listen_event_t *nxt_listen_event(nxt_task_t *task,
nxt_listen_socket_t *ls);
-void nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data);
-NXT_EXPORT void nxt_event_conn_accept(nxt_task_t *task,
- nxt_event_conn_listen_t *cls, nxt_event_conn_t *c);
-void nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls,
+void nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data);
+NXT_EXPORT void nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev,
+ nxt_conn_t *c);
+void nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
const char *accept_syscall, nxt_err_t err);
-void nxt_conn_wait(nxt_event_conn_t *c);
+void nxt_conn_wait(nxt_conn_t *c);
-void nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data);
-ssize_t nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b);
-ssize_t nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf,
- size_t size, nxt_uint_t flags);
+void nxt_conn_io_read(nxt_task_t *task, void *obj, void *data);
+ssize_t nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
+ssize_t nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size,
+ nxt_uint_t flags);
void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data);
ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb);
@@ -275,28 +262,25 @@ ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb,
ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf,
size_t size);
-size_t nxt_event_conn_write_limit(nxt_event_conn_t *c);
+size_t nxt_event_conn_write_limit(nxt_conn_t *c);
nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine,
- nxt_event_conn_t *c, size_t sent);
-ssize_t nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b,
+ nxt_conn_t *c, size_t sent);
+ssize_t nxt_event_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b,
size_t limit);
-ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob,
+ssize_t nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob,
nxt_uint_t niob);
-ssize_t nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size);
-
-NXT_EXPORT void nxt_event_conn_io_close(nxt_task_t *task, void *obj,
- void *data);
+ssize_t nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size);
NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
- nxt_event_conn_t *c);
+ nxt_conn_t *c);
-#define nxt_event_conn_connect(engine, c) \
- nxt_work_queue_add(&engine->socket_work_queue, nxt_event_conn_sys_socket, \
+#define nxt_conn_connect(engine, c) \
+ nxt_work_queue_add(&engine->socket_work_queue, nxt_conn_sys_socket, \
c->socket.task, c, c->socket.data)
-#define nxt_event_conn_read(engine, c) \
+#define nxt_conn_read(engine, c) \
do { \
nxt_event_engine_t *e = engine; \
\
@@ -307,7 +291,7 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
} while (0)
-#define nxt_event_conn_write(e, c) \
+#define nxt_conn_write(e, c) \
do { \
nxt_event_engine_t *engine = e; \
\
@@ -318,7 +302,7 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
} while (0)
-extern nxt_event_conn_io_t nxt_unix_event_conn_io;
+extern nxt_conn_io_t nxt_unix_conn_io;
typedef struct {
@@ -326,8 +310,8 @@ typedef struct {
* Client and peer connections are not embedded because already
* existent connections can be switched to the event connection proxy.
*/
- nxt_event_conn_t *client;
- nxt_event_conn_t *peer;
+ nxt_conn_t *client;
+ nxt_conn_t *peer;
nxt_buf_t *client_buffer;
nxt_buf_t *peer_buffer;
@@ -347,13 +331,20 @@ typedef struct {
uint8_t retain; /* 2 bits */
nxt_work_handler_t completion_handler;
-} nxt_event_conn_proxy_t;
+} nxt_conn_proxy_t;
+
+
+NXT_EXPORT nxt_conn_proxy_t *nxt_conn_proxy_create(nxt_conn_t *c);
+NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p);
-NXT_EXPORT nxt_event_conn_proxy_t *nxt_event_conn_proxy_create(
- nxt_event_conn_t *c);
-NXT_EXPORT void nxt_event_conn_proxy(nxt_task_t *task,
- nxt_event_conn_proxy_t *p);
+/* STUB */
+#define nxt_event_conn_t nxt_conn_t
+#define nxt_event_conn_state_t nxt_conn_state_t
+#define nxt_event_conn_proxy_t nxt_conn_proxy_t
+#define nxt_event_conn_read nxt_conn_read
+#define nxt_event_conn_write nxt_conn_write
+#define nxt_event_conn_close nxt_conn_close
-#endif /* _NXT_EVENT_CONN_H_INCLUDED_ */
+#endif /* _NXT_CONN_H_INCLUDED_ */
diff --git a/src/nxt_conn_accept.c b/src/nxt_conn_accept.c
new file mode 100644
index 00000000..eb0172f4
--- /dev/null
+++ b/src/nxt_conn_accept.c
@@ -0,0 +1,366 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+/*
+ * A listen socket handler calls an event facility specific io_accept()
+ * method. The method accept()s a new connection and then calls
+ * nxt_event_conn_accept() to handle the new connection and to prepare
+ * for a next connection to avoid just dropping next accept()ed socket
+ * if no more connections allowed. If there are no available connections
+ * an idle connection would be closed. If there are no idle connections
+ * then new connections will not be accept()ed for 1 second.
+ */
+
+
+static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
+ nxt_listen_event_t *lev);
+static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
+ void *data);
+static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
+ nxt_listen_event_t *lev);
+static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task,
+ nxt_listen_event_t *lev);
+static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
+ void *data);
+
+
+nxt_listen_event_t *
+nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
+{
+ nxt_listen_event_t *lev;
+ nxt_event_engine_t *engine;
+
+ lev = nxt_zalloc(sizeof(nxt_listen_event_t));
+
+ if (nxt_fast_path(lev != NULL)) {
+ lev->socket.fd = ls->socket;
+
+ engine = task->thread->engine;
+ lev->batch = engine->batch;
+
+ lev->socket.read_work_queue = &engine->accept_work_queue;
+ lev->socket.read_handler = nxt_conn_listen_handler;
+ lev->socket.error_handler = nxt_conn_listen_event_error;
+ lev->socket.log = &nxt_main_log;
+
+ lev->accept = engine->event.io->accept;
+
+ lev->listen = ls;
+ lev->work_queue = &engine->read_work_queue;
+
+ lev->timer.work_queue = &engine->fast_work_queue;
+ lev->timer.handler = nxt_conn_listen_timer_handler;
+ lev->timer.log = &nxt_main_log;
+
+ lev->task.thread = task->thread;
+ lev->task.log = &nxt_main_log;
+ lev->task.ident = nxt_task_next_ident();
+ lev->socket.task = &lev->task;
+ lev->timer.task = &lev->task;
+
+ if (nxt_conn_accept_alloc(task, lev) != NULL) {
+ nxt_fd_event_enable_accept(engine, &lev->socket);
+
+ nxt_queue_insert_head(&engine->listen_connections, &lev->link);
+ }
+
+ return lev;
+ }
+
+ return NULL;
+}
+
+
+static nxt_conn_t *
+nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
+{
+ nxt_conn_t *c;
+ nxt_sockaddr_t *sa, *remote;
+ nxt_mem_pool_t *mp;
+ nxt_event_engine_t *engine;
+ nxt_listen_socket_t *ls;
+
+ engine = task->thread->engine;
+
+ if (engine->connections < engine->max_connections) {
+
+ mp = nxt_mem_pool_create(lev->listen->mem_pool_size);
+
+ if (nxt_fast_path(mp != NULL)) {
+ /* This allocation cannot fail. */
+ c = nxt_conn_create(mp, lev->socket.task);
+
+ lev->next = c;
+ c->socket.read_work_queue = lev->socket.read_work_queue;
+ c->socket.write_ready = 1;
+ c->listen = lev;
+
+ ls = lev->listen;
+ /* This allocation cannot fail. */
+ remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length);
+ c->remote = remote;
+
+ sa = ls->sockaddr;
+ remote->type = sa->type;
+ /*
+ * Set address family for unspecified Unix domain,
+ * because these sockaddr's are not be passed to accept().
+ */
+ remote->u.sockaddr.sa_family = sa->u.sockaddr.sa_family;
+
+ return c;
+ }
+ }
+
+ return NULL;
+}
+
+
+static void
+nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_listen_event_t *lev;
+
+ lev = obj;
+ lev->ready = lev->batch;
+
+ lev->accept(task, lev, data);
+}
+
+
+void
+nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
+{
+ socklen_t len;
+ nxt_conn_t *c;
+ nxt_socket_t s;
+ struct sockaddr *sa;
+ nxt_listen_event_t *lev;
+
+ lev = obj;
+ c = lev->next;
+
+ lev->ready--;
+ lev->socket.read_ready = (lev->ready != 0);
+
+ len = c->remote->socklen;
+
+ if (len >= sizeof(struct sockaddr)) {
+ sa = &c->remote->u.sockaddr;
+
+ } else {
+ sa = NULL;
+ len = 0;
+ }
+
+ s = accept(lev->socket.fd, sa, &len);
+
+ if (s == -1) {
+ nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
+ return;
+ }
+
+ c->socket.fd = s;
+
+#if (NXT_LINUX)
+ /*
+ * Linux does not inherit non-blocking mode
+ * from listen socket for accept()ed socket.
+ */
+ if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
+ nxt_socket_close(task, s);
+ }
+
+#endif
+
+ nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
+
+ nxt_conn_accept(task, lev, c);
+}
+
+
+void
+nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
+{
+ nxt_conn_t *next;
+
+ nxt_sockaddr_text(c->remote);
+
+ nxt_debug(task, "client: %*s",
+ c->remote->address_length, nxt_sockaddr_address(c->remote));
+
+ nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
+
+ c->read_work_queue = lev->work_queue;
+ c->write_work_queue = lev->work_queue;
+
+ if (lev->listen->read_after_accept) {
+
+ //c->socket.read_ready = 1;
+// lev->listen->handler(task, c, lev->socket.data);
+ nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
+ task, c, lev->socket.data);
+
+ } else {
+ nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
+ task, c, lev->socket.data);
+ }
+
+ next = nxt_conn_accept_next(task, lev);
+
+ if (next != NULL && lev->socket.read_ready) {
+ nxt_work_queue_add(lev->socket.read_work_queue,
+ lev->accept, task, lev, next);
+ }
+}
+
+
+static nxt_conn_t *
+nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
+{
+ nxt_conn_t *c;
+
+ lev->next = NULL;
+
+ do {
+ c = nxt_conn_accept_alloc(task, lev);
+
+ if (nxt_fast_path(c != NULL)) {
+ return c;
+ }
+
+ } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK);
+
+ nxt_log(task, NXT_LOG_CRIT, "no available connections, "
+ "new connections are not accepted within 1s");
+
+ return NULL;
+}
+
+
+static nxt_int_t
+nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
+{
+ nxt_conn_t *c;
+ nxt_queue_t *idle;
+ nxt_queue_link_t *link;
+ nxt_event_engine_t *engine;
+
+ static nxt_log_moderation_t nxt_idle_close_log_moderation = {
+ NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
+ };
+
+ engine = task->thread->engine;
+
+ idle = &engine->idle_connections;
+
+ for (link = nxt_queue_last(idle);
+ link != nxt_queue_head(idle);
+ link = nxt_queue_next(link))
+ {
+ c = nxt_queue_link_data(link, nxt_conn_t, link);
+
+ if (!c->socket.read_ready) {
+ nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
+ task->log, "no available connections, "
+ "close idle connection");
+ nxt_queue_remove(link);
+ nxt_conn_close(engine, c);
+
+ return NXT_OK;
+ }
+ }
+
+ nxt_timer_add(engine, &lev->timer, 1000);
+
+ nxt_fd_event_disable_read(engine, &lev->socket);
+
+ return NXT_DECLINED;
+}
+
+
+void
+nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
+ const char *accept_syscall, nxt_err_t err)
+{
+ static nxt_log_moderation_t nxt_accept_log_moderation = {
+ NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
+ };
+
+ lev->socket.read_ready = 0;
+
+ switch (err) {
+
+ case NXT_EAGAIN:
+ nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
+ return;
+
+ case ECONNABORTED:
+ nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
+ task->log, "%s(%d) failed %E",
+ accept_syscall, lev->socket.fd, err);
+ return;
+
+ case EMFILE:
+ case ENFILE:
+ case ENOBUFS:
+ case ENOMEM:
+ if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) {
+ nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, "
+ "new connections are not accepted within 1s",
+ accept_syscall, lev->socket.fd, err);
+ }
+
+ return;
+
+ default:
+ nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E",
+ accept_syscall, lev->socket.fd, err);
+ return;
+ }
+}
+
+
+static void
+nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+ nxt_listen_event_t *lev;
+
+ timer = obj;
+
+ lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
+ c = lev->next;
+
+ if (c == NULL) {
+ c = nxt_conn_accept_next(task, lev);
+
+ if (c == NULL) {
+ return;
+ }
+ }
+
+ nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
+
+ lev->accept(task, lev, c);
+}
+
+
+static void
+nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_fd_event_t *ev;
+
+ ev = obj;
+
+ nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd);
+}
diff --git a/src/nxt_conn_close.c b/src/nxt_conn_close.c
index 18b2450c..fb86d052 100644
--- a/src/nxt_conn_close.c
+++ b/src/nxt_conn_close.c
@@ -16,7 +16,7 @@ static void nxt_conn_close_error_ignore(nxt_task_t *task, void *obj,
void
-nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c)
+nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c)
{
int ret;
socklen_t len;
@@ -69,7 +69,7 @@ nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c)
static void
nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_event_engine_t *engine;
c = obj;
@@ -90,7 +90,7 @@ static void
nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_uint_t events_pending, timers_pending;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_event_engine_t *engine;
c = obj;
@@ -132,12 +132,12 @@ nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data)
static void
nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_timer_t *timer;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
timer = obj;
- c = nxt_event_write_timer_conn(timer);
+ c = nxt_write_timer_conn(timer);
nxt_debug(task, "conn close timer handler fd:%d", c->socket.fd);
diff --git a/src/nxt_event_conn_connect.c b/src/nxt_conn_connect.c
index 55aa33f9..94d25c30 100644
--- a/src/nxt_event_conn_connect.c
+++ b/src/nxt_conn_connect.c
@@ -8,14 +8,14 @@
void
-nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data)
+nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_work_handler_t handler;
c = obj;
- if (nxt_event_conn_socket(task, c) == NXT_OK) {
+ if (nxt_conn_socket(task, c) == NXT_OK) {
c->socket.write_work_queue = c->write_work_queue;
handler = c->io->connect;
@@ -29,12 +29,12 @@ nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data)
void
-nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
+nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
- nxt_work_handler_t handler;
- nxt_event_engine_t *engine;
- const nxt_event_conn_state_t *state;
+ nxt_conn_t *c;
+ nxt_work_handler_t handler;
+ nxt_event_engine_t *engine;
+ const nxt_conn_state_t *state;
c = obj;
@@ -48,12 +48,12 @@ nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
break;
case NXT_AGAIN:
- c->socket.write_handler = nxt_event_conn_connect_test;
+ c->socket.write_handler = nxt_conn_connect_test;
c->socket.error_handler = state->error_handler;
engine = task->thread->engine;
- nxt_event_conn_timer(engine, c, state, &c->write_timer);
+ nxt_conn_timer(engine, c, state, &c->write_timer);
nxt_fd_event_enable_write(engine, &c->socket);
return;
@@ -72,7 +72,7 @@ nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
nxt_int_t
-nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c)
+nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c)
{
nxt_uint_t family;
nxt_socket_t s;
@@ -116,11 +116,11 @@ nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c)
void
-nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data)
+nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data)
{
- int ret, err;
- socklen_t len;
- nxt_event_conn_t *c;
+ int ret, err;
+ socklen_t len;
+ nxt_conn_t *c;
c = obj;
@@ -157,16 +157,16 @@ nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data)
nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E",
c->socket.fd, c->remote->length, nxt_sockaddr_start(c->remote));
- nxt_event_conn_connect_error(task, c, data);
+ nxt_conn_connect_error(task, c, data);
}
void
-nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data)
+nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
- nxt_work_handler_t handler;
- const nxt_event_conn_state_t *state;
+ nxt_conn_t *c;
+ nxt_work_handler_t handler;
+ const nxt_conn_state_t *state;
c = obj;
diff --git a/src/nxt_conn_proxy.c b/src/nxt_conn_proxy.c
new file mode 100644
index 00000000..dec23684
--- /dev/null
+++ b/src/nxt_conn_proxy.c
@@ -0,0 +1,998 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data);
+static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data);
+static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
+ nxt_conn_t *source, nxt_conn_t *sink);
+static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b);
+static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data);
+static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
+ nxt_conn_t *sink, nxt_conn_t *source);
+static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b);
+static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data);
+static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data);
+static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj,
+ void *data);
+static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data);
+static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data);
+static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
+ nxt_conn_t *source, nxt_conn_t *sink);
+static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data);
+static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data);
+static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p);
+static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data);
+
+
+static const nxt_conn_state_t nxt_conn_proxy_client_wait_state;
+static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state;
+static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state;
+static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state;
+static const nxt_conn_state_t nxt_conn_proxy_client_read_state;
+static const nxt_conn_state_t nxt_conn_proxy_peer_read_state;
+static const nxt_conn_state_t nxt_conn_proxy_client_write_state;
+static const nxt_conn_state_t nxt_conn_proxy_peer_write_state;
+
+
+nxt_conn_proxy_t *
+nxt_conn_proxy_create(nxt_conn_t *client)
+{
+ nxt_conn_t *peer;
+ nxt_thread_t *thr;
+ nxt_conn_proxy_t *p;
+
+ p = nxt_mem_zalloc(client->mem_pool, sizeof(nxt_conn_proxy_t));
+ if (nxt_slow_path(p == NULL)) {
+ return NULL;
+ }
+
+ peer = nxt_conn_create(client->mem_pool, client->socket.task);
+ if (nxt_slow_path(peer == NULL)) {
+ return NULL;
+ }
+
+ thr = nxt_thread();
+
+ client->read_work_queue = &thr->engine->read_work_queue;
+ client->write_work_queue = &thr->engine->write_work_queue;
+ client->socket.read_work_queue = &thr->engine->read_work_queue;
+ client->socket.write_work_queue = &thr->engine->write_work_queue;
+ peer->socket.read_work_queue = &thr->engine->read_work_queue;
+ peer->socket.write_work_queue = &thr->engine->write_work_queue;
+
+ peer->socket.data = client->socket.data;
+
+ peer->read_work_queue = client->read_work_queue;
+ peer->write_work_queue = client->write_work_queue;
+ peer->read_timer.work_queue = client->read_work_queue;
+ peer->write_timer.work_queue = client->write_work_queue;
+
+ p->client = client;
+ p->peer = peer;
+
+ return p;
+}
+
+
+void
+nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p)
+{
+ nxt_conn_t *peer;
+
+ /*
+ * Peer read event: not connected, disabled.
+ * Peer write event: not connected, disabled.
+ */
+
+ if (p->client_wait_timeout == 0) {
+ /*
+ * Peer write event: waiting for connection
+ * to be established with connect_timeout.
+ */
+ peer = p->peer;
+ peer->write_state = &nxt_conn_proxy_peer_connect_state;
+
+ nxt_conn_connect(task->thread->engine, peer);
+ }
+
+ /*
+ * Client read event: waiting for client data with
+ * client_wait_timeout before buffer allocation.
+ */
+ p->client->read_state = &nxt_conn_proxy_client_wait_state;
+
+ nxt_conn_wait(p->client);
+}
+
+
+static const nxt_conn_state_t nxt_conn_proxy_client_wait_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_conn_proxy_client_buffer_alloc,
+ .close_handler = nxt_conn_proxy_close,
+ .error_handler = nxt_conn_proxy_error,
+
+ .timer_handler = nxt_conn_proxy_read_timeout,
+ .timer_value = nxt_conn_proxy_timeout_value,
+ .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
+};
+
+
+static void
+nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_conn_t *client;
+ nxt_conn_proxy_t *p;
+
+ client = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd);
+
+ b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size,
+ NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE);
+
+ if (nxt_slow_path(b == NULL)) {
+ /* An error completion. */
+ nxt_conn_proxy_complete(task, p);
+ return;
+ }
+
+ p->client_buffer = b;
+ client->read = b;
+
+ if (p->peer->socket.fd != -1) {
+ /*
+ * Client read event: waiting, no timeout.
+ * Client write event: blocked.
+ * Peer read event: disabled.
+ * Peer write event: waiting for connection to be established
+ * or blocked after the connection has established.
+ */
+ client->read_state = &nxt_conn_proxy_client_read_state;
+
+ } else {
+ /*
+ * Client read event: waiting for data with client_wait_timeout
+ * before connecting to a peer.
+ * Client write event: blocked.
+ * Peer read event: not connected, disabled.
+ * Peer write event: not connected, disabled.
+ */
+ client->read_state = &nxt_conn_proxy_client_first_read_state;
+ }
+
+ nxt_conn_read(task->thread->engine, client);
+}
+
+
+static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_conn_proxy_peer_connect,
+ .close_handler = nxt_conn_proxy_close,
+ .error_handler = nxt_conn_proxy_error,
+
+ .timer_handler = nxt_conn_proxy_read_timeout,
+ .timer_value = nxt_conn_proxy_timeout_value,
+ .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout),
+ .timer_autoreset = 1,
+};
+
+
+static void
+nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *client;
+ nxt_conn_proxy_t *p;
+
+ client = obj;
+ p = data;
+
+ /*
+ * Client read event: waiting, no timeout.
+ * Client write event: blocked.
+ * Peer read event: disabled.
+ * Peer write event: waiting for connection to be established
+ * with connect_timeout.
+ */
+ client->read_state = &nxt_conn_proxy_client_read_state;
+
+ p->peer->write_state = &nxt_conn_proxy_peer_connect_state;
+
+ nxt_conn_connect(task->thread->engine, p->peer);
+}
+
+
+static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_conn_proxy_connected,
+ .close_handler = nxt_conn_proxy_refused,
+ .error_handler = nxt_conn_proxy_error,
+
+ .timer_handler = nxt_conn_proxy_write_timeout,
+ .timer_value = nxt_conn_proxy_timeout_value,
+ .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout),
+ .timer_autoreset = 1,
+};
+
+
+static void
+nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *client, *peer;
+ nxt_conn_proxy_t *p;
+
+ peer = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd);
+
+ p->connected = 1;
+
+ nxt_conn_tcp_nodelay_on(task, peer);
+ nxt_conn_tcp_nodelay_on(task, p->client);
+
+ /* Peer read event: waiting with peer_wait_timeout. */
+
+ peer->read_state = &nxt_conn_proxy_peer_wait_state;
+ peer->write_state = &nxt_conn_proxy_peer_write_state;
+
+ nxt_conn_wait(peer);
+
+ if (p->client_buffer != NULL) {
+ client = p->client;
+
+ client->read_state = &nxt_conn_proxy_client_read_state;
+ client->write_state = &nxt_conn_proxy_client_write_state;
+ /*
+ * Send a client read data to the connected peer.
+ * Client write event: blocked.
+ */
+ nxt_conn_proxy_read_process(task, p, client, peer);
+ }
+}
+
+
+static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_conn_proxy_peer_read,
+ .close_handler = nxt_conn_proxy_close,
+ .error_handler = nxt_conn_proxy_error,
+
+ .timer_handler = nxt_conn_proxy_read_timeout,
+ .timer_value = nxt_conn_proxy_timeout_value,
+ .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout),
+};
+
+
+static void
+nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_conn_t *peer;
+ nxt_conn_proxy_t *p;
+
+ peer = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd);
+
+ b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size,
+ NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE);
+
+ if (nxt_slow_path(b == NULL)) {
+ /* An error completion. */
+ nxt_conn_proxy_complete(task, p);
+ return;
+ }
+
+ p->peer_buffer = b;
+ peer->read = b;
+
+ p->client->write_state = &nxt_conn_proxy_client_write_state;
+ peer->read_state = &nxt_conn_proxy_peer_read_state;
+ peer->write_state = &nxt_conn_proxy_peer_write_state;
+
+ /*
+ * Client read event: waiting, no timeout.
+ * Client write event: blocked.
+ * Peer read event: waiting with possible peer_wait_timeout.
+ * Peer write event: blocked.
+ */
+ nxt_conn_read(task->thread->engine, peer);
+}
+
+
+static const nxt_conn_state_t nxt_conn_proxy_client_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_conn_proxy_client_read_ready,
+ .close_handler = nxt_conn_proxy_close,
+ .error_handler = nxt_conn_proxy_read_error,
+};
+
+
+static void
+nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *client;
+ nxt_conn_proxy_t *p;
+
+ client = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd);
+
+ nxt_conn_proxy_read_process(task, p, client, p->peer);
+}
+
+
+static const nxt_conn_state_t nxt_conn_proxy_peer_read_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_conn_proxy_peer_read_ready,
+ .close_handler = nxt_conn_proxy_close,
+ .error_handler = nxt_conn_proxy_read_error,
+};
+
+
+static void
+nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *peer;
+ nxt_conn_proxy_t *p;
+
+ peer = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd);
+
+ nxt_conn_proxy_read_process(task, p, peer, p->client);
+}
+
+
+static void
+nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p,
+ nxt_conn_t *source, nxt_conn_t *sink)
+{
+ nxt_buf_t *rb, *wb;
+
+ if (sink->socket.error != 0) {
+ nxt_debug(task, "conn proxy sink fd:%d error:%d",
+ sink->socket.fd, sink->socket.error);
+
+ nxt_conn_proxy_write_error(task, sink, sink->socket.data);
+ return;
+ }
+
+ while (source->read != NULL) {
+
+ rb = source->read;
+
+ if (rb->mem.pos != rb->mem.free) {
+
+ /* Add a read part to a write chain. */
+
+ wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0);
+ if (wb == NULL) {
+ /* An error completion. */
+ nxt_conn_proxy_complete(task, p);
+ return;
+ }
+
+ wb->mem.pos = rb->mem.pos;
+ wb->mem.free = rb->mem.free;
+ wb->mem.start = rb->mem.pos;
+ wb->mem.end = rb->mem.free;
+
+ rb->mem.pos = rb->mem.free;
+ rb->mem.start = rb->mem.free;
+
+ nxt_conn_proxy_write_add(sink, wb);
+ }
+
+ if (rb->mem.start != rb->mem.end) {
+ nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
+ task, source, source->socket.data);
+ break;
+ }
+
+ source->read = rb->next;
+ nxt_buf_free(source->mem_pool, rb);
+ }
+
+ if (p->connected) {
+ nxt_conn_write(task->thread->engine, sink);
+ }
+}
+
+
+static void
+nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b)
+{
+ nxt_buf_t *first, *second, *prev;
+
+ first = c->write;
+
+ if (first == NULL) {
+ c->write = b;
+ return;
+ }
+
+ /*
+ * A event conn proxy maintains a buffer per each direction.
+ * The buffer is divided by read and write parts. These parts are
+ * linked in buffer chains. There can be no more than two buffers
+ * in write chain at any time, because an added buffer is coalesced
+ * with the last buffer if possible.
+ */
+
+ second = first->next;
+
+ if (second == NULL) {
+
+ if (first->mem.end != b->mem.start) {
+ first->next = b;
+ return;
+ }
+
+ /*
+ * The first buffer is just before the added buffer, so
+ * expand the first buffer to the end of the added buffer.
+ */
+ prev = first;
+
+ } else {
+ if (second->mem.end != b->mem.start) {
+ nxt_thread_log_alert("event conn proxy write: second buffer end:%p "
+ "is not equal to added buffer start:%p",
+ second->mem.end, b->mem.start);
+ return;
+ }
+
+ /*
+ * "second->mem.end == b->mem.start" must be always true here,
+ * that is the second buffer is just before the added buffer,
+ * so expand the second buffer to the end of added buffer.
+ */
+ prev = second;
+ }
+
+ prev->mem.free = b->mem.end;
+ prev->mem.end = b->mem.end;
+
+ nxt_buf_free(c->mem_pool, b);
+}
+
+
+static void
+nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *source, *sink;
+ nxt_conn_proxy_t *p;
+
+ source = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy read fd:%d", source->socket.fd);
+
+ if (!source->socket.closed) {
+ sink = (source == p->client) ? p->peer : p->client;
+
+ if (sink->socket.error == 0) {
+ nxt_conn_read(task->thread->engine, source);
+ }
+ }
+}
+
+
+static const nxt_conn_state_t nxt_conn_proxy_client_write_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_conn_proxy_client_write_ready,
+ .error_handler = nxt_conn_proxy_write_error,
+
+ .timer_handler = nxt_conn_proxy_write_timeout,
+ .timer_value = nxt_conn_proxy_timeout_value,
+ .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout),
+ .timer_autoreset = 1,
+};
+
+
+static void
+nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *client;
+ nxt_conn_proxy_t *p;
+
+ client = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd);
+
+ nxt_conn_proxy_write_process(task, p, client, p->peer);
+}
+
+
+static const nxt_conn_state_t nxt_conn_proxy_peer_write_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_conn_proxy_peer_write_ready,
+ .error_handler = nxt_conn_proxy_write_error,
+
+ .timer_handler = nxt_conn_proxy_write_timeout,
+ .timer_value = nxt_conn_proxy_timeout_value,
+ .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout),
+ .timer_autoreset = 1,
+};
+
+
+static void
+nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *peer;
+ nxt_conn_proxy_t *p;
+
+ peer = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd);
+
+ nxt_conn_proxy_write_process(task, p, peer, p->client);
+}
+
+
+static void
+nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p,
+ nxt_conn_t *sink, nxt_conn_t *source)
+{
+ nxt_buf_t *rb, *wb;
+
+ while (sink->write != NULL) {
+
+ wb = sink->write;
+
+ if (nxt_buf_is_sync(wb)) {
+
+ /* A sync buffer marks the end of stream. */
+
+ sink->write = NULL;
+ nxt_buf_free(sink->mem_pool, wb);
+ nxt_conn_proxy_shutdown(task, p, source, sink);
+ return;
+ }
+
+ if (wb->mem.start != wb->mem.pos) {
+
+ /* Add a written part to a read chain. */
+
+ rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0);
+ if (rb == NULL) {
+ /* An error completion. */
+ nxt_conn_proxy_complete(task, p);
+ return;
+ }
+
+ rb->mem.pos = wb->mem.start;
+ rb->mem.free = wb->mem.start;
+ rb->mem.start = wb->mem.start;
+ rb->mem.end = wb->mem.pos;
+
+ wb->mem.start = wb->mem.pos;
+
+ nxt_conn_proxy_read_add(source, rb);
+ }
+
+ if (wb->mem.pos != wb->mem.free) {
+ nxt_conn_write(task->thread->engine, sink);
+
+ break;
+ }
+
+ sink->write = wb->next;
+ nxt_buf_free(sink->mem_pool, wb);
+ }
+
+ nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read,
+ task, source, source->socket.data);
+}
+
+
+static void
+nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b)
+{
+ nxt_buf_t *first, *second;
+
+ first = c->read;
+
+ if (first == NULL) {
+ c->read = b;
+ return;
+ }
+
+ /*
+ * A event conn proxy maintains a buffer per each direction.
+ * The buffer is divided by read and write parts. These parts are
+ * linked in buffer chains. There can be no more than two buffers
+ * in read chain at any time, because an added buffer is coalesced
+ * with the last buffer if possible. The first and the second
+ * buffers are also coalesced if possible.
+ */
+
+ second = first->next;
+
+ if (second == NULL) {
+
+ if (first->mem.start == b->mem.end) {
+ /*
+ * The added buffer is just before the first buffer, so expand
+ * the first buffer to the beginning of the added buffer.
+ */
+ first->mem.pos = b->mem.start;
+ first->mem.free = b->mem.start;
+ first->mem.start = b->mem.start;
+
+ } else if (first->mem.end == b->mem.start) {
+ /*
+ * The added buffer is just after the first buffer, so
+ * expand the first buffer to the end of the added buffer.
+ */
+ first->mem.end = b->mem.end;
+
+ } else {
+ first->next = b;
+ return;
+ }
+
+ } else {
+ if (second->mem.end != b->mem.start) {
+ nxt_thread_log_alert("event conn proxy read: second buffer end:%p "
+ "is not equal to added buffer start:%p",
+ second->mem.end, b->mem.start);
+ return;
+ }
+
+ /*
+ * The added buffer is just after the second buffer, so
+ * expand the second buffer to the end of the added buffer.
+ */
+ second->mem.end = b->mem.end;
+
+ if (first->mem.start == second->mem.end) {
+ /*
+ * The second buffer is just before the first buffer, so expand
+ * the first buffer to the beginning of the second buffer.
+ */
+ first->mem.pos = second->mem.start;
+ first->mem.free = second->mem.start;
+ first->mem.start = second->mem.start;
+ first->next = NULL;
+
+ nxt_buf_free(c->mem_pool, second);
+ }
+ }
+
+ nxt_buf_free(c->mem_pool, b);
+}
+
+
+static void
+nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_conn_t *source, *sink;
+ nxt_conn_proxy_t *p;
+
+ source = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy close fd:%d", source->socket.fd);
+
+ sink = (source == p->client) ? p->peer : p->client;
+
+ if (sink->write == NULL) {
+ nxt_conn_proxy_shutdown(task, p, source, sink);
+ return;
+ }
+
+ b = nxt_buf_sync_alloc(source->mem_pool, 0);
+ if (b == NULL) {
+ /* An error completion. */
+ nxt_conn_proxy_complete(task, p);
+ return;
+ }
+
+ nxt_buf_chain_add(&sink->write, b);
+}
+
+
+static void
+nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_conn_proxy_t *p;
+
+ c = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy error fd:%d", c->socket.fd);
+
+ nxt_conn_proxy_close(task, c, p);
+}
+
+
+static void
+nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+
+ timer = obj;
+
+ c = nxt_read_timer_conn(timer);
+ c->socket.timedout = 1;
+ c->socket.closed = 1;
+
+ nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd);
+
+ nxt_conn_proxy_close(task, c, c->socket.data);
+}
+
+
+static void
+nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+
+ timer = obj;
+
+ c = nxt_write_timer_conn(timer);
+ c->socket.timedout = 1;
+ c->socket.closed = 1;
+
+ nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd);
+
+ nxt_conn_proxy_close(task, c, c->socket.data);
+}
+
+
+static nxt_msec_t
+nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data)
+{
+ nxt_msec_t *timer;
+ nxt_conn_proxy_t *p;
+
+ p = c->socket.data;
+
+ timer = (nxt_msec_t *) ((char *) p + data);
+
+ return *timer;
+}
+
+
+static void
+nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *peer;
+ nxt_conn_proxy_t *p;
+
+ peer = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd);
+
+ if (p->retries == 0) {
+ /* An error completion. */
+ nxt_conn_proxy_complete(task, p);
+ return;
+ }
+
+ p->retries--;
+
+ nxt_socket_close(task, peer->socket.fd);
+ peer->socket.fd = -1;
+ peer->socket.error = 0;
+
+ p->delayed = 1;
+
+ peer->write_timer.handler = nxt_conn_proxy_reconnect_handler;
+ nxt_timer_add(task->thread->engine, &peer->write_timer,
+ p->reconnect_timeout);
+}
+
+
+static void
+nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *peer;
+ nxt_timer_t *timer;
+ nxt_conn_proxy_t *p;
+
+ timer = obj;
+
+ nxt_debug(task, "conn proxy reconnect timer");
+
+ peer = nxt_write_timer_conn(timer);
+ p = peer->socket.data;
+
+ if (p->client->socket.closed) {
+ nxt_conn_proxy_complete(task, p);
+ return;
+ }
+
+ p->delayed = 0;
+
+ peer->write_state = &nxt_conn_proxy_peer_connect_state;
+ /*
+ * Peer read event: disabled.
+ * Peer write event: waiting for connection with connect_timeout.
+ */
+ nxt_conn_connect(task->thread->engine, peer);
+}
+
+
+static void
+nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p,
+ nxt_conn_t *source, nxt_conn_t *sink)
+{
+ nxt_buf_t *b;
+
+ nxt_debug(source->socket.task,
+ "conn proxy shutdown source fd:%d cl:%d err:%d",
+ source->socket.fd, source->socket.closed, source->socket.error);
+
+ nxt_debug(sink->socket.task,
+ "conn proxy shutdown sink fd:%d cl:%d err:%d",
+ sink->socket.fd, sink->socket.closed, sink->socket.error);
+
+ if (!p->connected || p->delayed) {
+ nxt_conn_proxy_complete(task, p);
+ return;
+ }
+
+ if (sink->socket.error == 0 && !sink->socket.closed) {
+ sink->socket.shutdown = 1;
+ nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR);
+ }
+
+ if (sink->socket.error != 0
+ || (sink->socket.closed && source->write == NULL))
+ {
+ /* The opposite direction also has been already closed. */
+ nxt_conn_proxy_complete(task, p);
+ return;
+ }
+
+ nxt_debug(source->socket.task, "free source buffer");
+
+ /* Free the direction's buffer. */
+ b = (source == p->client) ? p->client_buffer : p->peer_buffer;
+ nxt_mem_free(source->mem_pool, b);
+}
+
+
+static void
+nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_conn_proxy_t *p;
+
+ c = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd);
+
+ nxt_conn_proxy_close(task, c, p);
+}
+
+
+static void
+nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *source, *sink;
+ nxt_conn_proxy_t *p;
+
+ sink = obj;
+ p = data;
+
+ nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd);
+
+ /* Clear data for the direction sink. */
+ sink->write = NULL;
+
+ /* Block the direction source. */
+ source = (sink == p->client) ? p->peer : p->client;
+ nxt_fd_event_block_read(task->thread->engine, &source->socket);
+
+ if (source->write == NULL) {
+ /*
+ * There is no data for the opposite direction and
+ * the next read from the sink will most probably fail.
+ */
+ nxt_conn_proxy_complete(task, p);
+ }
+}
+
+
+static const nxt_conn_state_t nxt_conn_proxy_close_state
+ nxt_aligned(64) =
+{
+ .ready_handler = nxt_conn_proxy_completion,
+};
+
+
+static void
+nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p)
+{
+ nxt_event_engine_t *engine;
+
+ engine = task->thread->engine;
+
+ nxt_debug(p->client->socket.task, "conn proxy complete %d:%d",
+ p->client->socket.fd, p->peer->socket.fd);
+
+ if (p->delayed) {
+ p->delayed = 0;
+ nxt_queue_remove(&p->peer->link);
+ }
+
+ if (p->client->socket.fd != -1) {
+ p->retain = 1;
+ p->client->write_state = &nxt_conn_proxy_close_state;
+ nxt_conn_close(engine, p->client);
+ }
+
+ if (p->peer->socket.fd != -1) {
+ p->retain++;
+ p->peer->write_state = &nxt_conn_proxy_close_state;
+ nxt_conn_close(engine, p->peer);
+ }
+}
+
+
+static void
+nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_proxy_t *p;
+
+ p = data;
+
+ nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d",
+ p->retain, p->client->socket.fd, p->peer->socket.fd);
+
+ p->retain--;
+
+ if (p->retain == 0) {
+ nxt_mem_free(p->client->mem_pool, p->client_buffer);
+ nxt_mem_free(p->client->mem_pool, p->peer_buffer);
+
+ p->completion_handler(task, p, NULL);
+ }
+}
diff --git a/src/nxt_event_conn_read.c b/src/nxt_conn_read.c
index be2dfdb2..e7134b99 100644
--- a/src/nxt_event_conn_read.c
+++ b/src/nxt_conn_read.c
@@ -8,10 +8,10 @@
void
-nxt_conn_wait(nxt_event_conn_t *c)
+nxt_conn_wait(nxt_conn_t *c)
{
- nxt_event_engine_t *engine;
- const nxt_event_conn_state_t *state;
+ nxt_event_engine_t *engine;
+ const nxt_conn_state_t *state;
nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
c->socket.fd, c->socket.read_ready);
@@ -28,26 +28,26 @@ nxt_conn_wait(nxt_event_conn_t *c)
c->socket.read_handler = state->ready_handler;
c->socket.error_handler = state->error_handler;
- nxt_event_conn_timer(engine, c, state, &c->read_timer);
+ nxt_conn_timer(engine, c, state, &c->read_timer);
nxt_fd_event_enable_read(engine, &c->socket);
}
void
-nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
+nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
{
- ssize_t n;
- nxt_buf_t *b;
- nxt_work_queue_t *wq;
- nxt_event_conn_t *c;
- nxt_event_engine_t *engine;
- nxt_work_handler_t handler;
- const nxt_event_conn_state_t *state;
+ ssize_t n;
+ nxt_buf_t *b;
+ nxt_conn_t *c;
+ nxt_work_queue_t *wq;
+ nxt_event_engine_t *engine;
+ nxt_work_handler_t handler;
+ const nxt_conn_state_t *state;
c = obj;
- nxt_debug(task, "event conn read fd:%d rdy:%d cl:%d",
+ nxt_debug(task, "conn read fd:%d rdy:%d cl:%d",
c->socket.fd, c->socket.read_ready, c->socket.closed);
engine = task->thread->engine;
@@ -99,9 +99,8 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
}
/*
- * Here c->io->read() is assigned instead of direct
- * nxt_event_conn_io_read() because the function can
- * be called by nxt_kqueue_event_conn_io_read().
+ * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
+ * because the function can be called by nxt_kqueue_conn_io_read().
*/
c->socket.read_handler = c->io->read;
c->socket.error_handler = state->error_handler;
@@ -110,7 +109,7 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
|| nxt_fd_event_is_disabled(c->socket.read))
{
/* Timer may be set or reset. */
- nxt_event_conn_timer(engine, c, state, &c->read_timer);
+ nxt_conn_timer(engine, c, state, &c->read_timer);
if (nxt_fd_event_is_disabled(c->socket.read)) {
nxt_fd_event_enable_read(engine, &c->socket);
@@ -122,7 +121,7 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
ssize_t
-nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
+nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
{
ssize_t n;
nxt_err_t err;
@@ -139,7 +138,7 @@ nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
if (niov == 1) {
/* Disposal of surplus kernel iovec copy-in operation. */
- return nxt_event_conn_io_recv(c, iov->iov_base, iov->iov_len, 0);
+ return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0);
}
for ( ;; ) {
@@ -188,8 +187,7 @@ nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
ssize_t
-nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size,
- nxt_uint_t flags)
+nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
{
ssize_t n;
nxt_err_t err;
diff --git a/src/nxt_event_conn_write.c b/src/nxt_conn_write.c
index fa5b9241..a2a5737b 100644
--- a/src/nxt_event_conn_write.c
+++ b/src/nxt_conn_write.c
@@ -7,7 +7,7 @@
#include <nxt_main.h>
-static void nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj,
+static void nxt_conn_write_timer_handler(nxt_task_t *task, void *obj,
void *data);
@@ -16,13 +16,13 @@ nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
{
ssize_t ret;
nxt_buf_t *b;
+ nxt_conn_t *c;
nxt_sendbuf_t sb;
- nxt_event_conn_t *c;
nxt_event_engine_t *engine;
c = obj;
- nxt_debug(task, "event conn write fd:%d", c->socket.fd);
+ nxt_debug(task, "conn write fd:%d", c->socket.fd);
if (!c->socket.write_ready || c->write == NULL) {
return;
@@ -89,7 +89,7 @@ nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
* Postpone writing until next event poll to allow to
* process other recevied events and to get new events.
*/
- c->write_timer.handler = nxt_event_conn_write_timer_handler;
+ c->write_timer.handler = nxt_conn_write_timer_handler;
nxt_timer_add(engine, &c->write_timer, 0);
} else if (ret == NXT_AGAIN) {
@@ -100,7 +100,7 @@ nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
* can be set here because it should be set only for write
* direction.
*/
- nxt_event_conn_timer(engine, c, c->write_state, &c->write_timer);
+ nxt_conn_timer(engine, c, c->write_state, &c->write_timer);
if (nxt_fd_event_is_disabled(c->socket.write)) {
nxt_fd_event_enable_write(engine, &c->socket);
@@ -128,8 +128,133 @@ nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
}
+static void
+nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
+
+ timer = obj;
+
+ nxt_debug(task, "event conn conn timer");
+
+ c = nxt_write_timer_conn(timer);
+ c->delayed = 0;
+
+ c->io->write(task, c, c->socket.data);
+}
+
+
+ssize_t
+nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
+{
+ nxt_uint_t niov;
+ struct iovec iov[NXT_IOBUF_MAX];
+
+ niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX);
+
+ if (niov == 0 && sb->sync) {
+ return 0;
+ }
+
+ return nxt_conn_io_writev(task, sb, iov, niov);
+}
+
+
+ssize_t
+nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
+ nxt_uint_t niov)
+{
+ ssize_t n;
+ nxt_err_t err;
+
+ if (niov == 1) {
+ /* Disposal of surplus kernel iovec copy-in operation. */
+ return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len);
+ }
+
+ for ( ;; ) {
+ n = writev(sb->socket, iov, niov);
+
+ err = (n == -1) ? nxt_socket_errno : 0;
+
+ nxt_debug(task, "writev(%d, %ui): %d", sb->socket, niov, n);
+
+ if (n > 0) {
+ return n;
+ }
+
+ /* n == -1 */
+
+ switch (err) {
+
+ case NXT_EAGAIN:
+ sb->ready = 0;
+ nxt_debug(task, "writev() %E", err);
+
+ return NXT_AGAIN;
+
+ case NXT_EINTR:
+ nxt_debug(task, "writev() %E", err);
+ continue;
+
+ default:
+ sb->error = err;
+ nxt_log(task, nxt_socket_error_level(err),
+ "writev(%d, %ui) failed %E", sb->socket, niov, err);
+
+ return NXT_ERROR;
+ }
+ }
+}
+
+
+ssize_t
+nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size)
+{
+ ssize_t n;
+ nxt_err_t err;
+
+ for ( ;; ) {
+ n = send(sb->socket, buf, size, 0);
+
+ err = (n == -1) ? nxt_socket_errno : 0;
+
+ nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n);
+
+ if (n > 0) {
+ return n;
+ }
+
+ /* n == -1 */
+
+ switch (err) {
+
+ case NXT_EAGAIN:
+ sb->ready = 0;
+ nxt_debug(task, "send() %E", err);
+
+ return NXT_AGAIN;
+
+ case NXT_EINTR:
+ nxt_debug(task, "send() %E", err);
+ continue;
+
+ default:
+ sb->error = err;
+ nxt_log(task, nxt_socket_error_level(err),
+ "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err);
+
+ return NXT_ERROR;
+ }
+ }
+}
+
+
+/* Obsolete interfaces. */
+
size_t
-nxt_event_conn_write_limit(nxt_event_conn_t *c)
+nxt_event_conn_write_limit(nxt_conn_t *c)
{
ssize_t limit, correction;
nxt_event_write_rate_t *rate;
@@ -162,32 +287,15 @@ nxt_event_conn_write_limit(nxt_event_conn_t *c)
nxt_bool_t
-nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_event_conn_t *c,
+nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_conn_t *c,
size_t sent)
{
return 0;
}
-static void
-nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
-{
- nxt_timer_t *ev;
- nxt_event_conn_t *c;
-
- ev = obj;
-
- nxt_debug(task, "event conn conn timer");
-
- c = nxt_event_write_timer_conn(ev);
- c->delayed = 0;
-
- c->io->write(task, c, c->socket.data);
-}
-
-
ssize_t
-nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
+nxt_event_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
{
ssize_t ret;
@@ -204,7 +312,7 @@ nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
ssize_t
-nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
+nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
{
nxt_uint_t niob;
struct iovec iob[NXT_IOBUF_MAX];
@@ -228,7 +336,7 @@ nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
ssize_t
-nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
+nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
{
ssize_t n;
nxt_err_t err;
@@ -273,7 +381,7 @@ nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
ssize_t
-nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size)
+nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size)
{
ssize_t n;
nxt_err_t err;
@@ -312,109 +420,3 @@ nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size)
}
}
}
-
-
-ssize_t
-nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
-{
- nxt_uint_t niov;
- struct iovec iov[NXT_IOBUF_MAX];
-
- niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX);
-
- if (niov == 0 && sb->sync) {
- return 0;
- }
-
- return nxt_conn_io_writev(task, sb, iov, niov);
-}
-
-
-ssize_t
-nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
- nxt_uint_t niov)
-{
- ssize_t n;
- nxt_err_t err;
-
- if (niov == 1) {
- /* Disposal of surplus kernel iovec copy-in operation. */
- return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len);
- }
-
- for ( ;; ) {
- n = writev(sb->socket, iov, niov);
-
- err = (n == -1) ? nxt_socket_errno : 0;
-
- nxt_debug(task, "writev(%d, %ui): %d", sb->socket, niov, n);
-
- if (n > 0) {
- return n;
- }
-
- /* n == -1 */
-
- switch (err) {
-
- case NXT_EAGAIN:
- sb->ready = 0;
- nxt_debug(task, "writev() %E", err);
-
- return NXT_AGAIN;
-
- case NXT_EINTR:
- nxt_debug(task, "writev() %E", err);
- continue;
-
- default:
- sb->error = err;
- nxt_log(task, nxt_socket_error_level(err),
- "writev(%d, %ui) failed %E", sb->socket, niov, err);
-
- return NXT_ERROR;
- }
- }
-}
-
-
-ssize_t
-nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size)
-{
- ssize_t n;
- nxt_err_t err;
-
- for ( ;; ) {
- n = send(sb->socket, buf, size, 0);
-
- err = (n == -1) ? nxt_socket_errno : 0;
-
- nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n);
-
- if (n > 0) {
- return n;
- }
-
- /* n == -1 */
-
- switch (err) {
-
- case NXT_EAGAIN:
- sb->ready = 0;
- nxt_debug(task, "send() %E", err);
-
- return NXT_AGAIN;
-
- case NXT_EINTR:
- nxt_debug(task, "send() %E", err);
- continue;
-
- default:
- sb->error = err;
- nxt_log(task, nxt_socket_error_level(err),
- "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err);
-
- return NXT_ERROR;
- }
- }
-}
diff --git a/src/nxt_controller.c b/src/nxt_controller.c
index e31e2e15..6ec4c744 100644
--- a/src/nxt_controller.c
+++ b/src/nxt_controller.c
@@ -34,7 +34,7 @@ typedef struct {
static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
-static nxt_msec_t nxt_controller_conn_timeout_value(nxt_event_conn_t *c,
+static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
uintptr_t data);
static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
void *data);
@@ -54,8 +54,8 @@ static nxt_int_t nxt_controller_request_content_length(void *ctx,
nxt_http_field_t *field, uintptr_t data, nxt_log_t *log);
static void nxt_controller_process_request(nxt_task_t *task,
- nxt_event_conn_t *c, nxt_controller_request_t *r);
-static nxt_int_t nxt_controller_response(nxt_task_t *task, nxt_event_conn_t *c,
+ nxt_conn_t *c, nxt_controller_request_t *r);
+static nxt_int_t nxt_controller_response(nxt_task_t *task, nxt_conn_t *c,
nxt_controller_response_t *resp);
static nxt_buf_t *nxt_controller_response_body(nxt_controller_response_t *resp,
nxt_mem_pool_t *pool);
@@ -184,7 +184,7 @@ nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
*/
ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls)
+ sizeof(nxt_event_conn_proxy_t)
- + sizeof(nxt_event_conn_t)
+ + sizeof(nxt_conn_t)
+ 4 * sizeof(nxt_buf_t);
if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
@@ -201,7 +201,7 @@ static void
nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_event_engine_t *engine;
nxt_controller_request_t *r;
@@ -237,7 +237,7 @@ nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
c->read_work_queue = &engine->read_work_queue;
c->write_work_queue = &engine->write_work_queue;
- nxt_event_conn_read(engine, c);
+ nxt_conn_read(engine, c);
}
@@ -260,7 +260,7 @@ nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
size_t preread;
nxt_buf_t *b;
nxt_int_t rc;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_controller_request_t *r;
c = obj;
@@ -284,7 +284,7 @@ nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
return;
}
- nxt_event_conn_read(task->thread->engine, c);
+ nxt_conn_read(task->thread->engine, c);
return;
}
@@ -329,12 +329,12 @@ nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
c->read_state = &nxt_controller_conn_body_read_state;
- nxt_event_conn_read(task->thread->engine, c);
+ nxt_conn_read(task->thread->engine, c);
}
static nxt_msec_t
-nxt_controller_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data)
+nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
{
return (nxt_msec_t) data;
}
@@ -343,7 +343,7 @@ nxt_controller_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data)
static void
nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
c = obj;
@@ -356,12 +356,12 @@ nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
static void
nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
{
- nxt_timer_t *ev;
- nxt_event_conn_t *c;
+ nxt_timer_t *timer;
+ nxt_conn_t *c;
- ev = obj;
+ timer = obj;
- c = nxt_event_read_timer_conn(ev);
+ c = nxt_read_timer_conn(timer);
c->socket.timedout = 1;
c->socket.closed = 1;
@@ -388,9 +388,9 @@ static const nxt_event_conn_state_t nxt_controller_conn_body_read_state
static void
nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
{
- size_t rest;
- nxt_buf_t *b;
- nxt_event_conn_t *c;
+ size_t rest;
+ nxt_buf_t *b;
+ nxt_conn_t *c;
c = obj;
@@ -409,7 +409,7 @@ nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "controller conn body read again, rest: %uz", rest);
- nxt_event_conn_read(task->thread->engine, c);
+ nxt_conn_read(task->thread->engine, c);
}
@@ -429,8 +429,8 @@ static const nxt_event_conn_state_t nxt_controller_conn_write_state
static void
nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
{
- nxt_buf_t *b;
- nxt_event_conn_t *c;
+ nxt_buf_t *b;
+ nxt_conn_t *c;
c = obj;
@@ -439,7 +439,7 @@ nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
b = c->write;
if (b->mem.pos != b->mem.free) {
- nxt_event_conn_write(task->thread->engine, c);
+ nxt_conn_write(task->thread->engine, c);
return;
}
@@ -452,7 +452,7 @@ nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
static void
nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
c = obj;
@@ -465,12 +465,12 @@ nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
static void
nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
{
- nxt_timer_t *ev;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
- ev = obj;
+ timer = obj;
- c = nxt_event_write_timer_conn(ev);
+ c = nxt_write_timer_conn(timer);
c->socket.timedout = 1;
c->socket.closed = 1;
@@ -490,7 +490,7 @@ static const nxt_event_conn_state_t nxt_controller_conn_close_state
static void
nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
c = obj;
@@ -500,14 +500,14 @@ nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
c->write_state = &nxt_controller_conn_close_state;
- nxt_event_conn_close(task->thread->engine, c);
+ nxt_conn_close(task->thread->engine, c);
}
static void
nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
c = obj;
@@ -544,7 +544,7 @@ nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
static void
-nxt_controller_process_request(nxt_task_t *task, nxt_event_conn_t *c,
+nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c,
nxt_controller_request_t *req)
{
nxt_int_t rc;
@@ -737,7 +737,7 @@ done:
static nxt_int_t
-nxt_controller_response(nxt_task_t *task, nxt_event_conn_t *c,
+nxt_controller_response(nxt_task_t *task, nxt_conn_t *c,
nxt_controller_response_t *resp)
{
size_t size;
@@ -765,7 +765,7 @@ nxt_controller_response(nxt_task_t *task, nxt_event_conn_t *c,
c->write = b;
c->write_state = &nxt_controller_conn_write_state;
- nxt_event_conn_write(task->thread->engine, c);
+ nxt_conn_write(task->thread->engine, c);
return NXT_OK;
}
diff --git a/src/nxt_devpoll_engine.c b/src/nxt_devpoll_engine.c
index 37435661..faa6368a 100644
--- a/src/nxt_devpoll_engine.c
+++ b/src/nxt_devpoll_engine.c
@@ -85,7 +85,7 @@ const nxt_event_interface_t nxt_devpoll_engine = {
NULL,
nxt_devpoll_poll,
- &nxt_unix_event_conn_io,
+ &nxt_unix_conn_io,
NXT_NO_FILE_EVENTS,
NXT_NO_SIGNAL_EVENTS,
diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c
index 4ca878e6..410c542d 100644
--- a/src/nxt_epoll_engine.c
+++ b/src/nxt_epoll_engine.c
@@ -38,10 +38,9 @@ static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine,
static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine,
nxt_uint_t mchanges, nxt_uint_t mevents);
static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine,
- nxt_uint_t mchanges, nxt_uint_t mevents, nxt_event_conn_io_t *io,
- uint32_t mode);
+ nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode);
static void nxt_epoll_test_accept4(nxt_event_engine_t *engine,
- nxt_event_conn_io_t *io);
+ nxt_conn_io_t *io);
static void nxt_epoll_free(nxt_event_engine_t *engine);
static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
@@ -83,28 +82,27 @@ static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
#if (NXT_HAVE_ACCEPT4)
-static void nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj,
+static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj,
void *data);
#endif
#if (NXT_HAVE_EPOLL_EDGE)
-static void nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj,
+static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj,
void *data);
-static void nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj,
+static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj,
void *data);
-static ssize_t nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c,
- nxt_buf_t *b);
+static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
-static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = {
- nxt_epoll_edge_event_conn_io_connect,
- nxt_event_conn_io_accept,
+static nxt_conn_io_t nxt_epoll_edge_conn_io = {
+ nxt_epoll_edge_conn_io_connect,
+ nxt_conn_io_accept,
- nxt_event_conn_io_read,
- nxt_epoll_edge_event_conn_io_recvbuf,
- nxt_event_conn_io_recv,
+ nxt_conn_io_read,
+ nxt_epoll_edge_conn_io_recvbuf,
+ nxt_conn_io_recv,
nxt_conn_io_write,
nxt_event_conn_io_write_chunk,
@@ -118,7 +116,7 @@ static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = {
nxt_event_conn_io_writev,
nxt_event_conn_io_send,
- nxt_event_conn_io_shutdown,
+ nxt_conn_io_shutdown,
};
@@ -150,7 +148,7 @@ const nxt_event_interface_t nxt_epoll_edge_engine = {
#endif
nxt_epoll_poll,
- &nxt_epoll_edge_event_conn_io,
+ &nxt_epoll_edge_conn_io,
#if (NXT_HAVE_INOTIFY)
NXT_FILE_EVENTS,
@@ -196,7 +194,7 @@ const nxt_event_interface_t nxt_epoll_level_engine = {
#endif
nxt_epoll_poll,
- &nxt_unix_event_conn_io,
+ &nxt_unix_conn_io,
#if (NXT_HAVE_INOTIFY)
NXT_FILE_EVENTS,
@@ -218,8 +216,7 @@ static nxt_int_t
nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
nxt_uint_t mevents)
{
- return nxt_epoll_create(engine, mchanges, mevents,
- &nxt_epoll_edge_event_conn_io,
+ return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io,
EPOLLET | EPOLLRDHUP);
}
@@ -231,13 +228,13 @@ nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
nxt_uint_t mevents)
{
return nxt_epoll_create(engine, mchanges, mevents,
- &nxt_unix_event_conn_io, 0);
+ &nxt_unix_conn_io, 0);
}
static nxt_int_t
nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
- nxt_uint_t mevents, nxt_event_conn_io_t *io, uint32_t mode)
+ nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode)
{
engine->u.epoll.fd = -1;
engine->u.epoll.mode = mode;
@@ -290,7 +287,7 @@ fail:
static void
-nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io)
+nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io)
{
static nxt_work_handler_t handler;
@@ -303,7 +300,7 @@ nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io)
(void) accept4(-1, NULL, NULL, SOCK_NONBLOCK);
if (nxt_errno != NXT_ENOSYS) {
- handler = nxt_epoll_event_conn_io_accept4;
+ handler = nxt_epoll_conn_io_accept4;
} else {
nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E",
@@ -985,19 +982,19 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
#if (NXT_HAVE_ACCEPT4)
static void
-nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
+nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
{
- socklen_t len;
- nxt_socket_t s;
- struct sockaddr *sa;
- nxt_event_conn_t *c;
- nxt_event_conn_listen_t *cls;
+ socklen_t len;
+ nxt_conn_t *c;
+ nxt_socket_t s;
+ struct sockaddr *sa;
+ nxt_listen_event_t *lev;
- cls = obj;
- c = cls->next;
+ lev = obj;
+ c = lev->next;
- cls->ready--;
- cls->socket.read_ready = (cls->ready != 0);
+ lev->ready--;
+ lev->socket.read_ready = (lev->ready != 0);
len = c->remote->socklen;
@@ -1009,18 +1006,18 @@ nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
len = 0;
}
- s = accept4(cls->socket.fd, sa, &len, SOCK_NONBLOCK);
+ s = accept4(lev->socket.fd, sa, &len, SOCK_NONBLOCK);
if (s != -1) {
c->socket.fd = s;
- nxt_debug(task, "accept4(%d): %d", cls->socket.fd, s);
+ nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s);
- nxt_event_conn_accept(task, cls, c);
+ nxt_conn_accept(task, lev, c);
return;
}
- nxt_event_conn_accept_error(task, cls, "accept4", nxt_errno);
+ nxt_conn_accept_error(task, lev, "accept4", nxt_errno);
}
#endif
@@ -1039,9 +1036,9 @@ nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data)
*/
static void
-nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
+nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_event_engine_t *engine;
nxt_work_handler_t handler;
const nxt_event_conn_state_t *state;
@@ -1058,11 +1055,11 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
break;
case NXT_AGAIN:
- c->socket.write_handler = nxt_epoll_edge_event_conn_connected;
- c->socket.error_handler = nxt_event_conn_connect_error;
+ c->socket.write_handler = nxt_epoll_edge_conn_connected;
+ c->socket.error_handler = nxt_conn_connect_error;
engine = task->thread->engine;
- nxt_event_conn_timer(engine, c, state, &c->write_timer);
+ nxt_conn_timer(engine, c, state, &c->write_timer);
nxt_epoll_enable(engine, &c->socket);
c->socket.read = NXT_EVENT_BLOCKED;
@@ -1070,7 +1067,7 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
#if 0
case NXT_AGAIN:
- nxt_event_conn_timer(engine, c, state, &c->write_timer);
+ nxt_conn_timer(engine, c, state, &c->write_timer);
/* Fall through. */
@@ -1111,9 +1108,9 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
static void
-nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data)
+nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
c = obj;
@@ -1131,22 +1128,22 @@ nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data)
return;
}
- nxt_event_conn_connect_test(task, c, data);
+ nxt_conn_connect_test(task, c, data);
}
/*
- * nxt_epoll_edge_event_conn_io_recvbuf() is just wrapper around
- * standard nxt_event_conn_io_recvbuf() to enforce to read a pending EOF
+ * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around
+ * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF
* in edge-triggered mode.
*/
static ssize_t
-nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
+nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
{
ssize_t n;
- n = nxt_event_conn_io_recvbuf(c, b);
+ n = nxt_conn_io_recvbuf(c, b);
if (n > 0 && c->socket.epoll_eof) {
c->socket.read_ready = 1;
diff --git a/src/nxt_event_conn_accept.c b/src/nxt_event_conn_accept.c
deleted file mode 100644
index ced2a3d8..00000000
--- a/src/nxt_event_conn_accept.c
+++ /dev/null
@@ -1,413 +0,0 @@
-
-/*
- * Copyright (C) Igor Sysoev
- * Copyright (C) NGINX, Inc.
- */
-
-#include <nxt_main.h>
-
-
-/*
- * A listen socket handler calls an event facility specific io_accept()
- * method. The method accept()s a new connection and then calls
- * nxt_event_conn_accept() to handle the new connection and to prepare
- * for a next connection to avoid just dropping next accept()ed socket
- * if no more connections allowed. If there are no available connections
- * an idle connection would be closed. If there are no idle connections
- * then new connections will not be accept()ed for 1 second.
- */
-
-
-static nxt_event_conn_t *nxt_event_conn_accept_alloc(nxt_task_t *task,
- nxt_event_conn_listen_t *cls);
-static void nxt_event_conn_listen_handler(nxt_task_t *task, void *obj,
- void *data);
-static nxt_event_conn_t *nxt_event_conn_accept_next(nxt_task_t *task,
- nxt_event_conn_listen_t *cls);
-static nxt_int_t nxt_event_conn_accept_close_idle(nxt_task_t *task,
- nxt_event_conn_listen_t *cls);
-static void nxt_event_conn_listen_event_error(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj,
- void *data);
-
-
-nxt_event_conn_listen_t *
-nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
-{
- nxt_event_engine_t *engine;
- nxt_event_conn_listen_t *cls;
-
- cls = nxt_zalloc(sizeof(nxt_event_conn_listen_t));
-
- if (nxt_fast_path(cls != NULL)) {
- cls->socket.fd = ls->socket;
-
- engine = task->thread->engine;
- cls->batch = engine->batch;
-
- cls->socket.read_work_queue = &engine->accept_work_queue;
- cls->socket.read_handler = nxt_event_conn_listen_handler;
- cls->socket.error_handler = nxt_event_conn_listen_event_error;
- cls->socket.log = &nxt_main_log;
-
- cls->accept = engine->event.io->accept;
-
- cls->listen = ls;
- cls->work_queue = &engine->read_work_queue;
-
- cls->timer.work_queue = &engine->fast_work_queue;
- cls->timer.handler = nxt_event_conn_listen_timer_handler;
- cls->timer.log = &nxt_main_log;
-
- cls->task.thread = task->thread;
- cls->task.log = &nxt_main_log;
- cls->task.ident = nxt_task_next_ident();
- cls->socket.task = &cls->task;
- cls->timer.task = &cls->task;
-
- if (nxt_event_conn_accept_alloc(task, cls) != NULL) {
- nxt_fd_event_enable_accept(engine, &cls->socket);
-
- nxt_queue_insert_head(&engine->listen_connections, &cls->link);
- }
-
- return cls;
- }
-
- return NULL;
-}
-
-
-nxt_int_t
-nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls)
-{
- nxt_event_engine_t *engine;
- nxt_event_conn_listen_t *cls;
-
- cls = nxt_zalloc(sizeof(nxt_event_conn_listen_t));
-
- if (nxt_fast_path(cls != NULL)) {
- cls->socket.fd = ls->socket;
-
- engine = task->thread->engine;
- cls->batch = engine->batch;
-
- cls->socket.read_work_queue = &engine->accept_work_queue;
- cls->socket.read_handler = nxt_event_conn_listen_handler;
- cls->socket.error_handler = nxt_event_conn_listen_event_error;
- cls->socket.log = &nxt_main_log;
-
- cls->accept = engine->event.io->accept;
-
- cls->listen = ls;
-
- cls->timer.work_queue = &engine->fast_work_queue;
- cls->timer.handler = nxt_event_conn_listen_timer_handler;
- cls->timer.log = &nxt_main_log;
-
- cls->task.thread = task->thread;
- cls->task.log = &nxt_main_log;
- cls->task.ident = nxt_task_next_ident();
- cls->socket.task = &cls->task;
- cls->timer.task = &cls->task;
-
- if (nxt_event_conn_accept_alloc(task, cls) != NULL) {
- nxt_fd_event_enable_accept(engine, &cls->socket);
-
- nxt_queue_insert_head(&engine->listen_connections, &cls->link);
- }
-
- return NXT_OK;
- }
-
- return NXT_ERROR;
-}
-
-
-static nxt_event_conn_t *
-nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls)
-{
- nxt_sockaddr_t *sa, *remote;
- nxt_mem_pool_t *mp;
- nxt_event_conn_t *c;
- nxt_event_engine_t *engine;
- nxt_listen_socket_t *ls;
-
- engine = task->thread->engine;
-
- if (engine->connections < engine->max_connections) {
-
- mp = nxt_mem_pool_create(cls->listen->mem_pool_size);
-
- if (nxt_fast_path(mp != NULL)) {
- /* This allocation cannot fail. */
- c = nxt_event_conn_create(mp, cls->socket.task);
-
- cls->next = c;
- c->socket.read_work_queue = cls->socket.read_work_queue;
- c->socket.write_ready = 1;
- c->listen = cls;
-
- ls = cls->listen;
- /* This allocation cannot fail. */
- remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length);
- c->remote = remote;
-
- sa = ls->sockaddr;
- remote->type = sa->type;
- /*
- * Set address family for unspecified Unix domain,
- * because these sockaddr's are not be passed to accept().
- */
- remote->u.sockaddr.sa_family = sa->u.sockaddr.sa_family;
-
- return c;
- }
- }
-
- return NULL;
-}
-
-
-static void
-nxt_event_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_listen_t *cls;
-
- cls = obj;
- cls->ready = cls->batch;
-
- cls->accept(task, cls, data);
-}
-
-
-void
-nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
-{
- socklen_t len;
- nxt_socket_t s;
- struct sockaddr *sa;
- nxt_event_conn_t *c;
- nxt_event_conn_listen_t *cls;
-
- cls = obj;
- c = cls->next;
-
- cls->ready--;
- cls->socket.read_ready = (cls->ready != 0);
-
- len = c->remote->socklen;
-
- if (len >= sizeof(struct sockaddr)) {
- sa = &c->remote->u.sockaddr;
-
- } else {
- sa = NULL;
- len = 0;
- }
-
- s = accept(cls->socket.fd, sa, &len);
-
- if (s == -1) {
- nxt_event_conn_accept_error(task, cls, "accept", nxt_socket_errno);
- return;
- }
-
- c->socket.fd = s;
-
-#if (NXT_LINUX)
- /*
- * Linux does not inherit non-blocking mode
- * from listen socket for accept()ed socket.
- */
- if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
- nxt_socket_close(task, s);
- }
-
-#endif
-
- nxt_debug(task, "accept(%d): %d", cls->socket.fd, s);
-
- nxt_event_conn_accept(task, cls, c);
-}
-
-
-void
-nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls,
- nxt_event_conn_t *c)
-{
- nxt_event_conn_t *next;
-
- nxt_sockaddr_text(c->remote);
-
- nxt_debug(task, "client: %*s",
- c->remote->address_length, nxt_sockaddr_address(c->remote));
-
- nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
-
- c->read_work_queue = cls->work_queue;
- c->write_work_queue = cls->work_queue;
-
- if (cls->listen->read_after_accept) {
-
- //c->socket.read_ready = 1;
-// cls->listen->handler(task, c, cls->socket.data);
- nxt_work_queue_add(c->read_work_queue, cls->listen->handler,
- task, c, cls->socket.data);
-
- } else {
- nxt_work_queue_add(c->write_work_queue, cls->listen->handler,
- task, c, cls->socket.data);
- }
-
- next = nxt_event_conn_accept_next(task, cls);
-
- if (next != NULL && cls->socket.read_ready) {
- nxt_work_queue_add(cls->socket.read_work_queue,
- cls->accept, task, cls, next);
- }
-}
-
-
-static nxt_event_conn_t *
-nxt_event_conn_accept_next(nxt_task_t *task, nxt_event_conn_listen_t *cls)
-{
- nxt_event_conn_t *c;
-
- cls->next = NULL;
-
- do {
- c = nxt_event_conn_accept_alloc(task, cls);
-
- if (nxt_fast_path(c != NULL)) {
- return c;
- }
-
- } while (nxt_event_conn_accept_close_idle(task, cls) == NXT_OK);
-
- nxt_log(task, NXT_LOG_CRIT, "no available connections, "
- "new connections are not accepted within 1s");
-
- return NULL;
-}
-
-
-static nxt_int_t
-nxt_event_conn_accept_close_idle(nxt_task_t *task, nxt_event_conn_listen_t *cls)
-{
- nxt_queue_t *idle;
- nxt_queue_link_t *link;
- nxt_event_conn_t *c;
- nxt_event_engine_t *engine;
-
- static nxt_log_moderation_t nxt_idle_close_log_moderation = {
- NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
- };
-
- engine = task->thread->engine;
-
- idle = &engine->idle_connections;
-
- for (link = nxt_queue_last(idle);
- link != nxt_queue_head(idle);
- link = nxt_queue_next(link))
- {
- c = nxt_queue_link_data(link, nxt_event_conn_t, link);
-
- if (!c->socket.read_ready) {
- nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
- task->log, "no available connections, "
- "close idle connection");
- nxt_queue_remove(link);
- nxt_event_conn_close(engine, c);
-
- return NXT_OK;
- }
- }
-
- nxt_timer_add(engine, &cls->timer, 1000);
-
- nxt_fd_event_disable_read(engine, &cls->socket);
-
- return NXT_DECLINED;
-}
-
-
-void
-nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls,
- const char *accept_syscall, nxt_err_t err)
-{
- static nxt_log_moderation_t nxt_accept_log_moderation = {
- NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
- };
-
- cls->socket.read_ready = 0;
-
- switch (err) {
-
- case NXT_EAGAIN:
- nxt_debug(task, "%s(%d) %E", accept_syscall, cls->socket.fd, err);
- return;
-
- case ECONNABORTED:
- nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
- task->log, "%s(%d) failed %E",
- accept_syscall, cls->socket.fd, err);
- return;
-
- case EMFILE:
- case ENFILE:
- case ENOBUFS:
- case ENOMEM:
- if (nxt_event_conn_accept_close_idle(task, cls) != NXT_OK) {
- nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, "
- "new connections are not accepted within 1s",
- accept_syscall, cls->socket.fd, err);
- }
-
- return;
-
- default:
- nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E",
- accept_syscall, cls->socket.fd, err);
- return;
- }
-}
-
-
-static void
-nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
-{
- nxt_timer_t *ev;
- nxt_event_conn_t *c;
- nxt_event_conn_listen_t *cls;
-
- ev = obj;
-
- cls = nxt_timer_data(ev, nxt_event_conn_listen_t, timer);
- c = cls->next;
-
- if (c == NULL) {
- c = nxt_event_conn_accept_next(task, cls);
-
- if (c == NULL) {
- return;
- }
- }
-
- nxt_fd_event_enable_accept(task->thread->engine, &cls->socket);
-
- cls->accept(task, cls, c);
-}
-
-
-static void
-nxt_event_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
-{
- nxt_fd_event_t *ev;
-
- ev = obj;
-
- nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd);
-}
diff --git a/src/nxt_event_conn_job_sendfile.c b/src/nxt_event_conn_job_sendfile.c
index dae5079c..15d7fe1d 100644
--- a/src/nxt_event_conn_job_sendfile.c
+++ b/src/nxt_event_conn_job_sendfile.c
@@ -25,11 +25,11 @@ static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj,
static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj,
void *data);
static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
- nxt_event_conn_t *c, nxt_buf_t *b);
+ nxt_conn_t *c, nxt_buf_t *b);
void
-nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c)
+nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_conn_t *c)
{
nxt_fd_event_disable(task->thread->engine, &c->socket);
@@ -41,8 +41,8 @@ nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c)
static void
nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data)
{
+ nxt_conn_t *c;
nxt_iobuf_t b;
- nxt_event_conn_t *c;
nxt_job_sendfile_t *jbs;
nxt_sendbuf_coalesce_t sb;
@@ -99,7 +99,7 @@ nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data)
ssize_t ret;
nxt_buf_t *b;
nxt_bool_t first;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_job_sendfile_t *jbs;
jbs = obj;
@@ -166,7 +166,7 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
size_t sent;
nxt_buf_t *b;
nxt_bool_t done;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_job_sendfile_t *jbs;
jbs = obj;
@@ -212,8 +212,8 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
if (c->socket.error == 0
&& !nxt_event_conn_write_delayed(task->thread->engine, c, sent))
{
- nxt_event_conn_timer(task->thread->engine, c, c->write_state,
- &c->write_timer);
+ nxt_conn_timer(task->thread->engine, c, c->write_state,
+ &c->write_timer);
nxt_fd_event_oneshot_write(task->thread->engine, &c->socket);
}
@@ -235,7 +235,7 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
static nxt_buf_t *
-nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c,
+nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_conn_t *c,
nxt_buf_t *b)
{
while (b != NULL) {
diff --git a/src/nxt_event_conn_proxy.c b/src/nxt_event_conn_proxy.c
deleted file mode 100644
index 45a6b257..00000000
--- a/src/nxt_event_conn_proxy.c
+++ /dev/null
@@ -1,1017 +0,0 @@
-
-/*
- * Copyright (C) Igor Sysoev
- * Copyright (C) NGINX, Inc.
- */
-
-#include <nxt_main.h>
-
-
-static void nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task,
- void *obj, void *data);
-static void nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_proxy_client_read_ready(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_proxy_read_process(nxt_task_t *task,
- nxt_event_conn_proxy_t *p, nxt_event_conn_t *source,
- nxt_event_conn_t *sink);
-static void nxt_event_conn_proxy_write_add(nxt_event_conn_t *c, nxt_buf_t *b);
-static void nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data);
-static void nxt_event_conn_proxy_client_write_ready(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_proxy_write_process(nxt_task_t *task,
- nxt_event_conn_proxy_t *p, nxt_event_conn_t *sink,
- nxt_event_conn_t *source);
-static void nxt_event_conn_proxy_read_add(nxt_event_conn_t *c, nxt_buf_t *b);
-static void nxt_event_conn_proxy_close(nxt_task_t *task, void *obj, void *data);
-static void nxt_event_conn_proxy_error(nxt_task_t *task, void *obj, void *data);
-static void nxt_event_conn_proxy_read_timeout(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_proxy_write_timeout(nxt_task_t *task, void *obj,
- void *data);
-static nxt_msec_t nxt_event_conn_proxy_timeout_value(nxt_event_conn_t *c,
- uintptr_t data);
-static void nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_proxy_shutdown(nxt_task_t *task,
- nxt_event_conn_proxy_t *p, nxt_event_conn_t *source,
- nxt_event_conn_t *sink);
-static void nxt_event_conn_proxy_read_error(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_proxy_complete(nxt_task_t *task,
- nxt_event_conn_proxy_t *p);
-static void nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj,
- void *data);
-
-
-static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state;
-static const nxt_event_conn_state_t
- nxt_event_conn_proxy_client_first_read_state;
-static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_connect_state;
-static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_wait_state;
-static const nxt_event_conn_state_t nxt_event_conn_proxy_client_read_state;
-static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_read_state;
-static const nxt_event_conn_state_t nxt_event_conn_proxy_client_write_state;
-static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_write_state;
-
-
-nxt_event_conn_proxy_t *
-nxt_event_conn_proxy_create(nxt_event_conn_t *client)
-{
- nxt_thread_t *thr;
- nxt_event_conn_t *peer;
- nxt_event_conn_proxy_t *p;
-
- p = nxt_mem_zalloc(client->mem_pool, sizeof(nxt_event_conn_proxy_t));
- if (nxt_slow_path(p == NULL)) {
- return NULL;
- }
-
- peer = nxt_event_conn_create(client->mem_pool, client->socket.task);
- if (nxt_slow_path(peer == NULL)) {
- return NULL;
- }
-
- thr = nxt_thread();
-
- client->read_work_queue = &thr->engine->read_work_queue;
- client->write_work_queue = &thr->engine->write_work_queue;
- client->socket.read_work_queue = &thr->engine->read_work_queue;
- client->socket.write_work_queue = &thr->engine->write_work_queue;
- peer->socket.read_work_queue = &thr->engine->read_work_queue;
- peer->socket.write_work_queue = &thr->engine->write_work_queue;
-
- peer->socket.data = client->socket.data;
-
- peer->read_work_queue = client->read_work_queue;
- peer->write_work_queue = client->write_work_queue;
- peer->read_timer.work_queue = client->read_work_queue;
- peer->write_timer.work_queue = client->write_work_queue;
-
- p->client = client;
- p->peer = peer;
-
- return p;
-}
-
-
-void
-nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p)
-{
- nxt_event_conn_t *peer;
-
- /*
- * Peer read event: not connected, disabled.
- * Peer write event: not connected, disabled.
- */
-
- if (p->client_wait_timeout == 0) {
- /*
- * Peer write event: waiting for connection
- * to be established with connect_timeout.
- */
- peer = p->peer;
- peer->write_state = &nxt_event_conn_proxy_peer_connect_state;
-
- nxt_event_conn_connect(task->thread->engine, peer);
- }
-
- /*
- * Client read event: waiting for client data with
- * client_wait_timeout before buffer allocation.
- */
- p->client->read_state = &nxt_event_conn_proxy_client_wait_state;
-
- nxt_conn_wait(p->client);
-}
-
-
-static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state
- nxt_aligned(64) =
-{
- .ready_handler = nxt_event_conn_proxy_client_buffer_alloc,
- .close_handler = nxt_event_conn_proxy_close,
- .error_handler = nxt_event_conn_proxy_error,
-
- .timer_handler = nxt_event_conn_proxy_read_timeout,
- .timer_value = nxt_event_conn_proxy_timeout_value,
- .timer_data = offsetof(nxt_event_conn_proxy_t, client_wait_timeout),
-};
-
-
-static void
-nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj,
- void *data)
-{
- nxt_buf_t *b;
- nxt_event_conn_t *client;
- nxt_event_conn_proxy_t *p;
-
- client = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy client first read fd:%d",
- client->socket.fd);
-
- b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size,
- NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE);
-
- if (nxt_slow_path(b == NULL)) {
- /* An error completion. */
- nxt_event_conn_proxy_complete(task, p);
- return;
- }
-
- p->client_buffer = b;
- client->read = b;
-
- if (p->peer->socket.fd != -1) {
- /*
- * Client read event: waiting, no timeout.
- * Client write event: blocked.
- * Peer read event: disabled.
- * Peer write event: waiting for connection to be established
- * or blocked after the connection has established.
- */
- client->read_state = &nxt_event_conn_proxy_client_read_state;
-
- } else {
- /*
- * Client read event: waiting for data with client_wait_timeout
- * before connecting to a peer.
- * Client write event: blocked.
- * Peer read event: not connected, disabled.
- * Peer write event: not connected, disabled.
- */
- client->read_state = &nxt_event_conn_proxy_client_first_read_state;
- }
-
- nxt_event_conn_read(task->thread->engine, client);
-}
-
-
-static const nxt_event_conn_state_t
- nxt_event_conn_proxy_client_first_read_state
- nxt_aligned(64) =
-{
- .ready_handler = nxt_event_conn_proxy_peer_connect,
- .close_handler = nxt_event_conn_proxy_close,
- .error_handler = nxt_event_conn_proxy_error,
-
- .timer_handler = nxt_event_conn_proxy_read_timeout,
- .timer_value = nxt_event_conn_proxy_timeout_value,
- .timer_data = offsetof(nxt_event_conn_proxy_t, client_wait_timeout),
- .timer_autoreset = 1,
-};
-
-
-static void
-nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_t *client;
- nxt_event_conn_proxy_t *p;
-
- client = obj;
- p = data;
-
- /*
- * Client read event: waiting, no timeout.
- * Client write event: blocked.
- * Peer read event: disabled.
- * Peer write event: waiting for connection to be established
- * with connect_timeout.
- */
- client->read_state = &nxt_event_conn_proxy_client_read_state;
-
- p->peer->write_state = &nxt_event_conn_proxy_peer_connect_state;
-
- nxt_event_conn_connect(task->thread->engine, p->peer);
-}
-
-
-static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_connect_state
- nxt_aligned(64) =
-{
- .ready_handler = nxt_event_conn_proxy_connected,
- .close_handler = nxt_event_conn_proxy_refused,
- .error_handler = nxt_event_conn_proxy_error,
-
- .timer_handler = nxt_event_conn_proxy_write_timeout,
- .timer_value = nxt_event_conn_proxy_timeout_value,
- .timer_data = offsetof(nxt_event_conn_proxy_t, connect_timeout),
- .timer_autoreset = 1,
-};
-
-
-static void
-nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_t *client, *peer;
- nxt_event_conn_proxy_t *p;
-
- peer = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy connected fd:%d", peer->socket.fd);
-
- p->connected = 1;
-
- nxt_event_conn_tcp_nodelay_on(task, peer);
- nxt_event_conn_tcp_nodelay_on(task, p->client);
-
- /* Peer read event: waiting with peer_wait_timeout. */
-
- peer->read_state = &nxt_event_conn_proxy_peer_wait_state;
- peer->write_state = &nxt_event_conn_proxy_peer_write_state;
-
- nxt_conn_wait(peer);
-
- if (p->client_buffer != NULL) {
- client = p->client;
-
- client->read_state = &nxt_event_conn_proxy_client_read_state;
- client->write_state = &nxt_event_conn_proxy_client_write_state;
- /*
- * Send a client read data to the connected peer.
- * Client write event: blocked.
- */
- nxt_event_conn_proxy_read_process(task, p, client, peer);
- }
-}
-
-
-static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_wait_state
- nxt_aligned(64) =
-{
- .ready_handler = nxt_event_conn_proxy_peer_read,
- .close_handler = nxt_event_conn_proxy_close,
- .error_handler = nxt_event_conn_proxy_error,
-
- .timer_handler = nxt_event_conn_proxy_read_timeout,
- .timer_value = nxt_event_conn_proxy_timeout_value,
- .timer_data = offsetof(nxt_event_conn_proxy_t, peer_wait_timeout),
-};
-
-
-static void
-nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data)
-{
- nxt_buf_t *b;
- nxt_event_conn_t *peer;
- nxt_event_conn_proxy_t *p;
-
- peer = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy peer read fd:%d", peer->socket.fd);
-
- b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size,
- NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE);
-
- if (nxt_slow_path(b == NULL)) {
- /* An error completion. */
- nxt_event_conn_proxy_complete(task, p);
- return;
- }
-
- p->peer_buffer = b;
- peer->read = b;
-
- p->client->write_state = &nxt_event_conn_proxy_client_write_state;
- peer->read_state = &nxt_event_conn_proxy_peer_read_state;
- peer->write_state = &nxt_event_conn_proxy_peer_write_state;
-
- /*
- * Client read event: waiting, no timeout.
- * Client write event: blocked.
- * Peer read event: waiting with possible peer_wait_timeout.
- * Peer write event: blocked.
- */
- nxt_event_conn_read(task->thread->engine, peer);
-}
-
-
-static const nxt_event_conn_state_t nxt_event_conn_proxy_client_read_state
- nxt_aligned(64) =
-{
- .ready_handler = nxt_event_conn_proxy_client_read_ready,
- .close_handler = nxt_event_conn_proxy_close,
- .error_handler = nxt_event_conn_proxy_read_error,
-};
-
-
-static void
-nxt_event_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_t *client;
- nxt_event_conn_proxy_t *p;
-
- client = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy client read ready fd:%d",
- client->socket.fd);
-
- nxt_event_conn_proxy_read_process(task, p, client, p->peer);
-}
-
-
-static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_read_state
- nxt_aligned(64) =
-{
- .ready_handler = nxt_event_conn_proxy_peer_read_ready,
- .close_handler = nxt_event_conn_proxy_close,
- .error_handler = nxt_event_conn_proxy_read_error,
-};
-
-
-static void
-nxt_event_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_t *peer;
- nxt_event_conn_proxy_t *p;
-
- peer = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy peer read ready fd:%d", peer->socket.fd);
-
- nxt_event_conn_proxy_read_process(task, p, peer, p->client);
-}
-
-
-static void
-nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p,
- nxt_event_conn_t *source, nxt_event_conn_t *sink)
-{
- nxt_buf_t *rb, *wb;
-
- if (sink->socket.error != 0) {
- nxt_debug(task, "event conn proxy sink fd:%d error:%d",
- sink->socket.fd, sink->socket.error);
-
- nxt_event_conn_proxy_write_error(task, sink, sink->socket.data);
- return;
- }
-
- while (source->read != NULL) {
-
- rb = source->read;
-
- if (rb->mem.pos != rb->mem.free) {
-
- /* Add a read part to a write chain. */
-
- wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0);
- if (wb == NULL) {
- /* An error completion. */
- nxt_event_conn_proxy_complete(task, p);
- return;
- }
-
- wb->mem.pos = rb->mem.pos;
- wb->mem.free = rb->mem.free;
- wb->mem.start = rb->mem.pos;
- wb->mem.end = rb->mem.free;
-
- rb->mem.pos = rb->mem.free;
- rb->mem.start = rb->mem.free;
-
- nxt_event_conn_proxy_write_add(sink, wb);
- }
-
- if (rb->mem.start != rb->mem.end) {
- nxt_work_queue_add(source->read_work_queue,
- nxt_event_conn_proxy_read,
- task, source, source->socket.data);
- break;
- }
-
- source->read = rb->next;
- nxt_buf_free(source->mem_pool, rb);
- }
-
- if (p->connected) {
- nxt_event_conn_write(task->thread->engine, sink);
- }
-}
-
-
-static void
-nxt_event_conn_proxy_write_add(nxt_event_conn_t *c, nxt_buf_t *b)
-{
- nxt_buf_t *first, *second, *prev;
-
- first = c->write;
-
- if (first == NULL) {
- c->write = b;
- return;
- }
-
- /*
- * A event conn proxy maintains a buffer per each direction.
- * The buffer is divided by read and write parts. These parts are
- * linked in buffer chains. There can be no more than two buffers
- * in write chain at any time, because an added buffer is coalesced
- * with the last buffer if possible.
- */
-
- second = first->next;
-
- if (second == NULL) {
-
- if (first->mem.end != b->mem.start) {
- first->next = b;
- return;
- }
-
- /*
- * The first buffer is just before the added buffer, so
- * expand the first buffer to the end of the added buffer.
- */
- prev = first;
-
- } else {
- if (second->mem.end != b->mem.start) {
- nxt_thread_log_alert("event conn proxy write: second buffer end:%p "
- "is not equal to added buffer start:%p",
- second->mem.end, b->mem.start);
- return;
- }
-
- /*
- * "second->mem.end == b->mem.start" must be always true here,
- * that is the second buffer is just before the added buffer,
- * so expand the second buffer to the end of added buffer.
- */
- prev = second;
- }
-
- prev->mem.free = b->mem.end;
- prev->mem.end = b->mem.end;
-
- nxt_buf_free(c->mem_pool, b);
-}
-
-
-static void
-nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_t *source, *sink;
- nxt_event_conn_proxy_t *p;
-
- source = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy read fd:%d", source->socket.fd);
-
- if (!source->socket.closed) {
- sink = (source == p->client) ? p->peer : p->client;
-
- if (sink->socket.error == 0) {
- nxt_event_conn_read(task->thread->engine, source);
- }
- }
-}
-
-
-static const nxt_event_conn_state_t nxt_event_conn_proxy_client_write_state
- nxt_aligned(64) =
-{
- .ready_handler = nxt_event_conn_proxy_client_write_ready,
- .error_handler = nxt_event_conn_proxy_write_error,
-
- .timer_handler = nxt_event_conn_proxy_write_timeout,
- .timer_value = nxt_event_conn_proxy_timeout_value,
- .timer_data = offsetof(nxt_event_conn_proxy_t, client_write_timeout),
- .timer_autoreset = 1,
-};
-
-
-static void
-nxt_event_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_t *client;
- nxt_event_conn_proxy_t *p;
-
- client = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy client write ready fd:%d",
- client->socket.fd);
-
- nxt_event_conn_proxy_write_process(task, p, client, p->peer);
-}
-
-
-static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_write_state
- nxt_aligned(64) =
-{
- .ready_handler = nxt_event_conn_proxy_peer_write_ready,
- .error_handler = nxt_event_conn_proxy_write_error,
-
- .timer_handler = nxt_event_conn_proxy_write_timeout,
- .timer_value = nxt_event_conn_proxy_timeout_value,
- .timer_data = offsetof(nxt_event_conn_proxy_t, peer_write_timeout),
- .timer_autoreset = 1,
-};
-
-
-static void
-nxt_event_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_t *peer;
- nxt_event_conn_proxy_t *p;
-
- peer = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy peer write ready fd:%d", peer->socket.fd);
-
- nxt_event_conn_proxy_write_process(task, p, peer, p->client);
-}
-
-
-static void
-nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p,
- nxt_event_conn_t *sink, nxt_event_conn_t *source)
-{
- nxt_buf_t *rb, *wb;
-
- while (sink->write != NULL) {
-
- wb = sink->write;
-
- if (nxt_buf_is_sync(wb)) {
-
- /* A sync buffer marks the end of stream. */
-
- sink->write = NULL;
- nxt_buf_free(sink->mem_pool, wb);
- nxt_event_conn_proxy_shutdown(task, p, source, sink);
- return;
- }
-
- if (wb->mem.start != wb->mem.pos) {
-
- /* Add a written part to a read chain. */
-
- rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0);
- if (rb == NULL) {
- /* An error completion. */
- nxt_event_conn_proxy_complete(task, p);
- return;
- }
-
- rb->mem.pos = wb->mem.start;
- rb->mem.free = wb->mem.start;
- rb->mem.start = wb->mem.start;
- rb->mem.end = wb->mem.pos;
-
- wb->mem.start = wb->mem.pos;
-
- nxt_event_conn_proxy_read_add(source, rb);
- }
-
- if (wb->mem.pos != wb->mem.free) {
- nxt_event_conn_write(task->thread->engine, sink);
-
- break;
- }
-
- sink->write = wb->next;
- nxt_buf_free(sink->mem_pool, wb);
- }
-
- nxt_work_queue_add(source->read_work_queue,
- nxt_event_conn_proxy_read, task, source,
- source->socket.data);
-}
-
-
-static void
-nxt_event_conn_proxy_read_add(nxt_event_conn_t *c, nxt_buf_t *b)
-{
- nxt_buf_t *first, *second;
-
- first = c->read;
-
- if (first == NULL) {
- c->read = b;
- return;
- }
-
- /*
- * A event conn proxy maintains a buffer per each direction.
- * The buffer is divided by read and write parts. These parts are
- * linked in buffer chains. There can be no more than two buffers
- * in read chain at any time, because an added buffer is coalesced
- * with the last buffer if possible. The first and the second
- * buffers are also coalesced if possible.
- */
-
- second = first->next;
-
- if (second == NULL) {
-
- if (first->mem.start == b->mem.end) {
- /*
- * The added buffer is just before the first buffer, so expand
- * the first buffer to the beginning of the added buffer.
- */
- first->mem.pos = b->mem.start;
- first->mem.free = b->mem.start;
- first->mem.start = b->mem.start;
-
- } else if (first->mem.end == b->mem.start) {
- /*
- * The added buffer is just after the first buffer, so
- * expand the first buffer to the end of the added buffer.
- */
- first->mem.end = b->mem.end;
-
- } else {
- first->next = b;
- return;
- }
-
- } else {
- if (second->mem.end != b->mem.start) {
- nxt_thread_log_alert("event conn proxy read: second buffer end:%p "
- "is not equal to added buffer start:%p",
- second->mem.end, b->mem.start);
- return;
- }
-
- /*
- * The added buffer is just after the second buffer, so
- * expand the second buffer to the end of the added buffer.
- */
- second->mem.end = b->mem.end;
-
- if (first->mem.start == second->mem.end) {
- /*
- * The second buffer is just before the first buffer, so expand
- * the first buffer to the beginning of the second buffer.
- */
- first->mem.pos = second->mem.start;
- first->mem.free = second->mem.start;
- first->mem.start = second->mem.start;
- first->next = NULL;
-
- nxt_buf_free(c->mem_pool, second);
- }
- }
-
- nxt_buf_free(c->mem_pool, b);
-}
-
-
-static void
-nxt_event_conn_proxy_close(nxt_task_t *task, void *obj, void *data)
-{
- nxt_buf_t *b;
- nxt_event_conn_t *source, *sink;
- nxt_event_conn_proxy_t *p;
-
- source = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy close fd:%d", source->socket.fd);
-
- sink = (source == p->client) ? p->peer : p->client;
-
- if (sink->write == NULL) {
- nxt_event_conn_proxy_shutdown(task, p, source, sink);
- return;
- }
-
- b = nxt_buf_sync_alloc(source->mem_pool, 0);
- if (b == NULL) {
- /* An error completion. */
- nxt_event_conn_proxy_complete(task, p);
- return;
- }
-
- nxt_buf_chain_add(&sink->write, b);
-}
-
-
-static void
-nxt_event_conn_proxy_error(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_t *c;
- nxt_event_conn_proxy_t *p;
-
- c = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy error fd:%d", c->socket.fd);
-
- nxt_event_conn_proxy_close(task, c, p);
-}
-
-
-static void
-nxt_event_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data)
-{
- nxt_timer_t *ev;
- nxt_event_conn_t *c;
-
- ev = obj;
-
- c = nxt_event_read_timer_conn(ev);
- c->socket.timedout = 1;
- c->socket.closed = 1;
-
- nxt_debug(task, "event conn proxy read timeout fd:%d", c->socket.fd);
-
- nxt_event_conn_proxy_close(task, c, c->socket.data);
-}
-
-
-static void
-nxt_event_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data)
-{
- nxt_timer_t *ev;
- nxt_event_conn_t *c;
-
- ev = obj;
-
- c = nxt_event_write_timer_conn(ev);
- c->socket.timedout = 1;
- c->socket.closed = 1;
-
- nxt_debug(task, "event conn proxy write timeout fd:%d", c->socket.fd);
-
- nxt_event_conn_proxy_close(task, c, c->socket.data);
-}
-
-
-static nxt_msec_t
-nxt_event_conn_proxy_timeout_value(nxt_event_conn_t *c, uintptr_t data)
-{
- nxt_msec_t *timer;
- nxt_event_conn_proxy_t *p;
-
- p = c->socket.data;
-
- timer = (nxt_msec_t *) ((char *) p + data);
-
- return *timer;
-}
-
-
-static void
-nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_t *peer;
- nxt_event_conn_proxy_t *p;
-
- peer = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy refused fd:%d", peer->socket.fd);
-
- if (p->retries == 0) {
- /* An error completion. */
- nxt_event_conn_proxy_complete(task, p);
- return;
- }
-
- p->retries--;
-
- nxt_socket_close(task, peer->socket.fd);
- peer->socket.fd = -1;
- peer->socket.error = 0;
-
- p->delayed = 1;
-
- peer->write_timer.handler = nxt_event_conn_proxy_reconnect_handler;
- nxt_timer_add(task->thread->engine, &peer->write_timer,
- p->reconnect_timeout);
-}
-
-
-static void
-nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data)
-{
- nxt_timer_t *ev;
- nxt_event_conn_t *peer;
- nxt_event_conn_proxy_t *p;
-
- ev = obj;
-
- nxt_debug(task, "event conn proxy reconnect timer");
-
- peer = nxt_event_write_timer_conn(ev);
- p = peer->socket.data;
-
- if (p->client->socket.closed) {
- nxt_event_conn_proxy_complete(task, p);
- return;
- }
-
- p->delayed = 0;
-
- peer->write_state = &nxt_event_conn_proxy_peer_connect_state;
- /*
- * Peer read event: disabled.
- * Peer write event: waiting for connection with connect_timeout.
- */
- nxt_event_conn_connect(task->thread->engine, peer);
-}
-
-
-static void
-nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p,
- nxt_event_conn_t *source, nxt_event_conn_t *sink)
-{
- nxt_buf_t *b;
-
- nxt_debug(source->socket.task,
- "event conn proxy shutdown source fd:%d cl:%d err:%d",
- source->socket.fd, source->socket.closed, source->socket.error);
-
- nxt_debug(sink->socket.task,
- "event conn proxy shutdown sink fd:%d cl:%d err:%d",
- sink->socket.fd, sink->socket.closed, sink->socket.error);
-
- if (!p->connected || p->delayed) {
- nxt_event_conn_proxy_complete(task, p);
- return;
- }
-
- if (sink->socket.error == 0 && !sink->socket.closed) {
- sink->socket.shutdown = 1;
- nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR);
- }
-
- if (sink->socket.error != 0
- || (sink->socket.closed && source->write == NULL))
- {
- /* The opposite direction also has been already closed. */
- nxt_event_conn_proxy_complete(task, p);
- return;
- }
-
- nxt_debug(source->socket.task, "free source buffer");
-
- /* Free the direction's buffer. */
- b = (source == p->client) ? p->client_buffer : p->peer_buffer;
- nxt_mem_free(source->mem_pool, b);
-}
-
-
-static void
-nxt_event_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_t *c;
- nxt_event_conn_proxy_t *p;
-
- c = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy read error fd:%d", c->socket.fd);
-
- nxt_event_conn_proxy_close(task, c, p);
-}
-
-
-static void
-nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_t *source, *sink;
- nxt_event_conn_proxy_t *p;
-
- sink = obj;
- p = data;
-
- nxt_debug(task, "event conn proxy write error fd:%d", sink->socket.fd);
-
- /* Clear data for the direction sink. */
- sink->write = NULL;
-
- /* Block the direction source. */
- source = (sink == p->client) ? p->peer : p->client;
- nxt_fd_event_block_read(task->thread->engine, &source->socket);
-
- if (source->write == NULL) {
- /*
- * There is no data for the opposite direction and
- * the next read from the sink will most probably fail.
- */
- nxt_event_conn_proxy_complete(task, p);
- }
-}
-
-
-static const nxt_event_conn_state_t nxt_event_conn_proxy_close_state
- nxt_aligned(64) =
-{
- .ready_handler = nxt_event_conn_proxy_completion,
-};
-
-
-static void
-nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p)
-{
- nxt_event_engine_t *engine;
-
- engine = task->thread->engine;
-
- nxt_debug(p->client->socket.task, "event conn proxy complete %d:%d",
- p->client->socket.fd, p->peer->socket.fd);
-
- if (p->delayed) {
- p->delayed = 0;
- nxt_queue_remove(&p->peer->link);
- }
-
- if (p->client->socket.fd != -1) {
- p->retain = 1;
- p->client->write_state = &nxt_event_conn_proxy_close_state;
- nxt_event_conn_close(engine, p->client);
- }
-
- if (p->peer->socket.fd != -1) {
- p->retain++;
- p->peer->write_state = &nxt_event_conn_proxy_close_state;
- nxt_event_conn_close(engine, p->peer);
- }
-}
-
-
-static void
-nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj, void *data)
-{
- nxt_event_conn_proxy_t *p;
-
- p = data;
-
- nxt_debug(p->client->socket.task, "event conn proxy completion %d:%d:%d",
- p->retain, p->client->socket.fd, p->peer->socket.fd);
-
- p->retain--;
-
- if (p->retain == 0) {
- nxt_mem_free(p->client->mem_pool, p->client_buffer);
- nxt_mem_free(p->client->mem_pool, p->peer_buffer);
-
- p->completion_handler(task, p, NULL);
- }
-}
diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h
index 6947cc8d..5b602bc0 100644
--- a/src/nxt_event_engine.h
+++ b/src/nxt_event_engine.h
@@ -123,13 +123,13 @@ typedef struct {
* events.
*/
void (*enable_file)(nxt_event_engine_t *engine,
- nxt_event_file_t *fev);
+ nxt_file_event_t *ev);
/*
* Delete a file from an event set before closing the file descriptor.
*/
void (*close_file)(nxt_event_engine_t *engine,
- nxt_event_file_t *fev);
+ nxt_file_event_t *ev);
/*
* Enable post event notifications and set a post handler to handle
@@ -157,7 +157,7 @@ typedef struct {
nxt_msec_t timeout);
/* I/O operations suitable to underlying event facility. */
- nxt_event_conn_io_t *io;
+ nxt_conn_io_t *io;
/* True if an event facility supports file change event notifications. */
uint8_t file_support; /* 1 bit */
diff --git a/src/nxt_eventport_engine.c b/src/nxt_eventport_engine.c
index 7dc0ffa8..2bf111f9 100644
--- a/src/nxt_eventport_engine.c
+++ b/src/nxt_eventport_engine.c
@@ -78,7 +78,7 @@ const nxt_event_interface_t nxt_eventport_engine = {
nxt_eventport_signal,
nxt_eventport_poll,
- &nxt_unix_event_conn_io,
+ &nxt_unix_conn_io,
NXT_NO_FILE_EVENTS,
NXT_NO_SIGNAL_EVENTS,
diff --git a/src/nxt_event_file.h b/src/nxt_file_event.h
index 06f1762f..d4255ce9 100644
--- a/src/nxt_event_file.h
+++ b/src/nxt_file_event.h
@@ -3,8 +3,8 @@
* Copyright (C) NGINX, Inc.
*/
-#ifndef _NXT_EVENT_FILE_H_INCLUDED_
-#define _NXT_EVENT_FILE_H_INCLUDED_
+#ifndef _NXT_FILE_EVENT_H_INCLUDED_
+#define _NXT_FILE_EVENT_H_INCLUDED_
typedef struct {
@@ -12,7 +12,7 @@ typedef struct {
nxt_file_t *file;
nxt_work_handler_t handler;
nxt_task_t *task;
-} nxt_event_file_t;
+} nxt_file_event_t;
-#endif /* _NXT_EVENT_FILE_H_INCLUDED_ */
+#endif /* _NXT_FILE_EVENT_H_INCLUDED_ */
diff --git a/src/nxt_kqueue_engine.c b/src/nxt_kqueue_engine.c
index c5e40eeb..4582b314 100644
--- a/src/nxt_kqueue_engine.c
+++ b/src/nxt_kqueue_engine.c
@@ -78,9 +78,9 @@ static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine,
static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine,
nxt_fd_event_t *ev);
static void nxt_kqueue_enable_file(nxt_event_engine_t *engine,
- nxt_event_file_t *ev);
+ nxt_file_event_t *ev);
static void nxt_kqueue_close_file(nxt_event_engine_t *engine,
- nxt_event_file_t *ev);
+ nxt_file_event_t *ev);
static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
nxt_int_t filter, nxt_uint_t flags);
static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine);
@@ -98,26 +98,25 @@ static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo);
#endif
static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
-static void nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj,
+static void nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj,
void *data);
-static void nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj,
+static void nxt_kqueue_conn_connected(nxt_task_t *task, void *obj,
void *data);
static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data);
-static void nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj,
+static void nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj,
void *data);
-static void nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj,
+static void nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj,
void *data);
-static ssize_t nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c,
- nxt_buf_t *b);
+static ssize_t nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b);
-static nxt_event_conn_io_t nxt_kqueue_event_conn_io = {
- nxt_kqueue_event_conn_io_connect,
- nxt_kqueue_event_conn_io_accept,
+static nxt_conn_io_t nxt_kqueue_conn_io = {
+ nxt_kqueue_conn_io_connect,
+ nxt_kqueue_conn_io_accept,
- nxt_kqueue_event_conn_io_read,
- nxt_kqueue_event_conn_io_recvbuf,
- nxt_event_conn_io_recv,
+ nxt_kqueue_conn_io_read,
+ nxt_kqueue_conn_io_recvbuf,
+ nxt_conn_io_recv,
nxt_conn_io_write,
nxt_event_conn_io_write_chunk,
@@ -133,7 +132,7 @@ static nxt_event_conn_io_t nxt_kqueue_event_conn_io = {
nxt_event_conn_io_writev,
nxt_event_conn_io_send,
- nxt_event_conn_io_shutdown,
+ nxt_conn_io_shutdown,
};
@@ -165,7 +164,7 @@ const nxt_event_interface_t nxt_kqueue_engine = {
#endif
nxt_kqueue_poll,
- &nxt_kqueue_event_conn_io,
+ &nxt_kqueue_conn_io,
NXT_FILE_EVENTS,
NXT_SIGNAL_EVENTS,
@@ -414,7 +413,7 @@ nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
static void
-nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_event_file_t *ev)
+nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
{
struct kevent *kev;
@@ -437,7 +436,7 @@ nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_event_file_t *ev)
static void
-nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_event_file_t *ev)
+nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_file_event_t *ev)
{
/* TODO: pending event. */
}
@@ -497,7 +496,7 @@ nxt_kqueue_error(nxt_event_engine_t *engine)
{
struct kevent *kev, *end;
nxt_fd_event_t *ev;
- nxt_event_file_t *fev;
+ nxt_file_event_t *fev;
nxt_work_queue_t *wq;
wq = &engine->fast_work_queue;
@@ -551,7 +550,7 @@ nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data)
static void
nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_file_t *ev;
+ nxt_file_event_t *ev;
ev = obj;
@@ -678,7 +677,7 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
nxt_fd_event_t *ev;
nxt_sig_event_t *sigev;
struct timespec ts, *tp;
- nxt_event_file_t *fev;
+ nxt_file_event_t *fev;
nxt_work_queue_t *wq;
nxt_work_handler_t handler;
@@ -850,9 +849,9 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
*/
static void
-nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
+nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_event_engine_t *engine;
nxt_work_handler_t handler;
const nxt_event_conn_state_t *state;
@@ -869,11 +868,11 @@ nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
break;
case NXT_AGAIN:
- c->socket.write_handler = nxt_kqueue_event_conn_connected;
- c->socket.error_handler = nxt_event_conn_connect_error;
+ c->socket.write_handler = nxt_kqueue_conn_connected;
+ c->socket.error_handler = nxt_conn_connect_error;
engine = task->thread->engine;
- nxt_event_conn_timer(engine, c, state, &c->write_timer);
+ nxt_conn_timer(engine, c, state, &c->write_timer);
nxt_kqueue_enable_write(engine, &c->socket);
return;
@@ -892,13 +891,13 @@ nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
static void
-nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data)
+nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
c = obj;
- nxt_debug(task, "kqueue event conn connected fd:%d", c->socket.fd);
+ nxt_debug(task, "kqueue conn connected fd:%d", c->socket.fd);
c->socket.write = NXT_EVENT_BLOCKED;
@@ -914,36 +913,36 @@ nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data)
static void
nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_listen_t *cls;
+ nxt_listen_event_t *lev;
- cls = obj;
+ lev = obj;
nxt_debug(task, "kevent fd:%d avail:%D",
- cls->socket.fd, cls->socket.kq_available);
+ lev->socket.fd, lev->socket.kq_available);
- cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available);
+ lev->ready = nxt_min(lev->batch, (uint32_t) lev->socket.kq_available);
- nxt_kqueue_event_conn_io_accept(task, cls, data);
+ nxt_kqueue_conn_io_accept(task, lev, data);
}
static void
-nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
+nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data)
{
- socklen_t len;
- nxt_socket_t s;
- struct sockaddr *sa;
- nxt_event_conn_t *c;
- nxt_event_conn_listen_t *cls;
+ socklen_t len;
+ nxt_conn_t *c;
+ nxt_socket_t s;
+ struct sockaddr *sa;
+ nxt_listen_event_t *lev;
- cls = obj;
- c = cls->next;
+ lev = obj;
+ c = lev->next;
- cls->ready--;
- cls->socket.read_ready = (cls->ready != 0);
+ lev->ready--;
+ lev->socket.read_ready = (lev->ready != 0);
- cls->socket.kq_available--;
- cls->socket.read_ready = (cls->socket.kq_available != 0);
+ lev->socket.kq_available--;
+ lev->socket.read_ready = (lev->socket.kq_available != 0);
len = c->remote->socklen;
@@ -955,34 +954,34 @@ nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data)
len = 0;
}
- s = accept(cls->socket.fd, sa, &len);
+ s = accept(lev->socket.fd, sa, &len);
if (s != -1) {
c->socket.fd = s;
- nxt_debug(task, "accept(%d): %d", cls->socket.fd, s);
+ nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
- nxt_event_conn_accept(task, cls, c);
+ nxt_conn_accept(task, lev, c);
return;
}
- nxt_event_conn_accept_error(task, cls, "accept", nxt_errno);
+ nxt_conn_accept_error(task, lev, "accept", nxt_errno);
}
/*
- * nxt_kqueue_event_conn_io_read() is just a wrapper to eliminate the
+ * nxt_kqueue_conn_io_read() is just a wrapper to eliminate the
* readv() or recv() syscall if a remote side just closed connection.
*/
static void
-nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
+nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
c = obj;
- nxt_debug(task, "kqueue event conn read fd:%d", c->socket.fd);
+ nxt_debug(task, "kqueue conn read fd:%d", c->socket.fd);
if (c->socket.kq_available == 0 && c->socket.kq_eof) {
nxt_debug(task, "kevent fd:%d eof", c->socket.fd);
@@ -993,18 +992,18 @@ nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
return;
}
- nxt_event_conn_io_read(task, c, data);
+ nxt_conn_io_read(task, c, data);
}
/*
- * nxt_kqueue_event_conn_io_recvbuf() is just wrapper around standard
- * nxt_event_conn_io_recvbuf() to eliminate the readv() or recv() syscalls
+ * nxt_kqueue_conn_io_recvbuf() is just wrapper around standard
+ * nxt_conn_io_recvbuf() to eliminate the readv() or recv() syscalls
* if there is no pending data or a remote side closed connection.
*/
static ssize_t
-nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
+nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
{
ssize_t n;
@@ -1013,7 +1012,7 @@ nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
return 0;
}
- n = nxt_event_conn_io_recvbuf(c, b);
+ n = nxt_conn_io_recvbuf(c, b);
if (n > 0) {
c->socket.kq_available -= n;
diff --git a/src/nxt_listen_socket.c b/src/nxt_listen_socket.c
index 08751dfc..7a0e3a7c 100644
--- a/src/nxt_listen_socket.c
+++ b/src/nxt_listen_socket.c
@@ -241,7 +241,7 @@ nxt_listen_socket_pool_min_size(nxt_listen_socket_t *ls)
#endif
return size + sizeof(nxt_mem_pool_t)
- + sizeof(nxt_event_conn_t)
+ + sizeof(nxt_conn_t)
+ sizeof(nxt_log_t);
}
diff --git a/src/nxt_macosx_sendfile.c b/src/nxt_macosx_sendfile.c
index f636819c..2c6ea954 100644
--- a/src/nxt_macosx_sendfile.c
+++ b/src/nxt_macosx_sendfile.c
@@ -26,8 +26,7 @@ static int nxt_sys_sendfile(int fd, int s, off_t offset, off_t *len,
ssize_t
-nxt_macosx_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
- size_t limit)
+nxt_macosx_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
{
size_t hd_size, file_size;
ssize_t n;
diff --git a/src/nxt_main.h b/src/nxt_main.h
index 24ec4ca1..19db0aa3 100644
--- a/src/nxt_main.h
+++ b/src/nxt_main.h
@@ -110,7 +110,7 @@ typedef struct nxt_port_mmap_s nxt_port_mmap_t;
#include <nxt_buf_pool.h>
#include <nxt_recvbuf.h>
-typedef struct nxt_event_conn_s nxt_event_conn_t;
+typedef struct nxt_conn_s nxt_conn_t;
#include <nxt_sendbuf.h>
#include <nxt_log_moderation.h>
@@ -129,6 +129,7 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context);
#include <nxt_thread_log.h>
#include <nxt_fd_event.h>
+#include <nxt_file_event.h>
#include <nxt_port.h>
#include <nxt_port_memory.h>
@@ -137,12 +138,10 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context);
#endif
-typedef void (*nxt_event_conn_handler_t)(nxt_thread_t *thr,
- nxt_event_conn_t *c);
+typedef void (*nxt_event_conn_handler_t)(nxt_thread_t *thr, nxt_conn_t *c);
#include <nxt_listen_socket.h>
-#include <nxt_event_conn.h>
-#include <nxt_event_file.h>
+#include <nxt_conn.h>
#include <nxt_event_engine.h>
#include <nxt_job.h>
diff --git a/src/nxt_openssl.c b/src/nxt_openssl.c
index 17e65a56..66f0ad32 100644
--- a/src/nxt_openssl.c
+++ b/src/nxt_openssl.c
@@ -23,23 +23,20 @@ typedef struct {
static nxt_int_t nxt_openssl_server_init(nxt_ssltls_conf_t *conf);
static void nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf,
- nxt_event_conn_t *c);
-static void nxt_openssl_session_cleanup(void *data);
+ nxt_conn_t *c);
+static void nxt_openssl_session_cleanup(nxt_task_t *task, void *data);
static void nxt_openssl_conn_handshake(nxt_task_t *task, void *obj, void *data);
static void nxt_openssl_conn_io_read(nxt_task_t *task, void *obj, void *data);
static void nxt_openssl_conn_io_shutdown(nxt_task_t *task, void *obj,
void *data);
-static ssize_t nxt_openssl_conn_io_write_chunk(nxt_event_conn_t *c,
- nxt_buf_t *b, size_t limit);
-static ssize_t nxt_openssl_conn_io_send(nxt_event_conn_t *c, void *buf,
- size_t size);
+static ssize_t nxt_openssl_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b,
+ size_t limit);
+static ssize_t nxt_openssl_conn_io_send(nxt_conn_t *c, void *buf, size_t size);
static nxt_int_t nxt_openssl_conn_test_error(nxt_task_t *task,
- nxt_event_conn_t *c, int ret, nxt_err_t sys_err,
- nxt_work_handler_t handler);
-static void nxt_cdecl nxt_openssl_conn_error(nxt_event_conn_t *c, nxt_err_t err,
+ nxt_conn_t *c, int ret, nxt_err_t sys_err, nxt_work_handler_t handler);
+static void nxt_cdecl nxt_openssl_conn_error(nxt_conn_t *c, nxt_err_t err,
const char *fmt, ...);
-static nxt_uint_t nxt_openssl_log_error_level(nxt_event_conn_t *c,
- nxt_err_t err);
+static nxt_uint_t nxt_openssl_log_error_level(nxt_conn_t *c, nxt_err_t err);
static void nxt_cdecl nxt_openssl_log_error(nxt_uint_t level, nxt_log_t *log,
const char *fmt, ...);
static u_char *nxt_openssl_copy_error(u_char *p, u_char *end);
@@ -51,7 +48,7 @@ const nxt_ssltls_lib_t nxt_openssl_lib = {
};
-static nxt_event_conn_io_t nxt_openssl_event_conn_io = {
+static nxt_conn_io_t nxt_openssl_conn_io = {
NULL,
NULL,
@@ -249,8 +246,7 @@ fail:
static void
-nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf,
- nxt_event_conn_t *c)
+nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf, nxt_conn_t *c)
{
int ret;
SSL *s;
@@ -301,7 +297,7 @@ nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf,
goto fail;
}
- c->io = &nxt_openssl_event_conn_io;
+ c->io = &nxt_openssl_conn_io;
c->sendfile = NXT_CONN_SENDFILE_OFF;
nxt_openssl_conn_handshake(task, c, c->socket.data);
@@ -315,13 +311,13 @@ fail:
static void
-nxt_openssl_session_cleanup(void *data)
+nxt_openssl_session_cleanup(nxt_task_t *task, void *data)
{
nxt_openssl_conn_t *ssltls;
ssltls = data;
- nxt_thread_log_debug("openssl session cleanup");
+ nxt_debug(task, "openssl session cleanup");
nxt_free(ssltls->buffer.start);
@@ -335,7 +331,7 @@ nxt_openssl_conn_handshake(nxt_task_t *task, void *obj, void *data)
int ret;
nxt_int_t n;
nxt_err_t err;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_openssl_conn_t *ssltls;
c = obj;
@@ -382,7 +378,7 @@ nxt_openssl_conn_io_read(nxt_task_t *task, void *obj, void *data)
nxt_buf_t *b;
nxt_int_t n;
nxt_err_t err;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_work_handler_t handler;
nxt_openssl_conn_t *ssltls;
@@ -432,7 +428,7 @@ nxt_openssl_conn_io_read(nxt_task_t *task, void *obj, void *data)
static ssize_t
-nxt_openssl_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
+nxt_openssl_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
{
nxt_openssl_conn_t *ssltls;
@@ -445,7 +441,7 @@ nxt_openssl_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
static ssize_t
-nxt_openssl_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size)
+nxt_openssl_conn_io_send(nxt_conn_t *c, void *buf, size_t size)
{
int ret;
nxt_err_t err;
@@ -491,7 +487,7 @@ nxt_openssl_conn_io_shutdown(nxt_task_t *task, void *obj, void *data)
nxt_err_t err;
nxt_int_t n;
nxt_bool_t quiet, once;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_work_handler_t handler;
nxt_openssl_conn_t *ssltls;
@@ -586,7 +582,7 @@ done:
static nxt_int_t
-nxt_openssl_conn_test_error(nxt_task_t *task, nxt_event_conn_t *c, int ret,
+nxt_openssl_conn_test_error(nxt_task_t *task, nxt_conn_t *c, int ret,
nxt_err_t sys_err, nxt_work_handler_t handler)
{
u_long lib_err;
@@ -664,7 +660,7 @@ nxt_openssl_conn_test_error(nxt_task_t *task, nxt_event_conn_t *c, int ret,
static void nxt_cdecl
-nxt_openssl_conn_error(nxt_event_conn_t *c, nxt_err_t err, const char *fmt, ...)
+nxt_openssl_conn_error(nxt_conn_t *c, nxt_err_t err, const char *fmt, ...)
{
u_char *p, *end;
va_list args;
@@ -697,7 +693,7 @@ nxt_openssl_conn_error(nxt_event_conn_t *c, nxt_err_t err, const char *fmt, ...)
static nxt_uint_t
-nxt_openssl_log_error_level(nxt_event_conn_t *c, nxt_err_t err)
+nxt_openssl_log_error_level(nxt_conn_t *c, nxt_err_t err)
{
switch (ERR_GET_REASON(ERR_peek_error())) {
diff --git a/src/nxt_poll_engine.c b/src/nxt_poll_engine.c
index 90a8176e..607cd144 100644
--- a/src/nxt_poll_engine.c
+++ b/src/nxt_poll_engine.c
@@ -88,7 +88,7 @@ const nxt_event_interface_t nxt_poll_engine = {
NULL,
nxt_poll,
- &nxt_unix_event_conn_io,
+ &nxt_unix_conn_io,
NXT_NO_FILE_EVENTS,
NXT_NO_SIGNAL_EVENTS,
diff --git a/src/nxt_pollset_engine.c b/src/nxt_pollset_engine.c
index 571ad794..402f954c 100644
--- a/src/nxt_pollset_engine.c
+++ b/src/nxt_pollset_engine.c
@@ -79,7 +79,7 @@ const nxt_event_interface_t nxt_pollset_engine = {
NULL,
nxt_pollset_poll,
- &nxt_unix_event_conn_io,
+ &nxt_unix_conn_io,
NXT_NO_FILE_EVENTS,
NXT_NO_SIGNAL_EVENTS,
diff --git a/src/nxt_router.c b/src/nxt_router.c
index c87b48b8..67acc12d 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -71,8 +71,7 @@ static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
-static nxt_msec_t nxt_router_conn_timeout_value(nxt_event_conn_t *c,
- uintptr_t data);
+static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
nxt_int_t
@@ -207,8 +206,8 @@ nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
skcf->listen.handler = nxt_router_conn_init;
skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen)
- + sizeof(nxt_event_conn_proxy_t)
- + sizeof(nxt_event_conn_t)
+ + sizeof(nxt_conn_proxy_t)
+ + sizeof(nxt_conn_t)
+ 4 * sizeof(nxt_buf_t);
skcf->header_buffer_size = 2048;
@@ -222,8 +221,8 @@ nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
skcf->listen.handler = nxt_stream_connection_init;
skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen)
- + sizeof(nxt_event_conn_proxy_t)
- + sizeof(nxt_event_conn_t)
+ + sizeof(nxt_conn_proxy_t)
+ + sizeof(nxt_conn_t)
+ 4 * sizeof(nxt_buf_t);
skcf->header_read_timeout = 5000;
@@ -881,7 +880,7 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
}
-static const nxt_event_conn_state_t nxt_router_conn_read_state
+static const nxt_conn_state_t nxt_router_conn_read_state
nxt_aligned(64) =
{
.ready_handler = nxt_router_conn_http_header_parse,
@@ -898,7 +897,7 @@ static void
nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
{
size_t size;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_event_engine_t *engine;
nxt_socket_conf_joint_t *joint;
@@ -920,11 +919,11 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
c->read_state = &nxt_router_conn_read_state;
- nxt_event_conn_read(engine, c);
+ nxt_conn_read(engine, c);
}
-static const nxt_event_conn_state_t nxt_router_conn_write_state
+static const nxt_conn_state_t nxt_router_conn_write_state
nxt_aligned(64) =
{
.ready_handler = nxt_router_conn_close,
@@ -939,7 +938,7 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
size_t size;
nxt_int_t ret;
nxt_buf_t *b;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_socket_conf_joint_t *joint;
nxt_http_request_parse_t *rp;
@@ -996,7 +995,7 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
c->read = b;
}
- nxt_event_conn_read(task->thread->engine, c);
+ nxt_conn_read(task->thread->engine, c);
return;
}
@@ -1004,11 +1003,11 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
c->write->mem.pos = c->write->mem.start;
c->write_state = &nxt_router_conn_write_state;
- nxt_event_conn_write(task->thread->engine, c);
+ nxt_conn_write(task->thread->engine, c);
}
-static const nxt_event_conn_state_t nxt_router_conn_close_state
+static const nxt_conn_state_t nxt_router_conn_close_state
nxt_aligned(64) =
{
.ready_handler = nxt_router_conn_free,
@@ -1018,7 +1017,7 @@ static const nxt_event_conn_state_t nxt_router_conn_close_state
static void
nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
c = obj;
@@ -1026,14 +1025,14 @@ nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
c->write_state = &nxt_router_conn_close_state;
- nxt_event_conn_close(task->thread->engine, c);
+ nxt_conn_close(task->thread->engine, c);
}
static void
nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
nxt_socket_conf_joint_t *joint;
c = obj;
@@ -1050,7 +1049,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
{
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
c = obj;
@@ -1058,30 +1057,30 @@ nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
c->write_state = &nxt_router_conn_close_state;
- nxt_event_conn_close(task->thread->engine, c);
+ nxt_conn_close(task->thread->engine, c);
}
static void
nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
{
- nxt_timer_t *timer;
- nxt_event_conn_t *c;
+ nxt_conn_t *c;
+ nxt_timer_t *timer;
timer = obj;
nxt_debug(task, "router conn timeout");
- c = nxt_event_read_timer_conn(timer);
+ c = nxt_read_timer_conn(timer);
c->write_state = &nxt_router_conn_close_state;
- nxt_event_conn_close(task->thread->engine, c);
+ nxt_conn_close(task->thread->engine, c);
}
static nxt_msec_t
-nxt_router_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data)
+nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
{
nxt_socket_conf_joint_t *joint;
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index 62b0e3b5..e35baa8f 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -497,9 +497,9 @@ nxt_runtime_quit(nxt_task_t *task)
static void
nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
{
+ nxt_conn_t *c;
nxt_queue_t *idle;
nxt_queue_link_t *link, *next;
- nxt_event_conn_t *c;
nxt_debug(&engine->task, "close idle connections");
@@ -510,11 +510,11 @@ nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
link = next)
{
next = nxt_queue_next(link);
- c = nxt_queue_link_data(link, nxt_event_conn_t, link);
+ c = nxt_queue_link_data(link, nxt_conn_t, link);
if (!c->socket.read_ready) {
nxt_queue_remove(link);
- nxt_event_conn_close(engine, c);
+ nxt_conn_close(engine, c);
}
}
}
diff --git a/src/nxt_select_engine.c b/src/nxt_select_engine.c
index 8a6f0710..6f760012 100644
--- a/src/nxt_select_engine.c
+++ b/src/nxt_select_engine.c
@@ -57,7 +57,7 @@ const nxt_event_interface_t nxt_select_engine = {
NULL,
nxt_select_poll,
- &nxt_unix_event_conn_io,
+ &nxt_unix_conn_io,
NXT_NO_FILE_EVENTS,
NXT_NO_SIGNAL_EVENTS,
diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c
index e8fbe2a0..361cb0cd 100644
--- a/src/nxt_sendbuf.c
+++ b/src/nxt_sendbuf.c
@@ -202,8 +202,8 @@ nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb)
ssize_t
-nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm,
- nxt_buf_t *b, size_t limit)
+nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b,
+ size_t limit)
{
size_t size, bsize, copied;
ssize_t n;
diff --git a/src/nxt_sendbuf.h b/src/nxt_sendbuf.h
index 0b789583..b159bfe5 100644
--- a/src/nxt_sendbuf.h
+++ b/src/nxt_sendbuf.h
@@ -66,41 +66,41 @@ typedef struct {
#if (NXT_HAVE_LINUX_SENDFILE)
#define NXT_HAVE_SENDFILE 1
-ssize_t nxt_linux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
+ssize_t nxt_linux_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b,
size_t limit);
#endif
#if (NXT_HAVE_FREEBSD_SENDFILE)
#define NXT_HAVE_SENDFILE 1
-ssize_t nxt_freebsd_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
+ssize_t nxt_freebsd_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b,
size_t limit);
#endif
#if (NXT_HAVE_SOLARIS_SENDFILEV)
#define NXT_HAVE_SENDFILE 1
-ssize_t nxt_solaris_event_conn_io_sendfilev(nxt_event_conn_t *c, nxt_buf_t *b,
+ssize_t nxt_solaris_event_conn_io_sendfilev(nxt_conn_t *c, nxt_buf_t *b,
size_t limit);
#endif
#if (NXT_HAVE_MACOSX_SENDFILE)
#define NXT_HAVE_SENDFILE 1
-ssize_t nxt_macosx_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
+ssize_t nxt_macosx_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b,
size_t limit);
#endif
#if (NXT_HAVE_AIX_SEND_FILE)
#define NXT_HAVE_SENDFILE 1
-ssize_t nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b,
+ssize_t nxt_aix_event_conn_io_send_file(nxt_conn_t *c, nxt_buf_t *b,
size_t limit);
#endif
#if (NXT_HAVE_HPUX_SENDFILE)
#define NXT_HAVE_SENDFILE 1
-ssize_t nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b,
+ssize_t nxt_hpux_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b,
size_t limit);
#endif
-ssize_t nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b,
+ssize_t nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b,
size_t limit);
@@ -116,7 +116,7 @@ size_t nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb);
* SSL/TLS libraries which lack vector I/O interface yet add noticeable
* overhead to each SSL/TLS record.
*/
-ssize_t nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm,
+ssize_t nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm,
nxt_buf_t *b, size_t limit);
nxt_buf_t *nxt_sendbuf_update(nxt_buf_t *b, size_t sent);
diff --git a/src/nxt_ssltls.h b/src/nxt_ssltls.h
index aa32348d..f12335a7 100644
--- a/src/nxt_ssltls.h
+++ b/src/nxt_ssltls.h
@@ -35,8 +35,7 @@ typedef struct {
struct nxt_ssltls_conf_s {
void *ctx;
void (*conn_init)(nxt_task_t *task,
- nxt_ssltls_conf_t *conf,
- nxt_event_conn_t *c);
+ nxt_ssltls_conf_t *conf, nxt_conn_t *c);
const nxt_ssltls_lib_t *lib;
diff --git a/src/nxt_stream_module.c b/src/nxt_stream_module.c
index e89cd6fd..6000daaf 100644
--- a/src/nxt_stream_module.c
+++ b/src/nxt_stream_module.c
@@ -17,8 +17,8 @@ static void nxt_stream_connection_close(nxt_task_t *task, void *obj,
void
nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data)
{
+ nxt_conn_t *c;
nxt_runtime_t *rt;
- nxt_event_conn_t *c;
nxt_upstream_peer_t *up;
c = obj;
@@ -57,8 +57,8 @@ fail:
static void
nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
{
- nxt_event_conn_t *c;
- nxt_event_conn_proxy_t *p;
+ nxt_conn_t *c;
+ nxt_conn_proxy_t *p;
c = up->data;
@@ -67,7 +67,7 @@ nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
nxt_log_debug(c->socket.log, "stream connection peer %*s",
up->sockaddr->length, nxt_sockaddr_start(up->sockaddr));
- p = nxt_event_conn_proxy_create(c);
+ p = nxt_conn_proxy_create(c);
if (nxt_slow_path(p == NULL)) {
goto fail;
}
@@ -107,7 +107,7 @@ nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
rate->last = engine->timers.now;
}
- nxt_event_conn_proxy(task, p);
+ nxt_conn_proxy(task, p);
return;
fail:
diff --git a/src/nxt_stream_source.c b/src/nxt_stream_source.c
index 9b667b99..66ec1640 100644
--- a/src/nxt_stream_source.c
+++ b/src/nxt_stream_source.c
@@ -58,7 +58,7 @@ nxt_stream_source_connect(nxt_task_t *task, nxt_stream_source_t *stream)
stream->conn = c;
c->socket.data = stream;
- nxt_event_conn_work_queue_set(c, us->work_queue);
+ nxt_conn_work_queue_set(c, us->work_queue);
c->remote = us->peer->sockaddr;
c->write_state = &nxt_stream_source_connect_state;
@@ -158,7 +158,7 @@ nxt_stream_source_write_ready(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "stream source write ready fd:%d", c->socket.fd);
- nxt_event_conn_read(task, c);
+ nxt_conn_read(task, c);
}
@@ -212,7 +212,7 @@ nxt_stream_source_read_ready(nxt_task_t *task, void *obj, void *data)
c->read_state = &nxt_stream_source_response_read_state;
- nxt_event_conn_read(task, c);
+ nxt_conn_read(task, c);
return;
fail:
@@ -425,7 +425,7 @@ nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "stream source closed fd:%d", c->socket.fd);
- nxt_event_conn_close(task, c);
+ nxt_conn_close(task, c);
b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool,
NXT_BUF_SYNC_LAST);
@@ -463,7 +463,7 @@ nxt_stream_source_error(nxt_task_t *task, void *obj, void *data)
static void
nxt_stream_source_close(nxt_task_t *task, nxt_stream_source_t *stream)
{
- nxt_event_conn_close(task, stream->conn);
+ nxt_conn_close(task, stream->conn);
stream->error_handler(task, stream);
}
diff --git a/src/nxt_stream_source.h b/src/nxt_stream_source.h
index 27a6bfc8..2d57073f 100644
--- a/src/nxt_stream_source.h
+++ b/src/nxt_stream_source.h
@@ -14,7 +14,7 @@ typedef void (*nxt_stream_source_handler_t)(nxt_task_t *task,
nxt_stream_source_t *s);
struct nxt_stream_source_s {
- nxt_event_conn_t *conn;
+ nxt_conn_t *conn;
nxt_source_hook_t *next;
nxt_upstream_source_t *upstream;
diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c
index b6266520..6407c734 100644
--- a/src/nxt_worker_process.c
+++ b/src/nxt_worker_process.c
@@ -45,12 +45,12 @@ const nxt_sig_event_t nxt_worker_process_signals[] = {
static void
nxt_worker_process_quit(nxt_task_t *task)
{
- nxt_uint_t n;
- nxt_queue_t *listen;
- nxt_runtime_t *rt;
- nxt_queue_link_t *link, *next;
- nxt_listen_socket_t *ls;
- nxt_event_conn_listen_t *cls;
+ nxt_uint_t n;
+ nxt_queue_t *listen;
+ nxt_runtime_t *rt;
+ nxt_queue_link_t *link, *next;
+ nxt_listen_event_t *lev;
+ nxt_listen_socket_t *ls;
rt = task->thread->runtime;
@@ -63,10 +63,10 @@ nxt_worker_process_quit(nxt_task_t *task)
link = next)
{
next = nxt_queue_next(link);
- cls = nxt_queue_link_data(link, nxt_event_conn_listen_t, link);
+ lev = nxt_queue_link_data(link, nxt_listen_event_t, link);
nxt_queue_remove(link);
- nxt_fd_event_close(task->thread->engine, &cls->socket);
+ nxt_fd_event_close(task->thread->engine, &lev->socket);
}
if (rt->listen_sockets != NULL) {