diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-23 19:56:03 +0300 |
commit | de532922d9ab42aa15b40d47c8db53ac2af38500 (patch) | |
tree | d6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_event_conn.c | |
parent | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff) | |
download | unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2 |
Introducing tasks.
Diffstat (limited to 'src/nxt_event_conn.c')
-rw-r--r-- | src/nxt_event_conn.c | 75 |
1 files changed, 43 insertions, 32 deletions
diff --git a/src/nxt_event_conn.c b/src/nxt_event_conn.c index a516bbac..6c4fa949 100644 --- a/src/nxt_event_conn.c +++ b/src/nxt_event_conn.c @@ -7,9 +7,9 @@ #include <nxt_main.h> -static void nxt_event_conn_shutdown_socket(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_shutdown_socket(nxt_task_t *task, void *obj, void *data); -static void nxt_event_conn_close_socket(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_close_socket(nxt_task_t *task, void *obj, void *data); @@ -50,9 +50,8 @@ nxt_event_conn_io_t nxt_unix_event_conn_io = { nxt_event_conn_t * nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log) { - nxt_thread_t *thr; - nxt_event_conn_t *c; - static nxt_atomic_t ident = 1; + nxt_thread_t *thr; + nxt_event_conn_t *c; c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t)); if (nxt_slow_path(c == NULL)) { @@ -69,12 +68,19 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log) /* The while loop skips possible uint32_t overflow. */ while (c->log.ident == 0) { - c->log.ident = (uint32_t) nxt_atomic_fetch_add(&ident, 1); + c->log.ident = nxt_task_next_ident(); } thr = nxt_thread(); thr->engine->connections++; + c->task.thread = thr; + c->task.log = &c->log; + c->task.ident = c->log.ident; + c->socket.task = &c->task; + c->read_timer.task = &c->task; + c->write_timer.task = &c->task; + c->io = thr->engine->event->io; c->max_chunk = NXT_INT32_T_MAX; c->sendfile = NXT_CONN_SENDFILE_UNSET; @@ -92,7 +98,7 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log) void -nxt_event_conn_io_shutdown(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_io_shutdown(nxt_task_t *task, void *obj, void *data) { int ret; socklen_t len; @@ -101,12 +107,12 @@ nxt_event_conn_io_shutdown(nxt_thread_t *thr, void *obj, void *data) c = obj; - nxt_log_debug(thr->log, "event conn shutdown"); + nxt_debug(task, "event conn shutdown"); if (c->socket.timedout) { /* - * A reset of timed out connection on close - * to release kernel memory associated with socket. + * Resetting of timed out connection on close + * releases kernel memory associated with socket. * This also causes sending TCP/IP RST to a peer. */ linger.l_onoff = 1; @@ -116,50 +122,54 @@ nxt_event_conn_io_shutdown(nxt_thread_t *thr, void *obj, void *data) ret = setsockopt(c->socket.fd, SOL_SOCKET, SO_LINGER, &linger, len); if (nxt_slow_path(ret != 0)) { - nxt_log_error(NXT_LOG_CRIT, thr->log, - "setsockopt(%d, SO_LINGER) failed %E", - c->socket.fd, nxt_socket_errno); + nxt_log(task, NXT_LOG_CRIT, "setsockopt(%d, SO_LINGER) failed %E", + c->socket.fd, nxt_socket_errno); } } - c->write_state->close_handler(thr, c, data); + c->write_state->close_handler(task, c, data); } void -nxt_event_conn_close(nxt_thread_t *thr, nxt_event_conn_t *c) +nxt_event_conn_close(nxt_task_t *task, nxt_event_conn_t *c) { + nxt_thread_t *thr; nxt_work_queue_t *wq; + nxt_event_engine_t *engine; nxt_work_handler_t handler; - nxt_log_debug(thr->log, "event conn close fd:%d", c->socket.fd); + nxt_debug(task, "event conn close fd:%d", c->socket.fd); + + thr = task->thread; nxt_thread_work_queue_drop(thr, c); nxt_thread_work_queue_drop(thr, &c->read_timer); nxt_thread_work_queue_drop(thr, &c->write_timer); - nxt_event_timer_delete(thr->engine, &c->read_timer); - nxt_event_timer_delete(thr->engine, &c->write_timer); + engine = thr->engine; + + nxt_event_timer_delete(engine, &c->read_timer); + nxt_event_timer_delete(engine, &c->write_timer); - nxt_event_fd_close(thr->engine, &c->socket); - thr->engine->connections--; + nxt_event_fd_close(engine, &c->socket); + engine->connections--; - nxt_log_debug(thr->log, "event connections: %uD", thr->engine->connections); + nxt_debug(task, "event connections: %uD", engine->connections); - if (thr->engine->batch != 0) { + if (engine->batch != 0) { if (c->socket.closed || c->socket.error != 0) { - wq = &thr->engine->close_work_queue; + wq = &engine->close_work_queue; handler = nxt_event_conn_close_socket; } else { - wq = &thr->engine->shutdown_work_queue; + wq = &engine->shutdown_work_queue; handler = nxt_event_conn_shutdown_socket; } - nxt_thread_work_queue_add(thr, wq, handler, - (void *) (uintptr_t) c->socket.fd, NULL, - &nxt_main_log); + nxt_thread_work_queue_add(thr, wq, handler, task, + (void *) (uintptr_t) c->socket.fd, NULL); } else { nxt_socket_close(c->socket.fd); @@ -170,7 +180,7 @@ nxt_event_conn_close(nxt_thread_t *thr, nxt_event_conn_t *c) static void -nxt_event_conn_shutdown_socket(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_shutdown_socket(nxt_task_t *task, void *obj, void *data) { nxt_socket_t s; @@ -178,14 +188,15 @@ nxt_event_conn_shutdown_socket(nxt_thread_t *thr, void *obj, void *data) nxt_socket_shutdown(s, SHUT_RDWR); - nxt_thread_work_queue_add(thr, &thr->engine->close_work_queue, - nxt_event_conn_close_socket, - (void *) (uintptr_t) s, NULL, &nxt_main_log); + nxt_thread_work_queue_add(task->thread, + &task->thread->engine->close_work_queue, + nxt_event_conn_close_socket, task, + (void *) (uintptr_t) s, NULL); } static void -nxt_event_conn_close_socket(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_close_socket(nxt_task_t *task, void *obj, void *data) { nxt_socket_t s; |