summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:34 +0300
committerMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:34 +0300
commite227fc9e6281c280c46139a81646ecd7b0510e2b (patch)
tree8f6e631790e7a83bb46c28bd45ebaa5e737a3424 /src/nxt_port.c
parenta82cf4ffb68126f2831ab9877a7ef283dd517690 (diff)
downloadunit-e227fc9e6281c280c46139a81646ecd7b0510e2b.tar.gz
unit-e227fc9e6281c280c46139a81646ecd7b0510e2b.tar.bz2
Introducing application and port shared memory queues.
The goal is to minimize the number of syscalls needed to deliver a message.
Diffstat (limited to 'src/nxt_port.c')
-rw-r--r--src/nxt_port.c28
1 files changed, 25 insertions, 3 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 54434d70..c9189d7c 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -8,6 +8,7 @@
#include <nxt_runtime.h>
#include <nxt_port.h>
#include <nxt_router.h>
+#include <nxt_port_queue.h>
static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
@@ -68,6 +69,8 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
nxt_queue_init(&port->messages);
nxt_thread_mutex_create(&port->write_mutex);
+ port->queue_fd = -1;
+
} else {
nxt_mp_destroy(mp);
}
@@ -99,6 +102,16 @@ nxt_port_close(nxt_task_t *task, nxt_port_t *port)
nxt_router_app_port_close(task, port);
}
}
+
+ if (port->queue_fd != -1) {
+ nxt_fd_close(port->queue_fd);
+ port->queue_fd = -1;
+ }
+
+ if (port->queue != NULL) {
+ nxt_mem_munmap(port->queue, sizeof(nxt_port_queue_t));
+ port->queue = NULL;
+ }
}
@@ -176,6 +189,7 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
}
+/* TODO join with process_ready and move to nxt_main_process.c */
nxt_inline void
nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
nxt_port_t *new_port, uint32_t stream)
@@ -227,8 +241,9 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
msg->max_share = port->max_share;
msg->type = new_port->type;
- return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
- new_port->pair[1], stream, 0, b);
+ return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT,
+ new_port->pair[1], new_port->queue_fd,
+ stream, 0, b);
}
@@ -279,7 +294,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
msg->u.new_port = port;
}
-
+/* TODO move to nxt_main_process.c */
void
nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
@@ -304,6 +319,13 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_debug(task, "process %PI ready", msg->port_msg.pid);
+ if (msg->fd != -1) {
+ port->queue_fd = msg->fd;
+ port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
+ PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd,
+ 0);
+ }
+
nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
}