summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_port_socket.c')
-rw-r--r--src/nxt_port_socket.c271
1 files changed, 250 insertions, 21 deletions
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 844b65ca..14e2e605 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -5,6 +5,7 @@
*/
#include <nxt_main.h>
+#include <nxt_port_queue.h>
static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
@@ -17,6 +18,8 @@ static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
nxt_port_send_msg_t *msg);
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
+static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj,
+ void *data);
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_recv_msg_t *msg);
static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
@@ -143,12 +146,15 @@ nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
nxt_int_t
-nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
- nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b,
- void *tracking)
+nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
+ nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port,
+ nxt_buf_t *b)
{
+ int notify;
+ uint8_t *p;
nxt_int_t res;
nxt_port_send_msg_t msg;
+ uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE];
msg.link.next = NULL;
msg.link.prev = NULL;
@@ -156,14 +162,10 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.buf = b;
msg.share = 0;
msg.fd = fd;
- msg.fd2 = -1;
+ msg.fd2 = fd2;
msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg.allocated = 0;
- if (tracking != NULL) {
- nxt_port_mmap_tracking_write(msg.tracking_msg, tracking);
- }
-
msg.port_msg.stream = stream;
msg.port_msg.pid = nxt_pid;
msg.port_msg.reply_port = reply_port;
@@ -172,7 +174,42 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.port_msg.mmap = 0;
msg.port_msg.nf = 0;
msg.port_msg.mf = 0;
- msg.port_msg.tracking = tracking != NULL;
+
+ if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {
+
+ if (fd == -1
+ && (b == NULL
+ || nxt_buf_mem_used_size(&b->mem)
+ <= (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))))
+ {
+ p = nxt_cpymem(qmsg, &msg.port_msg, sizeof(nxt_port_msg_t));
+ if (b != NULL) {
+ p = nxt_cpymem(p, b->mem.pos, nxt_buf_mem_used_size(&b->mem));
+ }
+
+ res = nxt_port_queue_send(port->queue, qmsg, p - qmsg, &notify);
+
+ nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ (int) (p - qmsg), notify, res);
+
+ if (notify == 0) {
+ return res;
+ }
+
+ msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE;
+ msg.buf = NULL;
+
+ } else {
+ qmsg[0] = _NXT_PORT_MSG_READ_SOCKET;
+
+ res = nxt_port_queue_send(port->queue, qmsg, 1, &notify);
+
+ nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ notify, res);
+ }
+ }
res = nxt_port_msg_chk_insert(task, port, &msg);
if (nxt_fast_path(res == NXT_DECLINED)) {
@@ -308,10 +345,6 @@ next_fragment:
port->max_size / PORT_MMAP_MIN_SIZE);
}
- if (msg->port_msg.tracking) {
- iov[0].iov_len += sizeof(msg->tracking_msg);
- }
-
sb.limit -= iov[0].iov_len;
nxt_sendbuf_mem_coalesce(task, &sb);
@@ -368,7 +401,6 @@ next_fragment:
msg->fd2 = -1;
msg->share += n;
msg->port_msg.nf = 1;
- msg->port_msg.tracking = 0;
if (msg->share >= port->max_share) {
msg->share = 0;
@@ -576,7 +608,9 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
port->engine = task->thread->engine;
port->socket.read_work_queue = &port->engine->fast_work_queue;
- port->socket.read_handler = nxt_port_read_handler;
+ port->socket.read_handler = port->queue != NULL
+ ? nxt_port_queue_read_handler
+ : nxt_port_read_handler;
port->socket.error_handler = nxt_port_error_handler;
nxt_fd_event_enable_read(port->engine, &port->socket);
@@ -660,6 +694,206 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
}
+static void
+nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
+{
+ ssize_t n;
+ nxt_buf_t *b;
+ nxt_port_t *port;
+ struct iovec iov[2];
+ nxt_port_queue_t *queue;
+ nxt_port_recv_msg_t msg, *smsg;
+ uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE];
+
+ port = nxt_container_of(obj, nxt_port_t, socket);
+ msg.port = port;
+
+ nxt_assert(port->engine == task->thread->engine);
+
+ queue = port->queue;
+ nxt_atomic_fetch_add(&queue->nitems, 1);
+
+ for ( ;; ) {
+
+ if (port->from_socket == 0) {
+ n = nxt_port_queue_recv(queue, qmsg);
+
+ if (n < 0 && !port->socket.read_ready) {
+ nxt_atomic_fetch_add(&queue->nitems, -1);
+
+ n = nxt_port_queue_recv(queue, qmsg);
+ if (n < 0) {
+ return;
+ }
+
+ nxt_atomic_fetch_add(&queue->nitems, 1);
+ }
+
+ if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) {
+ port->from_socket++;
+
+ nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ port->from_socket);
+
+ n = -1;
+
+ continue;
+ }
+
+ nxt_debug(task, "port{%d,%d} %d: dequeue %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ (int) n);
+
+ } else {
+ if ((smsg = port->socket_msg) != NULL && smsg->size != 0) {
+ msg.port_msg = smsg->port_msg;
+ b = smsg->buf;
+ n = smsg->size;
+ msg.fd = smsg->fd;
+ msg.fd2 = smsg->fd2;
+
+ smsg->size = 0;
+
+ port->from_socket--;
+
+ nxt_debug(task, "port{%d,%d} %d: use suspended message %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ (int) n);
+
+ goto process;
+ }
+
+ n = -1;
+ }
+
+ if (n < 0 && !port->socket.read_ready) {
+ nxt_atomic_fetch_add(&queue->nitems, -1);
+ return;
+ }
+
+ b = nxt_port_buf_alloc(port);
+
+ if (nxt_slow_path(b == NULL)) {
+ /* TODO: disable event for some time */
+ }
+
+ if (n >= (ssize_t) sizeof(nxt_port_msg_t)) {
+ nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t));
+
+ if (n > (ssize_t) sizeof(nxt_port_msg_t)) {
+ nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t),
+ n - sizeof(nxt_port_msg_t));
+ }
+
+ } else {
+ iov[0].iov_base = &msg.port_msg;
+ iov[0].iov_len = sizeof(nxt_port_msg_t);
+
+ iov[1].iov_base = b->mem.pos;
+ iov[1].iov_len = port->max_size;
+
+ n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
+
+ if (n == (ssize_t) sizeof(nxt_port_msg_t)
+ && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
+ {
+ nxt_port_buf_free(port, b);
+
+ nxt_debug(task, "port{%d,%d} %d: recv %d read_queue",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ (int) n);
+
+ continue;
+ }
+
+ nxt_debug(task, "port{%d,%d} %d: recvmsg %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ (int) n);
+
+ if (n > 0) {
+ if (port->from_socket == 0) {
+ nxt_debug(task, "port{%d,%d} %d: suspend message %d",
+ (int) port->pid, (int) port->id, port->socket.fd,
+ (int) n);
+
+ smsg = port->socket_msg;
+
+ if (nxt_slow_path(smsg == NULL)) {
+ smsg = nxt_mp_alloc(port->mem_pool,
+ sizeof(nxt_port_recv_msg_t));
+
+ if (nxt_slow_path(smsg == NULL)) {
+ nxt_alert(task, "port{%d,%d} %d: suspend message "
+ "failed",
+ (int) port->pid, (int) port->id,
+ port->socket.fd);
+
+ return;
+ }
+
+ port->socket_msg = smsg;
+
+ } else {
+ if (nxt_slow_path(smsg->size != 0)) {
+ nxt_alert(task, "port{%d,%d} %d: too many suspend "
+ "messages",
+ (int) port->pid, (int) port->id,
+ port->socket.fd);
+
+ return;
+ }
+ }
+
+ smsg->port_msg = msg.port_msg;
+ smsg->buf = b;
+ smsg->size = n;
+ smsg->fd = msg.fd;
+ smsg->fd2 = msg.fd2;
+
+ continue;
+ }
+
+ port->from_socket--;
+ }
+ }
+
+ process:
+
+ if (n > 0) {
+ msg.buf = b;
+ msg.size = n;
+
+ nxt_port_read_msg_process(task, port, &msg);
+
+ /*
+ * To disable instant completion or buffer re-usage,
+ * handler should reset 'msg.buf'.
+ */
+ if (msg.buf == b) {
+ nxt_port_buf_free(port, b);
+ }
+
+ continue;
+ }
+
+ if (n == NXT_AGAIN) {
+ nxt_port_buf_free(port, b);
+
+ nxt_fd_event_enable_read(task->thread->engine, &port->socket);
+
+ continue;
+ }
+
+ /* n == 0 || n == NXT_ERROR */
+
+ nxt_work_queue_add(&task->thread->engine->fast_work_queue,
+ nxt_port_error_handler, task, &port->socket, NULL);
+ return;
+ }
+}
+
+
typedef struct {
uint32_t stream;
uint32_t pid;
@@ -831,12 +1065,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
b = orig_b = msg->buf;
b->mem.free += msg->size;
- if (msg->port_msg.tracking) {
- msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0;
-
- } else {
- msg->cancelled = 0;
- }
+ msg->cancelled = 0;
if (nxt_slow_path(msg->port_msg.nf != 0)) {