summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--auto/sources1
-rw-r--r--src/nxt_port_memory.c433
-rw-r--r--src/nxt_port_memory.h12
-rw-r--r--src/nxt_port_memory_int.h179
4 files changed, 393 insertions, 232 deletions
diff --git a/auto/sources b/auto/sources
index df7c9959..33c97e1f 100644
--- a/auto/sources
+++ b/auto/sources
@@ -20,6 +20,7 @@ NXT_LIB_DEPS=" \
src/nxt_port.h \
src/nxt_port_hash.h \
src/nxt_port_memory.h \
+ src/nxt_port_memory_int.h \
src/nxt_dyld.h \
src/nxt_thread.h \
src/nxt_thread_id.h \
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index 2af03f67..5832ca25 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -1,4 +1,9 @@
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
#include <nxt_main.h>
#if (NXT_HAVE_MEMFD_CREATE)
@@ -9,89 +14,14 @@
#endif
-#define PORT_MMAP_CHUNK_SIZE (1024 * 16)
-#define PORT_MMAP_HEADER_SIZE (1024 * 4)
-#define PORT_MMAP_SIZE (PORT_MMAP_HEADER_SIZE + 1024 * 1024 * 10)
-
-#define PORT_MMAP_CHUNK_COUNT \
- ( (PORT_MMAP_SIZE - PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE )
-
-
-typedef uint32_t nxt_chunk_id_t;
-
-typedef nxt_atomic_uint_t nxt_free_map_t;
-
-#define FREE_BITS (sizeof(nxt_free_map_t) * 8)
-
-#define FREE_IDX(nchunk) ((nchunk) / FREE_BITS)
-
-#define FREE_MASK(nchunk) \
- ( 1ULL << ( (nchunk) % FREE_BITS ) )
-
-#define MAX_FREE_IDX FREE_IDX(PORT_MMAP_CHUNK_COUNT)
-
-
-/* Mapped at the start of shared memory segment. */
-struct nxt_port_mmap_header_s {
- nxt_free_map_t free_map[MAX_FREE_IDX];
-};
-
-
-/*
- * Element of nxt_process_t.incoming/outgoing, shared memory segment
- * descriptor.
- */
-struct nxt_port_mmap_s {
- uint32_t id;
- nxt_fd_t fd;
- nxt_pid_t pid; /* For sanity check. */
- union {
- void *mem;
- nxt_port_mmap_header_t *hdr;
- } u;
-};
-
-
-/* Passed as a second iov chunk when 'mmap' bit in nxt_port_msg_t is 1. */
-typedef struct {
- uint32_t mmap_id; /* Mmap index in nxt_process_t.outgoing. */
- nxt_chunk_id_t chunk_id; /* Mmap chunk index. */
- uint32_t size; /* Payload data size. */
-} nxt_port_mmap_msg_t;
-
-
-static nxt_bool_t
-nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c);
-
-#define nxt_port_mmap_get_chunk_busy(hdr, c) \
- ((hdr->free_map[FREE_IDX(c)] & FREE_MASK(c)) == 0)
-
-nxt_inline void
-nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
-
-nxt_inline void
-nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
-
-#define nxt_port_mmap_chunk_id(port_mmap, b) \
- ((((u_char *) (b) - (u_char *) (port_mmap->u.mem)) - \
- PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE)
-
-#define nxt_port_mmap_chunk_start(port_mmap, chunk) \
- (((u_char *) (port_mmap->u.mem)) + PORT_MMAP_HEADER_SIZE + \
- (chunk) * PORT_MMAP_CHUNK_SIZE)
-
+#include <nxt_port_memory_int.h>
void
nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
{
- if (port_mmap->u.mem != NULL) {
- nxt_mem_munmap(port_mmap->u.mem, PORT_MMAP_SIZE);
- port_mmap->u.mem = NULL;
- }
-
- if (port_mmap->fd != -1) {
- nxt_fd_close(port_mmap->fd);
- port_mmap->fd = -1;
+ if (port_mmap->hdr != NULL) {
+ nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE);
+ port_mmap->hdr = NULL;
}
}
@@ -103,7 +33,6 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
nxt_mp_t *mp;
nxt_buf_t *b;
nxt_chunk_id_t c;
- nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
b = obj;
@@ -120,8 +49,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
}
#endif
- port_mmap = data;
- hdr = port_mmap->u.hdr;
+ hdr = data;
if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
/*
@@ -129,11 +57,11 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
* let's release rest (if any).
*/
p = b->mem.pos - 1;
- c = nxt_port_mmap_chunk_id(port_mmap, p) + 1;
- p = nxt_port_mmap_chunk_start(port_mmap, c);
+ c = nxt_port_mmap_chunk_id(hdr, p) + 1;
+ p = nxt_port_mmap_chunk_start(hdr, c);
} else {
p = b->mem.start;
- c = nxt_port_mmap_chunk_id(port_mmap, p);
+ c = nxt_port_mmap_chunk_id(hdr, p);
}
while (p < b->mem.end) {
@@ -147,61 +75,21 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
}
-static nxt_bool_t
-nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c)
-{
- int ffs;
- size_t i;
- nxt_free_map_t bits;
- nxt_free_map_t *free_map;
-
- free_map = port_mmap->u.hdr->free_map;
-
- for (i = 0; i < MAX_FREE_IDX; i++) {
- bits = free_map[i];
- if (bits == 0) {
- continue;
- }
-
- ffs = __builtin_ffsll(bits);
- if (ffs != 0) {
- *c = i * FREE_BITS + ffs - 1;
- return 1;
- }
- }
-
- return 0;
-}
-
-
-nxt_inline void
-nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
-{
- nxt_atomic_and_fetch(hdr->free_map + FREE_IDX(c), ~FREE_MASK(c));
-
- nxt_thread_log_debug("set_chunk_busy: hdr %p; b %D", hdr, c);
-}
-
-
-nxt_inline void
-nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
-{
- nxt_atomic_or_fetch(hdr->free_map + FREE_IDX(c), FREE_MASK(c));
-
- nxt_thread_log_debug("set_chunk_free: hdr %p; b %D", hdr, c);
-}
-
-
-nxt_port_mmap_t *
+nxt_port_mmap_header_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_fd_t fd)
{
- struct stat mmap_stat;
- nxt_port_mmap_t *port_mmap;
+ void *mem;
+ struct stat mmap_stat;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
nxt_debug(task, "got new mmap fd #%FD from process %PI",
fd, process->pid);
+ port_mmap = NULL;
+ hdr = NULL;
+
if (fstat(fd, &mmap_stat) == -1) {
nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
@@ -216,44 +104,51 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
if (nxt_slow_path(process->incoming == NULL)) {
nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array");
- return NULL;
+ goto fail;
}
port_mmap = nxt_array_zero_add(process->incoming);
if (nxt_slow_path(port_mmap == NULL)) {
nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
- return NULL;
+ goto fail;
}
- port_mmap->id = process->incoming->nelts - 1;
- port_mmap->fd = -1;
- port_mmap->pid = process->pid;
- port_mmap->u.mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
- PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
- if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) {
+ if (nxt_slow_path(mem == MAP_FAILED)) {
nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
- port_mmap->u.mem = NULL;
+ port_mmap = NULL;
- return NULL;
+ goto fail;
}
- return port_mmap;
+ port_mmap->hdr = mem;
+ hdr = port_mmap->hdr;
+
+ if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) {
+ nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)",
+ port_mmap->hdr->id, process->incoming->nelts - 1);
+ }
+
+fail:
+
+ return hdr;
}
static void
nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data)
{
- nxt_buf_t *b;
- nxt_mp_t *mp;
- nxt_port_mmap_t *port_mmap;
+ nxt_fd_t fd;
+ nxt_buf_t *b;
+ nxt_mp_t *mp;
b = obj;
mp = b->data;
- port_mmap = data;
+ fd = (nxt_fd_t) (intptr_t) data;
#if (NXT_DEBUG)
if (nxt_slow_path(data != b->parent)) {
@@ -263,24 +158,27 @@ nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data)
}
#endif
- nxt_debug(task, "mmap fd %FD has been sent", port_mmap->fd);
+ nxt_debug(task, "mmap fd %FD has been sent", fd);
- nxt_fd_close(port_mmap->fd);
- port_mmap->fd = -1;
+ nxt_fd_close(fd);
nxt_buf_free(mp, b);
}
-static nxt_port_mmap_t *
+static nxt_port_mmap_header_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_t *port)
{
+ void *mem;
u_char *p, name[64];
+ nxt_fd_t fd;
nxt_buf_t *b;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
+ port_mmap = NULL;
+
if (process->outgoing == NULL) {
process->outgoing = nxt_array_create(process->mem_pool, 1,
sizeof(nxt_port_mmap_t));
@@ -300,34 +198,30 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
return NULL;
}
- port_mmap->id = process->outgoing->nelts - 1;
- port_mmap->pid = process->pid;
-
p = nxt_sprintf(name, name + sizeof(name), "/nginext.%PI.%uxD",
nxt_pid, nxt_random(&nxt_random_data));
*p = '\0';
#if (NXT_HAVE_MEMFD_CREATE)
- port_mmap->fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
+ fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
- if (nxt_slow_path(port_mmap->fd == -1)) {
+ if (nxt_slow_path(fd == -1)) {
nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E",
name, nxt_errno);
goto remove_fail;
}
- nxt_debug(task, "memfd_create(%s): %FD", name, port_mmap->fd);
+ nxt_debug(task, "memfd_create(%s): %FD", name, fd);
#elif (NXT_HAVE_SHM_OPEN)
shm_unlink((char *) name); // just in case
- port_mmap->fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR,
- S_IRUSR | S_IWUSR);
+ fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
- nxt_debug(task, "shm_open(%s): %FD", name, port_mmap->fd);
+ nxt_debug(task, "shm_open(%s): %FD", name, fd);
- if (nxt_slow_path(port_mmap->fd == -1)) {
+ if (nxt_slow_path(fd == -1)) {
nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno);
goto remove_fail;
@@ -339,20 +233,21 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
}
#endif
- if (nxt_slow_path(ftruncate(port_mmap->fd, PORT_MMAP_SIZE) == -1)) {
+ if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
goto remove_fail;
}
- port_mmap->u.mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE,
- PROT_READ | PROT_WRITE, MAP_SHARED,
- port_mmap->fd, 0);
+ mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE,
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
- if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) {
+ if (nxt_slow_path(mem == MAP_FAILED)) {
goto remove_fail;
}
+ port_mmap->hdr = mem;
+
b = nxt_buf_mem_alloc(port->mem_pool, 0, 0);
if (nxt_slow_path(b == NULL)) {
goto remove_fail;
@@ -360,27 +255,32 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
b->completion_handler = nxt_port_mmap_send_fd_buf_completion;
b->data = port->mem_pool;
- b->parent = port_mmap;
+ b->parent = (void *) (intptr_t) fd;
/* Init segment header. */
- hdr = port_mmap->u.hdr;
+ hdr = port_mmap->hdr;
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
+ hdr->id = process->outgoing->nelts - 1;
+ hdr->pid = process->pid;
+
+ /* Mark first chunk as busy */
+ nxt_port_mmap_set_chunk_busy(hdr, 0);
+
/* Mark as busy chunk followed the last available chunk. */
nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);
- nxt_debug(task, "send mmap fd %FD to process %PI", port_mmap->fd,
+ nxt_debug(task, "send mmap fd %FD to process %PI", fd,
port->pid);
/* TODO handle error */
- (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, port_mmap->fd,
- 0, 0, b);
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, b);
nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
- port_mmap->id, nxt_pid, process->pid);
+ hdr->id, nxt_pid, process->pid);
- return port_mmap;
+ return hdr;
remove_fail:
@@ -390,24 +290,29 @@ remove_fail:
}
-static nxt_port_mmap_t *
+static nxt_port_mmap_header_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
size_t size)
{
- nxt_array_t *outgoing;
- nxt_process_t *process;
- nxt_port_mmap_t *port_mmap;
- nxt_port_mmap_t *end_port_mmap;
+ nxt_array_t *outgoing;
+ nxt_process_t *process;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_t *end_port_mmap;
+ nxt_port_mmap_header_t *hdr;
- process = nxt_runtime_process_get(task->thread->runtime, port->pid);
+ process = port->process;
if (nxt_slow_path(process == NULL)) {
return NULL;
}
*c = 0;
+ port_mmap = NULL;
+ hdr = NULL;
if (process->outgoing == NULL) {
- return nxt_port_new_port_mmap(task, process, port);
+ hdr = nxt_port_new_port_mmap(task, process, port);
+
+ goto unlock_return;
}
outgoing = process->outgoing;
@@ -416,44 +321,51 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
while (port_mmap < end_port_mmap) {
- if (nxt_port_mmap_get_free_chunk(port_mmap, c)) {
- return port_mmap;
+ if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) {
+ hdr = port_mmap->hdr;
+
+ goto unlock_return;
}
port_mmap++;
}
/* TODO introduce port_mmap limit and release wait. */
- return nxt_port_new_port_mmap(task, process, port);
+
+ hdr = nxt_port_new_port_mmap(task, process, port);
+
+unlock_return:
+
+ return hdr;
}
-static nxt_port_mmap_t *
+static nxt_port_mmap_header_t *
nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
{
- nxt_array_t *incoming;
- nxt_process_t *process;
- nxt_port_mmap_t *port_mmap;
+ nxt_array_t *incoming;
+ nxt_process_t *process;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
process = nxt_runtime_process_get(task->thread->runtime, spid);
if (nxt_slow_path(process == NULL)) {
return NULL;
}
+ hdr = NULL;
+
incoming = process->incoming;
- if (nxt_slow_path(incoming == NULL)) {
- /* TODO add warning */
- return NULL;
- }
- if (nxt_slow_path(incoming->nelts <= id)) {
- /* TODO add warning */
- return NULL;
+ if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) {
+ port_mmap = incoming->elts;
+ hdr = port_mmap[id].hdr;
+ } else {
+ nxt_log(task, NXT_LOG_WARN,
+ "failed to get incoming mmap #%d for process %PI", id, spid);
}
- port_mmap = incoming->elts;
-
- return port_mmap + id;
+ return hdr;
}
@@ -463,11 +375,15 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
size_t nchunks;
nxt_buf_t *b;
nxt_chunk_id_t c;
- nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
nxt_debug(task, "request %z bytes shm buffer", size);
+ if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
+ nxt_debug(task, "requested size (%z bytes) too big", size);
+ return NULL;
+ }
+
b = nxt_mp_zalloc(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE);
if (nxt_slow_path(b == NULL)) {
return NULL;
@@ -479,22 +395,18 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
nxt_buf_set_port_mmap(b);
- port_mmap = nxt_port_mmap_get(task, port, &c, size);
- if (nxt_slow_path(port_mmap == NULL)) {
+ hdr = nxt_port_mmap_get(task, port, &c, size);
+ if (nxt_slow_path(hdr == NULL)) {
nxt_buf_free(port->mem_pool, b);
return NULL;
}
- hdr = port_mmap->u.hdr;
-
- b->parent = port_mmap;
- b->mem.start = nxt_port_mmap_chunk_start(port_mmap, c);
+ b->parent = hdr;
+ b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
- nxt_port_mmap_set_chunk_busy(hdr, c);
-
nchunks = size / PORT_MMAP_CHUNK_SIZE;
if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
nchunks++;
@@ -506,10 +418,9 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
/* Try to acquire as much chunks as required. */
while (nchunks > 0) {
- if (nxt_port_mmap_get_chunk_busy(hdr, c)) {
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
break;
}
- nxt_port_mmap_set_chunk_busy(hdr, c);
b->mem.end += PORT_MMAP_CHUNK_SIZE;
c++;
@@ -520,13 +431,79 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
}
+nxt_int_t
+nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size)
+{
+ size_t nchunks;
+ nxt_chunk_id_t c, start;
+ nxt_port_mmap_header_t *hdr;
+
+ nxt_debug(task, "request increase %z bytes shm buffer", size);
+
+ if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
+ nxt_log(task, NXT_LOG_WARN,
+ "failed to increase, not a mmap buffer");
+ return NXT_ERROR;
+ }
+
+ if (nxt_slow_path(size <= (size_t) nxt_buf_mem_free_size(&b->mem))) {
+ return NXT_OK;
+ }
+
+ hdr = b->parent;
+
+ start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
+
+ size -= nxt_buf_mem_free_size(&b->mem);
+
+ nchunks = size / PORT_MMAP_CHUNK_SIZE;
+ if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
+ nchunks++;
+ }
+
+ c = start;
+
+ /* Try to acquire as much chunks as required. */
+ while (nchunks > 0) {
+
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
+ break;
+ }
+
+ c++;
+ nchunks--;
+ }
+
+ if (nchunks != 0) {
+ c--;
+ while (c >= start) {
+ nxt_port_mmap_set_chunk_free(hdr, c);
+ c--;
+ }
+
+ nxt_debug(task, "failed to increase, %d chunks busy", nchunks);
+
+ return NXT_ERROR;
+ } else {
+ b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
+
+ return NXT_OK;
+ }
+}
+
+
static nxt_buf_t *
nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
{
size_t nchunks;
nxt_buf_t *b;
- nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
+
+ hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
+ if (nxt_slow_path(hdr == NULL)) {
+ return NULL;
+ }
b = nxt_mp_zalloc(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE);
if (nxt_slow_path(b == NULL)) {
@@ -539,23 +516,17 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
nxt_buf_set_port_mmap(b);
- port_mmap = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
- if (nxt_slow_path(port_mmap == NULL)) {
- nxt_buf_free(port->mem_pool, b);
- return NULL;
- }
-
nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
nchunks++;
}
- b->mem.start = nxt_port_mmap_chunk_start(port_mmap, mmap_msg->chunk_id);
+ b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start + mmap_msg->size;
b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
- b->parent = port_mmap;
+ b->parent = hdr;
return b;
}
@@ -565,11 +536,11 @@ void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
{
- size_t bsize;
- nxt_buf_t *b, *bmem;
- nxt_uint_t i;
- nxt_port_mmap_t *port_mmap;
- nxt_port_mmap_msg_t *mmap_msg;
+ size_t bsize;
+ nxt_buf_t *b, *bmem;
+ nxt_uint_t i;
+ nxt_port_mmap_msg_t *mmap_msg;
+ nxt_port_mmap_header_t *hdr;
nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
"via shared memory", sb->size, port->pid);
@@ -598,10 +569,10 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
/* TODO clear b and exit */
}
- port_mmap = (nxt_port_mmap_t *) bmem->parent;
+ hdr = bmem->parent;
- mmap_msg->mmap_id = port_mmap->id;
- mmap_msg->chunk_id = nxt_port_mmap_chunk_id(port_mmap, bmem->mem.pos);
+ mmap_msg->mmap_id = hdr->id;
+ mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
mmap_msg->size = sb->iobuf[i].iov_len;
nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
@@ -660,8 +631,8 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
{
- nxt_port_mmap_t *port_mmap;
- nxt_port_method_t m;
+ nxt_port_method_t m;
+ nxt_port_mmap_header_t *hdr;
m = NXT_PORT_METHOD_ANY;
@@ -672,7 +643,7 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
}
if (nxt_buf_is_port_mmap(b)) {
- port_mmap = (nxt_port_mmap_t *) b->parent;
+ hdr = b->parent;
if (m == NXT_PORT_METHOD_PLAIN) {
nxt_log_error(NXT_LOG_ERR, task->log,
@@ -682,10 +653,10 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
break;
}
- if (port->pid != port_mmap->pid) {
+ if (port->pid != hdr->pid) {
nxt_log_error(NXT_LOG_ERR, task->log,
"send mmap buffer for %PI to %PI, "
- "using plain mode", port_mmap->pid, port->pid);
+ "using plain mode", hdr->pid, port->pid);
m = NXT_PORT_METHOD_PLAIN;
diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h
index 9ad4e2a4..379201d0 100644
--- a/src/nxt_port_memory.h
+++ b/src/nxt_port_memory.h
@@ -1,7 +1,13 @@
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
#ifndef _NXT_PORT_MEMORY_H_INCLUDED_
#define _NXT_PORT_MEMORY_H_INCLUDED_
+
#define PORT_MMAP_MIN_SIZE (3 * sizeof(uint32_t))
typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t;
@@ -19,7 +25,10 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap);
nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size);
-nxt_port_mmap_t *
+nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b,
+ size_t size);
+
+nxt_port_mmap_header_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_fd_t fd);
@@ -47,4 +56,5 @@ typedef enum nxt_port_method_e nxt_port_method_t;
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b);
+
#endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */
diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h
new file mode 100644
index 00000000..d82b6237
--- /dev/null
+++ b/src/nxt_port_memory_int.h
@@ -0,0 +1,179 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_PORT_MEMORY_INT_H_INCLUDED_
+#define _NXT_PORT_MEMORY_INT_H_INCLUDED_
+
+
+#include <stdint.h>
+#include <nxt_atomic.h>
+
+
+#ifdef NXT_MMAP_TINY_CHUNK
+
+#define PORT_MMAP_CHUNK_SIZE 16
+#define PORT_MMAP_HEADER_SIZE 1024
+#define PORT_MMAP_DATA_SIZE 1024
+
+#else
+
+#define PORT_MMAP_CHUNK_SIZE (1024 * 16)
+#define PORT_MMAP_HEADER_SIZE (1024 * 4)
+#define PORT_MMAP_DATA_SIZE (1024 * 1024 * 10)
+
+#endif
+
+
+#define PORT_MMAP_SIZE (PORT_MMAP_HEADER_SIZE + PORT_MMAP_DATA_SIZE)
+#define PORT_MMAP_CHUNK_COUNT (PORT_MMAP_DATA_SIZE / PORT_MMAP_CHUNK_SIZE)
+
+
+typedef uint32_t nxt_chunk_id_t;
+
+typedef nxt_atomic_uint_t nxt_free_map_t;
+
+#define FREE_BITS (sizeof(nxt_free_map_t) * 8)
+
+#define FREE_IDX(nchunk) ((nchunk) / FREE_BITS)
+
+#define FREE_MASK(nchunk) \
+ ( 1ULL << ( (nchunk) % FREE_BITS ) )
+
+#define MAX_FREE_IDX FREE_IDX(PORT_MMAP_CHUNK_COUNT)
+
+
+/* Mapped at the start of shared memory segment. */
+struct nxt_port_mmap_header_s {
+ uint32_t id;
+ nxt_pid_t pid; /* For sanity check. */
+ nxt_free_map_t free_map[MAX_FREE_IDX];
+};
+
+
+/*
+ * Element of nxt_process_t.incoming/outgoing, shared memory segment
+ * descriptor.
+ */
+struct nxt_port_mmap_s {
+ nxt_port_mmap_header_t *hdr;
+};
+
+typedef struct nxt_port_mmap_msg_s nxt_port_mmap_msg_t;
+
+/* Passed as a second iov chunk when 'mmap' bit in nxt_port_msg_t is 1. */
+struct nxt_port_mmap_msg_s {
+ uint32_t mmap_id; /* Mmap index in nxt_process_t.outgoing. */
+ nxt_chunk_id_t chunk_id; /* Mmap chunk index. */
+ uint32_t size; /* Payload data size. */
+};
+
+
+static nxt_bool_t
+nxt_port_mmap_get_free_chunk(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t *c);
+
+#define nxt_port_mmap_get_chunk_busy(hdr, c) \
+ ((hdr->free_map[FREE_IDX(c)] & FREE_MASK(c)) == 0)
+
+nxt_inline void
+nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
+
+nxt_inline nxt_bool_t
+nxt_port_mmap_chk_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
+
+nxt_inline void
+nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
+
+nxt_inline nxt_chunk_id_t
+nxt_port_mmap_chunk_id(nxt_port_mmap_header_t *hdr, u_char *p)
+{
+ u_char *mm_start;
+
+ mm_start = (u_char *) hdr;
+
+ return ((p - mm_start) - PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE;
+}
+
+
+nxt_inline u_char *
+nxt_port_mmap_chunk_start(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
+{
+ u_char *mm_start;
+
+ mm_start = (u_char *) hdr;
+
+ return mm_start + PORT_MMAP_HEADER_SIZE + c * PORT_MMAP_CHUNK_SIZE;
+}
+
+
+static nxt_bool_t
+nxt_port_mmap_get_free_chunk(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t *c)
+{
+ int ffs;
+ size_t i;
+ nxt_chunk_id_t chunk;
+ nxt_free_map_t bits;
+ nxt_free_map_t *free_map;
+
+ free_map = hdr->free_map;
+
+ for (i = 0; i < MAX_FREE_IDX; i++) {
+ bits = free_map[i];
+ if (bits == 0) {
+ continue;
+ }
+
+ ffs = __builtin_ffsll(bits);
+ if (ffs != 0) {
+ chunk = i * FREE_BITS + ffs - 1;
+
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr, chunk)) {
+ *c = chunk;
+ return 1;
+ }
+ }
+ }
+
+ return 0;
+}
+
+
+nxt_inline void
+nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
+{
+ nxt_atomic_and_fetch(hdr->free_map + FREE_IDX(c), ~FREE_MASK(c));
+}
+
+
+nxt_inline nxt_bool_t
+nxt_port_mmap_chk_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
+{
+ nxt_free_map_t *f;
+ nxt_free_map_t free_val, busy_val;
+
+ f = hdr->free_map + FREE_IDX(c);
+
+ while ( (*f & FREE_MASK(c)) != 0 ) {
+
+ free_val = *f | FREE_MASK(c);
+ busy_val = free_val & ~FREE_MASK(c);
+
+ if (nxt_atomic_cmp_set(f, free_val, busy_val) != 0) {
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+
+nxt_inline void
+nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
+{
+ nxt_atomic_or_fetch(hdr->free_map + FREE_IDX(c), FREE_MASK(c));
+}
+
+
+#endif /* _NXT_PORT_MEMORY_INT_H_INCLUDED_ */