diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_event_conn_connect.c | 71 |
1 files changed, 41 insertions, 30 deletions
diff --git a/src/nxt_event_conn_connect.c b/src/nxt_event_conn_connect.c index f614dcda..55554720 100644 --- a/src/nxt_event_conn_connect.c +++ b/src/nxt_event_conn_connect.c @@ -8,37 +8,38 @@ void -nxt_event_conn_connect(nxt_thread_t *thr, nxt_event_conn_t *c) +nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c) { - void *data; + void *data; + nxt_event_engine_t *engine; data = c->socket.data; + engine = task->thread->engine; - if (thr->engine->batch != 0) { - nxt_thread_work_queue_add(thr, &thr->engine->socket_work_queue, - nxt_event_conn_batch_socket, c, data, - c->socket.log); + if (engine->batch != 0) { + nxt_thread_work_queue_add(task->thread, &engine->socket_work_queue, + nxt_event_conn_batch_socket, task, c, data); return; } - if (nxt_event_conn_socket(thr, c) == NXT_OK) { - c->io->connect(thr, c, data); + if (nxt_event_conn_socket(task, c) == NXT_OK) { + c->io->connect(task, c, data); return; } - c->write_state->error_handler(thr, c, data); + c->write_state->error_handler(task, c, data); } void -nxt_event_conn_batch_socket(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_work_handler_t handler; c = obj; - if (nxt_event_conn_socket(thr, c) == NXT_OK) { + if (nxt_event_conn_socket(task, c) == NXT_OK) { c->socket.write_work_queue = c->write_work_queue; handler = c->io->connect; @@ -46,16 +47,18 @@ nxt_event_conn_batch_socket(nxt_thread_t *thr, void *obj, void *data) handler = c->write_state->error_handler; } - nxt_thread_work_queue_add(thr, &thr->engine->connect_work_queue, - handler, c, data, thr->log); + nxt_thread_work_queue_add(task->thread, + &task->thread->engine->connect_work_queue, + handler, task, c, data); } void -nxt_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_io_connect(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_work_handler_t handler; + nxt_event_engine_t *engine; const nxt_event_conn_state_t *state; c = obj; @@ -73,9 +76,11 @@ nxt_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data) c->socket.write_handler = nxt_event_conn_connect_test; c->socket.error_handler = state->error_handler; - nxt_event_conn_timer(thr->engine, c, state, &c->write_timer); + engine = task->thread->engine; - nxt_event_fd_enable_write(thr->engine, &c->socket); + nxt_event_conn_timer(engine, c, state, &c->write_timer); + + nxt_event_fd_enable_write(engine, &c->socket); return; case NXT_DECLINED: @@ -87,17 +92,18 @@ nxt_event_conn_io_connect(nxt_thread_t *thr, void *obj, void *data) break; } - nxt_event_conn_io_handle(thr, c->write_work_queue, handler, c, data); + nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, task, + c, data); } nxt_int_t -nxt_event_conn_socket(nxt_thread_t *thr, nxt_event_conn_t *c) +nxt_event_conn_socket(nxt_task_t *task, nxt_event_conn_t *c) { nxt_uint_t family; nxt_socket_t s; - nxt_log_debug(thr->log, "event conn socket"); + nxt_debug(task, "event conn socket"); family = c->remote->u.sockaddr.sa_family; @@ -122,6 +128,10 @@ nxt_event_conn_socket(nxt_thread_t *thr, nxt_event_conn_t *c) nxt_event_timer_ident(&c->read_timer, s); nxt_event_timer_ident(&c->write_timer, s); + c->socket.task = task; + c->read_timer.task = task; + c->write_timer.task = task; + if (c->local != NULL) { if (nxt_slow_path(nxt_socket_bind(s, c->local, 0) != NXT_OK)) { nxt_socket_close(s); @@ -134,7 +144,7 @@ nxt_event_conn_socket(nxt_thread_t *thr, nxt_event_conn_t *c) void -nxt_event_conn_connect_test(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_connect_test(nxt_task_t *task, void *obj, void *data) { int ret, err; socklen_t len; @@ -142,9 +152,9 @@ nxt_event_conn_connect_test(nxt_thread_t *thr, void *obj, void *data) c = obj; - nxt_log_debug(thr->log, "event connect test fd:%d", c->socket.fd); + nxt_debug(task, "event connect test fd:%d", c->socket.fd); - nxt_event_fd_block_write(thr->engine, &c->socket); + nxt_event_fd_block_write(task->thread->engine, &c->socket); if (c->write_state->autoreset_timer) { nxt_event_timer_disable(&c->write_timer); @@ -165,23 +175,23 @@ nxt_event_conn_connect_test(nxt_thread_t *thr, void *obj, void *data) } if (err == 0) { - nxt_event_conn_io_handle(thr, c->write_work_queue, - c->write_state->ready_handler, c, data); + nxt_event_conn_io_handle(task->thread, c->write_work_queue, + c->write_state->ready_handler, task, c, data); return; } c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), thr->log, - "connect(%d, %*s) failed %E", - c->socket.fd, c->remote->text_len, c->remote->text, err); + nxt_log(task, nxt_socket_error_level(err, c->socket.log_error), + "connect(%d, %*s) failed %E", + c->socket.fd, c->remote->text_len, c->remote->text, err); - nxt_event_conn_connect_error(thr, c, data); + nxt_event_conn_connect_error(task, c, data); } void -nxt_event_conn_connect_error(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_connect_error(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_work_handler_t handler; @@ -209,5 +219,6 @@ nxt_event_conn_connect_error(nxt_thread_t *thr, void *obj, void *data) break; } - nxt_event_conn_io_handle(thr, c->write_work_queue, handler, c, data); + nxt_event_conn_io_handle(task->thread, c->write_work_queue, handler, + task, c, data); } |