summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_event_conn_read.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_event_conn_read.c109
1 files changed, 46 insertions, 63 deletions
diff --git a/src/nxt_event_conn_read.c b/src/nxt_event_conn_read.c
index 20e4d39b..d898c694 100644
--- a/src/nxt_event_conn_read.c
+++ b/src/nxt_event_conn_read.c
@@ -8,23 +8,29 @@
void
-nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c)
+nxt_conn_wait(nxt_event_conn_t *c)
{
- nxt_work_queue_t *wq;
- nxt_work_handler_t handler;
-
- handler = c->io->read;
+ nxt_event_engine_t *engine;
+ const nxt_event_conn_state_t *state;
- if (task->thread->engine->batch != 0) {
+ nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
+ c->socket.fd, c->socket.read_ready);
- wq = &task->thread->engine->read_work_queue;
- c->socket.read_work_queue = wq;
+ engine = c->socket.task->thread->engine;
+ state = c->read_state;
- nxt_work_queue_add( wq, handler, task, c, c->socket.data);
+ if (c->socket.read_ready) {
+ nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler,
+ c->socket.task, c, c->socket.data);
return;
}
- handler(task, c, c->socket.data);
+ c->socket.read_handler = state->ready_handler;
+ c->socket.error_handler = state->error_handler;
+
+ nxt_event_conn_timer(engine, c, state, &c->read_timer);
+
+ nxt_fd_event_enable_read(engine, &c->socket);
}
@@ -33,7 +39,7 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
nxt_buf_t *b;
- nxt_bool_t batch;
+ nxt_work_queue_t *wq;
nxt_event_conn_t *c;
nxt_event_engine_t *engine;
nxt_work_handler_t handler;
@@ -46,18 +52,12 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
engine = task->thread->engine;
- batch = (engine->batch != 0);
state = c->read_state;
if (c->socket.read_ready) {
b = c->read;
- if (b == NULL) {
- /* Just test descriptor readiness. */
- goto ready;
- }
-
if (c->peek == 0) {
n = c->io->recvbuf(c, b);
@@ -68,34 +68,33 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
if (n > 0) {
c->nbytes = n;
- if (state->process_buffers) {
- nxt_recvbuf_update(b, n);
-
- } else {
- /*
- * A ready_handler must not be queued, instead buffers
- * must be processed by the ready_handler at once after
- * recv() operation, otherwise two sequentially queued
- * recv() operations will read in the same buffers.
- */
- batch = 0;
+ nxt_recvbuf_update(b, n);
+
+ nxt_fd_event_block_read(engine, &c->socket);
+
+ if (state->autoreset_timer) {
+ nxt_timer_disable(engine, &c->read_timer);
}
- goto ready;
+ wq = c->read_work_queue;
+ handler = state->ready_handler;
+
+ nxt_work_queue_add(wq, handler, task, c, data);
+
+ return;
}
if (n != NXT_AGAIN) {
nxt_fd_event_block_read(engine, &c->socket);
nxt_timer_disable(engine, &c->read_timer);
- if (n == 0) {
- handler = state->close_handler;
- goto done;
- }
+ wq = &engine->fast_work_queue;
- /* n == NXT_ERROR */
- handler = state->error_handler;
- goto done;
+ handler = (n == 0) ? state->close_handler : state->error_handler;
+
+ nxt_work_queue_add(wq, handler, task, c, data);
+
+ return;
}
}
@@ -119,25 +118,6 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
}
return;
-
-ready:
-
- nxt_fd_event_block_read(engine, &c->socket);
-
- if (state->autoreset_timer) {
- nxt_timer_disable(engine, &c->read_timer);
- }
-
- handler = state->ready_handler;
-
-done:
-
- if (batch) {
- nxt_work_queue_add(c->read_work_queue, handler, task, c, data);
-
- } else {
- handler(task, c, data);
- }
}
@@ -198,9 +178,9 @@ nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b)
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "readv(%d, %ui) failed %E",
- c->socket.fd, niov, err);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "readv(%d, %ui) failed %E", c->socket.fd, niov, err);
+
return NXT_ERROR;
}
}
@@ -233,6 +213,7 @@ nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size,
if (n == 0) {
c->socket.closed = 1;
c->socket.read_ready = 0;
+
return n;
}
@@ -241,19 +222,21 @@ nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size,
switch (err) {
case NXT_EAGAIN:
- nxt_log_debug(c->socket.log, "recv() %E", err);
+ nxt_debug(c->socket.task, "recv() %E", err);
c->socket.read_ready = 0;
+
return NXT_AGAIN;
case NXT_EINTR:
- nxt_log_debug(c->socket.log, "recv() %E", err);
+ nxt_debug(c->socket.task, "recv() %E", err);
continue;
default:
c->socket.error = err;
- nxt_log_error(nxt_socket_error_level(err, c->socket.log_error),
- c->socket.log, "recv(%d, %p, %uz, %ui) failed %E",
- c->socket.fd, buf, size, flags, err);
+ nxt_log(c->socket.task, nxt_socket_error_level(err),
+ "recv(%d, %p, %uz, %ui) failed %E",
+ c->socket.fd, buf, size, flags, err);
+
return NXT_ERROR;
}
}