diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_event_conn_write.c | 78 |
1 files changed, 40 insertions, 38 deletions
diff --git a/src/nxt_event_conn_write.c b/src/nxt_event_conn_write.c index 0f35b5f3..d72ddc80 100644 --- a/src/nxt_event_conn_write.c +++ b/src/nxt_event_conn_write.c @@ -11,38 +11,41 @@ static void nxt_event_conn_average_rate_update(nxt_event_write_rate_t *rate, size_t sent, nxt_msec_t now); NXT_LIB_UNIT_TEST_STATIC double nxt_event_conn_exponential_approximation(double n); -static void nxt_event_conn_write_timer_handler(nxt_thread_t *thr, void *obj, +static void nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data); void -nxt_event_conn_write(nxt_thread_t *thr, nxt_event_conn_t *c) +nxt_event_conn_write(nxt_task_t *task, nxt_event_conn_t *c) { - if (thr->engine->batch != 0) { - nxt_event_conn_write_enqueue(thr, c); + if (task->thread->engine->batch != 0) { + nxt_event_conn_write_enqueue(task->thread, task, c); } else { - c->io->write(thr, c, c->socket.data); + c->io->write(task, c, c->socket.data); } } void -nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data) { - size_t sent, limit; - ssize_t ret; - nxt_buf_t *b; - nxt_event_conn_t *c; + size_t sent, limit; + ssize_t ret; + nxt_buf_t *b; + nxt_event_conn_t *c; + nxt_event_engine_t *engine; c = obj; - nxt_log_debug(thr->log, "event conn write fd:%d", c->socket.fd); + nxt_debug(task, "event conn write fd:%d", c->socket.fd); if (!c->socket.write_ready || c->delayed || c->write == NULL) { return; } + engine = task->thread->engine; + c->socket.write_handler = nxt_event_conn_io_write; c->socket.error_handler = c->write_state->error_handler; @@ -54,7 +57,7 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data) while (limit != 0) { - ret = c->io->write_chunk(thr, c, b, limit); + ret = c->io->write_chunk(c, b, limit); if (ret < 0) { /* ret == NXT_AGAIN || ret == NXT_ERROR. */ @@ -65,7 +68,7 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data) limit -= ret; if (c->write_state->process_buffers) { - b = nxt_sendbuf_completion(thr, c->write_work_queue, b, ret); + b = nxt_sendbuf_completion(task, c->write_work_queue, b, ret); c->write = b; } else { @@ -73,7 +76,7 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data) } if (b == NULL) { - nxt_event_fd_block_write(thr->engine, &c->socket); + nxt_event_fd_block_write(engine, &c->socket); break; } @@ -83,7 +86,7 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data) } } - nxt_log_debug(thr->log, "event conn: %i sent:%z", ret, sent); + nxt_debug(task, "event conn: %i sent:%z", ret, sent); if (sent != 0) { if (c->write_state->autoreset_timer) { @@ -92,7 +95,7 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data) } if (ret != NXT_ERROR - && !nxt_event_conn_write_delayed(thr->engine, c, sent)) + && !nxt_event_conn_write_delayed(engine, c, sent)) { if (limit == 0) { /* @@ -100,7 +103,7 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data) * process other recevied events and to get new events. */ c->write_timer.handler = nxt_event_conn_write_timer_handler; - nxt_event_timer_add(thr->engine, &c->write_timer, 0); + nxt_event_timer_add(engine, &c->write_timer, 0); } else if (ret == NXT_AGAIN) { /* @@ -110,16 +113,15 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data) * can be set here because it should be set only for write * direction. */ - nxt_event_conn_timer(thr->engine, c, c->write_state, - &c->write_timer); + nxt_event_conn_timer(engine, c, c->write_state, &c->write_timer); } } if (ret == 0 || sent != 0) { /* "ret == 0" means a sync buffer was processed. */ c->sent += sent; - nxt_event_conn_io_handle(thr, c->write_work_queue, - c->write_state->ready_handler, c, data); + nxt_event_conn_io_handle(task->thread, c->write_work_queue, + c->write_state->ready_handler, task, c, data); /* * Fall through if first operations were * successful but the last one failed. @@ -127,10 +129,10 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data) } if (nxt_slow_path(ret == NXT_ERROR)) { - nxt_event_fd_block_write(thr->engine, &c->socket); + nxt_event_fd_block_write(engine, &c->socket); - nxt_event_conn_io_handle(thr, c->write_work_queue, - c->write_state->error_handler, c, data); + nxt_event_conn_io_handle(task->thread, c->write_work_queue, + c->write_state->error_handler, task, c, data); } } @@ -284,25 +286,24 @@ nxt_event_conn_exponential_approximation(double x) static void -nxt_event_conn_write_timer_handler(nxt_thread_t *thr, void *obj, void *data) +nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data) { nxt_event_conn_t *c; nxt_event_timer_t *ev; ev = obj; - nxt_log_debug(thr->log, "event conn conn timer"); + nxt_debug(task, "event conn conn timer"); c = nxt_event_write_timer_conn(ev); c->delayed = 0; - c->io->write(thr, c, c->socket.data); + c->io->write(task, c, c->socket.data); } ssize_t -nxt_event_conn_io_write_chunk(nxt_thread_t *thr, nxt_event_conn_t *c, - nxt_buf_t *b, size_t limit) +nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) { ssize_t ret; @@ -311,7 +312,7 @@ nxt_event_conn_io_write_chunk(nxt_thread_t *thr, nxt_event_conn_t *c, if ((ret == NXT_AGAIN || !c->socket.write_ready) && nxt_event_fd_is_disabled(c->socket.write)) { - nxt_event_fd_enable_write(thr->engine, &c->socket); + nxt_event_fd_enable_write(c->socket.task->thread->engine, &c->socket); } return ret; @@ -332,7 +333,7 @@ nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit) sb.size = 0; sb.limit = limit; - niob = nxt_sendbuf_mem_coalesce(&sb); + niob = nxt_sendbuf_mem_coalesce(c->socket.task, &sb); if (niob == 0 && sb.sync) { return 0; @@ -400,8 +401,8 @@ nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size) err = (n == -1) ? nxt_socket_errno : 0; - nxt_log_debug(c->socket.log, "send(%d, %p, %uz): %z", - c->socket.fd, buf, size, n); + nxt_debug(c->socket.task, "send(%d, %p, %uz): %z", + c->socket.fd, buf, size, n); if (n > 0) { return n; @@ -412,19 +413,20 @@ nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size) switch (err) { case NXT_EAGAIN: - nxt_log_debug(c->socket.log, "send() %E", err); + nxt_debug(c->socket.task, "send() %E", err); c->socket.write_ready = 0; return NXT_AGAIN; case NXT_EINTR: - nxt_log_debug(c->socket.log, "send() %E", err); + nxt_debug(c->socket.task, "send() %E", err); continue; default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "send(%d, %p, %uz) failed %E", - c->socket.fd, buf, size, err); + nxt_log(c->socket.task, + nxt_socket_error_level(err, c->socket.log_error), + "send(%d, %p, %uz) failed %E", + c->socket.fd, buf, size, err); return NXT_ERROR; } } |