summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn_proxy.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_event_conn_proxy.c')
-rw-r--r--src/nxt_event_conn_proxy.c99
1 files changed, 67 insertions, 32 deletions
diff --git a/src/nxt_event_conn_proxy.c b/src/nxt_event_conn_proxy.c
index 42b9dd0f..d91cff4a 100644
--- a/src/nxt_event_conn_proxy.c
+++ b/src/nxt_event_conn_proxy.c
@@ -53,6 +53,8 @@ static void nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj,
void *data);
static void nxt_event_conn_proxy_complete(nxt_task_t *task,
nxt_event_conn_proxy_t *p);
+static void nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj,
+ void *data);
static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state;
@@ -78,7 +80,7 @@ nxt_event_conn_proxy_create(nxt_event_conn_t *client)
return NULL;
}
- peer = nxt_event_conn_create(client->mem_pool, client->socket.log);
+ peer = nxt_event_conn_create(client->mem_pool, client->socket.task);
if (nxt_slow_path(peer == NULL)) {
return NULL;
}
@@ -109,11 +111,8 @@ nxt_event_conn_proxy_create(nxt_event_conn_t *client)
void
nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p)
{
- nxt_thread_t *thr;
nxt_event_conn_t *peer;
- thr = nxt_thread();
-
/*
* Peer read event: not connected, disabled.
* Peer write event: not connected, disabled.
@@ -127,7 +126,7 @@ nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p)
peer = p->peer;
peer->write_state = &nxt_event_conn_proxy_peer_connect_state;
- nxt_event_conn_connect_enqueue(thr, task, peer);
+ nxt_event_conn_connect(task->thread->engine, peer);
}
/*
@@ -136,7 +135,7 @@ nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p)
*/
p->client->read_state = &nxt_event_conn_proxy_client_wait_state;
- nxt_event_conn_read(task, p->client);
+ nxt_conn_wait(p->client);
}
@@ -203,12 +202,13 @@ nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj,
client->read_state = &nxt_event_conn_proxy_client_first_read_state;
}
- nxt_event_conn_read(task, client);
+ nxt_event_conn_read(task->thread->engine, client);
}
static const nxt_event_conn_state_t
- nxt_event_conn_proxy_client_first_read_state nxt_aligned(64) =
+ nxt_event_conn_proxy_client_first_read_state
+ nxt_aligned(64) =
{
NXT_EVENT_BUF_PROCESS,
NXT_EVENT_TIMER_AUTORESET,
@@ -243,7 +243,7 @@ nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data)
p->peer->write_state = &nxt_event_conn_proxy_peer_connect_state;
- nxt_event_conn_connect(task, p->peer);
+ nxt_event_conn_connect(task->thread->engine, p->peer);
}
@@ -276,15 +276,15 @@ nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, void *data)
p->connected = 1;
- nxt_event_conn_tcp_nodelay_on(peer);
- nxt_event_conn_tcp_nodelay_on(p->client);
+ nxt_event_conn_tcp_nodelay_on(task, peer);
+ nxt_event_conn_tcp_nodelay_on(task, p->client);
/* Peer read event: waiting with peer_wait_timeout. */
peer->read_state = &nxt_event_conn_proxy_peer_wait_state;
peer->write_state = &nxt_event_conn_proxy_peer_write_state;
- nxt_event_conn_read_enqueue(task->thread, task, peer);
+ nxt_conn_wait(peer);
if (p->client_buffer != NULL) {
client = p->client;
@@ -350,7 +350,7 @@ nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data)
* Peer read event: waiting with possible peer_wait_timeout.
* Peer write event: blocked.
*/
- nxt_event_conn_read(task, peer);
+ nxt_event_conn_read(task->thread->engine, peer);
}
@@ -469,7 +469,7 @@ nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p,
}
if (p->connected) {
- nxt_event_conn_write_enqueue(task->thread, task, sink);
+ nxt_event_conn_write(task->thread->engine, sink);
}
}
@@ -547,7 +547,7 @@ nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data)
sink = (source == p->client) ? p->peer : p->client;
if (sink->socket.error == 0) {
- nxt_event_conn_read(task, source);
+ nxt_event_conn_read(task->thread->engine, source);
}
}
}
@@ -658,7 +658,7 @@ nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p,
}
if (wb->mem.pos != wb->mem.free) {
- nxt_event_conn_write_enqueue(task->thread, task, sink);
+ nxt_event_conn_write(task->thread->engine, sink);
break;
}
@@ -865,7 +865,7 @@ nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, void *data)
p->retries--;
- nxt_socket_close(peer->socket.fd);
+ nxt_socket_close(task, peer->socket.fd);
peer->socket.fd = -1;
peer->socket.error = 0;
@@ -903,7 +903,7 @@ nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data)
* Peer read event: disabled.
* Peer write event: waiting for connection with connect_timeout.
*/
- nxt_event_conn_connect(task, peer);
+ nxt_event_conn_connect(task->thread->engine, peer);
}
@@ -915,8 +915,7 @@ nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p,
nxt_debug(source->socket.task,
"event conn proxy shutdown source fd:%d cl:%d err:%d",
- source->socket.fd, source->socket.closed,
- source->socket.error);
+ source->socket.fd, source->socket.closed, source->socket.error);
nxt_debug(sink->socket.task,
"event conn proxy shutdown sink fd:%d cl:%d err:%d",
@@ -927,12 +926,9 @@ nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p,
return;
}
- if (sink->socket.error != 0 || sink->socket.closed) {
- nxt_event_conn_close(task->thread->engine, sink);
-
- } else {
+ if (sink->socket.error == 0 && !sink->socket.closed) {
sink->socket.shutdown = 1;
- nxt_socket_shutdown(sink->socket.fd, SHUT_WR);
+ nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR);
}
if (sink->socket.error != 0
@@ -943,6 +939,8 @@ nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p,
return;
}
+ nxt_debug(source->socket.task, "free source buffer");
+
/* Free the direction's buffer. */
b = (source == p->client) ? p->client_buffer : p->peer_buffer;
nxt_mem_free(source->mem_pool, b);
@@ -992,6 +990,22 @@ nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data)
}
+static const nxt_event_conn_state_t nxt_event_conn_proxy_close_state
+ nxt_aligned(64) =
+{
+ NXT_EVENT_NO_BUF_PROCESS,
+ NXT_EVENT_TIMER_NO_AUTORESET,
+
+ nxt_event_conn_proxy_completion,
+ NULL,
+ NULL,
+
+ NULL,
+ NULL,
+ 0,
+};
+
+
static void
nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p)
{
@@ -1002,20 +1016,41 @@ nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p)
nxt_debug(p->client->socket.task, "event conn proxy complete %d:%d",
p->client->socket.fd, p->peer->socket.fd);
+ if (p->delayed) {
+ p->delayed = 0;
+ nxt_queue_remove(&p->peer->link);
+ }
+
if (p->client->socket.fd != -1) {
+ p->retain = 1;
+ p->client->write_state = &nxt_event_conn_proxy_close_state;
nxt_event_conn_close(engine, p->client);
}
if (p->peer->socket.fd != -1) {
+ p->retain++;
+ p->peer->write_state = &nxt_event_conn_proxy_close_state;
nxt_event_conn_close(engine, p->peer);
-
- } else if (p->delayed) {
- nxt_queue_remove(&p->peer->link);
- nxt_timer_delete(engine, &p->peer->write_timer);
}
+}
- nxt_mem_free(p->client->mem_pool, p->client_buffer);
- nxt_mem_free(p->client->mem_pool, p->peer_buffer);
- p->completion_handler(task, p, NULL);
+static void
+nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_conn_proxy_t *p;
+
+ p = data;
+
+ nxt_debug(p->client->socket.task, "event conn proxy completion %d:%d:%d",
+ p->retain, p->client->socket.fd, p->peer->socket.fd);
+
+ p->retain--;
+
+ if (p->retain == 0) {
+ nxt_mem_free(p->client->mem_pool, p->client_buffer);
+ nxt_mem_free(p->client->mem_pool, p->peer_buffer);
+
+ p->completion_handler(task, p, NULL);
+ }
}