diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-06-14 15:18:52 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-06-14 15:18:52 +0300 |
commit | 7574c64992b98d3dfbc3dd101bd0f7d78bad0823 (patch) | |
tree | 3a98c46e88d9023df34be3e6cce4f762d53aad36 /src | |
parent | 3e2632688f53c4cb08e7ac03c61e71facd038df4 (diff) | |
download | unit-7574c64992b98d3dfbc3dd101bd0f7d78bad0823.tar.gz unit-7574c64992b98d3dfbc3dd101bd0f7d78bad0823.tar.bz2 |
nxt_event_conn_... functions and structures have been renamed
to nxt_conn_...
Diffstat (limited to 'src')
36 files changed, 1915 insertions, 2002 deletions
diff --git a/src/nxt_application.c b/src/nxt_application.c index 293807b0..a07a9038 100644 --- a/src/nxt_application.c +++ b/src/nxt_application.c @@ -17,19 +17,19 @@ static nxt_int_t nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt); static void nxt_app_thread(void *ctx); static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s, nxt_log_t *log); -static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, +static void nxt_app_conn_update(nxt_thread_t *thr, nxt_conn_t *c, nxt_log_t *log); static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r); -static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out); +static void nxt_app_buf_send(nxt_conn_t *c, nxt_buf_t *out); static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data); static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data); -static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c, +static nxt_msec_t nxt_app_delivery_timer_value(nxt_conn_t *c, uintptr_t data); -static void nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c); +static void nxt_app_delivery_done(nxt_task_t *task, nxt_conn_t *c); static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data); @@ -256,8 +256,8 @@ nxt_app_thread(void *ctx) static nxt_app_request_t * nxt_app_request_create(nxt_socket_t s, nxt_log_t *log) { + nxt_conn_t *c; nxt_mem_pool_t *mp; - nxt_event_conn_t *c; nxt_app_request_t *r; mp = nxt_mem_pool_create(1024); @@ -270,7 +270,7 @@ nxt_app_request_create(nxt_socket_t s, nxt_log_t *log) return NULL; } - c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t)); + c = nxt_mem_zalloc(mp, sizeof(nxt_conn_t)); if (nxt_slow_path(c == NULL)) { return NULL; } @@ -534,7 +534,7 @@ nxt_app_http_process_headers(nxt_app_request_t *r) static void -nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log) +nxt_app_conn_update(nxt_thread_t *thr, nxt_conn_t *c, nxt_log_t *log) { c->socket.write_ready = 1; @@ -562,8 +562,8 @@ nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log) c->read_work_queue = &thr->engine->read_work_queue; c->write_work_queue = &thr->engine->write_work_queue; - nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); - nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); + nxt_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); + nxt_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections); } @@ -770,7 +770,7 @@ nxt_app_write_finish(nxt_app_request_t *r) static void -nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out) +nxt_app_buf_send(nxt_conn_t *c, nxt_buf_t *out) { nxt_app_buf_t *ab; @@ -785,9 +785,9 @@ nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out) static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b; - nxt_mem_pool_t *mp; - nxt_event_conn_t *c; + nxt_buf_t *b; + nxt_conn_t *c; + nxt_mem_pool_t *mp; c = obj; b = data; @@ -820,7 +820,7 @@ nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data) c->write = b; c->write_state = &nxt_app_delivery_write_state; - nxt_event_conn_write(task->thread->engine, c); + nxt_conn_write(task->thread->engine, c); } @@ -840,8 +840,8 @@ static const nxt_event_conn_state_t nxt_app_delivery_write_state static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b, *next; - nxt_event_conn_t *c; + nxt_buf_t *b, *next; + nxt_conn_t *c; c = obj; @@ -875,8 +875,8 @@ static const nxt_event_conn_state_t nxt_app_delivery_close_state static void nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b, *bn, *free; - nxt_event_conn_t *c; + nxt_buf_t *b, *bn, *free; + nxt_conn_t *c; nxt_app_request_t *r; nxt_debug(task, "app delivery completion"); @@ -902,7 +902,7 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) c = r->event_conn; c->write_state = &nxt_app_delivery_close_state; - nxt_event_conn_close(task->thread->engine, c); + nxt_conn_close(task->thread->engine, c); } } @@ -929,7 +929,7 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data) static void nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -942,7 +942,7 @@ nxt_app_delivery_error(nxt_task_t *task, void *obj, void *data) static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -953,7 +953,7 @@ nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data) static nxt_msec_t -nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data) +nxt_app_delivery_timer_value(nxt_conn_t *c, uintptr_t data) { /* 30000 ms */ return 30000; @@ -961,7 +961,7 @@ nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data) static void -nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c) +nxt_app_delivery_done(nxt_task_t *task, nxt_conn_t *c) { if (c->write == NULL) { return; @@ -981,7 +981,7 @@ nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c) static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_app_request_t *r; c = obj; diff --git a/src/nxt_application.h b/src/nxt_application.h index c2619ded..5fd49667 100644 --- a/src/nxt_application.h +++ b/src/nxt_application.h @@ -29,7 +29,7 @@ typedef struct { typedef struct { nxt_event_engine_t *engine; nxt_mem_pool_t *mem_pool; - nxt_event_conn_t *event_conn; + nxt_conn_t *event_conn; nxt_log_t *log; nxt_buf_t *output_buf; diff --git a/src/nxt_event_conn.c b/src/nxt_conn.c index b8adb00a..3f21747e 100644 --- a/src/nxt_event_conn.c +++ b/src/nxt_conn.c @@ -7,13 +7,13 @@ #include <nxt_main.h> -nxt_event_conn_io_t nxt_unix_event_conn_io = { - nxt_event_conn_io_connect, - nxt_event_conn_io_accept, +nxt_conn_io_t nxt_unix_conn_io = { + nxt_conn_io_connect, + nxt_conn_io_accept, - nxt_event_conn_io_read, - nxt_event_conn_io_recvbuf, - nxt_event_conn_io_recv, + nxt_conn_io_read, + nxt_conn_io_recvbuf, + nxt_conn_io_recv, nxt_conn_io_write, nxt_event_conn_io_write_chunk, @@ -37,17 +37,17 @@ nxt_event_conn_io_t nxt_unix_event_conn_io = { nxt_event_conn_io_writev, nxt_event_conn_io_send, - nxt_event_conn_io_shutdown, + nxt_conn_io_shutdown, }; -nxt_event_conn_t * -nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task) +nxt_conn_t * +nxt_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task) { - nxt_thread_t *thr; - nxt_event_conn_t *c; + nxt_conn_t *c; + nxt_thread_t *thr; - c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t)); + c = nxt_mem_zalloc(mp, sizeof(nxt_conn_t)); if (nxt_slow_path(c == NULL)) { return NULL; } @@ -82,22 +82,22 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task) c->socket.read_work_queue = &thr->engine->fast_work_queue; c->socket.write_work_queue = &thr->engine->fast_work_queue; - nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); - nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); + nxt_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue); + nxt_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue); - nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections); + nxt_log_debug(&c->log, "connections: %uD", thr->engine->connections); return c; } void -nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data) +nxt_conn_io_shutdown(nxt_task_t *task, void *obj, void *data) { - int ret; - socklen_t len; - struct linger linger; - nxt_event_conn_t *c; + int ret; + socklen_t len; + nxt_conn_t *c; + struct linger linger; c = obj; @@ -126,24 +126,24 @@ nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data) void -nxt_event_conn_timer(nxt_event_engine_t *engine, nxt_event_conn_t *c, - const nxt_event_conn_state_t *state, nxt_timer_t *tev) +nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c, + const nxt_conn_state_t *state, nxt_timer_t *timer) { - nxt_msec_t timer; + nxt_msec_t value; if (state->timer_value != NULL) { - timer = state->timer_value(c, state->timer_data); + value = state->timer_value(c, state->timer_data); - if (timer != 0) { - tev->handler = state->timer_handler; - nxt_timer_add(engine, tev, timer); + if (value != 0) { + timer->handler = state->timer_handler; + nxt_timer_add(engine, timer, value); } } } void -nxt_event_conn_work_queue_set(nxt_event_conn_t *c, nxt_work_queue_t *wq) +nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq) { c->read_work_queue = wq; c->write_work_queue = wq; diff --git a/src/nxt_event_conn.h b/src/nxt_conn.h index 02c088b8..491531b0 100644 --- a/src/nxt_event_conn.h +++ b/src/nxt_conn.h @@ -4,12 +4,11 @@ * Copyright (C) NGINX, Inc. */ -#ifndef _NXT_EVENT_CONN_H_INCLUDED_ -#define _NXT_EVENT_CONN_H_INCLUDED_ +#ifndef _NXT_CONN_H_INCLUDED_ +#define _NXT_CONN_H_INCLUDED_ -typedef nxt_msec_t (*nxt_event_conn_timer_val_t)(nxt_event_conn_t *c, - uintptr_t data); +typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data); typedef struct { @@ -18,11 +17,11 @@ typedef struct { nxt_work_handler_t error_handler; nxt_work_handler_t timer_handler; - nxt_event_conn_timer_val_t timer_value; + nxt_conn_timer_value_t timer_value; uintptr_t timer_data; uint8_t timer_autoreset; -} nxt_event_conn_state_t; +} nxt_conn_state_t; typedef struct { @@ -49,9 +48,9 @@ typedef struct { */ nxt_work_handler_t read; - ssize_t (*recvbuf)(nxt_event_conn_t *c, nxt_buf_t *b); + ssize_t (*recvbuf)(nxt_conn_t *c, nxt_buf_t *b); - ssize_t (*recv)(nxt_event_conn_t *c, void *buf, + ssize_t (*recv)(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags); /* @@ -66,35 +65,35 @@ typedef struct { * buffers data and calls the library specific send() interface to write * the buffered data eventually. */ - ssize_t (*write_chunk)(nxt_event_conn_t *c, + ssize_t (*write_chunk)(nxt_conn_t *c, nxt_buf_t *b, size_t limit); /* * The sendbuf() is an interface for OS-specific sendfile * implementations or simple writev(). */ - ssize_t (*sendbuf)(nxt_event_conn_t *c, nxt_buf_t *b, + ssize_t (*sendbuf)(nxt_conn_t *c, nxt_buf_t *b, size_t limit); /* * The writev() is an interface to write several nxt_iobuf_t buffers. */ - ssize_t (*writev)(nxt_event_conn_t *c, + ssize_t (*writev)(nxt_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob); /* * The send() is an interface to write a single buffer. SSL/TLS * libraries' send() interface handles also the libraries' errors. */ - ssize_t (*send)(nxt_event_conn_t *c, void *buf, + ssize_t (*send)(nxt_conn_t *c, void *buf, size_t size); nxt_work_handler_t shutdown; -} nxt_event_conn_io_t; +} nxt_conn_io_t; /* - * The nxt_event_conn_listen_t is separated from nxt_listen_socket_t + * The nxt_listen_event_t is separated from nxt_listen_socket_t * because nxt_listen_socket_t is one per process whilst each worker - * thread uses own nxt_event_conn_listen_t. + * thread uses own nxt_listen_event_t. */ typedef struct { /* Must be the first field. */ @@ -109,31 +108,29 @@ typedef struct { nxt_work_handler_t accept; nxt_listen_socket_t *listen; - nxt_event_conn_t *next; /* STUB */; + nxt_conn_t *next; /* STUB */; nxt_work_queue_t *work_queue; nxt_timer_t timer; nxt_queue_link_t link; -} nxt_event_conn_listen_t; +} nxt_listen_event_t; -typedef nxt_event_conn_listen_t nxt_listen_event_t; - -struct nxt_event_conn_s { +struct nxt_conn_s { /* * Must be the first field, since nxt_fd_event_t - * and nxt_event_conn_t are used interchangeably. + * and nxt_conn_t are used interchangeably. */ nxt_fd_event_t socket; nxt_buf_t *read; - const nxt_event_conn_state_t *read_state; + const nxt_conn_state_t *read_state; nxt_work_queue_t *read_work_queue; nxt_timer_t read_timer; nxt_buf_t *write; - const nxt_event_conn_state_t *write_state; + const nxt_conn_state_t *write_state; nxt_work_queue_t *write_work_queue; nxt_event_write_rate_t *rate; nxt_timer_t write_timer; @@ -142,7 +139,7 @@ struct nxt_event_conn_s { uint32_t max_chunk; uint32_t nbytes; - nxt_event_conn_io_t *io; + nxt_conn_io_t *io; #if (NXT_SSLTLS || NXT_THREADS) /* SunC does not support "zero-sized struct/union". */ @@ -163,7 +160,7 @@ struct nxt_event_conn_s { nxt_task_t task; nxt_log_t log; - nxt_event_conn_listen_t *listen; + nxt_listen_event_t *listen; nxt_sockaddr_t *remote; nxt_sockaddr_t *local; const char *action; @@ -183,8 +180,7 @@ struct nxt_event_conn_s { }; -#define \ -nxt_event_conn_timer_init(ev, c, wq) \ +#define nxt_conn_timer_init(ev, c, wq) \ do { \ (ev)->work_queue = (wq); \ (ev)->log = &(c)->log; \ @@ -192,20 +188,17 @@ nxt_event_conn_timer_init(ev, c, wq) \ } while (0) -#define \ -nxt_event_read_timer_conn(ev) \ - nxt_timer_data(ev, nxt_event_conn_t, read_timer) +#define nxt_read_timer_conn(ev) \ + nxt_timer_data(ev, nxt_conn_t, read_timer) -#define \ -nxt_event_write_timer_conn(ev) \ - nxt_timer_data(ev, nxt_event_conn_t, write_timer) +#define nxt_write_timer_conn(ev) \ + nxt_timer_data(ev, nxt_conn_t, write_timer) #if (NXT_HAVE_UNIX_DOMAIN) -#define \ -nxt_event_conn_tcp_nodelay_on(task, c) \ +#define nxt_conn_tcp_nodelay_on(task, c) \ do { \ nxt_int_t ret; \ \ @@ -220,8 +213,7 @@ nxt_event_conn_tcp_nodelay_on(task, c) \ #else -#define \ -nxt_event_conn_tcp_nodelay_on(task, c) \ +#define nxt_conn_tcp_nodelay_on(task, c) \ do { \ nxt_int_t ret; \ \ @@ -234,39 +226,34 @@ nxt_event_conn_tcp_nodelay_on(task, c) \ #endif -NXT_EXPORT nxt_event_conn_t *nxt_event_conn_create(nxt_mem_pool_t *mp, - nxt_task_t *task); -void nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data); -NXT_EXPORT void nxt_event_conn_close(nxt_event_engine_t *engine, - nxt_event_conn_t *c); +NXT_EXPORT nxt_conn_t *nxt_conn_create(nxt_mem_pool_t *mp, nxt_task_t *task); +void nxt_conn_io_shutdown(nxt_task_t *task, void *obj, void *data); +NXT_EXPORT void nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c); -NXT_EXPORT void nxt_event_conn_timer(nxt_event_engine_t *engine, - nxt_event_conn_t *c, const nxt_event_conn_state_t *state, nxt_timer_t *tev); -NXT_EXPORT void nxt_event_conn_work_queue_set(nxt_event_conn_t *c, - nxt_work_queue_t *wq); +NXT_EXPORT void nxt_conn_timer(nxt_event_engine_t *engine, nxt_conn_t *c, + const nxt_conn_state_t *state, nxt_timer_t *tev); +NXT_EXPORT void nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq); -void nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data); -void nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data); -nxt_int_t nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c); -void nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data); -void nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data); +void nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data); +void nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data); +nxt_int_t nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c); +void nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data); +void nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data); -NXT_EXPORT nxt_event_conn_listen_t *nxt_listen_event(nxt_task_t *task, - nxt_listen_socket_t *ls); -NXT_EXPORT nxt_int_t nxt_event_conn_listen(nxt_task_t *task, +NXT_EXPORT nxt_listen_event_t *nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls); -void nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data); -NXT_EXPORT void nxt_event_conn_accept(nxt_task_t *task, - nxt_event_conn_listen_t *cls, nxt_event_conn_t *c); -void nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls, +void nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data); +NXT_EXPORT void nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, + nxt_conn_t *c); +void nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, const char *accept_syscall, nxt_err_t err); -void nxt_conn_wait(nxt_event_conn_t *c); +void nxt_conn_wait(nxt_conn_t *c); -void nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data); -ssize_t nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b); -ssize_t nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, - size_t size, nxt_uint_t flags); +void nxt_conn_io_read(nxt_task_t *task, void *obj, void *data); +ssize_t nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); +ssize_t nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, + nxt_uint_t flags); void nxt_conn_io_write(nxt_task_t *task, void *obj, void *data); ssize_t nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb); @@ -275,28 +262,25 @@ ssize_t nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, ssize_t nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size); -size_t nxt_event_conn_write_limit(nxt_event_conn_t *c); +size_t nxt_event_conn_write_limit(nxt_conn_t *c); nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine, - nxt_event_conn_t *c, size_t sent); -ssize_t nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, + nxt_conn_t *c, size_t sent); +ssize_t nxt_event_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, size_t limit); -ssize_t nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, +ssize_t nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob); -ssize_t nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size); - -NXT_EXPORT void nxt_event_conn_io_close(nxt_task_t *task, void *obj, - void *data); +ssize_t nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size); NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, - nxt_event_conn_t *c); + nxt_conn_t *c); -#define nxt_event_conn_connect(engine, c) \ - nxt_work_queue_add(&engine->socket_work_queue, nxt_event_conn_sys_socket, \ +#define nxt_conn_connect(engine, c) \ + nxt_work_queue_add(&engine->socket_work_queue, nxt_conn_sys_socket, \ c->socket.task, c, c->socket.data) -#define nxt_event_conn_read(engine, c) \ +#define nxt_conn_read(engine, c) \ do { \ nxt_event_engine_t *e = engine; \ \ @@ -307,7 +291,7 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, } while (0) -#define nxt_event_conn_write(e, c) \ +#define nxt_conn_write(e, c) \ do { \ nxt_event_engine_t *engine = e; \ \ @@ -318,7 +302,7 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task, } while (0) -extern nxt_event_conn_io_t nxt_unix_event_conn_io; +extern nxt_conn_io_t nxt_unix_conn_io; typedef struct { @@ -326,8 +310,8 @@ typedef struct { * Client and peer connections are not embedded because already * existent connections can be switched to the event connection proxy. */ - nxt_event_conn_t *client; - nxt_event_conn_t *peer; + nxt_conn_t *client; + nxt_conn_t *peer; nxt_buf_t *client_buffer; nxt_buf_t *peer_buffer; @@ -347,13 +331,20 @@ typedef struct { uint8_t retain; /* 2 bits */ nxt_work_handler_t completion_handler; -} nxt_event_conn_proxy_t; +} nxt_conn_proxy_t; + + +NXT_EXPORT nxt_conn_proxy_t *nxt_conn_proxy_create(nxt_conn_t *c); +NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p); -NXT_EXPORT nxt_event_conn_proxy_t *nxt_event_conn_proxy_create( - nxt_event_conn_t *c); -NXT_EXPORT void nxt_event_conn_proxy(nxt_task_t *task, - nxt_event_conn_proxy_t *p); +/* STUB */ +#define nxt_event_conn_t nxt_conn_t +#define nxt_event_conn_state_t nxt_conn_state_t +#define nxt_event_conn_proxy_t nxt_conn_proxy_t +#define nxt_event_conn_read nxt_conn_read +#define nxt_event_conn_write nxt_conn_write +#define nxt_event_conn_close nxt_conn_close -#endif /* _NXT_EVENT_CONN_H_INCLUDED_ */ +#endif /* _NXT_CONN_H_INCLUDED_ */ diff --git a/src/nxt_conn_accept.c b/src/nxt_conn_accept.c new file mode 100644 index 00000000..eb0172f4 --- /dev/null +++ b/src/nxt_conn_accept.c @@ -0,0 +1,366 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +/* + * A listen socket handler calls an event facility specific io_accept() + * method. The method accept()s a new connection and then calls + * nxt_event_conn_accept() to handle the new connection and to prepare + * for a next connection to avoid just dropping next accept()ed socket + * if no more connections allowed. If there are no available connections + * an idle connection would be closed. If there are no idle connections + * then new connections will not be accept()ed for 1 second. + */ + + +static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task, + nxt_listen_event_t *lev); +static void nxt_conn_listen_handler(nxt_task_t *task, void *obj, + void *data); +static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task, + nxt_listen_event_t *lev); +static nxt_int_t nxt_conn_accept_close_idle(nxt_task_t *task, + nxt_listen_event_t *lev); +static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, + void *data); + + +nxt_listen_event_t * +nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls) +{ + nxt_listen_event_t *lev; + nxt_event_engine_t *engine; + + lev = nxt_zalloc(sizeof(nxt_listen_event_t)); + + if (nxt_fast_path(lev != NULL)) { + lev->socket.fd = ls->socket; + + engine = task->thread->engine; + lev->batch = engine->batch; + + lev->socket.read_work_queue = &engine->accept_work_queue; + lev->socket.read_handler = nxt_conn_listen_handler; + lev->socket.error_handler = nxt_conn_listen_event_error; + lev->socket.log = &nxt_main_log; + + lev->accept = engine->event.io->accept; + + lev->listen = ls; + lev->work_queue = &engine->read_work_queue; + + lev->timer.work_queue = &engine->fast_work_queue; + lev->timer.handler = nxt_conn_listen_timer_handler; + lev->timer.log = &nxt_main_log; + + lev->task.thread = task->thread; + lev->task.log = &nxt_main_log; + lev->task.ident = nxt_task_next_ident(); + lev->socket.task = &lev->task; + lev->timer.task = &lev->task; + + if (nxt_conn_accept_alloc(task, lev) != NULL) { + nxt_fd_event_enable_accept(engine, &lev->socket); + + nxt_queue_insert_head(&engine->listen_connections, &lev->link); + } + + return lev; + } + + return NULL; +} + + +static nxt_conn_t * +nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev) +{ + nxt_conn_t *c; + nxt_sockaddr_t *sa, *remote; + nxt_mem_pool_t *mp; + nxt_event_engine_t *engine; + nxt_listen_socket_t *ls; + + engine = task->thread->engine; + + if (engine->connections < engine->max_connections) { + + mp = nxt_mem_pool_create(lev->listen->mem_pool_size); + + if (nxt_fast_path(mp != NULL)) { + /* This allocation cannot fail. */ + c = nxt_conn_create(mp, lev->socket.task); + + lev->next = c; + c->socket.read_work_queue = lev->socket.read_work_queue; + c->socket.write_ready = 1; + c->listen = lev; + + ls = lev->listen; + /* This allocation cannot fail. */ + remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length); + c->remote = remote; + + sa = ls->sockaddr; + remote->type = sa->type; + /* + * Set address family for unspecified Unix domain, + * because these sockaddr's are not be passed to accept(). + */ + remote->u.sockaddr.sa_family = sa->u.sockaddr.sa_family; + + return c; + } + } + + return NULL; +} + + +static void +nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_listen_event_t *lev; + + lev = obj; + lev->ready = lev->batch; + + lev->accept(task, lev, data); +} + + +void +nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data) +{ + socklen_t len; + nxt_conn_t *c; + nxt_socket_t s; + struct sockaddr *sa; + nxt_listen_event_t *lev; + + lev = obj; + c = lev->next; + + lev->ready--; + lev->socket.read_ready = (lev->ready != 0); + + len = c->remote->socklen; + + if (len >= sizeof(struct sockaddr)) { + sa = &c->remote->u.sockaddr; + + } else { + sa = NULL; + len = 0; + } + + s = accept(lev->socket.fd, sa, &len); + + if (s == -1) { + nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno); + return; + } + + c->socket.fd = s; + +#if (NXT_LINUX) + /* + * Linux does not inherit non-blocking mode + * from listen socket for accept()ed socket. + */ + if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { + nxt_socket_close(task, s); + } + +#endif + + nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); + + nxt_conn_accept(task, lev, c); +} + + +void +nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c) +{ + nxt_conn_t *next; + + nxt_sockaddr_text(c->remote); + + nxt_debug(task, "client: %*s", + c->remote->address_length, nxt_sockaddr_address(c->remote)); + + nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); + + c->read_work_queue = lev->work_queue; + c->write_work_queue = lev->work_queue; + + if (lev->listen->read_after_accept) { + + //c->socket.read_ready = 1; +// lev->listen->handler(task, c, lev->socket.data); + nxt_work_queue_add(c->read_work_queue, lev->listen->handler, + task, c, lev->socket.data); + + } else { + nxt_work_queue_add(c->write_work_queue, lev->listen->handler, + task, c, lev->socket.data); + } + + next = nxt_conn_accept_next(task, lev); + + if (next != NULL && lev->socket.read_ready) { + nxt_work_queue_add(lev->socket.read_work_queue, + lev->accept, task, lev, next); + } +} + + +static nxt_conn_t * +nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev) +{ + nxt_conn_t *c; + + lev->next = NULL; + + do { + c = nxt_conn_accept_alloc(task, lev); + + if (nxt_fast_path(c != NULL)) { + return c; + } + + } while (nxt_conn_accept_close_idle(task, lev) == NXT_OK); + + nxt_log(task, NXT_LOG_CRIT, "no available connections, " + "new connections are not accepted within 1s"); + + return NULL; +} + + +static nxt_int_t +nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev) +{ + nxt_conn_t *c; + nxt_queue_t *idle; + nxt_queue_link_t *link; + nxt_event_engine_t *engine; + + static nxt_log_moderation_t nxt_idle_close_log_moderation = { + NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION + }; + + engine = task->thread->engine; + + idle = &engine->idle_connections; + + for (link = nxt_queue_last(idle); + link != nxt_queue_head(idle); + link = nxt_queue_next(link)) + { + c = nxt_queue_link_data(link, nxt_conn_t, link); + + if (!c->socket.read_ready) { + nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO, + task->log, "no available connections, " + "close idle connection"); + nxt_queue_remove(link); + nxt_conn_close(engine, c); + + return NXT_OK; + } + } + + nxt_timer_add(engine, &lev->timer, 1000); + + nxt_fd_event_disable_read(engine, &lev->socket); + + return NXT_DECLINED; +} + + +void +nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev, + const char *accept_syscall, nxt_err_t err) +{ + static nxt_log_moderation_t nxt_accept_log_moderation = { + NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION + }; + + lev->socket.read_ready = 0; + + switch (err) { + + case NXT_EAGAIN: + nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err); + return; + + case ECONNABORTED: + nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN, + task->log, "%s(%d) failed %E", + accept_syscall, lev->socket.fd, err); + return; + + case EMFILE: + case ENFILE: + case ENOBUFS: + case ENOMEM: + if (nxt_conn_accept_close_idle(task, lev) != NXT_OK) { + nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, " + "new connections are not accepted within 1s", + accept_syscall, lev->socket.fd, err); + } + + return; + + default: + nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E", + accept_syscall, lev->socket.fd, err); + return; + } +} + + +static void +nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + nxt_listen_event_t *lev; + + timer = obj; + + lev = nxt_timer_data(timer, nxt_listen_event_t, timer); + c = lev->next; + + if (c == NULL) { + c = nxt_conn_accept_next(task, lev); + + if (c == NULL) { + return; + } + } + + nxt_fd_event_enable_accept(task->thread->engine, &lev->socket); + + lev->accept(task, lev, c); +} + + +static void +nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_fd_event_t *ev; + + ev = obj; + + nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd); +} diff --git a/src/nxt_conn_close.c b/src/nxt_conn_close.c index 18b2450c..fb86d052 100644 --- a/src/nxt_conn_close.c +++ b/src/nxt_conn_close.c @@ -16,7 +16,7 @@ static void nxt_conn_close_error_ignore(nxt_task_t *task, void *obj, void -nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c) +nxt_conn_close(nxt_event_engine_t *engine, nxt_conn_t *c) { int ret; socklen_t len; @@ -69,7 +69,7 @@ nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c) static void nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_event_engine_t *engine; c = obj; @@ -90,7 +90,7 @@ static void nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data) { nxt_uint_t events_pending, timers_pending; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_event_engine_t *engine; c = obj; @@ -132,12 +132,12 @@ nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data) static void nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data) { - nxt_timer_t *timer; - nxt_event_conn_t *c; + nxt_conn_t *c; + nxt_timer_t *timer; timer = obj; - c = nxt_event_write_timer_conn(timer); + c = nxt_write_timer_conn(timer); nxt_debug(task, "conn close timer handler fd:%d", c->socket.fd); diff --git a/src/nxt_event_conn_connect.c b/src/nxt_conn_connect.c index 55aa33f9..94d25c30 100644 --- a/src/nxt_event_conn_connect.c +++ b/src/nxt_conn_connect.c @@ -8,14 +8,14 @@ void -nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data) +nxt_conn_sys_socket(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_work_handler_t handler; c = obj; - if (nxt_event_conn_socket(task, c) == NXT_OK) { + if (nxt_conn_socket(task, c) == NXT_OK) { c->socket.write_work_queue = c->write_work_queue; handler = c->io->connect; @@ -29,12 +29,12 @@ nxt_event_conn_sys_socket(nxt_task_t *task, void *obj, void *data) void -nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) +nxt_conn_io_connect(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; - nxt_work_handler_t handler; - nxt_event_engine_t *engine; - const nxt_event_conn_state_t *state; + nxt_conn_t *c; + nxt_work_handler_t handler; + nxt_event_engine_t *engine; + const nxt_conn_state_t *state; c = obj; @@ -48,12 +48,12 @@ nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) break; case NXT_AGAIN: - c->socket.write_handler = nxt_event_conn_connect_test; + c->socket.write_handler = nxt_conn_connect_test; c->socket.error_handler = state->error_handler; engine = task->thread->engine; - nxt_event_conn_timer(engine, c, state, &c->write_timer); + nxt_conn_timer(engine, c, state, &c->write_timer); nxt_fd_event_enable_write(engine, &c->socket); return; @@ -72,7 +72,7 @@ nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) nxt_int_t -nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c) +nxt_conn_socket(nxt_task_t *task, nxt_conn_t *c) { nxt_uint_t family; nxt_socket_t s; @@ -116,11 +116,11 @@ nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c) void -nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data) +nxt_conn_connect_test(nxt_task_t *task, void *obj, void *data) { - int ret, err; - socklen_t len; - nxt_event_conn_t *c; + int ret, err; + socklen_t len; + nxt_conn_t *c; c = obj; @@ -157,16 +157,16 @@ nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data) nxt_log(task, nxt_socket_error_level(err), "connect(%d, %*s) failed %E", c->socket.fd, c->remote->length, nxt_sockaddr_start(c->remote)); - nxt_event_conn_connect_error(task, c, data); + nxt_conn_connect_error(task, c, data); } void -nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data) +nxt_conn_connect_error(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; - nxt_work_handler_t handler; - const nxt_event_conn_state_t *state; + nxt_conn_t *c; + nxt_work_handler_t handler; + const nxt_conn_state_t *state; c = obj; diff --git a/src/nxt_conn_proxy.c b/src/nxt_conn_proxy.c new file mode 100644 index 00000000..dec23684 --- /dev/null +++ b/src/nxt_conn_proxy.c @@ -0,0 +1,998 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *source, nxt_conn_t *sink); +static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b); +static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *sink, nxt_conn_t *source); +static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b); +static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, + void *data); +static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data); +static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *source, nxt_conn_t *sink); +static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p); +static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data); + + +static const nxt_conn_state_t nxt_conn_proxy_client_wait_state; +static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state; +static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state; +static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state; +static const nxt_conn_state_t nxt_conn_proxy_client_read_state; +static const nxt_conn_state_t nxt_conn_proxy_peer_read_state; +static const nxt_conn_state_t nxt_conn_proxy_client_write_state; +static const nxt_conn_state_t nxt_conn_proxy_peer_write_state; + + +nxt_conn_proxy_t * +nxt_conn_proxy_create(nxt_conn_t *client) +{ + nxt_conn_t *peer; + nxt_thread_t *thr; + nxt_conn_proxy_t *p; + + p = nxt_mem_zalloc(client->mem_pool, sizeof(nxt_conn_proxy_t)); + if (nxt_slow_path(p == NULL)) { + return NULL; + } + + peer = nxt_conn_create(client->mem_pool, client->socket.task); + if (nxt_slow_path(peer == NULL)) { + return NULL; + } + + thr = nxt_thread(); + + client->read_work_queue = &thr->engine->read_work_queue; + client->write_work_queue = &thr->engine->write_work_queue; + client->socket.read_work_queue = &thr->engine->read_work_queue; + client->socket.write_work_queue = &thr->engine->write_work_queue; + peer->socket.read_work_queue = &thr->engine->read_work_queue; + peer->socket.write_work_queue = &thr->engine->write_work_queue; + + peer->socket.data = client->socket.data; + + peer->read_work_queue = client->read_work_queue; + peer->write_work_queue = client->write_work_queue; + peer->read_timer.work_queue = client->read_work_queue; + peer->write_timer.work_queue = client->write_work_queue; + + p->client = client; + p->peer = peer; + + return p; +} + + +void +nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p) +{ + nxt_conn_t *peer; + + /* + * Peer read event: not connected, disabled. + * Peer write event: not connected, disabled. + */ + + if (p->client_wait_timeout == 0) { + /* + * Peer write event: waiting for connection + * to be established with connect_timeout. + */ + peer = p->peer; + peer->write_state = &nxt_conn_proxy_peer_connect_state; + + nxt_conn_connect(task->thread->engine, peer); + } + + /* + * Client read event: waiting for client data with + * client_wait_timeout before buffer allocation. + */ + p->client->read_state = &nxt_conn_proxy_client_wait_state; + + nxt_conn_wait(p->client); +} + + +static const nxt_conn_state_t nxt_conn_proxy_client_wait_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_client_buffer_alloc, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_error, + + .timer_handler = nxt_conn_proxy_read_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout), +}; + + +static void +nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_conn_t *client; + nxt_conn_proxy_t *p; + + client = obj; + p = data; + + nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd); + + b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, + NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); + + if (nxt_slow_path(b == NULL)) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + p->client_buffer = b; + client->read = b; + + if (p->peer->socket.fd != -1) { + /* + * Client read event: waiting, no timeout. + * Client write event: blocked. + * Peer read event: disabled. + * Peer write event: waiting for connection to be established + * or blocked after the connection has established. + */ + client->read_state = &nxt_conn_proxy_client_read_state; + + } else { + /* + * Client read event: waiting for data with client_wait_timeout + * before connecting to a peer. + * Client write event: blocked. + * Peer read event: not connected, disabled. + * Peer write event: not connected, disabled. + */ + client->read_state = &nxt_conn_proxy_client_first_read_state; + } + + nxt_conn_read(task->thread->engine, client); +} + + +static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_peer_connect, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_error, + + .timer_handler = nxt_conn_proxy_read_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *client; + nxt_conn_proxy_t *p; + + client = obj; + p = data; + + /* + * Client read event: waiting, no timeout. + * Client write event: blocked. + * Peer read event: disabled. + * Peer write event: waiting for connection to be established + * with connect_timeout. + */ + client->read_state = &nxt_conn_proxy_client_read_state; + + p->peer->write_state = &nxt_conn_proxy_peer_connect_state; + + nxt_conn_connect(task->thread->engine, p->peer); +} + + +static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_connected, + .close_handler = nxt_conn_proxy_refused, + .error_handler = nxt_conn_proxy_error, + + .timer_handler = nxt_conn_proxy_write_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *client, *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd); + + p->connected = 1; + + nxt_conn_tcp_nodelay_on(task, peer); + nxt_conn_tcp_nodelay_on(task, p->client); + + /* Peer read event: waiting with peer_wait_timeout. */ + + peer->read_state = &nxt_conn_proxy_peer_wait_state; + peer->write_state = &nxt_conn_proxy_peer_write_state; + + nxt_conn_wait(peer); + + if (p->client_buffer != NULL) { + client = p->client; + + client->read_state = &nxt_conn_proxy_client_read_state; + client->write_state = &nxt_conn_proxy_client_write_state; + /* + * Send a client read data to the connected peer. + * Client write event: blocked. + */ + nxt_conn_proxy_read_process(task, p, client, peer); + } +} + + +static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_peer_read, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_error, + + .timer_handler = nxt_conn_proxy_read_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout), +}; + + +static void +nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_conn_t *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd); + + b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, + NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); + + if (nxt_slow_path(b == NULL)) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + p->peer_buffer = b; + peer->read = b; + + p->client->write_state = &nxt_conn_proxy_client_write_state; + peer->read_state = &nxt_conn_proxy_peer_read_state; + peer->write_state = &nxt_conn_proxy_peer_write_state; + + /* + * Client read event: waiting, no timeout. + * Client write event: blocked. + * Peer read event: waiting with possible peer_wait_timeout. + * Peer write event: blocked. + */ + nxt_conn_read(task->thread->engine, peer); +} + + +static const nxt_conn_state_t nxt_conn_proxy_client_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_client_read_ready, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_read_error, +}; + + +static void +nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *client; + nxt_conn_proxy_t *p; + + client = obj; + p = data; + + nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd); + + nxt_conn_proxy_read_process(task, p, client, p->peer); +} + + +static const nxt_conn_state_t nxt_conn_proxy_peer_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_peer_read_ready, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_read_error, +}; + + +static void +nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd); + + nxt_conn_proxy_read_process(task, p, peer, p->client); +} + + +static void +nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *source, nxt_conn_t *sink) +{ + nxt_buf_t *rb, *wb; + + if (sink->socket.error != 0) { + nxt_debug(task, "conn proxy sink fd:%d error:%d", + sink->socket.fd, sink->socket.error); + + nxt_conn_proxy_write_error(task, sink, sink->socket.data); + return; + } + + while (source->read != NULL) { + + rb = source->read; + + if (rb->mem.pos != rb->mem.free) { + + /* Add a read part to a write chain. */ + + wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0); + if (wb == NULL) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + wb->mem.pos = rb->mem.pos; + wb->mem.free = rb->mem.free; + wb->mem.start = rb->mem.pos; + wb->mem.end = rb->mem.free; + + rb->mem.pos = rb->mem.free; + rb->mem.start = rb->mem.free; + + nxt_conn_proxy_write_add(sink, wb); + } + + if (rb->mem.start != rb->mem.end) { + nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read, + task, source, source->socket.data); + break; + } + + source->read = rb->next; + nxt_buf_free(source->mem_pool, rb); + } + + if (p->connected) { + nxt_conn_write(task->thread->engine, sink); + } +} + + +static void +nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b) +{ + nxt_buf_t *first, *second, *prev; + + first = c->write; + + if (first == NULL) { + c->write = b; + return; + } + + /* + * A event conn proxy maintains a buffer per each direction. + * The buffer is divided by read and write parts. These parts are + * linked in buffer chains. There can be no more than two buffers + * in write chain at any time, because an added buffer is coalesced + * with the last buffer if possible. + */ + + second = first->next; + + if (second == NULL) { + + if (first->mem.end != b->mem.start) { + first->next = b; + return; + } + + /* + * The first buffer is just before the added buffer, so + * expand the first buffer to the end of the added buffer. + */ + prev = first; + + } else { + if (second->mem.end != b->mem.start) { + nxt_thread_log_alert("event conn proxy write: second buffer end:%p " + "is not equal to added buffer start:%p", + second->mem.end, b->mem.start); + return; + } + + /* + * "second->mem.end == b->mem.start" must be always true here, + * that is the second buffer is just before the added buffer, + * so expand the second buffer to the end of added buffer. + */ + prev = second; + } + + prev->mem.free = b->mem.end; + prev->mem.end = b->mem.end; + + nxt_buf_free(c->mem_pool, b); +} + + +static void +nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *source, *sink; + nxt_conn_proxy_t *p; + + source = obj; + p = data; + + nxt_debug(task, "conn proxy read fd:%d", source->socket.fd); + + if (!source->socket.closed) { + sink = (source == p->client) ? p->peer : p->client; + + if (sink->socket.error == 0) { + nxt_conn_read(task->thread->engine, source); + } + } +} + + +static const nxt_conn_state_t nxt_conn_proxy_client_write_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_client_write_ready, + .error_handler = nxt_conn_proxy_write_error, + + .timer_handler = nxt_conn_proxy_write_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *client; + nxt_conn_proxy_t *p; + + client = obj; + p = data; + + nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd); + + nxt_conn_proxy_write_process(task, p, client, p->peer); +} + + +static const nxt_conn_state_t nxt_conn_proxy_peer_write_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_peer_write_ready, + .error_handler = nxt_conn_proxy_write_error, + + .timer_handler = nxt_conn_proxy_write_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd); + + nxt_conn_proxy_write_process(task, p, peer, p->client); +} + + +static void +nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *sink, nxt_conn_t *source) +{ + nxt_buf_t *rb, *wb; + + while (sink->write != NULL) { + + wb = sink->write; + + if (nxt_buf_is_sync(wb)) { + + /* A sync buffer marks the end of stream. */ + + sink->write = NULL; + nxt_buf_free(sink->mem_pool, wb); + nxt_conn_proxy_shutdown(task, p, source, sink); + return; + } + + if (wb->mem.start != wb->mem.pos) { + + /* Add a written part to a read chain. */ + + rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0); + if (rb == NULL) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + rb->mem.pos = wb->mem.start; + rb->mem.free = wb->mem.start; + rb->mem.start = wb->mem.start; + rb->mem.end = wb->mem.pos; + + wb->mem.start = wb->mem.pos; + + nxt_conn_proxy_read_add(source, rb); + } + + if (wb->mem.pos != wb->mem.free) { + nxt_conn_write(task->thread->engine, sink); + + break; + } + + sink->write = wb->next; + nxt_buf_free(sink->mem_pool, wb); + } + + nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read, + task, source, source->socket.data); +} + + +static void +nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b) +{ + nxt_buf_t *first, *second; + + first = c->read; + + if (first == NULL) { + c->read = b; + return; + } + + /* + * A event conn proxy maintains a buffer per each direction. + * The buffer is divided by read and write parts. These parts are + * linked in buffer chains. There can be no more than two buffers + * in read chain at any time, because an added buffer is coalesced + * with the last buffer if possible. The first and the second + * buffers are also coalesced if possible. + */ + + second = first->next; + + if (second == NULL) { + + if (first->mem.start == b->mem.end) { + /* + * The added buffer is just before the first buffer, so expand + * the first buffer to the beginning of the added buffer. + */ + first->mem.pos = b->mem.start; + first->mem.free = b->mem.start; + first->mem.start = b->mem.start; + + } else if (first->mem.end == b->mem.start) { + /* + * The added buffer is just after the first buffer, so + * expand the first buffer to the end of the added buffer. + */ + first->mem.end = b->mem.end; + + } else { + first->next = b; + return; + } + + } else { + if (second->mem.end != b->mem.start) { + nxt_thread_log_alert("event conn proxy read: second buffer end:%p " + "is not equal to added buffer start:%p", + second->mem.end, b->mem.start); + return; + } + + /* + * The added buffer is just after the second buffer, so + * expand the second buffer to the end of the added buffer. + */ + second->mem.end = b->mem.end; + + if (first->mem.start == second->mem.end) { + /* + * The second buffer is just before the first buffer, so expand + * the first buffer to the beginning of the second buffer. + */ + first->mem.pos = second->mem.start; + first->mem.free = second->mem.start; + first->mem.start = second->mem.start; + first->next = NULL; + + nxt_buf_free(c->mem_pool, second); + } + } + + nxt_buf_free(c->mem_pool, b); +} + + +static void +nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_conn_t *source, *sink; + nxt_conn_proxy_t *p; + + source = obj; + p = data; + + nxt_debug(task, "conn proxy close fd:%d", source->socket.fd); + + sink = (source == p->client) ? p->peer : p->client; + + if (sink->write == NULL) { + nxt_conn_proxy_shutdown(task, p, source, sink); + return; + } + + b = nxt_buf_sync_alloc(source->mem_pool, 0); + if (b == NULL) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + nxt_buf_chain_add(&sink->write, b); +} + + +static void +nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_conn_proxy_t *p; + + c = obj; + p = data; + + nxt_debug(task, "conn proxy error fd:%d", c->socket.fd); + + nxt_conn_proxy_close(task, c, p); +} + + +static void +nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + + timer = obj; + + c = nxt_read_timer_conn(timer); + c->socket.timedout = 1; + c->socket.closed = 1; + + nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd); + + nxt_conn_proxy_close(task, c, c->socket.data); +} + + +static void +nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + + timer = obj; + + c = nxt_write_timer_conn(timer); + c->socket.timedout = 1; + c->socket.closed = 1; + + nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd); + + nxt_conn_proxy_close(task, c, c->socket.data); +} + + +static nxt_msec_t +nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data) +{ + nxt_msec_t *timer; + nxt_conn_proxy_t *p; + + p = c->socket.data; + + timer = (nxt_msec_t *) ((char *) p + data); + + return *timer; +} + + +static void +nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd); + + if (p->retries == 0) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + p->retries--; + + nxt_socket_close(task, peer->socket.fd); + peer->socket.fd = -1; + peer->socket.error = 0; + + p->delayed = 1; + + peer->write_timer.handler = nxt_conn_proxy_reconnect_handler; + nxt_timer_add(task->thread->engine, &peer->write_timer, + p->reconnect_timeout); +} + + +static void +nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *peer; + nxt_timer_t *timer; + nxt_conn_proxy_t *p; + + timer = obj; + + nxt_debug(task, "conn proxy reconnect timer"); + + peer = nxt_write_timer_conn(timer); + p = peer->socket.data; + + if (p->client->socket.closed) { + nxt_conn_proxy_complete(task, p); + return; + } + + p->delayed = 0; + + peer->write_state = &nxt_conn_proxy_peer_connect_state; + /* + * Peer read event: disabled. + * Peer write event: waiting for connection with connect_timeout. + */ + nxt_conn_connect(task->thread->engine, peer); +} + + +static void +nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *source, nxt_conn_t *sink) +{ + nxt_buf_t *b; + + nxt_debug(source->socket.task, + "conn proxy shutdown source fd:%d cl:%d err:%d", + source->socket.fd, source->socket.closed, source->socket.error); + + nxt_debug(sink->socket.task, + "conn proxy shutdown sink fd:%d cl:%d err:%d", + sink->socket.fd, sink->socket.closed, sink->socket.error); + + if (!p->connected || p->delayed) { + nxt_conn_proxy_complete(task, p); + return; + } + + if (sink->socket.error == 0 && !sink->socket.closed) { + sink->socket.shutdown = 1; + nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR); + } + + if (sink->socket.error != 0 + || (sink->socket.closed && source->write == NULL)) + { + /* The opposite direction also has been already closed. */ + nxt_conn_proxy_complete(task, p); + return; + } + + nxt_debug(source->socket.task, "free source buffer"); + + /* Free the direction's buffer. */ + b = (source == p->client) ? p->client_buffer : p->peer_buffer; + nxt_mem_free(source->mem_pool, b); +} + + +static void +nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_conn_proxy_t *p; + + c = obj; + p = data; + + nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd); + + nxt_conn_proxy_close(task, c, p); +} + + +static void +nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *source, *sink; + nxt_conn_proxy_t *p; + + sink = obj; + p = data; + + nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd); + + /* Clear data for the direction sink. */ + sink->write = NULL; + + /* Block the direction source. */ + source = (sink == p->client) ? p->peer : p->client; + nxt_fd_event_block_read(task->thread->engine, &source->socket); + + if (source->write == NULL) { + /* + * There is no data for the opposite direction and + * the next read from the sink will most probably fail. + */ + nxt_conn_proxy_complete(task, p); + } +} + + +static const nxt_conn_state_t nxt_conn_proxy_close_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_completion, +}; + + +static void +nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p) +{ + nxt_event_engine_t *engine; + + engine = task->thread->engine; + + nxt_debug(p->client->socket.task, "conn proxy complete %d:%d", + p->client->socket.fd, p->peer->socket.fd); + + if (p->delayed) { + p->delayed = 0; + nxt_queue_remove(&p->peer->link); + } + + if (p->client->socket.fd != -1) { + p->retain = 1; + p->client->write_state = &nxt_conn_proxy_close_state; + nxt_conn_close(engine, p->client); + } + + if (p->peer->socket.fd != -1) { + p->retain++; + p->peer->write_state = &nxt_conn_proxy_close_state; + nxt_conn_close(engine, p->peer); + } +} + + +static void +nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_proxy_t *p; + + p = data; + + nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d", + p->retain, p->client->socket.fd, p->peer->socket.fd); + + p->retain--; + + if (p->retain == 0) { + nxt_mem_free(p->client->mem_pool, p->client_buffer); + nxt_mem_free(p->client->mem_pool, p->peer_buffer); + + p->completion_handler(task, p, NULL); + } +} diff --git a/src/nxt_event_conn_read.c b/src/nxt_conn_read.c index be2dfdb2..e7134b99 100644 --- a/src/nxt_event_conn_read.c +++ b/src/nxt_conn_read.c @@ -8,10 +8,10 @@ void -nxt_conn_wait(nxt_event_conn_t *c) +nxt_conn_wait(nxt_conn_t *c) { - nxt_event_engine_t *engine; - const nxt_event_conn_state_t *state; + nxt_event_engine_t *engine; + const nxt_conn_state_t *state; nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", c->socket.fd, c->socket.read_ready); @@ -28,26 +28,26 @@ nxt_conn_wait(nxt_event_conn_t *c) c->socket.read_handler = state->ready_handler; c->socket.error_handler = state->error_handler; - nxt_event_conn_timer(engine, c, state, &c->read_timer); + nxt_conn_timer(engine, c, state, &c->read_timer); nxt_fd_event_enable_read(engine, &c->socket); } void -nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) +nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) { - ssize_t n; - nxt_buf_t *b; - nxt_work_queue_t *wq; - nxt_event_conn_t *c; - nxt_event_engine_t *engine; - nxt_work_handler_t handler; - const nxt_event_conn_state_t *state; + ssize_t n; + nxt_buf_t *b; + nxt_conn_t *c; + nxt_work_queue_t *wq; + nxt_event_engine_t *engine; + nxt_work_handler_t handler; + const nxt_conn_state_t *state; c = obj; - nxt_debug(task, "event conn read fd:%d rdy:%d cl:%d", + nxt_debug(task, "conn read fd:%d rdy:%d cl:%d", c->socket.fd, c->socket.read_ready, c->socket.closed); engine = task->thread->engine; @@ -99,9 +99,8 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) } /* - * Here c->io->read() is assigned instead of direct - * nxt_event_conn_io_read() because the function can - * be called by nxt_kqueue_event_conn_io_read(). + * Here c->io->read() is assigned instead of direct nxt_conn_io_read() + * because the function can be called by nxt_kqueue_conn_io_read(). */ c->socket.read_handler = c->io->read; c->socket.error_handler = state->error_handler; @@ -110,7 +109,7 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) || nxt_fd_event_is_disabled(c->socket.read)) { /* Timer may be set or reset. */ - nxt_event_conn_timer(engine, c, state, &c->read_timer); + nxt_conn_timer(engine, c, state, &c->read_timer); if (nxt_fd_event_is_disabled(c->socket.read)) { nxt_fd_event_enable_read(engine, &c->socket); @@ -122,7 +121,7 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) ssize_t -nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) +nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) { ssize_t n; nxt_err_t err; @@ -139,7 +138,7 @@ nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) if (niov == 1) { /* Disposal of surplus kernel iovec copy-in operation. */ - return nxt_event_conn_io_recv(c, iov->iov_base, iov->iov_len, 0); + return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0); } for ( ;; ) { @@ -188,8 +187,7 @@ nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) ssize_t -nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size, - nxt_uint_t flags) +nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags) { ssize_t n; nxt_err_t err; diff --git a/src/nxt_event_conn_write.c b/src/nxt_conn_write.c index fa5b9241..a2a5737b 100644 --- a/src/nxt_event_conn_write.c +++ b/src/nxt_conn_write.c @@ -7,7 +7,7 @@ #include <nxt_main.h> -static void nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, +static void nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data); @@ -16,13 +16,13 @@ nxt_conn_io_write(nxt_task_t *task, void *obj, void *data) { ssize_t ret; nxt_buf_t *b; + nxt_conn_t *c; nxt_sendbuf_t sb; - nxt_event_conn_t *c; nxt_event_engine_t *engine; c = obj; - nxt_debug(task, "event conn write fd:%d", c->socket.fd); + nxt_debug(task, "conn write fd:%d", c->socket.fd); if (!c->socket.write_ready || c->write == NULL) { return; @@ -89,7 +89,7 @@ nxt_conn_io_write(nxt_task_t *task, void *obj, void *data) * Postpone writing until next event poll to allow to * process other recevied events and to get new events. */ - c->write_timer.handler = nxt_event_conn_write_timer_handler; + c->write_timer.handler = nxt_conn_write_timer_handler; nxt_timer_add(engine, &c->write_timer, 0); } else if (ret == NXT_AGAIN) { @@ -100,7 +100,7 @@ nxt_conn_io_write(nxt_task_t *task, void *obj, void *data) * can be set here because it should be set only for write * direction. */ - nxt_event_conn_timer(engine, c, c->write_state, &c->write_timer); + nxt_conn_timer(engine, c, c->write_state, &c->write_timer); if (nxt_fd_event_is_disabled(c->socket.write)) { nxt_fd_event_enable_write(engine, &c->socket); @@ -128,8 +128,133 @@ nxt_conn_io_write(nxt_task_t *task, void *obj, void *data) } +static void +nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + + timer = obj; + + nxt_debug(task, "event conn conn timer"); + + c = nxt_write_timer_conn(timer); + c->delayed = 0; + + c->io->write(task, c, c->socket.data); +} + + +ssize_t +nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb) +{ + nxt_uint_t niov; + struct iovec iov[NXT_IOBUF_MAX]; + + niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX); + + if (niov == 0 && sb->sync) { + return 0; + } + + return nxt_conn_io_writev(task, sb, iov, niov); +} + + +ssize_t +nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov, + nxt_uint_t niov) +{ + ssize_t n; + nxt_err_t err; + + if (niov == 1) { + /* Disposal of surplus kernel iovec copy-in operation. */ + return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len); + } + + for ( ;; ) { + n = writev(sb->socket, iov, niov); + + err = (n == -1) ? nxt_socket_errno : 0; + + nxt_debug(task, "writev(%d, %ui): %d", sb->socket, niov, n); + + if (n > 0) { + return n; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + sb->ready = 0; + nxt_debug(task, "writev() %E", err); + + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(task, "writev() %E", err); + continue; + + default: + sb->error = err; + nxt_log(task, nxt_socket_error_level(err), + "writev(%d, %ui) failed %E", sb->socket, niov, err); + + return NXT_ERROR; + } + } +} + + +ssize_t +nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size) +{ + ssize_t n; + nxt_err_t err; + + for ( ;; ) { + n = send(sb->socket, buf, size, 0); + + err = (n == -1) ? nxt_socket_errno : 0; + + nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n); + + if (n > 0) { + return n; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + sb->ready = 0; + nxt_debug(task, "send() %E", err); + + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(task, "send() %E", err); + continue; + + default: + sb->error = err; + nxt_log(task, nxt_socket_error_level(err), + "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err); + + return NXT_ERROR; + } + } +} + + +/* Obsolete interfaces. */ + size_t -nxt_event_conn_write_limit(nxt_event_conn_t *c) +nxt_event_conn_write_limit(nxt_conn_t *c) { ssize_t limit, correction; nxt_event_write_rate_t *rate; @@ -162,32 +287,15 @@ nxt_event_conn_write_limit(nxt_event_conn_t *c) nxt_bool_t -nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_event_conn_t *c, +nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_conn_t *c, size_t sent) { return 0; } -static void -nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *ev; - nxt_event_conn_t *c; - - ev = obj; - - nxt_debug(task, "event conn conn timer"); - - c = nxt_event_write_timer_conn(ev); - c->delayed = 0; - - c->io->write(task, c, c->socket.data); -} - - ssize_t -nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) +nxt_event_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, size_t limit) { ssize_t ret; @@ -204,7 +312,7 @@ nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) ssize_t -nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) +nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b, size_t limit) { nxt_uint_t niob; struct iovec iob[NXT_IOBUF_MAX]; @@ -228,7 +336,7 @@ nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) ssize_t -nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob) +nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob) { ssize_t n; nxt_err_t err; @@ -273,7 +381,7 @@ nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob) ssize_t -nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size) +nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size) { ssize_t n; nxt_err_t err; @@ -312,109 +420,3 @@ nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size) } } } - - -ssize_t -nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb) -{ - nxt_uint_t niov; - struct iovec iov[NXT_IOBUF_MAX]; - - niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX); - - if (niov == 0 && sb->sync) { - return 0; - } - - return nxt_conn_io_writev(task, sb, iov, niov); -} - - -ssize_t -nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov, - nxt_uint_t niov) -{ - ssize_t n; - nxt_err_t err; - - if (niov == 1) { - /* Disposal of surplus kernel iovec copy-in operation. */ - return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len); - } - - for ( ;; ) { - n = writev(sb->socket, iov, niov); - - err = (n == -1) ? nxt_socket_errno : 0; - - nxt_debug(task, "writev(%d, %ui): %d", sb->socket, niov, n); - - if (n > 0) { - return n; - } - - /* n == -1 */ - - switch (err) { - - case NXT_EAGAIN: - sb->ready = 0; - nxt_debug(task, "writev() %E", err); - - return NXT_AGAIN; - - case NXT_EINTR: - nxt_debug(task, "writev() %E", err); - continue; - - default: - sb->error = err; - nxt_log(task, nxt_socket_error_level(err), - "writev(%d, %ui) failed %E", sb->socket, niov, err); - - return NXT_ERROR; - } - } -} - - -ssize_t -nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size) -{ - ssize_t n; - nxt_err_t err; - - for ( ;; ) { - n = send(sb->socket, buf, size, 0); - - err = (n == -1) ? nxt_socket_errno : 0; - - nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n); - - if (n > 0) { - return n; - } - - /* n == -1 */ - - switch (err) { - - case NXT_EAGAIN: - sb->ready = 0; - nxt_debug(task, "send() %E", err); - - return NXT_AGAIN; - - case NXT_EINTR: - nxt_debug(task, "send() %E", err); - continue; - - default: - sb->error = err; - nxt_log(task, nxt_socket_error_level(err), - "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err); - - return NXT_ERROR; - } - } -} diff --git a/src/nxt_controller.c b/src/nxt_controller.c index e31e2e15..6ec4c744 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -34,7 +34,7 @@ typedef struct { static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); -static nxt_msec_t nxt_controller_conn_timeout_value(nxt_event_conn_t *c, +static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data); static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data); @@ -54,8 +54,8 @@ static nxt_int_t nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, uintptr_t data, nxt_log_t *log); static void nxt_controller_process_request(nxt_task_t *task, - nxt_event_conn_t *c, nxt_controller_request_t *r); -static nxt_int_t nxt_controller_response(nxt_task_t *task, nxt_event_conn_t *c, + nxt_conn_t *c, nxt_controller_request_t *r); +static nxt_int_t nxt_controller_response(nxt_task_t *task, nxt_conn_t *c, nxt_controller_response_t *resp); static nxt_buf_t *nxt_controller_response_body(nxt_controller_response_t *resp, nxt_mem_pool_t *pool); @@ -184,7 +184,7 @@ nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) */ ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls) + sizeof(nxt_event_conn_proxy_t) - + sizeof(nxt_event_conn_t) + + sizeof(nxt_conn_t) + 4 * sizeof(nxt_buf_t); if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { @@ -201,7 +201,7 @@ static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_event_engine_t *engine; nxt_controller_request_t *r; @@ -237,7 +237,7 @@ nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) c->read_work_queue = &engine->read_work_queue; c->write_work_queue = &engine->write_work_queue; - nxt_event_conn_read(engine, c); + nxt_conn_read(engine, c); } @@ -260,7 +260,7 @@ nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) size_t preread; nxt_buf_t *b; nxt_int_t rc; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_controller_request_t *r; c = obj; @@ -284,7 +284,7 @@ nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) return; } - nxt_event_conn_read(task->thread->engine, c); + nxt_conn_read(task->thread->engine, c); return; } @@ -329,12 +329,12 @@ nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) c->read_state = &nxt_controller_conn_body_read_state; - nxt_event_conn_read(task->thread->engine, c); + nxt_conn_read(task->thread->engine, c); } static nxt_msec_t -nxt_controller_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data) +nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data) { return (nxt_msec_t) data; } @@ -343,7 +343,7 @@ nxt_controller_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data) static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -356,12 +356,12 @@ nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_timer_t *ev; - nxt_event_conn_t *c; + nxt_timer_t *timer; + nxt_conn_t *c; - ev = obj; + timer = obj; - c = nxt_event_read_timer_conn(ev); + c = nxt_read_timer_conn(timer); c->socket.timedout = 1; c->socket.closed = 1; @@ -388,9 +388,9 @@ static const nxt_event_conn_state_t nxt_controller_conn_body_read_state static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) { - size_t rest; - nxt_buf_t *b; - nxt_event_conn_t *c; + size_t rest; + nxt_buf_t *b; + nxt_conn_t *c; c = obj; @@ -409,7 +409,7 @@ nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "controller conn body read again, rest: %uz", rest); - nxt_event_conn_read(task->thread->engine, c); + nxt_conn_read(task->thread->engine, c); } @@ -429,8 +429,8 @@ static const nxt_event_conn_state_t nxt_controller_conn_write_state static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b; - nxt_event_conn_t *c; + nxt_buf_t *b; + nxt_conn_t *c; c = obj; @@ -439,7 +439,7 @@ nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) b = c->write; if (b->mem.pos != b->mem.free) { - nxt_event_conn_write(task->thread->engine, c); + nxt_conn_write(task->thread->engine, c); return; } @@ -452,7 +452,7 @@ nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -465,12 +465,12 @@ nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_timer_t *ev; - nxt_event_conn_t *c; + nxt_conn_t *c; + nxt_timer_t *timer; - ev = obj; + timer = obj; - c = nxt_event_write_timer_conn(ev); + c = nxt_write_timer_conn(timer); c->socket.timedout = 1; c->socket.closed = 1; @@ -490,7 +490,7 @@ static const nxt_event_conn_state_t nxt_controller_conn_close_state static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -500,14 +500,14 @@ nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) c->write_state = &nxt_controller_conn_close_state; - nxt_event_conn_close(task->thread->engine, c); + nxt_conn_close(task->thread->engine, c); } static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -544,7 +544,7 @@ nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field, static void -nxt_controller_process_request(nxt_task_t *task, nxt_event_conn_t *c, +nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c, nxt_controller_request_t *req) { nxt_int_t rc; @@ -737,7 +737,7 @@ done: static nxt_int_t -nxt_controller_response(nxt_task_t *task, nxt_event_conn_t *c, +nxt_controller_response(nxt_task_t *task, nxt_conn_t *c, nxt_controller_response_t *resp) { size_t size; @@ -765,7 +765,7 @@ nxt_controller_response(nxt_task_t *task, nxt_event_conn_t *c, c->write = b; c->write_state = &nxt_controller_conn_write_state; - nxt_event_conn_write(task->thread->engine, c); + nxt_conn_write(task->thread->engine, c); return NXT_OK; } diff --git a/src/nxt_devpoll_engine.c b/src/nxt_devpoll_engine.c index 37435661..faa6368a 100644 --- a/src/nxt_devpoll_engine.c +++ b/src/nxt_devpoll_engine.c @@ -85,7 +85,7 @@ const nxt_event_interface_t nxt_devpoll_engine = { NULL, nxt_devpoll_poll, - &nxt_unix_event_conn_io, + &nxt_unix_conn_io, NXT_NO_FILE_EVENTS, NXT_NO_SIGNAL_EVENTS, diff --git a/src/nxt_epoll_engine.c b/src/nxt_epoll_engine.c index 4ca878e6..410c542d 100644 --- a/src/nxt_epoll_engine.c +++ b/src/nxt_epoll_engine.c @@ -38,10 +38,9 @@ static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, static nxt_int_t nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents); static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, - nxt_uint_t mchanges, nxt_uint_t mevents, nxt_event_conn_io_t *io, - uint32_t mode); + nxt_uint_t mchanges, nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode); static void nxt_epoll_test_accept4(nxt_event_engine_t *engine, - nxt_event_conn_io_t *io); + nxt_conn_io_t *io); static void nxt_epoll_free(nxt_event_engine_t *engine); static void nxt_epoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_epoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev); @@ -83,28 +82,27 @@ static void nxt_epoll_signal(nxt_event_engine_t *engine, nxt_uint_t signo); static void nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); #if (NXT_HAVE_ACCEPT4) -static void nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, +static void nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data); #endif #if (NXT_HAVE_EPOLL_EDGE) -static void nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, +static void nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data); -static void nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, +static void nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data); -static ssize_t nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, - nxt_buf_t *b); +static ssize_t nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); -static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = { - nxt_epoll_edge_event_conn_io_connect, - nxt_event_conn_io_accept, +static nxt_conn_io_t nxt_epoll_edge_conn_io = { + nxt_epoll_edge_conn_io_connect, + nxt_conn_io_accept, - nxt_event_conn_io_read, - nxt_epoll_edge_event_conn_io_recvbuf, - nxt_event_conn_io_recv, + nxt_conn_io_read, + nxt_epoll_edge_conn_io_recvbuf, + nxt_conn_io_recv, nxt_conn_io_write, nxt_event_conn_io_write_chunk, @@ -118,7 +116,7 @@ static nxt_event_conn_io_t nxt_epoll_edge_event_conn_io = { nxt_event_conn_io_writev, nxt_event_conn_io_send, - nxt_event_conn_io_shutdown, + nxt_conn_io_shutdown, }; @@ -150,7 +148,7 @@ const nxt_event_interface_t nxt_epoll_edge_engine = { #endif nxt_epoll_poll, - &nxt_epoll_edge_event_conn_io, + &nxt_epoll_edge_conn_io, #if (NXT_HAVE_INOTIFY) NXT_FILE_EVENTS, @@ -196,7 +194,7 @@ const nxt_event_interface_t nxt_epoll_level_engine = { #endif nxt_epoll_poll, - &nxt_unix_event_conn_io, + &nxt_unix_conn_io, #if (NXT_HAVE_INOTIFY) NXT_FILE_EVENTS, @@ -218,8 +216,7 @@ static nxt_int_t nxt_epoll_edge_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents) { - return nxt_epoll_create(engine, mchanges, mevents, - &nxt_epoll_edge_event_conn_io, + return nxt_epoll_create(engine, mchanges, mevents, &nxt_epoll_edge_conn_io, EPOLLET | EPOLLRDHUP); } @@ -231,13 +228,13 @@ nxt_epoll_level_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, nxt_uint_t mevents) { return nxt_epoll_create(engine, mchanges, mevents, - &nxt_unix_event_conn_io, 0); + &nxt_unix_conn_io, 0); } static nxt_int_t nxt_epoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges, - nxt_uint_t mevents, nxt_event_conn_io_t *io, uint32_t mode) + nxt_uint_t mevents, nxt_conn_io_t *io, uint32_t mode) { engine->u.epoll.fd = -1; engine->u.epoll.mode = mode; @@ -290,7 +287,7 @@ fail: static void -nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io) +nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_conn_io_t *io) { static nxt_work_handler_t handler; @@ -303,7 +300,7 @@ nxt_epoll_test_accept4(nxt_event_engine_t *engine, nxt_event_conn_io_t *io) (void) accept4(-1, NULL, NULL, SOCK_NONBLOCK); if (nxt_errno != NXT_ENOSYS) { - handler = nxt_epoll_event_conn_io_accept4; + handler = nxt_epoll_conn_io_accept4; } else { nxt_log(&engine->task, NXT_LOG_INFO, "accept4() failed %E", @@ -985,19 +982,19 @@ nxt_epoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) #if (NXT_HAVE_ACCEPT4) static void -nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data) +nxt_epoll_conn_io_accept4(nxt_task_t *task, void *obj, void *data) { - socklen_t len; - nxt_socket_t s; - struct sockaddr *sa; - nxt_event_conn_t *c; - nxt_event_conn_listen_t *cls; + socklen_t len; + nxt_conn_t *c; + nxt_socket_t s; + struct sockaddr *sa; + nxt_listen_event_t *lev; - cls = obj; - c = cls->next; + lev = obj; + c = lev->next; - cls->ready--; - cls->socket.read_ready = (cls->ready != 0); + lev->ready--; + lev->socket.read_ready = (lev->ready != 0); len = c->remote->socklen; @@ -1009,18 +1006,18 @@ nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data) len = 0; } - s = accept4(cls->socket.fd, sa, &len, SOCK_NONBLOCK); + s = accept4(lev->socket.fd, sa, &len, SOCK_NONBLOCK); if (s != -1) { c->socket.fd = s; - nxt_debug(task, "accept4(%d): %d", cls->socket.fd, s); + nxt_debug(task, "accept4(%d): %d", lev->socket.fd, s); - nxt_event_conn_accept(task, cls, c); + nxt_conn_accept(task, lev, c); return; } - nxt_event_conn_accept_error(task, cls, "accept4", nxt_errno); + nxt_conn_accept_error(task, lev, "accept4", nxt_errno); } #endif @@ -1039,9 +1036,9 @@ nxt_epoll_event_conn_io_accept4(nxt_task_t *task, void *obj, void *data) */ static void -nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) +nxt_epoll_edge_conn_io_connect(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_event_engine_t *engine; nxt_work_handler_t handler; const nxt_event_conn_state_t *state; @@ -1058,11 +1055,11 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) break; case NXT_AGAIN: - c->socket.write_handler = nxt_epoll_edge_event_conn_connected; - c->socket.error_handler = nxt_event_conn_connect_error; + c->socket.write_handler = nxt_epoll_edge_conn_connected; + c->socket.error_handler = nxt_conn_connect_error; engine = task->thread->engine; - nxt_event_conn_timer(engine, c, state, &c->write_timer); + nxt_conn_timer(engine, c, state, &c->write_timer); nxt_epoll_enable(engine, &c->socket); c->socket.read = NXT_EVENT_BLOCKED; @@ -1070,7 +1067,7 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) #if 0 case NXT_AGAIN: - nxt_event_conn_timer(engine, c, state, &c->write_timer); + nxt_conn_timer(engine, c, state, &c->write_timer); /* Fall through. */ @@ -1111,9 +1108,9 @@ nxt_epoll_edge_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) static void -nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data) +nxt_epoll_edge_conn_connected(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -1131,22 +1128,22 @@ nxt_epoll_edge_event_conn_connected(nxt_task_t *task, void *obj, void *data) return; } - nxt_event_conn_connect_test(task, c, data); + nxt_conn_connect_test(task, c, data); } /* - * nxt_epoll_edge_event_conn_io_recvbuf() is just wrapper around - * standard nxt_event_conn_io_recvbuf() to enforce to read a pending EOF + * nxt_epoll_edge_conn_io_recvbuf() is just wrapper around + * standard nxt_conn_io_recvbuf() to enforce to read a pending EOF * in edge-triggered mode. */ static ssize_t -nxt_epoll_edge_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) +nxt_epoll_edge_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) { ssize_t n; - n = nxt_event_conn_io_recvbuf(c, b); + n = nxt_conn_io_recvbuf(c, b); if (n > 0 && c->socket.epoll_eof) { c->socket.read_ready = 1; diff --git a/src/nxt_event_conn_accept.c b/src/nxt_event_conn_accept.c deleted file mode 100644 index ced2a3d8..00000000 --- a/src/nxt_event_conn_accept.c +++ /dev/null @@ -1,413 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -/* - * A listen socket handler calls an event facility specific io_accept() - * method. The method accept()s a new connection and then calls - * nxt_event_conn_accept() to handle the new connection and to prepare - * for a next connection to avoid just dropping next accept()ed socket - * if no more connections allowed. If there are no available connections - * an idle connection would be closed. If there are no idle connections - * then new connections will not be accept()ed for 1 second. - */ - - -static nxt_event_conn_t *nxt_event_conn_accept_alloc(nxt_task_t *task, - nxt_event_conn_listen_t *cls); -static void nxt_event_conn_listen_handler(nxt_task_t *task, void *obj, - void *data); -static nxt_event_conn_t *nxt_event_conn_accept_next(nxt_task_t *task, - nxt_event_conn_listen_t *cls); -static nxt_int_t nxt_event_conn_accept_close_idle(nxt_task_t *task, - nxt_event_conn_listen_t *cls); -static void nxt_event_conn_listen_event_error(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj, - void *data); - - -nxt_event_conn_listen_t * -nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls) -{ - nxt_event_engine_t *engine; - nxt_event_conn_listen_t *cls; - - cls = nxt_zalloc(sizeof(nxt_event_conn_listen_t)); - - if (nxt_fast_path(cls != NULL)) { - cls->socket.fd = ls->socket; - - engine = task->thread->engine; - cls->batch = engine->batch; - - cls->socket.read_work_queue = &engine->accept_work_queue; - cls->socket.read_handler = nxt_event_conn_listen_handler; - cls->socket.error_handler = nxt_event_conn_listen_event_error; - cls->socket.log = &nxt_main_log; - - cls->accept = engine->event.io->accept; - - cls->listen = ls; - cls->work_queue = &engine->read_work_queue; - - cls->timer.work_queue = &engine->fast_work_queue; - cls->timer.handler = nxt_event_conn_listen_timer_handler; - cls->timer.log = &nxt_main_log; - - cls->task.thread = task->thread; - cls->task.log = &nxt_main_log; - cls->task.ident = nxt_task_next_ident(); - cls->socket.task = &cls->task; - cls->timer.task = &cls->task; - - if (nxt_event_conn_accept_alloc(task, cls) != NULL) { - nxt_fd_event_enable_accept(engine, &cls->socket); - - nxt_queue_insert_head(&engine->listen_connections, &cls->link); - } - - return cls; - } - - return NULL; -} - - -nxt_int_t -nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls) -{ - nxt_event_engine_t *engine; - nxt_event_conn_listen_t *cls; - - cls = nxt_zalloc(sizeof(nxt_event_conn_listen_t)); - - if (nxt_fast_path(cls != NULL)) { - cls->socket.fd = ls->socket; - - engine = task->thread->engine; - cls->batch = engine->batch; - - cls->socket.read_work_queue = &engine->accept_work_queue; - cls->socket.read_handler = nxt_event_conn_listen_handler; - cls->socket.error_handler = nxt_event_conn_listen_event_error; - cls->socket.log = &nxt_main_log; - - cls->accept = engine->event.io->accept; - - cls->listen = ls; - - cls->timer.work_queue = &engine->fast_work_queue; - cls->timer.handler = nxt_event_conn_listen_timer_handler; - cls->timer.log = &nxt_main_log; - - cls->task.thread = task->thread; - cls->task.log = &nxt_main_log; - cls->task.ident = nxt_task_next_ident(); - cls->socket.task = &cls->task; - cls->timer.task = &cls->task; - - if (nxt_event_conn_accept_alloc(task, cls) != NULL) { - nxt_fd_event_enable_accept(engine, &cls->socket); - - nxt_queue_insert_head(&engine->listen_connections, &cls->link); - } - - return NXT_OK; - } - - return NXT_ERROR; -} - - -static nxt_event_conn_t * -nxt_event_conn_accept_alloc(nxt_task_t *task, nxt_event_conn_listen_t *cls) -{ - nxt_sockaddr_t *sa, *remote; - nxt_mem_pool_t *mp; - nxt_event_conn_t *c; - nxt_event_engine_t *engine; - nxt_listen_socket_t *ls; - - engine = task->thread->engine; - - if (engine->connections < engine->max_connections) { - - mp = nxt_mem_pool_create(cls->listen->mem_pool_size); - - if (nxt_fast_path(mp != NULL)) { - /* This allocation cannot fail. */ - c = nxt_event_conn_create(mp, cls->socket.task); - - cls->next = c; - c->socket.read_work_queue = cls->socket.read_work_queue; - c->socket.write_ready = 1; - c->listen = cls; - - ls = cls->listen; - /* This allocation cannot fail. */ - remote = nxt_sockaddr_alloc(mp, ls->socklen, ls->address_length); - c->remote = remote; - - sa = ls->sockaddr; - remote->type = sa->type; - /* - * Set address family for unspecified Unix domain, - * because these sockaddr's are not be passed to accept(). - */ - remote->u.sockaddr.sa_family = sa->u.sockaddr.sa_family; - - return c; - } - } - - return NULL; -} - - -static void -nxt_event_conn_listen_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_listen_t *cls; - - cls = obj; - cls->ready = cls->batch; - - cls->accept(task, cls, data); -} - - -void -nxt_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) -{ - socklen_t len; - nxt_socket_t s; - struct sockaddr *sa; - nxt_event_conn_t *c; - nxt_event_conn_listen_t *cls; - - cls = obj; - c = cls->next; - - cls->ready--; - cls->socket.read_ready = (cls->ready != 0); - - len = c->remote->socklen; - - if (len >= sizeof(struct sockaddr)) { - sa = &c->remote->u.sockaddr; - - } else { - sa = NULL; - len = 0; - } - - s = accept(cls->socket.fd, sa, &len); - - if (s == -1) { - nxt_event_conn_accept_error(task, cls, "accept", nxt_socket_errno); - return; - } - - c->socket.fd = s; - -#if (NXT_LINUX) - /* - * Linux does not inherit non-blocking mode - * from listen socket for accept()ed socket. - */ - if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) { - nxt_socket_close(task, s); - } - -#endif - - nxt_debug(task, "accept(%d): %d", cls->socket.fd, s); - - nxt_event_conn_accept(task, cls, c); -} - - -void -nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls, - nxt_event_conn_t *c) -{ - nxt_event_conn_t *next; - - nxt_sockaddr_text(c->remote); - - nxt_debug(task, "client: %*s", - c->remote->address_length, nxt_sockaddr_address(c->remote)); - - nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link); - - c->read_work_queue = cls->work_queue; - c->write_work_queue = cls->work_queue; - - if (cls->listen->read_after_accept) { - - //c->socket.read_ready = 1; -// cls->listen->handler(task, c, cls->socket.data); - nxt_work_queue_add(c->read_work_queue, cls->listen->handler, - task, c, cls->socket.data); - - } else { - nxt_work_queue_add(c->write_work_queue, cls->listen->handler, - task, c, cls->socket.data); - } - - next = nxt_event_conn_accept_next(task, cls); - - if (next != NULL && cls->socket.read_ready) { - nxt_work_queue_add(cls->socket.read_work_queue, - cls->accept, task, cls, next); - } -} - - -static nxt_event_conn_t * -nxt_event_conn_accept_next(nxt_task_t *task, nxt_event_conn_listen_t *cls) -{ - nxt_event_conn_t *c; - - cls->next = NULL; - - do { - c = nxt_event_conn_accept_alloc(task, cls); - - if (nxt_fast_path(c != NULL)) { - return c; - } - - } while (nxt_event_conn_accept_close_idle(task, cls) == NXT_OK); - - nxt_log(task, NXT_LOG_CRIT, "no available connections, " - "new connections are not accepted within 1s"); - - return NULL; -} - - -static nxt_int_t -nxt_event_conn_accept_close_idle(nxt_task_t *task, nxt_event_conn_listen_t *cls) -{ - nxt_queue_t *idle; - nxt_queue_link_t *link; - nxt_event_conn_t *c; - nxt_event_engine_t *engine; - - static nxt_log_moderation_t nxt_idle_close_log_moderation = { - NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION - }; - - engine = task->thread->engine; - - idle = &engine->idle_connections; - - for (link = nxt_queue_last(idle); - link != nxt_queue_head(idle); - link = nxt_queue_next(link)) - { - c = nxt_queue_link_data(link, nxt_event_conn_t, link); - - if (!c->socket.read_ready) { - nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO, - task->log, "no available connections, " - "close idle connection"); - nxt_queue_remove(link); - nxt_event_conn_close(engine, c); - - return NXT_OK; - } - } - - nxt_timer_add(engine, &cls->timer, 1000); - - nxt_fd_event_disable_read(engine, &cls->socket); - - return NXT_DECLINED; -} - - -void -nxt_event_conn_accept_error(nxt_task_t *task, nxt_event_conn_listen_t *cls, - const char *accept_syscall, nxt_err_t err) -{ - static nxt_log_moderation_t nxt_accept_log_moderation = { - NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION - }; - - cls->socket.read_ready = 0; - - switch (err) { - - case NXT_EAGAIN: - nxt_debug(task, "%s(%d) %E", accept_syscall, cls->socket.fd, err); - return; - - case ECONNABORTED: - nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN, - task->log, "%s(%d) failed %E", - accept_syscall, cls->socket.fd, err); - return; - - case EMFILE: - case ENFILE: - case ENOBUFS: - case ENOMEM: - if (nxt_event_conn_accept_close_idle(task, cls) != NXT_OK) { - nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E, " - "new connections are not accepted within 1s", - accept_syscall, cls->socket.fd, err); - } - - return; - - default: - nxt_log(task, NXT_LOG_CRIT, "%s(%d) failed %E", - accept_syscall, cls->socket.fd, err); - return; - } -} - - -static void -nxt_event_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *ev; - nxt_event_conn_t *c; - nxt_event_conn_listen_t *cls; - - ev = obj; - - cls = nxt_timer_data(ev, nxt_event_conn_listen_t, timer); - c = cls->next; - - if (c == NULL) { - c = nxt_event_conn_accept_next(task, cls); - - if (c == NULL) { - return; - } - } - - nxt_fd_event_enable_accept(task->thread->engine, &cls->socket); - - cls->accept(task, cls, c); -} - - -static void -nxt_event_conn_listen_event_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_fd_event_t *ev; - - ev = obj; - - nxt_log(task, NXT_LOG_CRIT, "accept(%d) event error", ev->fd); -} diff --git a/src/nxt_event_conn_job_sendfile.c b/src/nxt_event_conn_job_sendfile.c index dae5079c..15d7fe1d 100644 --- a/src/nxt_event_conn_job_sendfile.c +++ b/src/nxt_event_conn_job_sendfile.c @@ -25,11 +25,11 @@ static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data); static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task, - nxt_event_conn_t *c, nxt_buf_t *b); + nxt_conn_t *c, nxt_buf_t *b); void -nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c) +nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_conn_t *c) { nxt_fd_event_disable(task->thread->engine, &c->socket); @@ -41,8 +41,8 @@ nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c) static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data) { + nxt_conn_t *c; nxt_iobuf_t b; - nxt_event_conn_t *c; nxt_job_sendfile_t *jbs; nxt_sendbuf_coalesce_t sb; @@ -99,7 +99,7 @@ nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data) ssize_t ret; nxt_buf_t *b; nxt_bool_t first; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_job_sendfile_t *jbs; jbs = obj; @@ -166,7 +166,7 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) size_t sent; nxt_buf_t *b; nxt_bool_t done; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_job_sendfile_t *jbs; jbs = obj; @@ -212,8 +212,8 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) if (c->socket.error == 0 && !nxt_event_conn_write_delayed(task->thread->engine, c, sent)) { - nxt_event_conn_timer(task->thread->engine, c, c->write_state, - &c->write_timer); + nxt_conn_timer(task->thread->engine, c, c->write_state, + &c->write_timer); nxt_fd_event_oneshot_write(task->thread->engine, &c->socket); } @@ -235,7 +235,7 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) static nxt_buf_t * -nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c, +nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_conn_t *c, nxt_buf_t *b) { while (b != NULL) { diff --git a/src/nxt_event_conn_proxy.c b/src/nxt_event_conn_proxy.c deleted file mode 100644 index 45a6b257..00000000 --- a/src/nxt_event_conn_proxy.c +++ /dev/null @@ -1,1017 +0,0 @@ - -/* - * Copyright (C) Igor Sysoev - * Copyright (C) NGINX, Inc. - */ - -#include <nxt_main.h> - - -static void nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, - void *obj, void *data); -static void nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_read_process(nxt_task_t *task, - nxt_event_conn_proxy_t *p, nxt_event_conn_t *source, - nxt_event_conn_t *sink); -static void nxt_event_conn_proxy_write_add(nxt_event_conn_t *c, nxt_buf_t *b); -static void nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_write_process(nxt_task_t *task, - nxt_event_conn_proxy_t *p, nxt_event_conn_t *sink, - nxt_event_conn_t *source); -static void nxt_event_conn_proxy_read_add(nxt_event_conn_t *c, nxt_buf_t *b); -static void nxt_event_conn_proxy_close(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_error(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_read_timeout(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_write_timeout(nxt_task_t *task, void *obj, - void *data); -static nxt_msec_t nxt_event_conn_proxy_timeout_value(nxt_event_conn_t *c, - uintptr_t data); -static void nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_shutdown(nxt_task_t *task, - nxt_event_conn_proxy_t *p, nxt_event_conn_t *source, - nxt_event_conn_t *sink); -static void nxt_event_conn_proxy_read_error(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_proxy_complete(nxt_task_t *task, - nxt_event_conn_proxy_t *p); -static void nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj, - void *data); - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state; -static const nxt_event_conn_state_t - nxt_event_conn_proxy_client_first_read_state; -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_connect_state; -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_wait_state; -static const nxt_event_conn_state_t nxt_event_conn_proxy_client_read_state; -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_read_state; -static const nxt_event_conn_state_t nxt_event_conn_proxy_client_write_state; -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_write_state; - - -nxt_event_conn_proxy_t * -nxt_event_conn_proxy_create(nxt_event_conn_t *client) -{ - nxt_thread_t *thr; - nxt_event_conn_t *peer; - nxt_event_conn_proxy_t *p; - - p = nxt_mem_zalloc(client->mem_pool, sizeof(nxt_event_conn_proxy_t)); - if (nxt_slow_path(p == NULL)) { - return NULL; - } - - peer = nxt_event_conn_create(client->mem_pool, client->socket.task); - if (nxt_slow_path(peer == NULL)) { - return NULL; - } - - thr = nxt_thread(); - - client->read_work_queue = &thr->engine->read_work_queue; - client->write_work_queue = &thr->engine->write_work_queue; - client->socket.read_work_queue = &thr->engine->read_work_queue; - client->socket.write_work_queue = &thr->engine->write_work_queue; - peer->socket.read_work_queue = &thr->engine->read_work_queue; - peer->socket.write_work_queue = &thr->engine->write_work_queue; - - peer->socket.data = client->socket.data; - - peer->read_work_queue = client->read_work_queue; - peer->write_work_queue = client->write_work_queue; - peer->read_timer.work_queue = client->read_work_queue; - peer->write_timer.work_queue = client->write_work_queue; - - p->client = client; - p->peer = peer; - - return p; -} - - -void -nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p) -{ - nxt_event_conn_t *peer; - - /* - * Peer read event: not connected, disabled. - * Peer write event: not connected, disabled. - */ - - if (p->client_wait_timeout == 0) { - /* - * Peer write event: waiting for connection - * to be established with connect_timeout. - */ - peer = p->peer; - peer->write_state = &nxt_event_conn_proxy_peer_connect_state; - - nxt_event_conn_connect(task->thread->engine, peer); - } - - /* - * Client read event: waiting for client data with - * client_wait_timeout before buffer allocation. - */ - p->client->read_state = &nxt_event_conn_proxy_client_wait_state; - - nxt_conn_wait(p->client); -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_client_buffer_alloc, - .close_handler = nxt_event_conn_proxy_close, - .error_handler = nxt_event_conn_proxy_error, - - .timer_handler = nxt_event_conn_proxy_read_timeout, - .timer_value = nxt_event_conn_proxy_timeout_value, - .timer_data = offsetof(nxt_event_conn_proxy_t, client_wait_timeout), -}; - - -static void -nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, - void *data) -{ - nxt_buf_t *b; - nxt_event_conn_t *client; - nxt_event_conn_proxy_t *p; - - client = obj; - p = data; - - nxt_debug(task, "event conn proxy client first read fd:%d", - client->socket.fd); - - b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, - NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); - - if (nxt_slow_path(b == NULL)) { - /* An error completion. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - p->client_buffer = b; - client->read = b; - - if (p->peer->socket.fd != -1) { - /* - * Client read event: waiting, no timeout. - * Client write event: blocked. - * Peer read event: disabled. - * Peer write event: waiting for connection to be established - * or blocked after the connection has established. - */ - client->read_state = &nxt_event_conn_proxy_client_read_state; - - } else { - /* - * Client read event: waiting for data with client_wait_timeout - * before connecting to a peer. - * Client write event: blocked. - * Peer read event: not connected, disabled. - * Peer write event: not connected, disabled. - */ - client->read_state = &nxt_event_conn_proxy_client_first_read_state; - } - - nxt_event_conn_read(task->thread->engine, client); -} - - -static const nxt_event_conn_state_t - nxt_event_conn_proxy_client_first_read_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_peer_connect, - .close_handler = nxt_event_conn_proxy_close, - .error_handler = nxt_event_conn_proxy_error, - - .timer_handler = nxt_event_conn_proxy_read_timeout, - .timer_value = nxt_event_conn_proxy_timeout_value, - .timer_data = offsetof(nxt_event_conn_proxy_t, client_wait_timeout), - .timer_autoreset = 1, -}; - - -static void -nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *client; - nxt_event_conn_proxy_t *p; - - client = obj; - p = data; - - /* - * Client read event: waiting, no timeout. - * Client write event: blocked. - * Peer read event: disabled. - * Peer write event: waiting for connection to be established - * with connect_timeout. - */ - client->read_state = &nxt_event_conn_proxy_client_read_state; - - p->peer->write_state = &nxt_event_conn_proxy_peer_connect_state; - - nxt_event_conn_connect(task->thread->engine, p->peer); -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_connect_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_connected, - .close_handler = nxt_event_conn_proxy_refused, - .error_handler = nxt_event_conn_proxy_error, - - .timer_handler = nxt_event_conn_proxy_write_timeout, - .timer_value = nxt_event_conn_proxy_timeout_value, - .timer_data = offsetof(nxt_event_conn_proxy_t, connect_timeout), - .timer_autoreset = 1, -}; - - -static void -nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *client, *peer; - nxt_event_conn_proxy_t *p; - - peer = obj; - p = data; - - nxt_debug(task, "event conn proxy connected fd:%d", peer->socket.fd); - - p->connected = 1; - - nxt_event_conn_tcp_nodelay_on(task, peer); - nxt_event_conn_tcp_nodelay_on(task, p->client); - - /* Peer read event: waiting with peer_wait_timeout. */ - - peer->read_state = &nxt_event_conn_proxy_peer_wait_state; - peer->write_state = &nxt_event_conn_proxy_peer_write_state; - - nxt_conn_wait(peer); - - if (p->client_buffer != NULL) { - client = p->client; - - client->read_state = &nxt_event_conn_proxy_client_read_state; - client->write_state = &nxt_event_conn_proxy_client_write_state; - /* - * Send a client read data to the connected peer. - * Client write event: blocked. - */ - nxt_event_conn_proxy_read_process(task, p, client, peer); - } -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_wait_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_peer_read, - .close_handler = nxt_event_conn_proxy_close, - .error_handler = nxt_event_conn_proxy_error, - - .timer_handler = nxt_event_conn_proxy_read_timeout, - .timer_value = nxt_event_conn_proxy_timeout_value, - .timer_data = offsetof(nxt_event_conn_proxy_t, peer_wait_timeout), -}; - - -static void -nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b; - nxt_event_conn_t *peer; - nxt_event_conn_proxy_t *p; - - peer = obj; - p = data; - - nxt_debug(task, "event conn proxy peer read fd:%d", peer->socket.fd); - - b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, - NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); - - if (nxt_slow_path(b == NULL)) { - /* An error completion. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - p->peer_buffer = b; - peer->read = b; - - p->client->write_state = &nxt_event_conn_proxy_client_write_state; - peer->read_state = &nxt_event_conn_proxy_peer_read_state; - peer->write_state = &nxt_event_conn_proxy_peer_write_state; - - /* - * Client read event: waiting, no timeout. - * Client write event: blocked. - * Peer read event: waiting with possible peer_wait_timeout. - * Peer write event: blocked. - */ - nxt_event_conn_read(task->thread->engine, peer); -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_client_read_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_client_read_ready, - .close_handler = nxt_event_conn_proxy_close, - .error_handler = nxt_event_conn_proxy_read_error, -}; - - -static void -nxt_event_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *client; - nxt_event_conn_proxy_t *p; - - client = obj; - p = data; - - nxt_debug(task, "event conn proxy client read ready fd:%d", - client->socket.fd); - - nxt_event_conn_proxy_read_process(task, p, client, p->peer); -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_read_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_peer_read_ready, - .close_handler = nxt_event_conn_proxy_close, - .error_handler = nxt_event_conn_proxy_read_error, -}; - - -static void -nxt_event_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *peer; - nxt_event_conn_proxy_t *p; - - peer = obj; - p = data; - - nxt_debug(task, "event conn proxy peer read ready fd:%d", peer->socket.fd); - - nxt_event_conn_proxy_read_process(task, p, peer, p->client); -} - - -static void -nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, - nxt_event_conn_t *source, nxt_event_conn_t *sink) -{ - nxt_buf_t *rb, *wb; - - if (sink->socket.error != 0) { - nxt_debug(task, "event conn proxy sink fd:%d error:%d", - sink->socket.fd, sink->socket.error); - - nxt_event_conn_proxy_write_error(task, sink, sink->socket.data); - return; - } - - while (source->read != NULL) { - - rb = source->read; - - if (rb->mem.pos != rb->mem.free) { - - /* Add a read part to a write chain. */ - - wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0); - if (wb == NULL) { - /* An error completion. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - wb->mem.pos = rb->mem.pos; - wb->mem.free = rb->mem.free; - wb->mem.start = rb->mem.pos; - wb->mem.end = rb->mem.free; - - rb->mem.pos = rb->mem.free; - rb->mem.start = rb->mem.free; - - nxt_event_conn_proxy_write_add(sink, wb); - } - - if (rb->mem.start != rb->mem.end) { - nxt_work_queue_add(source->read_work_queue, - nxt_event_conn_proxy_read, - task, source, source->socket.data); - break; - } - - source->read = rb->next; - nxt_buf_free(source->mem_pool, rb); - } - - if (p->connected) { - nxt_event_conn_write(task->thread->engine, sink); - } -} - - -static void -nxt_event_conn_proxy_write_add(nxt_event_conn_t *c, nxt_buf_t *b) -{ - nxt_buf_t *first, *second, *prev; - - first = c->write; - - if (first == NULL) { - c->write = b; - return; - } - - /* - * A event conn proxy maintains a buffer per each direction. - * The buffer is divided by read and write parts. These parts are - * linked in buffer chains. There can be no more than two buffers - * in write chain at any time, because an added buffer is coalesced - * with the last buffer if possible. - */ - - second = first->next; - - if (second == NULL) { - - if (first->mem.end != b->mem.start) { - first->next = b; - return; - } - - /* - * The first buffer is just before the added buffer, so - * expand the first buffer to the end of the added buffer. - */ - prev = first; - - } else { - if (second->mem.end != b->mem.start) { - nxt_thread_log_alert("event conn proxy write: second buffer end:%p " - "is not equal to added buffer start:%p", - second->mem.end, b->mem.start); - return; - } - - /* - * "second->mem.end == b->mem.start" must be always true here, - * that is the second buffer is just before the added buffer, - * so expand the second buffer to the end of added buffer. - */ - prev = second; - } - - prev->mem.free = b->mem.end; - prev->mem.end = b->mem.end; - - nxt_buf_free(c->mem_pool, b); -} - - -static void -nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *source, *sink; - nxt_event_conn_proxy_t *p; - - source = obj; - p = data; - - nxt_debug(task, "event conn proxy read fd:%d", source->socket.fd); - - if (!source->socket.closed) { - sink = (source == p->client) ? p->peer : p->client; - - if (sink->socket.error == 0) { - nxt_event_conn_read(task->thread->engine, source); - } - } -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_client_write_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_client_write_ready, - .error_handler = nxt_event_conn_proxy_write_error, - - .timer_handler = nxt_event_conn_proxy_write_timeout, - .timer_value = nxt_event_conn_proxy_timeout_value, - .timer_data = offsetof(nxt_event_conn_proxy_t, client_write_timeout), - .timer_autoreset = 1, -}; - - -static void -nxt_event_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *client; - nxt_event_conn_proxy_t *p; - - client = obj; - p = data; - - nxt_debug(task, "event conn proxy client write ready fd:%d", - client->socket.fd); - - nxt_event_conn_proxy_write_process(task, p, client, p->peer); -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_write_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_peer_write_ready, - .error_handler = nxt_event_conn_proxy_write_error, - - .timer_handler = nxt_event_conn_proxy_write_timeout, - .timer_value = nxt_event_conn_proxy_timeout_value, - .timer_data = offsetof(nxt_event_conn_proxy_t, peer_write_timeout), - .timer_autoreset = 1, -}; - - -static void -nxt_event_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *peer; - nxt_event_conn_proxy_t *p; - - peer = obj; - p = data; - - nxt_debug(task, "event conn proxy peer write ready fd:%d", peer->socket.fd); - - nxt_event_conn_proxy_write_process(task, p, peer, p->client); -} - - -static void -nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, - nxt_event_conn_t *sink, nxt_event_conn_t *source) -{ - nxt_buf_t *rb, *wb; - - while (sink->write != NULL) { - - wb = sink->write; - - if (nxt_buf_is_sync(wb)) { - - /* A sync buffer marks the end of stream. */ - - sink->write = NULL; - nxt_buf_free(sink->mem_pool, wb); - nxt_event_conn_proxy_shutdown(task, p, source, sink); - return; - } - - if (wb->mem.start != wb->mem.pos) { - - /* Add a written part to a read chain. */ - - rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0); - if (rb == NULL) { - /* An error completion. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - rb->mem.pos = wb->mem.start; - rb->mem.free = wb->mem.start; - rb->mem.start = wb->mem.start; - rb->mem.end = wb->mem.pos; - - wb->mem.start = wb->mem.pos; - - nxt_event_conn_proxy_read_add(source, rb); - } - - if (wb->mem.pos != wb->mem.free) { - nxt_event_conn_write(task->thread->engine, sink); - - break; - } - - sink->write = wb->next; - nxt_buf_free(sink->mem_pool, wb); - } - - nxt_work_queue_add(source->read_work_queue, - nxt_event_conn_proxy_read, task, source, - source->socket.data); -} - - -static void -nxt_event_conn_proxy_read_add(nxt_event_conn_t *c, nxt_buf_t *b) -{ - nxt_buf_t *first, *second; - - first = c->read; - - if (first == NULL) { - c->read = b; - return; - } - - /* - * A event conn proxy maintains a buffer per each direction. - * The buffer is divided by read and write parts. These parts are - * linked in buffer chains. There can be no more than two buffers - * in read chain at any time, because an added buffer is coalesced - * with the last buffer if possible. The first and the second - * buffers are also coalesced if possible. - */ - - second = first->next; - - if (second == NULL) { - - if (first->mem.start == b->mem.end) { - /* - * The added buffer is just before the first buffer, so expand - * the first buffer to the beginning of the added buffer. - */ - first->mem.pos = b->mem.start; - first->mem.free = b->mem.start; - first->mem.start = b->mem.start; - - } else if (first->mem.end == b->mem.start) { - /* - * The added buffer is just after the first buffer, so - * expand the first buffer to the end of the added buffer. - */ - first->mem.end = b->mem.end; - - } else { - first->next = b; - return; - } - - } else { - if (second->mem.end != b->mem.start) { - nxt_thread_log_alert("event conn proxy read: second buffer end:%p " - "is not equal to added buffer start:%p", - second->mem.end, b->mem.start); - return; - } - - /* - * The added buffer is just after the second buffer, so - * expand the second buffer to the end of the added buffer. - */ - second->mem.end = b->mem.end; - - if (first->mem.start == second->mem.end) { - /* - * The second buffer is just before the first buffer, so expand - * the first buffer to the beginning of the second buffer. - */ - first->mem.pos = second->mem.start; - first->mem.free = second->mem.start; - first->mem.start = second->mem.start; - first->next = NULL; - - nxt_buf_free(c->mem_pool, second); - } - } - - nxt_buf_free(c->mem_pool, b); -} - - -static void -nxt_event_conn_proxy_close(nxt_task_t *task, void *obj, void *data) -{ - nxt_buf_t *b; - nxt_event_conn_t *source, *sink; - nxt_event_conn_proxy_t *p; - - source = obj; - p = data; - - nxt_debug(task, "event conn proxy close fd:%d", source->socket.fd); - - sink = (source == p->client) ? p->peer : p->client; - - if (sink->write == NULL) { - nxt_event_conn_proxy_shutdown(task, p, source, sink); - return; - } - - b = nxt_buf_sync_alloc(source->mem_pool, 0); - if (b == NULL) { - /* An error completion. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - nxt_buf_chain_add(&sink->write, b); -} - - -static void -nxt_event_conn_proxy_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *c; - nxt_event_conn_proxy_t *p; - - c = obj; - p = data; - - nxt_debug(task, "event conn proxy error fd:%d", c->socket.fd); - - nxt_event_conn_proxy_close(task, c, p); -} - - -static void -nxt_event_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *ev; - nxt_event_conn_t *c; - - ev = obj; - - c = nxt_event_read_timer_conn(ev); - c->socket.timedout = 1; - c->socket.closed = 1; - - nxt_debug(task, "event conn proxy read timeout fd:%d", c->socket.fd); - - nxt_event_conn_proxy_close(task, c, c->socket.data); -} - - -static void -nxt_event_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *ev; - nxt_event_conn_t *c; - - ev = obj; - - c = nxt_event_write_timer_conn(ev); - c->socket.timedout = 1; - c->socket.closed = 1; - - nxt_debug(task, "event conn proxy write timeout fd:%d", c->socket.fd); - - nxt_event_conn_proxy_close(task, c, c->socket.data); -} - - -static nxt_msec_t -nxt_event_conn_proxy_timeout_value(nxt_event_conn_t *c, uintptr_t data) -{ - nxt_msec_t *timer; - nxt_event_conn_proxy_t *p; - - p = c->socket.data; - - timer = (nxt_msec_t *) ((char *) p + data); - - return *timer; -} - - -static void -nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *peer; - nxt_event_conn_proxy_t *p; - - peer = obj; - p = data; - - nxt_debug(task, "event conn proxy refused fd:%d", peer->socket.fd); - - if (p->retries == 0) { - /* An error completion. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - p->retries--; - - nxt_socket_close(task, peer->socket.fd); - peer->socket.fd = -1; - peer->socket.error = 0; - - p->delayed = 1; - - peer->write_timer.handler = nxt_event_conn_proxy_reconnect_handler; - nxt_timer_add(task->thread->engine, &peer->write_timer, - p->reconnect_timeout); -} - - -static void -nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data) -{ - nxt_timer_t *ev; - nxt_event_conn_t *peer; - nxt_event_conn_proxy_t *p; - - ev = obj; - - nxt_debug(task, "event conn proxy reconnect timer"); - - peer = nxt_event_write_timer_conn(ev); - p = peer->socket.data; - - if (p->client->socket.closed) { - nxt_event_conn_proxy_complete(task, p); - return; - } - - p->delayed = 0; - - peer->write_state = &nxt_event_conn_proxy_peer_connect_state; - /* - * Peer read event: disabled. - * Peer write event: waiting for connection with connect_timeout. - */ - nxt_event_conn_connect(task->thread->engine, peer); -} - - -static void -nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p, - nxt_event_conn_t *source, nxt_event_conn_t *sink) -{ - nxt_buf_t *b; - - nxt_debug(source->socket.task, - "event conn proxy shutdown source fd:%d cl:%d err:%d", - source->socket.fd, source->socket.closed, source->socket.error); - - nxt_debug(sink->socket.task, - "event conn proxy shutdown sink fd:%d cl:%d err:%d", - sink->socket.fd, sink->socket.closed, sink->socket.error); - - if (!p->connected || p->delayed) { - nxt_event_conn_proxy_complete(task, p); - return; - } - - if (sink->socket.error == 0 && !sink->socket.closed) { - sink->socket.shutdown = 1; - nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR); - } - - if (sink->socket.error != 0 - || (sink->socket.closed && source->write == NULL)) - { - /* The opposite direction also has been already closed. */ - nxt_event_conn_proxy_complete(task, p); - return; - } - - nxt_debug(source->socket.task, "free source buffer"); - - /* Free the direction's buffer. */ - b = (source == p->client) ? p->client_buffer : p->peer_buffer; - nxt_mem_free(source->mem_pool, b); -} - - -static void -nxt_event_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *c; - nxt_event_conn_proxy_t *p; - - c = obj; - p = data; - - nxt_debug(task, "event conn proxy read error fd:%d", c->socket.fd); - - nxt_event_conn_proxy_close(task, c, p); -} - - -static void -nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_t *source, *sink; - nxt_event_conn_proxy_t *p; - - sink = obj; - p = data; - - nxt_debug(task, "event conn proxy write error fd:%d", sink->socket.fd); - - /* Clear data for the direction sink. */ - sink->write = NULL; - - /* Block the direction source. */ - source = (sink == p->client) ? p->peer : p->client; - nxt_fd_event_block_read(task->thread->engine, &source->socket); - - if (source->write == NULL) { - /* - * There is no data for the opposite direction and - * the next read from the sink will most probably fail. - */ - nxt_event_conn_proxy_complete(task, p); - } -} - - -static const nxt_event_conn_state_t nxt_event_conn_proxy_close_state - nxt_aligned(64) = -{ - .ready_handler = nxt_event_conn_proxy_completion, -}; - - -static void -nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p) -{ - nxt_event_engine_t *engine; - - engine = task->thread->engine; - - nxt_debug(p->client->socket.task, "event conn proxy complete %d:%d", - p->client->socket.fd, p->peer->socket.fd); - - if (p->delayed) { - p->delayed = 0; - nxt_queue_remove(&p->peer->link); - } - - if (p->client->socket.fd != -1) { - p->retain = 1; - p->client->write_state = &nxt_event_conn_proxy_close_state; - nxt_event_conn_close(engine, p->client); - } - - if (p->peer->socket.fd != -1) { - p->retain++; - p->peer->write_state = &nxt_event_conn_proxy_close_state; - nxt_event_conn_close(engine, p->peer); - } -} - - -static void -nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj, void *data) -{ - nxt_event_conn_proxy_t *p; - - p = data; - - nxt_debug(p->client->socket.task, "event conn proxy completion %d:%d:%d", - p->retain, p->client->socket.fd, p->peer->socket.fd); - - p->retain--; - - if (p->retain == 0) { - nxt_mem_free(p->client->mem_pool, p->client_buffer); - nxt_mem_free(p->client->mem_pool, p->peer_buffer); - - p->completion_handler(task, p, NULL); - } -} diff --git a/src/nxt_event_engine.h b/src/nxt_event_engine.h index 6947cc8d..5b602bc0 100644 --- a/src/nxt_event_engine.h +++ b/src/nxt_event_engine.h @@ -123,13 +123,13 @@ typedef struct { * events. */ void (*enable_file)(nxt_event_engine_t *engine, - nxt_event_file_t *fev); + nxt_file_event_t *ev); /* * Delete a file from an event set before closing the file descriptor. */ void (*close_file)(nxt_event_engine_t *engine, - nxt_event_file_t *fev); + nxt_file_event_t *ev); /* * Enable post event notifications and set a post handler to handle @@ -157,7 +157,7 @@ typedef struct { nxt_msec_t timeout); /* I/O operations suitable to underlying event facility. */ - nxt_event_conn_io_t *io; + nxt_conn_io_t *io; /* True if an event facility supports file change event notifications. */ uint8_t file_support; /* 1 bit */ diff --git a/src/nxt_eventport_engine.c b/src/nxt_eventport_engine.c index 7dc0ffa8..2bf111f9 100644 --- a/src/nxt_eventport_engine.c +++ b/src/nxt_eventport_engine.c @@ -78,7 +78,7 @@ const nxt_event_interface_t nxt_eventport_engine = { nxt_eventport_signal, nxt_eventport_poll, - &nxt_unix_event_conn_io, + &nxt_unix_conn_io, NXT_NO_FILE_EVENTS, NXT_NO_SIGNAL_EVENTS, diff --git a/src/nxt_event_file.h b/src/nxt_file_event.h index 06f1762f..d4255ce9 100644 --- a/src/nxt_event_file.h +++ b/src/nxt_file_event.h @@ -3,8 +3,8 @@ * Copyright (C) NGINX, Inc. */ -#ifndef _NXT_EVENT_FILE_H_INCLUDED_ -#define _NXT_EVENT_FILE_H_INCLUDED_ +#ifndef _NXT_FILE_EVENT_H_INCLUDED_ +#define _NXT_FILE_EVENT_H_INCLUDED_ typedef struct { @@ -12,7 +12,7 @@ typedef struct { nxt_file_t *file; nxt_work_handler_t handler; nxt_task_t *task; -} nxt_event_file_t; +} nxt_file_event_t; -#endif /* _NXT_EVENT_FILE_H_INCLUDED_ */ +#endif /* _NXT_FILE_EVENT_H_INCLUDED_ */ diff --git a/src/nxt_kqueue_engine.c b/src/nxt_kqueue_engine.c index c5e40eeb..4582b314 100644 --- a/src/nxt_kqueue_engine.c +++ b/src/nxt_kqueue_engine.c @@ -78,9 +78,9 @@ static void nxt_kqueue_oneshot_write(nxt_event_engine_t *engine, static void nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev); static void nxt_kqueue_enable_file(nxt_event_engine_t *engine, - nxt_event_file_t *ev); + nxt_file_event_t *ev); static void nxt_kqueue_close_file(nxt_event_engine_t *engine, - nxt_event_file_t *ev); + nxt_file_event_t *ev); static void nxt_kqueue_fd_set(nxt_event_engine_t *engine, nxt_fd_event_t *ev, nxt_int_t filter, nxt_uint_t flags); static struct kevent *nxt_kqueue_get_kevent(nxt_event_engine_t *engine); @@ -98,26 +98,25 @@ static void nxt_kqueue_signal(nxt_event_engine_t *engine, nxt_uint_t signo); #endif static void nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout); -static void nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, +static void nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data); -static void nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, +static void nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data); static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data); -static void nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, +static void nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data); -static void nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, +static void nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data); -static ssize_t nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, - nxt_buf_t *b); +static ssize_t nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b); -static nxt_event_conn_io_t nxt_kqueue_event_conn_io = { - nxt_kqueue_event_conn_io_connect, - nxt_kqueue_event_conn_io_accept, +static nxt_conn_io_t nxt_kqueue_conn_io = { + nxt_kqueue_conn_io_connect, + nxt_kqueue_conn_io_accept, - nxt_kqueue_event_conn_io_read, - nxt_kqueue_event_conn_io_recvbuf, - nxt_event_conn_io_recv, + nxt_kqueue_conn_io_read, + nxt_kqueue_conn_io_recvbuf, + nxt_conn_io_recv, nxt_conn_io_write, nxt_event_conn_io_write_chunk, @@ -133,7 +132,7 @@ static nxt_event_conn_io_t nxt_kqueue_event_conn_io = { nxt_event_conn_io_writev, nxt_event_conn_io_send, - nxt_event_conn_io_shutdown, + nxt_conn_io_shutdown, }; @@ -165,7 +164,7 @@ const nxt_event_interface_t nxt_kqueue_engine = { #endif nxt_kqueue_poll, - &nxt_kqueue_event_conn_io, + &nxt_kqueue_conn_io, NXT_FILE_EVENTS, NXT_SIGNAL_EVENTS, @@ -414,7 +413,7 @@ nxt_kqueue_enable_accept(nxt_event_engine_t *engine, nxt_fd_event_t *ev) static void -nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_event_file_t *ev) +nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_file_event_t *ev) { struct kevent *kev; @@ -437,7 +436,7 @@ nxt_kqueue_enable_file(nxt_event_engine_t *engine, nxt_event_file_t *ev) static void -nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_event_file_t *ev) +nxt_kqueue_close_file(nxt_event_engine_t *engine, nxt_file_event_t *ev) { /* TODO: pending event. */ } @@ -497,7 +496,7 @@ nxt_kqueue_error(nxt_event_engine_t *engine) { struct kevent *kev, *end; nxt_fd_event_t *ev; - nxt_event_file_t *fev; + nxt_file_event_t *fev; nxt_work_queue_t *wq; wq = &engine->fast_work_queue; @@ -551,7 +550,7 @@ nxt_kqueue_fd_error_handler(nxt_task_t *task, void *obj, void *data) static void nxt_kqueue_file_error_handler(nxt_task_t *task, void *obj, void *data) { - nxt_event_file_t *ev; + nxt_file_event_t *ev; ev = obj; @@ -678,7 +677,7 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) nxt_fd_event_t *ev; nxt_sig_event_t *sigev; struct timespec ts, *tp; - nxt_event_file_t *fev; + nxt_file_event_t *fev; nxt_work_queue_t *wq; nxt_work_handler_t handler; @@ -850,9 +849,9 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout) */ static void -nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) +nxt_kqueue_conn_io_connect(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_event_engine_t *engine; nxt_work_handler_t handler; const nxt_event_conn_state_t *state; @@ -869,11 +868,11 @@ nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) break; case NXT_AGAIN: - c->socket.write_handler = nxt_kqueue_event_conn_connected; - c->socket.error_handler = nxt_event_conn_connect_error; + c->socket.write_handler = nxt_kqueue_conn_connected; + c->socket.error_handler = nxt_conn_connect_error; engine = task->thread->engine; - nxt_event_conn_timer(engine, c, state, &c->write_timer); + nxt_conn_timer(engine, c, state, &c->write_timer); nxt_kqueue_enable_write(engine, &c->socket); return; @@ -892,13 +891,13 @@ nxt_kqueue_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) static void -nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data) +nxt_kqueue_conn_connected(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; - nxt_debug(task, "kqueue event conn connected fd:%d", c->socket.fd); + nxt_debug(task, "kqueue conn connected fd:%d", c->socket.fd); c->socket.write = NXT_EVENT_BLOCKED; @@ -914,36 +913,36 @@ nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data) static void nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_listen_t *cls; + nxt_listen_event_t *lev; - cls = obj; + lev = obj; nxt_debug(task, "kevent fd:%d avail:%D", - cls->socket.fd, cls->socket.kq_available); + lev->socket.fd, lev->socket.kq_available); - cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available); + lev->ready = nxt_min(lev->batch, (uint32_t) lev->socket.kq_available); - nxt_kqueue_event_conn_io_accept(task, cls, data); + nxt_kqueue_conn_io_accept(task, lev, data); } static void -nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) +nxt_kqueue_conn_io_accept(nxt_task_t *task, void *obj, void *data) { - socklen_t len; - nxt_socket_t s; - struct sockaddr *sa; - nxt_event_conn_t *c; - nxt_event_conn_listen_t *cls; + socklen_t len; + nxt_conn_t *c; + nxt_socket_t s; + struct sockaddr *sa; + nxt_listen_event_t *lev; - cls = obj; - c = cls->next; + lev = obj; + c = lev->next; - cls->ready--; - cls->socket.read_ready = (cls->ready != 0); + lev->ready--; + lev->socket.read_ready = (lev->ready != 0); - cls->socket.kq_available--; - cls->socket.read_ready = (cls->socket.kq_available != 0); + lev->socket.kq_available--; + lev->socket.read_ready = (lev->socket.kq_available != 0); len = c->remote->socklen; @@ -955,34 +954,34 @@ nxt_kqueue_event_conn_io_accept(nxt_task_t *task, void *obj, void *data) len = 0; } - s = accept(cls->socket.fd, sa, &len); + s = accept(lev->socket.fd, sa, &len); if (s != -1) { c->socket.fd = s; - nxt_debug(task, "accept(%d): %d", cls->socket.fd, s); + nxt_debug(task, "accept(%d): %d", lev->socket.fd, s); - nxt_event_conn_accept(task, cls, c); + nxt_conn_accept(task, lev, c); return; } - nxt_event_conn_accept_error(task, cls, "accept", nxt_errno); + nxt_conn_accept_error(task, lev, "accept", nxt_errno); } /* - * nxt_kqueue_event_conn_io_read() is just a wrapper to eliminate the + * nxt_kqueue_conn_io_read() is just a wrapper to eliminate the * readv() or recv() syscall if a remote side just closed connection. */ static void -nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data) +nxt_kqueue_conn_io_read(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; - nxt_debug(task, "kqueue event conn read fd:%d", c->socket.fd); + nxt_debug(task, "kqueue conn read fd:%d", c->socket.fd); if (c->socket.kq_available == 0 && c->socket.kq_eof) { nxt_debug(task, "kevent fd:%d eof", c->socket.fd); @@ -993,18 +992,18 @@ nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data) return; } - nxt_event_conn_io_read(task, c, data); + nxt_conn_io_read(task, c, data); } /* - * nxt_kqueue_event_conn_io_recvbuf() is just wrapper around standard - * nxt_event_conn_io_recvbuf() to eliminate the readv() or recv() syscalls + * nxt_kqueue_conn_io_recvbuf() is just wrapper around standard + * nxt_conn_io_recvbuf() to eliminate the readv() or recv() syscalls * if there is no pending data or a remote side closed connection. */ static ssize_t -nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) +nxt_kqueue_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) { ssize_t n; @@ -1013,7 +1012,7 @@ nxt_kqueue_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) return 0; } - n = nxt_event_conn_io_recvbuf(c, b); + n = nxt_conn_io_recvbuf(c, b); if (n > 0) { c->socket.kq_available -= n; diff --git a/src/nxt_listen_socket.c b/src/nxt_listen_socket.c index 08751dfc..7a0e3a7c 100644 --- a/src/nxt_listen_socket.c +++ b/src/nxt_listen_socket.c @@ -241,7 +241,7 @@ nxt_listen_socket_pool_min_size(nxt_listen_socket_t *ls) #endif return size + sizeof(nxt_mem_pool_t) - + sizeof(nxt_event_conn_t) + + sizeof(nxt_conn_t) + sizeof(nxt_log_t); } diff --git a/src/nxt_macosx_sendfile.c b/src/nxt_macosx_sendfile.c index f636819c..2c6ea954 100644 --- a/src/nxt_macosx_sendfile.c +++ b/src/nxt_macosx_sendfile.c @@ -26,8 +26,7 @@ static int nxt_sys_sendfile(int fd, int s, off_t offset, off_t *len, ssize_t -nxt_macosx_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, - size_t limit) +nxt_macosx_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b, size_t limit) { size_t hd_size, file_size; ssize_t n; diff --git a/src/nxt_main.h b/src/nxt_main.h index 24ec4ca1..19db0aa3 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -110,7 +110,7 @@ typedef struct nxt_port_mmap_s nxt_port_mmap_t; #include <nxt_buf_pool.h> #include <nxt_recvbuf.h> -typedef struct nxt_event_conn_s nxt_event_conn_t; +typedef struct nxt_conn_s nxt_conn_t; #include <nxt_sendbuf.h> #include <nxt_log_moderation.h> @@ -129,6 +129,7 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context); #include <nxt_thread_log.h> #include <nxt_fd_event.h> +#include <nxt_file_event.h> #include <nxt_port.h> #include <nxt_port_memory.h> @@ -137,12 +138,10 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context); #endif -typedef void (*nxt_event_conn_handler_t)(nxt_thread_t *thr, - nxt_event_conn_t *c); +typedef void (*nxt_event_conn_handler_t)(nxt_thread_t *thr, nxt_conn_t *c); #include <nxt_listen_socket.h> -#include <nxt_event_conn.h> -#include <nxt_event_file.h> +#include <nxt_conn.h> #include <nxt_event_engine.h> #include <nxt_job.h> diff --git a/src/nxt_openssl.c b/src/nxt_openssl.c index 17e65a56..66f0ad32 100644 --- a/src/nxt_openssl.c +++ b/src/nxt_openssl.c @@ -23,23 +23,20 @@ typedef struct { static nxt_int_t nxt_openssl_server_init(nxt_ssltls_conf_t *conf); static void nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf, - nxt_event_conn_t *c); -static void nxt_openssl_session_cleanup(void *data); + nxt_conn_t *c); +static void nxt_openssl_session_cleanup(nxt_task_t *task, void *data); static void nxt_openssl_conn_handshake(nxt_task_t *task, void *obj, void *data); static void nxt_openssl_conn_io_read(nxt_task_t *task, void *obj, void *data); static void nxt_openssl_conn_io_shutdown(nxt_task_t *task, void *obj, void *data); -static ssize_t nxt_openssl_conn_io_write_chunk(nxt_event_conn_t *c, - nxt_buf_t *b, size_t limit); -static ssize_t nxt_openssl_conn_io_send(nxt_event_conn_t *c, void *buf, - size_t size); +static ssize_t nxt_openssl_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, + size_t limit); +static ssize_t nxt_openssl_conn_io_send(nxt_conn_t *c, void *buf, size_t size); static nxt_int_t nxt_openssl_conn_test_error(nxt_task_t *task, - nxt_event_conn_t *c, int ret, nxt_err_t sys_err, - nxt_work_handler_t handler); -static void nxt_cdecl nxt_openssl_conn_error(nxt_event_conn_t *c, nxt_err_t err, + nxt_conn_t *c, int ret, nxt_err_t sys_err, nxt_work_handler_t handler); +static void nxt_cdecl nxt_openssl_conn_error(nxt_conn_t *c, nxt_err_t err, const char *fmt, ...); -static nxt_uint_t nxt_openssl_log_error_level(nxt_event_conn_t *c, - nxt_err_t err); +static nxt_uint_t nxt_openssl_log_error_level(nxt_conn_t *c, nxt_err_t err); static void nxt_cdecl nxt_openssl_log_error(nxt_uint_t level, nxt_log_t *log, const char *fmt, ...); static u_char *nxt_openssl_copy_error(u_char *p, u_char *end); @@ -51,7 +48,7 @@ const nxt_ssltls_lib_t nxt_openssl_lib = { }; -static nxt_event_conn_io_t nxt_openssl_event_conn_io = { +static nxt_conn_io_t nxt_openssl_conn_io = { NULL, NULL, @@ -249,8 +246,7 @@ fail: static void -nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf, - nxt_event_conn_t *c) +nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf, nxt_conn_t *c) { int ret; SSL *s; @@ -301,7 +297,7 @@ nxt_openssl_conn_init(nxt_task_t *task, nxt_ssltls_conf_t *conf, goto fail; } - c->io = &nxt_openssl_event_conn_io; + c->io = &nxt_openssl_conn_io; c->sendfile = NXT_CONN_SENDFILE_OFF; nxt_openssl_conn_handshake(task, c, c->socket.data); @@ -315,13 +311,13 @@ fail: static void -nxt_openssl_session_cleanup(void *data) +nxt_openssl_session_cleanup(nxt_task_t *task, void *data) { nxt_openssl_conn_t *ssltls; ssltls = data; - nxt_thread_log_debug("openssl session cleanup"); + nxt_debug(task, "openssl session cleanup"); nxt_free(ssltls->buffer.start); @@ -335,7 +331,7 @@ nxt_openssl_conn_handshake(nxt_task_t *task, void *obj, void *data) int ret; nxt_int_t n; nxt_err_t err; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_openssl_conn_t *ssltls; c = obj; @@ -382,7 +378,7 @@ nxt_openssl_conn_io_read(nxt_task_t *task, void *obj, void *data) nxt_buf_t *b; nxt_int_t n; nxt_err_t err; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_work_handler_t handler; nxt_openssl_conn_t *ssltls; @@ -432,7 +428,7 @@ nxt_openssl_conn_io_read(nxt_task_t *task, void *obj, void *data) static ssize_t -nxt_openssl_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) +nxt_openssl_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, size_t limit) { nxt_openssl_conn_t *ssltls; @@ -445,7 +441,7 @@ nxt_openssl_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) static ssize_t -nxt_openssl_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size) +nxt_openssl_conn_io_send(nxt_conn_t *c, void *buf, size_t size) { int ret; nxt_err_t err; @@ -491,7 +487,7 @@ nxt_openssl_conn_io_shutdown(nxt_task_t *task, void *obj, void *data) nxt_err_t err; nxt_int_t n; nxt_bool_t quiet, once; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_work_handler_t handler; nxt_openssl_conn_t *ssltls; @@ -586,7 +582,7 @@ done: static nxt_int_t -nxt_openssl_conn_test_error(nxt_task_t *task, nxt_event_conn_t *c, int ret, +nxt_openssl_conn_test_error(nxt_task_t *task, nxt_conn_t *c, int ret, nxt_err_t sys_err, nxt_work_handler_t handler) { u_long lib_err; @@ -664,7 +660,7 @@ nxt_openssl_conn_test_error(nxt_task_t *task, nxt_event_conn_t *c, int ret, static void nxt_cdecl -nxt_openssl_conn_error(nxt_event_conn_t *c, nxt_err_t err, const char *fmt, ...) +nxt_openssl_conn_error(nxt_conn_t *c, nxt_err_t err, const char *fmt, ...) { u_char *p, *end; va_list args; @@ -697,7 +693,7 @@ nxt_openssl_conn_error(nxt_event_conn_t *c, nxt_err_t err, const char *fmt, ...) static nxt_uint_t -nxt_openssl_log_error_level(nxt_event_conn_t *c, nxt_err_t err) +nxt_openssl_log_error_level(nxt_conn_t *c, nxt_err_t err) { switch (ERR_GET_REASON(ERR_peek_error())) { diff --git a/src/nxt_poll_engine.c b/src/nxt_poll_engine.c index 90a8176e..607cd144 100644 --- a/src/nxt_poll_engine.c +++ b/src/nxt_poll_engine.c @@ -88,7 +88,7 @@ const nxt_event_interface_t nxt_poll_engine = { NULL, nxt_poll, - &nxt_unix_event_conn_io, + &nxt_unix_conn_io, NXT_NO_FILE_EVENTS, NXT_NO_SIGNAL_EVENTS, diff --git a/src/nxt_pollset_engine.c b/src/nxt_pollset_engine.c index 571ad794..402f954c 100644 --- a/src/nxt_pollset_engine.c +++ b/src/nxt_pollset_engine.c @@ -79,7 +79,7 @@ const nxt_event_interface_t nxt_pollset_engine = { NULL, nxt_pollset_poll, - &nxt_unix_event_conn_io, + &nxt_unix_conn_io, NXT_NO_FILE_EVENTS, NXT_NO_SIGNAL_EVENTS, diff --git a/src/nxt_router.c b/src/nxt_router.c index c87b48b8..67acc12d 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -71,8 +71,7 @@ static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); -static nxt_msec_t nxt_router_conn_timeout_value(nxt_event_conn_t *c, - uintptr_t data); +static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); nxt_int_t @@ -207,8 +206,8 @@ nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) skcf->listen.handler = nxt_router_conn_init; skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen) - + sizeof(nxt_event_conn_proxy_t) - + sizeof(nxt_event_conn_t) + + sizeof(nxt_conn_proxy_t) + + sizeof(nxt_conn_t) + 4 * sizeof(nxt_buf_t); skcf->header_buffer_size = 2048; @@ -222,8 +221,8 @@ nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf) skcf->listen.handler = nxt_stream_connection_init; skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen) - + sizeof(nxt_event_conn_proxy_t) - + sizeof(nxt_event_conn_t) + + sizeof(nxt_conn_proxy_t) + + sizeof(nxt_conn_t) + 4 * sizeof(nxt_buf_t); skcf->header_read_timeout = 5000; @@ -881,7 +880,7 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) } -static const nxt_event_conn_state_t nxt_router_conn_read_state +static const nxt_conn_state_t nxt_router_conn_read_state nxt_aligned(64) = { .ready_handler = nxt_router_conn_http_header_parse, @@ -898,7 +897,7 @@ static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) { size_t size; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_event_engine_t *engine; nxt_socket_conf_joint_t *joint; @@ -920,11 +919,11 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) c->read_state = &nxt_router_conn_read_state; - nxt_event_conn_read(engine, c); + nxt_conn_read(engine, c); } -static const nxt_event_conn_state_t nxt_router_conn_write_state +static const nxt_conn_state_t nxt_router_conn_write_state nxt_aligned(64) = { .ready_handler = nxt_router_conn_close, @@ -939,7 +938,7 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) size_t size; nxt_int_t ret; nxt_buf_t *b; - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_socket_conf_joint_t *joint; nxt_http_request_parse_t *rp; @@ -996,7 +995,7 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) c->read = b; } - nxt_event_conn_read(task->thread->engine, c); + nxt_conn_read(task->thread->engine, c); return; } @@ -1004,11 +1003,11 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) c->write->mem.pos = c->write->mem.start; c->write_state = &nxt_router_conn_write_state; - nxt_event_conn_write(task->thread->engine, c); + nxt_conn_write(task->thread->engine, c); } -static const nxt_event_conn_state_t nxt_router_conn_close_state +static const nxt_conn_state_t nxt_router_conn_close_state nxt_aligned(64) = { .ready_handler = nxt_router_conn_free, @@ -1018,7 +1017,7 @@ static const nxt_event_conn_state_t nxt_router_conn_close_state static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -1026,14 +1025,14 @@ nxt_router_conn_close(nxt_task_t *task, void *obj, void *data) c->write_state = &nxt_router_conn_close_state; - nxt_event_conn_close(task->thread->engine, c); + nxt_conn_close(task->thread->engine, c); } static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; nxt_socket_conf_joint_t *joint; c = obj; @@ -1050,7 +1049,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data) static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) { - nxt_event_conn_t *c; + nxt_conn_t *c; c = obj; @@ -1058,30 +1057,30 @@ nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) c->write_state = &nxt_router_conn_close_state; - nxt_event_conn_close(task->thread->engine, c); + nxt_conn_close(task->thread->engine, c); } static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data) { - nxt_timer_t *timer; - nxt_event_conn_t *c; + nxt_conn_t *c; + nxt_timer_t *timer; timer = obj; nxt_debug(task, "router conn timeout"); - c = nxt_event_read_timer_conn(timer); + c = nxt_read_timer_conn(timer); c->write_state = &nxt_router_conn_close_state; - nxt_event_conn_close(task->thread->engine, c); + nxt_conn_close(task->thread->engine, c); } static nxt_msec_t -nxt_router_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data) +nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) { nxt_socket_conf_joint_t *joint; diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 62b0e3b5..e35baa8f 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -497,9 +497,9 @@ nxt_runtime_quit(nxt_task_t *task) static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) { + nxt_conn_t *c; nxt_queue_t *idle; nxt_queue_link_t *link, *next; - nxt_event_conn_t *c; nxt_debug(&engine->task, "close idle connections"); @@ -510,11 +510,11 @@ nxt_runtime_close_idle_connections(nxt_event_engine_t *engine) link = next) { next = nxt_queue_next(link); - c = nxt_queue_link_data(link, nxt_event_conn_t, link); + c = nxt_queue_link_data(link, nxt_conn_t, link); if (!c->socket.read_ready) { nxt_queue_remove(link); - nxt_event_conn_close(engine, c); + nxt_conn_close(engine, c); } } } diff --git a/src/nxt_select_engine.c b/src/nxt_select_engine.c index 8a6f0710..6f760012 100644 --- a/src/nxt_select_engine.c +++ b/src/nxt_select_engine.c @@ -57,7 +57,7 @@ const nxt_event_interface_t nxt_select_engine = { NULL, nxt_select_poll, - &nxt_unix_event_conn_io, + &nxt_unix_conn_io, NXT_NO_FILE_EVENTS, NXT_NO_SIGNAL_EVENTS, diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index e8fbe2a0..361cb0cd 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -202,8 +202,8 @@ nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb) ssize_t -nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm, - nxt_buf_t *b, size_t limit) +nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b, + size_t limit) { size_t size, bsize, copied; ssize_t n; diff --git a/src/nxt_sendbuf.h b/src/nxt_sendbuf.h index 0b789583..b159bfe5 100644 --- a/src/nxt_sendbuf.h +++ b/src/nxt_sendbuf.h @@ -66,41 +66,41 @@ typedef struct { #if (NXT_HAVE_LINUX_SENDFILE) #define NXT_HAVE_SENDFILE 1 -ssize_t nxt_linux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_linux_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b, size_t limit); #endif #if (NXT_HAVE_FREEBSD_SENDFILE) #define NXT_HAVE_SENDFILE 1 -ssize_t nxt_freebsd_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_freebsd_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b, size_t limit); #endif #if (NXT_HAVE_SOLARIS_SENDFILEV) #define NXT_HAVE_SENDFILE 1 -ssize_t nxt_solaris_event_conn_io_sendfilev(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_solaris_event_conn_io_sendfilev(nxt_conn_t *c, nxt_buf_t *b, size_t limit); #endif #if (NXT_HAVE_MACOSX_SENDFILE) #define NXT_HAVE_SENDFILE 1 -ssize_t nxt_macosx_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_macosx_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b, size_t limit); #endif #if (NXT_HAVE_AIX_SEND_FILE) #define NXT_HAVE_SENDFILE 1 -ssize_t nxt_aix_event_conn_io_send_file(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_aix_event_conn_io_send_file(nxt_conn_t *c, nxt_buf_t *b, size_t limit); #endif #if (NXT_HAVE_HPUX_SENDFILE) #define NXT_HAVE_SENDFILE 1 -ssize_t nxt_hpux_event_conn_io_sendfile(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_hpux_event_conn_io_sendfile(nxt_conn_t *c, nxt_buf_t *b, size_t limit); #endif -ssize_t nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b, +ssize_t nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b, size_t limit); @@ -116,7 +116,7 @@ size_t nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb); * SSL/TLS libraries which lack vector I/O interface yet add noticeable * overhead to each SSL/TLS record. */ -ssize_t nxt_sendbuf_copy_coalesce(nxt_event_conn_t *c, nxt_buf_mem_t *bm, +ssize_t nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b, size_t limit); nxt_buf_t *nxt_sendbuf_update(nxt_buf_t *b, size_t sent); diff --git a/src/nxt_ssltls.h b/src/nxt_ssltls.h index aa32348d..f12335a7 100644 --- a/src/nxt_ssltls.h +++ b/src/nxt_ssltls.h @@ -35,8 +35,7 @@ typedef struct { struct nxt_ssltls_conf_s { void *ctx; void (*conn_init)(nxt_task_t *task, - nxt_ssltls_conf_t *conf, - nxt_event_conn_t *c); + nxt_ssltls_conf_t *conf, nxt_conn_t *c); const nxt_ssltls_lib_t *lib; diff --git a/src/nxt_stream_module.c b/src/nxt_stream_module.c index e89cd6fd..6000daaf 100644 --- a/src/nxt_stream_module.c +++ b/src/nxt_stream_module.c @@ -17,8 +17,8 @@ static void nxt_stream_connection_close(nxt_task_t *task, void *obj, void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data) { + nxt_conn_t *c; nxt_runtime_t *rt; - nxt_event_conn_t *c; nxt_upstream_peer_t *up; c = obj; @@ -57,8 +57,8 @@ fail: static void nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up) { - nxt_event_conn_t *c; - nxt_event_conn_proxy_t *p; + nxt_conn_t *c; + nxt_conn_proxy_t *p; c = up->data; @@ -67,7 +67,7 @@ nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up) nxt_log_debug(c->socket.log, "stream connection peer %*s", up->sockaddr->length, nxt_sockaddr_start(up->sockaddr)); - p = nxt_event_conn_proxy_create(c); + p = nxt_conn_proxy_create(c); if (nxt_slow_path(p == NULL)) { goto fail; } @@ -107,7 +107,7 @@ nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up) rate->last = engine->timers.now; } - nxt_event_conn_proxy(task, p); + nxt_conn_proxy(task, p); return; fail: diff --git a/src/nxt_stream_source.c b/src/nxt_stream_source.c index 9b667b99..66ec1640 100644 --- a/src/nxt_stream_source.c +++ b/src/nxt_stream_source.c @@ -58,7 +58,7 @@ nxt_stream_source_connect(nxt_task_t *task, nxt_stream_source_t *stream) stream->conn = c; c->socket.data = stream; - nxt_event_conn_work_queue_set(c, us->work_queue); + nxt_conn_work_queue_set(c, us->work_queue); c->remote = us->peer->sockaddr; c->write_state = &nxt_stream_source_connect_state; @@ -158,7 +158,7 @@ nxt_stream_source_write_ready(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "stream source write ready fd:%d", c->socket.fd); - nxt_event_conn_read(task, c); + nxt_conn_read(task, c); } @@ -212,7 +212,7 @@ nxt_stream_source_read_ready(nxt_task_t *task, void *obj, void *data) c->read_state = &nxt_stream_source_response_read_state; - nxt_event_conn_read(task, c); + nxt_conn_read(task, c); return; fail: @@ -425,7 +425,7 @@ nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "stream source closed fd:%d", c->socket.fd); - nxt_event_conn_close(task, c); + nxt_conn_close(task, c); b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool, NXT_BUF_SYNC_LAST); @@ -463,7 +463,7 @@ nxt_stream_source_error(nxt_task_t *task, void *obj, void *data) static void nxt_stream_source_close(nxt_task_t *task, nxt_stream_source_t *stream) { - nxt_event_conn_close(task, stream->conn); + nxt_conn_close(task, stream->conn); stream->error_handler(task, stream); } diff --git a/src/nxt_stream_source.h b/src/nxt_stream_source.h index 27a6bfc8..2d57073f 100644 --- a/src/nxt_stream_source.h +++ b/src/nxt_stream_source.h @@ -14,7 +14,7 @@ typedef void (*nxt_stream_source_handler_t)(nxt_task_t *task, nxt_stream_source_t *s); struct nxt_stream_source_s { - nxt_event_conn_t *conn; + nxt_conn_t *conn; nxt_source_hook_t *next; nxt_upstream_source_t *upstream; diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index b6266520..6407c734 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -45,12 +45,12 @@ const nxt_sig_event_t nxt_worker_process_signals[] = { static void nxt_worker_process_quit(nxt_task_t *task) { - nxt_uint_t n; - nxt_queue_t *listen; - nxt_runtime_t *rt; - nxt_queue_link_t *link, *next; - nxt_listen_socket_t *ls; - nxt_event_conn_listen_t *cls; + nxt_uint_t n; + nxt_queue_t *listen; + nxt_runtime_t *rt; + nxt_queue_link_t *link, *next; + nxt_listen_event_t *lev; + nxt_listen_socket_t *ls; rt = task->thread->runtime; @@ -63,10 +63,10 @@ nxt_worker_process_quit(nxt_task_t *task) link = next) { next = nxt_queue_next(link); - cls = nxt_queue_link_data(link, nxt_event_conn_listen_t, link); + lev = nxt_queue_link_data(link, nxt_listen_event_t, link); nxt_queue_remove(link); - nxt_fd_event_close(task->thread->engine, &cls->socket); + nxt_fd_event_close(task->thread->engine, &lev->socket); } if (rt->listen_sockets != NULL) { |