diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_event_conn_proxy.c | 99 |
1 files changed, 67 insertions, 32 deletions
diff --git a/src/nxt_event_conn_proxy.c b/src/nxt_event_conn_proxy.c index 42b9dd0f..d91cff4a 100644 --- a/src/nxt_event_conn_proxy.c +++ b/src/nxt_event_conn_proxy.c @@ -53,6 +53,8 @@ static void nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data); static void nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p); +static void nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj, + void *data); static const nxt_event_conn_state_t nxt_event_conn_proxy_client_wait_state; @@ -78,7 +80,7 @@ nxt_event_conn_proxy_create(nxt_event_conn_t *client) return NULL; } - peer = nxt_event_conn_create(client->mem_pool, client->socket.log); + peer = nxt_event_conn_create(client->mem_pool, client->socket.task); if (nxt_slow_path(peer == NULL)) { return NULL; } @@ -109,11 +111,8 @@ nxt_event_conn_proxy_create(nxt_event_conn_t *client) void nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p) { - nxt_thread_t *thr; nxt_event_conn_t *peer; - thr = nxt_thread(); - /* * Peer read event: not connected, disabled. * Peer write event: not connected, disabled. @@ -127,7 +126,7 @@ nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p) peer = p->peer; peer->write_state = &nxt_event_conn_proxy_peer_connect_state; - nxt_event_conn_connect_enqueue(thr, task, peer); + nxt_event_conn_connect(task->thread->engine, peer); } /* @@ -136,7 +135,7 @@ nxt_event_conn_proxy(nxt_task_t *task, nxt_event_conn_proxy_t *p) */ p->client->read_state = &nxt_event_conn_proxy_client_wait_state; - nxt_event_conn_read(task, p->client); + nxt_conn_wait(p->client); } @@ -203,12 +202,13 @@ nxt_event_conn_proxy_client_buffer_alloc(nxt_task_t *task, void *obj, client->read_state = &nxt_event_conn_proxy_client_first_read_state; } - nxt_event_conn_read(task, client); + nxt_event_conn_read(task->thread->engine, client); } static const nxt_event_conn_state_t - nxt_event_conn_proxy_client_first_read_state nxt_aligned(64) = + nxt_event_conn_proxy_client_first_read_state + nxt_aligned(64) = { NXT_EVENT_BUF_PROCESS, NXT_EVENT_TIMER_AUTORESET, @@ -243,7 +243,7 @@ nxt_event_conn_proxy_peer_connect(nxt_task_t *task, void *obj, void *data) p->peer->write_state = &nxt_event_conn_proxy_peer_connect_state; - nxt_event_conn_connect(task, p->peer); + nxt_event_conn_connect(task->thread->engine, p->peer); } @@ -276,15 +276,15 @@ nxt_event_conn_proxy_connected(nxt_task_t *task, void *obj, void *data) p->connected = 1; - nxt_event_conn_tcp_nodelay_on(peer); - nxt_event_conn_tcp_nodelay_on(p->client); + nxt_event_conn_tcp_nodelay_on(task, peer); + nxt_event_conn_tcp_nodelay_on(task, p->client); /* Peer read event: waiting with peer_wait_timeout. */ peer->read_state = &nxt_event_conn_proxy_peer_wait_state; peer->write_state = &nxt_event_conn_proxy_peer_write_state; - nxt_event_conn_read_enqueue(task->thread, task, peer); + nxt_conn_wait(peer); if (p->client_buffer != NULL) { client = p->client; @@ -350,7 +350,7 @@ nxt_event_conn_proxy_peer_read(nxt_task_t *task, void *obj, void *data) * Peer read event: waiting with possible peer_wait_timeout. * Peer write event: blocked. */ - nxt_event_conn_read(task, peer); + nxt_event_conn_read(task->thread->engine, peer); } @@ -469,7 +469,7 @@ nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, } if (p->connected) { - nxt_event_conn_write_enqueue(task->thread, task, sink); + nxt_event_conn_write(task->thread->engine, sink); } } @@ -547,7 +547,7 @@ nxt_event_conn_proxy_read(nxt_task_t *task, void *obj, void *data) sink = (source == p->client) ? p->peer : p->client; if (sink->socket.error == 0) { - nxt_event_conn_read(task, source); + nxt_event_conn_read(task->thread->engine, source); } } } @@ -658,7 +658,7 @@ nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p, } if (wb->mem.pos != wb->mem.free) { - nxt_event_conn_write_enqueue(task->thread, task, sink); + nxt_event_conn_write(task->thread->engine, sink); break; } @@ -865,7 +865,7 @@ nxt_event_conn_proxy_refused(nxt_task_t *task, void *obj, void *data) p->retries--; - nxt_socket_close(peer->socket.fd); + nxt_socket_close(task, peer->socket.fd); peer->socket.fd = -1; peer->socket.error = 0; @@ -903,7 +903,7 @@ nxt_event_conn_proxy_reconnect_handler(nxt_task_t *task, void *obj, void *data) * Peer read event: disabled. * Peer write event: waiting for connection with connect_timeout. */ - nxt_event_conn_connect(task, peer); + nxt_event_conn_connect(task->thread->engine, peer); } @@ -915,8 +915,7 @@ nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p, nxt_debug(source->socket.task, "event conn proxy shutdown source fd:%d cl:%d err:%d", - source->socket.fd, source->socket.closed, - source->socket.error); + source->socket.fd, source->socket.closed, source->socket.error); nxt_debug(sink->socket.task, "event conn proxy shutdown sink fd:%d cl:%d err:%d", @@ -927,12 +926,9 @@ nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p, return; } - if (sink->socket.error != 0 || sink->socket.closed) { - nxt_event_conn_close(task->thread->engine, sink); - - } else { + if (sink->socket.error == 0 && !sink->socket.closed) { sink->socket.shutdown = 1; - nxt_socket_shutdown(sink->socket.fd, SHUT_WR); + nxt_socket_shutdown(task, sink->socket.fd, SHUT_WR); } if (sink->socket.error != 0 @@ -943,6 +939,8 @@ nxt_event_conn_proxy_shutdown(nxt_task_t *task, nxt_event_conn_proxy_t *p, return; } + nxt_debug(source->socket.task, "free source buffer"); + /* Free the direction's buffer. */ b = (source == p->client) ? p->client_buffer : p->peer_buffer; nxt_mem_free(source->mem_pool, b); @@ -992,6 +990,22 @@ nxt_event_conn_proxy_write_error(nxt_task_t *task, void *obj, void *data) } +static const nxt_event_conn_state_t nxt_event_conn_proxy_close_state + nxt_aligned(64) = +{ + NXT_EVENT_NO_BUF_PROCESS, + NXT_EVENT_TIMER_NO_AUTORESET, + + nxt_event_conn_proxy_completion, + NULL, + NULL, + + NULL, + NULL, + 0, +}; + + static void nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p) { @@ -1002,20 +1016,41 @@ nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p) nxt_debug(p->client->socket.task, "event conn proxy complete %d:%d", p->client->socket.fd, p->peer->socket.fd); + if (p->delayed) { + p->delayed = 0; + nxt_queue_remove(&p->peer->link); + } + if (p->client->socket.fd != -1) { + p->retain = 1; + p->client->write_state = &nxt_event_conn_proxy_close_state; nxt_event_conn_close(engine, p->client); } if (p->peer->socket.fd != -1) { + p->retain++; + p->peer->write_state = &nxt_event_conn_proxy_close_state; nxt_event_conn_close(engine, p->peer); - - } else if (p->delayed) { - nxt_queue_remove(&p->peer->link); - nxt_timer_delete(engine, &p->peer->write_timer); } +} - nxt_mem_free(p->client->mem_pool, p->client_buffer); - nxt_mem_free(p->client->mem_pool, p->peer_buffer); - p->completion_handler(task, p, NULL); +static void +nxt_event_conn_proxy_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_proxy_t *p; + + p = data; + + nxt_debug(p->client->socket.task, "event conn proxy completion %d:%d:%d", + p->retain, p->client->socket.fd, p->peer->socket.fd); + + p->retain--; + + if (p->retain == 0) { + nxt_mem_free(p->client->mem_pool, p->client_buffer); + nxt_mem_free(p->client->mem_pool, p->peer_buffer); + + p->completion_handler(task, p, NULL); + } } |