summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn_connect.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
commitde532922d9ab42aa15b40d47c8db53ac2af38500 (patch)
treed6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_event_conn_connect.c
parent16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff)
downloadunit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz
unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2
Introducing tasks.
Diffstat (limited to 'src/nxt_event_conn_connect.c')
-rw-r--r--src/nxt_event_conn_connect.c71
1 files changed, 41 insertions, 30 deletions
diff --git a/src/nxt_event_conn_connect.c b/src/nxt_event_conn_connect.c
index f614dcda..55554720 100644
--- a/src/nxt_event_conn_connect.c
+++ b/src/nxt_event_conn_connect.c
@@ -8,37 +8,38 @@
void
-nxt_event_conn_connect(nxt_thread_t *thr, nxt_event_conn_t *c)
+nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c)
{
- void *data;
+ void *data;
+ nxt_event_engine_t *engine;
data = c->socket.data;
+ engine = task->thread->engine;
- if (thr->engine->batch != 0) {
- nxt_thread_work_queue_add(thr, &thr->engine->socket_work_queue,
- nxt_event_conn_batch_socket, c, data,
- c->socket.log);
+ if (engine->batch != 0) {
+ nxt_thread_work_queue_add(task->thread, &engine->socket_work_queue,
+ nxt_event_conn_batch_socket, task, c, data);
return;
}
- if (nxt_event_conn_socket(thr, c) == NXT_OK) {
- c->io->connect(thr, c, data);
+ if (nxt_event_conn_socket(task, c) == NXT_OK) {
+ c->io->connect(task, c, data);
return;
}
- c->write_state->error_handler(thr, c, data);
+ c->write_state->error_handler(task, c, data);
}
void
-nxt_event_conn_batch_socket(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
nxt_work_handler_t handler;
c = obj;
- if (nxt_event_conn_socket(thr, c) == NXT_OK) {
+ if (nxt_event_conn_socket(task, c) == NXT_OK) {
c->socket.write_work_queue = c->write_work_queue;
handler = c->io->connect;
@@ -46,16 +47,18 @@ nxt_event_conn_batch_socket(nxt_thread_t *thr, void *obj, void *data)
handler = c->write_state->error_handler;
}
- nxt_thread_work_queue_add(thr, &thr->engine->connect_work_queue,
- handler, c, data, thr->log);
+ nxt_thread_work_queue_add(task->thread,
+ &task->thread->engine->connect_work_queue,
+ handler, task, c, data);
}
void
-nxt_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
nxt_work_handler_t handler;
+ nxt_event_engine_t *engine;
const nxt_event_conn_state_t *state;
c = obj;
@@ -73,9 +76,11 @@ nxt_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data)
c->socket.write_handler = nxt_event_conn_connect_test;
c->socket.error_handler = state->error_handler;
- nxt_event_conn_timer(thr->engine, c, state, &c->write_timer);
+ engine = task->thread->engine;
- nxt_event_fd_enable_write(thr->engine, &c->socket);
+ nxt_event_conn_timer(engine, c, state, &c->write_timer);
+
+ nxt_event_fd_enable_write(engine, &c->socket);
return;
case NXT_DECLINED:
@@ -87,17 +92,18 @@ nxt_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data)
break;
}
- nxt_event_conn_io_handle(thr, c->write_work_queue, handler, c, data);
+ nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, task,
+ c, data);
}
nxt_int_t
-nxt_event_conn_socket(nxt_thread_t *thr, nxt_event_conn_t *c)
+nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c)
{
nxt_uint_t family;
nxt_socket_t s;
- nxt_log_debug(thr->log, "event conn socket");
+ nxt_debug(task, "event conn socket");
family = c->remote->u.sockaddr.sa_family;
@@ -122,6 +128,10 @@ nxt_event_conn_socket(nxt_thread_t *thr, nxt_event_conn_t *c)
nxt_event_timer_ident(&c->read_timer, s);
nxt_event_timer_ident(&c->write_timer, s);
+ c->socket.task = task;
+ c->read_timer.task = task;
+ c->write_timer.task = task;
+
if (c->local != NULL) {
if (nxt_slow_path(nxt_socket_bind(s, c->local, 0) != NXT_OK)) {
nxt_socket_close(s);
@@ -134,7 +144,7 @@ nxt_event_conn_socket(nxt_thread_t *thr, nxt_event_conn_t *c)
void
-nxt_event_conn_connect_test(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data)
{
int ret, err;
socklen_t len;
@@ -142,9 +152,9 @@ nxt_event_conn_connect_test(nxt_thread_t *thr, void *obj, void *data)
c = obj;
- nxt_log_debug(thr->log, "event connect test fd:%d", c->socket.fd);
+ nxt_debug(task, "event connect test fd:%d", c->socket.fd);
- nxt_event_fd_block_write(thr->engine, &c->socket);
+ nxt_event_fd_block_write(task->thread->engine, &c->socket);
if (c->write_state->autoreset_timer) {
nxt_event_timer_disable(&c->write_timer);
@@ -165,23 +175,23 @@ nxt_event_conn_connect_test(nxt_thread_t *thr, void *obj, void *data)
}
if (err == 0) {
- nxt_event_conn_io_handle(thr, c->write_work_queue,
- c->write_state->ready_handler, c, data);
+ nxt_event_conn_io_handle(task->thread, c->write_work_queue,
+ c->write_state->ready_handler, task, c, data);
return;
}
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), thr->log,
- "connect(%d, %*s) failed %E",
- c->socket.fd, c->remote->text_len, c->remote->text, err);
+ nxt_log(task, nxt_socket_error_level(err, c->socket.log_error),
+ "connect(%d, %*s) failed %E",
+ c->socket.fd, c->remote->text_len, c->remote->text, err);
- nxt_event_conn_connect_error(thr, c, data);
+ nxt_event_conn_connect_error(task, c, data);
}
void
-nxt_event_conn_connect_error(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
nxt_work_handler_t handler;
@@ -209,5 +219,6 @@ nxt_event_conn_connect_error(nxt_thread_t *thr, void *obj, void *data)
break;
}
- nxt_event_conn_io_handle(thr, c->write_work_queue, handler, c, data);
+ nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler,
+ task, c, data);
}