diff options
Diffstat (limited to 'src/nxt_event_conn_read.c')
-rw-r--r-- | src/nxt_event_conn_read.c | 109 |
1 files changed, 46 insertions, 63 deletions
diff --git a/src/nxt_event_conn_read.c b/src/nxt_event_conn_read.c index 20e4d39b..d898c694 100644 --- a/src/nxt_event_conn_read.c +++ b/src/nxt_event_conn_read.c @@ -8,23 +8,29 @@ void -nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c) +nxt_conn_wait(nxt_event_conn_t *c) { - nxt_work_queue_t *wq; - nxt_work_handler_t handler; - - handler = c->io->read; + nxt_event_engine_t *engine; + const nxt_event_conn_state_t *state; - if (task->thread->engine->batch != 0) { + nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", + c->socket.fd, c->socket.read_ready); - wq = &task->thread->engine->read_work_queue; - c->socket.read_work_queue = wq; + engine = c->socket.task->thread->engine; + state = c->read_state; - nxt_work_queue_add( wq, handler, task, c, c->socket.data); + if (c->socket.read_ready) { + nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler, + c->socket.task, c, c->socket.data); return; } - handler(task, c, c->socket.data); + c->socket.read_handler = state->ready_handler; + c->socket.error_handler = state->error_handler; + + nxt_event_conn_timer(engine, c, state, &c->read_timer); + + nxt_fd_event_enable_read(engine, &c->socket); } @@ -33,7 +39,7 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) { ssize_t n; nxt_buf_t *b; - nxt_bool_t batch; + nxt_work_queue_t *wq; nxt_event_conn_t *c; nxt_event_engine_t *engine; nxt_work_handler_t handler; @@ -46,18 +52,12 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) engine = task->thread->engine; - batch = (engine->batch != 0); state = c->read_state; if (c->socket.read_ready) { b = c->read; - if (b == NULL) { - /* Just test descriptor readiness. */ - goto ready; - } - if (c->peek == 0) { n = c->io->recvbuf(c, b); @@ -68,34 +68,33 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) if (n > 0) { c->nbytes = n; - if (state->process_buffers) { - nxt_recvbuf_update(b, n); - - } else { - /* - * A ready_handler must not be queued, instead buffers - * must be processed by the ready_handler at once after - * recv() operation, otherwise two sequentially queued - * recv() operations will read in the same buffers. - */ - batch = 0; + nxt_recvbuf_update(b, n); + + nxt_fd_event_block_read(engine, &c->socket); + + if (state->autoreset_timer) { + nxt_timer_disable(engine, &c->read_timer); } - goto ready; + wq = c->read_work_queue; + handler = state->ready_handler; + + nxt_work_queue_add(wq, handler, task, c, data); + + return; } if (n != NXT_AGAIN) { nxt_fd_event_block_read(engine, &c->socket); nxt_timer_disable(engine, &c->read_timer); - if (n == 0) { - handler = state->close_handler; - goto done; - } + wq = &engine->fast_work_queue; - /* n == NXT_ERROR */ - handler = state->error_handler; - goto done; + handler = (n == 0) ? state->close_handler : state->error_handler; + + nxt_work_queue_add(wq, handler, task, c, data); + + return; } } @@ -119,25 +118,6 @@ nxt_event_conn_io_read(nxt_task_t *task, void *obj, void *data) } return; - -ready: - - nxt_fd_event_block_read(engine, &c->socket); - - if (state->autoreset_timer) { - nxt_timer_disable(engine, &c->read_timer); - } - - handler = state->ready_handler; - -done: - - if (batch) { - nxt_work_queue_add(c->read_work_queue, handler, task, c, data); - - } else { - handler(task, c, data); - } } @@ -198,9 +178,9 @@ nxt_event_conn_io_recvbuf(nxt_event_conn_t *c, nxt_buf_t *b) default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "readv(%d, %ui) failed %E", - c->socket.fd, niov, err); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "readv(%d, %ui) failed %E", c->socket.fd, niov, err); + return NXT_ERROR; } } @@ -233,6 +213,7 @@ nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size, if (n == 0) { c->socket.closed = 1; c->socket.read_ready = 0; + return n; } @@ -241,19 +222,21 @@ nxt_event_conn_io_recv(nxt_event_conn_t *c, void *buf, size_t size, switch (err) { case NXT_EAGAIN: - nxt_log_debug(c->socket.log, "recv() %E", err); + nxt_debug(c->socket.task, "recv() %E", err); c->socket.read_ready = 0; + return NXT_AGAIN; case NXT_EINTR: - nxt_log_debug(c->socket.log, "recv() %E", err); + nxt_debug(c->socket.task, "recv() %E", err); continue; default: c->socket.error = err; - nxt_log_error(nxt_socket_error_level(err, c->socket.log_error), - c->socket.log, "recv(%d, %p, %uz, %ui) failed %E", - c->socket.fd, buf, size, flags, err); + nxt_log(c->socket.task, nxt_socket_error_level(err), + "recv(%d, %p, %uz, %ui) failed %E", + c->socket.fd, buf, size, flags, err); + return NXT_ERROR; } } |