summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_stream_module.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-22 15:09:59 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-22 15:09:59 +0300
commit029942f4eb7196c2cff0d0e26bc6ff274138f7d8 (patch)
treef4686c4d7b9cd574fe94c6f4918479a580fecf75 /src/nxt_stream_module.c
parent059a8642898a6bd4b47d13a1c1d599cd44af7e1c (diff)
downloadunit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.gz
unit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.bz2
I/O operations refactoring.
Diffstat (limited to 'src/nxt_stream_module.c')
-rw-r--r--src/nxt_stream_module.c130
1 files changed, 130 insertions, 0 deletions
diff --git a/src/nxt_stream_module.c b/src/nxt_stream_module.c
new file mode 100644
index 00000000..89201893
--- /dev/null
+++ b/src/nxt_stream_module.c
@@ -0,0 +1,130 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+#include <nxt_cycle.h>
+
+
+static void nxt_stream_connection_peer(nxt_task_t *task,
+ nxt_upstream_peer_t *up);
+static void nxt_stream_connection_close(nxt_task_t *task, void *obj,
+ void *data);
+
+
+void
+nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_cycle_t *cycle;
+ nxt_event_conn_t *c;
+ nxt_upstream_peer_t *up;
+
+ c = obj;
+
+ nxt_debug(task, "stream connection init");
+
+ up = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_upstream_peer_t));
+ if (nxt_slow_path(up == NULL)) {
+ goto fail;
+ }
+
+ up->data = c;
+
+ cycle = nxt_thread_cycle();
+
+ if (cycle->upstream.length != 0) {
+ up->addr = cycle->upstream;
+
+ } else {
+ nxt_str_set(&up->addr, "127.0.0.1:8080");
+ }
+
+ up->ready_handler = nxt_stream_connection_peer;
+ up->mem_pool = c->mem_pool;
+
+ nxt_upstream_round_robin_peer(task, up);
+ return;
+
+fail:
+
+ /* TODO: close connection */
+ return;
+}
+
+
+static void
+nxt_stream_connection_peer(nxt_task_t *task, nxt_upstream_peer_t *up)
+{
+ nxt_event_conn_t *c;
+ nxt_event_conn_proxy_t *p;
+
+ c = up->data;
+
+ up->sockaddr->type = SOCK_STREAM;
+
+ nxt_log_debug(c->socket.log, "stream connection peer %*s",
+ up->sockaddr->length, nxt_sockaddr_start(up->sockaddr));
+
+ p = nxt_event_conn_proxy_create(c);
+ if (nxt_slow_path(p == NULL)) {
+ goto fail;
+ }
+
+ p->client->socket.data = p;
+ p->peer->socket.data = p;
+
+ p->client_buffer_size = 1024;
+ p->peer_buffer_size = 4096;
+ //p->client_wait_timeout = 9000;
+ p->connect_timeout = 7000;
+ p->reconnect_timeout = 500;
+ //p->peer_wait_timeout = 5000;
+ p->client_write_timeout = 3000;
+ p->peer_write_timeout = 3000;
+ p->completion_handler = nxt_stream_connection_close;
+ //p->retries = 10;
+ p->peer->remote = up->sockaddr;
+
+ if (0) {
+ nxt_event_engine_t *engine;
+ nxt_event_write_rate_t *rate;
+
+ rate = nxt_mem_alloc(c->mem_pool, sizeof(nxt_event_write_rate_t));
+
+ if (nxt_slow_path(rate == NULL)) {
+ goto fail;
+ }
+
+ c->rate = rate;
+
+ rate->limit = 1024;
+ rate->limit_after = 0;
+ rate->average = rate->limit;
+
+ engine = nxt_thread_event_engine();
+ rate->last = engine->timers.now;
+ }
+
+ nxt_event_conn_proxy(task, p);
+ return;
+
+fail:
+
+ /* TODO: close connection */
+ return;
+}
+
+
+static void
+nxt_stream_connection_close(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_conn_proxy_t *p;
+
+ p = obj;
+
+ nxt_log_debug(p->client->socket.log, "stream connection close");
+
+ nxt_mem_pool_destroy(p->client->mem_pool);
+}