diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-02-07 20:04:56 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-02-07 20:04:56 +0300 |
commit | 059a8642898a6bd4b47d13a1c1d599cd44af7e1c (patch) | |
tree | e3c8c530a04f1ae44777d5ea4fd6901dc55a8ebf /src/nxt_event_conn.c | |
parent | e57b95a92333fa7ff558737b0ba2b76894cc0412 (diff) | |
download | unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.gz unit-059a8642898a6bd4b47d13a1c1d599cd44af7e1c.tar.bz2 |
Event engines refactoring.
Diffstat (limited to 'src/nxt_event_conn.c')
-rw-r--r-- | src/nxt_event_conn.c | 144 |
1 files changed, 92 insertions, 52 deletions
diff --git a/src/nxt_event_conn.c b/src/nxt_event_conn.c index d907c238..b78a9251 100644 --- a/src/nxt_event_conn.c +++ b/src/nxt_event_conn.c @@ -7,9 +7,9 @@ #include <nxt_main.h> -static void nxt_event_conn_shutdown_socket(nxt_task_t *task, void *obj, - void *data); -static void nxt_event_conn_close_socket(nxt_task_t *task, void *obj, +static void nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data); @@ -81,7 +81,7 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log) c->read_timer.task = &c->task; c->write_timer.task = &c->task; - c->io = thr->engine->event->io; + c->io = thr->engine->event.io; c->max_chunk = NXT_INT32_T_MAX; c->sendfile = NXT_CONN_SENDFILE_UNSET; @@ -132,72 +132,124 @@ nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data) void -nxt_event_conn_close(nxt_task_t *task, nxt_event_conn_t *c) +nxt_event_conn_close(nxt_event_engine_t *engine, nxt_event_conn_t *c) { - nxt_thread_t *thr; + int ret; + socklen_t len; + struct linger linger; nxt_work_queue_t *wq; - nxt_event_engine_t *engine; nxt_work_handler_t handler; - nxt_debug(task, "event conn close fd:%d", c->socket.fd); + if (c->socket.timedout) { + /* + * Resetting of timed out connection on close + * releases kernel memory associated with socket. + * This also causes sending TCP/IP RST to a peer. + */ + linger.l_onoff = 1; + linger.l_linger = 0; + len = sizeof(struct linger); - thr = task->thread; + ret = setsockopt(c->socket.fd, SOL_SOCKET, SO_LINGER, &linger, len); - engine = thr->engine; + if (nxt_slow_path(ret != 0)) { + nxt_log(c->socket.task, NXT_LOG_CRIT, + "setsockopt(%d, SO_LINGER) failed %E", + c->socket.fd, nxt_socket_errno); + } + } - nxt_timer_delete(engine, &c->read_timer); - nxt_timer_delete(engine, &c->write_timer); + if (c->socket.error == 0 && !c->socket.closed && !c->socket.shutdown) { + wq = &engine->shutdown_work_queue; + handler = nxt_conn_shutdown_handler; - nxt_event_fd_close(engine, &c->socket); - engine->connections--; + } else{ + wq = &engine->close_work_queue; + handler = nxt_conn_close_handler; + } - nxt_debug(task, "event connections: %uD", engine->connections); + nxt_work_queue_add(wq, handler, c->socket.task, c, engine); +} - if (engine->batch != 0) { - if (c->socket.closed || c->socket.error != 0) { - wq = &engine->close_work_queue; - handler = nxt_event_conn_close_socket; +static void +nxt_conn_shutdown_handler(nxt_task_t *task, void *obj, void *data) +{ + nxt_event_conn_t *c; + nxt_event_engine_t *engine; - } else { - wq = &engine->shutdown_work_queue; - handler = nxt_event_conn_shutdown_socket; - } + c = obj; + engine = data; - nxt_work_queue_add(wq, handler, task, - (void *) (uintptr_t) c->socket.fd, NULL); + nxt_debug(task, "event conn shutdown fd:%d", c->socket.fd); - } else { - nxt_socket_close(c->socket.fd); - } + c->socket.shutdown = 1; - c->socket.fd = -1; + nxt_socket_shutdown(c->socket.fd, SHUT_RDWR); + + nxt_work_queue_add(&engine->close_work_queue, nxt_conn_close_handler, + task, c, engine); } static void -nxt_event_conn_shutdown_socket(nxt_task_t *task, void *obj, void *data) +nxt_conn_close_handler(nxt_task_t *task, void *obj, void *data) { - nxt_socket_t s; + nxt_uint_t events_pending, timers_pending; + nxt_event_conn_t *c; + nxt_event_engine_t *engine; + + c = obj; + engine = data; + + nxt_debug(task, "event conn close fd:%d", c->socket.fd); - s = (nxt_socket_t) (uintptr_t) obj; + timers_pending = nxt_timer_delete(engine, &c->read_timer); + timers_pending += nxt_timer_delete(engine, &c->write_timer); + + events_pending = nxt_fd_event_close(engine, &c->socket); + + if (events_pending == 0) { + nxt_socket_close(c->socket.fd); + c->socket.fd = -1; + + if (timers_pending == 0) { + nxt_work_queue_add(&engine->fast_work_queue, + c->write_state->ready_handler, + task, c, c->socket.data); + return; + } + } - nxt_socket_shutdown(s, SHUT_RDWR); + c->write_timer.handler = nxt_conn_close_timer_handler; + c->write_timer.work_queue = &engine->fast_work_queue; - nxt_work_queue_add(&task->thread->engine->close_work_queue, - nxt_event_conn_close_socket, task, - (void *) (uintptr_t) s, NULL); + nxt_timer_add(engine, &c->write_timer, 0); } static void -nxt_event_conn_close_socket(nxt_task_t *task, void *obj, void *data) +nxt_conn_close_timer_handler(nxt_task_t *task, void *obj, void *data) { - nxt_socket_t s; + nxt_timer_t *ev; + nxt_event_conn_t *c; + nxt_event_engine_t *engine; + + ev = obj; + + c = nxt_event_write_timer_conn(ev); + + nxt_debug(task, "event conn close handler fd:%d", c->socket.fd); + + if (c->socket.fd != -1) { + nxt_socket_close(c->socket.fd); + c->socket.fd = -1; + } - s = (nxt_socket_t) (uintptr_t) obj; + engine = task->thread->engine; - nxt_socket_close(s); + nxt_work_queue_add(&engine->fast_work_queue, c->write_state->ready_handler, + task, c, c->socket.data); } @@ -221,18 +273,6 @@ nxt_event_conn_timer(nxt_event_engine_t *engine, nxt_event_conn_t *c, void nxt_event_conn_work_queue_set(nxt_event_conn_t *c, nxt_work_queue_t *wq) { -#if 0 - nxt_thread_t *thr; - nxt_work_queue_t *owq; - - thr = nxt_thread(); - owq = c->socket.work_queue; - - nxt_thread_work_queue_move(thr, owq, wq, c); - nxt_thread_work_queue_move(thr, owq, wq, &c->read_timer); - nxt_thread_work_queue_move(thr, owq, wq, &c->write_timer); -#endif - c->read_work_queue = wq; c->write_work_queue = wq; c->read_timer.work_queue = wq; |