summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn_write.c
diff options
context:
space:
mode:
authorIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
committerIgor Sysoev <igor@sysoev.ru>2017-01-23 19:56:03 +0300
commitde532922d9ab42aa15b40d47c8db53ac2af38500 (patch)
treed6b7c6b21c7a6e0e3620a3e0c7198e63454d05e3 /src/nxt_event_conn_write.c
parent16cbf3c076a0aca6d47adaf3f719493674cf2363 (diff)
downloadunit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.gz
unit-de532922d9ab42aa15b40d47c8db53ac2af38500.tar.bz2
Introducing tasks.
Diffstat (limited to '')
-rw-r--r--src/nxt_event_conn_write.c78
1 files changed, 40 insertions, 38 deletions
diff --git a/src/nxt_event_conn_write.c b/src/nxt_event_conn_write.c
index 0f35b5f3..d72ddc80 100644
--- a/src/nxt_event_conn_write.c
+++ b/src/nxt_event_conn_write.c
@@ -11,38 +11,41 @@ static void nxt_event_conn_average_rate_update(nxt_event_write_rate_t *rate,
size_t sent, nxt_msec_t now);
NXT_LIB_UNIT_TEST_STATIC double
nxt_event_conn_exponential_approximation(double n);
-static void nxt_event_conn_write_timer_handler(nxt_thread_t *thr, void *obj,
+static void nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj,
void *data);
void
-nxt_event_conn_write(nxt_thread_t *thr, nxt_event_conn_t *c)
+nxt_event_conn_write(nxt_task_t *task, nxt_event_conn_t *c)
{
- if (thr->engine->batch != 0) {
- nxt_event_conn_write_enqueue(thr, c);
+ if (task->thread->engine->batch != 0) {
+ nxt_event_conn_write_enqueue(task->thread, task, c);
} else {
- c->io->write(thr, c, c->socket.data);
+ c->io->write(task, c, c->socket.data);
}
}
void
-nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_io_write(nxt_task_t *task, void *obj, void *data)
{
- size_t sent, limit;
- ssize_t ret;
- nxt_buf_t *b;
- nxt_event_conn_t *c;
+ size_t sent, limit;
+ ssize_t ret;
+ nxt_buf_t *b;
+ nxt_event_conn_t *c;
+ nxt_event_engine_t *engine;
c = obj;
- nxt_log_debug(thr->log, "event conn write fd:%d", c->socket.fd);
+ nxt_debug(task, "event conn write fd:%d", c->socket.fd);
if (!c->socket.write_ready || c->delayed || c->write == NULL) {
return;
}
+ engine = task->thread->engine;
+
c->socket.write_handler = nxt_event_conn_io_write;
c->socket.error_handler = c->write_state->error_handler;
@@ -54,7 +57,7 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data)
while (limit != 0) {
- ret = c->io->write_chunk(thr, c, b, limit);
+ ret = c->io->write_chunk(c, b, limit);
if (ret < 0) {
/* ret == NXT_AGAIN || ret == NXT_ERROR. */
@@ -65,7 +68,7 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data)
limit -= ret;
if (c->write_state->process_buffers) {
- b = nxt_sendbuf_completion(thr, c->write_work_queue, b, ret);
+ b = nxt_sendbuf_completion(task, c->write_work_queue, b, ret);
c->write = b;
} else {
@@ -73,7 +76,7 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data)
}
if (b == NULL) {
- nxt_event_fd_block_write(thr->engine, &c->socket);
+ nxt_event_fd_block_write(engine, &c->socket);
break;
}
@@ -83,7 +86,7 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data)
}
}
- nxt_log_debug(thr->log, "event conn: %i sent:%z", ret, sent);
+ nxt_debug(task, "event conn: %i sent:%z", ret, sent);
if (sent != 0) {
if (c->write_state->autoreset_timer) {
@@ -92,7 +95,7 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data)
}
if (ret != NXT_ERROR
- && !nxt_event_conn_write_delayed(thr->engine, c, sent))
+ && !nxt_event_conn_write_delayed(engine, c, sent))
{
if (limit == 0) {
/*
@@ -100,7 +103,7 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data)
* process other recevied events and to get new events.
*/
c->write_timer.handler = nxt_event_conn_write_timer_handler;
- nxt_event_timer_add(thr->engine, &c->write_timer, 0);
+ nxt_event_timer_add(engine, &c->write_timer, 0);
} else if (ret == NXT_AGAIN) {
/*
@@ -110,16 +113,15 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data)
* can be set here because it should be set only for write
* direction.
*/
- nxt_event_conn_timer(thr->engine, c, c->write_state,
- &c->write_timer);
+ nxt_event_conn_timer(engine, c, c->write_state, &c->write_timer);
}
}
if (ret == 0 || sent != 0) {
/* "ret == 0" means a sync buffer was processed. */
c->sent += sent;
- nxt_event_conn_io_handle(thr, c->write_work_queue,
- c->write_state->ready_handler, c, data);
+ nxt_event_conn_io_handle(task->thread, c->write_work_queue,
+ c->write_state->ready_handler, task, c, data);
/*
* Fall through if first operations were
* successful but the last one failed.
@@ -127,10 +129,10 @@ nxt_event_conn_io_write(nxt_thread_t *thr, void *obj, void *data)
}
if (nxt_slow_path(ret == NXT_ERROR)) {
- nxt_event_fd_block_write(thr->engine, &c->socket);
+ nxt_event_fd_block_write(engine, &c->socket);
- nxt_event_conn_io_handle(thr, c->write_work_queue,
- c->write_state->error_handler, c, data);
+ nxt_event_conn_io_handle(task->thread, c->write_work_queue,
+ c->write_state->error_handler, task, c, data);
}
}
@@ -284,25 +286,24 @@ nxt_event_conn_exponential_approximation(double x)
static void
-nxt_event_conn_write_timer_handler(nxt_thread_t *thr, void *obj, void *data)
+nxt_event_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
nxt_event_timer_t *ev;
ev = obj;
- nxt_log_debug(thr->log, "event conn conn timer");
+ nxt_debug(task, "event conn conn timer");
c = nxt_event_write_timer_conn(ev);
c->delayed = 0;
- c->io->write(thr, c, c->socket.data);
+ c->io->write(task, c, c->socket.data);
}
ssize_t
-nxt_event_conn_io_write_chunk(nxt_thread_t *thr, nxt_event_conn_t *c,
- nxt_buf_t *b, size_t limit)
+nxt_event_conn_io_write_chunk(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
{
ssize_t ret;
@@ -311,7 +312,7 @@ nxt_event_conn_io_write_chunk(nxt_thread_t *thr, nxt_event_conn_t *c,
if ((ret == NXT_AGAIN || !c->socket.write_ready)
&& nxt_event_fd_is_disabled(c->socket.write))
{
- nxt_event_fd_enable_write(thr->engine, &c->socket);
+ nxt_event_fd_enable_write(c->socket.task->thread->engine, &c->socket);
}
return ret;
@@ -332,7 +333,7 @@ nxt_event_conn_io_sendbuf(nxt_event_conn_t *c, nxt_buf_t *b, size_t limit)
sb.size = 0;
sb.limit = limit;
- niob = nxt_sendbuf_mem_coalesce(&sb);
+ niob = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
if (niob == 0 && sb.sync) {
return 0;
@@ -400,8 +401,8 @@ nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size)
err = (n == -1) ? nxt_socket_errno : 0;
- nxt_log_debug(c->socket.log, "send(%d, %p, %uz): %z",
- c->socket.fd, buf, size, n);
+ nxt_debug(c->socket.task, "send(%d, %p, %uz): %z",
+ c->socket.fd, buf, size, n);
if (n > 0) {
return n;
@@ -412,19 +413,20 @@ nxt_event_conn_io_send(nxt_event_conn_t *c, void *buf, size_t size)
switch (err) {
case NXT_EAGAIN:
- nxt_log_debug(c->socket.log, "send() %E", err);
+ nxt_debug(c->socket.task, "send() %E", err);
c->socket.write_ready = 0;
return NXT_AGAIN;
case NXT_EINTR:
- nxt_log_debug(c->socket.log, "send() %E", err);
+ nxt_debug(c->socket.task, "send() %E", err);
continue;
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "send(%d, %p, %uz) failed %E",
- c->socket.fd, buf, size, err);
+ nxt_log(c->socket.task,
+ nxt_socket_error_level(err, c->socket.log_error),
+ "send(%d, %p, %uz) failed %E",
+ c->socket.fd, buf, size, err);
return NXT_ERROR;
}
}