diff options
Diffstat (limited to '')
-rw-r--r-- | src/nxt_port_socket.c | 329 |
1 files changed, 294 insertions, 35 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 4e3eaef6..9d8096b2 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -5,6 +5,7 @@ */ #include <nxt_main.h> +#include <nxt_port_queue.h> static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, @@ -17,6 +18,8 @@ static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg); static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); +static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj, + void *data); static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg); static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); @@ -143,26 +146,26 @@ nxt_port_release_send_msg(nxt_port_send_msg_t *msg) nxt_int_t -nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, - void *tracking) +nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, + nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port, + nxt_buf_t *b) { + int notify; + uint8_t *p; nxt_int_t res; nxt_port_send_msg_t msg; + uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE]; msg.link.next = NULL; msg.link.prev = NULL; msg.buf = b; msg.share = 0; - msg.fd = fd; + msg.fd[0] = fd; + msg.fd[1] = fd2; msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; msg.allocated = 0; - if (tracking != NULL) { - nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); - } - msg.port_msg.stream = stream; msg.port_msg.pid = nxt_pid; msg.port_msg.reply_port = reply_port; @@ -171,7 +174,46 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg.port_msg.mmap = 0; msg.port_msg.nf = 0; msg.port_msg.mf = 0; - msg.port_msg.tracking = tracking != NULL; + + if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) { + + if (fd == -1 + && (b == NULL + || nxt_buf_mem_used_size(&b->mem) + <= (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t)))) + { + p = nxt_cpymem(qmsg, &msg.port_msg, sizeof(nxt_port_msg_t)); + if (b != NULL) { + p = nxt_cpymem(p, b->mem.pos, nxt_buf_mem_used_size(&b->mem)); + } + + res = nxt_port_queue_send(port->queue, qmsg, p - qmsg, ¬ify); + + nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) (p - qmsg), notify, res); + + if (notify == 0) { + return res; + } + + msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE; + msg.buf = NULL; + + } else { + qmsg[0] = _NXT_PORT_MSG_READ_SOCKET; + + res = nxt_port_queue_send(port->queue, qmsg, 1, ¬ify); + + nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d", + (int) port->pid, (int) port->id, port->socket.fd, + notify, res); + + if (nxt_slow_path(res == NXT_AGAIN)) { + return NXT_AGAIN; + } + } + } res = nxt_port_msg_chk_insert(task, port, &msg); if (nxt_fast_path(res == NXT_DECLINED)) { @@ -307,10 +349,6 @@ next_fragment: port->max_size / PORT_MMAP_MIN_SIZE); } - if (msg->port_msg.tracking) { - iov[0].iov_len += sizeof(msg->tracking_msg); - } - sb.limit -= iov[0].iov_len; nxt_sendbuf_mem_coalesce(task, &sb); @@ -340,10 +378,18 @@ next_fragment: goto fail; } - if (msg->fd != -1 && msg->close_fd != 0) { - nxt_fd_close(msg->fd); + if (msg->close_fd) { + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); + + msg->fd[0] = -1; + } + + if (msg->fd[1] != -1) { + nxt_fd_close(msg->fd[1]); - msg->fd = -1; + msg->fd[1] = -1; + } } msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, @@ -357,10 +403,10 @@ next_fragment: * A file descriptor is sent only * in the first message of a stream. */ - msg->fd = -1; + msg->fd[0] = -1; + msg->fd[1] = -1; msg->share += n; msg->port_msg.nf = 1; - msg->port_msg.tracking = 0; if (msg->share >= port->max_share) { msg->share = 0; @@ -568,7 +614,9 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) port->engine = task->thread->engine; port->socket.read_work_queue = &port->engine->fast_work_queue; - port->socket.read_handler = nxt_port_read_handler; + port->socket.read_handler = port->queue != NULL + ? nxt_port_queue_read_handler + : nxt_port_read_handler; port->socket.error_handler = nxt_port_error_handler; nxt_fd_event_enable_read(port->engine, &port->socket); @@ -612,7 +660,7 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) iov[1].iov_base = b->mem.pos; iov[1].iov_len = port->max_size; - n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); + n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2); if (n > 0) { @@ -652,6 +700,204 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) } +static void +nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data) +{ + ssize_t n; + nxt_buf_t *b; + nxt_port_t *port; + struct iovec iov[2]; + nxt_port_queue_t *queue; + nxt_port_recv_msg_t msg, *smsg; + uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE]; + + port = nxt_container_of(obj, nxt_port_t, socket); + msg.port = port; + + nxt_assert(port->engine == task->thread->engine); + + queue = port->queue; + nxt_atomic_fetch_add(&queue->nitems, 1); + + for ( ;; ) { + + if (port->from_socket == 0) { + n = nxt_port_queue_recv(queue, qmsg); + + if (n < 0 && !port->socket.read_ready) { + nxt_atomic_fetch_add(&queue->nitems, -1); + + n = nxt_port_queue_recv(queue, qmsg); + if (n < 0) { + return; + } + + nxt_atomic_fetch_add(&queue->nitems, 1); + } + + if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) { + port->from_socket++; + + nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d", + (int) port->pid, (int) port->id, port->socket.fd, + port->from_socket); + + continue; + } + + nxt_debug(task, "port{%d,%d} %d: dequeue %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + } else { + if ((smsg = port->socket_msg) != NULL && smsg->size != 0) { + msg.port_msg = smsg->port_msg; + b = smsg->buf; + n = smsg->size; + msg.fd[0] = smsg->fd[0]; + msg.fd[1] = smsg->fd[1]; + + smsg->size = 0; + + port->from_socket--; + + nxt_debug(task, "port{%d,%d} %d: use suspended message %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + goto process; + } + + n = -1; + } + + if (n < 0 && !port->socket.read_ready) { + nxt_atomic_fetch_add(&queue->nitems, -1); + return; + } + + b = nxt_port_buf_alloc(port); + + if (nxt_slow_path(b == NULL)) { + /* TODO: disable event for some time */ + } + + if (n >= (ssize_t) sizeof(nxt_port_msg_t)) { + nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t)); + + if (n > (ssize_t) sizeof(nxt_port_msg_t)) { + nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t), + n - sizeof(nxt_port_msg_t)); + } + + } else { + iov[0].iov_base = &msg.port_msg; + iov[0].iov_len = sizeof(nxt_port_msg_t); + + iov[1].iov_base = b->mem.pos; + iov[1].iov_len = port->max_size; + + n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2); + + if (n == (ssize_t) sizeof(nxt_port_msg_t) + && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE) + { + nxt_port_buf_free(port, b); + + nxt_debug(task, "port{%d,%d} %d: recv %d read_queue", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + continue; + } + + nxt_debug(task, "port{%d,%d} %d: recvmsg %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + if (n > 0) { + if (port->from_socket == 0) { + nxt_debug(task, "port{%d,%d} %d: suspend message %d", + (int) port->pid, (int) port->id, port->socket.fd, + (int) n); + + smsg = port->socket_msg; + + if (nxt_slow_path(smsg == NULL)) { + smsg = nxt_mp_alloc(port->mem_pool, + sizeof(nxt_port_recv_msg_t)); + + if (nxt_slow_path(smsg == NULL)) { + nxt_alert(task, "port{%d,%d} %d: suspend message " + "failed", + (int) port->pid, (int) port->id, + port->socket.fd); + + return; + } + + port->socket_msg = smsg; + + } else { + if (nxt_slow_path(smsg->size != 0)) { + nxt_alert(task, "port{%d,%d} %d: too many suspend " + "messages", + (int) port->pid, (int) port->id, + port->socket.fd); + + return; + } + } + + smsg->port_msg = msg.port_msg; + smsg->buf = b; + smsg->size = n; + smsg->fd[0] = msg.fd[0]; + smsg->fd[1] = msg.fd[1]; + + continue; + } + + port->from_socket--; + } + } + + process: + + if (n > 0) { + msg.buf = b; + msg.size = n; + + nxt_port_read_msg_process(task, port, &msg); + + /* + * To disable instant completion or buffer re-usage, + * handler should reset 'msg.buf'. + */ + if (msg.buf == b) { + nxt_port_buf_free(port, b); + } + + continue; + } + + if (n == NXT_AGAIN) { + nxt_port_buf_free(port, b); + + nxt_fd_event_enable_read(task->thread->engine, &port->socket); + + continue; + } + + /* n == 0 || n == NXT_ERROR */ + + nxt_work_queue_add(&task->thread->engine->fast_work_queue, + nxt_port_error_handler, task, &port->socket, NULL); + return; + } +} + + typedef struct { uint32_t stream; uint32_t pid; @@ -806,8 +1052,12 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, nxt_alert(task, "port %d: too small message:%uz", port->socket.fd, msg->size); - if (msg->fd != -1) { - nxt_fd_close(msg->fd); + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); + } + + if (msg->fd[1] != -1) { + nxt_fd_close(msg->fd[1]); } return; @@ -819,12 +1069,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, b = orig_b = msg->buf; b->mem.free += msg->size; - if (msg->port_msg.tracking) { - msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0; - - } else { - msg->cancelled = 0; - } + msg->cancelled = 0; if (nxt_slow_path(msg->port_msg.nf != 0)) { @@ -853,7 +1098,8 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, port->handler(task, fmsg); msg->buf = fmsg->buf; - msg->fd = fmsg->fd; + msg->fd[0] = fmsg->fd[0]; + msg->fd[1] = fmsg->fd[1]; /* * To disable instant completion or buffer re-usage, @@ -887,12 +1133,17 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, if (nxt_fast_path(msg->cancelled == 0)) { msg->buf = NULL; - msg->fd = -1; + msg->fd[0] = -1; + msg->fd[1] = -1; b = NULL; } else { - if (msg->fd != -1) { - nxt_fd_close(msg->fd); + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); + } + + if (msg->fd[1] != -1) { + nxt_fd_close(msg->fd[1]); } } } else { @@ -993,10 +1244,18 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { - if (msg->fd != -1 && msg->close_fd != 0) { - nxt_fd_close(msg->fd); + if (msg->close_fd) { + if (msg->fd[0] != -1) { + nxt_fd_close(msg->fd[0]); - msg->fd = -1; + msg->fd[0] = -1; + } + + if (msg->fd[1] != -1) { + nxt_fd_close(msg->fd[1]); + + msg->fd[1] = -1; + } } for (b = msg->buf; b != NULL; b = next) { |