summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn_write.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_event_conn_write.c')
-rw-r--r--src/nxt_event_conn_write.c420
1 files changed, 0 insertions, 420 deletions
diff --git a/src/nxt_event_conn_write.c b/src/nxt_event_conn_write.c
deleted file mode 100644
index fa5b9241..00000000
--- a/src/nxt_event_conn_write.c
+++ /dev/null
@@ -1,420 +0,0 @@
-
-/*
- * Copyright (C) Igor Sysoev
- * Copyright (C) NGINX, Inc.
- */
-
-#include <nxt_main.h>
-
-
-static void nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj,
- void *data);
-
-
-void
-nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
-{
- ssize_t ret;
- nxt_buf_t *b;
- nxt_sendbuf_t sb;
- nxt_event_conn_t *c;
- nxt_event_engine_t *engine;
-
- c = obj;
-
- nxt_debug(task, "event conn write fd:%d", c->socket.fd);
-
- if (!c->socket.write_ready || c->write == NULL) {
- return;
- }
-
- engine = task->thread->engine;
-
- c->socket.write_handler = nxt_conn_io_write;
- c->socket.error_handler = c->write_state->error_handler;
-
- b = c->write;
-
- sb.socket = c->socket.fd;
- sb.error = 0;
- sb.sent = 0;
- sb.size = 0;
- sb.buf = b;
- sb.limit = 10 * 1024 * 1024;
- sb.ready = 1;
- sb.sync = 0;
-
- do {
- ret = nxt_conn_io_sendbuf(task, &sb);
-
- c->socket.write_ready = sb.ready;
- c->socket.error = sb.error;
-
- if (ret < 0) {
- /* ret == NXT_AGAIN || ret == NXT_ERROR. */
- break;
- }
-
- sb.sent += ret;
- sb.limit -= ret;
-
- b = nxt_sendbuf_update(b, ret);
-
- if (b == NULL) {
- nxt_fd_event_block_write(engine, &c->socket);
- break;
- }
-
- sb.buf = b;
-
- if (!c->socket.write_ready) {
- ret = NXT_AGAIN;
- break;
- }
-
- } while (sb.limit != 0);
-
- nxt_debug(task, "event conn: %i sent:%z", ret, sb.sent);
-
- if (sb.sent != 0) {
- if (c->write_state->timer_autoreset) {
- nxt_timer_disable(engine, &c->write_timer);
- }
- }
-
- if (ret != NXT_ERROR) {
-
- if (sb.limit == 0) {
- /*
- * Postpone writing until next event poll to allow to
- * process other recevied events and to get new events.
- */
- c->write_timer.handler = nxt_event_conn_write_timer_handler;
- nxt_timer_add(engine, &c->write_timer, 0);
-
- } else if (ret == NXT_AGAIN) {
- /*
- * SSL libraries can require to toggle either write or read
- * event if renegotiation occurs during SSL write operation.
- * This case is handled on the event_io->send() level. Timer
- * can be set here because it should be set only for write
- * direction.
- */
- nxt_event_conn_timer(engine, c, c->write_state, &c->write_timer);
-
- if (nxt_fd_event_is_disabled(c->socket.write)) {
- nxt_fd_event_enable_write(engine, &c->socket);
- }
- }
- }
-
- if (ret == 0 || sb.sent != 0) {
- /* "ret == 0" means a sync buffer was processed. */
- c->sent += sb.sent;
- nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
- task, c, data);
- /*
- * Fall through if first operations were
- * successful but the last one failed.
- */
- }
-
- if (nxt_slow_path(ret == NXT_ERROR)) {
- nxt_fd_event_block_write(engine, &c->socket);
-
- nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
- task, c, data);
- }
-}
-
-
-size_t
-nxt_event_conn_write_limit(nxt_event_conn_t *c)
-{
- ssize_t limit, correction;
- nxt_event_write_rate_t *rate;
-
- rate = c->rate;
-
- if (rate == NULL) {
- return c->max_chunk;
- }
-
- limit = rate->limit;
- correction = limit - (size_t) rate->average;
-
- nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f",
- correction, rate->average);
-
- limit += correction;
-
- if (limit <= 0) {
- return 0;
- }
-
- if (rate->limit_after != 0) {
- limit += rate->limit_after;
- limit = nxt_min((size_t) limit, rate->max_limit);
- }
-
- return nxt_min((size_t) limit, c->max_chunk);
-}
-
-
-nxt_bool_t
-nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_event_conn_t *c,
- size_t sent)
-{
- return 0;
-}
-
-
-static void
-nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
-{
- nxt_timer_t *ev;
- nxt_event_conn_t *c;
-
- ev = obj;
-
- nxt_debug(task, "event conn conn timer");
-
- c = nxt_event_write_timer_conn(ev);
- c->delayed = 0;
-
- c->io->write(task, c, c->socket.data);
-}
-
-
-ssize_t
-nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
-{
- ssize_t ret;
-
- ret = c->io->sendbuf(c, b, limit);
-
- if ((ret == NXT_AGAIN || !c->socket.write_ready)
- && nxt_fd_event_is_disabled(c->socket.write))
- {
- nxt_fd_event_enable_write(c->socket.task->thread->engine, &c->socket);
- }
-
- return ret;
-}
-
-
-ssize_t
-nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
-{
- nxt_uint_t niob;
- struct iovec iob[NXT_IOBUF_MAX];
- nxt_sendbuf_coalesce_t sb;
-
- sb.buf = b;
- sb.iobuf = iob;
- sb.nmax = NXT_IOBUF_MAX;
- sb.sync = 0;
- sb.size = 0;
- sb.limit = limit;
-
- niob = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
-
- if (niob == 0 && sb.sync) {
- return 0;
- }
-
- return nxt_event_conn_io_writev(c, iob, niob);
-}
-
-
-ssize_t
-nxt_event_conn_io_writev(nxt_event_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
-{
- ssize_t n;
- nxt_err_t err;
-
- if (niob == 1) {
- /* Disposal of surplus kernel iovec copy-in operation. */
- return nxt_event_conn_io_send(c, iob->iov_base, iob->iov_len);
- }
-
- for ( ;; ) {
- n = writev(c->socket.fd, iob, niob);
-
- err = (n == -1) ? nxt_socket_errno : 0;
-
- nxt_debug(c->socket.task, "writev(%d, %ui): %d", c->socket.fd, niob, n);
-
- if (n > 0) {
- return n;
- }
-
- /* n == -1 */
-
- switch (err) {
-
- case NXT_EAGAIN:
- nxt_debug(c->socket.task, "writev() %E", err);
- c->socket.write_ready = 0;
- return NXT_AGAIN;
-
- case NXT_EINTR:
- nxt_debug(c->socket.task, "writev() %E", err);
- continue;
-
- default:
- c->socket.error = err;
- nxt_log(c->socket.task, nxt_socket_error_level(err),
- "writev(%d, %ui) failed %E", c->socket.fd, niob, err);
- return NXT_ERROR;
- }
- }
-}
-
-
-ssize_t
-nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size)
-{
- ssize_t n;
- nxt_err_t err;
-
- for ( ;; ) {
- n = send(c->socket.fd, buf, size, 0);
-
- err = (n == -1) ? nxt_socket_errno : 0;
-
- nxt_debug(c->socket.task, "send(%d, %p, %uz): %z",
- c->socket.fd, buf, size, n);
-
- if (n > 0) {
- return n;
- }
-
- /* n == -1 */
-
- switch (err) {
-
- case NXT_EAGAIN:
- nxt_debug(c->socket.task, "send() %E", err);
- c->socket.write_ready = 0;
- return NXT_AGAIN;
-
- case NXT_EINTR:
- nxt_debug(c->socket.task, "send() %E", err);
- continue;
-
- default:
- c->socket.error = err;
- nxt_log(c->socket.task, nxt_socket_error_level(err),
- "send(%d, %p, %uz) failed %E",
- c->socket.fd, buf, size, err);
- return NXT_ERROR;
- }
- }
-}
-
-
-ssize_t
-nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
-{
- nxt_uint_t niov;
- struct iovec iov[NXT_IOBUF_MAX];
-
- niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX);
-
- if (niov == 0 && sb->sync) {
- return 0;
- }
-
- return nxt_conn_io_writev(task, sb, iov, niov);
-}
-
-
-ssize_t
-nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
- nxt_uint_t niov)
-{
- ssize_t n;
- nxt_err_t err;
-
- if (niov == 1) {
- /* Disposal of surplus kernel iovec copy-in operation. */
- return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len);
- }
-
- for ( ;; ) {
- n = writev(sb->socket, iov, niov);
-
- err = (n == -1) ? nxt_socket_errno : 0;
-
- nxt_debug(task, "writev(%d, %ui): %d", sb->socket, niov, n);
-
- if (n > 0) {
- return n;
- }
-
- /* n == -1 */
-
- switch (err) {
-
- case NXT_EAGAIN:
- sb->ready = 0;
- nxt_debug(task, "writev() %E", err);
-
- return NXT_AGAIN;
-
- case NXT_EINTR:
- nxt_debug(task, "writev() %E", err);
- continue;
-
- default:
- sb->error = err;
- nxt_log(task, nxt_socket_error_level(err),
- "writev(%d, %ui) failed %E", sb->socket, niov, err);
-
- return NXT_ERROR;
- }
- }
-}
-
-
-ssize_t
-nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size)
-{
- ssize_t n;
- nxt_err_t err;
-
- for ( ;; ) {
- n = send(sb->socket, buf, size, 0);
-
- err = (n == -1) ? nxt_socket_errno : 0;
-
- nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n);
-
- if (n > 0) {
- return n;
- }
-
- /* n == -1 */
-
- switch (err) {
-
- case NXT_EAGAIN:
- sb->ready = 0;
- nxt_debug(task, "send() %E", err);
-
- return NXT_AGAIN;
-
- case NXT_EINTR:
- nxt_debug(task, "send() %E", err);
- continue;
-
- default:
- sb->error = err;
- nxt_log(task, nxt_socket_error_level(err),
- "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err);
-
- return NXT_ERROR;
- }
- }
-}