diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-02-22 15:09:59 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-02-22 15:09:59 +0300 |
commit | 029942f4eb7196c2cff0d0e26bc6ff274138f7d8 (patch) | |
tree | f4686c4d7b9cd574fe94c6f4918479a580fecf75 /src/nxt_event_conn_write.c | |
parent | 059a8642898a6bd4b47d13a1c1d599cd44af7e1c (diff) | |
download | unit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.gz unit-029942f4eb7196c2cff0d0e26bc6ff274138f7d8.tar.bz2 |
I/O operations refactoring.
Diffstat (limited to 'src/nxt_event_conn_write.c')
-rw-r--r-- | src/nxt_event_conn_write.c | 309 |
1 files changed, 143 insertions, 166 deletions
diff --git a/src/nxt_event_conn_write.c b/src/nxt_event_conn_write.c index 72d0731b..a0f6f953 100644 --- a/src/nxt_event_conn_write.c +++ b/src/nxt_event_conn_write.c @@ -7,32 +7,16 @@ #include <nxt_main.h> -static void nxt_event_conn_average_rate_update(nxt_event_write_rate_t *rate, - size_t sent, nxt_msec_t now); -NXT_LIB_UNIT_TEST_STATIC double - nxt_event_conn_exponential_approximation(double n); static void nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data); void -nxt_event_conn_write(nxt_task_t *task, nxt_event_conn_t *c) +nxt_conn_io_write(nxt_task_t *task, void *obj, void *data) { - if (task->thread->engine->batch != 0) { - nxt_event_conn_write_enqueue(task->thread, task, c); - - } else { - c->io->write(task, c, c->socket.data); - } -} - - -void -nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data) -{ - size_t sent, limit; ssize_t ret; nxt_buf_t *b; + nxt_sendbuf_t sb; nxt_event_conn_t *c; nxt_event_engine_t *engine; @@ -40,40 +24,37 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data) nxt_debug(task, "event conn write fd:%d", c->socket.fd); - if (!c->socket.write_ready || c->delayed || c->write == NULL) { + if (!c->socket.write_ready || c->write == NULL) { return; } engine = task->thread->engine; - c->socket.write_handler = nxt_event_conn_io_write; + c->socket.write_handler = nxt_conn_io_write; c->socket.error_handler = c->write_state->error_handler; - ret = NXT_DECLINED; - sent = 0; b = c->write; - limit = nxt_event_conn_write_limit(c); - - while (limit != 0) { + sb.socket = c->socket.fd; + sb.sent = 0; + sb.size = 0; + sb.buf = b; + sb.limit = 10 * 1024 * 1024; + sb.ready = 1; + sb.sync = 0; - ret = c->io->write_chunk(c, b, limit); + do { + ret = nxt_conn_io_sendbuf(task, &sb); if (ret < 0) { /* ret == NXT_AGAIN || ret == NXT_ERROR. */ break; } - sent += ret; - limit -= ret; + sb.sent += ret; + sb.limit -= ret; - if (c->write_state->process_buffers) { - b = nxt_sendbuf_completion(task, c->write_work_queue, b, ret); - c->write = b; - - } else { - b = nxt_sendbuf_update(b, ret); - } + b = nxt_sendbuf_update(b, ret); if (b == NULL) { nxt_fd_event_block_write(engine, &c->socket); @@ -84,20 +65,20 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data) ret = NXT_AGAIN; break; } - } - nxt_debug(task, "event conn: %i sent:%z", ret, sent); + } while (sb.limit != 0); - if (sent != 0) { + nxt_debug(task, "event conn: %i sent:%z", ret, sb.sent); + + if (sb.sent != 0) { if (c->write_state->autoreset_timer) { nxt_timer_disable(engine, &c->write_timer); } } - if (ret != NXT_ERROR - && !nxt_event_conn_write_delayed(engine, c, sent)) - { - if (limit == 0) { + if (ret != NXT_ERROR) { + + if (sb.limit == 0) { /* * Postpone writing until next event poll to allow to * process other recevied events and to get new events. @@ -117,11 +98,11 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data) } } - if (ret == 0 || sent != 0) { + if (ret == 0 || sb.sent != 0) { /* "ret == 0" means a sync buffer was processed. */ - c->sent += sent; - nxt_event_conn_io_handle(task->thread, c->write_work_queue, - c->write_state->ready_handler, task, c, data); + c->sent += sb.sent; + nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler, + task, c, data); /* * Fall through if first operations were * successful but the last one failed. @@ -131,8 +112,8 @@ nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data) if (nxt_slow_path(ret == NXT_ERROR)) { nxt_fd_event_block_write(engine, &c->socket); - nxt_event_conn_io_handle(task->thread, c->write_work_queue, - c->write_state->error_handler, task, c, data); + nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler, + task, c, data); } } @@ -152,8 +133,8 @@ nxt_event_conn_write_limit(nxt_event_conn_t *c) limit = rate->limit; correction = limit - (size_t) rate->average; - nxt_log_debug(c->socket.log, "event conn correction:%z average:%0.3f", - correction, rate->average); + nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f", + correction, rate->average); limit += correction; @@ -174,117 +155,10 @@ nxt_bool_t nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_event_conn_t *c, size_t sent) { - nxt_msec_t timer; - nxt_event_write_rate_t *rate; - - rate = c->rate; - - if (rate != NULL) { - nxt_event_conn_average_rate_update(rate, sent, engine->timers.now); - - if (rate->limit_after == 0) { - timer = sent * 1000 / rate->limit; - - } else if (rate->limit_after >= sent) { - timer = sent * 1000 / rate->max_limit; - rate->limit_after -= sent; - - } else { - sent -= rate->limit_after; - timer = rate->limit_after * 1000 / rate->max_limit - + sent * 1000 / rate->limit; - rate->limit_after = 0; - } - - nxt_log_debug(c->socket.log, "event conn timer: %M", timer); - - if (timer != 0) { - c->delayed = 1; - - nxt_fd_event_block_write(engine, &c->socket); - - c->write_timer.handler = nxt_event_conn_write_timer_handler; - nxt_timer_add(engine, &c->write_timer, timer); - - return 1; - } - } - return 0; } -/* Exponentially weighted moving average rate for a given interval. */ - -static void -nxt_event_conn_average_rate_update(nxt_event_write_rate_t *rate, size_t sent, - nxt_msec_t now) -{ - double weight, delta; - nxt_msec_t elapsed; - const nxt_uint_t interval = 10; /* 10s */ - - elapsed = now - rate->last; - - if (elapsed == 0) { - return; - } - - rate->last = now; - delta = (double) elapsed / 1000; - - weight = nxt_event_conn_exponential_approximation(-delta / interval); - - rate->average = (1 - weight) * sent / delta + weight * rate->average; - - nxt_thread_log_debug("event conn delta:%0.3f, weight:%0.3f, average:%0.3f", - delta, weight, rate->average); -} - - -/* - * exp() takes tens or hundreds nanoseconds on modern CPU. - * This is a faster exp() approximation based on IEEE-754 format - * layout and described in "A Fast, Compact Approximation of - * the Exponential Function" * by N. N. Schraudolph, 1999. - */ - -NXT_LIB_UNIT_TEST_STATIC double -nxt_event_conn_exponential_approximation(double x) -{ - union { - double d; - int64_t n; - } exp; - - if (x < -100) { - /* - * The approximation is correct in -700 to 700 range. - * The "x" argument is always negative. - */ - return 0; - } - - /* - * x * 2^52 / ln(2) + (1023 * 2^52 - 261140389990637.73 - * - * 52 is the number of mantissa bits; - * 1023 is the exponent bias; - * 261140389990637.73 is the adjustment parameter to - * improve the approximation. The parameter is equal to - * - * 2^52 * ln[ 3 / (8 * ln(2)) + 0.5 ] / ln(2) - * - * Only significant digits of the double float format - * are used to present the double float constants. - */ - exp.n = x * 4503599627370496.0 / 0.69314718055994530 - + (4607182418800017408.0 - 261140389990637.73); - - return exp.d; -} - - static void nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data) { @@ -359,8 +233,7 @@ nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob) err = (n == -1) ? nxt_socket_errno : 0; - nxt_log_debug(c->socket.log, "writev(%d, %ui): %d", - c->socket.fd, niob, n); + nxt_debug(c->socket.task, "writev(%d, %ui): %d", c->socket.fd, niob, n); if (n > 0) { return n; @@ -371,19 +244,18 @@ nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob) switch (err) { case NXT_EAGAIN: - nxt_log_debug(c->socket.log, "writev() %E", err); + nxt_debug(c->socket.task, "writev() %E", err); c->socket.write_ready = 0; return NXT_AGAIN; case NXT_EINTR: - nxt_log_debug(c->socket.log, "writev() %E", err); + nxt_debug(c->socket.task, "writev() %E", err); continue; default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "writev(%d, %ui) failed %E", - c->socket.fd, niob, err); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "writev(%d, %ui) failed %E", c->socket.fd, niob, err); return NXT_ERROR; } } @@ -423,11 +295,116 @@ nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size) default: c->socket.error = err; - nxt_log(c->socket.task, - nxt_socket_error_level(err, c->socket.log_error), + nxt_log(c->socket.task, nxt_socket_error_level(err), "send(%d, %p, %uz) failed %E", c->socket.fd, buf, size, err); return NXT_ERROR; } } } + + +ssize_t +nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb) +{ + nxt_uint_t niov; + struct iovec iov[NXT_IOBUF_MAX]; + + niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX); + + if (niov == 0 && sb->sync) { + return 0; + } + + return nxt_conn_io_writev(task, sb, iov, niov); +} + + +ssize_t +nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov, + nxt_uint_t niov) +{ + ssize_t n; + nxt_err_t err; + + if (niov == 1) { + /* Disposal of surplus kernel iovec copy-in operation. */ + return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len); + } + + for ( ;; ) { + n = writev(sb->socket, iov, niov); + + err = (n == -1) ? nxt_socket_errno : 0; + + nxt_debug(task, "writev(%d, %ui): %d", sb->socket, niov, n); + + if (n > 0) { + return n; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + sb->ready = 0; + nxt_debug(task, "writev() %E", err); + + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(task, "writev() %E", err); + continue; + + default: + sb->error = err; + nxt_log(task, nxt_socket_error_level(err), + "writev(%d, %ui) failed %E", sb->socket, niov, err); + + return NXT_ERROR; + } + } +} + + +ssize_t +nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size) +{ + ssize_t n; + nxt_err_t err; + + for ( ;; ) { + n = send(sb->socket, buf, size, 0); + + err = (n == -1) ? nxt_socket_errno : 0; + + nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n); + + if (n > 0) { + return n; + } + + /* n == -1 */ + + switch (err) { + + case NXT_EAGAIN: + sb->ready = 0; + nxt_debug(task, "send() %E", err); + + return NXT_AGAIN; + + case NXT_EINTR: + nxt_debug(task, "send() %E", err); + continue; + + default: + sb->error = err; + nxt_log(task, nxt_socket_error_level(err), + "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err); + + return NXT_ERROR; + } + } +} |