summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn_job_sendfile.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-17 20:00:00 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-17 20:00:00 +0300
commit16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch)
treee6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_event_conn_job_sendfile.c
downloadunit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz
unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2
Initial version.
Diffstat (limited to 'src/nxt_event_conn_job_sendfile.c')
-rw-r--r--src/nxt_event_conn_job_sendfile.c268
1 files changed, 268 insertions, 0 deletions
diff --git a/src/nxt_event_conn_job_sendfile.c b/src/nxt_event_conn_job_sendfile.c
new file mode 100644
index 00000000..140febab
--- /dev/null
+++ b/src/nxt_event_conn_job_sendfile.c
@@ -0,0 +1,268 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) NGINX, Inc.
+ */
+
+#include <nxt_main.h>
+
+
+#if (NXT_THREADS)
+
+typedef struct {
+ nxt_job_t job;
+ nxt_buf_t *out;
+ size_t sent;
+ size_t limit;
+ nxt_work_handler_t ready_handler;
+} nxt_job_sendfile_t;
+
+
+static void nxt_event_conn_job_sendfile_start(nxt_thread_t *thr, void *obj,
+ void *data);
+static void nxt_event_conn_job_sendfile_handler(nxt_thread_t *thr, void *obj,
+ void *data);
+static void nxt_event_conn_job_sendfile_return(nxt_thread_t *thr, void *obj,
+ void *data);
+static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_thread_t *thr,
+ nxt_event_conn_t *c, nxt_buf_t *b);
+
+
+void
+nxt_event_conn_job_sendfile(nxt_thread_t *thr, nxt_event_conn_t *c)
+{
+ nxt_event_fd_disable(thr->engine, &c->socket);
+
+ /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
+ nxt_event_conn_job_sendfile_start(thr, c, NULL);
+}
+
+
+static void
+nxt_event_conn_job_sendfile_start(nxt_thread_t *thr, void *obj, void *data)
+{
+ nxt_iobuf_t b;
+ nxt_event_conn_t *c;
+ nxt_job_sendfile_t *jbs;
+ nxt_sendbuf_coalesce_t sb;
+
+ c = obj;
+
+ nxt_log_debug(thr->log, "event conn sendfile fd:%d", c->socket.fd);
+
+ jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
+
+ if (nxt_slow_path(jbs == NULL)) {
+ c->write_state->error_handler(thr, c, NULL);
+ return;
+ }
+
+ c->socket.write_handler = nxt_event_conn_job_sendfile_start;
+ c->socket.error_handler = c->write_state->error_handler;
+
+ jbs->job.data = c;
+ nxt_job_set_name(&jbs->job, "job sendfile");
+
+ jbs->limit = nxt_event_conn_write_limit(c);
+
+ if (jbs->limit != 0) {
+
+ sb.buf = c->write;
+ sb.iobuf = &b;
+ sb.nmax = 1;
+ sb.sync = 0;
+ sb.size = 0;
+ sb.limit = jbs->limit;
+
+ if (nxt_sendbuf_mem_coalesce(&sb) != 0 || !sb.sync) {
+
+ jbs->job.thread_pool = c->u.thread_pool;
+ jbs->job.log = c->socket.log;
+ jbs->out = c->write;
+ c->write = NULL;
+ jbs->ready_handler = nxt_event_conn_job_sendfile_return;
+
+ c->blocked = 1;
+
+ if (c->write_timer.state != NXT_EVENT_TIMER_DISABLED) {
+ c->write_timer.state = NXT_EVENT_TIMER_BLOCKED;
+ }
+
+ nxt_job_start(thr, &jbs->job, nxt_event_conn_job_sendfile_handler);
+ return;
+ }
+ }
+
+ nxt_event_conn_job_sendfile_return(thr, jbs, c);
+}
+
+
+static void
+nxt_event_conn_job_sendfile_handler(nxt_thread_t *thr, void *obj, void *data)
+{
+ ssize_t ret;
+ nxt_buf_t *b;
+ nxt_bool_t first;
+ nxt_event_conn_t *c;
+ nxt_job_sendfile_t *jbs;
+
+ jbs = obj;
+ c = data;
+
+ nxt_log_debug(thr->log, "event conn job sendfile fd:%d", c->socket.fd);
+
+ first = c->socket.write_ready;
+ b = jbs->out;
+
+ do {
+ ret = c->io->sendbuf(c, b, jbs->limit);
+
+ if (ret == NXT_AGAIN) {
+ break;
+ }
+
+ if (nxt_slow_path(ret == NXT_ERROR)) {
+ goto done;
+ }
+
+ jbs->sent += ret;
+ jbs->limit -= ret;
+
+ b = nxt_sendbuf_update(b, ret);
+
+ if (b == NULL) {
+ goto done;
+ }
+
+ if (jbs->limit == 0) {
+
+ if (c->rate == NULL) {
+ jbs->limit = c->max_chunk;
+ goto fast;
+ }
+
+ goto done;
+ }
+
+ } while (c->socket.write_ready);
+
+ if (first && thr->thread_pool->work_queue.head != NULL) {
+ goto fast;
+ }
+
+done:
+
+ nxt_job_return(thr, &jbs->job, jbs->ready_handler);
+ return;
+
+fast:
+
+ nxt_thread_pool_post(thr->thread_pool, nxt_event_conn_job_sendfile_handler,
+ jbs, c, thr->log);
+}
+
+
+static void
+nxt_event_conn_job_sendfile_return(nxt_thread_t *thr, void *obj, void *data)
+{
+ size_t sent;
+ nxt_buf_t *b;
+ nxt_bool_t done;
+ nxt_event_conn_t *c;
+ nxt_job_sendfile_t *jbs;
+
+ jbs = obj;
+ c = data;
+
+ c->blocked = 0;
+
+ sent = jbs->sent;
+ c->sent += sent;
+
+ nxt_log_debug(thr->log, "event conn sendfile sent:%z", sent);
+
+ b = jbs->out;
+
+ /* The job must be destroyed before connection error handler. */
+ nxt_job_destroy(jbs);
+
+ if (c->write_state->process_buffers) {
+ b = nxt_event_conn_job_sendfile_completion(thr, c, b);
+
+ done = (b == NULL);
+
+ /* Add data which might be added after sendfile job has started. */
+ nxt_buf_chain_add(&b, c->write);
+ c->write = b;
+
+ if (done) {
+ /* All data has been sent. */
+
+ if (b != NULL) {
+ /* But new data has been added. */
+ nxt_event_conn_job_sendfile_start(thr, c, NULL);
+ }
+
+ return;
+ }
+ }
+
+ if (sent != 0 && c->write_state->autoreset_timer) {
+ nxt_event_timer_disable(&c->write_timer);
+
+ } else if (c->write_timer.state == NXT_EVENT_TIMER_BLOCKED) {
+ c->write_timer.state = NXT_EVENT_TIMER_ACTIVE;
+ }
+
+ if (c->socket.error == 0
+ && !nxt_event_conn_write_delayed(thr->engine, c, sent))
+ {
+ nxt_event_conn_timer(thr->engine, c, c->write_state, &c->write_timer);
+
+ nxt_event_fd_oneshot_write(thr->engine, &c->socket);
+ }
+
+ if (sent != 0) {
+ nxt_event_conn_io_handle(thr, c->write_work_queue,
+ c->write_state->ready_handler,
+ c, c->socket.data);
+ /*
+ * Fall through if first operations were
+ * successful but the last one failed.
+ */
+ }
+
+ if (nxt_slow_path(c->socket.error != 0)) {
+ nxt_event_conn_io_handle(thr, c->write_work_queue,
+ c->write_state->error_handler,
+ c, c->socket.data);
+ }
+}
+
+
+static nxt_buf_t *
+nxt_event_conn_job_sendfile_completion(nxt_thread_t *thr, nxt_event_conn_t *c,
+ nxt_buf_t *b)
+{
+ while (b != NULL) {
+
+ nxt_prefetch(b->next);
+
+ if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
+ break;
+
+ } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
+ break;
+ }
+
+ nxt_thread_work_queue_add(thr, c->write_work_queue,
+ b->completion_handler,
+ b, b->parent, thr->log);
+
+ b = b->next;
+ }
+
+ return b;
+}
+
+#endif