summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
commitde532922d9ab42aa15b40d47c8db53ac2af38500 (patch)
treed6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_event_conn.c
parent16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff)
downloadunit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz
unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2
Introducing tasks.
Diffstat (limited to 'src/nxt_event_conn.c')
-rw-r--r--src/nxt_event_conn.c75
1 files changed, 43 insertions, 32 deletions
diff --git a/src/nxt_event_conn.c b/src/nxt_event_conn.c
index a516bbac..6c4fa949 100644
--- a/src/nxt_event_conn.c
+++ b/src/nxt_event_conn.c
@@ -7,9 +7,9 @@
#include <nxt_main.h>
-static void nxt_event_conn_shutdown_socket(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_shutdown_socket(nxt_task_t *task, void *obj,
void *data);
-static void nxt_event_conn_close_socket(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_close_socket(nxt_task_t *task, void *obj,
void *data);
@@ -50,9 +50,8 @@ nxt_event_conn_io_t nxt_unix_event_conn_io = {
nxt_event_conn_t *
nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log)
{
- nxt_thread_t *thr;
- nxt_event_conn_t *c;
- static nxt_atomic_t ident = 1;
+ nxt_thread_t *thr;
+ nxt_event_conn_t *c;
c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t));
if (nxt_slow_path(c == NULL)) {
@@ -69,12 +68,19 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log)
/* The while loop skips possible uint32_t overflow. */
while (c->log.ident == 0) {
- c->log.ident = (uint32_t) nxt_atomic_fetch_add(&ident, 1);
+ c->log.ident = nxt_task_next_ident();
}
thr = nxt_thread();
thr->engine->connections++;
+ c->task.thread = thr;
+ c->task.log = &c->log;
+ c->task.ident = c->log.ident;
+ c->socket.task = &c->task;
+ c->read_timer.task = &c->task;
+ c->write_timer.task = &c->task;
+
c->io = thr->engine->event->io;
c->max_chunk = NXT_INT32_T_MAX;
c->sendfile = NXT_CONN_SENDFILE_UNSET;
@@ -92,7 +98,7 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log)
void
-nxt_event_conn_io_shutdown(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data)
{
int ret;
socklen_t len;
@@ -101,12 +107,12 @@ nxt_event_conn_io_shutdown(nxt_thread_t *thr, void *obj, void *data)
c = obj;
- nxt_log_debug(thr->log, "event conn shutdown");
+ nxt_debug(task, "event conn shutdown");
if (c->socket.timedout) {
/*
- * A reset of timed out connection on close
- * to release kernel memory associated with socket.
+ * Resetting of timed out connection on close
+ * releases kernel memory associated with socket.
* This also causes sending TCP/IP RST to a peer.
*/
linger.l_onoff = 1;
@@ -116,50 +122,54 @@ nxt_event_conn_io_shutdown(nxt_thread_t *thr, void *obj, void *data)
ret = setsockopt(c->socket.fd, SOL_SOCKET, SO_LINGER, &linger, len);
if (nxt_slow_path(ret != 0)) {
- nxt_log_error(NXT_LOG_CRIT, thr->log,
- "setsockopt(%d, SO_LINGER) failed %E",
- c->socket.fd, nxt_socket_errno);
+ nxt_log(task, NXT_LOG_CRIT, "setsockopt(%d, SO_LINGER) failed %E",
+ c->socket.fd, nxt_socket_errno);
}
}
- c->write_state->close_handler(thr, c, data);
+ c->write_state->close_handler(task, c, data);
}
void
-nxt_event_conn_close(nxt_thread_t *thr, nxt_event_conn_t *c)
+nxt_event_conn_close(nxt_task_t *task, nxt_event_conn_t *c)
{
+ nxt_thread_t *thr;
nxt_work_queue_t *wq;
+ nxt_event_engine_t *engine;
nxt_work_handler_t handler;
- nxt_log_debug(thr->log, "event conn close fd:%d", c->socket.fd);
+ nxt_debug(task, "event conn close fd:%d", c->socket.fd);
+
+ thr = task->thread;
nxt_thread_work_queue_drop(thr, c);
nxt_thread_work_queue_drop(thr, &c->read_timer);
nxt_thread_work_queue_drop(thr, &c->write_timer);
- nxt_event_timer_delete(thr->engine, &c->read_timer);
- nxt_event_timer_delete(thr->engine, &c->write_timer);
+ engine = thr->engine;
+
+ nxt_event_timer_delete(engine, &c->read_timer);
+ nxt_event_timer_delete(engine, &c->write_timer);
- nxt_event_fd_close(thr->engine, &c->socket);
- thr->engine->connections--;
+ nxt_event_fd_close(engine, &c->socket);
+ engine->connections--;
- nxt_log_debug(thr->log, "event connections: %uD", thr->engine->connections);
+ nxt_debug(task, "event connections: %uD", engine->connections);
- if (thr->engine->batch != 0) {
+ if (engine->batch != 0) {
if (c->socket.closed || c->socket.error != 0) {
- wq = &thr->engine->close_work_queue;
+ wq = &engine->close_work_queue;
handler = nxt_event_conn_close_socket;
} else {
- wq = &thr->engine->shutdown_work_queue;
+ wq = &engine->shutdown_work_queue;
handler = nxt_event_conn_shutdown_socket;
}
- nxt_thread_work_queue_add(thr, wq, handler,
- (void *) (uintptr_t) c->socket.fd, NULL,
- &nxt_main_log);
+ nxt_thread_work_queue_add(thr, wq, handler, task,
+ (void *) (uintptr_t) c->socket.fd, NULL);
} else {
nxt_socket_close(c->socket.fd);
@@ -170,7 +180,7 @@ nxt_event_conn_close(nxt_thread_t *thr, nxt_event_conn_t *c)
static void
-nxt_event_conn_shutdown_socket(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_shutdown_socket(nxt_task_t *task, void *obj, void *data)
{
nxt_socket_t s;
@@ -178,14 +188,15 @@ nxt_event_conn_shutdown_socket(nxt_thread_t *thr, void *obj, void *data)
nxt_socket_shutdown(s, SHUT_RDWR);
- nxt_thread_work_queue_add(thr, &thr->engine->close_work_queue,
- nxt_event_conn_close_socket,
- (void *) (uintptr_t) s, NULL, &nxt_main_log);
+ nxt_thread_work_queue_add(task->thread,
+ &task->thread->engine->close_work_queue,
+ nxt_event_conn_close_socket, task,
+ (void *) (uintptr_t) s, NULL);
}
static void
-nxt_event_conn_close_socket(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_close_socket(nxt_task_t *task, void *obj, void *data)
{
nxt_socket_t s;