diff options
author | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:20:34 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2020-08-11 19:20:34 +0300 |
commit | e227fc9e6281c280c46139a81646ecd7b0510e2b (patch) | |
tree | 8f6e631790e7a83bb46c28bd45ebaa5e737a3424 /src/nxt_port.c | |
parent | a82cf4ffb68126f2831ab9877a7ef283dd517690 (diff) | |
download | unit-e227fc9e6281c280c46139a81646ecd7b0510e2b.tar.gz unit-e227fc9e6281c280c46139a81646ecd7b0510e2b.tar.bz2 |
Introducing application and port shared memory queues.
The goal is to minimize the number of syscalls needed to deliver a message.
Diffstat (limited to '')
-rw-r--r-- | src/nxt_port.c | 28 |
1 files changed, 25 insertions, 3 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c index 54434d70..c9189d7c 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -8,6 +8,7 @@ #include <nxt_runtime.h> #include <nxt_port.h> #include <nxt_router.h> +#include <nxt_port_queue.h> static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); @@ -68,6 +69,8 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, nxt_queue_init(&port->messages); nxt_thread_mutex_create(&port->write_mutex); + port->queue_fd = -1; + } else { nxt_mp_destroy(mp); } @@ -99,6 +102,16 @@ nxt_port_close(nxt_task_t *task, nxt_port_t *port) nxt_router_app_port_close(task, port); } } + + if (port->queue_fd != -1) { + nxt_fd_close(port->queue_fd); + port->queue_fd = -1; + } + + if (port->queue != NULL) { + nxt_mem_munmap(port->queue, sizeof(nxt_port_queue_t)); + port->queue = NULL; + } } @@ -176,6 +189,7 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) } +/* TODO join with process_ready and move to nxt_main_process.c */ nxt_inline void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_t *new_port, uint32_t stream) @@ -227,8 +241,9 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, msg->max_share = port->max_share; msg->type = new_port->type; - return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, - new_port->pair[1], stream, 0, b); + return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT, + new_port->pair[1], new_port->queue_fd, + stream, 0, b); } @@ -279,7 +294,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) msg->u.new_port = port; } - +/* TODO move to nxt_main_process.c */ void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { @@ -304,6 +319,13 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "process %PI ready", msg->port_msg.pid); + if (msg->fd != -1) { + port->queue_fd = msg->fd; + port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), + PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd, + 0); + } + nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); } |