diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-06-14 15:18:52 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-06-14 15:18:52 +0300 |
commit | 7574c64992b98d3dfbc3dd101bd0f7d78bad0823 (patch) | |
tree | 3a98c46e88d9023df34be3e6cce4f762d53aad36 /src/nxt_conn_proxy.c | |
parent | 3e2632688f53c4cb08e7ac03c61e71facd038df4 (diff) | |
download | unit-7574c64992b98d3dfbc3dd101bd0f7d78bad0823.tar.gz unit-7574c64992b98d3dfbc3dd101bd0f7d78bad0823.tar.bz2 |
nxt_event_conn_... functions and structures have been renamed
to nxt_conn_...
Diffstat (limited to '')
-rw-r--r-- | src/nxt_conn_proxy.c | 998 |
1 files changed, 998 insertions, 0 deletions
diff --git a/src/nxt_conn_proxy.c b/src/nxt_conn_proxy.c new file mode 100644 index 00000000..dec23684 --- /dev/null +++ b/src/nxt_conn_proxy.c @@ -0,0 +1,998 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +static void nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *source, nxt_conn_t *sink); +static void nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b); +static void nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *sink, nxt_conn_t *source); +static void nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b); +static void nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, + void *data); +static nxt_msec_t nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data); +static void nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, + void *data); +static void nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *source, nxt_conn_t *sink); +static void nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p); +static void nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data); + + +static const nxt_conn_state_t nxt_conn_proxy_client_wait_state; +static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state; +static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state; +static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state; +static const nxt_conn_state_t nxt_conn_proxy_client_read_state; +static const nxt_conn_state_t nxt_conn_proxy_peer_read_state; +static const nxt_conn_state_t nxt_conn_proxy_client_write_state; +static const nxt_conn_state_t nxt_conn_proxy_peer_write_state; + + +nxt_conn_proxy_t * +nxt_conn_proxy_create(nxt_conn_t *client) +{ + nxt_conn_t *peer; + nxt_thread_t *thr; + nxt_conn_proxy_t *p; + + p = nxt_mem_zalloc(client->mem_pool, sizeof(nxt_conn_proxy_t)); + if (nxt_slow_path(p == NULL)) { + return NULL; + } + + peer = nxt_conn_create(client->mem_pool, client->socket.task); + if (nxt_slow_path(peer == NULL)) { + return NULL; + } + + thr = nxt_thread(); + + client->read_work_queue = &thr->engine->read_work_queue; + client->write_work_queue = &thr->engine->write_work_queue; + client->socket.read_work_queue = &thr->engine->read_work_queue; + client->socket.write_work_queue = &thr->engine->write_work_queue; + peer->socket.read_work_queue = &thr->engine->read_work_queue; + peer->socket.write_work_queue = &thr->engine->write_work_queue; + + peer->socket.data = client->socket.data; + + peer->read_work_queue = client->read_work_queue; + peer->write_work_queue = client->write_work_queue; + peer->read_timer.work_queue = client->read_work_queue; + peer->write_timer.work_queue = client->write_work_queue; + + p->client = client; + p->peer = peer; + + return p; +} + + +void +nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p) +{ + nxt_conn_t *peer; + + /* + * Peer read event: not connected, disabled. + * Peer write event: not connected, disabled. + */ + + if (p->client_wait_timeout == 0) { + /* + * Peer write event: waiting for connection + * to be established with connect_timeout. + */ + peer = p->peer; + peer->write_state = &nxt_conn_proxy_peer_connect_state; + + nxt_conn_connect(task->thread->engine, peer); + } + + /* + * Client read event: waiting for client data with + * client_wait_timeout before buffer allocation. + */ + p->client->read_state = &nxt_conn_proxy_client_wait_state; + + nxt_conn_wait(p->client); +} + + +static const nxt_conn_state_t nxt_conn_proxy_client_wait_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_client_buffer_alloc, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_error, + + .timer_handler = nxt_conn_proxy_read_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout), +}; + + +static void +nxt_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_conn_t *client; + nxt_conn_proxy_t *p; + + client = obj; + p = data; + + nxt_debug(task, "conn proxy client first read fd:%d", client->socket.fd); + + b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, + NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); + + if (nxt_slow_path(b == NULL)) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + p->client_buffer = b; + client->read = b; + + if (p->peer->socket.fd != -1) { + /* + * Client read event: waiting, no timeout. + * Client write event: blocked. + * Peer read event: disabled. + * Peer write event: waiting for connection to be established + * or blocked after the connection has established. + */ + client->read_state = &nxt_conn_proxy_client_read_state; + + } else { + /* + * Client read event: waiting for data with client_wait_timeout + * before connecting to a peer. + * Client write event: blocked. + * Peer read event: not connected, disabled. + * Peer write event: not connected, disabled. + */ + client->read_state = &nxt_conn_proxy_client_first_read_state; + } + + nxt_conn_read(task->thread->engine, client); +} + + +static const nxt_conn_state_t nxt_conn_proxy_client_first_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_peer_connect, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_error, + + .timer_handler = nxt_conn_proxy_read_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, client_wait_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *client; + nxt_conn_proxy_t *p; + + client = obj; + p = data; + + /* + * Client read event: waiting, no timeout. + * Client write event: blocked. + * Peer read event: disabled. + * Peer write event: waiting for connection to be established + * with connect_timeout. + */ + client->read_state = &nxt_conn_proxy_client_read_state; + + p->peer->write_state = &nxt_conn_proxy_peer_connect_state; + + nxt_conn_connect(task->thread->engine, p->peer); +} + + +static const nxt_conn_state_t nxt_conn_proxy_peer_connect_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_connected, + .close_handler = nxt_conn_proxy_refused, + .error_handler = nxt_conn_proxy_error, + + .timer_handler = nxt_conn_proxy_write_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, connect_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_conn_proxy_connected(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *client, *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy connected fd:%d", peer->socket.fd); + + p->connected = 1; + + nxt_conn_tcp_nodelay_on(task, peer); + nxt_conn_tcp_nodelay_on(task, p->client); + + /* Peer read event: waiting with peer_wait_timeout. */ + + peer->read_state = &nxt_conn_proxy_peer_wait_state; + peer->write_state = &nxt_conn_proxy_peer_write_state; + + nxt_conn_wait(peer); + + if (p->client_buffer != NULL) { + client = p->client; + + client->read_state = &nxt_conn_proxy_client_read_state; + client->write_state = &nxt_conn_proxy_client_write_state; + /* + * Send a client read data to the connected peer. + * Client write event: blocked. + */ + nxt_conn_proxy_read_process(task, p, client, peer); + } +} + + +static const nxt_conn_state_t nxt_conn_proxy_peer_wait_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_peer_read, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_error, + + .timer_handler = nxt_conn_proxy_read_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, peer_wait_timeout), +}; + + +static void +nxt_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_conn_t *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy peer read fd:%d", peer->socket.fd); + + b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, + NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); + + if (nxt_slow_path(b == NULL)) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + p->peer_buffer = b; + peer->read = b; + + p->client->write_state = &nxt_conn_proxy_client_write_state; + peer->read_state = &nxt_conn_proxy_peer_read_state; + peer->write_state = &nxt_conn_proxy_peer_write_state; + + /* + * Client read event: waiting, no timeout. + * Client write event: blocked. + * Peer read event: waiting with possible peer_wait_timeout. + * Peer write event: blocked. + */ + nxt_conn_read(task->thread->engine, peer); +} + + +static const nxt_conn_state_t nxt_conn_proxy_client_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_client_read_ready, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_read_error, +}; + + +static void +nxt_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *client; + nxt_conn_proxy_t *p; + + client = obj; + p = data; + + nxt_debug(task, "conn proxy client read ready fd:%d", client->socket.fd); + + nxt_conn_proxy_read_process(task, p, client, p->peer); +} + + +static const nxt_conn_state_t nxt_conn_proxy_peer_read_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_peer_read_ready, + .close_handler = nxt_conn_proxy_close, + .error_handler = nxt_conn_proxy_read_error, +}; + + +static void +nxt_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy peer read ready fd:%d", peer->socket.fd); + + nxt_conn_proxy_read_process(task, p, peer, p->client); +} + + +static void +nxt_conn_proxy_read_process(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *source, nxt_conn_t *sink) +{ + nxt_buf_t *rb, *wb; + + if (sink->socket.error != 0) { + nxt_debug(task, "conn proxy sink fd:%d error:%d", + sink->socket.fd, sink->socket.error); + + nxt_conn_proxy_write_error(task, sink, sink->socket.data); + return; + } + + while (source->read != NULL) { + + rb = source->read; + + if (rb->mem.pos != rb->mem.free) { + + /* Add a read part to a write chain. */ + + wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0); + if (wb == NULL) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + wb->mem.pos = rb->mem.pos; + wb->mem.free = rb->mem.free; + wb->mem.start = rb->mem.pos; + wb->mem.end = rb->mem.free; + + rb->mem.pos = rb->mem.free; + rb->mem.start = rb->mem.free; + + nxt_conn_proxy_write_add(sink, wb); + } + + if (rb->mem.start != rb->mem.end) { + nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read, + task, source, source->socket.data); + break; + } + + source->read = rb->next; + nxt_buf_free(source->mem_pool, rb); + } + + if (p->connected) { + nxt_conn_write(task->thread->engine, sink); + } +} + + +static void +nxt_conn_proxy_write_add(nxt_conn_t *c, nxt_buf_t *b) +{ + nxt_buf_t *first, *second, *prev; + + first = c->write; + + if (first == NULL) { + c->write = b; + return; + } + + /* + * A event conn proxy maintains a buffer per each direction. + * The buffer is divided by read and write parts. These parts are + * linked in buffer chains. There can be no more than two buffers + * in write chain at any time, because an added buffer is coalesced + * with the last buffer if possible. + */ + + second = first->next; + + if (second == NULL) { + + if (first->mem.end != b->mem.start) { + first->next = b; + return; + } + + /* + * The first buffer is just before the added buffer, so + * expand the first buffer to the end of the added buffer. + */ + prev = first; + + } else { + if (second->mem.end != b->mem.start) { + nxt_thread_log_alert("event conn proxy write: second buffer end:%p " + "is not equal to added buffer start:%p", + second->mem.end, b->mem.start); + return; + } + + /* + * "second->mem.end == b->mem.start" must be always true here, + * that is the second buffer is just before the added buffer, + * so expand the second buffer to the end of added buffer. + */ + prev = second; + } + + prev->mem.free = b->mem.end; + prev->mem.end = b->mem.end; + + nxt_buf_free(c->mem_pool, b); +} + + +static void +nxt_conn_proxy_read(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *source, *sink; + nxt_conn_proxy_t *p; + + source = obj; + p = data; + + nxt_debug(task, "conn proxy read fd:%d", source->socket.fd); + + if (!source->socket.closed) { + sink = (source == p->client) ? p->peer : p->client; + + if (sink->socket.error == 0) { + nxt_conn_read(task->thread->engine, source); + } + } +} + + +static const nxt_conn_state_t nxt_conn_proxy_client_write_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_client_write_ready, + .error_handler = nxt_conn_proxy_write_error, + + .timer_handler = nxt_conn_proxy_write_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, client_write_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *client; + nxt_conn_proxy_t *p; + + client = obj; + p = data; + + nxt_debug(task, "conn proxy client write ready fd:%d", client->socket.fd); + + nxt_conn_proxy_write_process(task, p, client, p->peer); +} + + +static const nxt_conn_state_t nxt_conn_proxy_peer_write_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_peer_write_ready, + .error_handler = nxt_conn_proxy_write_error, + + .timer_handler = nxt_conn_proxy_write_timeout, + .timer_value = nxt_conn_proxy_timeout_value, + .timer_data = offsetof(nxt_conn_proxy_t, peer_write_timeout), + .timer_autoreset = 1, +}; + + +static void +nxt_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy peer write ready fd:%d", peer->socket.fd); + + nxt_conn_proxy_write_process(task, p, peer, p->client); +} + + +static void +nxt_conn_proxy_write_process(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *sink, nxt_conn_t *source) +{ + nxt_buf_t *rb, *wb; + + while (sink->write != NULL) { + + wb = sink->write; + + if (nxt_buf_is_sync(wb)) { + + /* A sync buffer marks the end of stream. */ + + sink->write = NULL; + nxt_buf_free(sink->mem_pool, wb); + nxt_conn_proxy_shutdown(task, p, source, sink); + return; + } + + if (wb->mem.start != wb->mem.pos) { + + /* Add a written part to a read chain. */ + + rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0); + if (rb == NULL) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + rb->mem.pos = wb->mem.start; + rb->mem.free = wb->mem.start; + rb->mem.start = wb->mem.start; + rb->mem.end = wb->mem.pos; + + wb->mem.start = wb->mem.pos; + + nxt_conn_proxy_read_add(source, rb); + } + + if (wb->mem.pos != wb->mem.free) { + nxt_conn_write(task->thread->engine, sink); + + break; + } + + sink->write = wb->next; + nxt_buf_free(sink->mem_pool, wb); + } + + nxt_work_queue_add(source->read_work_queue, nxt_conn_proxy_read, + task, source, source->socket.data); +} + + +static void +nxt_conn_proxy_read_add(nxt_conn_t *c, nxt_buf_t *b) +{ + nxt_buf_t *first, *second; + + first = c->read; + + if (first == NULL) { + c->read = b; + return; + } + + /* + * A event conn proxy maintains a buffer per each direction. + * The buffer is divided by read and write parts. These parts are + * linked in buffer chains. There can be no more than two buffers + * in read chain at any time, because an added buffer is coalesced + * with the last buffer if possible. The first and the second + * buffers are also coalesced if possible. + */ + + second = first->next; + + if (second == NULL) { + + if (first->mem.start == b->mem.end) { + /* + * The added buffer is just before the first buffer, so expand + * the first buffer to the beginning of the added buffer. + */ + first->mem.pos = b->mem.start; + first->mem.free = b->mem.start; + first->mem.start = b->mem.start; + + } else if (first->mem.end == b->mem.start) { + /* + * The added buffer is just after the first buffer, so + * expand the first buffer to the end of the added buffer. + */ + first->mem.end = b->mem.end; + + } else { + first->next = b; + return; + } + + } else { + if (second->mem.end != b->mem.start) { + nxt_thread_log_alert("event conn proxy read: second buffer end:%p " + "is not equal to added buffer start:%p", + second->mem.end, b->mem.start); + return; + } + + /* + * The added buffer is just after the second buffer, so + * expand the second buffer to the end of the added buffer. + */ + second->mem.end = b->mem.end; + + if (first->mem.start == second->mem.end) { + /* + * The second buffer is just before the first buffer, so expand + * the first buffer to the beginning of the second buffer. + */ + first->mem.pos = second->mem.start; + first->mem.free = second->mem.start; + first->mem.start = second->mem.start; + first->next = NULL; + + nxt_buf_free(c->mem_pool, second); + } + } + + nxt_buf_free(c->mem_pool, b); +} + + +static void +nxt_conn_proxy_close(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_conn_t *source, *sink; + nxt_conn_proxy_t *p; + + source = obj; + p = data; + + nxt_debug(task, "conn proxy close fd:%d", source->socket.fd); + + sink = (source == p->client) ? p->peer : p->client; + + if (sink->write == NULL) { + nxt_conn_proxy_shutdown(task, p, source, sink); + return; + } + + b = nxt_buf_sync_alloc(source->mem_pool, 0); + if (b == NULL) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + nxt_buf_chain_add(&sink->write, b); +} + + +static void +nxt_conn_proxy_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_conn_proxy_t *p; + + c = obj; + p = data; + + nxt_debug(task, "conn proxy error fd:%d", c->socket.fd); + + nxt_conn_proxy_close(task, c, p); +} + + +static void +nxt_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + + timer = obj; + + c = nxt_read_timer_conn(timer); + c->socket.timedout = 1; + c->socket.closed = 1; + + nxt_debug(task, "conn proxy read timeout fd:%d", c->socket.fd); + + nxt_conn_proxy_close(task, c, c->socket.data); +} + + +static void +nxt_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_timer_t *timer; + + timer = obj; + + c = nxt_write_timer_conn(timer); + c->socket.timedout = 1; + c->socket.closed = 1; + + nxt_debug(task, "conn proxy write timeout fd:%d", c->socket.fd); + + nxt_conn_proxy_close(task, c, c->socket.data); +} + + +static nxt_msec_t +nxt_conn_proxy_timeout_value(nxt_conn_t *c, uintptr_t data) +{ + nxt_msec_t *timer; + nxt_conn_proxy_t *p; + + p = c->socket.data; + + timer = (nxt_msec_t *) ((char *) p + data); + + return *timer; +} + + +static void +nxt_conn_proxy_refused(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *peer; + nxt_conn_proxy_t *p; + + peer = obj; + p = data; + + nxt_debug(task, "conn proxy refused fd:%d", peer->socket.fd); + + if (p->retries == 0) { + /* An error completion. */ + nxt_conn_proxy_complete(task, p); + return; + } + + p->retries--; + + nxt_socket_close(task, peer->socket.fd); + peer->socket.fd = -1; + peer->socket.error = 0; + + p->delayed = 1; + + peer->write_timer.handler = nxt_conn_proxy_reconnect_handler; + nxt_timer_add(task->thread->engine, &peer->write_timer, + p->reconnect_timeout); +} + + +static void +nxt_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *peer; + nxt_timer_t *timer; + nxt_conn_proxy_t *p; + + timer = obj; + + nxt_debug(task, "conn proxy reconnect timer"); + + peer = nxt_write_timer_conn(timer); + p = peer->socket.data; + + if (p->client->socket.closed) { + nxt_conn_proxy_complete(task, p); + return; + } + + p->delayed = 0; + + peer->write_state = &nxt_conn_proxy_peer_connect_state; + /* + * Peer read event: disabled. + * Peer write event: waiting for connection with connect_timeout. + */ + nxt_conn_connect(task->thread->engine, peer); +} + + +static void +nxt_conn_proxy_shutdown(nxt_task_t *task, nxt_conn_proxy_t *p, + nxt_conn_t *source, nxt_conn_t *sink) +{ + nxt_buf_t *b; + + nxt_debug(source->socket.task, + "conn proxy shutdown source fd:%d cl:%d err:%d", + source->socket.fd, source->socket.closed, source->socket.error); + + nxt_debug(sink->socket.task, + "conn proxy shutdown sink fd:%d cl:%d err:%d", + sink->socket.fd, sink->socket.closed, sink->socket.error); + + if (!p->connected || p->delayed) { + nxt_conn_proxy_complete(task, p); + return; + } + + if (sink->socket.error == 0 && !sink->socket.closed) { + sink->socket.shutdown = 1; + nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR); + } + + if (sink->socket.error != 0 + || (sink->socket.closed && source->write == NULL)) + { + /* The opposite direction also has been already closed. */ + nxt_conn_proxy_complete(task, p); + return; + } + + nxt_debug(source->socket.task, "free source buffer"); + + /* Free the direction's buffer. */ + b = (source == p->client) ? p->client_buffer : p->peer_buffer; + nxt_mem_free(source->mem_pool, b); +} + + +static void +nxt_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *c; + nxt_conn_proxy_t *p; + + c = obj; + p = data; + + nxt_debug(task, "conn proxy read error fd:%d", c->socket.fd); + + nxt_conn_proxy_close(task, c, p); +} + + +static void +nxt_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_t *source, *sink; + nxt_conn_proxy_t *p; + + sink = obj; + p = data; + + nxt_debug(task, "conn proxy write error fd:%d", sink->socket.fd); + + /* Clear data for the direction sink. */ + sink->write = NULL; + + /* Block the direction source. */ + source = (sink == p->client) ? p->peer : p->client; + nxt_fd_event_block_read(task->thread->engine, &source->socket); + + if (source->write == NULL) { + /* + * There is no data for the opposite direction and + * the next read from the sink will most probably fail. + */ + nxt_conn_proxy_complete(task, p); + } +} + + +static const nxt_conn_state_t nxt_conn_proxy_close_state + nxt_aligned(64) = +{ + .ready_handler = nxt_conn_proxy_completion, +}; + + +static void +nxt_conn_proxy_complete(nxt_task_t *task, nxt_conn_proxy_t *p) +{ + nxt_event_engine_t *engine; + + engine = task->thread->engine; + + nxt_debug(p->client->socket.task, "conn proxy complete %d:%d", + p->client->socket.fd, p->peer->socket.fd); + + if (p->delayed) { + p->delayed = 0; + nxt_queue_remove(&p->peer->link); + } + + if (p->client->socket.fd != -1) { + p->retain = 1; + p->client->write_state = &nxt_conn_proxy_close_state; + nxt_conn_close(engine, p->client); + } + + if (p->peer->socket.fd != -1) { + p->retain++; + p->peer->write_state = &nxt_conn_proxy_close_state; + nxt_conn_close(engine, p->peer); + } +} + + +static void +nxt_conn_proxy_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_conn_proxy_t *p; + + p = data; + + nxt_debug(p->client->socket.task, "conn proxy completion %d:%d:%d", + p->retain, p->client->socket.fd, p->peer->socket.fd); + + p->retain--; + + if (p->retain == 0) { + nxt_mem_free(p->client->mem_pool, p->client_buffer); + nxt_mem_free(p->client->mem_pool, p->peer_buffer); + + p->completion_handler(task, p, NULL); + } +} |