summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-07 20:04:56 +0300
commit059a8642898a6bd4b47d13a1c1d599cd44af7e1c (patch)
treee3c8c530a04f1ae44777d5ea4fd6901dc55a8ebf /src/nxt_event_conn.c
parente57b95a92333fa7ff558737b0ba2b76894cc0412 (diff)
downloadunit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.gz
unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.bz2
Event engines refactoring.
Diffstat (limited to 'src/nxt_event_conn.c')
-rw-r--r--src/nxt_event_conn.c144
1 files changed, 92 insertions, 52 deletions
diff --git a/src/nxt_event_conn.c b/src/nxt_event_conn.c
index d907c238..b78a9251 100644
--- a/src/nxt_event_conn.c
+++ b/src/nxt_event_conn.c
@@ -7,9 +7,9 @@
#include <nxt_main.h>
-static void nxt_event_conn_shutdown_socket(nxt_task_t *task, void *obj,
- void *data);
-static void nxt_event_conn_close_socket(nxt_task_t *task, void *obj,
+static void nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_conn_close_timer_handler(nxt_task_t *task, void *obj,
void *data);
@@ -81,7 +81,7 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log)
c->read_timer.task = &c->task;
c->write_timer.task = &c->task;
- c->io = thr->engine->event->io;
+ c->io = thr->engine->event.io;
c->max_chunk = NXT_INT32_T_MAX;
c->sendfile = NXT_CONN_SENDFILE_UNSET;
@@ -132,72 +132,124 @@ nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data)
void
-nxt_event_conn_close(nxt_task_t *task, nxt_event_conn_t *c)
+nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c)
{
- nxt_thread_t *thr;
+ int ret;
+ socklen_t len;
+ struct linger linger;
nxt_work_queue_t *wq;
- nxt_event_engine_t *engine;
nxt_work_handler_t handler;
- nxt_debug(task, "event conn close fd:%d", c->socket.fd);
+ if (c->socket.timedout) {
+ /*
+ * Resetting of timed out connection on close
+ * releases kernel memory associated with socket.
+ * This also causes sending TCP/IP RST to a peer.
+ */
+ linger.l_onoff = 1;
+ linger.l_linger = 0;
+ len = sizeof(struct linger);
- thr = task->thread;
+ ret = setsockopt(c->socket.fd, SOL_SOCKET, SO_LINGER, &linger, len);
- engine = thr->engine;
+ if (nxt_slow_path(ret != 0)) {
+ nxt_log(c->socket.task, NXT_LOG_CRIT,
+ "setsockopt(%d, SO_LINGER) failed %E",
+ c->socket.fd, nxt_socket_errno);
+ }
+ }
- nxt_timer_delete(engine, &c->read_timer);
- nxt_timer_delete(engine, &c->write_timer);
+ if (c->socket.error == 0 && !c->socket.closed && !c->socket.shutdown) {
+ wq = &engine->shutdown_work_queue;
+ handler = nxt_conn_shutdown_handler;
- nxt_event_fd_close(engine, &c->socket);
- engine->connections--;
+ } else{
+ wq = &engine->close_work_queue;
+ handler = nxt_conn_close_handler;
+ }
- nxt_debug(task, "event connections: %uD", engine->connections);
+ nxt_work_queue_add(wq, handler, c->socket.task, c, engine);
+}
- if (engine->batch != 0) {
- if (c->socket.closed || c->socket.error != 0) {
- wq = &engine->close_work_queue;
- handler = nxt_event_conn_close_socket;
+static void
+nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_event_conn_t *c;
+ nxt_event_engine_t *engine;
- } else {
- wq = &engine->shutdown_work_queue;
- handler = nxt_event_conn_shutdown_socket;
- }
+ c = obj;
+ engine = data;
- nxt_work_queue_add(wq, handler, task,
- (void *) (uintptr_t) c->socket.fd, NULL);
+ nxt_debug(task, "event conn shutdown fd:%d", c->socket.fd);
- } else {
- nxt_socket_close(c->socket.fd);
- }
+ c->socket.shutdown = 1;
- c->socket.fd = -1;
+ nxt_socket_shutdown(c->socket.fd, SHUT_RDWR);
+
+ nxt_work_queue_add(&engine->close_work_queue, nxt_conn_close_handler,
+ task, c, engine);
}
static void
-nxt_event_conn_shutdown_socket(nxt_task_t *task, void *obj, void *data)
+nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_socket_t s;
+ nxt_uint_t events_pending, timers_pending;
+ nxt_event_conn_t *c;
+ nxt_event_engine_t *engine;
+
+ c = obj;
+ engine = data;
+
+ nxt_debug(task, "event conn close fd:%d", c->socket.fd);
- s = (nxt_socket_t) (uintptr_t) obj;
+ timers_pending = nxt_timer_delete(engine, &c->read_timer);
+ timers_pending += nxt_timer_delete(engine, &c->write_timer);
+
+ events_pending = nxt_fd_event_close(engine, &c->socket);
+
+ if (events_pending == 0) {
+ nxt_socket_close(c->socket.fd);
+ c->socket.fd = -1;
+
+ if (timers_pending == 0) {
+ nxt_work_queue_add(&engine->fast_work_queue,
+ c->write_state->ready_handler,
+ task, c, c->socket.data);
+ return;
+ }
+ }
- nxt_socket_shutdown(s, SHUT_RDWR);
+ c->write_timer.handler = nxt_conn_close_timer_handler;
+ c->write_timer.work_queue = &engine->fast_work_queue;
- nxt_work_queue_add(&task->thread->engine->close_work_queue,
- nxt_event_conn_close_socket, task,
- (void *) (uintptr_t) s, NULL);
+ nxt_timer_add(engine, &c->write_timer, 0);
}
static void
-nxt_event_conn_close_socket(nxt_task_t *task, void *obj, void *data)
+nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data)
{
- nxt_socket_t s;
+ nxt_timer_t *ev;
+ nxt_event_conn_t *c;
+ nxt_event_engine_t *engine;
+
+ ev = obj;
+
+ c = nxt_event_write_timer_conn(ev);
+
+ nxt_debug(task, "event conn close handler fd:%d", c->socket.fd);
+
+ if (c->socket.fd != -1) {
+ nxt_socket_close(c->socket.fd);
+ c->socket.fd = -1;
+ }
- s = (nxt_socket_t) (uintptr_t) obj;
+ engine = task->thread->engine;
- nxt_socket_close(s);
+ nxt_work_queue_add(&engine->fast_work_queue, c->write_state->ready_handler,
+ task, c, c->socket.data);
}
@@ -221,18 +273,6 @@ nxt_event_conn_timer(nxt_event_engine_t *engine, nxt_event_conn_t *c,
void
nxt_event_conn_work_queue_set(nxt_event_conn_t *c, nxt_work_queue_t *wq)
{
-#if 0
- nxt_thread_t *thr;
- nxt_work_queue_t *owq;
-
- thr = nxt_thread();
- owq = c->socket.work_queue;
-
- nxt_thread_work_queue_move(thr, owq, wq, c);
- nxt_thread_work_queue_move(thr, owq, wq, &c->read_timer);
- nxt_thread_work_queue_move(thr, owq, wq, &c->write_timer);
-#endif
-
c->read_work_queue = wq;
c->write_work_queue = wq;
c->read_timer.work_queue = wq;