summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-05-12 20:32:41 +0300
committerMax Romanov <max.romanov@nginx.com>2017-05-12 20:32:41 +0300
commitf7b4bdfd892a0b479dc946896435a3ba7f9615dd (patch)
treea6f0c4ebaeed2d9f0fcb1c07178b52a684a53280
parent1782c771fab999b37a8c04ed72760e3528205be7 (diff)
downloadunit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.gz
unit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.bz2
Using shared memory to send data via nxt_port.
Usage: b = nxt_port_mmap_get_buf(task, port, size); b->mem.free = nxt_cpymem(b->mem.free, data, size); nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, b);
-rw-r--r--auto/shmem60
-rw-r--r--auto/sources2
-rwxr-xr-xconfigure1
-rw-r--r--src/nxt_buf.h26
-rw-r--r--src/nxt_main.h5
-rw-r--r--src/nxt_master_process.c81
-rw-r--r--src/nxt_port.c127
-rw-r--r--src/nxt_port.h49
-rw-r--r--src/nxt_port_memory.c700
-rw-r--r--src/nxt_port_memory.h50
-rw-r--r--src/nxt_port_socket.c141
-rw-r--r--src/nxt_process.c19
-rw-r--r--src/nxt_process.h24
-rw-r--r--src/nxt_queue.h15
-rw-r--r--src/nxt_runtime.c327
-rw-r--r--src/nxt_runtime.h44
-rw-r--r--src/nxt_sendbuf.c9
-rw-r--r--src/nxt_sendbuf.h1
-rw-r--r--src/nxt_worker_process.c1
19 files changed, 1501 insertions, 181 deletions
diff --git a/auto/shmem b/auto/shmem
new file mode 100644
index 00000000..48665649
--- /dev/null
+++ b/auto/shmem
@@ -0,0 +1,60 @@
+
+# Copyright (C) Igor Sysoev
+# Copyright (C) NGINX, Inc.
+
+
+# Linux, FreeBSD, MacOSX
+
+nxt_feature="shm_open()"
+nxt_feature_name=NXT_HAVE_SHM_OPEN
+nxt_feature_run=yes
+nxt_feature_incs=
+nxt_feature_libs=
+
+if [ "$NXT_SYSTEM" = "Linux" ]; then
+ nxt_feature_libs=-lrt
+fi
+
+nxt_feature_test="#include <sys/mman.h>
+ #include <fcntl.h>
+ #include <sys/stat.h>
+ #include <sys/types.h>
+
+ int main() {
+ static char name[] = \"/nginext.configure\";
+
+ shm_unlink(name);
+
+ int fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR,
+ S_IRUSR | S_IWUSR);
+ if (fd == -1)
+ return 1;
+
+ shm_unlink(name);
+ return 0;
+ }"
+. auto/feature
+
+
+# Linux
+
+nxt_feature="memfd_create()"
+nxt_feature_name=NXT_HAVE_MEMFD_CREATE
+nxt_feature_run=yes
+nxt_feature_incs=
+nxt_feature_libs=
+nxt_feature_test="#include <linux/memfd.h>
+ #include <unistd.h>
+ #include <sys/syscall.h>
+
+ int main() {
+ static char name[] = \"/nginext.configure\";
+
+ int fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
+ if (fd == -1)
+ return 1;
+
+ return 0;
+ }"
+. auto/feature
+
diff --git a/auto/sources b/auto/sources
index 8958d5d9..efe748d1 100644
--- a/auto/sources
+++ b/auto/sources
@@ -18,6 +18,7 @@ NXT_LIB_DEPS=" \
src/nxt_process.h \
src/nxt_signal.h \
src/nxt_port.h \
+ src/nxt_port_memory.h \
src/nxt_dyld.h \
src/nxt_thread.h \
src/nxt_thread_id.h \
@@ -86,6 +87,7 @@ NXT_LIB_SRCS=" \
src/nxt_process_title.c \
src/nxt_signal.c \
src/nxt_port_socket.c \
+ src/nxt_port_memory.c \
src/nxt_port.c \
src/nxt_dyld.c \
src/nxt_random.c \
diff --git a/configure b/configure
index 1d748c97..16b19820 100755
--- a/configure
+++ b/configure
@@ -63,6 +63,7 @@ fi
. auto/atomic
. auto/malloc
. auto/mmap
+. auto/shmem
. auto/time
if [ $NXT_THREADS = YES ]; then
diff --git a/src/nxt_buf.h b/src/nxt_buf.h
index 85ab3602..c5ac5c20 100644
--- a/src/nxt_buf.h
+++ b/src/nxt_buf.h
@@ -67,7 +67,7 @@ typedef struct {
struct nxt_buf_s {
void *data;
nxt_work_handler_t completion_handler;
- nxt_buf_t *parent;
+ void *parent;
/*
* The next link, flags, and nxt_buf_mem_t should
@@ -85,11 +85,13 @@ struct nxt_buf_s {
uint8_t is_file; /* 1 bit */
uint16_t is_mmap:1;
+ uint16_t is_port_mmap:1;
uint16_t is_sync:1;
uint16_t is_nobuf:1;
uint16_t is_flush:1;
uint16_t is_last:1;
+ uint16_t is_port_mmap_sent:1;
nxt_buf_mem_t mem;
@@ -103,10 +105,11 @@ struct nxt_buf_s {
};
-#define NXT_BUF_MEM_SIZE offsetof(nxt_buf_t, file)
-#define NXT_BUF_SYNC_SIZE NXT_BUF_MEM_SIZE
-#define NXT_BUF_MMAP_SIZE sizeof(nxt_buf_t)
-#define NXT_BUF_FILE_SIZE sizeof(nxt_buf_t)
+#define NXT_BUF_MEM_SIZE offsetof(nxt_buf_t, file)
+#define NXT_BUF_SYNC_SIZE NXT_BUF_MEM_SIZE
+#define NXT_BUF_FILE_SIZE sizeof(nxt_buf_t)
+#define NXT_BUF_MMAP_SIZE NXT_BUF_FILE_SIZE
+#define NXT_BUF_PORT_MMAP_SIZE NXT_BUF_MEM_SIZE
#define NXT_BUF_SYNC_NOBUF 1
@@ -146,6 +149,19 @@ nxt_buf_clear_mmap(b) \
#define \
+nxt_buf_is_port_mmap(b) \
+ ((b)->is_port_mmap)
+
+#define \
+nxt_buf_set_port_mmap(b) \
+ (b)->is_port_mmap = 1
+
+#define \
+nxt_buf_clear_port_mmap(b) \
+ (b)->is_port_mmap = 0
+
+
+#define \
nxt_buf_is_sync(b) \
((b)->is_sync)
diff --git a/src/nxt_main.h b/src/nxt_main.h
index a1a01434..24ec4ca1 100644
--- a/src/nxt_main.h
+++ b/src/nxt_main.h
@@ -24,7 +24,9 @@ typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t;
typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg);
typedef struct nxt_sig_event_s nxt_sig_event_t;
typedef struct nxt_runtime_s nxt_runtime_t;
+typedef uint16_t nxt_port_id_t;
+#include <nxt_queue.h>
#include <nxt_process.h>
typedef struct nxt_thread_s nxt_thread_t;
@@ -45,7 +47,6 @@ typedef struct nxt_log_s nxt_log_t;
#include <nxt_atomic.h>
-#include <nxt_queue.h>
#include <nxt_rbtree.h>
#include <nxt_sprintf.h>
#include <nxt_parse.h>
@@ -104,6 +105,7 @@ typedef struct nxt_thread_pool_s nxt_thread_pool_t;
#include <nxt_service.h>
typedef struct nxt_buf_s nxt_buf_t;
+typedef struct nxt_port_mmap_s nxt_port_mmap_t;
#include <nxt_buf.h>
#include <nxt_buf_pool.h>
#include <nxt_recvbuf.h>
@@ -129,6 +131,7 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context);
#include <nxt_fd_event.h>
#include <nxt_port.h>
+#include <nxt_port_memory.h>
#if (NXT_THREADS)
#include <nxt_thread_pool.h>
#endif
diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c
index e3d5382a..ed39130b 100644
--- a/src/nxt_master_process.c
+++ b/src/nxt_master_process.c
@@ -80,14 +80,12 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
nxt_port_t *port;
nxt_process_t *process;
- process = nxt_runtime_new_process(rt);
+ process = nxt_runtime_process_get(rt, nxt_pid);
if (nxt_slow_path(process == NULL)) {
return NXT_ERROR;
}
- process->pid = nxt_pid;
-
- port = nxt_array_zero_add(process->ports);
+ port = nxt_process_port_new(process);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
@@ -97,8 +95,10 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
return ret;
}
- port->pid = nxt_pid;
port->engine = 0;
+ port->type = NXT_PROCESS_MASTER;
+
+ nxt_runtime_port_add(rt, port);
/*
* A master process port. A write port is not closed
@@ -220,17 +220,16 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
* TODO: remove process, init, ports from array on memory and fork failures.
*/
- process = nxt_runtime_new_process(rt);
+ process = nxt_runtime_process_new(rt);
if (nxt_slow_path(process == NULL)) {
return NXT_ERROR;
}
process->init = init;
+ master_process = rt->mprocess;
+ init->master_port = nxt_process_port_first(master_process);
- master_process = rt->processes->elts;
- init->master_port = master_process->ports->elts;
-
- port = nxt_array_zero_add(process->ports);
+ port = nxt_process_port_new(process);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
@@ -243,6 +242,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
}
port->engine = 0;
+ port->type = init->type;
pid = nxt_process_create(task, init);
@@ -253,6 +253,11 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
case 0:
/* A worker process, return to the event engine work queue loop. */
+ process->pid = nxt_pid;
+ port->pid = nxt_pid;
+
+ nxt_runtime_process_add(rt, process);
+
return NXT_AGAIN;
default:
@@ -260,6 +265,8 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
process->pid = pid;
port->pid = pid;
+ nxt_runtime_process_add(rt, process);
+
nxt_port_read_close(port);
nxt_port_write_enable(task, port);
@@ -272,27 +279,23 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
void
nxt_master_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt)
{
- nxt_uint_t i, n, nprocesses, nports;
nxt_port_t *port;
nxt_process_t *process;
- process = rt->processes->elts;
- nprocesses = rt->processes->nelts;
+ nxt_runtime_process_each(rt, process)
+ {
+ if (nxt_pid != process->pid) {
+ process->init = NULL;
- for (i = 0; i < nprocesses; i++) {
+ nxt_process_port_each(process, port) {
- if (nxt_pid != process[i].pid) {
- process[i].init = NULL;
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
+ -1, 0, 0, NULL);
- port = process[i].ports->elts;
- nports = process[i].ports->nelts;
-
- for (n = 0; n < nports; n++) {
- (void) nxt_port_socket_write(task, &port[n], NXT_PORT_MSG_QUIT,
- -1, 0, NULL);
- }
+ } nxt_process_port_loop;
}
}
+ nxt_runtime_process_loop;
}
@@ -331,7 +334,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
nxt_int_t ret;
nxt_uint_t n;
nxt_file_t *file, *new_file;
- nxt_runtime_t *rt;
+ nxt_runtime_t *rt;
nxt_array_t *new_files;
nxt_mem_pool_t *mp;
@@ -473,39 +476,29 @@ nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
static void
nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
{
- nxt_uint_t i, n;
nxt_runtime_t *rt;
nxt_process_t *process;
nxt_process_init_t *init;
rt = task->thread->runtime;
- process = rt->processes->elts;
- n = rt->processes->nelts;
-
- /* A process[0] is the master process. */
+ process = nxt_runtime_process_find(rt, pid);
- for (i = 1; i < n; i++) {
+ if (process) {
+ init = process->init;
- if (pid == process[i].pid) {
- init = process[i].init;
+ /* TODO: free ports fds. */
- /* TODO: free ports fds. */
+ nxt_runtime_process_remove(rt, process);
- nxt_array_remove(rt->processes, &process[i]);
+ if (nxt_exiting) {
- if (nxt_exiting) {
- nxt_debug(task, "processes %d", n);
-
- if (n == 2) {
- nxt_runtime_quit(task);
- }
-
- } else if (init != NULL) {
- (void) nxt_master_create_worker_process(task, rt, init);
+ if (rt->nprocesses == 2) {
+ nxt_runtime_quit(task);
}
- return;
+ } else if (init != NULL) {
+ (void) nxt_master_create_worker_process(task, rt, init);
}
}
}
diff --git a/src/nxt_port.c b/src/nxt_port.c
index 1da16587..fc807d1f 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -32,25 +32,21 @@ void
nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
{
- nxt_uint_t i, n, nprocesses, nports;
nxt_port_t *port;
nxt_process_t *process;
- process = rt->processes->elts;
- nprocesses = rt->processes->nelts;
+ nxt_runtime_process_each(rt, process)
+ {
+ if (nxt_pid != process->pid) {
+ nxt_process_port_each(process, port) {
- for (i = 0; i < nprocesses; i++) {
+ (void) nxt_port_socket_write(task, port, type,
+ fd, stream, 0, b);
- if (nxt_pid != process[i].pid) {
- port = process[i].ports->elts;
- nports = process[i].ports->nelts;
-
- for (n = 0; n < nports; n++) {
- (void) nxt_port_socket_write(task, &port[n], type,
- fd, stream, b);
- }
+ } nxt_process_port_loop;
}
}
+ nxt_runtime_process_loop;
}
@@ -59,19 +55,19 @@ nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_port_handler_t *handlers;
- if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) {
+ if (nxt_fast_path(msg->port_msg.type <= NXT_PORT_MSG_MAX)) {
nxt_debug(task, "port %d: message type:%uD",
- msg->port->socket.fd, msg->type);
+ msg->port->socket.fd, msg->port_msg.type);
handlers = msg->port->data;
- handlers[msg->type](task, msg);
+ handlers[msg->port_msg.type](task, msg);
return;
}
nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD",
- msg->port->socket.fd, msg->type);
+ msg->port->socket.fd, msg->port_msg.type);
}
@@ -87,28 +83,20 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
nxt_port_t *new_port)
{
nxt_buf_t *b;
- nxt_uint_t i, n;
nxt_port_t *port;
nxt_process_t *process;
nxt_port_msg_new_port_t *msg;
- n = rt->processes->nelts;
- if (n == 0) {
- return;
- }
-
nxt_debug(task, "new port %d for process %PI engine %uD",
new_port->socket.fd, new_port->pid, new_port->engine);
- process = rt->processes->elts;
-
- for (i = 0; i < n; i++) {
-
- if (process[i].pid == new_port->pid || process[i].pid == nxt_pid) {
+ nxt_runtime_process_each(rt, process)
+ {
+ if (process->pid == new_port->pid || process->pid == nxt_pid) {
continue;
}
- port = process[i].ports->elts;
+ port = nxt_process_port_first(process);
b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0);
@@ -116,19 +104,25 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
continue;
}
+ nxt_debug(task, "send new port %FD to process %PI",
+ new_port->socket.fd, process->pid);
+
b->data = port;
b->completion_handler = nxt_port_new_port_buf_completion;
b->mem.free += sizeof(nxt_port_msg_new_port_t);
msg = (nxt_port_msg_new_port_t *) b->mem.pos;
+ msg->id = new_port->id;
msg->pid = new_port->pid;
msg->engine = new_port->engine;
msg->max_size = port->max_size;
msg->max_share = port->max_share;
+ msg->type = new_port->type;
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
- new_port->socket.fd, 0, b);
+ new_port->socket.fd, 0, 0, b);
}
+ nxt_runtime_process_loop;
}
@@ -158,12 +152,15 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
rt = task->thread->runtime;
- process = nxt_runtime_new_process(rt);
+ new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
+ msg->buf->mem.pos = msg->buf->mem.free;
+
+ process = nxt_runtime_process_get(rt, new_port_msg->pid);
if (nxt_slow_path(process == NULL)) {
return;
}
- port = nxt_array_zero_add(process->ports);
+ port = nxt_process_port_new(process);
if (nxt_slow_path(port == NULL)) {
return;
}
@@ -175,51 +172,74 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
port->mem_pool = mp;
- new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
- msg->buf->mem.pos = msg->buf->mem.free;
-
nxt_debug(task, "new port %d received for process %PI engine %uD",
msg->fd, new_port_msg->pid, new_port_msg->engine);
- process->pid = new_port_msg->pid;
-
- port->pid = new_port_msg->pid;
+ port->id = new_port_msg->id;
port->engine = new_port_msg->engine;
port->pair[0] = -1;
port->pair[1] = msg->fd;
port->max_size = new_port_msg->max_size;
port->max_share = new_port_msg->max_share;
+ port->type = new_port_msg->type;
nxt_queue_init(&port->messages);
port->socket.task = task;
+ nxt_runtime_port_add(rt, port);
+
nxt_port_write_enable(task, port);
}
void
+nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_runtime_t *rt;
+ nxt_process_t *process;
+
+ rt = task->thread->runtime;
+
+ if (nxt_slow_path(msg->fd == -1)) {
+ nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message");
+
+ return;
+ }
+
+ process = nxt_runtime_process_get(rt, msg->port_msg.pid);
+ if (nxt_slow_path(process == NULL)) {
+ nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI",
+ msg->port_msg.pid);
+
+ goto fail_close;
+ }
+
+ nxt_port_incoming_port_mmap(task, process, msg->fd);
+
+fail_close:
+
+ close(msg->fd);
+}
+
+
+void
nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
nxt_fd_t fd)
{
nxt_buf_t *b;
- nxt_uint_t i, n;
nxt_port_t *port;
nxt_process_t *process;
- n = rt->processes->nelts;
- if (n == 0) {
- return;
- }
-
nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
- process = rt->processes->elts;
-
- /* process[0] is master process. */
+ nxt_runtime_process_each(rt, process)
+ {
+ if (nxt_pid == process->pid) {
+ continue;
+ }
- for (i = 1; i < n; i++) {
- port = process[i].ports->elts;
+ port = nxt_process_port_first(process);
b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0);
if (nxt_slow_path(b == NULL)) {
@@ -230,8 +250,9 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
b->mem.free += sizeof(nxt_uint_t);
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
- fd, 0, b);
+ fd, 0, 0, b);
}
+ nxt_runtime_process_loop;
}
@@ -269,11 +290,17 @@ nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
+ size_t dump_size;
nxt_buf_t *b;
b = msg->buf;
+ dump_size = b->mem.free - b->mem.pos;
+
+ if (dump_size > 300) {
+ dump_size = 300;
+ }
- nxt_debug(task, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos);
+ nxt_debug(task, "data: %*s", dump_size, b->mem.pos);
b->mem.pos = b->mem.free;
}
diff --git a/src/nxt_port.h b/src/nxt_port.h
index 8a9b8926..76775660 100644
--- a/src/nxt_port.h
+++ b/src/nxt_port.h
@@ -8,11 +8,28 @@
#define _NXT_PORT_H_INCLUDED_
+typedef enum {
+ NXT_PORT_MSG_QUIT = 0,
+ NXT_PORT_MSG_NEW_PORT,
+ NXT_PORT_MSG_CHANGE_FILE,
+ NXT_PORT_MSG_MMAP,
+ NXT_PORT_MSG_DATA,
+} nxt_port_msg_type_t;
+
+#define NXT_PORT_MSG_MAX NXT_PORT_MSG_DATA
+
+
+/* Passed as a first iov chunk. */
typedef struct {
- uint32_t stream;
+ uint32_t stream;
+ nxt_pid_t pid;
+ nxt_port_id_t reply_port;
- uint16_t type;
- uint8_t last; /* 1 bit */
+ nxt_port_msg_type_t type:8;
+ uint8_t last; /* 1 bit */
+
+ /* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */
+ uint8_t mmap; /* 1 bit */
} nxt_port_msg_t;
@@ -26,12 +43,10 @@ typedef struct {
struct nxt_port_recv_msg_s {
- uint32_t stream;
- uint16_t type;
-
nxt_fd_t fd;
nxt_buf_t *buf;
nxt_port_t *port;
+ nxt_port_msg_t port_msg;
};
@@ -39,6 +54,8 @@ struct nxt_port_s {
/* Must be the first field. */
nxt_fd_event_t socket;
+ nxt_queue_link_t link;
+
nxt_queue_t messages; /* of nxt_port_send_msg_t */
/* Maximum size of message part. */
@@ -53,26 +70,22 @@ struct nxt_port_s {
nxt_buf_t *free_bufs;
nxt_socket_t pair[2];
+ nxt_port_id_t id;
nxt_pid_t pid;
uint32_t engine;
-};
-
-
-#define NXT_PORT_MSG_MAX NXT_PORT_MSG_DATA
-typedef enum {
- NXT_PORT_MSG_QUIT = 0,
- NXT_PORT_MSG_NEW_PORT,
- NXT_PORT_MSG_CHANGE_FILE,
- NXT_PORT_MSG_DATA,
-} nxt_port_msg_type_e;
+ nxt_process_type_t type:8;
+ nxt_process_t *process;
+};
typedef struct {
+ nxt_port_id_t id;
nxt_pid_t pid;
uint32_t engine;
size_t max_size;
size_t max_share;
+ nxt_process_type_t type:8;
} nxt_port_msg_new_port_t;
@@ -94,7 +107,8 @@ void nxt_port_write_close(nxt_port_t *port);
void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
void nxt_port_read_close(nxt_port_t *port);
nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
- nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
+ nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
+ nxt_buf_t *b);
void nxt_port_create(nxt_thread_t *thread, nxt_port_t *port,
nxt_port_handler_t *handlers);
@@ -109,6 +123,7 @@ void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_change_log_file_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
+void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
new file mode 100644
index 00000000..447ef138
--- /dev/null
+++ b/src/nxt_port_memory.c
@@ -0,0 +1,700 @@
+
+#include <nxt_main.h>
+
+#if (NXT_HAVE_MEMFD_CREATE)
+
+#include <linux/memfd.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+
+#endif
+
+#define PORT_MMAP_CHUNK_SIZE (1024 * 16)
+#define PORT_MMAP_HEADER_SIZE (1024 * 4)
+#define PORT_MMAP_SIZE (PORT_MMAP_HEADER_SIZE + 1024 * 1024 * 10)
+
+#define PORT_MMAP_CHUNK_COUNT \
+ ( (PORT_MMAP_SIZE - PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE )
+
+
+typedef uint32_t nxt_chunk_id_t;
+
+typedef nxt_atomic_uint_t nxt_free_map_t;
+
+#define FREE_BITS (sizeof(nxt_free_map_t) * 8)
+
+#define FREE_IDX(nchunk) ((nchunk) / FREE_BITS)
+
+#define FREE_MASK(nchunk) \
+ ( 1ULL << ( (nchunk) % FREE_BITS ) )
+
+#define MAX_FREE_IDX FREE_IDX(PORT_MMAP_CHUNK_COUNT)
+
+
+/* Mapped at the start of shared memory segment. */
+struct nxt_port_mmap_header_s {
+ nxt_free_map_t free_map[MAX_FREE_IDX];
+};
+
+
+/*
+ * Element of nxt_process_t.incoming/outgoing, shared memory segment
+ * descriptor.
+ */
+struct nxt_port_mmap_s {
+ uint32_t id;
+ nxt_fd_t fd;
+ nxt_pid_t pid; /* For sanity check. */
+ union {
+ void *mem;
+ nxt_port_mmap_header_t *hdr;
+ } u;
+};
+
+
+/* Passed as a second iov chunk when 'mmap' bit in nxt_port_msg_t is 1. */
+typedef struct {
+ uint32_t mmap_id; /* Mmap index in nxt_process_t.outgoing. */
+ nxt_chunk_id_t chunk_id; /* Mmap chunk index. */
+ uint32_t size; /* Payload data size. */
+} nxt_port_mmap_msg_t;
+
+
+static nxt_bool_t
+nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c);
+
+#define nxt_port_mmap_get_chunk_busy(hdr, c) \
+ ((hdr->free_map[FREE_IDX(c)] & FREE_MASK(c)) == 0)
+
+nxt_inline void
+nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
+
+nxt_inline void
+nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
+
+#define nxt_port_mmap_chunk_id(port_mmap, b) \
+ ((((u_char *) (b) - (u_char *) (port_mmap->u.mem)) - \
+ PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE)
+
+#define nxt_port_mmap_chunk_start(port_mmap, chunk) \
+ (((u_char *) (port_mmap->u.mem)) + PORT_MMAP_HEADER_SIZE + \
+ (chunk) * PORT_MMAP_CHUNK_SIZE)
+
+
+void
+nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
+{
+ if (port_mmap->u.mem != NULL) {
+ nxt_mem_munmap(port_mmap->u.mem, PORT_MMAP_SIZE);
+ port_mmap->u.mem = NULL;
+ }
+
+ if (port_mmap->fd != -1) {
+ close(port_mmap->fd);
+ port_mmap->fd = -1;
+ }
+}
+
+
+static void
+nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
+{
+ u_char *p;
+ nxt_buf_t *b;
+ nxt_chunk_id_t c;
+ nxt_mem_pool_t *mp;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
+
+ b = obj;
+
+ nxt_debug(task, "mmap buf completion: %p %p", b, b->mem.start);
+
+ mp = b->data;
+
+ port_mmap = (nxt_port_mmap_t *) b->parent;
+ hdr = port_mmap->u.hdr;
+
+ if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
+ /*
+ * Chunks until b->mem.pos has been sent to other side,
+ * let's release rest (if any).
+ */
+ p = b->mem.pos - 1;
+ c = nxt_port_mmap_chunk_id(port_mmap, p) + 1;
+ p = nxt_port_mmap_chunk_start(port_mmap, c);
+ } else {
+ p = b->mem.start;
+ c = nxt_port_mmap_chunk_id(port_mmap, p);
+ }
+
+ while (p < b->mem.end) {
+ nxt_port_mmap_set_chunk_free(hdr, c);
+
+ p += PORT_MMAP_CHUNK_SIZE;
+ c++;
+ }
+
+ nxt_buf_free(mp, b);
+}
+
+
+static nxt_bool_t
+nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c)
+{
+ int ffs;
+ size_t i;
+ nxt_free_map_t bits;
+ nxt_free_map_t *free_map;
+
+ free_map = port_mmap->u.hdr->free_map;
+
+ for (i = 0; i < MAX_FREE_IDX; i++) {
+ bits = free_map[i];
+ if (bits == 0) {
+ continue;
+ }
+
+ ffs = __builtin_ffsll(bits);
+ if (ffs != 0) {
+ *c = i * FREE_BITS + ffs - 1;
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+
+nxt_inline void
+nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
+{
+ nxt_atomic_and_fetch(hdr->free_map + FREE_IDX(c), ~FREE_MASK(c));
+
+ nxt_thread_log_debug("set_chunk_busy: hdr %p; b %D", hdr, c);
+}
+
+
+nxt_inline void
+nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
+{
+ nxt_atomic_or_fetch(hdr->free_map + FREE_IDX(c), FREE_MASK(c));
+
+ nxt_thread_log_debug("set_chunk_free: hdr %p; b %D", hdr, c);
+}
+
+
+nxt_port_mmap_t *
+nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
+ nxt_fd_t fd)
+{
+ struct stat mmap_stat;
+ nxt_port_mmap_t *port_mmap;
+
+ nxt_debug(task, "got new mmap fd #%FD from process %PI",
+ fd, process->pid);
+
+ if (fstat(fd, &mmap_stat) == -1) {
+ nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
+
+ return NULL;
+ }
+
+ if (process->incoming == NULL) {
+ process->incoming = nxt_array_create(process->mem_pool, 1,
+ sizeof(nxt_port_mmap_t));
+ }
+
+ if (nxt_slow_path(process->incoming == NULL)) {
+ nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array");
+
+ return NULL;
+ }
+
+ port_mmap = nxt_array_zero_add(process->incoming);
+ if (nxt_slow_path(port_mmap == NULL)) {
+ nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
+
+ return NULL;
+ }
+
+ port_mmap->id = process->incoming->nelts - 1;
+ port_mmap->fd = -1;
+ port_mmap->pid = process->pid;
+ port_mmap->u.mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+
+ if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) {
+ nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
+
+ port_mmap->u.mem = NULL;
+
+ return NULL;
+ }
+
+ return port_mmap;
+}
+
+
+static void
+nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_port_t *port;
+ nxt_port_mmap_t *port_mmap;
+
+ b = obj;
+ port = b->data;
+ port_mmap = (nxt_port_mmap_t *) b->parent;
+
+ nxt_debug(task, "mmap fd %FD sent to %PI", port_mmap->fd, port->pid);
+
+ close(port_mmap->fd);
+ port_mmap->fd = -1;
+
+ nxt_buf_free(port->mem_pool, b);
+}
+
+
+static nxt_port_mmap_t *
+nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
+ nxt_port_t *port)
+{
+ u_char *p, name[64];
+ nxt_buf_t *b;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
+
+ if (process->outgoing == NULL) {
+ process->outgoing = nxt_array_create(process->mem_pool, 1,
+ sizeof(nxt_port_mmap_t));
+ }
+
+ if (nxt_slow_path(process->outgoing == NULL)) {
+ nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array");
+
+ return NULL;
+ }
+
+ port_mmap = nxt_array_zero_add(process->outgoing);
+ if (nxt_slow_path(port_mmap == NULL)) {
+ nxt_log(task, NXT_LOG_WARN,
+ "failed to add port mmap to outgoing array");
+
+ return NULL;
+ }
+
+ port_mmap->id = process->outgoing->nelts - 1;
+ port_mmap->pid = process->pid;
+
+ p = nxt_sprintf(name, name + sizeof(name), "/nginext.%PI.%PT.%D",
+ nxt_pid, task->thread->tid, nxt_random(&nxt_random_data));
+ *p = '\0';
+
+#if (NXT_HAVE_MEMFD_CREATE)
+ port_mmap->fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
+
+ if (nxt_slow_path(port_mmap->fd == -1)) {
+ nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E",
+ name, nxt_errno);
+
+ goto remove_fail;
+ }
+
+ nxt_debug(task, "memfd_create(%s): %FD", name, port_mmap->fd);
+
+#elif (NXT_HAVE_SHM_OPEN)
+ shm_unlink((char *) name); // just in case
+
+ port_mmap->fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR,
+ S_IRUSR | S_IWUSR);
+
+ nxt_debug(task, "shm_open(%s): %FD", name, port_mmap->fd);
+
+ if (nxt_slow_path(port_mmap->fd == -1)) {
+ nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno);
+
+ goto remove_fail;
+ }
+
+ if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
+ nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
+ nxt_errno);
+ }
+#endif
+
+ if (nxt_slow_path(ftruncate(port_mmap->fd, PORT_MMAP_SIZE) == -1)) {
+ nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
+
+ goto remove_fail;
+ }
+
+ port_mmap->u.mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE,
+ PROT_READ | PROT_WRITE, MAP_SHARED,
+ port_mmap->fd, 0);
+
+ if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) {
+ goto remove_fail;
+ }
+
+ /* Init segment header. */
+ hdr = port_mmap->u.hdr;
+
+ nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
+
+ /* Mark as busy chunk followed the last available chunk. */
+ nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);
+
+ nxt_debug(task, "send mmap fd %FD to process %PI", port_mmap->fd,
+ port->pid);
+
+ b = nxt_buf_mem_alloc(port->mem_pool, 0, 0);
+ b->completion_handler = nxt_port_mmap_send_fd_buf_completion;
+ b->data = port;
+ b->parent = port_mmap;
+
+ /* TODO handle error */
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, port_mmap->fd,
+ 0, 0, b);
+
+ nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
+ port_mmap->id, nxt_pid, process->pid);
+
+ return port_mmap;
+
+remove_fail:
+
+ nxt_array_remove(process->outgoing, port_mmap);
+
+ return NULL;
+}
+
+
+static nxt_port_mmap_t *
+nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
+ size_t size)
+{
+ nxt_array_t *outgoing;
+ nxt_process_t *process;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_t *end_port_mmap;
+
+ process = nxt_runtime_process_get(task->thread->runtime, port->pid);
+ if (nxt_slow_path(process == NULL)) {
+ return NULL;
+ }
+
+ *c = 0;
+
+ if (process->outgoing == NULL) {
+ return nxt_port_new_port_mmap(task, process, port);
+ }
+
+ outgoing = process->outgoing;
+ port_mmap = outgoing->elts;
+ end_port_mmap = port_mmap + outgoing->nelts;
+
+ while (port_mmap < end_port_mmap) {
+
+ if (nxt_port_mmap_get_free_chunk(port_mmap, c)) {
+ return port_mmap;
+ }
+
+ port_mmap++;
+ }
+
+ /* TODO introduce port_mmap limit and release wait. */
+ return nxt_port_new_port_mmap(task, process, port);
+}
+
+
+static nxt_port_mmap_t *
+nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
+{
+ nxt_array_t *incoming;
+ nxt_process_t *process;
+ nxt_port_mmap_t *port_mmap;
+
+ process = nxt_runtime_process_get(task->thread->runtime, spid);
+ if (nxt_slow_path(process == NULL)) {
+ return NULL;
+ }
+
+ incoming = process->incoming;
+ if (nxt_slow_path(incoming == NULL)) {
+ /* TODO add warning */
+ return NULL;
+ }
+
+ if (nxt_slow_path(incoming->nelts <= id)) {
+ /* TODO add warning */
+ return NULL;
+ }
+
+ port_mmap = incoming->elts;
+
+ return port_mmap + id;
+}
+
+
+nxt_buf_t *
+nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
+{
+ size_t nchunks;
+ nxt_buf_t *b;
+ nxt_chunk_id_t c;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
+
+ nxt_debug(task, "request %z bytes shm buffer", size);
+
+ b = nxt_mem_cache_zalloc0(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE);
+ if (nxt_slow_path(b == NULL)) {
+ return NULL;
+ }
+
+ b->data = port->mem_pool;
+ b->completion_handler = nxt_port_mmap_buf_completion;
+ b->size = NXT_BUF_PORT_MMAP_SIZE;
+
+ nxt_buf_set_port_mmap(b);
+
+ port_mmap = nxt_port_mmap_get(task, port, &c, size);
+ if (nxt_slow_path(port_mmap == NULL)) {
+ nxt_buf_free(port->mem_pool, b);
+ return NULL;
+ }
+
+ hdr = port_mmap->u.hdr;
+
+ b->parent = port_mmap;
+ b->mem.start = nxt_port_mmap_chunk_start(port_mmap, c);
+ b->mem.pos = b->mem.start;
+ b->mem.free = b->mem.start;
+ b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
+
+ nxt_port_mmap_set_chunk_busy(hdr, c);
+
+ nchunks = size / PORT_MMAP_CHUNK_SIZE;
+ if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
+ nchunks++;
+ }
+
+ c++;
+ nchunks--;
+
+ /* Try to acquire as much chunks as required. */
+ while (nchunks > 0) {
+
+ if (nxt_port_mmap_get_chunk_busy(hdr, c)) {
+ break;
+ }
+ nxt_port_mmap_set_chunk_busy(hdr, c);
+
+ b->mem.end += PORT_MMAP_CHUNK_SIZE;
+ c++;
+ nchunks--;
+ }
+
+ return b;
+}
+
+
+static nxt_buf_t *
+nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
+ nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
+{
+ size_t nchunks;
+ nxt_buf_t *b;
+ nxt_port_mmap_t *port_mmap;
+
+ b = nxt_mem_cache_zalloc0(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE);
+ if (nxt_slow_path(b == NULL)) {
+ return NULL;
+ }
+
+ b->data = port->mem_pool;
+ b->completion_handler = nxt_port_mmap_buf_completion;
+ b->size = NXT_BUF_PORT_MMAP_SIZE;
+
+ nxt_buf_set_port_mmap(b);
+
+ port_mmap = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
+ if (nxt_slow_path(port_mmap == NULL)) {
+ nxt_buf_free(port->mem_pool, b);
+ return NULL;
+ }
+
+ nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
+ if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
+ nchunks++;
+ }
+
+ b->mem.start = nxt_port_mmap_chunk_start(port_mmap, mmap_msg->chunk_id);
+ b->mem.pos = b->mem.start;
+ b->mem.free = b->mem.start + mmap_msg->size;
+ b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
+
+ b->parent = port_mmap;
+
+ return b;
+}
+
+
+void
+nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
+{
+ size_t bsize;
+ nxt_buf_t *b, *bmem;
+ nxt_uint_t i;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_msg_t *mmap_msg;
+
+ nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
+ "via shared memory", sb->size, port->pid);
+
+ bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
+
+ b = nxt_buf_mem_alloc(port->mem_pool, bsize, 0);
+ if (nxt_slow_path(b == NULL)) {
+ return;
+ }
+
+ mmap_msg = (nxt_port_mmap_msg_t *) b->mem.start;
+ bmem = msg->buf;
+
+ for (i = 0; i < sb->niov; i++, mmap_msg++) {
+
+ /* Lookup buffer which starts current iov_base. */
+ while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
+ bmem = bmem->next;
+ }
+
+ if (nxt_slow_path(bmem == NULL)) {
+ nxt_log_error(NXT_LOG_ERR, task->log, "failed to find buf for "
+ "iobuf[%d]", i);
+ return;
+ /* TODO clear b and exit */
+ }
+
+ port_mmap = (nxt_port_mmap_t *) bmem->parent;
+
+ mmap_msg->mmap_id = port_mmap->id;
+ mmap_msg->chunk_id = nxt_port_mmap_chunk_id(port_mmap, bmem->mem.pos);
+ mmap_msg->size = sb->iobuf[i].iov_len;
+
+ nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
+ mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
+ port->pid);
+ }
+
+ msg->buf = b;
+ b->mem.free += bsize;
+
+ sb->iobuf[0].iov_base = b->mem.pos;
+ sb->iobuf[0].iov_len = bsize;
+ sb->niov = 1;
+ sb->size = bsize;
+
+ msg->port_msg.mmap = 1;
+}
+
+
+void
+nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_recv_msg_t *msg, size_t size)
+{
+ nxt_buf_t *b, **pb;
+ nxt_port_mmap_msg_t *end, *mmap_msg;
+
+ b = msg->buf;
+
+ mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
+ end = (nxt_port_mmap_msg_t *) b->mem.free;
+
+ pb = &msg->buf;
+
+ while (mmap_msg < end) {
+ nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
+ mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
+ msg->port_msg.pid);
+
+ *pb = nxt_port_mmap_get_incoming_buf(task, port, msg->port_msg.pid,
+ mmap_msg);
+ if (nxt_slow_path(*pb == NULL)) {
+ nxt_log_error(NXT_LOG_ERR, task->log, "failed to get mmap buffer");
+
+ break;
+ }
+
+ pb = &(*pb)->next;
+ mmap_msg++;
+ }
+
+ /* Mark original buf as complete. */
+ b->mem.pos += nxt_buf_used_size(b);
+}
+
+
+nxt_port_method_t
+nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
+{
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_method_t m;
+
+ m = NXT_PORT_METHOD_ANY;
+
+ for (; b != NULL; b = b->next) {
+ if (nxt_buf_used_size(b) == 0) {
+ /* empty buffers does not affect method */
+ continue;
+ }
+
+ if (nxt_buf_is_port_mmap(b)) {
+ port_mmap = (nxt_port_mmap_t *) b->parent;
+
+ if (m == NXT_PORT_METHOD_PLAIN) {
+ nxt_log_error(NXT_LOG_ERR, task->log,
+ "mixing plain and mmap buffers, "
+ "using plain mode");
+
+ break;
+ }
+
+ if (port->pid != port_mmap->pid) {
+ nxt_log_error(NXT_LOG_ERR, task->log,
+ "send mmap buffer for %PI to %PI, "
+ "using plain mode", port_mmap->pid, port->pid);
+
+ m = NXT_PORT_METHOD_PLAIN;
+
+ break;
+ }
+
+ if (m == NXT_PORT_METHOD_ANY) {
+ nxt_debug(task, "using mmap mode");
+
+ m = NXT_PORT_METHOD_MMAP;
+ }
+ } else {
+ if (m == NXT_PORT_METHOD_MMAP) {
+ nxt_log_error(NXT_LOG_ERR, task->log,
+ "mixing mmap and plain buffers, "
+ "switching to plain mode");
+
+ m = NXT_PORT_METHOD_PLAIN;
+
+ break;
+ }
+
+ if (m == NXT_PORT_METHOD_ANY) {
+ nxt_debug(task, "using plain mode");
+
+ m = NXT_PORT_METHOD_PLAIN;
+ }
+ }
+ }
+
+ return m;
+}
diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h
new file mode 100644
index 00000000..9ad4e2a4
--- /dev/null
+++ b/src/nxt_port_memory.h
@@ -0,0 +1,50 @@
+
+#ifndef _NXT_PORT_MEMORY_H_INCLUDED_
+#define _NXT_PORT_MEMORY_H_INCLUDED_
+
+#define PORT_MMAP_MIN_SIZE (3 * sizeof(uint32_t))
+
+typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t;
+
+void
+nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap);
+
+/*
+ * Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem'
+ * pointers to first available shared mem bucket(s). 'size' used as a hint to
+ * acquire several successive buckets if possible.
+ *
+ * This function assumes that current thread operates the 'port' exclusively.
+ */
+nxt_buf_t *
+nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size);
+
+nxt_port_mmap_t *
+nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
+ nxt_fd_t fd);
+
+void
+nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb);
+
+nxt_inline void
+nxt_port_mmap_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) {
+ nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
+}
+
+void
+nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_recv_msg_t *msg, size_t size);
+
+enum nxt_port_method_e {
+ NXT_PORT_METHOD_ANY = 0,
+ NXT_PORT_METHOD_PLAIN,
+ NXT_PORT_METHOD_MMAP
+};
+
+typedef enum nxt_port_method_e nxt_port_method_t;
+
+nxt_port_method_t
+nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b);
+
+#endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */
diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c
index 671c3334..51d88ff7 100644
--- a/src/nxt_port_socket.c
+++ b/src/nxt_port_socket.c
@@ -10,7 +10,7 @@
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
- nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size);
+ nxt_port_recv_msg_t *msg, size_t size);
static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
@@ -137,7 +137,7 @@ nxt_port_write_close(nxt_port_t *port)
nxt_int_t
nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
- nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
+ nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
{
nxt_queue_link_t *link;
nxt_port_send_msg_t *msg;
@@ -169,8 +169,11 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->share = 0;
msg->port_msg.stream = stream;
+ msg->port_msg.pid = nxt_pid;
+ msg->port_msg.reply_port = reply_port;
msg->port_msg.type = type;
msg->port_msg.last = 0;
+ msg->port_msg.mmap = 0;
nxt_queue_insert_tail(&port->messages, &msg->link);
@@ -186,12 +189,15 @@ static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
- nxt_uint_t niov;
nxt_port_t *port;
struct iovec iov[NXT_IOBUF_MAX];
nxt_queue_link_t *link;
nxt_port_send_msg_t *msg;
nxt_sendbuf_coalesce_t sb;
+ nxt_port_method_t m;
+
+ size_t plain_size;
+ nxt_buf_t *plain_buf;
port = obj;
@@ -213,27 +219,59 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
sb.nmax = NXT_IOBUF_MAX - 1;
sb.sync = 0;
sb.last = 0;
- sb.size = sizeof(nxt_port_msg_t);
+ sb.size = 0;
sb.limit = port->max_size;
- niov = nxt_sendbuf_mem_coalesce(task, &sb);
+ m = nxt_port_mmap_get_method(task, port, msg->buf);
+
+ if (m == NXT_PORT_METHOD_MMAP) {
+ sb.limit = (1ULL << 31) - 1;
+ }
+
+ nxt_sendbuf_mem_coalesce(task, &sb);
+
+ plain_size = sb.size;
+ plain_buf = msg->buf;
+
+ /*
+ * Send through mmap enabled only when payload
+ * is bigger than PORT_MMAP_MIN_SIZE.
+ */
+ if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
+ nxt_port_mmap_write(task, port, msg, &sb);
+
+ } else {
+ m = NXT_PORT_METHOD_PLAIN;
+ }
msg->port_msg.last = sb.last;
- n = nxt_socketpair_send(&port->socket, msg->fd, iov, niov + 1);
+ n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
if (n > 0) {
- if (nxt_slow_path((size_t) n != sb.size)) {
+ if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
nxt_log(task, NXT_LOG_CRIT,
"port %d: short write: %z instead of %uz",
- port->socket.fd, n, sb.size);
+ port->socket.fd, n, sb.size + iov[0].iov_len);
goto fail;
}
+ if (msg->buf != plain_buf) {
+ /*
+ * Complete crafted mmap_msgs buf and restore msg->buf
+ * for regular completion call.
+ */
+ nxt_port_mmap_completion(task,
+ port->socket.write_work_queue,
+ msg->buf);
+
+ msg->buf = plain_buf;
+ }
+
msg->buf = nxt_sendbuf_completion(task,
port->socket.write_work_queue,
msg->buf,
- n - sizeof(nxt_port_msg_t));
+ plain_size);
if (msg->buf != NULL) {
/*
@@ -301,14 +339,13 @@ nxt_port_read_close(nxt_port_t *port)
static void
nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
{
- ssize_t n;
- nxt_fd_t fd;
- nxt_buf_t *b;
- nxt_port_t *port;
- struct iovec iov[2];
- nxt_port_msg_t msg;
+ ssize_t n;
+ nxt_buf_t *b;
+ nxt_port_t *port;
+ struct iovec iov[2];
+ nxt_port_recv_msg_t msg;
- port = obj;
+ port = msg.port = obj;
for ( ;; ) {
@@ -318,24 +355,21 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
/* TODO: disable event for some time */
}
- iov[0].iov_base = &msg;
+ iov[0].iov_base = &msg.port_msg;
iov[0].iov_len = sizeof(nxt_port_msg_t);
iov[1].iov_base = b->mem.pos;
iov[1].iov_len = port->max_size;
- n = nxt_socketpair_recv(&port->socket, &fd, iov, 2);
+ n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
if (n > 0) {
- nxt_port_read_msg_process(task, port, &msg, fd, b, n);
- if (b->mem.pos == b->mem.free) {
+ msg.buf = b;
- if (b->next != NULL) {
- /* A sync buffer */
- nxt_buf_free(port->mem_pool, b->next);
- }
+ nxt_port_read_msg_process(task, port, &msg, n);
+ if (b->mem.pos == b->mem.free) {
nxt_port_buf_free(port, b);
}
@@ -364,10 +398,11 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
- nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size)
+ nxt_port_recv_msg_t *msg, size_t size)
{
- nxt_buf_t *sync;
- nxt_port_recv_msg_t recv_msg;
+ nxt_buf_t *b;
+ nxt_buf_t *orig_b;
+ nxt_buf_t **last_next;
if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) {
nxt_log(port->socket.task, NXT_LOG_CRIT,
@@ -375,31 +410,56 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
goto fail;
}
- recv_msg.stream = msg->stream;
- recv_msg.type = msg->type;
- recv_msg.fd = fd;
- recv_msg.buf = b;
- recv_msg.port = port;
+ /* adjust size to actual buffer used size */
+ size -= sizeof(nxt_port_msg_t);
+
+ b = orig_b = msg->buf;
+ b->mem.free += size;
+
+ if (msg->port_msg.mmap) {
+ nxt_port_mmap_read(task, port, msg, size);
+ b = msg->buf;
+ }
- b->mem.free += size - sizeof(nxt_port_msg_t);
+ last_next = &b->next;
- if (msg->last) {
- sync = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
- if (nxt_slow_path(sync == NULL)) {
+ if (msg->port_msg.last) {
+ /* find reference to last next, the NULL one */
+ while (*last_next) {
+ last_next = &(*last_next)->next;
+ }
+
+ *last_next = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
+ if (nxt_slow_path(*last_next == NULL)) {
goto fail;
}
+ }
+
+ port->handler(task, msg);
- b->next = sync;
+ if (*last_next != NULL) {
+ /* A sync buffer */
+ nxt_buf_free(port->mem_pool, *last_next);
+ *last_next = NULL;
}
- port->handler(task, &recv_msg);
+ if (orig_b != b) {
+ /* complete mmap buffers */
+ for (; b && nxt_buf_used_size(b) == 0;
+ b = b->next) {
+ nxt_debug(task, "complete buffer %p", b);
+
+ nxt_work_queue_add(port->socket.read_work_queue,
+ b->completion_handler, task, b, b->parent);
+ }
+ }
return;
fail:
- if (fd != -1) {
- nxt_fd_close(fd);
+ if (msg->fd != -1) {
+ nxt_fd_close(msg->fd);
}
}
@@ -415,6 +475,7 @@ nxt_port_buf_alloc(nxt_port_t *port)
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
+ b->next = NULL;
} else {
b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
diff --git a/src/nxt_process.c b/src/nxt_process.c
index d24be580..64fd1f19 100644
--- a/src/nxt_process.c
+++ b/src/nxt_process.c
@@ -506,3 +506,22 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
return NXT_OK;
}
+
+
+nxt_port_t *
+nxt_process_port_new(nxt_process_t *process)
+{
+ nxt_port_t *port;
+
+ port = nxt_mem_cache_zalloc0(process->mem_pool, sizeof(nxt_port_t));
+ if (nxt_fast_path(port != NULL)) {
+ port->id = process->last_port_id++;
+ port->pid = process->pid;
+ port->process = process;
+
+ nxt_process_port_add(process, port);
+ }
+
+ return port;
+}
+
diff --git a/src/nxt_process.h b/src/nxt_process.h
index 92f673a2..b6262106 100644
--- a/src/nxt_process.h
+++ b/src/nxt_process.h
@@ -49,9 +49,15 @@ struct nxt_process_init_s {
typedef struct {
+ nxt_mem_pool_t *mem_pool;
+
nxt_pid_t pid;
- nxt_array_t *ports; /* of nxt_port_t */
+ nxt_queue_t ports; /* of nxt_port_t */
+ nxt_port_id_t last_port_id;
+
nxt_process_init_t *init;
+ nxt_array_t *incoming; /* of nxt_mmap_t */
+ nxt_array_t *outgoing; /* of nxt_mmap_t */
} nxt_process_t;
@@ -65,6 +71,22 @@ NXT_EXPORT void nxt_nanosleep(nxt_nsec_t ns);
NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv,
char ***orig_envp);
+NXT_EXPORT nxt_port_t * nxt_process_port_new(nxt_process_t *process);
+
+#define nxt_process_port_remove(port) \
+ nxt_queue_remove(&port->link)
+
+#define nxt_process_port_first(process) \
+ nxt_queue_link_data(nxt_queue_first(&process->ports), nxt_port_t, link)
+
+#define nxt_process_port_add(process, port) \
+ nxt_queue_insert_tail(&process->ports, &port->link)
+
+#define nxt_process_port_each(process, port) \
+ nxt_queue_each(port, &process->ports, nxt_port_t, link)
+
+#define nxt_process_port_loop \
+ nxt_queue_loop
#if (NXT_HAVE_SETPROCTITLE)
diff --git a/src/nxt_queue.h b/src/nxt_queue.h
index e8f7c245..f1efd328 100644
--- a/src/nxt_queue.h
+++ b/src/nxt_queue.h
@@ -216,4 +216,19 @@ NXT_EXPORT void nxt_queue_sort(nxt_queue_t *queue,
const nxt_queue_link_t *), const void *data);
+#define nxt_queue_each(elt, queue, type, link) \
+ do { \
+ nxt_queue_link_t *_lnk; \
+ \
+ for (_lnk = nxt_queue_first(queue); \
+ _lnk != nxt_queue_tail(queue); \
+ _lnk = nxt_queue_next(_lnk)) { \
+ \
+ elt = nxt_queue_link_data(_lnk, type, link); \
+
+#define nxt_queue_loop \
+ } \
+ } while(0)
+
+
#endif /* _NXT_QUEUE_H_INCLUDED_ */
diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c
index 84812cae..8337712a 100644
--- a/src/nxt_runtime.c
+++ b/src/nxt_runtime.c
@@ -16,7 +16,6 @@ static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
-static nxt_int_t nxt_runtime_processes(nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
static void nxt_runtime_initial_start(nxt_task_t *task);
@@ -105,10 +104,6 @@ nxt_runtime_create(nxt_task_t *task)
goto fail;
}
- if (nxt_slow_path(nxt_runtime_processes(rt) != NXT_OK)) {
- goto fail;
- }
-
if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
goto fail;
}
@@ -294,18 +289,6 @@ nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
static nxt_int_t
-nxt_runtime_processes(nxt_runtime_t *rt)
-{
- rt->processes = nxt_array_create(rt->mem_pool, 4, sizeof(nxt_process_t));
- if (nxt_slow_path(rt->processes == NULL)) {
- return NXT_ERROR;
- }
-
- return NXT_OK;
-}
-
-
-static nxt_int_t
nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
{
#if (NXT_THREADS)
@@ -1479,21 +1462,323 @@ nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
nxt_process_t *
-nxt_runtime_new_process(nxt_runtime_t *rt)
+nxt_runtime_process_new(nxt_runtime_t *rt)
{
nxt_process_t *process;
/* TODO: memory failures. */
- process = nxt_array_zero_add(rt->processes);
+ process = nxt_mem_cache_zalloc0(rt->mem_pool, sizeof(nxt_process_t));
if (nxt_slow_path(process == NULL)) {
return NULL;
}
- process->ports = nxt_array_create(rt->mem_pool, 1, sizeof(nxt_port_t));
- if (nxt_slow_path(process->ports == NULL)) {
+ nxt_queue_init(&process->ports);
+
+ /* TODO each process should have it's own mem_pool for ports allocation */
+ process->mem_pool = rt->mem_pool;
+
+ return process;
+}
+
+
+static nxt_int_t
+nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ nxt_process_t *process;
+
+ process = data;
+
+ if (lhq->key.length == sizeof(nxt_pid_t) &&
+ *(nxt_pid_t *) lhq->key.start == process->pid) {
+ return NXT_OK;
+ }
+
+ return NXT_DECLINED;
+}
+
+static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
+ NXT_LVLHSH_DEFAULT,
+ 0,
+ nxt_runtime_lvlhsh_pid_test,
+ nxt_lvlhsh_alloc,
+ nxt_lvlhsh_free,
+};
+
+
+typedef struct {
+ nxt_pid_t pid;
+ nxt_port_id_t port_id;
+} nxt_pid_port_id_t;
+
+static nxt_int_t
+nxt_runtime_lvlhsh_port_test(nxt_lvlhsh_query_t *lhq, void *data)
+{
+ nxt_port_t *port;
+ nxt_pid_port_id_t *pid_port_id;
+
+ port = data;
+ pid_port_id = (nxt_pid_port_id_t *) lhq->key.start;
+
+ if (lhq->key.length == sizeof(nxt_pid_port_id_t) &&
+ pid_port_id->pid == port->pid &&
+ pid_port_id->port_id == port->id) {
+ return NXT_OK;
+ }
+
+ return NXT_DECLINED;
+}
+
+static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = {
+ NXT_LVLHSH_DEFAULT,
+ 0,
+ nxt_runtime_lvlhsh_port_test,
+ nxt_lvlhsh_alloc,
+ nxt_lvlhsh_free,
+};
+
+
+nxt_process_t *
+nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
+{
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid));
+ lhq.key.length = sizeof(pid);
+ lhq.key.start = (u_char *) &pid;
+ lhq.proto = &lvlhsh_processes_proto;
+
+ /* TODO lock processes */
+
+ if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
+ nxt_thread_log_debug("process %PI found", pid);
+ return lhq.value;
+ }
+
+ nxt_thread_log_debug("process %PI not found", pid);
+
+ return NULL;
+}
+
+
+nxt_process_t *
+nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
+{
+ nxt_process_t *process;
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid));
+ lhq.key.length = sizeof(pid);
+ lhq.key.start = (u_char *) &pid;
+ lhq.proto = &lvlhsh_processes_proto;
+
+ /* TODO lock processes */
+
+ if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
+ nxt_thread_log_debug("process %PI found", pid);
+ return lhq.value;
+ }
+
+ process = nxt_runtime_process_new(rt);
+ if (nxt_slow_path(process == NULL)) {
return NULL;
}
+ process->pid = pid;
+
+ lhq.replace = 0;
+ lhq.value = process;
+ lhq.pool = rt->mem_pool;
+
+ switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
+
+ case NXT_OK:
+ if (rt->nprocesses == 0) {
+ rt->mprocess = process;
+ }
+
+ rt->nprocesses++;
+
+ nxt_thread_log_debug("process %PI insert", pid);
+ break;
+
+ default:
+ nxt_thread_log_debug("process %PI insert failed", pid);
+ break;
+ }
+
return process;
}
+
+
+void
+nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
+{
+ nxt_port_t *port;
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid));
+ lhq.key.length = sizeof(process->pid);
+ lhq.key.start = (u_char *) &process->pid;
+ lhq.proto = &lvlhsh_processes_proto;
+ lhq.replace = 0;
+ lhq.value = process;
+ lhq.pool = rt->mem_pool;
+
+ /* TODO lock processes */
+
+ switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
+
+ case NXT_OK:
+ if (rt->nprocesses == 0) {
+ rt->mprocess = process;
+ }
+
+ rt->nprocesses++;
+
+ nxt_process_port_each(process, port) {
+
+ nxt_runtime_port_add(rt, port);
+
+ } nxt_process_port_loop;
+
+ break;
+
+ default:
+ break;
+ }
+}
+
+
+void
+nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
+{
+ nxt_port_t *port;
+ nxt_lvlhsh_query_t lhq;
+
+ lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid));
+ lhq.key.length = sizeof(process->pid);
+ lhq.key.start = (u_char *) &process->pid;
+ lhq.proto = &lvlhsh_processes_proto;
+ lhq.replace = 0;
+ lhq.value = process;
+ lhq.pool = rt->mem_pool;
+
+ /* TODO lock processes */
+
+ switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
+
+ case NXT_OK:
+ rt->nprocesses--;
+
+ nxt_process_port_each(process, port) {
+
+ nxt_runtime_port_remove(rt, port);
+
+ } nxt_process_port_loop;
+
+ break;
+
+ default:
+ break;
+ }
+}
+
+
+nxt_process_t *
+nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
+{
+ nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t));
+
+ lhe->proto = &lvlhsh_processes_proto;
+
+ return nxt_runtime_process_next(rt, lhe);
+}
+
+
+void
+nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port)
+{
+ nxt_pid_port_id_t pid_port;
+ nxt_lvlhsh_query_t lhq;
+
+ pid_port.pid = port->pid;
+ pid_port.port_id = port->id;
+
+ lhq.key_hash = nxt_murmur_hash2(&pid_port, sizeof(pid_port));
+ lhq.key.length = sizeof(pid_port);
+ lhq.key.start = (u_char *) &pid_port;
+ lhq.proto = &lvlhsh_ports_proto;
+ lhq.replace = 0;
+ lhq.value = port;
+ lhq.pool = rt->mem_pool;
+
+ /* TODO lock ports */
+
+ switch (nxt_lvlhsh_insert(&rt->ports, &lhq)) {
+
+ case NXT_OK:
+ break;
+
+ default:
+ break;
+ }
+}
+
+
+void
+nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port)
+{
+ nxt_pid_port_id_t pid_port;
+ nxt_lvlhsh_query_t lhq;
+
+ pid_port.pid = port->pid;
+ pid_port.port_id = port->id;
+
+ lhq.key_hash = nxt_murmur_hash2(&pid_port, sizeof(pid_port));
+ lhq.key.length = sizeof(pid_port);
+ lhq.key.start = (u_char *) &pid_port;
+ lhq.proto = &lvlhsh_ports_proto;
+ lhq.replace = 0;
+ lhq.value = port;
+ lhq.pool = rt->mem_pool;
+
+ /* TODO lock ports */
+
+ switch (nxt_lvlhsh_delete(&rt->ports, &lhq)) {
+
+ case NXT_OK:
+ break;
+
+ default:
+ break;
+ }
+}
+
+
+nxt_port_t *
+nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
+ nxt_port_id_t port_id)
+{
+ nxt_pid_port_id_t pid_port;
+ nxt_lvlhsh_query_t lhq;
+
+ pid_port.pid = pid;
+ pid_port.port_id = port_id;
+
+ lhq.key_hash = nxt_murmur_hash2(&pid_port, sizeof(pid_port));
+ lhq.key.length = sizeof(pid_port);
+ lhq.key.start = (u_char *) &pid_port;
+ lhq.proto = &lvlhsh_ports_proto;
+
+ /* TODO lock ports */
+
+ if (nxt_lvlhsh_find(&rt->ports, &lhq) == NXT_OK) {
+ nxt_thread_log_debug("process port (%PI, %d) found", pid, port_id);
+ return lhq.value;
+ }
+
+ nxt_thread_log_debug("process port (%PI, %d) not found", pid, port_id);
+
+ return NULL;
+}
diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h
index 3b384235..5cce5703 100644
--- a/src/nxt_runtime.h
+++ b/src/nxt_runtime.h
@@ -35,7 +35,11 @@ struct nxt_runtime_s {
nxt_runtime_cont_t continuation;
#endif
- nxt_array_t *processes; /* of nxt_process_t */
+ nxt_process_t *mprocess;
+ size_t nprocesses;
+ nxt_lvlhsh_t processes; /* of nxt_process_t */
+
+ nxt_lvlhsh_t ports; /* of nxt_port_t */
nxt_list_t *log_files; /* of nxt_file_t */
@@ -78,7 +82,30 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
#endif
-nxt_process_t *nxt_runtime_new_process(nxt_runtime_t *rt);
+nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt);
+
+nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
+
+void nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process);
+
+nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid);
+
+void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process);
+
+nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt,
+ nxt_lvlhsh_each_t *lhe);
+
+#define nxt_runtime_process_next(rt, lhe) \
+ nxt_lvlhsh_each(&rt->processes, lhe)
+
+
+void nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port);
+
+void nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port);
+
+nxt_port_t *nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
+ nxt_port_id_t port_id);
+
/* STUB */
nxt_int_t nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt);
@@ -101,6 +128,19 @@ void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data);
nxt_int_t nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt);
+#define nxt_runtime_process_each(rt, process) \
+ do { \
+ nxt_lvlhsh_each_t _lhe; \
+ \
+ for (process = nxt_runtime_process_first(rt, &_lhe); \
+ process != NULL; \
+ process = nxt_runtime_process_next(rt, &_lhe)) { \
+
+#define nxt_runtime_process_loop \
+ } \
+ } while(0)
+
+
extern nxt_module_init_t nxt_init_modules[];
extern nxt_uint_t nxt_init_modules_n;
diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c
index 62042504..e8fbe2a0 100644
--- a/src/nxt_sendbuf.c
+++ b/src/nxt_sendbuf.c
@@ -148,6 +148,7 @@ done:
sb->buf = b;
sb->size = total;
+ sb->niov = n;
return n;
}
@@ -390,6 +391,14 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
break;
}
+ if (nxt_buf_is_port_mmap(b)) {
+ /*
+ * buffer has been sent to other side which is now
+ * responsible for shared memory bucket release
+ */
+ b->is_port_mmap_sent = 1;
+ }
+
if (sent < size) {
if (nxt_buf_is_mem(b)) {
diff --git a/src/nxt_sendbuf.h b/src/nxt_sendbuf.h
index 71b18f91..0b789583 100644
--- a/src/nxt_sendbuf.h
+++ b/src/nxt_sendbuf.h
@@ -53,6 +53,7 @@ typedef struct {
typedef struct {
nxt_buf_t *buf;
nxt_iobuf_t *iobuf;
+ nxt_uint_t niov;
uint32_t nmax;
uint8_t sync; /* 1 bit */
diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c
index 59285113..b6266520 100644
--- a/src/nxt_worker_process.c
+++ b/src/nxt_worker_process.c
@@ -25,6 +25,7 @@ nxt_port_handler_t nxt_worker_process_port_handlers[] = {
nxt_worker_process_quit_handler,
nxt_port_new_port_handler,
nxt_port_change_log_file_handler,
+ nxt_port_mmap_handler,
nxt_port_data_handler,
};