summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn_proxy.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
commitde532922d9ab42aa15b40d47c8db53ac2af38500 (patch)
treed6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_event_conn_proxy.c
parent16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff)
downloadunit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz
unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2
Introducing tasks.
Diffstat (limited to '')
-rw-r--r--src/nxt_event_conn_proxy.c257
1 files changed, 122 insertions, 135 deletions
diff --git a/src/nxt_event_conn_proxy.c b/src/nxt_event_conn_proxy.c
index 0b9afe95..0a2a6474 100644
--- a/src/nxt_event_conn_proxy.c
+++ b/src/nxt_event_conn_proxy.c
@@ -7,53 +7,51 @@
#include <nxt_main.h>
-static void nxt_event_conn_proxy_client_buffer_alloc(nxt_thread_t *thr,
+static void nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task,
void *obj, void *data);
-static void nxt_event_conn_proxy_peer_connect(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_conn_proxy_connected(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_conn_proxy_peer_read(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_conn_proxy_client_read_ready(nxt_thread_t *thr,
- void *obj, void *data);
-static void nxt_event_conn_proxy_peer_read_ready(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_proxy_client_read_ready(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_event_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_conn_proxy_read_process(nxt_thread_t *thr,
+static void nxt_event_conn_proxy_read_process(nxt_task_t *task,
nxt_event_conn_proxy_t *p, nxt_event_conn_t *source,
nxt_event_conn_t *sink);
static void nxt_event_conn_proxy_write_add(nxt_event_conn_t *c, nxt_buf_t *b);
-static void nxt_event_conn_proxy_read(nxt_thread_t *thr, void *obj, void *data);
-static void nxt_event_conn_proxy_client_write_ready(nxt_thread_t *thr,
- void *obj, void *data);
-static void nxt_event_conn_proxy_peer_write_ready(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data);
+static void nxt_event_conn_proxy_client_write_ready(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_conn_proxy_write_process(nxt_thread_t *thr,
+static void nxt_event_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_event_conn_proxy_write_process(nxt_task_t *task,
nxt_event_conn_proxy_t *p, nxt_event_conn_t *sink,
nxt_event_conn_t *source);
static void nxt_event_conn_proxy_read_add(nxt_event_conn_t *c, nxt_buf_t *b);
-static void nxt_event_conn_proxy_close(nxt_thread_t *thr, void *obj,
- void *data);
-static void nxt_event_conn_proxy_error(nxt_thread_t *thr, void *obj,
- void *data);
-static void nxt_event_conn_proxy_read_timeout(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_proxy_close(nxt_task_t *task, void *obj, void *data);
+static void nxt_event_conn_proxy_error(nxt_task_t *task, void *obj, void *data);
+static void nxt_event_conn_proxy_read_timeout(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_conn_proxy_write_timeout(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_proxy_write_timeout(nxt_task_t *task, void *obj,
void *data);
static nxt_msec_t nxt_event_conn_proxy_timeout_value(nxt_event_conn_t *c,
uintptr_t data);
-static void nxt_event_conn_proxy_refused(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_conn_proxy_reconnect_handler(nxt_thread_t *thr,
- void *obj, void *data);
-static void nxt_event_conn_proxy_shutdown(nxt_thread_t *thr,
+static void nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj,
+ void *data);
+static void nxt_event_conn_proxy_shutdown(nxt_task_t *task,
nxt_event_conn_proxy_t *p, nxt_event_conn_t *source,
nxt_event_conn_t *sink);
-static void nxt_event_conn_proxy_read_error(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_proxy_read_error(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_conn_proxy_write_error(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_conn_proxy_complete(nxt_thread_t *thr,
+static void nxt_event_conn_proxy_complete(nxt_task_t *task,
nxt_event_conn_proxy_t *p);
@@ -107,7 +105,7 @@ nxt_event_conn_proxy_create(nxt_event_conn_t *client)
void
-nxt_event_conn_proxy(nxt_event_conn_proxy_t *p)
+nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p)
{
nxt_thread_t *thr;
nxt_event_conn_t *peer;
@@ -127,7 +125,7 @@ nxt_event_conn_proxy(nxt_event_conn_proxy_t *p)
peer = p->peer;
peer->write_state = &nxt_event_conn_proxy_peer_connect_state;
- nxt_event_conn_connect_enqueue(thr, peer);
+ nxt_event_conn_connect_enqueue(thr, task, peer);
}
/*
@@ -136,7 +134,7 @@ nxt_event_conn_proxy(nxt_event_conn_proxy_t *p)
*/
p->client->read_state = &nxt_event_conn_proxy_client_wait_state;
- nxt_event_conn_read(thr, p->client);
+ nxt_event_conn_read(task, p->client);
}
@@ -157,7 +155,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state
static void
-nxt_event_conn_proxy_client_buffer_alloc(nxt_thread_t *thr, void *obj,
+nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj,
void *data)
{
nxt_buf_t *b;
@@ -167,15 +165,15 @@ nxt_event_conn_proxy_client_buffer_alloc(nxt_thread_t *thr, void *obj,
client = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy client first read fd:%d",
- client->socket.fd);
+ nxt_debug(task, "event conn proxy client first read fd:%d",
+ client->socket.fd);
b = nxt_buf_mem_alloc(client->mem_pool, p->client_buffer_size,
NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE);
if (nxt_slow_path(b == NULL)) {
/* An error completion. */
- nxt_event_conn_proxy_complete(thr, p);
+ nxt_event_conn_proxy_complete(task, p);
return;
}
@@ -203,7 +201,7 @@ nxt_event_conn_proxy_client_buffer_alloc(nxt_thread_t *thr, void *obj,
client->read_state = &nxt_event_conn_proxy_client_first_read_state;
}
- nxt_event_conn_read(thr, client);
+ nxt_event_conn_read(task, client);
}
@@ -224,7 +222,7 @@ static const nxt_event_conn_state_t
static void
-nxt_event_conn_proxy_peer_connect(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *client;
nxt_event_conn_proxy_t *p;
@@ -243,7 +241,7 @@ nxt_event_conn_proxy_peer_connect(nxt_thread_t *thr, void *obj, void *data)
p->peer->write_state = &nxt_event_conn_proxy_peer_connect_state;
- nxt_event_conn_connect(thr, p->peer);
+ nxt_event_conn_connect(task, p->peer);
}
@@ -264,7 +262,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_connect_state
static void
-nxt_event_conn_proxy_connected(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *client, *peer;
nxt_event_conn_proxy_t *p;
@@ -272,8 +270,7 @@ nxt_event_conn_proxy_connected(nxt_thread_t *thr, void *obj, void *data)
peer = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy connected fd:%d",
- peer->socket.fd);
+ nxt_debug(task, "event conn proxy connected fd:%d", peer->socket.fd);
p->connected = 1;
@@ -285,7 +282,7 @@ nxt_event_conn_proxy_connected(nxt_thread_t *thr, void *obj, void *data)
peer->read_state = &nxt_event_conn_proxy_peer_wait_state;
peer->write_state = &nxt_event_conn_proxy_peer_write_state;
- nxt_event_conn_read_enqueue(thr, peer);
+ nxt_event_conn_read_enqueue(task->thread, task, peer);
if (p->client_buffer != NULL) {
client = p->client;
@@ -296,7 +293,7 @@ nxt_event_conn_proxy_connected(nxt_thread_t *thr, void *obj, void *data)
* Send a client read data to the connected peer.
* Client write event: blocked.
*/
- nxt_event_conn_proxy_read_process(thr, p, client, peer);
+ nxt_event_conn_proxy_read_process(task, p, client, peer);
}
}
@@ -318,7 +315,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_wait_state
static void
-nxt_event_conn_proxy_peer_read(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_event_conn_t *peer;
@@ -327,15 +324,14 @@ nxt_event_conn_proxy_peer_read(nxt_thread_t *thr, void *obj, void *data)
peer = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy peer read fd:%d",
- peer->socket.fd);
+ nxt_debug(task, "event conn proxy peer read fd:%d", peer->socket.fd);
b = nxt_buf_mem_alloc(peer->mem_pool, p->peer_buffer_size,
NXT_MEM_BUF_CUTBACK | NXT_MEM_BUF_USABLE);
if (nxt_slow_path(b == NULL)) {
/* An error completion. */
- nxt_event_conn_proxy_complete(thr, p);
+ nxt_event_conn_proxy_complete(task, p);
return;
}
@@ -352,7 +348,7 @@ nxt_event_conn_proxy_peer_read(nxt_thread_t *thr, void *obj, void *data)
* Peer read event: waiting with possible peer_wait_timeout.
* Peer write event: blocked.
*/
- nxt_event_conn_read(thr, peer);
+ nxt_event_conn_read(task, peer);
}
@@ -373,8 +369,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_client_read_state
static void
-nxt_event_conn_proxy_client_read_ready(nxt_thread_t *thr, void *obj,
- void *data)
+nxt_event_conn_proxy_client_read_ready(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *client;
nxt_event_conn_proxy_t *p;
@@ -382,10 +377,10 @@ nxt_event_conn_proxy_client_read_ready(nxt_thread_t *thr, void *obj,
client = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy client read ready fd:%d",
- client->socket.fd);
+ nxt_debug(task, "event conn proxy client read ready fd:%d",
+ client->socket.fd);
- nxt_event_conn_proxy_read_process(thr, p, client, p->peer);
+ nxt_event_conn_proxy_read_process(task, p, client, p->peer);
}
@@ -406,8 +401,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_read_state
static void
-nxt_event_conn_proxy_peer_read_ready(nxt_thread_t *thr, void *obj,
- void *data)
+nxt_event_conn_proxy_peer_read_ready(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *peer;
nxt_event_conn_proxy_t *p;
@@ -415,24 +409,23 @@ nxt_event_conn_proxy_peer_read_ready(nxt_thread_t *thr, void *obj,
peer = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy peer read ready fd:%d",
- peer->socket.fd);
+ nxt_debug(task, "event conn proxy peer read ready fd:%d", peer->socket.fd);
- nxt_event_conn_proxy_read_process(thr, p, peer, p->client);
+ nxt_event_conn_proxy_read_process(task, p, peer, p->client);
}
static void
-nxt_event_conn_proxy_read_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
+nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p,
nxt_event_conn_t *source, nxt_event_conn_t *sink)
{
nxt_buf_t *rb, *wb;
if (sink->socket.error != 0) {
- nxt_log_debug(thr->log, "event conn proxy sink fd:%d error:%d",
- sink->socket.fd, sink->socket.error);
+ nxt_debug(task, "event conn proxy sink fd:%d error:%d",
+ sink->socket.fd, sink->socket.error);
- nxt_event_conn_proxy_write_error(thr, sink, sink->socket.data);
+ nxt_event_conn_proxy_write_error(task, sink, sink->socket.data);
return;
}
@@ -447,7 +440,7 @@ nxt_event_conn_proxy_read_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
wb = nxt_buf_mem_alloc(source->mem_pool, 0, 0);
if (wb == NULL) {
/* An error completion. */
- nxt_event_conn_proxy_complete(thr, p);
+ nxt_event_conn_proxy_complete(task, p);
return;
}
@@ -463,10 +456,9 @@ nxt_event_conn_proxy_read_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
}
if (rb->mem.start != rb->mem.end) {
- nxt_thread_work_queue_push(thr, source->read_work_queue,
+ nxt_thread_work_queue_push(task->thread, source->read_work_queue,
nxt_event_conn_proxy_read,
- source, source->socket.data,
- source->socket.log);
+ task, source, source->socket.data);
break;
}
@@ -475,7 +467,7 @@ nxt_event_conn_proxy_read_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
}
if (p->connected) {
- nxt_event_conn_write_enqueue(thr, sink);
+ nxt_event_conn_write_enqueue(task->thread, task, sink);
}
}
@@ -539,7 +531,7 @@ nxt_event_conn_proxy_write_add(nxt_event_conn_t *c, nxt_buf_t *b)
static void
-nxt_event_conn_proxy_read(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *source, *sink;
nxt_event_conn_proxy_t *p;
@@ -547,13 +539,13 @@ nxt_event_conn_proxy_read(nxt_thread_t *thr, void *obj, void *data)
source = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy read fd:%d", source->socket.fd);
+ nxt_debug(task, "event conn proxy read fd:%d", source->socket.fd);
if (!source->socket.closed) {
sink = (source == p->client) ? p->peer : p->client;
if (sink->socket.error == 0) {
- nxt_event_conn_read(thr, source);
+ nxt_event_conn_read(task, source);
}
}
}
@@ -576,8 +568,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_client_write_state
static void
-nxt_event_conn_proxy_client_write_ready(nxt_thread_t *thr, void *obj,
- void *data)
+nxt_event_conn_proxy_client_write_ready(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *client;
nxt_event_conn_proxy_t *p;
@@ -585,10 +576,10 @@ nxt_event_conn_proxy_client_write_ready(nxt_thread_t *thr, void *obj,
client = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy client write ready fd:%d",
- client->socket.fd);
+ nxt_debug(task, "event conn proxy client write ready fd:%d",
+ client->socket.fd);
- nxt_event_conn_proxy_write_process(thr, p, client, p->peer);
+ nxt_event_conn_proxy_write_process(task, p, client, p->peer);
}
@@ -609,7 +600,7 @@ static const nxt_event_conn_state_t nxt_event_conn_proxy_peer_write_state
static void
-nxt_event_conn_proxy_peer_write_ready(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_proxy_peer_write_ready(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *peer;
nxt_event_conn_proxy_t *p;
@@ -617,15 +608,14 @@ nxt_event_conn_proxy_peer_write_ready(nxt_thread_t *thr, void *obj, void *data)
peer = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy peer write ready fd:%d",
- peer->socket.fd);
+ nxt_debug(task, "event conn proxy peer write ready fd:%d", peer->socket.fd);
- nxt_event_conn_proxy_write_process(thr, p, peer, p->client);
+ nxt_event_conn_proxy_write_process(task, p, peer, p->client);
}
static void
-nxt_event_conn_proxy_write_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
+nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p,
nxt_event_conn_t *sink, nxt_event_conn_t *source)
{
nxt_buf_t *rb, *wb;
@@ -640,7 +630,7 @@ nxt_event_conn_proxy_write_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
sink->write = NULL;
nxt_buf_free(sink->mem_pool, wb);
- nxt_event_conn_proxy_shutdown(thr, p, source, sink);
+ nxt_event_conn_proxy_shutdown(task, p, source, sink);
return;
}
@@ -651,7 +641,7 @@ nxt_event_conn_proxy_write_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
rb = nxt_buf_mem_alloc(sink->mem_pool, 0, 0);
if (rb == NULL) {
/* An error completion. */
- nxt_event_conn_proxy_complete(thr, p);
+ nxt_event_conn_proxy_complete(task, p);
return;
}
@@ -666,7 +656,7 @@ nxt_event_conn_proxy_write_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
}
if (wb->mem.pos != wb->mem.free) {
- nxt_event_conn_write_enqueue(thr, sink);
+ nxt_event_conn_write_enqueue(task->thread, task, sink);
break;
}
@@ -675,9 +665,9 @@ nxt_event_conn_proxy_write_process(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
nxt_buf_free(sink->mem_pool, wb);
}
- nxt_thread_work_queue_push(thr, source->read_work_queue,
- nxt_event_conn_proxy_read, source,
- source->socket.data, source->socket.log);
+ nxt_thread_work_queue_push(task->thread, source->read_work_queue,
+ nxt_event_conn_proxy_read, task, source,
+ source->socket.data);
}
@@ -760,7 +750,7 @@ nxt_event_conn_proxy_read_add(nxt_event_conn_t *c, nxt_buf_t *b)
static void
-nxt_event_conn_proxy_close(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_proxy_close(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_event_conn_t *source, *sink;
@@ -769,19 +759,19 @@ nxt_event_conn_proxy_close(nxt_thread_t *thr, void *obj, void *data)
source = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy close fd:%d", source->socket.fd);
+ nxt_debug(task, "event conn proxy close fd:%d", source->socket.fd);
sink = (source == p->client) ? p->peer : p->client;
if (sink->write == NULL) {
- nxt_event_conn_proxy_shutdown(thr, p, source, sink);
+ nxt_event_conn_proxy_shutdown(task, p, source, sink);
return;
}
b = nxt_buf_sync_alloc(source->mem_pool, 0);
if (b == NULL) {
/* An error completion. */
- nxt_event_conn_proxy_complete(thr, p);
+ nxt_event_conn_proxy_complete(task, p);
return;
}
@@ -790,7 +780,7 @@ nxt_event_conn_proxy_close(nxt_thread_t *thr, void *obj, void *data)
static void
-nxt_event_conn_proxy_error(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_proxy_error(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
nxt_event_conn_proxy_t *p;
@@ -798,14 +788,14 @@ nxt_event_conn_proxy_error(nxt_thread_t *thr, void *obj, void *data)
c = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy error fd:%d", c->socket.fd);
+ nxt_debug(task, "event conn proxy error fd:%d", c->socket.fd);
- nxt_event_conn_proxy_close(thr, c, p);
+ nxt_event_conn_proxy_close(task, c, p);
}
static void
-nxt_event_conn_proxy_read_timeout(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_proxy_read_timeout(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
nxt_event_timer_t *ev;
@@ -816,15 +806,14 @@ nxt_event_conn_proxy_read_timeout(nxt_thread_t *thr, void *obj, void *data)
c->socket.timedout = 1;
c->socket.closed = 1;
- nxt_log_debug(thr->log, "event conn proxy read timeout fd:%d",
- c->socket.fd);
+ nxt_debug(task, "event conn proxy read timeout fd:%d", c->socket.fd);
- nxt_event_conn_proxy_close(thr, c, c->socket.data);
+ nxt_event_conn_proxy_close(task, c, c->socket.data);
}
static void
-nxt_event_conn_proxy_write_timeout(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_proxy_write_timeout(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
nxt_event_timer_t *ev;
@@ -835,10 +824,9 @@ nxt_event_conn_proxy_write_timeout(nxt_thread_t *thr, void *obj, void *data)
c->socket.timedout = 1;
c->socket.closed = 1;
- nxt_log_debug(thr->log, "event conn proxy write timeout fd:%d",
- c->socket.fd);
+ nxt_debug(task, "event conn proxy write timeout fd:%d", c->socket.fd);
- nxt_event_conn_proxy_close(thr, c, c->socket.data);
+ nxt_event_conn_proxy_close(task, c, c->socket.data);
}
@@ -857,7 +845,7 @@ nxt_event_conn_proxy_timeout_value(nxt_event_conn_t *c, uintptr_t data)
static void
-nxt_event_conn_proxy_refused(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *peer;
nxt_event_conn_proxy_t *p;
@@ -865,11 +853,11 @@ nxt_event_conn_proxy_refused(nxt_thread_t *thr, void *obj, void *data)
peer = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy refused fd:%d", peer->socket.fd);
+ nxt_debug(task, "event conn proxy refused fd:%d", peer->socket.fd);
if (p->retries == 0) {
/* An error completion. */
- nxt_event_conn_proxy_complete(thr, p);
+ nxt_event_conn_proxy_complete(task, p);
return;
}
@@ -882,13 +870,13 @@ nxt_event_conn_proxy_refused(nxt_thread_t *thr, void *obj, void *data)
p->delayed = 1;
peer->write_timer.handler = nxt_event_conn_proxy_reconnect_handler;
- nxt_event_timer_add(thr->engine, &peer->write_timer, p->reconnect_timeout);
+ nxt_event_timer_add(task->thread->engine, &peer->write_timer,
+ p->reconnect_timeout);
}
static void
-nxt_event_conn_proxy_reconnect_handler(nxt_thread_t *thr, void *obj,
- void *data)
+nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *peer;
nxt_event_timer_t *ev;
@@ -896,13 +884,13 @@ nxt_event_conn_proxy_reconnect_handler(nxt_thread_t *thr, void *obj,
ev = obj;
- nxt_log_debug(thr->log, "event conn proxy reconnect timer");
+ nxt_debug(task, "event conn proxy reconnect timer");
peer = nxt_event_write_timer_conn(ev);
p = peer->socket.data;
if (p->client->socket.closed) {
- nxt_event_conn_proxy_complete(thr, p);
+ nxt_event_conn_proxy_complete(task, p);
return;
}
@@ -913,27 +901,27 @@ nxt_event_conn_proxy_reconnect_handler(nxt_thread_t *thr, void *obj,
* Peer read event: disabled.
* Peer write event: waiting for connection with connect_timeout.
*/
- nxt_event_conn_connect(thr, peer);
+ nxt_event_conn_connect(task, peer);
}
static void
-nxt_event_conn_proxy_shutdown(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
+nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p,
nxt_event_conn_t *source, nxt_event_conn_t *sink)
{
nxt_buf_t *b;
- nxt_log_debug(source->socket.log,
- "event conn proxy shutdown source fd:%d cl:%d err:%d",
- source->socket.fd, source->socket.closed,
- source->socket.error);
+ nxt_debug(source->socket.task,
+ "event conn proxy shutdown source fd:%d cl:%d err:%d",
+ source->socket.fd, source->socket.closed,
+ source->socket.error);
- nxt_log_debug(sink->socket.log,
- "event conn proxy shutdown sink fd:%d cl:%d err:%d",
- sink->socket.fd, sink->socket.closed, sink->socket.error);
+ nxt_debug(sink->socket.task,
+ "event conn proxy shutdown sink fd:%d cl:%d err:%d",
+ sink->socket.fd, sink->socket.closed, sink->socket.error);
if (!p->connected || p->delayed) {
- nxt_event_conn_proxy_complete(thr, p);
+ nxt_event_conn_proxy_complete(task, p);
return;
}
@@ -943,7 +931,7 @@ nxt_event_conn_proxy_shutdown(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
* remote side so the shutdown() syscall is surplus
* since the close() syscall also sends FIN.
*/
- nxt_event_conn_close(thr, sink);
+ nxt_event_conn_close(task, sink);
} else {
nxt_socket_shutdown(sink->socket.fd, SHUT_WR);
@@ -953,7 +941,7 @@ nxt_event_conn_proxy_shutdown(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
|| (sink->socket.closed && source->write == NULL))
{
/* The opposite direction also has been already closed. */
- nxt_event_conn_proxy_complete(thr, p);
+ nxt_event_conn_proxy_complete(task, p);
return;
}
@@ -964,7 +952,7 @@ nxt_event_conn_proxy_shutdown(nxt_thread_t *thr, nxt_event_conn_proxy_t *p,
static void
-nxt_event_conn_proxy_read_error(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_proxy_read_error(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
nxt_event_conn_proxy_t *p;
@@ -972,14 +960,14 @@ nxt_event_conn_proxy_read_error(nxt_thread_t *thr, void *obj, void *data)
c = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy read error fd:%d", c->socket.fd);
+ nxt_debug(task, "event conn proxy read error fd:%d", c->socket.fd);
- nxt_event_conn_proxy_close(thr, c, p);
+ nxt_event_conn_proxy_close(task, c, p);
}
static void
-nxt_event_conn_proxy_write_error(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *source, *sink;
nxt_event_conn_proxy_t *p;
@@ -987,48 +975,47 @@ nxt_event_conn_proxy_write_error(nxt_thread_t *thr, void *obj, void *data)
sink = obj;
p = data;
- nxt_log_debug(thr->log, "event conn proxy write error fd:%d",
- sink->socket.fd);
+ nxt_debug(task, "event conn proxy write error fd:%d", sink->socket.fd);
/* Clear data for the direction sink. */
sink->write = NULL;
/* Block the direction source. */
source = (sink == p->client) ? p->peer : p->client;
- nxt_event_fd_block_read(thr->engine, &source->socket);
+ nxt_event_fd_block_read(task->thread->engine, &source->socket);
if (source->write == NULL) {
/*
* There is no data for the opposite direction and
* the next read from the sink will most probably fail.
*/
- nxt_event_conn_proxy_complete(thr, p);
+ nxt_event_conn_proxy_complete(task, p);
}
}
static void
-nxt_event_conn_proxy_complete(nxt_thread_t *thr, nxt_event_conn_proxy_t *p)
+nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p)
{
- nxt_log_debug(p->client->socket.log, "event conn proxy complete %d:%d",
- p->client->socket.fd, p->peer->socket.fd);
+ nxt_debug(p->client->socket.task, "event conn proxy complete %d:%d",
+ p->client->socket.fd, p->peer->socket.fd);
if (p->client->socket.fd != -1) {
- nxt_event_conn_close(thr, p->client);
+ nxt_event_conn_close(task, p->client);
}
if (p->peer->socket.fd != -1) {
- nxt_event_conn_close(thr, p->peer);
+ nxt_event_conn_close(task, p->peer);
} else if (p->delayed) {
- nxt_thread_work_queue_drop(thr, &p->peer->write_timer);
+ nxt_thread_work_queue_drop(task->thread, &p->peer->write_timer);
nxt_queue_remove(&p->peer->link);
- nxt_event_timer_delete(thr->engine, &p->peer->write_timer);
+ nxt_event_timer_delete(task->thread->engine, &p->peer->write_timer);
}
nxt_mem_free(p->client->mem_pool, p->client_buffer);
nxt_mem_free(p->client->mem_pool, p->peer_buffer);
- p->completion_handler(thr, p, NULL);
+ p->completion_handler(task, p, NULL);
}