summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn_write.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-02-22 15:09:59 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-02-22 15:09:59 +0300
commit029942f4eb7196c2cff0d0e26bc6ff274138f7d8 (patch)
treef4686c4d7b9cd574fe94c6f4918479a580fecf75 /src/nxt_event_conn_write.c
parent059a8642898a6bd4b47d13a1c1d599cd44af7e1c (diff)
downloadunit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.gz
unit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.bz2
I/O operations refactoring.
Diffstat (limited to 'src/nxt_event_conn_write.c')
-rw-r--r--src/nxt_event_conn_write.c309
1 files changed, 143 insertions, 166 deletions
diff --git a/src/nxt_event_conn_write.c b/src/nxt_event_conn_write.c
index 72d0731b..a0f6f953 100644
--- a/src/nxt_event_conn_write.c
+++ b/src/nxt_event_conn_write.c
@@ -7,32 +7,16 @@
#include <nxt_main.h>
-static void nxt_event_conn_average_rate_update(nxt_event_write_rate_t *rate,
- size_t sent, nxt_msec_t now);
-NXT_LIB_UNIT_TEST_STATIC double
- nxt_event_conn_exponential_approximation(double n);
static void nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj,
void *data);
void
-nxt_event_conn_write(nxt_task_t *task, nxt_event_conn_t *c)
+nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
{
- if (task->thread->engine->batch != 0) {
- nxt_event_conn_write_enqueue(task->thread, task, c);
-
- } else {
- c->io->write(task, c, c->socket.data);
- }
-}
-
-
-void
-nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data)
-{
- size_t sent, limit;
ssize_t ret;
nxt_buf_t *b;
+ nxt_sendbuf_t sb;
nxt_event_conn_t *c;
nxt_event_engine_t *engine;
@@ -40,40 +24,37 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "event conn write fd:%d", c->socket.fd);
- if (!c->socket.write_ready || c->delayed || c->write == NULL) {
+ if (!c->socket.write_ready || c->write == NULL) {
return;
}
engine = task->thread->engine;
- c->socket.write_handler = nxt_event_conn_io_write;
+ c->socket.write_handler = nxt_conn_io_write;
c->socket.error_handler = c->write_state->error_handler;
- ret = NXT_DECLINED;
- sent = 0;
b = c->write;
- limit = nxt_event_conn_write_limit(c);
-
- while (limit != 0) {
+ sb.socket = c->socket.fd;
+ sb.sent = 0;
+ sb.size = 0;
+ sb.buf = b;
+ sb.limit = 10 * 1024 * 1024;
+ sb.ready = 1;
+ sb.sync = 0;
- ret = c->io->write_chunk(c, b, limit);
+ do {
+ ret = nxt_conn_io_sendbuf(task, &sb);
if (ret < 0) {
/* ret == NXT_AGAIN || ret == NXT_ERROR. */
break;
}
- sent += ret;
- limit -= ret;
+ sb.sent += ret;
+ sb.limit -= ret;
- if (c->write_state->process_buffers) {
- b = nxt_sendbuf_completion(task, c->write_work_queue, b, ret);
- c->write = b;
-
- } else {
- b = nxt_sendbuf_update(b, ret);
- }
+ b = nxt_sendbuf_update(b, ret);
if (b == NULL) {
nxt_fd_event_block_write(engine, &c->socket);
@@ -84,20 +65,20 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data)
ret = NXT_AGAIN;
break;
}
- }
- nxt_debug(task, "event conn: %i sent:%z", ret, sent);
+ } while (sb.limit != 0);
- if (sent != 0) {
+ nxt_debug(task, "event conn: %i sent:%z", ret, sb.sent);
+
+ if (sb.sent != 0) {
if (c->write_state->autoreset_timer) {
nxt_timer_disable(engine, &c->write_timer);
}
}
- if (ret != NXT_ERROR
- && !nxt_event_conn_write_delayed(engine, c, sent))
- {
- if (limit == 0) {
+ if (ret != NXT_ERROR) {
+
+ if (sb.limit == 0) {
/*
* Postpone writing until next event poll to allow to
* process other recevied events and to get new events.
@@ -117,11 +98,11 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data)
}
}
- if (ret == 0 || sent != 0) {
+ if (ret == 0 || sb.sent != 0) {
/* "ret == 0" means a sync buffer was processed. */
- c->sent += sent;
- nxt_event_conn_io_handle(task->thread, c->write_work_queue,
- c->write_state->ready_handler, task, c, data);
+ c->sent += sb.sent;
+ nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
+ task, c, data);
/*
* Fall through if first operations were
* successful but the last one failed.
@@ -131,8 +112,8 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data)
if (nxt_slow_path(ret == NXT_ERROR)) {
nxt_fd_event_block_write(engine, &c->socket);
- nxt_event_conn_io_handle(task->thread, c->write_work_queue,
- c->write_state->error_handler, task, c, data);
+ nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
+ task, c, data);
}
}
@@ -152,8 +133,8 @@ nxt_event_conn_write_limit(nxt_event_conn_t *c)
limit = rate->limit;
correction = limit - (size_t) rate->average;
- nxt_log_debug(c->socket.log, "event conn correction:%z average:%0.3f",
- correction, rate->average);
+ nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f",
+ correction, rate->average);
limit += correction;
@@ -174,117 +155,10 @@ nxt_bool_t
nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_event_conn_t *c,
size_t sent)
{
- nxt_msec_t timer;
- nxt_event_write_rate_t *rate;
-
- rate = c->rate;
-
- if (rate != NULL) {
- nxt_event_conn_average_rate_update(rate, sent, engine->timers.now);
-
- if (rate->limit_after == 0) {
- timer = sent * 1000 / rate->limit;
-
- } else if (rate->limit_after >= sent) {
- timer = sent * 1000 / rate->max_limit;
- rate->limit_after -= sent;
-
- } else {
- sent -= rate->limit_after;
- timer = rate->limit_after * 1000 / rate->max_limit
- + sent * 1000 / rate->limit;
- rate->limit_after = 0;
- }
-
- nxt_log_debug(c->socket.log, "event conn timer: %M", timer);
-
- if (timer != 0) {
- c->delayed = 1;
-
- nxt_fd_event_block_write(engine, &c->socket);
-
- c->write_timer.handler = nxt_event_conn_write_timer_handler;
- nxt_timer_add(engine, &c->write_timer, timer);
-
- return 1;
- }
- }
-
return 0;
}
-/* Exponentially weighted moving average rate for a given interval. */
-
-static void
-nxt_event_conn_average_rate_update(nxt_event_write_rate_t *rate, size_t sent,
- nxt_msec_t now)
-{
- double weight, delta;
- nxt_msec_t elapsed;
- const nxt_uint_t interval = 10; /* 10s */
-
- elapsed = now - rate->last;
-
- if (elapsed == 0) {
- return;
- }
-
- rate->last = now;
- delta = (double) elapsed / 1000;
-
- weight = nxt_event_conn_exponential_approximation(-delta / interval);
-
- rate->average = (1 - weight) * sent / delta + weight * rate->average;
-
- nxt_thread_log_debug("event conn delta:%0.3f, weight:%0.3f, average:%0.3f",
- delta, weight, rate->average);
-}
-
-
-/*
- * exp() takes tens or hundreds nanoseconds on modern CPU.
- * This is a faster exp() approximation based on IEEE-754 format
- * layout and described in "A Fast, Compact Approximation of
- * the Exponential Function" * by N. N. Schraudolph, 1999.
- */
-
-NXT_LIB_UNIT_TEST_STATIC double
-nxt_event_conn_exponential_approximation(double x)
-{
- union {
- double d;
- int64_t n;
- } exp;
-
- if (x < -100) {
- /*
- * The approximation is correct in -700 to 700 range.
- * The "x" argument is always negative.
- */
- return 0;
- }
-
- /*
- * x * 2^52 / ln(2) + (1023 * 2^52 - 261140389990637.73
- *
- * 52 is the number of mantissa bits;
- * 1023 is the exponent bias;
- * 261140389990637.73 is the adjustment parameter to
- * improve the approximation. The parameter is equal to
- *
- * 2^52 * ln[ 3 / (8 * ln(2)) + 0.5 ] / ln(2)
- *
- * Only significant digits of the double float format
- * are used to present the double float constants.
- */
- exp.n = x * 4503599627370496.0 / 0.69314718055994530
- + (4607182418800017408.0 - 261140389990637.73);
-
- return exp.d;
-}
-
-
static void
nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
{
@@ -359,8 +233,7 @@ nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
err = (n == -1) ? nxt_socket_errno : 0;
- nxt_log_debug(c->socket.log, "writev(%d, %ui): %d",
- c->socket.fd, niob, n);
+ nxt_debug(c->socket.task, "writev(%d, %ui): %d", c->socket.fd, niob, n);
if (n > 0) {
return n;
@@ -371,19 +244,18 @@ nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
switch (err) {
case NXT_EAGAIN:
- nxt_log_debug(c->socket.log, "writev() %E", err);
+ nxt_debug(c->socket.task, "writev() %E", err);
c->socket.write_ready = 0;
return NXT_AGAIN;
case NXT_EINTR:
- nxt_log_debug(c->socket.log, "writev() %E", err);
+ nxt_debug(c->socket.task, "writev() %E", err);
continue;
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "writev(%d, %ui) failed %E",
- c->socket.fd, niob, err);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "writev(%d, %ui) failed %E", c->socket.fd, niob, err);
return NXT_ERROR;
}
}
@@ -423,11 +295,116 @@ nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size)
default:
c->socket.error = err;
- nxt_log(c->socket.task,
- nxt_socket_error_level(err, c->socket.log_error),
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
"send(%d, %p, %uz) failed %E",
c->socket.fd, buf, size, err);
return NXT_ERROR;
}
}
}
+
+
+ssize_t
+nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
+{
+ nxt_uint_t niov;
+ struct iovec iov[NXT_IOBUF_MAX];
+
+ niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX);
+
+ if (niov == 0 && sb->sync) {
+ return 0;
+ }
+
+ return nxt_conn_io_writev(task, sb, iov, niov);
+}
+
+
+ssize_t
+nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
+ nxt_uint_t niov)
+{
+ ssize_t n;
+ nxt_err_t err;
+
+ if (niov == 1) {
+ /* Disposal of surplus kernel iovec copy-in operation. */
+ return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len);
+ }
+
+ for ( ;; ) {
+ n = writev(sb->socket, iov, niov);
+
+ err = (n == -1) ? nxt_socket_errno : 0;
+
+ nxt_debug(task, "writev(%d, %ui): %d", sb->socket, niov, n);
+
+ if (n > 0) {
+ return n;
+ }
+
+ /* n == -1 */
+
+ switch (err) {
+
+ case NXT_EAGAIN:
+ sb->ready = 0;
+ nxt_debug(task, "writev() %E", err);
+
+ return NXT_AGAIN;
+
+ case NXT_EINTR:
+ nxt_debug(task, "writev() %E", err);
+ continue;
+
+ default:
+ sb->error = err;
+ nxt_log(task, nxt_socket_error_level(err),
+ "writev(%d, %ui) failed %E", sb->socket, niov, err);
+
+ return NXT_ERROR;
+ }
+ }
+}
+
+
+ssize_t
+nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size)
+{
+ ssize_t n;
+ nxt_err_t err;
+
+ for ( ;; ) {
+ n = send(sb->socket, buf, size, 0);
+
+ err = (n == -1) ? nxt_socket_errno : 0;
+
+ nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n);
+
+ if (n > 0) {
+ return n;
+ }
+
+ /* n == -1 */
+
+ switch (err) {
+
+ case NXT_EAGAIN:
+ sb->ready = 0;
+ nxt_debug(task, "send() %E", err);
+
+ return NXT_AGAIN;
+
+ case NXT_EINTR:
+ nxt_debug(task, "send() %E", err);
+ continue;
+
+ default:
+ sb->error = err;
+ nxt_log(task, nxt_socket_error_level(err),
+ "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err);
+
+ return NXT_ERROR;
+ }
+ }
+}