diff options
Diffstat (limited to 'src/nxt_event_conn_proxy.c')
-rw-r--r-- | src/nxt_event_conn_proxy.c | 257 |
1 files changed, 122 insertions, 135 deletions
diff --git a/src/nxt_event_conn_proxy.c b/src/nxt_event_conn_proxy.c index 0b9afe95..0a2a6474 100644 --- a/src/nxt_event_conn_proxy.c +++ b/src/nxt_event_conn_proxy.c @@ -7,53 +7,51 @@ #include <nxt_main.h> -static void nxt_event_conn_proxy_client_buffer_alloc(nxt_thread_t *thr, +static void nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_peer_connect(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_connected(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_peer_read(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_client_read_ready(nxt_thread_t *thr, - void *obj, void *data); -static void nxt_event_conn_proxy_peer_read_ready(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_event_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_read_process(nxt_thread_t *thr, +static void nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_event_conn_t *source, nxt_event_conn_t *sink); static void nxt_event_conn_proxy_write_add(nxt_event_conn_t *c, nxt_buf_t *b); -static void nxt_event_conn_proxy_read(nxt_thread_t *thr, void *obj, void *data); -static void nxt_event_conn_proxy_client_write_ready(nxt_thread_t *thr, - void *obj, void *data); -static void nxt_event_conn_proxy_peer_write_ready(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data); +static void nxt_event_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_write_process(nxt_thread_t *thr, +static void nxt_event_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, + void *data); +static void nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_event_conn_t *sink, nxt_event_conn_t *source); static void nxt_event_conn_proxy_read_add(nxt_event_conn_t *c, nxt_buf_t *b); -static void nxt_event_conn_proxy_close(nxt_thread_t *thr, void *obj, - void *data); -static void nxt_event_conn_proxy_error(nxt_thread_t *thr, void *obj, - void *data); -static void nxt_event_conn_proxy_read_timeout(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_proxy_close(nxt_task_t *task, void *obj, void *data); +static void nxt_event_conn_proxy_error(nxt_task_t *task, void *obj, void *data); +static void nxt_event_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_write_timeout(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data); static nxt_msec_t nxt_event_conn_proxy_timeout_value(nxt_event_conn_t *c, uintptr_t data); -static void nxt_event_conn_proxy_refused(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_reconnect_handler(nxt_thread_t *thr, - void *obj, void *data); -static void nxt_event_conn_proxy_shutdown(nxt_thread_t *thr, +static void nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, + void *data); +static void nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_event_conn_t *source, nxt_event_conn_t *sink); -static void nxt_event_conn_proxy_read_error(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_write_error(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_proxy_complete(nxt_thread_t *thr, +static void nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p); @@ -107,7 +105,7 @@ nxt_event_conn_proxy_create(nxt_event_conn_t *client) void -nxt_event_conn_proxy(nxt_event_conn_proxy_t *p) +nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p) { nxt_thread_t *thr; nxt_event_conn_t *peer; @@ -127,7 +125,7 @@ nxt_event_conn_proxy(nxt_event_conn_proxy_t *p) peer = p->peer; peer->write_state = &nxt_event_conn_proxy_peer_connect_state; - nxt_event_conn_connect_enqueue(thr, peer); + nxt_event_conn_connect_enqueue(thr, task, peer); } /* @@ -136,7 +134,7 @@ nxt_event_conn_proxy(nxt_event_conn_proxy_t *p) */ p->client->read_state = &nxt_event_conn_proxy_client_wait_state; - nxt_event_conn_read(thr, p->client); + nxt_event_conn_read(task, p->client); } @@ -157,7 +155,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state static void -nxt_event_conn_proxy_client_buffer_alloc(nxt_thread_t *thr, void *obj, +nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; @@ -167,15 +165,15 @@ nxt_event_conn_proxy_client_buffer_alloc(nxt_thread_t *thr, void *obj, client = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy client first read fd:%d", - client->socket.fd); + nxt_debug(task, "event conn proxy client first read fd:%d", + client->socket.fd); b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size, NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); if (nxt_slow_path(b == NULL)) { /* An error completion. */ - nxt_event_conn_proxy_complete(thr, p); + nxt_event_conn_proxy_complete(task, p); return; } @@ -203,7 +201,7 @@ nxt_event_conn_proxy_client_buffer_alloc(nxt_thread_t *thr, void *obj, client->read_state = &nxt_event_conn_proxy_client_first_read_state; } - nxt_event_conn_read(thr, client); + nxt_event_conn_read(task, client); } @@ -224,7 +222,7 @@ static const nxt_event_conn_state_t static void -nxt_event_conn_proxy_peer_connect(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *client; nxt_event_conn_proxy_t *p; @@ -243,7 +241,7 @@ nxt_event_conn_proxy_peer_connect(nxt_thread_t *thr, void *obj, void *data) p->peer->write_state = &nxt_event_conn_proxy_peer_connect_state; - nxt_event_conn_connect(thr, p->peer); + nxt_event_conn_connect(task, p->peer); } @@ -264,7 +262,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_connect_state static void -nxt_event_conn_proxy_connected(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *client, *peer; nxt_event_conn_proxy_t *p; @@ -272,8 +270,7 @@ nxt_event_conn_proxy_connected(nxt_thread_t *thr, void *obj, void *data) peer = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy connected fd:%d", - peer->socket.fd); + nxt_debug(task, "event conn proxy connected fd:%d", peer->socket.fd); p->connected = 1; @@ -285,7 +282,7 @@ nxt_event_conn_proxy_connected(nxt_thread_t *thr, void *obj, void *data) peer->read_state = &nxt_event_conn_proxy_peer_wait_state; peer->write_state = &nxt_event_conn_proxy_peer_write_state; - nxt_event_conn_read_enqueue(thr, peer); + nxt_event_conn_read_enqueue(task->thread, task, peer); if (p->client_buffer != NULL) { client = p->client; @@ -296,7 +293,7 @@ nxt_event_conn_proxy_connected(nxt_thread_t *thr, void *obj, void *data) * Send a client read data to the connected peer. * Client write event: blocked. */ - nxt_event_conn_proxy_read_process(thr, p, client, peer); + nxt_event_conn_proxy_read_process(task, p, client, peer); } } @@ -318,7 +315,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_wait_state static void -nxt_event_conn_proxy_peer_read(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; nxt_event_conn_t *peer; @@ -327,15 +324,14 @@ nxt_event_conn_proxy_peer_read(nxt_thread_t *thr, void *obj, void *data) peer = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy peer read fd:%d", - peer->socket.fd); + nxt_debug(task, "event conn proxy peer read fd:%d", peer->socket.fd); b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size, NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE); if (nxt_slow_path(b == NULL)) { /* An error completion. */ - nxt_event_conn_proxy_complete(thr, p); + nxt_event_conn_proxy_complete(task, p); return; } @@ -352,7 +348,7 @@ nxt_event_conn_proxy_peer_read(nxt_thread_t *thr, void *obj, void *data) * Peer read event: waiting with possible peer_wait_timeout. * Peer write event: blocked. */ - nxt_event_conn_read(thr, peer); + nxt_event_conn_read(task, peer); } @@ -373,8 +369,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_client_read_state static void -nxt_event_conn_proxy_client_read_ready(nxt_thread_t *thr, void *obj, - void *data) +nxt_event_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *client; nxt_event_conn_proxy_t *p; @@ -382,10 +377,10 @@ nxt_event_conn_proxy_client_read_ready(nxt_thread_t *thr, void *obj, client = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy client read ready fd:%d", - client->socket.fd); + nxt_debug(task, "event conn proxy client read ready fd:%d", + client->socket.fd); - nxt_event_conn_proxy_read_process(thr, p, client, p->peer); + nxt_event_conn_proxy_read_process(task, p, client, p->peer); } @@ -406,8 +401,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_read_state static void -nxt_event_conn_proxy_peer_read_ready(nxt_thread_t *thr, void *obj, - void *data) +nxt_event_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *peer; nxt_event_conn_proxy_t *p; @@ -415,24 +409,23 @@ nxt_event_conn_proxy_peer_read_ready(nxt_thread_t *thr, void *obj, peer = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy peer read ready fd:%d", - peer->socket.fd); + nxt_debug(task, "event conn proxy peer read ready fd:%d", peer->socket.fd); - nxt_event_conn_proxy_read_process(thr, p, peer, p->client); + nxt_event_conn_proxy_read_process(task, p, peer, p->client); } static void -nxt_event_conn_proxy_read_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, +nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_event_conn_t *source, nxt_event_conn_t *sink) { nxt_buf_t *rb, *wb; if (sink->socket.error != 0) { - nxt_log_debug(thr->log, "event conn proxy sink fd:%d error:%d", - sink->socket.fd, sink->socket.error); + nxt_debug(task, "event conn proxy sink fd:%d error:%d", + sink->socket.fd, sink->socket.error); - nxt_event_conn_proxy_write_error(thr, sink, sink->socket.data); + nxt_event_conn_proxy_write_error(task, sink, sink->socket.data); return; } @@ -447,7 +440,7 @@ nxt_event_conn_proxy_read_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0); if (wb == NULL) { /* An error completion. */ - nxt_event_conn_proxy_complete(thr, p); + nxt_event_conn_proxy_complete(task, p); return; } @@ -463,10 +456,9 @@ nxt_event_conn_proxy_read_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, } if (rb->mem.start != rb->mem.end) { - nxt_thread_work_queue_push(thr, source->read_work_queue, + nxt_thread_work_queue_push(task->thread, source->read_work_queue, nxt_event_conn_proxy_read, - source, source->socket.data, - source->socket.log); + task, source, source->socket.data); break; } @@ -475,7 +467,7 @@ nxt_event_conn_proxy_read_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, } if (p->connected) { - nxt_event_conn_write_enqueue(thr, sink); + nxt_event_conn_write_enqueue(task->thread, task, sink); } } @@ -539,7 +531,7 @@ nxt_event_conn_proxy_write_add(nxt_event_conn_t *c, nxt_buf_t *b) static void -nxt_event_conn_proxy_read(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *source, *sink; nxt_event_conn_proxy_t *p; @@ -547,13 +539,13 @@ nxt_event_conn_proxy_read(nxt_thread_t *thr, void *obj, void *data) source = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy read fd:%d", source->socket.fd); + nxt_debug(task, "event conn proxy read fd:%d", source->socket.fd); if (!source->socket.closed) { sink = (source == p->client) ? p->peer : p->client; if (sink->socket.error == 0) { - nxt_event_conn_read(thr, source); + nxt_event_conn_read(task, source); } } } @@ -576,8 +568,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_client_write_state static void -nxt_event_conn_proxy_client_write_ready(nxt_thread_t *thr, void *obj, - void *data) +nxt_event_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *client; nxt_event_conn_proxy_t *p; @@ -585,10 +576,10 @@ nxt_event_conn_proxy_client_write_ready(nxt_thread_t *thr, void *obj, client = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy client write ready fd:%d", - client->socket.fd); + nxt_debug(task, "event conn proxy client write ready fd:%d", + client->socket.fd); - nxt_event_conn_proxy_write_process(thr, p, client, p->peer); + nxt_event_conn_proxy_write_process(task, p, client, p->peer); } @@ -609,7 +600,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_write_state static void -nxt_event_conn_proxy_peer_write_ready(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *peer; nxt_event_conn_proxy_t *p; @@ -617,15 +608,14 @@ nxt_event_conn_proxy_peer_write_ready(nxt_thread_t *thr, void *obj, void *data) peer = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy peer write ready fd:%d", - peer->socket.fd); + nxt_debug(task, "event conn proxy peer write ready fd:%d", peer->socket.fd); - nxt_event_conn_proxy_write_process(thr, p, peer, p->client); + nxt_event_conn_proxy_write_process(task, p, peer, p->client); } static void -nxt_event_conn_proxy_write_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, +nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_event_conn_t *sink, nxt_event_conn_t *source) { nxt_buf_t *rb, *wb; @@ -640,7 +630,7 @@ nxt_event_conn_proxy_write_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, sink->write = NULL; nxt_buf_free(sink->mem_pool, wb); - nxt_event_conn_proxy_shutdown(thr, p, source, sink); + nxt_event_conn_proxy_shutdown(task, p, source, sink); return; } @@ -651,7 +641,7 @@ nxt_event_conn_proxy_write_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0); if (rb == NULL) { /* An error completion. */ - nxt_event_conn_proxy_complete(thr, p); + nxt_event_conn_proxy_complete(task, p); return; } @@ -666,7 +656,7 @@ nxt_event_conn_proxy_write_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, } if (wb->mem.pos != wb->mem.free) { - nxt_event_conn_write_enqueue(thr, sink); + nxt_event_conn_write_enqueue(task->thread, task, sink); break; } @@ -675,9 +665,9 @@ nxt_event_conn_proxy_write_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, nxt_buf_free(sink->mem_pool, wb); } - nxt_thread_work_queue_push(thr, source->read_work_queue, - nxt_event_conn_proxy_read, source, - source->socket.data, source->socket.log); + nxt_thread_work_queue_push(task->thread, source->read_work_queue, + nxt_event_conn_proxy_read, task, source, + source->socket.data); } @@ -760,7 +750,7 @@ nxt_event_conn_proxy_read_add(nxt_event_conn_t *c, nxt_buf_t *b) static void -nxt_event_conn_proxy_close(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_proxy_close(nxt_task_t *task, void *obj, void *data) { nxt_buf_t *b; nxt_event_conn_t *source, *sink; @@ -769,19 +759,19 @@ nxt_event_conn_proxy_close(nxt_thread_t *thr, void *obj, void *data) source = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy close fd:%d", source->socket.fd); + nxt_debug(task, "event conn proxy close fd:%d", source->socket.fd); sink = (source == p->client) ? p->peer : p->client; if (sink->write == NULL) { - nxt_event_conn_proxy_shutdown(thr, p, source, sink); + nxt_event_conn_proxy_shutdown(task, p, source, sink); return; } b = nxt_buf_sync_alloc(source->mem_pool, 0); if (b == NULL) { /* An error completion. */ - nxt_event_conn_proxy_complete(thr, p); + nxt_event_conn_proxy_complete(task, p); return; } @@ -790,7 +780,7 @@ nxt_event_conn_proxy_close(nxt_thread_t *thr, void *obj, void *data) static void -nxt_event_conn_proxy_error(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_proxy_error(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_event_conn_proxy_t *p; @@ -798,14 +788,14 @@ nxt_event_conn_proxy_error(nxt_thread_t *thr, void *obj, void *data) c = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy error fd:%d", c->socket.fd); + nxt_debug(task, "event conn proxy error fd:%d", c->socket.fd); - nxt_event_conn_proxy_close(thr, c, p); + nxt_event_conn_proxy_close(task, c, p); } static void -nxt_event_conn_proxy_read_timeout(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_event_timer_t *ev; @@ -816,15 +806,14 @@ nxt_event_conn_proxy_read_timeout(nxt_thread_t *thr, void *obj, void *data) c->socket.timedout = 1; c->socket.closed = 1; - nxt_log_debug(thr->log, "event conn proxy read timeout fd:%d", - c->socket.fd); + nxt_debug(task, "event conn proxy read timeout fd:%d", c->socket.fd); - nxt_event_conn_proxy_close(thr, c, c->socket.data); + nxt_event_conn_proxy_close(task, c, c->socket.data); } static void -nxt_event_conn_proxy_write_timeout(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_event_timer_t *ev; @@ -835,10 +824,9 @@ nxt_event_conn_proxy_write_timeout(nxt_thread_t *thr, void *obj, void *data) c->socket.timedout = 1; c->socket.closed = 1; - nxt_log_debug(thr->log, "event conn proxy write timeout fd:%d", - c->socket.fd); + nxt_debug(task, "event conn proxy write timeout fd:%d", c->socket.fd); - nxt_event_conn_proxy_close(thr, c, c->socket.data); + nxt_event_conn_proxy_close(task, c, c->socket.data); } @@ -857,7 +845,7 @@ nxt_event_conn_proxy_timeout_value(nxt_event_conn_t *c, uintptr_t data) static void -nxt_event_conn_proxy_refused(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *peer; nxt_event_conn_proxy_t *p; @@ -865,11 +853,11 @@ nxt_event_conn_proxy_refused(nxt_thread_t *thr, void *obj, void *data) peer = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy refused fd:%d", peer->socket.fd); + nxt_debug(task, "event conn proxy refused fd:%d", peer->socket.fd); if (p->retries == 0) { /* An error completion. */ - nxt_event_conn_proxy_complete(thr, p); + nxt_event_conn_proxy_complete(task, p); return; } @@ -882,13 +870,13 @@ nxt_event_conn_proxy_refused(nxt_thread_t *thr, void *obj, void *data) p->delayed = 1; peer->write_timer.handler = nxt_event_conn_proxy_reconnect_handler; - nxt_event_timer_add(thr->engine, &peer->write_timer, p->reconnect_timeout); + nxt_event_timer_add(task->thread->engine, &peer->write_timer, + p->reconnect_timeout); } static void -nxt_event_conn_proxy_reconnect_handler(nxt_thread_t *thr, void *obj, - void *data) +nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *peer; nxt_event_timer_t *ev; @@ -896,13 +884,13 @@ nxt_event_conn_proxy_reconnect_handler(nxt_thread_t *thr, void *obj, ev = obj; - nxt_log_debug(thr->log, "event conn proxy reconnect timer"); + nxt_debug(task, "event conn proxy reconnect timer"); peer = nxt_event_write_timer_conn(ev); p = peer->socket.data; if (p->client->socket.closed) { - nxt_event_conn_proxy_complete(thr, p); + nxt_event_conn_proxy_complete(task, p); return; } @@ -913,27 +901,27 @@ nxt_event_conn_proxy_reconnect_handler(nxt_thread_t *thr, void *obj, * Peer read event: disabled. * Peer write event: waiting for connection with connect_timeout. */ - nxt_event_conn_connect(thr, peer); + nxt_event_conn_connect(task, peer); } static void -nxt_event_conn_proxy_shutdown(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, +nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_event_conn_t *source, nxt_event_conn_t *sink) { nxt_buf_t *b; - nxt_log_debug(source->socket.log, - "event conn proxy shutdown source fd:%d cl:%d err:%d", - source->socket.fd, source->socket.closed, - source->socket.error); + nxt_debug(source->socket.task, + "event conn proxy shutdown source fd:%d cl:%d err:%d", + source->socket.fd, source->socket.closed, + source->socket.error); - nxt_log_debug(sink->socket.log, - "event conn proxy shutdown sink fd:%d cl:%d err:%d", - sink->socket.fd, sink->socket.closed, sink->socket.error); + nxt_debug(sink->socket.task, + "event conn proxy shutdown sink fd:%d cl:%d err:%d", + sink->socket.fd, sink->socket.closed, sink->socket.error); if (!p->connected || p->delayed) { - nxt_event_conn_proxy_complete(thr, p); + nxt_event_conn_proxy_complete(task, p); return; } @@ -943,7 +931,7 @@ nxt_event_conn_proxy_shutdown(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, * remote side so the shutdown() syscall is surplus * since the close() syscall also sends FIN. */ - nxt_event_conn_close(thr, sink); + nxt_event_conn_close(task, sink); } else { nxt_socket_shutdown(sink->socket.fd, SHUT_WR); @@ -953,7 +941,7 @@ nxt_event_conn_proxy_shutdown(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, || (sink->socket.closed && source->write == NULL)) { /* The opposite direction also has been already closed. */ - nxt_event_conn_proxy_complete(thr, p); + nxt_event_conn_proxy_complete(task, p); return; } @@ -964,7 +952,7 @@ nxt_event_conn_proxy_shutdown(nxt_thread_t *thr, nxt_event_conn_proxy_t *p, static void -nxt_event_conn_proxy_read_error(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_event_conn_proxy_t *p; @@ -972,14 +960,14 @@ nxt_event_conn_proxy_read_error(nxt_thread_t *thr, void *obj, void *data) c = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy read error fd:%d", c->socket.fd); + nxt_debug(task, "event conn proxy read error fd:%d", c->socket.fd); - nxt_event_conn_proxy_close(thr, c, p); + nxt_event_conn_proxy_close(task, c, p); } static void -nxt_event_conn_proxy_write_error(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *source, *sink; nxt_event_conn_proxy_t *p; @@ -987,48 +975,47 @@ nxt_event_conn_proxy_write_error(nxt_thread_t *thr, void *obj, void *data) sink = obj; p = data; - nxt_log_debug(thr->log, "event conn proxy write error fd:%d", - sink->socket.fd); + nxt_debug(task, "event conn proxy write error fd:%d", sink->socket.fd); /* Clear data for the direction sink. */ sink->write = NULL; /* Block the direction source. */ source = (sink == p->client) ? p->peer : p->client; - nxt_event_fd_block_read(thr->engine, &source->socket); + nxt_event_fd_block_read(task->thread->engine, &source->socket); if (source->write == NULL) { /* * There is no data for the opposite direction and * the next read from the sink will most probably fail. */ - nxt_event_conn_proxy_complete(thr, p); + nxt_event_conn_proxy_complete(task, p); } } static void -nxt_event_conn_proxy_complete(nxt_thread_t *thr, nxt_event_conn_proxy_t *p) +nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p) { - nxt_log_debug(p->client->socket.log, "event conn proxy complete %d:%d", - p->client->socket.fd, p->peer->socket.fd); + nxt_debug(p->client->socket.task, "event conn proxy complete %d:%d", + p->client->socket.fd, p->peer->socket.fd); if (p->client->socket.fd != -1) { - nxt_event_conn_close(thr, p->client); + nxt_event_conn_close(task, p->client); } if (p->peer->socket.fd != -1) { - nxt_event_conn_close(thr, p->peer); + nxt_event_conn_close(task, p->peer); } else if (p->delayed) { - nxt_thread_work_queue_drop(thr, &p->peer->write_timer); + nxt_thread_work_queue_drop(task->thread, &p->peer->write_timer); nxt_queue_remove(&p->peer->link); - nxt_event_timer_delete(thr->engine, &p->peer->write_timer); + nxt_event_timer_delete(task->thread->engine, &p->peer->write_timer); } nxt_mem_free(p->client->mem_pool, p->client_buffer); nxt_mem_free(p->client->mem_pool, p->peer_buffer); - p->completion_handler(thr, p, NULL); + p->completion_handler(task, p, NULL); } |