/* * Copyright (C) Igor Sysoev * Copyright (C) NGINX, Inc. */ #include #include #include #include #define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \ (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t)) static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b); static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf, nxt_buf_t *b); static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg); static nxt_port_send_msg_t *nxt_port_msg_alloc(const nxt_port_send_msg_t *m); static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port); nxt_inline void nxt_port_msg_close_fd(nxt_port_send_msg_t *msg); nxt_inline void nxt_port_close_fds(nxt_fd_t *fd); static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg); static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data); static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg); static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) { nxt_int_t sndbuf, rcvbuf, size; nxt_socket_t snd, rcv; port->socket.task = task; port->pair[0] = -1; port->pair[1] = -1; if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { goto socketpair_fail; } snd = port->pair[1]; sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); if (nxt_slow_path(sndbuf < 0)) { goto getsockopt_fail; } rcv = port->pair[0]; rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); if (nxt_slow_path(rcvbuf < 0)) { goto getsockopt_fail; } if (max_size == 0) { max_size = 16 * 1024; } if ((size_t) sndbuf < max_size) { /* * On Unix domain sockets * Linux uses 224K on both send and receive directions; * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size * on send direction and 4K buffer size on receive direction; * Solaris uses 16K on send direction and 5K on receive direction. */ (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF, max_size); sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); if (nxt_slow_path(sndbuf < 0)) { goto getsockopt_fail; } size = sndbuf * 4; if (rcvbuf < size) { (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF, size); rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); if (nxt_slow_path(rcvbuf < 0)) { goto getsockopt_fail; } } } port->max_size = nxt_min(max_size, (size_t) sndbuf); port->max_share = (64 * 1024); return NXT_OK; getsockopt_fail: nxt_socket_close(task, port->pair[0]); nxt_socket_close(task, port->pair[1]); socketpair_fail: return NXT_ERROR; } void nxt_port_destroy(nxt_port_t *port) { nxt_socket_close(port->socket.task, port->socket.fd); nxt_mp_destroy(port->mem_pool); } void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) { port->socket.fd = port->pair[1]; port->socket.log = &nxt_main_log; port->socket.write_ready = 1; port->engine = task->thread->engine; port->socket.write_work_queue = &port->engine->fast_work_queue; port->socket.write_handler = nxt_port_write_handler; port->socket.error_handler = nxt_port_error_handler; } void nxt_port_write_close(nxt_port_t *port) { nxt_socket_close(port->socket.task, port->pair[1]); port->pair[1] = -1; } static void nxt_port_release_send_msg(nxt_port_send_msg_t *msg) { if (msg->allocated) { nxt_free(msg); } } nxt_int_t nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) { int notify; uint8_t qmsg_size; nxt_int_t res; nxt_port_send_msg_t msg; struct { nxt_port_msg_t pm; uint8_t buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE]; } qmsg; msg.link.next = NULL; msg.link.prev = NULL; msg.buf = b; msg.share = 0; msg.fd[0] = fd; msg.fd[1] = fd2; msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; msg.allocated = 0; msg.port_msg.stream = stream; msg.port_msg.pid = nxt_pid; msg.port_msg.reply_port = reply_port; msg.port_msg.type = type & NXT_PORT_MSG_MASK; msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; msg.port_msg.mmap = 0; msg.port_msg.nf = 0; msg.port_msg.mf = 0; if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) { if (fd == -1 && nxt_port_can_enqueue_buf(b)) { qmsg.pm = msg.port_msg; qmsg_size = sizeof(qmsg.pm); if (b != NULL) { qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b); } res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, ¬ify); nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d", (int) port->pid, (int) port->id, port->socket.fd, (int) qmsg_size, notify, res); if (b != NULL && nxt_fast_path(res == NXT_OK)) { if (qmsg.pm.mmap) { b->is_port_mmap_sent = 1; } b->mem.pos = b->mem.free; nxt_work_queue_add(&task->thread->engine->fast_work_queue, b->completion_handler, task, b, b->parent); } if (notify == 0) { return res; } msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE; msg.buf = NULL; } else { qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET; res = nxt_port_queue_send(port->queue, qmsg.buf, 1, ¬ify); nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d", (int) port->pid, (int) port->id, port->socket.fd, notify, res); if (nxt_slow_path(res == NXT_AGAIN)) { return NXT_AGAIN; } } } res = nxt_port_msg_chk_insert(task, port, &msg); if (nxt_fast_path(res == NXT_DECLINED)) { nxt_port_write_handler(task, &port->socket, &msg); res = NXT_OK; } return res; } static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b) { if (b == NULL) { return 1; } if (b->next != NULL) { return 0; } return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE || nxt_buf_is_port_mmap(b)); } static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf, nxt_buf_t *b) { ssize_t size; nxt_port_mmap_msg_t *mm; nxt_port_mmap_header_t *hdr; nxt_port_mmap_handler_t *mmap_handler; size = nxt_buf_mem_used_size(&b->mem); if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) { nxt_memcpy(qbuf, b->mem.pos, size); return size; } mmap_handler = b->parent; hdr = mmap_handler->hdr; mm = qbuf; mm->mmap_id = hdr->id; mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos); mm->size = nxt_buf_mem_used_size(&b->mem); pm->mmap = 1; nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id, mm->size); return sizeof(nxt_port_mmap_msg_t); } static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) { nxt_int_t res; nxt_thread_mutex_lock(&port->write_mutex); if (nxt_fast_path(port->socket.write_ready && nxt_queue_is_empty(&port->messages))) { res = NXT_DECLINED; } else { msg = nxt_port_msg_alloc(msg); if (nxt_fast_path(msg != NULL)) { nxt_queue_insert_tail(&port->messages, &msg->link); nxt_port_use(task, port, 1); res = NXT_OK; } else { res = NXT_ERROR; } } nxt_thread_mutex_unlock(&port->write_mutex); return res; } static nxt_port_send_msg_t * nxt_port_msg_alloc(const nxt_port_send_msg_t *m) { nxt_port_send_msg_t *msg; msg = nxt_malloc(sizeof(nxt_port_send_msg_t)); if (nxt_slow_path(msg == NULL)) { return NULL; } *msg = *m; msg->allocated = 1; return msg; } static void nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data) { nxt_fd_event_block_write(task->thread->engine, &port->socket); } static void nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data) { nxt_fd_event_enable_write(task->thread->engine, &port->socket); } static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) { int use_delta; size_t plain_size; ssize_t n; uint32_t mmsg_buf[3 * NXT_IOBUF_MAX * 10]; nxt_bool_t block_write, enable_write; nxt_port_t *port; struct iovec iov[NXT_IOBUF_MAX * 10]; nxt_work_queue_t *wq; nxt_port_method_t m; nxt_port_send_msg_t *msg; nxt_sendbuf_coalesce_t sb; port = nxt_container_of(obj, nxt_port_t, socket); block_write = 0; enable_write = 0; use_delta = 0; wq = &task->thread->engine->fast_work_queue; do { if (data) { msg = data; } else { msg = nxt_port_msg_first(port); if (msg == NULL) { block_write = 1; goto cleanup; } } next_fragment: iov[0].iov_base = &msg->port_msg; iov[0].iov_len = sizeof(nxt_port_msg_t); sb.buf = msg->buf; sb.iobuf = &iov[1]; sb.nmax = NXT_IOBUF_MAX - 1; sb.sync = 0; sb.last = 0; sb.size = 0; sb.limit = port->max_size; sb.limit_reached = 0; sb.nmax_reached = 0; m = nxt_port_mmap_get_method(task, port, msg->buf); if (m == NXT_PORT_METHOD_MMAP) { sb.limit = (1ULL << 31) - 1; sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1, port->max_size / PORT_MMAP_MIN_SIZE); } sb.limit -= iov[0].iov_len; nxt_sendbuf_mem_coalesce(task, &sb); plain_size = sb.size; /* * Send through mmap enabled only when payload * is bigger than PORT_MMAP_MIN_SIZE. */ if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf); } else { m = NXT_PORT_METHOD_PLAIN; } msg->port_msg.last |= sb.last; msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); if (n > 0) { if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { nxt_alert(task, "port %d: short write: %z instead of %uz", port->socket.fd, n, sb.size + iov[0].iov_len); goto fail; } nxt_port_msg_close_fd(msg); msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, m == NXT_PORT_METHOD_MMAP); if (msg->buf != NULL) { nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd, msg->port_msg.stream); /* * A file descriptor is sent only * in the first message of a stream. */ msg->fd[0] = -1; msg->fd[1] = -1; msg->share += n; msg->port_msg.nf = 1; if (msg->share >= port->max_share) { msg->share = 0; if (msg->link.next != NULL) { nxt_thread_mutex_lock(&port->write_mutex); nxt_queue_remove(&msg->link); nxt_queue_insert_tail(&port->messages, &msg->link); nxt_thread_mutex_unlock(&port->write_mutex); } else { msg = nxt_port_msg_insert_tail(port, msg); if (nxt_slow_path(msg == NULL)) { goto fail; } use_delta++; } } else { goto next_fragment; } } else { if (msg->link.next != NULL) { nxt_thread_mutex_lock(&port->write_mutex); nxt_queue_remove(&msg->link); msg->link.next = NULL; nxt_thread_mutex_unlock(&port->write_mutex); use_delta--; } nxt_port_release_send_msg(msg); } if (data != NULL) { goto cleanup; } } else { if (nxt_slow_path(n == NXT_ERROR)) { if (msg->link.next == NULL) { nxt_port_msg_close_fd(msg); nxt_port_release_send_msg(msg); } goto fail; } if (msg->link.next == NULL) { msg = nxt_port_msg_insert_tail(port, msg); if (nxt_slow_path(msg == NULL)) { goto fail; } use_delta++; } } } while (port->socket.write_ready); if (nxt_fd_event_is_disabled(port->socket.write)) { enable_write = 1; } goto cleanup; fail: use_delta++; nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket, &port->socket); cleanup: if (block_write && nxt_fd_event_is_active(port->socket.write)) { nxt_port_post(task, port, nxt_port_fd_block_write, NULL); } if (enable_write) { nxt_port_post(task, port, nxt_port_fd_enable_write, NULL); } if (use_delta != 0) { nxt_port_use(task, port, use_delta); } } static nxt_port_send_msg_t * nxt_port_msg_first(nxt_port_t *port) { nxt_queue_link_t *lnk; nxt_port_send_msg_t *msg; nxt_thread_mutex_lock(&port->write_mutex); lnk = nxt_queue_first(&port->messages); if (lnk == nxt_queue_tail(&port->messages)) { msg = NULL; } else { msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); } nxt_thread_mutex_unlock(&port->write_mutex); return msg; } nxt_inline void nxt_port_msg_close_fd(nxt_port_send_msg_t *msg) { if (!msg->close_fd) { return; } nxt_port_close_fds(msg->fd); } nxt_inline void nxt_port_close_fds(nxt_fd_t *fd) { if (fd[0] != -1) { nxt_fd_close(fd[0]); fd[0] = -1; } if (fd[1] != -1) { nxt_fd_close(fd[1]); fd[1] = -1; } } static nxt_buf_t * nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode) { size_t size; nxt_buf_t *next; while (b != NULL) { nxt_prefetch(b->next); if (!nxt_buf_is_sync(b)) { size = nxt_buf_used_size(b); if (size != 0) { if (sent == 0) { break; } if (nxt_buf_is_port_mmap(b) && mmap_mode) { /* * buffer has been sent to other side which is now * responsible for shared memory bucket release */ b->is_port_mmap_sent = 1; } if (sent < size) { if (nxt_buf_is_mem(b)) { b->mem.pos += sent; } if (nxt_buf_is_file(b)) { b->file_pos += sent; } break; } /* b->mem.free is NULL in file-only buffer. */ b->mem.pos = b->mem.free; if (nxt_buf_is_file(b)) { b->file_pos = b->file_end; } sent -= size; } } nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); next = b->next; b->next = NULL; b = next; } return b; } static nxt_port_send_msg_t * nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg) { if (msg->allocated == 0) { msg = nxt_port_msg_alloc(msg); if (nxt_slow_path(msg == NULL)) { return NULL; } } nxt_thread_mutex_lock(&port->write_mutex); nxt_queue_insert_tail(&port->messages, &msg->link); nxt_thread_mutex_unlock(&port->write_mutex); return msg; } void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) { port->socket.fd = port->pair[0]; port->socket.log = &nxt_main_log; port->engine = task->thread->engine; port->socket.read_work_queue = &port->engine->fast_work_queue; port->socket.read_handler = port->queue != NULL ? nxt_port_queue_read_handler : nxt_port_read_handler; port->socket.error_handler = nxt_port_error_handler; nxt_fd_event_enable_read(port->engine, &port->socket); } void nxt_port_read_close(nxt_port_t *port) { port->socket.read_ready = 0; port->socket.read = NXT_EVENT_INACTIVE; nxt_socket_close(port->socket.task, port->pair[0]); port->pair[0] = -1; } static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) { ssize_t n; nxt_buf_t *b; nxt_int_t ret; nxt_port_t *port; nxt_recv_oob_t oob; nxt_port_recv_msg_t msg; struct iovec iov[2]; port = msg.port = nxt_container_of(obj, nxt_port_t, socket); nxt_assert(port->engine == task->thread->engine); for ( ;; ) { b = nxt_port_buf_alloc(port); if (nxt_slow_path(b == NULL)) { /* TODO: disable event for some time */ } iov[0].iov_base = &msg.port_msg; iov[0].iov_len = sizeof(nxt_port_msg_t); iov[1].iov_base = b->mem.pos; iov[1].iov_len = port->max_size; n = nxt_socketpair_recv(&port->socket, iov, 2, &oob); if (n > 0) { msg.fd[0] = -1; msg.fd[1] = -1; ret = nxt_socket_msg_oob_get(&oob, msg.fd, nxt_recv_msg_cmsg_pid_ref(&msg)); if (nxt_slow_path(ret != NXT_OK)) { nxt_alert(task, "failed to get oob data from %d", port->socket.fd); nxt_port_close_fds(msg.fd); goto fail; } msg.buf = b; msg.size = n; nxt_port_read_msg_process(task, port, &msg); /* * To disable instant completion or buffer re-usage, * handler should reset 'msg.buf'. */ if (msg.buf == b) { nxt_port_buf_free(port, b); } if (port->socket.read_ready) { continue; } return; } if (n == NXT_AGAIN) { nxt_port_buf_free(port, b); nxt_fd_event_enable_read(task->thread->engine, &port->socket); return; } fail: /* n == 0 || error */ nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_port_error_handler, task, &port->socket, NULL); return; } } static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data) { ssize_t n; nxt_buf_t *b; nxt_int_t ret; nxt_port_t *port; struct iovec iov[2]; nxt_recv_oob_t oob; nxt_port_queue_t *queue; nxt_port_recv_msg_t msg, *smsg; uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE]; port = nxt_container_of(obj, nxt_port_t, socket); msg.port = port; nxt_assert(port->engine == task->thread->engine); queue = port->queue; nxt_atomic_fetch_add(&queue->nitems, 1); for ( ;; ) { if (port->from_socket == 0) { n = nxt_port_queue_recv(queue, qmsg); if (n < 0 && !port->socket.read_ready) { nxt_atomic_fetch_add(&queue->nitems, -1); n = nxt_port_queue_recv(queue, qmsg); if (n < 0) { return; } nxt_atomic_fetch_add(&queue->nitems, 1); } if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) { port->from_socket++; nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d", (int) port->pid, (int) port->id, port->socket.fd, port->from_socket); continue; } nxt_debug(task, "port{%d,%d} %d: dequeue %d", (int) port->pid, (int) port->id, port->socket.fd, (int) n); } else { if ((smsg = port->socket_msg) != NULL && smsg->size != 0) { msg.port_msg = smsg->port_msg; b = smsg->buf; n = smsg->size; msg.fd[0] = smsg->fd[0]; msg.fd[1] = smsg->fd[1]; smsg->size = 0; port->from_socket--; nxt_debug(task, "port{%d,%d} %d: use suspended message %d", (int) port->pid, (int) port->id, port->socket.fd, (int) n); goto process; } n = -1; } if (n < 0 && !port->socket.read_ready) { nxt_atomic_fetch_add(&queue->nitems, -1); return; } b = nxt_port_buf_alloc(port); if (nxt_slow_path(b == NULL)) { /* TODO: disable event for some time */ } if (n >= (ssize_t) sizeof(nxt_port_msg_t)) { nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t)); if (n > (ssize_t) sizeof(nxt_port_msg_t)) { nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t), n - sizeof(nxt_port_msg_t)); } } else { iov[0].iov_base = &msg.port_msg; iov[0].iov_len = sizeof(nxt_port_msg_t); iov[1].iov_base = b->mem.pos; iov[1].iov_len = port->max_size; n = nxt_socketpair_recv(&port->socket, iov, 2, &oob); if (n > 0) { msg.fd[0] = -1; msg.fd[1] = -1; ret = nxt_socket_msg_oob_get(&oob, msg.fd, nxt_recv_msg_cmsg_pid_ref(&msg)); if (nxt_slow_path(ret != NXT_OK)) { nxt_alert(task, "failed to get oob data from %d", port->socket.fd); nxt_port_close_fds(msg.fd); return; } } if (n == (ssize_t) sizeof(nxt_port_msg_t) && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE) { nxt_port_buf_free(port, b); nxt_debug(task, "port{%d,%d} %d: recv %d read_queue", (int) port->pid, (int) port->id, port->socket.fd, (int) n); continue; } nxt_debug(task, "port{%d,%d} %d: recvmsg %d", (int) port->pid, (int) port->id, port->socket.fd, (int) n); if (n > 0) { if (port->from_socket == 0) { nxt_debug(task, "port{%d,%d} %d: suspend message %d", (int) port->pid, (int) port->id, port->socket.fd, (int) n); smsg = port->socket_msg; if (nxt_slow_path(smsg == NULL)) { smsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t)); if (nxt_slow_path(smsg == NULL)) { nxt_alert(task, "port{%d,%d} %d: suspend message " "failed", (int) port->pid, (int) port->id, port->socket.fd); return; } port->socket_msg = smsg; } else { if (nxt_slow_path(smsg->size != 0)) { nxt_alert(task, "port{%d,%d} %d: too many suspend " "messages", (int) port->pid, (int) port->id, port->socket.fd); return; } } smsg->port_msg = msg.port_msg; smsg->buf = b; smsg->size = n; smsg->fd[0] = msg.fd[0]; smsg->fd[1] = msg.fd[1]; continue; } port->from_socket--; } } process: if (n > 0) { msg.buf = b; msg.size = n; nxt_port_read_msg_process(task, port, &msg); /* * To disable instant completion or buffer re-usage, * handler should reset 'msg.buf'. */ if (msg.buf == b) { nxt_port_buf_free(port, b); } continue; } if (n == NXT_AGAIN) { nxt_port_buf_free(port, b); nxt_fd_event_enable_read(task->thread->engine, &port->socket); continue; } /* n == 0 || n == NXT_ERROR */ nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_port_error_handler, task, &port->socket, NULL); return; } } typedef struct { uint32_t stream; uint32_t pid; } nxt_port_frag_key_t; static nxt_int_t nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data) { nxt_port_recv_msg_t *fmsg; nxt_port_frag_key_t *frag_key; fmsg = data; frag_key = (nxt_port_frag_key_t *) lhq->key.start; if (lhq->key.length == sizeof(nxt_port_frag_key_t) && frag_key->stream == fmsg->port_msg.stream && frag_key->pid == (uint32_t) fmsg->port_msg.pid) { return NXT_OK; } return NXT_DECLINED; } static void * nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size) { return nxt_mp_align(ctx, size, size); } static void nxt_port_lvlhsh_frag_free(void *ctx, void *p) { nxt_mp_free(ctx, p); } static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = { NXT_LVLHSH_DEFAULT, nxt_port_lvlhsh_frag_test, nxt_port_lvlhsh_frag_alloc, nxt_port_lvlhsh_frag_free, }; static nxt_port_recv_msg_t * nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) { nxt_int_t res; nxt_lvlhsh_query_t lhq; nxt_port_recv_msg_t *fmsg; nxt_port_frag_key_t frag_key; nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream); fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t)); if (nxt_slow_path(fmsg == NULL)) { return NULL; } *fmsg = *msg; frag_key.stream = fmsg->port_msg.stream; frag_key.pid = fmsg->port_msg.pid; lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); lhq.key.length = sizeof(nxt_port_frag_key_t); lhq.key.start = (u_char *) &frag_key; lhq.proto = &lvlhsh_frag_proto; lhq.replace = 0; lhq.value = fmsg; lhq.pool = port->mem_pool; res = nxt_lvlhsh_insert(&port->frags, &lhq); switch (res) { case NXT_OK: return fmsg; case NXT_DECLINED: nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD", fmsg->port_msg.stream); nxt_mp_free(port->mem_pool, fmsg); return NULL; default: nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD", fmsg->port_msg.stream); nxt_mp_free(port->mem_pool, fmsg); return NULL; } } static nxt_port_recv_msg_t * nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) { nxt_int_t res; nxt_bool_t last; nxt_lvlhsh_query_t lhq; nxt_port_frag_key_t frag_key; last = msg->port_msg.mf == 0; nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", msg->port_msg.stream); frag_key.stream = msg->port_msg.stream; frag_key.pid = msg->port_msg.pid; lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); lhq.key.length = sizeof(nxt_port_frag_key_t); lhq.key.start = (u_char *) &frag_key; lhq.proto = &lvlhsh_frag_proto; lhq.pool = port->mem_pool; res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) : nxt_lvlhsh_find(&port->frags, &lhq); switch (res) { case NXT_OK: return lhq.value; default: nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", frag_key.stream); return NULL; } } static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) { nxt_buf_t *b, *orig_b, *next; nxt_port_recv_msg_t *fmsg; if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { nxt_alert(task, "port %d: too small message:%uz", port->socket.fd, msg->size); nxt_port_close_fds(msg->fd); return; } /* adjust size to actual buffer used size */ msg->size -= sizeof(nxt_port_msg_t); b = orig_b = msg->buf; b->mem.free += msg->size; msg->cancelled = 0; if (nxt_slow_path(msg->port_msg.nf != 0)) { fmsg = nxt_port_frag_find(task, port, msg); if (nxt_slow_path(fmsg == NULL)) { goto fmsg_failed; } if (nxt_fast_path(fmsg->cancelled == 0)) { if (msg->port_msg.mmap) { nxt_port_mmap_read(task, msg); } nxt_buf_chain_add(&fmsg->buf, msg->buf); fmsg->size += msg->size; msg->buf = NULL; b = NULL; if (nxt_fast_path(msg->port_msg.mf == 0)) { b = fmsg->buf; port->handler(task, fmsg); msg->buf = fmsg->buf; msg->fd[0] = fmsg->fd[0]; msg->fd[1] = fmsg->fd[1]; /* * To disable instant completion or buffer re-usage, * handler should reset 'msg.buf'. */ if (!msg->port_msg.mmap && msg->buf == b) { nxt_port_buf_free(port, b); } } } if (nxt_fast_path(msg->port_msg.mf == 0)) { nxt_mp_free(port->mem_pool, fmsg); } } else { if (nxt_slow_path(msg->port_msg.mf != 0)) { if (msg->port_msg.mmap && msg->cancelled == 0) { nxt_port_mmap_read(task, msg); b = msg->buf; } fmsg = nxt_port_frag_start(task, port, msg); if (nxt_slow_path(fmsg == NULL)) { goto fmsg_failed; } fmsg->port_msg.nf = 0; fmsg->port_msg.mf = 0; if (nxt_fast_path(msg->cancelled == 0)) { msg->buf = NULL; msg->fd[0] = -1; msg->fd[1] = -1; b = NULL; } else { nxt_port_close_fds(msg->fd); } } else { if (nxt_fast_path(msg->cancelled == 0)) { if (msg->port_msg.mmap) { nxt_port_mmap_read(task, msg); b = msg->buf; } port->handler(task, msg); } } } fmsg_failed: if (msg->port_msg.mmap && orig_b != b) { /* * To disable instant buffer completion, * handler should reset 'msg->buf'. */ if (msg->buf == b) { /* complete mmap buffers */ while (b != NULL) { nxt_debug(task, "complete buffer %p", b); nxt_work_queue_add(port->socket.read_work_queue, b->completion_handler, task, b, b->parent); next = b->next; b->next = NULL; b = next; } } /* restore original buf */ msg->buf = orig_b; } } static nxt_buf_t * nxt_port_buf_alloc(nxt_port_t *port) { nxt_buf_t *b; if (port->free_bufs != NULL) { b = port->free_bufs; port->free_bufs = b->next; b->mem.pos = b->mem.start; b->mem.free = b->mem.start; b->next = NULL; } else { b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); if (nxt_slow_path(b == NULL)) { return NULL; } } return b; } static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) { nxt_buf_chain_add(&b, port->free_bufs); port->free_bufs = b; } static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) { int use_delta; nxt_buf_t *b, *next; nxt_port_t *port; nxt_work_queue_t *wq; nxt_port_send_msg_t *msg; nxt_debug(task, "port error handler %p", obj); /* TODO */ port = nxt_container_of(obj, nxt_port_t, socket); use_delta = 0; if (obj == data) { use_delta--; } wq = &task->thread->engine->fast_work_queue; nxt_thread_mutex_lock(&port->write_mutex); nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { nxt_port_msg_close_fd(msg); for (b = msg->buf; b != NULL; b = next) { next = b->next; b->next = NULL; if (nxt_buf_is_sync(b)) { continue; } nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); } nxt_queue_remove(&msg->link); use_delta--; nxt_port_release_send_msg(msg); } nxt_queue_loop; nxt_thread_mutex_unlock(&port->write_mutex); if (use_delta != 0) { nxt_port_use(task, port, use_delta); } }