/* * Copyright (C) Igor Sysoev * Copyright (C) NGINX, Inc. */ #include static void nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_event_conn_t *source, nxt_event_conn_t *sink); static void nxt_event_conn_proxy_write_add(nxt_event_conn_t *c, nxt_buf_t *b); static void nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_event_conn_t *sink, nxt_event_conn_t *source); static void nxt_event_conn_proxy_read_add(nxt_event_conn_t *c, nxt_buf_t *b); static void nxt_event_conn_proxy_close(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_error(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_event_conn_proxy_timeout_value(nxt_event_conn_t *c, uintptr_t data); static void nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_event_conn_t *source, nxt_event_conn_t *sink); static void nxt_event_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p); static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state; static const nxt_event_conn_state_t nxt_event_conn_proxy_client_first_read_state; static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_connect_state; static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_wait_state; static const nxt_event_conn_state_t nxt_event_conn_proxy_client_read_state; static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_read_state; static const nxt_event_conn_state_t nxt_event_conn_proxy_client_write_state; static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_write_state; nxt_event_conn_proxy_t * nxt_event_conn_proxy_create(nxt_event_conn_t *client) { nxt_thread_t *thr; nxt_event_conn_t *peer; nxt_event_conn_proxy_t *p; p = nxt_mem_zalloc(client->mem_pool, sizeof(nxt_event_conn_proxy_t)); if (nxt_slow_path(p == NULL)) { return NULL; } peer = nxt_event_conn_create(client->mem_pool, client->socket.log); if (nxt_slow_path(peer == NULL)) { return NULL; } thr = nxt_thread(); client->socket.read_work_queue = &thr->engine->read_work_queue; client->socket.write_work_queue = &thr->engine->write_work_queue; peer->socket.read_work_queue = &thr->engine->read_work_queue; peer->socket.write_work_queue = &thr->engine->write_work_queue; peer->socket.data = client->socket.data; peer->read_work_queue = client->read_work_queue; peer->write_work_queue = client->write_work_queue; peer->read_timer.work_queue = client->read_work_queue; peer->write_timer.work_queue = client->write_work_queue; p->client = client; p->peer = peer; return p; } void nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p) { nxt_thread_t *thr; nxt_event_conn_t *peer; thr = nxt_thread(); /* * Peer read event: not connected, disabled. * Peer write event: not connected, disabled. */ if (p->client_wait_timeout == 0) { /* * Peer write event: waiting for connection * to be established with connect_timeout. */ peer = p->peer; peer->write_state = &nxt_event_conn_proxy_peer_connect_state; nxt_event_conn_connect_enqueue(thr, task, peer); } /* * Client read event: waiting for client data with * client_wait_timeout before buffer allocation. */ p->client->read_state = &nxt_event_conn_proxy_client_wait_state; nxt_event_conn_read(task, p->client); } static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state nxt_aligned(64) = { NXT_EVENT_NO_BUF_PROCESS, NXT_EVENT_TIMER_NO_AUTORESET, nxt_event_conn_proxy_client_buffer_alloc, nxt_event_conn_proxy_close, nxt_event_conn_proxy_error, nxt_event_conn_proxy_read_timeout, nxt_event_conn_proxy_timeout_value, offsetof(nxt_event_conn_proxy_t, client_wait_timeout), }; static void nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; nxt_event_conn_t *client; nxt_event_conn_proxy_t *p; client = obj; p = data; nxt_debug(task, "event conn proxy client first read fd:%d", client->socket.fd); b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); if (nxt_slow_path(b == NULL)) { /* An error completion. */ nxt_event_conn_proxy_complete(task, p); return; } p->client_buffer = b; client->read = b; if (p->peer->socket.fd != -1) { /* * Client read event: waiting, no timeout. * Client write event: blocked. * Peer read event: disabled. * Peer write event: waiting for connection to be established * or blocked after the connection has established. */ client->read_state = &nxt_event_conn_proxy_client_read_state; } else { /* * Client read event: waiting for data with client_wait_timeout * before connecting to a peer. * Client write event: blocked. * Peer read event: not connected, disabled. * Peer write event: not connected, disabled. */ client->read_state = &nxt_event_conn_proxy_client_first_read_state; } nxt_event_conn_read(task, client); } static const nxt_event_conn_state_t nxt_event_conn_proxy_client_first_read_state nxt_aligned(64) = { NXT_EVENT_BUF_PROCESS, NXT_EVENT_TIMER_AUTORESET, nxt_event_conn_proxy_peer_connect, nxt_event_conn_proxy_close, nxt_event_conn_proxy_error, nxt_event_conn_proxy_read_timeout, nxt_event_conn_proxy_timeout_value, offsetof(nxt_event_conn_proxy_t, client_wait_timeout), }; static void nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *client; nxt_event_conn_proxy_t *p; client = obj; p = data; /* * Client read event: waiting, no timeout. * Client write event: blocked. * Peer read event: disabled. * Peer write event: waiting for connection to be established * with connect_timeout. */ client->read_state = &nxt_event_conn_proxy_client_read_state; p->peer->write_state = &nxt_event_conn_proxy_peer_connect_state; nxt_event_conn_connect(task, p->peer); } static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_connect_state nxt_aligned(64) = { NXT_EVENT_NO_BUF_PROCESS, NXT_EVENT_TIMER_AUTORESET, nxt_event_conn_proxy_connected, nxt_event_conn_proxy_refused, nxt_event_conn_proxy_error, nxt_event_conn_proxy_write_timeout, nxt_event_conn_proxy_timeout_value, offsetof(nxt_event_conn_proxy_t, connect_timeout), }; static void nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *client, *peer; nxt_event_conn_proxy_t *p; peer = obj; p = data; nxt_debug(task, "event conn proxy connected fd:%d", peer->socket.fd); p->connected = 1; nxt_event_conn_tcp_nodelay_on(peer); nxt_event_conn_tcp_nodelay_on(p->client); /* Peer read event: waiting with peer_wait_timeout. */ peer->read_state = &nxt_event_conn_proxy_peer_wait_state; peer->write_state = &nxt_event_conn_proxy_peer_write_state; nxt_event_conn_read_enqueue(task->thread, task, peer); if (p->client_buffer != NULL) { client = p->client; client->read_state = &nxt_event_conn_proxy_client_read_state; client->write_state = &nxt_event_conn_proxy_client_write_state; /* * Send a client read data to the connected peer. * Client write event: blocked. */ nxt_event_conn_proxy_read_process(task, p, client, peer); } } static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_wait_state nxt_aligned(64) = { NXT_EVENT_NO_BUF_PROCESS, NXT_EVENT_TIMER_NO_AUTORESET, nxt_event_conn_proxy_peer_read, nxt_event_conn_proxy_close, nxt_event_conn_proxy_error, nxt_event_conn_proxy_read_timeout, nxt_event_conn_proxy_timeout_value, offsetof(nxt_event_conn_proxy_t, peer_wait_timeout), }; static void nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; nxt_event_conn_t *peer; nxt_event_conn_proxy_t *p; peer = obj; p = data; nxt_debug(task, "event conn proxy peer read fd:%d", peer->socket.fd); b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); if (nxt_slow_path(b == NULL)) { /* An error completion. */ nxt_event_conn_proxy_complete(task, p); return; } p->peer_buffer = b; peer->read = b; p->client->write_state = &nxt_event_conn_proxy_client_write_state; peer->read_state = &nxt_event_conn_proxy_peer_read_state; peer->write_state = &nxt_event_conn_proxy_peer_write_state; /* * Client read event: waiting, no timeout. * Client write event: blocked. * Peer read event: waiting with possible peer_wait_timeout. * Peer write event: blocked. */ nxt_event_conn_read(task, peer); } static const nxt_event_conn_state_t nxt_event_conn_proxy_client_read_state nxt_aligned(64) = { NXT_EVENT_BUF_PROCESS, NXT_EVENT_TIMER_NO_AUTORESET, nxt_event_conn_proxy_client_read_ready, nxt_event_conn_proxy_close, nxt_event_conn_proxy_read_error, NULL, NULL, 0, }; static void nxt_event_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *client; nxt_event_conn_proxy_t *p; client = obj; p = data; nxt_debug(task, "event conn proxy client read ready fd:%d", client->socket.fd); nxt_event_conn_proxy_read_process(task, p, client, p->peer); } static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_read_state nxt_aligned(64) = { NXT_EVENT_BUF_PROCESS, NXT_EVENT_TIMER_NO_AUTORESET, nxt_event_conn_proxy_peer_read_ready, nxt_event_conn_proxy_close, nxt_event_conn_proxy_read_error, NULL, NULL, 0, }; static void nxt_event_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *peer; nxt_event_conn_proxy_t *p; peer = obj; p = data; nxt_debug(task, "event conn proxy peer read ready fd:%d", peer->socket.fd); nxt_event_conn_proxy_read_process(task, p, peer, p->client); } static void nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_event_conn_t *source, nxt_event_conn_t *sink) { nxt_buf_t *rb, *wb; if (sink->socket.error != 0) { nxt_debug(task, "event conn proxy sink fd:%d error:%d", sink->socket.fd, sink->socket.error); nxt_event_conn_proxy_write_error(task, sink, sink->socket.data); return; } while (source->read != NULL) { rb = source->read; if (rb->mem.pos != rb->mem.free) { /* Add a read part to a write chain. */ wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0); if (wb == NULL) { /* An error completion. */ nxt_event_conn_proxy_complete(task, p); return; } wb->mem.pos = rb->mem.pos; wb->mem.free = rb->mem.free; wb->mem.start = rb->mem.pos; wb->mem.end = rb->mem.free; rb->mem.pos = rb->mem.free; rb->mem.start = rb->mem.free; nxt_event_conn_proxy_write_add(sink, wb); } if (rb->mem.start != rb->mem.end) { nxt_work_queue_add(source->read_work_queue, nxt_event_conn_proxy_read, task, source, source->socket.data); break; } source->read = rb->next; nxt_buf_free(source->mem_pool, rb); } if (p->connected) { nxt_event_conn_write_enqueue(task->thread, task, sink); } } static void nxt_event_conn_proxy_write_add(nxt_event_conn_t *c, nxt_buf_t *b) { nxt_buf_t *first, *second, *prev; first = c->write; if (first == NULL) { c->write = b; return; } /* * A event conn proxy maintains a buffer per each direction. * The buffer is divided by read and write parts. These parts are * linked in buffer chains. There can be no more than two buffers * in write chain at any time, because an added buffer is coalesced * with the last buffer if possible. */ second = first->next; if (second == NULL) { if (first->mem.end != b->mem.start) { first->next = b; return; } /* * The first buffer is just before the added buffer, so * expand the first buffer to the end of the added buffer. */ prev = first; } else { if (second->mem.end != b->mem.start) { nxt_thread_log_alert("event conn proxy write: second buffer end:%p " "is not equal to added buffer start:%p", second->mem.end, b->mem.start); return; } /* * "second->mem.end == b->mem.start" must be always true here, * that is the second buffer is just before the added buffer, * so expand the second buffer to the end of added buffer. */ prev = second; } prev->mem.free = b->mem.end; prev->mem.end = b->mem.end; nxt_buf_free(c->mem_pool, b); } static void nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *source, *sink; nxt_event_conn_proxy_t *p; source = obj; p = data; nxt_debug(task, "event conn proxy read fd:%d", source->socket.fd); if (!source->socket.closed) { sink = (source == p->client) ? p->peer : p->client; if (sink->socket.error == 0) { nxt_event_conn_read(task, source); } } } static const nxt_event_conn_state_t nxt_event_conn_proxy_client_write_state nxt_aligned(64) = { NXT_EVENT_NO_BUF_PROCESS, NXT_EVENT_TIMER_AUTORESET, nxt_event_conn_proxy_client_write_ready, NULL, nxt_event_conn_proxy_write_error, nxt_event_conn_proxy_write_timeout, nxt_event_conn_proxy_timeout_value, offsetof(nxt_event_conn_proxy_t, client_write_timeout), }; static void nxt_event_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *client; nxt_event_conn_proxy_t *p; client = obj; p = data; nxt_debug(task, "event conn proxy client write ready fd:%d", client->socket.fd); nxt_event_conn_proxy_write_process(task, p, client, p->peer); } static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_write_state nxt_aligned(64) = { NXT_EVENT_NO_BUF_PROCESS, NXT_EVENT_TIMER_AUTORESET, nxt_event_conn_proxy_peer_write_ready, NULL, nxt_event_conn_proxy_write_error, nxt_event_conn_proxy_write_timeout, nxt_event_conn_proxy_timeout_value, offsetof(nxt_event_conn_proxy_t, peer_write_timeout), }; static void nxt_event_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *peer; nxt_event_conn_proxy_t *p; peer = obj; p = data; nxt_debug(task, "event conn proxy peer write ready fd:%d", peer->socket.fd); nxt_event_conn_proxy_write_process(task, p, peer, p->client); } static void nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_event_conn_t *sink, nxt_event_conn_t *source) { nxt_buf_t *rb, *wb; while (sink->write != NULL) { wb = sink->write; if (nxt_buf_is_sync(wb)) { /* A sync buffer marks the end of stream. */ sink->write = NULL; nxt_buf_free(sink->mem_pool, wb); nxt_event_conn_proxy_shutdown(task, p, source, sink); return; } if (wb->mem.start != wb->mem.pos) { /* Add a written part to a read chain. */ rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0); if (rb == NULL) { /* An error completion. */ nxt_event_conn_proxy_complete(task, p); return; } rb->mem.pos = wb->mem.start; rb->mem.free = wb->mem.start; rb->mem.start = wb->mem.start; rb->mem.end = wb->mem.pos; wb->mem.start = wb->mem.pos; nxt_event_conn_proxy_read_add(source, rb); } if (wb->mem.pos != wb->mem.free) { nxt_event_conn_write_enqueue(task->thread, task, sink); break; } sink->write = wb->next; nxt_buf_free(sink->mem_pool, wb); } nxt_work_queue_add(source->read_work_queue, nxt_event_conn_proxy_read, task, source, source->socket.data); } static void nxt_event_conn_proxy_read_add(nxt_event_conn_t *c, nxt_buf_t *b) { nxt_buf_t *first, *second; first = c->read; if (first == NULL) { c->read = b; return; } /* * A event conn proxy maintains a buffer per each direction. * The buffer is divided by read and write parts. These parts are * linked in buffer chains. There can be no more than two buffers * in read chain at any time, because an added buffer is coalesced * with the last buffer if possible. The first and the second * buffers are also coalesced if possible. */ second = first->next; if (second == NULL) { if (first->mem.start == b->mem.end) { /* * The added buffer is just before the first buffer, so expand * the first buffer to the beginning of the added buffer. */ first->mem.pos = b->mem.start; first->mem.free = b->mem.start; first->mem.start = b->mem.start; } else if (first->mem.end == b->mem.start) { /* * The added buffer is just after the first buffer, so * expand the first buffer to the end of the added buffer. */ first->mem.end = b->mem.end; } else { first->next = b; return; } } else { if (second->mem.end != b->mem.start) { nxt_thread_log_alert("event conn proxy read: second buffer end:%p " "is not equal to added buffer start:%p", second->mem.end, b->mem.start); return; } /* * The added buffer is just after the second buffer, so * expand the second buffer to the end of the added buffer. */ second->mem.end = b->mem.end; if (first->mem.start == second->mem.end) { /* * The second buffer is just before the first buffer, so expand * the first buffer to the beginning of the second buffer. */ first->mem.pos = second->mem.start; first->mem.free = second->mem.start; first->mem.start = second->mem.start; first->next = NULL; nxt_buf_free(c->mem_pool, second); } } nxt_buf_free(c->mem_pool, b); } static void nxt_event_conn_proxy_close(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; nxt_event_conn_t *source, *sink; nxt_event_conn_proxy_t *p; source = obj; p = data; nxt_debug(task, "event conn proxy close fd:%d", source->socket.fd); sink = (source == p->client) ? p->peer : p->client; if (sink->write == NULL) { nxt_event_conn_proxy_shutdown(task, p, source, sink); return; } b = nxt_buf_sync_alloc(source->mem_pool, 0); if (b == NULL) { /* An error completion. */ nxt_event_conn_proxy_complete(task, p); return; } nxt_buf_chain_add(&sink->write, b); } static void nxt_event_conn_proxy_error(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_event_conn_proxy_t *p; c = obj; p = data; nxt_debug(task, "event conn proxy error fd:%d", c->socket.fd); nxt_event_conn_proxy_close(task, c, p); } static void nxt_event_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data) { nxt_timer_t *ev; nxt_event_conn_t *c; ev = obj; c = nxt_event_read_timer_conn(ev); c->socket.timedout = 1; c->socket.closed = 1; nxt_debug(task, "event conn proxy read timeout fd:%d", c->socket.fd); nxt_event_conn_proxy_close(task, c, c->socket.data); } static void nxt_event_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data) { nxt_timer_t *ev; nxt_event_conn_t *c; ev = obj; c = nxt_event_write_timer_conn(ev); c->socket.timedout = 1; c->socket.closed = 1; nxt_debug(task, "event conn proxy write timeout fd:%d", c->socket.fd); nxt_event_conn_proxy_close(task, c, c->socket.data); } static nxt_msec_t nxt_event_conn_proxy_timeout_value(nxt_event_conn_t *c, uintptr_t data) { nxt_msec_t *timer; nxt_event_conn_proxy_t *p; p = c->socket.data; timer = (nxt_msec_t *) ((char *) p + data); return *timer; } static void nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *peer; nxt_event_conn_proxy_t *p; peer = obj; p = data; nxt_debug(task, "event conn proxy refused fd:%d", peer->socket.fd); if (p->retries == 0) { /* An error completion. */ nxt_event_conn_proxy_complete(task, p); return; } p->retries--; nxt_socket_close(peer->socket.fd); peer->socket.fd = -1; peer->socket.error = 0; p->delayed = 1; peer->write_timer.handler = nxt_event_conn_proxy_reconnect_handler; nxt_timer_add(task->thread->engine, &peer->write_timer, p->reconnect_timeout); } static void nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data) { nxt_timer_t *ev; nxt_event_conn_t *peer; nxt_event_conn_proxy_t *p; ev = obj; nxt_debug(task, "event conn proxy reconnect timer"); peer = nxt_event_write_timer_conn(ev); p = peer->socket.data; if (p->client->socket.closed) { nxt_event_conn_proxy_complete(task, p); return; } p->delayed = 0; peer->write_state = &nxt_event_conn_proxy_peer_connect_state; /* * Peer read event: disabled. * Peer write event: waiting for connection with connect_timeout. */ nxt_event_conn_connect(task, peer); } static void nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_event_conn_t *source, nxt_event_conn_t *sink) { nxt_buf_t *b; nxt_debug(source->socket.task, "event conn proxy shutdown source fd:%d cl:%d err:%d", source->socket.fd, source->socket.closed, source->socket.error); nxt_debug(sink->socket.task, "event conn proxy shutdown sink fd:%d cl:%d err:%d", sink->socket.fd, sink->socket.closed, sink->socket.error); if (!p->connected || p->delayed) { nxt_event_conn_proxy_complete(task, p); return; } if (sink->socket.error != 0 || sink->socket.closed) { /* * A socket is already closed or half-closed by * remote side so the shutdown() syscall is surplus * since the close() syscall also sends FIN. */ nxt_event_conn_close(task, sink); } else { nxt_socket_shutdown(sink->socket.fd, SHUT_WR); } if (sink->socket.error != 0 || (sink->socket.closed && source->write == NULL)) { /* The opposite direction also has been already closed. */ nxt_event_conn_proxy_complete(task, p); return; } /* Free the direction's buffer. */ b = (source == p->client) ? p->client_buffer : p->peer_buffer; nxt_mem_free(source->mem_pool, b); } static void nxt_event_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_event_conn_proxy_t *p; c = obj; p = data; nxt_debug(task, "event conn proxy read error fd:%d", c->socket.fd); nxt_event_conn_proxy_close(task, c, p); } static void nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *source, *sink; nxt_event_conn_proxy_t *p; sink = obj; p = data; nxt_debug(task, "event conn proxy write error fd:%d", sink->socket.fd); /* Clear data for the direction sink. */ sink->write = NULL; /* Block the direction source. */ source = (sink == p->client) ? p->peer : p->client; nxt_event_fd_block_read(task->thread->engine, &source->socket); if (source->write == NULL) { /* * There is no data for the opposite direction and * the next read from the sink will most probably fail. */ nxt_event_conn_proxy_complete(task, p); } } static void nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p) { nxt_debug(p->client->socket.task, "event conn proxy complete %d:%d", p->client->socket.fd, p->peer->socket.fd); if (p->client->socket.fd != -1) { nxt_event_conn_close(task, p->client); } if (p->peer->socket.fd != -1) { nxt_event_conn_close(task, p->peer); } else if (p->delayed) { nxt_queue_remove(&p->peer->link); nxt_timer_delete(task->thread->engine, &p->peer->write_timer); } nxt_mem_free(p->client->mem_pool, p->client_buffer); nxt_mem_free(p->client->mem_pool, p->peer_buffer); p->completion_handler(task, p, NULL); }