diff options
author | Max Romanov <max.romanov@nginx.com> | 2017-05-12 20:32:41 +0300 |
---|---|---|
committer | Max Romanov <max.romanov@nginx.com> | 2017-05-12 20:32:41 +0300 |
commit | f7b4bdfd892a0b479dc946896435a3ba7f9615dd (patch) | |
tree | a6f0c4ebaeed2d9f0fcb1c07178b52a684a53280 /src | |
parent | 1782c771fab999b37a8c04ed72760e3528205be7 (diff) | |
download | unit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.gz unit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.bz2 |
Using shared memory to send data via nxt_port.
Usage:
b = nxt_port_mmap_get_buf(task, port, size);
b->mem.free = nxt_cpymem(b->mem.free, data, size);
nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, b);
Diffstat (limited to 'src')
-rw-r--r-- | src/nxt_buf.h | 26 | ||||
-rw-r--r-- | src/nxt_main.h | 5 | ||||
-rw-r--r-- | src/nxt_master_process.c | 81 | ||||
-rw-r--r-- | src/nxt_port.c | 127 | ||||
-rw-r--r-- | src/nxt_port.h | 49 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 700 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 50 | ||||
-rw-r--r-- | src/nxt_port_socket.c | 141 | ||||
-rw-r--r-- | src/nxt_process.c | 19 | ||||
-rw-r--r-- | src/nxt_process.h | 24 | ||||
-rw-r--r-- | src/nxt_queue.h | 15 | ||||
-rw-r--r-- | src/nxt_runtime.c | 327 | ||||
-rw-r--r-- | src/nxt_runtime.h | 44 | ||||
-rw-r--r-- | src/nxt_sendbuf.c | 9 | ||||
-rw-r--r-- | src/nxt_sendbuf.h | 1 | ||||
-rw-r--r-- | src/nxt_worker_process.c | 1 |
16 files changed, 1438 insertions, 181 deletions
diff --git a/src/nxt_buf.h b/src/nxt_buf.h index 85ab3602..c5ac5c20 100644 --- a/src/nxt_buf.h +++ b/src/nxt_buf.h @@ -67,7 +67,7 @@ typedef struct { struct nxt_buf_s { void *data; nxt_work_handler_t completion_handler; - nxt_buf_t *parent; + void *parent; /* * The next link, flags, and nxt_buf_mem_t should @@ -85,11 +85,13 @@ struct nxt_buf_s { uint8_t is_file; /* 1 bit */ uint16_t is_mmap:1; + uint16_t is_port_mmap:1; uint16_t is_sync:1; uint16_t is_nobuf:1; uint16_t is_flush:1; uint16_t is_last:1; + uint16_t is_port_mmap_sent:1; nxt_buf_mem_t mem; @@ -103,10 +105,11 @@ struct nxt_buf_s { }; -#define NXT_BUF_MEM_SIZE offsetof(nxt_buf_t, file) -#define NXT_BUF_SYNC_SIZE NXT_BUF_MEM_SIZE -#define NXT_BUF_MMAP_SIZE sizeof(nxt_buf_t) -#define NXT_BUF_FILE_SIZE sizeof(nxt_buf_t) +#define NXT_BUF_MEM_SIZE offsetof(nxt_buf_t, file) +#define NXT_BUF_SYNC_SIZE NXT_BUF_MEM_SIZE +#define NXT_BUF_FILE_SIZE sizeof(nxt_buf_t) +#define NXT_BUF_MMAP_SIZE NXT_BUF_FILE_SIZE +#define NXT_BUF_PORT_MMAP_SIZE NXT_BUF_MEM_SIZE #define NXT_BUF_SYNC_NOBUF 1 @@ -146,6 +149,19 @@ nxt_buf_clear_mmap(b) \ #define \ +nxt_buf_is_port_mmap(b) \ + ((b)->is_port_mmap) + +#define \ +nxt_buf_set_port_mmap(b) \ + (b)->is_port_mmap = 1 + +#define \ +nxt_buf_clear_port_mmap(b) \ + (b)->is_port_mmap = 0 + + +#define \ nxt_buf_is_sync(b) \ ((b)->is_sync) diff --git a/src/nxt_main.h b/src/nxt_main.h index a1a01434..24ec4ca1 100644 --- a/src/nxt_main.h +++ b/src/nxt_main.h @@ -24,7 +24,9 @@ typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t; typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg); typedef struct nxt_sig_event_s nxt_sig_event_t; typedef struct nxt_runtime_s nxt_runtime_t; +typedef uint16_t nxt_port_id_t; +#include <nxt_queue.h> #include <nxt_process.h> typedef struct nxt_thread_s nxt_thread_t; @@ -45,7 +47,6 @@ typedef struct nxt_log_s nxt_log_t; #include <nxt_atomic.h> -#include <nxt_queue.h> #include <nxt_rbtree.h> #include <nxt_sprintf.h> #include <nxt_parse.h> @@ -104,6 +105,7 @@ typedef struct nxt_thread_pool_s nxt_thread_pool_t; #include <nxt_service.h> typedef struct nxt_buf_s nxt_buf_t; +typedef struct nxt_port_mmap_s nxt_port_mmap_t; #include <nxt_buf.h> #include <nxt_buf_pool.h> #include <nxt_recvbuf.h> @@ -129,6 +131,7 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context); #include <nxt_fd_event.h> #include <nxt_port.h> +#include <nxt_port_memory.h> #if (NXT_THREADS) #include <nxt_thread_pool.h> #endif diff --git a/src/nxt_master_process.c b/src/nxt_master_process.c index e3d5382a..ed39130b 100644 --- a/src/nxt_master_process.c +++ b/src/nxt_master_process.c @@ -80,14 +80,12 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) nxt_port_t *port; nxt_process_t *process; - process = nxt_runtime_new_process(rt); + process = nxt_runtime_process_get(rt, nxt_pid); if (nxt_slow_path(process == NULL)) { return NXT_ERROR; } - process->pid = nxt_pid; - - port = nxt_array_zero_add(process->ports); + port = nxt_process_port_new(process); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } @@ -97,8 +95,10 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) return ret; } - port->pid = nxt_pid; port->engine = 0; + port->type = NXT_PROCESS_MASTER; + + nxt_runtime_port_add(rt, port); /* * A master process port. A write port is not closed @@ -220,17 +220,16 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, * TODO: remove process, init, ports from array on memory and fork failures. */ - process = nxt_runtime_new_process(rt); + process = nxt_runtime_process_new(rt); if (nxt_slow_path(process == NULL)) { return NXT_ERROR; } process->init = init; + master_process = rt->mprocess; + init->master_port = nxt_process_port_first(master_process); - master_process = rt->processes->elts; - init->master_port = master_process->ports->elts; - - port = nxt_array_zero_add(process->ports); + port = nxt_process_port_new(process); if (nxt_slow_path(port == NULL)) { return NXT_ERROR; } @@ -243,6 +242,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, } port->engine = 0; + port->type = init->type; pid = nxt_process_create(task, init); @@ -253,6 +253,11 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, case 0: /* A worker process, return to the event engine work queue loop. */ + process->pid = nxt_pid; + port->pid = nxt_pid; + + nxt_runtime_process_add(rt, process); + return NXT_AGAIN; default: @@ -260,6 +265,8 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, process->pid = pid; port->pid = pid; + nxt_runtime_process_add(rt, process); + nxt_port_read_close(port); nxt_port_write_enable(task, port); @@ -272,27 +279,23 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt, void nxt_master_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) { - nxt_uint_t i, n, nprocesses, nports; nxt_port_t *port; nxt_process_t *process; - process = rt->processes->elts; - nprocesses = rt->processes->nelts; + nxt_runtime_process_each(rt, process) + { + if (nxt_pid != process->pid) { + process->init = NULL; - for (i = 0; i < nprocesses; i++) { + nxt_process_port_each(process, port) { - if (nxt_pid != process[i].pid) { - process[i].init = NULL; + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, + -1, 0, 0, NULL); - port = process[i].ports->elts; - nports = process[i].ports->nelts; - - for (n = 0; n < nports; n++) { - (void) nxt_port_socket_write(task, &port[n], NXT_PORT_MSG_QUIT, - -1, 0, NULL); - } + } nxt_process_port_loop; } } + nxt_runtime_process_loop; } @@ -331,7 +334,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data) nxt_int_t ret; nxt_uint_t n; nxt_file_t *file, *new_file; - nxt_runtime_t *rt; + nxt_runtime_t *rt; nxt_array_t *new_files; nxt_mem_pool_t *mp; @@ -473,39 +476,29 @@ nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data) static void nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) { - nxt_uint_t i, n; nxt_runtime_t *rt; nxt_process_t *process; nxt_process_init_t *init; rt = task->thread->runtime; - process = rt->processes->elts; - n = rt->processes->nelts; - - /* A process[0] is the master process. */ + process = nxt_runtime_process_find(rt, pid); - for (i = 1; i < n; i++) { + if (process) { + init = process->init; - if (pid == process[i].pid) { - init = process[i].init; + /* TODO: free ports fds. */ - /* TODO: free ports fds. */ + nxt_runtime_process_remove(rt, process); - nxt_array_remove(rt->processes, &process[i]); + if (nxt_exiting) { - if (nxt_exiting) { - nxt_debug(task, "processes %d", n); - - if (n == 2) { - nxt_runtime_quit(task); - } - - } else if (init != NULL) { - (void) nxt_master_create_worker_process(task, rt, init); + if (rt->nprocesses == 2) { + nxt_runtime_quit(task); } - return; + } else if (init != NULL) { + (void) nxt_master_create_worker_process(task, rt, init); } } } diff --git a/src/nxt_port.c b/src/nxt_port.c index 1da16587..fc807d1f 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -32,25 +32,21 @@ void nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) { - nxt_uint_t i, n, nprocesses, nports; nxt_port_t *port; nxt_process_t *process; - process = rt->processes->elts; - nprocesses = rt->processes->nelts; + nxt_runtime_process_each(rt, process) + { + if (nxt_pid != process->pid) { + nxt_process_port_each(process, port) { - for (i = 0; i < nprocesses; i++) { + (void) nxt_port_socket_write(task, port, type, + fd, stream, 0, b); - if (nxt_pid != process[i].pid) { - port = process[i].ports->elts; - nports = process[i].ports->nelts; - - for (n = 0; n < nports; n++) { - (void) nxt_port_socket_write(task, &port[n], type, - fd, stream, b); - } + } nxt_process_port_loop; } } + nxt_runtime_process_loop; } @@ -59,19 +55,19 @@ nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_handler_t *handlers; - if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) { + if (nxt_fast_path(msg->port_msg.type <= NXT_PORT_MSG_MAX)) { nxt_debug(task, "port %d: message type:%uD", - msg->port->socket.fd, msg->type); + msg->port->socket.fd, msg->port_msg.type); handlers = msg->port->data; - handlers[msg->type](task, msg); + handlers[msg->port_msg.type](task, msg); return; } nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD", - msg->port->socket.fd, msg->type); + msg->port->socket.fd, msg->port_msg.type); } @@ -87,28 +83,20 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_t *new_port) { nxt_buf_t *b; - nxt_uint_t i, n; nxt_port_t *port; nxt_process_t *process; nxt_port_msg_new_port_t *msg; - n = rt->processes->nelts; - if (n == 0) { - return; - } - nxt_debug(task, "new port %d for process %PI engine %uD", new_port->socket.fd, new_port->pid, new_port->engine); - process = rt->processes->elts; - - for (i = 0; i < n; i++) { - - if (process[i].pid == new_port->pid || process[i].pid == nxt_pid) { + nxt_runtime_process_each(rt, process) + { + if (process->pid == new_port->pid || process->pid == nxt_pid) { continue; } - port = process[i].ports->elts; + port = nxt_process_port_first(process); b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0); @@ -116,19 +104,25 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, continue; } + nxt_debug(task, "send new port %FD to process %PI", + new_port->socket.fd, process->pid); + b->data = port; b->completion_handler = nxt_port_new_port_buf_completion; b->mem.free += sizeof(nxt_port_msg_new_port_t); msg = (nxt_port_msg_new_port_t *) b->mem.pos; + msg->id = new_port->id; msg->pid = new_port->pid; msg->engine = new_port->engine; msg->max_size = port->max_size; msg->max_share = port->max_share; + msg->type = new_port->type; (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, - new_port->socket.fd, 0, b); + new_port->socket.fd, 0, 0, b); } + nxt_runtime_process_loop; } @@ -158,12 +152,15 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) rt = task->thread->runtime; - process = nxt_runtime_new_process(rt); + new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; + msg->buf->mem.pos = msg->buf->mem.free; + + process = nxt_runtime_process_get(rt, new_port_msg->pid); if (nxt_slow_path(process == NULL)) { return; } - port = nxt_array_zero_add(process->ports); + port = nxt_process_port_new(process); if (nxt_slow_path(port == NULL)) { return; } @@ -175,51 +172,74 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) port->mem_pool = mp; - new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; - msg->buf->mem.pos = msg->buf->mem.free; - nxt_debug(task, "new port %d received for process %PI engine %uD", msg->fd, new_port_msg->pid, new_port_msg->engine); - process->pid = new_port_msg->pid; - - port->pid = new_port_msg->pid; + port->id = new_port_msg->id; port->engine = new_port_msg->engine; port->pair[0] = -1; port->pair[1] = msg->fd; port->max_size = new_port_msg->max_size; port->max_share = new_port_msg->max_share; + port->type = new_port_msg->type; nxt_queue_init(&port->messages); port->socket.task = task; + nxt_runtime_port_add(rt, port); + nxt_port_write_enable(task, port); } void +nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_runtime_t *rt; + nxt_process_t *process; + + rt = task->thread->runtime; + + if (nxt_slow_path(msg->fd == -1)) { + nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message"); + + return; + } + + process = nxt_runtime_process_get(rt, msg->port_msg.pid); + if (nxt_slow_path(process == NULL)) { + nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI", + msg->port_msg.pid); + + goto fail_close; + } + + nxt_port_incoming_port_mmap(task, process, msg->fd); + +fail_close: + + close(msg->fd); +} + + +void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, nxt_fd_t fd) { nxt_buf_t *b; - nxt_uint_t i, n; nxt_port_t *port; nxt_process_t *process; - n = rt->processes->nelts; - if (n == 0) { - return; - } - nxt_debug(task, "change log file #%ui fd:%FD", slot, fd); - process = rt->processes->elts; - - /* process[0] is master process. */ + nxt_runtime_process_each(rt, process) + { + if (nxt_pid == process->pid) { + continue; + } - for (i = 1; i < n; i++) { - port = process[i].ports->elts; + port = nxt_process_port_first(process); b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0); if (nxt_slow_path(b == NULL)) { @@ -230,8 +250,9 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, b->mem.free += sizeof(nxt_uint_t); (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, - fd, 0, b); + fd, 0, 0, b); } + nxt_runtime_process_loop; } @@ -269,11 +290,17 @@ nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { + size_t dump_size; nxt_buf_t *b; b = msg->buf; + dump_size = b->mem.free - b->mem.pos; + + if (dump_size > 300) { + dump_size = 300; + } - nxt_debug(task, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos); + nxt_debug(task, "data: %*s", dump_size, b->mem.pos); b->mem.pos = b->mem.free; } diff --git a/src/nxt_port.h b/src/nxt_port.h index 8a9b8926..76775660 100644 --- a/src/nxt_port.h +++ b/src/nxt_port.h @@ -8,11 +8,28 @@ #define _NXT_PORT_H_INCLUDED_ +typedef enum { + NXT_PORT_MSG_QUIT = 0, + NXT_PORT_MSG_NEW_PORT, + NXT_PORT_MSG_CHANGE_FILE, + NXT_PORT_MSG_MMAP, + NXT_PORT_MSG_DATA, +} nxt_port_msg_type_t; + +#define NXT_PORT_MSG_MAX NXT_PORT_MSG_DATA + + +/* Passed as a first iov chunk. */ typedef struct { - uint32_t stream; + uint32_t stream; + nxt_pid_t pid; + nxt_port_id_t reply_port; - uint16_t type; - uint8_t last; /* 1 bit */ + nxt_port_msg_type_t type:8; + uint8_t last; /* 1 bit */ + + /* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */ + uint8_t mmap; /* 1 bit */ } nxt_port_msg_t; @@ -26,12 +43,10 @@ typedef struct { struct nxt_port_recv_msg_s { - uint32_t stream; - uint16_t type; - nxt_fd_t fd; nxt_buf_t *buf; nxt_port_t *port; + nxt_port_msg_t port_msg; }; @@ -39,6 +54,8 @@ struct nxt_port_s { /* Must be the first field. */ nxt_fd_event_t socket; + nxt_queue_link_t link; + nxt_queue_t messages; /* of nxt_port_send_msg_t */ /* Maximum size of message part. */ @@ -53,26 +70,22 @@ struct nxt_port_s { nxt_buf_t *free_bufs; nxt_socket_t pair[2]; + nxt_port_id_t id; nxt_pid_t pid; uint32_t engine; -}; - - -#define NXT_PORT_MSG_MAX NXT_PORT_MSG_DATA -typedef enum { - NXT_PORT_MSG_QUIT = 0, - NXT_PORT_MSG_NEW_PORT, - NXT_PORT_MSG_CHANGE_FILE, - NXT_PORT_MSG_DATA, -} nxt_port_msg_type_e; + nxt_process_type_t type:8; + nxt_process_t *process; +}; typedef struct { + nxt_port_id_t id; nxt_pid_t pid; uint32_t engine; size_t max_size; size_t max_share; + nxt_process_type_t type:8; } nxt_port_msg_new_port_t; @@ -94,7 +107,8 @@ void nxt_port_write_close(nxt_port_t *port); void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port); void nxt_port_read_close(nxt_port_t *port); nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, - nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b); + nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, + nxt_buf_t *b); void nxt_port_create(nxt_thread_t *thread, nxt_port_t *port, nxt_port_handler_t *handlers); @@ -109,6 +123,7 @@ void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c new file mode 100644 index 00000000..447ef138 --- /dev/null +++ b/src/nxt_port_memory.c @@ -0,0 +1,700 @@ + +#include <nxt_main.h> + +#if (NXT_HAVE_MEMFD_CREATE) + +#include <linux/memfd.h> +#include <unistd.h> +#include <sys/syscall.h> + +#endif + +#define PORT_MMAP_CHUNK_SIZE (1024 * 16) +#define PORT_MMAP_HEADER_SIZE (1024 * 4) +#define PORT_MMAP_SIZE (PORT_MMAP_HEADER_SIZE + 1024 * 1024 * 10) + +#define PORT_MMAP_CHUNK_COUNT \ + ( (PORT_MMAP_SIZE - PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE ) + + +typedef uint32_t nxt_chunk_id_t; + +typedef nxt_atomic_uint_t nxt_free_map_t; + +#define FREE_BITS (sizeof(nxt_free_map_t) * 8) + +#define FREE_IDX(nchunk) ((nchunk) / FREE_BITS) + +#define FREE_MASK(nchunk) \ + ( 1ULL << ( (nchunk) % FREE_BITS ) ) + +#define MAX_FREE_IDX FREE_IDX(PORT_MMAP_CHUNK_COUNT) + + +/* Mapped at the start of shared memory segment. */ +struct nxt_port_mmap_header_s { + nxt_free_map_t free_map[MAX_FREE_IDX]; +}; + + +/* + * Element of nxt_process_t.incoming/outgoing, shared memory segment + * descriptor. + */ +struct nxt_port_mmap_s { + uint32_t id; + nxt_fd_t fd; + nxt_pid_t pid; /* For sanity check. */ + union { + void *mem; + nxt_port_mmap_header_t *hdr; + } u; +}; + + +/* Passed as a second iov chunk when 'mmap' bit in nxt_port_msg_t is 1. */ +typedef struct { + uint32_t mmap_id; /* Mmap index in nxt_process_t.outgoing. */ + nxt_chunk_id_t chunk_id; /* Mmap chunk index. */ + uint32_t size; /* Payload data size. */ +} nxt_port_mmap_msg_t; + + +static nxt_bool_t +nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c); + +#define nxt_port_mmap_get_chunk_busy(hdr, c) \ + ((hdr->free_map[FREE_IDX(c)] & FREE_MASK(c)) == 0) + +nxt_inline void +nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c); + +nxt_inline void +nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c); + +#define nxt_port_mmap_chunk_id(port_mmap, b) \ + ((((u_char *) (b) - (u_char *) (port_mmap->u.mem)) - \ + PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE) + +#define nxt_port_mmap_chunk_start(port_mmap, chunk) \ + (((u_char *) (port_mmap->u.mem)) + PORT_MMAP_HEADER_SIZE + \ + (chunk) * PORT_MMAP_CHUNK_SIZE) + + +void +nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) +{ + if (port_mmap->u.mem != NULL) { + nxt_mem_munmap(port_mmap->u.mem, PORT_MMAP_SIZE); + port_mmap->u.mem = NULL; + } + + if (port_mmap->fd != -1) { + close(port_mmap->fd); + port_mmap->fd = -1; + } +} + + +static void +nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) +{ + u_char *p; + nxt_buf_t *b; + nxt_chunk_id_t c; + nxt_mem_pool_t *mp; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_header_t *hdr; + + b = obj; + + nxt_debug(task, "mmap buf completion: %p %p", b, b->mem.start); + + mp = b->data; + + port_mmap = (nxt_port_mmap_t *) b->parent; + hdr = port_mmap->u.hdr; + + if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { + /* + * Chunks until b->mem.pos has been sent to other side, + * let's release rest (if any). + */ + p = b->mem.pos - 1; + c = nxt_port_mmap_chunk_id(port_mmap, p) + 1; + p = nxt_port_mmap_chunk_start(port_mmap, c); + } else { + p = b->mem.start; + c = nxt_port_mmap_chunk_id(port_mmap, p); + } + + while (p < b->mem.end) { + nxt_port_mmap_set_chunk_free(hdr, c); + + p += PORT_MMAP_CHUNK_SIZE; + c++; + } + + nxt_buf_free(mp, b); +} + + +static nxt_bool_t +nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c) +{ + int ffs; + size_t i; + nxt_free_map_t bits; + nxt_free_map_t *free_map; + + free_map = port_mmap->u.hdr->free_map; + + for (i = 0; i < MAX_FREE_IDX; i++) { + bits = free_map[i]; + if (bits == 0) { + continue; + } + + ffs = __builtin_ffsll(bits); + if (ffs != 0) { + *c = i * FREE_BITS + ffs - 1; + return 1; + } + } + + return 0; +} + + +nxt_inline void +nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) +{ + nxt_atomic_and_fetch(hdr->free_map + FREE_IDX(c), ~FREE_MASK(c)); + + nxt_thread_log_debug("set_chunk_busy: hdr %p; b %D", hdr, c); +} + + +nxt_inline void +nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) +{ + nxt_atomic_or_fetch(hdr->free_map + FREE_IDX(c), FREE_MASK(c)); + + nxt_thread_log_debug("set_chunk_free: hdr %p; b %D", hdr, c); +} + + +nxt_port_mmap_t * +nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, + nxt_fd_t fd) +{ + struct stat mmap_stat; + nxt_port_mmap_t *port_mmap; + + nxt_debug(task, "got new mmap fd #%FD from process %PI", + fd, process->pid); + + if (fstat(fd, &mmap_stat) == -1) { + nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno); + + return NULL; + } + + if (process->incoming == NULL) { + process->incoming = nxt_array_create(process->mem_pool, 1, + sizeof(nxt_port_mmap_t)); + } + + if (nxt_slow_path(process->incoming == NULL)) { + nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array"); + + return NULL; + } + + port_mmap = nxt_array_zero_add(process->incoming); + if (nxt_slow_path(port_mmap == NULL)) { + nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); + + return NULL; + } + + port_mmap->id = process->incoming->nelts - 1; + port_mmap->fd = -1; + port_mmap->pid = process->pid; + port_mmap->u.mem = nxt_mem_mmap(NULL, mmap_stat.st_size, + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + + if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) { + nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); + + port_mmap->u.mem = NULL; + + return NULL; + } + + return port_mmap; +} + + +static void +nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data) +{ + nxt_buf_t *b; + nxt_port_t *port; + nxt_port_mmap_t *port_mmap; + + b = obj; + port = b->data; + port_mmap = (nxt_port_mmap_t *) b->parent; + + nxt_debug(task, "mmap fd %FD sent to %PI", port_mmap->fd, port->pid); + + close(port_mmap->fd); + port_mmap->fd = -1; + + nxt_buf_free(port->mem_pool, b); +} + + +static nxt_port_mmap_t * +nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, + nxt_port_t *port) +{ + u_char *p, name[64]; + nxt_buf_t *b; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_header_t *hdr; + + if (process->outgoing == NULL) { + process->outgoing = nxt_array_create(process->mem_pool, 1, + sizeof(nxt_port_mmap_t)); + } + + if (nxt_slow_path(process->outgoing == NULL)) { + nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array"); + + return NULL; + } + + port_mmap = nxt_array_zero_add(process->outgoing); + if (nxt_slow_path(port_mmap == NULL)) { + nxt_log(task, NXT_LOG_WARN, + "failed to add port mmap to outgoing array"); + + return NULL; + } + + port_mmap->id = process->outgoing->nelts - 1; + port_mmap->pid = process->pid; + + p = nxt_sprintf(name, name + sizeof(name), "/nginext.%PI.%PT.%D", + nxt_pid, task->thread->tid, nxt_random(&nxt_random_data)); + *p = '\0'; + +#if (NXT_HAVE_MEMFD_CREATE) + port_mmap->fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); + + if (nxt_slow_path(port_mmap->fd == -1)) { + nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E", + name, nxt_errno); + + goto remove_fail; + } + + nxt_debug(task, "memfd_create(%s): %FD", name, port_mmap->fd); + +#elif (NXT_HAVE_SHM_OPEN) + shm_unlink((char *) name); // just in case + + port_mmap->fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, + S_IRUSR | S_IWUSR); + + nxt_debug(task, "shm_open(%s): %FD", name, port_mmap->fd); + + if (nxt_slow_path(port_mmap->fd == -1)) { + nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno); + + goto remove_fail; + } + + if (nxt_slow_path(shm_unlink((char *) name) == -1)) { + nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, + nxt_errno); + } +#endif + + if (nxt_slow_path(ftruncate(port_mmap->fd, PORT_MMAP_SIZE) == -1)) { + nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno); + + goto remove_fail; + } + + port_mmap->u.mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, + PROT_READ | PROT_WRITE, MAP_SHARED, + port_mmap->fd, 0); + + if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) { + goto remove_fail; + } + + /* Init segment header. */ + hdr = port_mmap->u.hdr; + + nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); + + /* Mark as busy chunk followed the last available chunk. */ + nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT); + + nxt_debug(task, "send mmap fd %FD to process %PI", port_mmap->fd, + port->pid); + + b = nxt_buf_mem_alloc(port->mem_pool, 0, 0); + b->completion_handler = nxt_port_mmap_send_fd_buf_completion; + b->data = port; + b->parent = port_mmap; + + /* TODO handle error */ + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, port_mmap->fd, + 0, 0, b); + + nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", + port_mmap->id, nxt_pid, process->pid); + + return port_mmap; + +remove_fail: + + nxt_array_remove(process->outgoing, port_mmap); + + return NULL; +} + + +static nxt_port_mmap_t * +nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, + size_t size) +{ + nxt_array_t *outgoing; + nxt_process_t *process; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_t *end_port_mmap; + + process = nxt_runtime_process_get(task->thread->runtime, port->pid); + if (nxt_slow_path(process == NULL)) { + return NULL; + } + + *c = 0; + + if (process->outgoing == NULL) { + return nxt_port_new_port_mmap(task, process, port); + } + + outgoing = process->outgoing; + port_mmap = outgoing->elts; + end_port_mmap = port_mmap + outgoing->nelts; + + while (port_mmap < end_port_mmap) { + + if (nxt_port_mmap_get_free_chunk(port_mmap, c)) { + return port_mmap; + } + + port_mmap++; + } + + /* TODO introduce port_mmap limit and release wait. */ + return nxt_port_new_port_mmap(task, process, port); +} + + +static nxt_port_mmap_t * +nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) +{ + nxt_array_t *incoming; + nxt_process_t *process; + nxt_port_mmap_t *port_mmap; + + process = nxt_runtime_process_get(task->thread->runtime, spid); + if (nxt_slow_path(process == NULL)) { + return NULL; + } + + incoming = process->incoming; + if (nxt_slow_path(incoming == NULL)) { + /* TODO add warning */ + return NULL; + } + + if (nxt_slow_path(incoming->nelts <= id)) { + /* TODO add warning */ + return NULL; + } + + port_mmap = incoming->elts; + + return port_mmap + id; +} + + +nxt_buf_t * +nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) +{ + size_t nchunks; + nxt_buf_t *b; + nxt_chunk_id_t c; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_header_t *hdr; + + nxt_debug(task, "request %z bytes shm buffer", size); + + b = nxt_mem_cache_zalloc0(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE); + if (nxt_slow_path(b == NULL)) { + return NULL; + } + + b->data = port->mem_pool; + b->completion_handler = nxt_port_mmap_buf_completion; + b->size = NXT_BUF_PORT_MMAP_SIZE; + + nxt_buf_set_port_mmap(b); + + port_mmap = nxt_port_mmap_get(task, port, &c, size); + if (nxt_slow_path(port_mmap == NULL)) { + nxt_buf_free(port->mem_pool, b); + return NULL; + } + + hdr = port_mmap->u.hdr; + + b->parent = port_mmap; + b->mem.start = nxt_port_mmap_chunk_start(port_mmap, c); + b->mem.pos = b->mem.start; + b->mem.free = b->mem.start; + b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE; + + nxt_port_mmap_set_chunk_busy(hdr, c); + + nchunks = size / PORT_MMAP_CHUNK_SIZE; + if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { + nchunks++; + } + + c++; + nchunks--; + + /* Try to acquire as much chunks as required. */ + while (nchunks > 0) { + + if (nxt_port_mmap_get_chunk_busy(hdr, c)) { + break; + } + nxt_port_mmap_set_chunk_busy(hdr, c); + + b->mem.end += PORT_MMAP_CHUNK_SIZE; + c++; + nchunks--; + } + + return b; +} + + +static nxt_buf_t * +nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, + nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) +{ + size_t nchunks; + nxt_buf_t *b; + nxt_port_mmap_t *port_mmap; + + b = nxt_mem_cache_zalloc0(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE); + if (nxt_slow_path(b == NULL)) { + return NULL; + } + + b->data = port->mem_pool; + b->completion_handler = nxt_port_mmap_buf_completion; + b->size = NXT_BUF_PORT_MMAP_SIZE; + + nxt_buf_set_port_mmap(b); + + port_mmap = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id); + if (nxt_slow_path(port_mmap == NULL)) { + nxt_buf_free(port->mem_pool, b); + return NULL; + } + + nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; + if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { + nchunks++; + } + + b->mem.start = nxt_port_mmap_chunk_start(port_mmap, mmap_msg->chunk_id); + b->mem.pos = b->mem.start; + b->mem.free = b->mem.start + mmap_msg->size; + b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; + + b->parent = port_mmap; + + return b; +} + + +void +nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, + nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) +{ + size_t bsize; + nxt_buf_t *b, *bmem; + nxt_uint_t i; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_msg_t *mmap_msg; + + nxt_debug(task, "prepare %z bytes message for transfer to process %PI " + "via shared memory", sb->size, port->pid); + + bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); + + b = nxt_buf_mem_alloc(port->mem_pool, bsize, 0); + if (nxt_slow_path(b == NULL)) { + return; + } + + mmap_msg = (nxt_port_mmap_msg_t *) b->mem.start; + bmem = msg->buf; + + for (i = 0; i < sb->niov; i++, mmap_msg++) { + + /* Lookup buffer which starts current iov_base. */ + while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) { + bmem = bmem->next; + } + + if (nxt_slow_path(bmem == NULL)) { + nxt_log_error(NXT_LOG_ERR, task->log, "failed to find buf for " + "iobuf[%d]", i); + return; + /* TODO clear b and exit */ + } + + port_mmap = (nxt_port_mmap_t *) bmem->parent; + + mmap_msg->mmap_id = port_mmap->id; + mmap_msg->chunk_id = nxt_port_mmap_chunk_id(port_mmap, bmem->mem.pos); + mmap_msg->size = sb->iobuf[i].iov_len; + + nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", + mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, + port->pid); + } + + msg->buf = b; + b->mem.free += bsize; + + sb->iobuf[0].iov_base = b->mem.pos; + sb->iobuf[0].iov_len = bsize; + sb->niov = 1; + sb->size = bsize; + + msg->port_msg.mmap = 1; +} + + +void +nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port, + nxt_port_recv_msg_t *msg, size_t size) +{ + nxt_buf_t *b, **pb; + nxt_port_mmap_msg_t *end, *mmap_msg; + + b = msg->buf; + + mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; + end = (nxt_port_mmap_msg_t *) b->mem.free; + + pb = &msg->buf; + + while (mmap_msg < end) { + nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", + mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, + msg->port_msg.pid); + + *pb = nxt_port_mmap_get_incoming_buf(task, port, msg->port_msg.pid, + mmap_msg); + if (nxt_slow_path(*pb == NULL)) { + nxt_log_error(NXT_LOG_ERR, task->log, "failed to get mmap buffer"); + + break; + } + + pb = &(*pb)->next; + mmap_msg++; + } + + /* Mark original buf as complete. */ + b->mem.pos += nxt_buf_used_size(b); +} + + +nxt_port_method_t +nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) +{ + nxt_port_mmap_t *port_mmap; + nxt_port_method_t m; + + m = NXT_PORT_METHOD_ANY; + + for (; b != NULL; b = b->next) { + if (nxt_buf_used_size(b) == 0) { + /* empty buffers does not affect method */ + continue; + } + + if (nxt_buf_is_port_mmap(b)) { + port_mmap = (nxt_port_mmap_t *) b->parent; + + if (m == NXT_PORT_METHOD_PLAIN) { + nxt_log_error(NXT_LOG_ERR, task->log, + "mixing plain and mmap buffers, " + "using plain mode"); + + break; + } + + if (port->pid != port_mmap->pid) { + nxt_log_error(NXT_LOG_ERR, task->log, + "send mmap buffer for %PI to %PI, " + "using plain mode", port_mmap->pid, port->pid); + + m = NXT_PORT_METHOD_PLAIN; + + break; + } + + if (m == NXT_PORT_METHOD_ANY) { + nxt_debug(task, "using mmap mode"); + + m = NXT_PORT_METHOD_MMAP; + } + } else { + if (m == NXT_PORT_METHOD_MMAP) { + nxt_log_error(NXT_LOG_ERR, task->log, + "mixing mmap and plain buffers, " + "switching to plain mode"); + + m = NXT_PORT_METHOD_PLAIN; + + break; + } + + if (m == NXT_PORT_METHOD_ANY) { + nxt_debug(task, "using plain mode"); + + m = NXT_PORT_METHOD_PLAIN; + } + } + } + + return m; +} diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h new file mode 100644 index 00000000..9ad4e2a4 --- /dev/null +++ b/src/nxt_port_memory.h @@ -0,0 +1,50 @@ + +#ifndef _NXT_PORT_MEMORY_H_INCLUDED_ +#define _NXT_PORT_MEMORY_H_INCLUDED_ + +#define PORT_MMAP_MIN_SIZE (3 * sizeof(uint32_t)) + +typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t; + +void +nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap); + +/* + * Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem' + * pointers to first available shared mem bucket(s). 'size' used as a hint to + * acquire several successive buckets if possible. + * + * This function assumes that current thread operates the 'port' exclusively. + */ +nxt_buf_t * +nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size); + +nxt_port_mmap_t * +nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, + nxt_fd_t fd); + +void +nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, + nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb); + +nxt_inline void +nxt_port_mmap_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) { + nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); +} + +void +nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port, + nxt_port_recv_msg_t *msg, size_t size); + +enum nxt_port_method_e { + NXT_PORT_METHOD_ANY = 0, + NXT_PORT_METHOD_PLAIN, + NXT_PORT_METHOD_MMAP +}; + +typedef enum nxt_port_method_e nxt_port_method_t; + +nxt_port_method_t +nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b); + +#endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */ diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 671c3334..51d88ff7 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -10,7 +10,7 @@ static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, - nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size); + nxt_port_recv_msg_t *msg, size_t size); static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); @@ -137,7 +137,7 @@ nxt_port_write_close(nxt_port_t *port) nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, - nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) + nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) { nxt_queue_link_t *link; nxt_port_send_msg_t *msg; @@ -169,8 +169,11 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg->share = 0; msg->port_msg.stream = stream; + msg->port_msg.pid = nxt_pid; + msg->port_msg.reply_port = reply_port; msg->port_msg.type = type; msg->port_msg.last = 0; + msg->port_msg.mmap = 0; nxt_queue_insert_tail(&port->messages, &msg->link); @@ -186,12 +189,15 @@ static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) { ssize_t n; - nxt_uint_t niov; nxt_port_t *port; struct iovec iov[NXT_IOBUF_MAX]; nxt_queue_link_t *link; nxt_port_send_msg_t *msg; nxt_sendbuf_coalesce_t sb; + nxt_port_method_t m; + + size_t plain_size; + nxt_buf_t *plain_buf; port = obj; @@ -213,27 +219,59 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) sb.nmax = NXT_IOBUF_MAX - 1; sb.sync = 0; sb.last = 0; - sb.size = sizeof(nxt_port_msg_t); + sb.size = 0; sb.limit = port->max_size; - niov = nxt_sendbuf_mem_coalesce(task, &sb); + m = nxt_port_mmap_get_method(task, port, msg->buf); + + if (m == NXT_PORT_METHOD_MMAP) { + sb.limit = (1ULL << 31) - 1; + } + + nxt_sendbuf_mem_coalesce(task, &sb); + + plain_size = sb.size; + plain_buf = msg->buf; + + /* + * Send through mmap enabled only when payload + * is bigger than PORT_MMAP_MIN_SIZE. + */ + if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { + nxt_port_mmap_write(task, port, msg, &sb); + + } else { + m = NXT_PORT_METHOD_PLAIN; + } msg->port_msg.last = sb.last; - n = nxt_socketpair_send(&port->socket, msg->fd, iov, niov + 1); + n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); if (n > 0) { - if (nxt_slow_path((size_t) n != sb.size)) { + if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { nxt_log(task, NXT_LOG_CRIT, "port %d: short write: %z instead of %uz", - port->socket.fd, n, sb.size); + port->socket.fd, n, sb.size + iov[0].iov_len); goto fail; } + if (msg->buf != plain_buf) { + /* + * Complete crafted mmap_msgs buf and restore msg->buf + * for regular completion call. + */ + nxt_port_mmap_completion(task, + port->socket.write_work_queue, + msg->buf); + + msg->buf = plain_buf; + } + msg->buf = nxt_sendbuf_completion(task, port->socket.write_work_queue, msg->buf, - n - sizeof(nxt_port_msg_t)); + plain_size); if (msg->buf != NULL) { /* @@ -301,14 +339,13 @@ nxt_port_read_close(nxt_port_t *port) static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) { - ssize_t n; - nxt_fd_t fd; - nxt_buf_t *b; - nxt_port_t *port; - struct iovec iov[2]; - nxt_port_msg_t msg; + ssize_t n; + nxt_buf_t *b; + nxt_port_t *port; + struct iovec iov[2]; + nxt_port_recv_msg_t msg; - port = obj; + port = msg.port = obj; for ( ;; ) { @@ -318,24 +355,21 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) /* TODO: disable event for some time */ } - iov[0].iov_base = &msg; + iov[0].iov_base = &msg.port_msg; iov[0].iov_len = sizeof(nxt_port_msg_t); iov[1].iov_base = b->mem.pos; iov[1].iov_len = port->max_size; - n = nxt_socketpair_recv(&port->socket, &fd, iov, 2); + n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); if (n > 0) { - nxt_port_read_msg_process(task, port, &msg, fd, b, n); - if (b->mem.pos == b->mem.free) { + msg.buf = b; - if (b->next != NULL) { - /* A sync buffer */ - nxt_buf_free(port->mem_pool, b->next); - } + nxt_port_read_msg_process(task, port, &msg, n); + if (b->mem.pos == b->mem.free) { nxt_port_buf_free(port, b); } @@ -364,10 +398,11 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, - nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size) + nxt_port_recv_msg_t *msg, size_t size) { - nxt_buf_t *sync; - nxt_port_recv_msg_t recv_msg; + nxt_buf_t *b; + nxt_buf_t *orig_b; + nxt_buf_t **last_next; if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) { nxt_log(port->socket.task, NXT_LOG_CRIT, @@ -375,31 +410,56 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, goto fail; } - recv_msg.stream = msg->stream; - recv_msg.type = msg->type; - recv_msg.fd = fd; - recv_msg.buf = b; - recv_msg.port = port; + /* adjust size to actual buffer used size */ + size -= sizeof(nxt_port_msg_t); + + b = orig_b = msg->buf; + b->mem.free += size; + + if (msg->port_msg.mmap) { + nxt_port_mmap_read(task, port, msg, size); + b = msg->buf; + } - b->mem.free += size - sizeof(nxt_port_msg_t); + last_next = &b->next; - if (msg->last) { - sync = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST); - if (nxt_slow_path(sync == NULL)) { + if (msg->port_msg.last) { + /* find reference to last next, the NULL one */ + while (*last_next) { + last_next = &(*last_next)->next; + } + + *last_next = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST); + if (nxt_slow_path(*last_next == NULL)) { goto fail; } + } + + port->handler(task, msg); - b->next = sync; + if (*last_next != NULL) { + /* A sync buffer */ + nxt_buf_free(port->mem_pool, *last_next); + *last_next = NULL; } - port->handler(task, &recv_msg); + if (orig_b != b) { + /* complete mmap buffers */ + for (; b && nxt_buf_used_size(b) == 0; + b = b->next) { + nxt_debug(task, "complete buffer %p", b); + + nxt_work_queue_add(port->socket.read_work_queue, + b->completion_handler, task, b, b->parent); + } + } return; fail: - if (fd != -1) { - nxt_fd_close(fd); + if (msg->fd != -1) { + nxt_fd_close(msg->fd); } } @@ -415,6 +475,7 @@ nxt_port_buf_alloc(nxt_port_t *port) b->mem.pos = b->mem.start; b->mem.free = b->mem.start; + b->next = NULL; } else { b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); diff --git a/src/nxt_process.c b/src/nxt_process.c index d24be580..64fd1f19 100644 --- a/src/nxt_process.c +++ b/src/nxt_process.c @@ -506,3 +506,22 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc) return NXT_OK; } + + +nxt_port_t * +nxt_process_port_new(nxt_process_t *process) +{ + nxt_port_t *port; + + port = nxt_mem_cache_zalloc0(process->mem_pool, sizeof(nxt_port_t)); + if (nxt_fast_path(port != NULL)) { + port->id = process->last_port_id++; + port->pid = process->pid; + port->process = process; + + nxt_process_port_add(process, port); + } + + return port; +} + diff --git a/src/nxt_process.h b/src/nxt_process.h index 92f673a2..b6262106 100644 --- a/src/nxt_process.h +++ b/src/nxt_process.h @@ -49,9 +49,15 @@ struct nxt_process_init_s { typedef struct { + nxt_mem_pool_t *mem_pool; + nxt_pid_t pid; - nxt_array_t *ports; /* of nxt_port_t */ + nxt_queue_t ports; /* of nxt_port_t */ + nxt_port_id_t last_port_id; + nxt_process_init_t *init; + nxt_array_t *incoming; /* of nxt_mmap_t */ + nxt_array_t *outgoing; /* of nxt_mmap_t */ } nxt_process_t; @@ -65,6 +71,22 @@ NXT_EXPORT void nxt_nanosleep(nxt_nsec_t ns); NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv, char ***orig_envp); +NXT_EXPORT nxt_port_t * nxt_process_port_new(nxt_process_t *process); + +#define nxt_process_port_remove(port) \ + nxt_queue_remove(&port->link) + +#define nxt_process_port_first(process) \ + nxt_queue_link_data(nxt_queue_first(&process->ports), nxt_port_t, link) + +#define nxt_process_port_add(process, port) \ + nxt_queue_insert_tail(&process->ports, &port->link) + +#define nxt_process_port_each(process, port) \ + nxt_queue_each(port, &process->ports, nxt_port_t, link) + +#define nxt_process_port_loop \ + nxt_queue_loop #if (NXT_HAVE_SETPROCTITLE) diff --git a/src/nxt_queue.h b/src/nxt_queue.h index e8f7c245..f1efd328 100644 --- a/src/nxt_queue.h +++ b/src/nxt_queue.h @@ -216,4 +216,19 @@ NXT_EXPORT void nxt_queue_sort(nxt_queue_t *queue, const nxt_queue_link_t *), const void *data); +#define nxt_queue_each(elt, queue, type, link) \ + do { \ + nxt_queue_link_t *_lnk; \ + \ + for (_lnk = nxt_queue_first(queue); \ + _lnk != nxt_queue_tail(queue); \ + _lnk = nxt_queue_next(_lnk)) { \ + \ + elt = nxt_queue_link_data(_lnk, type, link); \ + +#define nxt_queue_loop \ + } \ + } while(0) + + #endif /* _NXT_QUEUE_H_INCLUDED_ */ diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index 84812cae..8337712a 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -16,7 +16,6 @@ static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt); static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt); -static nxt_int_t nxt_runtime_processes(nxt_runtime_t *rt); static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt); static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); static void nxt_runtime_initial_start(nxt_task_t *task); @@ -105,10 +104,6 @@ nxt_runtime_create(nxt_task_t *task) goto fail; } - if (nxt_slow_path(nxt_runtime_processes(rt) != NXT_OK)) { - goto fail; - } - if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) { goto fail; } @@ -294,18 +289,6 @@ nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) static nxt_int_t -nxt_runtime_processes(nxt_runtime_t *rt) -{ - rt->processes = nxt_array_create(rt->mem_pool, 4, sizeof(nxt_process_t)); - if (nxt_slow_path(rt->processes == NULL)) { - return NXT_ERROR; - } - - return NXT_OK; -} - - -static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt) { #if (NXT_THREADS) @@ -1479,21 +1462,323 @@ nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file) nxt_process_t * -nxt_runtime_new_process(nxt_runtime_t *rt) +nxt_runtime_process_new(nxt_runtime_t *rt) { nxt_process_t *process; /* TODO: memory failures. */ - process = nxt_array_zero_add(rt->processes); + process = nxt_mem_cache_zalloc0(rt->mem_pool, sizeof(nxt_process_t)); if (nxt_slow_path(process == NULL)) { return NULL; } - process->ports = nxt_array_create(rt->mem_pool, 1, sizeof(nxt_port_t)); - if (nxt_slow_path(process->ports == NULL)) { + nxt_queue_init(&process->ports); + + /* TODO each process should have it's own mem_pool for ports allocation */ + process->mem_pool = rt->mem_pool; + + return process; +} + + +static nxt_int_t +nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + nxt_process_t *process; + + process = data; + + if (lhq->key.length == sizeof(nxt_pid_t) && + *(nxt_pid_t *) lhq->key.start == process->pid) { + return NXT_OK; + } + + return NXT_DECLINED; +} + +static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + 0, + nxt_runtime_lvlhsh_pid_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +typedef struct { + nxt_pid_t pid; + nxt_port_id_t port_id; +} nxt_pid_port_id_t; + +static nxt_int_t +nxt_runtime_lvlhsh_port_test(nxt_lvlhsh_query_t *lhq, void *data) +{ + nxt_port_t *port; + nxt_pid_port_id_t *pid_port_id; + + port = data; + pid_port_id = (nxt_pid_port_id_t *) lhq->key.start; + + if (lhq->key.length == sizeof(nxt_pid_port_id_t) && + pid_port_id->pid == port->pid && + pid_port_id->port_id == port->id) { + return NXT_OK; + } + + return NXT_DECLINED; +} + +static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = { + NXT_LVLHSH_DEFAULT, + 0, + nxt_runtime_lvlhsh_port_test, + nxt_lvlhsh_alloc, + nxt_lvlhsh_free, +}; + + +nxt_process_t * +nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid) +{ + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid)); + lhq.key.length = sizeof(pid); + lhq.key.start = (u_char *) &pid; + lhq.proto = &lvlhsh_processes_proto; + + /* TODO lock processes */ + + if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { + nxt_thread_log_debug("process %PI found", pid); + return lhq.value; + } + + nxt_thread_log_debug("process %PI not found", pid); + + return NULL; +} + + +nxt_process_t * +nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid) +{ + nxt_process_t *process; + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid)); + lhq.key.length = sizeof(pid); + lhq.key.start = (u_char *) &pid; + lhq.proto = &lvlhsh_processes_proto; + + /* TODO lock processes */ + + if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) { + nxt_thread_log_debug("process %PI found", pid); + return lhq.value; + } + + process = nxt_runtime_process_new(rt); + if (nxt_slow_path(process == NULL)) { return NULL; } + process->pid = pid; + + lhq.replace = 0; + lhq.value = process; + lhq.pool = rt->mem_pool; + + switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { + + case NXT_OK: + if (rt->nprocesses == 0) { + rt->mprocess = process; + } + + rt->nprocesses++; + + nxt_thread_log_debug("process %PI insert", pid); + break; + + default: + nxt_thread_log_debug("process %PI insert failed", pid); + break; + } + return process; } + + +void +nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process) +{ + nxt_port_t *port; + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid)); + lhq.key.length = sizeof(process->pid); + lhq.key.start = (u_char *) &process->pid; + lhq.proto = &lvlhsh_processes_proto; + lhq.replace = 0; + lhq.value = process; + lhq.pool = rt->mem_pool; + + /* TODO lock processes */ + + switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) { + + case NXT_OK: + if (rt->nprocesses == 0) { + rt->mprocess = process; + } + + rt->nprocesses++; + + nxt_process_port_each(process, port) { + + nxt_runtime_port_add(rt, port); + + } nxt_process_port_loop; + + break; + + default: + break; + } +} + + +void +nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) +{ + nxt_port_t *port; + nxt_lvlhsh_query_t lhq; + + lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid)); + lhq.key.length = sizeof(process->pid); + lhq.key.start = (u_char *) &process->pid; + lhq.proto = &lvlhsh_processes_proto; + lhq.replace = 0; + lhq.value = process; + lhq.pool = rt->mem_pool; + + /* TODO lock processes */ + + switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) { + + case NXT_OK: + rt->nprocesses--; + + nxt_process_port_each(process, port) { + + nxt_runtime_port_remove(rt, port); + + } nxt_process_port_loop; + + break; + + default: + break; + } +} + + +nxt_process_t * +nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe) +{ + nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t)); + + lhe->proto = &lvlhsh_processes_proto; + + return nxt_runtime_process_next(rt, lhe); +} + + +void +nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port) +{ + nxt_pid_port_id_t pid_port; + nxt_lvlhsh_query_t lhq; + + pid_port.pid = port->pid; + pid_port.port_id = port->id; + + lhq.key_hash = nxt_murmur_hash2(&pid_port, sizeof(pid_port)); + lhq.key.length = sizeof(pid_port); + lhq.key.start = (u_char *) &pid_port; + lhq.proto = &lvlhsh_ports_proto; + lhq.replace = 0; + lhq.value = port; + lhq.pool = rt->mem_pool; + + /* TODO lock ports */ + + switch (nxt_lvlhsh_insert(&rt->ports, &lhq)) { + + case NXT_OK: + break; + + default: + break; + } +} + + +void +nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port) +{ + nxt_pid_port_id_t pid_port; + nxt_lvlhsh_query_t lhq; + + pid_port.pid = port->pid; + pid_port.port_id = port->id; + + lhq.key_hash = nxt_murmur_hash2(&pid_port, sizeof(pid_port)); + lhq.key.length = sizeof(pid_port); + lhq.key.start = (u_char *) &pid_port; + lhq.proto = &lvlhsh_ports_proto; + lhq.replace = 0; + lhq.value = port; + lhq.pool = rt->mem_pool; + + /* TODO lock ports */ + + switch (nxt_lvlhsh_delete(&rt->ports, &lhq)) { + + case NXT_OK: + break; + + default: + break; + } +} + + +nxt_port_t * +nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, + nxt_port_id_t port_id) +{ + nxt_pid_port_id_t pid_port; + nxt_lvlhsh_query_t lhq; + + pid_port.pid = pid; + pid_port.port_id = port_id; + + lhq.key_hash = nxt_murmur_hash2(&pid_port, sizeof(pid_port)); + lhq.key.length = sizeof(pid_port); + lhq.key.start = (u_char *) &pid_port; + lhq.proto = &lvlhsh_ports_proto; + + /* TODO lock ports */ + + if (nxt_lvlhsh_find(&rt->ports, &lhq) == NXT_OK) { + nxt_thread_log_debug("process port (%PI, %d) found", pid, port_id); + return lhq.value; + } + + nxt_thread_log_debug("process port (%PI, %d) not found", pid, port_id); + + return NULL; +} diff --git a/src/nxt_runtime.h b/src/nxt_runtime.h index 3b384235..5cce5703 100644 --- a/src/nxt_runtime.h +++ b/src/nxt_runtime.h @@ -35,7 +35,11 @@ struct nxt_runtime_s { nxt_runtime_cont_t continuation; #endif - nxt_array_t *processes; /* of nxt_process_t */ + nxt_process_t *mprocess; + size_t nprocesses; + nxt_lvlhsh_t processes; /* of nxt_process_t */ + + nxt_lvlhsh_t ports; /* of nxt_port_t */ nxt_list_t *log_files; /* of nxt_file_t */ @@ -78,7 +82,30 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt, #endif -nxt_process_t *nxt_runtime_new_process(nxt_runtime_t *rt); +nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt); + +nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); + +void nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process); + +nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid); + +void nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process); + +nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt, + nxt_lvlhsh_each_t *lhe); + +#define nxt_runtime_process_next(rt, lhe) \ + nxt_lvlhsh_each(&rt->processes, lhe) + + +void nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port); + +void nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port); + +nxt_port_t *nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, + nxt_port_id_t port_id); + /* STUB */ nxt_int_t nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt); @@ -101,6 +128,19 @@ void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data); nxt_int_t nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt); +#define nxt_runtime_process_each(rt, process) \ + do { \ + nxt_lvlhsh_each_t _lhe; \ + \ + for (process = nxt_runtime_process_first(rt, &_lhe); \ + process != NULL; \ + process = nxt_runtime_process_next(rt, &_lhe)) { \ + +#define nxt_runtime_process_loop \ + } \ + } while(0) + + extern nxt_module_init_t nxt_init_modules[]; extern nxt_uint_t nxt_init_modules_n; diff --git a/src/nxt_sendbuf.c b/src/nxt_sendbuf.c index 62042504..e8fbe2a0 100644 --- a/src/nxt_sendbuf.c +++ b/src/nxt_sendbuf.c @@ -148,6 +148,7 @@ done: sb->buf = b; sb->size = total; + sb->niov = n; return n; } @@ -390,6 +391,14 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, break; } + if (nxt_buf_is_port_mmap(b)) { + /* + * buffer has been sent to other side which is now + * responsible for shared memory bucket release + */ + b->is_port_mmap_sent = 1; + } + if (sent < size) { if (nxt_buf_is_mem(b)) { diff --git a/src/nxt_sendbuf.h b/src/nxt_sendbuf.h index 71b18f91..0b789583 100644 --- a/src/nxt_sendbuf.h +++ b/src/nxt_sendbuf.h @@ -53,6 +53,7 @@ typedef struct { typedef struct { nxt_buf_t *buf; nxt_iobuf_t *iobuf; + nxt_uint_t niov; uint32_t nmax; uint8_t sync; /* 1 bit */ diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index 59285113..b6266520 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -25,6 +25,7 @@ nxt_port_handler_t nxt_worker_process_port_handlers[] = { nxt_worker_process_quit_handler, nxt_port_new_port_handler, nxt_port_change_log_file_handler, + nxt_port_mmap_handler, nxt_port_data_handler, }; |