/* * Copyright (C) Igor Sysoev * Copyright (C) NGINX, Inc. */ #include void nxt_conn_wait(nxt_conn_t *c) { nxt_event_engine_t *engine; const nxt_conn_state_t *state; nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", c->socket.fd, c->socket.read_ready); engine = c->socket.task->thread->engine; state = c->read_state; if (c->socket.read_ready) { nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler, c->socket.task, c, c->socket.data); return; } c->socket.read_handler = state->ready_handler; c->socket.error_handler = state->error_handler; nxt_conn_timer(engine, c, state, &c->read_timer); nxt_fd_event_enable_read(engine, &c->socket); } void nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) { ssize_t n; nxt_conn_t *c; nxt_event_engine_t *engine; nxt_work_handler_t handler; const nxt_conn_state_t *state; c = obj; nxt_debug(task, "conn read fd:%d rdy:%d cl:%d er:%d bl:%d", c->socket.fd, c->socket.read_ready, c->socket.closed, c->socket.error, c->block_read); if (c->socket.error != 0 || c->block_read) { return; } engine = task->thread->engine; /* * Here c->io->read() is assigned instead of direct nxt_conn_io_read() * because the function can be called by nxt_kqueue_conn_io_read(). */ c->socket.read_handler = c->io->read; state = c->read_state; c->socket.error_handler = state->error_handler; if (c->socket.read_ready) { if (state->io_read_handler == NULL) { n = c->io->recvbuf(c, c->read); } else { n = state->io_read_handler(task, c); /* The state can be changed by io_read_handler. */ state = c->read_state; } if (n > 0) { c->nbytes = n; nxt_recvbuf_update(c->read, n); nxt_fd_event_block_read(engine, &c->socket); if (state->timer_autoreset) { nxt_timer_disable(engine, &c->read_timer); } nxt_work_queue_add(c->read_work_queue, state->ready_handler, task, c, data); return; } if (n != NXT_AGAIN) { /* n == 0 or n == NXT_ERROR. */ handler = (n == 0) ? state->close_handler : state->error_handler; nxt_fd_event_block_read(engine, &c->socket); nxt_timer_disable(engine, &c->read_timer); nxt_work_queue_add(&engine->fast_work_queue, handler, task, c, data); return; } /* n == NXT_AGAIN. */ if (c->socket.read_ready) { /* * SSL/TLS library can return NXT_AGAIN if renegotiation * occured during read operation, it toggled write event * internally so only read timer should be set. */ if (!c->read_timer.enabled) { nxt_conn_timer(engine, c, state, &c->read_timer); } return; } } if (nxt_fd_event_is_disabled(c->socket.read)) { nxt_fd_event_enable_read(engine, &c->socket); } if (state->timer_autoreset || !c->read_timer.enabled) { nxt_conn_timer(engine, c, state, &c->read_timer); } } ssize_t nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) { ssize_t n; nxt_err_t err; nxt_uint_t niov; struct iovec iov[NXT_IOBUF_MAX]; nxt_recvbuf_coalesce_t rb; rb.buf = b; rb.iobuf = iov; rb.nmax = NXT_IOBUF_MAX; rb.size = 0; niov = nxt_recvbuf_mem_coalesce(&rb); if (niov == 1) { /* Disposal of surplus kernel iovec copy-in operation. */ return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0); } for ( ;; ) { n = readv(c->socket.fd, iov, niov); err = (n == -1) ? nxt_socket_errno : 0; nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n); if (n > 0) { if ((size_t) n < rb.size) { c->socket.read_ready = 0; } return n; } if (n == 0) { c->socket.closed = 1; c->socket.read_ready = 0; return n; } /* n == -1 */ switch (err) { case NXT_EAGAIN: nxt_debug(c->socket.task, "readv() %E", err); c->socket.read_ready = 0; return NXT_AGAIN; case NXT_EINTR: nxt_debug(c->socket.task, "readv() %E", err); continue; default: c->socket.error = err; nxt_log(c->socket.task, nxt_socket_error_level(err), "readv(%d, %ui) failed %E", c->socket.fd, niov, err); return NXT_ERROR; } } } ssize_t nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags) { ssize_t n; nxt_err_t err; for ( ;; ) { n = recv(c->socket.fd, buf, size, flags); err = (n == -1) ? nxt_socket_errno : 0; nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z", c->socket.fd, buf, size, flags, n); if (n > 0) { if ((size_t) n < size && (flags & MSG_PEEK) == 0) { c->socket.read_ready = 0; } return n; } if (n == 0) { c->socket.closed = 1; if ((flags & MSG_PEEK) == 0) { c->socket.read_ready = 0; } return n; } /* n == -1 */ switch (err) { case NXT_EAGAIN: nxt_debug(c->socket.task, "recv() %E", err); c->socket.read_ready = 0; return NXT_AGAIN; case NXT_EINTR: nxt_debug(c->socket.task, "recv() %E", err); continue; default: c->socket.error = err; nxt_log(c->socket.task, nxt_socket_error_level(err), "recv(%d, %p, %uz, %ui) failed %E", c->socket.fd, buf, size, flags, err); return NXT_ERROR; } } }