summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn_write.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_event_conn_write.c431
1 files changed, 431 insertions, 0 deletions
diff --git a/src/nxt_event_conn_write.c b/src/nxt_event_conn_write.c
new file mode 100644
index 00000000..0f35b5f3
--- /dev/null
+++ b/src/nxt_event_conn_write.c
@@ -0,0 +1,431 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+static void nxt_event_conn_average_rate_update(nxt_event_write_rate_t *rate,
+ size_t sent, nxt_msec_t now);
+NXT_LIB_UNIT_TEST_STATIC double
+ nxt_event_conn_exponential_approximation(double n);
+static void nxt_event_conn_write_timer_handler(nxt_thread_t *thr, void *obj,
+ void *data);
+
+
+void
+nxt_event_conn_write(nxt_thread_t *thr, nxt_event_conn_t *c)
+{
+ if (thr->engine->batch != 0) {
+ nxt_event_conn_write_enqueue(thr, c);
+
+ } else {
+ c->io->write(thr, c, c->socket.data);
+ }
+}
+
+
+void
+nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data)
+{
+ size_t sent, limit;
+ ssize_t ret;
+ nxt_buf_t *b;
+ nxt_event_conn_t *c;
+
+ c = obj;
+
+ nxt_log_debug(thr->log, "event conn write fd:%d", c->socket.fd);
+
+ if (!c->socket.write_ready || c->delayed || c->write == NULL) {
+ return;
+ }
+
+ c->socket.write_handler = nxt_event_conn_io_write;
+ c->socket.error_handler = c->write_state->error_handler;
+
+ ret = NXT_DECLINED;
+ sent = 0;
+ b = c->write;
+
+ limit = nxt_event_conn_write_limit(c);
+
+ while (limit != 0) {
+
+ ret = c->io->write_chunk(thr, c, b, limit);
+
+ if (ret < 0) {
+ /* ret == NXT_AGAIN || ret == NXT_ERROR. */
+ break;
+ }
+
+ sent += ret;
+ limit -= ret;
+
+ if (c->write_state->process_buffers) {
+ b = nxt_sendbuf_completion(thr, c->write_work_queue, b, ret);
+ c->write = b;
+
+ } else {
+ b = nxt_sendbuf_update(b, ret);
+ }
+
+ if (b == NULL) {
+ nxt_event_fd_block_write(thr->engine, &c->socket);
+ break;
+ }
+
+ if (!c->socket.write_ready) {
+ ret = NXT_AGAIN;
+ break;
+ }
+ }
+
+ nxt_log_debug(thr->log, "event conn: %i sent:%z", ret, sent);
+
+ if (sent != 0) {
+ if (c->write_state->autoreset_timer) {
+ nxt_event_timer_disable(&c->write_timer);
+ }
+ }
+
+ if (ret != NXT_ERROR
+ && !nxt_event_conn_write_delayed(thr->engine, c, sent))
+ {
+ if (limit == 0) {
+ /*
+ * Postpone writing until next event poll to allow to
+ * process other recevied events and to get new events.
+ */
+ c->write_timer.handler = nxt_event_conn_write_timer_handler;
+ nxt_event_timer_add(thr->engine, &c->write_timer, 0);
+
+ } else if (ret == NXT_AGAIN) {
+ /*
+ * SSL libraries can require to toggle either write or read
+ * event if renegotiation occurs during SSL write operation.
+ * This case is handled on the event_io->send() level. Timer
+ * can be set here because it should be set only for write
+ * direction.
+ */
+ nxt_event_conn_timer(thr->engine, c, c->write_state,
+ &c->write_timer);
+ }
+ }
+
+ if (ret == 0 || sent != 0) {
+ /* "ret == 0" means a sync buffer was processed. */
+ c->sent += sent;
+ nxt_event_conn_io_handle(thr, c->write_work_queue,
+ c->write_state->ready_handler, c, data);
+ /*
+ * Fall through if first operations were
+ * successful but the last one failed.
+ */
+ }
+
+ if (nxt_slow_path(ret == NXT_ERROR)) {
+ nxt_event_fd_block_write(thr->engine, &c->socket);
+
+ nxt_event_conn_io_handle(thr, c->write_work_queue,
+ c->write_state->error_handler, c, data);
+ }
+}
+
+
+size_t
+nxt_event_conn_write_limit(nxt_event_conn_t *c)
+{
+ ssize_t limit, correction;
+ nxt_event_write_rate_t *rate;
+
+ rate = c->rate;
+
+ if (rate == NULL) {
+ return c->max_chunk;
+ }
+
+ limit = rate->limit;
+ correction = limit - (size_t) rate->average;
+
+ nxt_log_debug(c->socket.log, "event conn correction:%z average:%0.3f",
+ correction, rate->average);
+
+ limit += correction;
+
+ if (limit <= 0) {
+ return 0;
+ }
+
+ if (rate->limit_after != 0) {
+ limit += rate->limit_after;
+ limit = nxt_min((size_t) limit, rate->max_limit);
+ }
+
+ return nxt_min((size_t) limit, c->max_chunk);
+}
+
+
+nxt_bool_t
+nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_event_conn_t *c,
+ size_t sent)
+{
+ nxt_msec_t timer;
+ nxt_event_write_rate_t *rate;
+
+ rate = c->rate;
+
+ if (rate != NULL) {
+ nxt_event_conn_average_rate_update(rate, sent, engine->timers.now);
+
+ if (rate->limit_after == 0) {
+ timer = sent * 1000 / rate->limit;
+
+ } else if (rate->limit_after >= sent) {
+ timer = sent * 1000 / rate->max_limit;
+ rate->limit_after -= sent;
+
+ } else {
+ sent -= rate->limit_after;
+ timer = rate->limit_after * 1000 / rate->max_limit
+ + sent * 1000 / rate->limit;
+ rate->limit_after = 0;
+ }
+
+ nxt_log_debug(c->socket.log, "event conn timer: %M", timer);
+
+ if (timer != 0) {
+ c->delayed = 1;
+
+ nxt_event_fd_block_write(engine, &c->socket);
+
+ c->write_timer.handler = nxt_event_conn_write_timer_handler;
+ nxt_event_timer_add(engine, &c->write_timer, timer);
+
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+
+/* Exponentially weighted moving average rate for a given interval. */
+
+static void
+nxt_event_conn_average_rate_update(nxt_event_write_rate_t *rate, size_t sent,
+ nxt_msec_t now)
+{
+ double weight, delta;
+ nxt_msec_t elapsed;
+ const nxt_uint_t interval = 10; /* 10s */
+
+ elapsed = now - rate->last;
+
+ if (elapsed == 0) {
+ return;
+ }
+
+ rate->last = now;
+ delta = (double) elapsed / 1000;
+
+ weight = nxt_event_conn_exponential_approximation(-delta / interval);
+
+ rate->average = (1 - weight) * sent / delta + weight * rate->average;
+
+ nxt_thread_log_debug("event conn delta:%0.3f, weight:%0.3f, average:%0.3f",
+ delta, weight, rate->average);
+}
+
+
+/*
+ * exp() takes tens or hundreds nanoseconds on modern CPU.
+ * This is a faster exp() approximation based on IEEE-754 format
+ * layout and described in "A Fast, Compact Approximation of
+ * the Exponential Function" * by N. N. Schraudolph, 1999.
+ */
+
+NXT_LIB_UNIT_TEST_STATIC double
+nxt_event_conn_exponential_approximation(double x)
+{
+ union {
+ double d;
+ int64_t n;
+ } exp;
+
+ if (x < -100) {
+ /*
+ * The approximation is correct in -700 to 700 range.
+ * The "x" argument is always negative.
+ */
+ return 0;
+ }
+
+ /*
+ * x * 2^52 / ln(2) + (1023 * 2^52 - 261140389990637.73
+ *
+ * 52 is the number of mantissa bits;
+ * 1023 is the exponent bias;
+ * 261140389990637.73 is the adjustment parameter to
+ * improve the approximation. The parameter is equal to
+ *
+ * 2^52 * ln[ 3 / (8 * ln(2)) + 0.5 ] / ln(2)
+ *
+ * Only significant digits of the double float format
+ * are used to present the double float constants.
+ */
+ exp.n = x * 4503599627370496.0 / 0.69314718055994530
+ + (4607182418800017408.0 - 261140389990637.73);
+
+ return exp.d;
+}
+
+
+static void
+nxt_event_conn_write_timer_handler(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_event_conn_t *c;
+ nxt_event_timer_t *ev;
+
+ ev = obj;
+
+ nxt_log_debug(thr->log, "event conn conn timer");
+
+ c = nxt_event_write_timer_conn(ev);
+ c->delayed = 0;
+
+ c->io->write(thr, c, c->socket.data);
+}
+
+
+ssize_t
+nxt_event_conn_io_write_chunk(nxt_thread_t *thr, nxt_event_conn_t *c,
+ nxt_buf_t *b, size_t limit)
+{
+ ssize_t ret;
+
+ ret = c->io->sendbuf(c, b, limit);
+
+ if ((ret == NXT_AGAIN || !c->socket.write_ready)
+ && nxt_event_fd_is_disabled(c->socket.write))
+ {
+ nxt_event_fd_enable_write(thr->engine, &c->socket);
+ }
+
+ return ret;
+}
+
+
+ssize_t
+nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
+{
+ nxt_uint_t niob;
+ struct iovec iob[NXT_IOBUF_MAX];
+ nxt_sendbuf_coalesce_t sb;
+
+ sb.buf = b;
+ sb.iobuf = iob;
+ sb.nmax = NXT_IOBUF_MAX;
+ sb.sync = 0;
+ sb.size = 0;
+ sb.limit = limit;
+
+ niob = nxt_sendbuf_mem_coalesce(&sb);
+
+ if (niob == 0 && sb.sync) {
+ return 0;
+ }
+
+ return nxt_event_conn_io_writev(c, iob, niob);
+}
+
+
+ssize_t
+nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
+{
+ ssize_t n;
+ nxt_err_t err;
+
+ if (niob == 1) {
+ /* Disposal of surplus kernel iovec copy-in operation. */
+ return nxt_event_conn_io_send(c, iob->iov_base, iob->iov_len);
+ }
+
+ for ( ;; ) {
+ n = writev(c->socket.fd, iob, niob);
+
+ err = (n == -1) ? nxt_socket_errno : 0;
+
+ nxt_log_debug(c->socket.log, "writev(%d, %ui): %d",
+ c->socket.fd, niob, n);
+
+ if (n > 0) {
+ return n;
+ }
+
+ /* n == -1 */
+
+ switch (err) {
+
+ case NXT_EAGAIN:
+ nxt_log_debug(c->socket.log, "writev() %E", err);
+ c->socket.write_ready = 0;
+ return NXT_AGAIN;
+
+ case NXT_EINTR:
+ nxt_log_debug(c->socket.log, "writev() %E", err);
+ continue;
+
+ default:
+ c->socket.error = err;
+ nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
+ c->socket.log, "writev(%d, %ui) failed %E",
+ c->socket.fd, niob, err);
+ return NXT_ERROR;
+ }
+ }
+}
+
+
+ssize_t
+nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size)
+{
+ ssize_t n;
+ nxt_err_t err;
+
+ for ( ;; ) {
+ n = send(c->socket.fd, buf, size, 0);
+
+ err = (n == -1) ? nxt_socket_errno : 0;
+
+ nxt_log_debug(c->socket.log, "send(%d, %p, %uz): %z",
+ c->socket.fd, buf, size, n);
+
+ if (n > 0) {
+ return n;
+ }
+
+ /* n == -1 */
+
+ switch (err) {
+
+ case NXT_EAGAIN:
+ nxt_log_debug(c->socket.log, "send() %E", err);
+ c->socket.write_ready = 0;
+ return NXT_AGAIN;
+
+ case NXT_EINTR:
+ nxt_log_debug(c->socket.log, "send() %E", err);
+ continue;
+
+ default:
+ c->socket.error = err;
+ nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
+ c->socket.log, "send(%d, %p, %uz) failed %E",
+ c->socket.fd, buf, size, err);
+ return NXT_ERROR;
+ }
+ }
+}