diff options
author | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
---|---|---|
committer | Igor Sysoev <igor@sysoev.ru> | 2017-01-17 20:00:00 +0300 |
commit | 16cbf3c076a0aca6d47adaf3f719493674cf2363 (patch) | |
tree | e6530480020f62a2bdbf249988ec3e2a751d3927 /src/nxt_event_conn_job_sendfile.c | |
download | unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.gz unit-16cbf3c076a0aca6d47adaf3f719493674cf2363.tar.bz2 |
Initial version.
Diffstat (limited to 'src/nxt_event_conn_job_sendfile.c')
-rw-r--r-- | src/nxt_event_conn_job_sendfile.c | 268 |
1 files changed, 268 insertions, 0 deletions
diff --git a/src/nxt_event_conn_job_sendfile.c b/src/nxt_event_conn_job_sendfile.c new file mode 100644 index 00000000..140febab --- /dev/null +++ b/src/nxt_event_conn_job_sendfile.c @@ -0,0 +1,268 @@ + +/* + * Copyright (C) Igor Sysoev + * Copyright (C) NGINX, Inc. + */ + +#include <nxt_main.h> + + +#if (NXT_THREADS) + +typedef struct { + nxt_job_t job; + nxt_buf_t *out; + size_t sent; + size_t limit; + nxt_work_handler_t ready_handler; +} nxt_job_sendfile_t; + + +static void nxt_event_conn_job_sendfile_start(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_event_conn_job_sendfile_handler(nxt_thread_t *thr, void *obj, + void *data); +static void nxt_event_conn_job_sendfile_return(nxt_thread_t *thr, void *obj, + void *data); +static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_thread_t *thr, + nxt_event_conn_t *c, nxt_buf_t *b); + + +void +nxt_event_conn_job_sendfile(nxt_thread_t *thr, nxt_event_conn_t *c) +{ + nxt_event_fd_disable(thr->engine, &c->socket); + + /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */ + nxt_event_conn_job_sendfile_start(thr, c, NULL); +} + + +static void +nxt_event_conn_job_sendfile_start(nxt_thread_t *thr, void *obj, void *data) +{ + nxt_iobuf_t b; + nxt_event_conn_t *c; + nxt_job_sendfile_t *jbs; + nxt_sendbuf_coalesce_t sb; + + c = obj; + + nxt_log_debug(thr->log, "event conn sendfile fd:%d", c->socket.fd); + + jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t)); + + if (nxt_slow_path(jbs == NULL)) { + c->write_state->error_handler(thr, c, NULL); + return; + } + + c->socket.write_handler = nxt_event_conn_job_sendfile_start; + c->socket.error_handler = c->write_state->error_handler; + + jbs->job.data = c; + nxt_job_set_name(&jbs->job, "job sendfile"); + + jbs->limit = nxt_event_conn_write_limit(c); + + if (jbs->limit != 0) { + + sb.buf = c->write; + sb.iobuf = &b; + sb.nmax = 1; + sb.sync = 0; + sb.size = 0; + sb.limit = jbs->limit; + + if (nxt_sendbuf_mem_coalesce(&sb) != 0 || !sb.sync) { + + jbs->job.thread_pool = c->u.thread_pool; + jbs->job.log = c->socket.log; + jbs->out = c->write; + c->write = NULL; + jbs->ready_handler = nxt_event_conn_job_sendfile_return; + + c->blocked = 1; + + if (c->write_timer.state != NXT_EVENT_TIMER_DISABLED) { + c->write_timer.state = NXT_EVENT_TIMER_BLOCKED; + } + + nxt_job_start(thr, &jbs->job, nxt_event_conn_job_sendfile_handler); + return; + } + } + + nxt_event_conn_job_sendfile_return(thr, jbs, c); +} + + +static void +nxt_event_conn_job_sendfile_handler(nxt_thread_t *thr, void *obj, void *data) +{ + ssize_t ret; + nxt_buf_t *b; + nxt_bool_t first; + nxt_event_conn_t *c; + nxt_job_sendfile_t *jbs; + + jbs = obj; + c = data; + + nxt_log_debug(thr->log, "event conn job sendfile fd:%d", c->socket.fd); + + first = c->socket.write_ready; + b = jbs->out; + + do { + ret = c->io->sendbuf(c, b, jbs->limit); + + if (ret == NXT_AGAIN) { + break; + } + + if (nxt_slow_path(ret == NXT_ERROR)) { + goto done; + } + + jbs->sent += ret; + jbs->limit -= ret; + + b = nxt_sendbuf_update(b, ret); + + if (b == NULL) { + goto done; + } + + if (jbs->limit == 0) { + + if (c->rate == NULL) { + jbs->limit = c->max_chunk; + goto fast; + } + + goto done; + } + + } while (c->socket.write_ready); + + if (first && thr->thread_pool->work_queue.head != NULL) { + goto fast; + } + +done: + + nxt_job_return(thr, &jbs->job, jbs->ready_handler); + return; + +fast: + + nxt_thread_pool_post(thr->thread_pool, nxt_event_conn_job_sendfile_handler, + jbs, c, thr->log); +} + + +static void +nxt_event_conn_job_sendfile_return(nxt_thread_t *thr, void *obj, void *data) +{ + size_t sent; + nxt_buf_t *b; + nxt_bool_t done; + nxt_event_conn_t *c; + nxt_job_sendfile_t *jbs; + + jbs = obj; + c = data; + + c->blocked = 0; + + sent = jbs->sent; + c->sent += sent; + + nxt_log_debug(thr->log, "event conn sendfile sent:%z", sent); + + b = jbs->out; + + /* The job must be destroyed before connection error handler. */ + nxt_job_destroy(jbs); + + if (c->write_state->process_buffers) { + b = nxt_event_conn_job_sendfile_completion(thr, c, b); + + done = (b == NULL); + + /* Add data which might be added after sendfile job has started. */ + nxt_buf_chain_add(&b, c->write); + c->write = b; + + if (done) { + /* All data has been sent. */ + + if (b != NULL) { + /* But new data has been added. */ + nxt_event_conn_job_sendfile_start(thr, c, NULL); + } + + return; + } + } + + if (sent != 0 && c->write_state->autoreset_timer) { + nxt_event_timer_disable(&c->write_timer); + + } else if (c->write_timer.state == NXT_EVENT_TIMER_BLOCKED) { + c->write_timer.state = NXT_EVENT_TIMER_ACTIVE; + } + + if (c->socket.error == 0 + && !nxt_event_conn_write_delayed(thr->engine, c, sent)) + { + nxt_event_conn_timer(thr->engine, c, c->write_state, &c->write_timer); + + nxt_event_fd_oneshot_write(thr->engine, &c->socket); + } + + if (sent != 0) { + nxt_event_conn_io_handle(thr, c->write_work_queue, + c->write_state->ready_handler, + c, c->socket.data); + /* + * Fall through if first operations were + * successful but the last one failed. + */ + } + + if (nxt_slow_path(c->socket.error != 0)) { + nxt_event_conn_io_handle(thr, c->write_work_queue, + c->write_state->error_handler, + c, c->socket.data); + } +} + + +static nxt_buf_t * +nxt_event_conn_job_sendfile_completion(nxt_thread_t *thr, nxt_event_conn_t *c, + nxt_buf_t *b) +{ + while (b != NULL) { + + nxt_prefetch(b->next); + + if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) { + break; + + } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) { + break; + } + + nxt_thread_work_queue_add(thr, c->write_work_queue, + b->completion_handler, + b, b->parent, thr->log); + + b = b->next; + } + + return b; +} + +#endif |