summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-06-23 19:20:04 +0300
committerMax Romanov <max.romanov@nginx.com>2017-06-23 19:20:04 +0300
commit0cd95216877587d053341f52aa4b16362eccf173 (patch)
tree274b969a8f940845acb4710bcba05b7b2e5fa0fa /src
parentb13cdb0faae250e6aa8c9e46be03e3162f7de27d (diff)
downloadunit-0cd95216877587d053341f52aa4b16362eccf173.tar.gz
unit-0cd95216877587d053341f52aa4b16362eccf173.tar.bz2
Store pointer to shared memory start in buf->parent.
nxt_port_mmap_t stored in arrays and it is unsafe to store pointer to array element. Shared memory structures and macros moved to separate header file to be used by GO package.
Diffstat (limited to '')
-rw-r--r--src/nxt_port_memory.c433
-rw-r--r--src/nxt_port_memory.h12
-rw-r--r--src/nxt_port_memory_int.h179
3 files changed, 392 insertions, 232 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index 2af03f67..5832ca25 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -1,4 +1,9 @@
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
#include <nxt_main.h>
#if (NXT_HAVE_MEMFD_CREATE)
@@ -9,89 +14,14 @@
#endif
-#define PORT_MMAP_CHUNK_SIZE (1024 * 16)
-#define PORT_MMAP_HEADER_SIZE (1024 * 4)
-#define PORT_MMAP_SIZE (PORT_MMAP_HEADER_SIZE + 1024 * 1024 * 10)
-
-#define PORT_MMAP_CHUNK_COUNT \
- ( (PORT_MMAP_SIZE - PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE )
-
-
-typedef uint32_t nxt_chunk_id_t;
-
-typedef nxt_atomic_uint_t nxt_free_map_t;
-
-#define FREE_BITS (sizeof(nxt_free_map_t) * 8)
-
-#define FREE_IDX(nchunk) ((nchunk) / FREE_BITS)
-
-#define FREE_MASK(nchunk) \
- ( 1ULL << ( (nchunk) % FREE_BITS ) )
-
-#define MAX_FREE_IDX FREE_IDX(PORT_MMAP_CHUNK_COUNT)
-
-
-/* Mapped at the start of shared memory segment. */
-struct nxt_port_mmap_header_s {
- nxt_free_map_t free_map[MAX_FREE_IDX];
-};
-
-
-/*
- * Element of nxt_process_t.incoming/outgoing, shared memory segment
- * descriptor.
- */
-struct nxt_port_mmap_s {
- uint32_t id;
- nxt_fd_t fd;
- nxt_pid_t pid; /* For sanity check. */
- union {
- void *mem;
- nxt_port_mmap_header_t *hdr;
- } u;
-};
-
-
-/* Passed as a second iov chunk when 'mmap' bit in nxt_port_msg_t is 1. */
-typedef struct {
- uint32_t mmap_id; /* Mmap index in nxt_process_t.outgoing. */
- nxt_chunk_id_t chunk_id; /* Mmap chunk index. */
- uint32_t size; /* Payload data size. */
-} nxt_port_mmap_msg_t;
-
-
-static nxt_bool_t
-nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c);
-
-#define nxt_port_mmap_get_chunk_busy(hdr, c) \
- ((hdr->free_map[FREE_IDX(c)] & FREE_MASK(c)) == 0)
-
-nxt_inline void
-nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
-
-nxt_inline void
-nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
-
-#define nxt_port_mmap_chunk_id(port_mmap, b) \
- ((((u_char *) (b) - (u_char *) (port_mmap->u.mem)) - \
- PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE)
-
-#define nxt_port_mmap_chunk_start(port_mmap, chunk) \
- (((u_char *) (port_mmap->u.mem)) + PORT_MMAP_HEADER_SIZE + \
- (chunk) * PORT_MMAP_CHUNK_SIZE)
-
+#include <nxt_port_memory_int.h>
void
nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
{
- if (port_mmap->u.mem != NULL) {
- nxt_mem_munmap(port_mmap->u.mem, PORT_MMAP_SIZE);
- port_mmap->u.mem = NULL;
- }
-
- if (port_mmap->fd != -1) {
- nxt_fd_close(port_mmap->fd);
- port_mmap->fd = -1;
+ if (port_mmap->hdr != NULL) {
+ nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE);
+ port_mmap->hdr = NULL;
}
}
@@ -103,7 +33,6 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
nxt_mp_t *mp;
nxt_buf_t *b;
nxt_chunk_id_t c;
- nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
b = obj;
@@ -120,8 +49,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
}
#endif
- port_mmap = data;
- hdr = port_mmap->u.hdr;
+ hdr = data;
if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
/*
@@ -129,11 +57,11 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
* let's release rest (if any).
*/
p = b->mem.pos - 1;
- c = nxt_port_mmap_chunk_id(port_mmap, p) + 1;
- p = nxt_port_mmap_chunk_start(port_mmap, c);
+ c = nxt_port_mmap_chunk_id(hdr, p) + 1;
+ p = nxt_port_mmap_chunk_start(hdr, c);
} else {
p = b->mem.start;
- c = nxt_port_mmap_chunk_id(port_mmap, p);
+ c = nxt_port_mmap_chunk_id(hdr, p);
}
while (p < b->mem.end) {
@@ -147,61 +75,21 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
}
-static nxt_bool_t
-nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c)
-{
- int ffs;
- size_t i;
- nxt_free_map_t bits;
- nxt_free_map_t *free_map;
-
- free_map = port_mmap->u.hdr->free_map;
-
- for (i = 0; i < MAX_FREE_IDX; i++) {
- bits = free_map[i];
- if (bits == 0) {
- continue;
- }
-
- ffs = __builtin_ffsll(bits);
- if (ffs != 0) {
- *c = i * FREE_BITS + ffs - 1;
- return 1;
- }
- }
-
- return 0;
-}
-
-
-nxt_inline void
-nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
-{
- nxt_atomic_and_fetch(hdr->free_map + FREE_IDX(c), ~FREE_MASK(c));
-
- nxt_thread_log_debug("set_chunk_busy: hdr %p; b %D", hdr, c);
-}
-
-
-nxt_inline void
-nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
-{
- nxt_atomic_or_fetch(hdr->free_map + FREE_IDX(c), FREE_MASK(c));
-
- nxt_thread_log_debug("set_chunk_free: hdr %p; b %D", hdr, c);
-}
-
-
-nxt_port_mmap_t *
+nxt_port_mmap_header_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_fd_t fd)
{
- struct stat mmap_stat;
- nxt_port_mmap_t *port_mmap;
+ void *mem;
+ struct stat mmap_stat;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
nxt_debug(task, "got new mmap fd #%FD from process %PI",
fd, process->pid);
+ port_mmap = NULL;
+ hdr = NULL;
+
if (fstat(fd, &mmap_stat) == -1) {
nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
@@ -216,44 +104,51 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
if (nxt_slow_path(process->incoming == NULL)) {
nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array");
- return NULL;
+ goto fail;
}
port_mmap = nxt_array_zero_add(process->incoming);
if (nxt_slow_path(port_mmap == NULL)) {
nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
- return NULL;
+ goto fail;
}
- port_mmap->id = process->incoming->nelts - 1;
- port_mmap->fd = -1;
- port_mmap->pid = process->pid;
- port_mmap->u.mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
- PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+ mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
- if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) {
+ if (nxt_slow_path(mem == MAP_FAILED)) {
nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
- port_mmap->u.mem = NULL;
+ port_mmap = NULL;
- return NULL;
+ goto fail;
}
- return port_mmap;
+ port_mmap->hdr = mem;
+ hdr = port_mmap->hdr;
+
+ if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) {
+ nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)",
+ port_mmap->hdr->id, process->incoming->nelts - 1);
+ }
+
+fail:
+
+ return hdr;
}
static void
nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data)
{
- nxt_buf_t *b;
- nxt_mp_t *mp;
- nxt_port_mmap_t *port_mmap;
+ nxt_fd_t fd;
+ nxt_buf_t *b;
+ nxt_mp_t *mp;
b = obj;
mp = b->data;
- port_mmap = data;
+ fd = (nxt_fd_t) (intptr_t) data;
#if (NXT_DEBUG)
if (nxt_slow_path(data != b->parent)) {
@@ -263,24 +158,27 @@ nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data)
}
#endif
- nxt_debug(task, "mmap fd %FD has been sent", port_mmap->fd);
+ nxt_debug(task, "mmap fd %FD has been sent", fd);
- nxt_fd_close(port_mmap->fd);
- port_mmap->fd = -1;
+ nxt_fd_close(fd);
nxt_buf_free(mp, b);
}
-static nxt_port_mmap_t *
+static nxt_port_mmap_header_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_t *port)
{
+ void *mem;
u_char *p, name[64];
+ nxt_fd_t fd;
nxt_buf_t *b;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
+ port_mmap = NULL;
+
if (process->outgoing == NULL) {
process->outgoing = nxt_array_create(process->mem_pool, 1,
sizeof(nxt_port_mmap_t));
@@ -300,34 +198,30 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
return NULL;
}
- port_mmap->id = process->outgoing->nelts - 1;
- port_mmap->pid = process->pid;
-
p = nxt_sprintf(name, name + sizeof(name), "/nginext.%PI.%uxD",
nxt_pid, nxt_random(&nxt_random_data));
*p = '\0';
#if (NXT_HAVE_MEMFD_CREATE)
- port_mmap->fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
+ fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
- if (nxt_slow_path(port_mmap->fd == -1)) {
+ if (nxt_slow_path(fd == -1)) {
nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E",
name, nxt_errno);
goto remove_fail;
}
- nxt_debug(task, "memfd_create(%s): %FD", name, port_mmap->fd);
+ nxt_debug(task, "memfd_create(%s): %FD", name, fd);
#elif (NXT_HAVE_SHM_OPEN)
shm_unlink((char *) name); // just in case
- port_mmap->fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR,
- S_IRUSR | S_IWUSR);
+ fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
- nxt_debug(task, "shm_open(%s): %FD", name, port_mmap->fd);
+ nxt_debug(task, "shm_open(%s): %FD", name, fd);
- if (nxt_slow_path(port_mmap->fd == -1)) {
+ if (nxt_slow_path(fd == -1)) {
nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno);
goto remove_fail;
@@ -339,20 +233,21 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
}
#endif
- if (nxt_slow_path(ftruncate(port_mmap->fd, PORT_MMAP_SIZE) == -1)) {
+ if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
goto remove_fail;
}
- port_mmap->u.mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE,
- PROT_READ | PROT_WRITE, MAP_SHARED,
- port_mmap->fd, 0);
+ mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE,
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
- if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) {
+ if (nxt_slow_path(mem == MAP_FAILED)) {
goto remove_fail;
}
+ port_mmap->hdr = mem;
+
b = nxt_buf_mem_alloc(port->mem_pool, 0, 0);
if (nxt_slow_path(b == NULL)) {
goto remove_fail;
@@ -360,27 +255,32 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
b->completion_handler = nxt_port_mmap_send_fd_buf_completion;
b->data = port->mem_pool;
- b->parent = port_mmap;
+ b->parent = (void *) (intptr_t) fd;
/* Init segment header. */
- hdr = port_mmap->u.hdr;
+ hdr = port_mmap->hdr;
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
+ hdr->id = process->outgoing->nelts - 1;
+ hdr->pid = process->pid;
+
+ /* Mark first chunk as busy */
+ nxt_port_mmap_set_chunk_busy(hdr, 0);
+
/* Mark as busy chunk followed the last available chunk. */
nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);
- nxt_debug(task, "send mmap fd %FD to process %PI", port_mmap->fd,
+ nxt_debug(task, "send mmap fd %FD to process %PI", fd,
port->pid);
/* TODO handle error */
- (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, port_mmap->fd,
- 0, 0, b);
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, b);
nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
- port_mmap->id, nxt_pid, process->pid);
+ hdr->id, nxt_pid, process->pid);
- return port_mmap;
+ return hdr;
remove_fail:
@@ -390,24 +290,29 @@ remove_fail:
}
-static nxt_port_mmap_t *
+static nxt_port_mmap_header_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
size_t size)
{
- nxt_array_t *outgoing;
- nxt_process_t *process;
- nxt_port_mmap_t *port_mmap;
- nxt_port_mmap_t *end_port_mmap;
+ nxt_array_t *outgoing;
+ nxt_process_t *process;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_t *end_port_mmap;
+ nxt_port_mmap_header_t *hdr;
- process = nxt_runtime_process_get(task->thread->runtime, port->pid);
+ process = port->process;
if (nxt_slow_path(process == NULL)) {
return NULL;
}
*c = 0;
+ port_mmap = NULL;
+ hdr = NULL;
if (process->outgoing == NULL) {
- return nxt_port_new_port_mmap(task, process, port);
+ hdr = nxt_port_new_port_mmap(task, process, port);
+
+ goto unlock_return;
}
outgoing = process->outgoing;
@@ -416,44 +321,51 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
while (port_mmap < end_port_mmap) {
- if (nxt_port_mmap_get_free_chunk(port_mmap, c)) {
- return port_mmap;
+ if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) {
+ hdr = port_mmap->hdr;
+
+ goto unlock_return;
}
port_mmap++;
}
/* TODO introduce port_mmap limit and release wait. */
- return nxt_port_new_port_mmap(task, process, port);
+
+ hdr = nxt_port_new_port_mmap(task, process, port);
+
+unlock_return:
+
+ return hdr;
}
-static nxt_port_mmap_t *
+static nxt_port_mmap_header_t *
nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
{
- nxt_array_t *incoming;
- nxt_process_t *process;
- nxt_port_mmap_t *port_mmap;
+ nxt_array_t *incoming;
+ nxt_process_t *process;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
process = nxt_runtime_process_get(task->thread->runtime, spid);
if (nxt_slow_path(process == NULL)) {
return NULL;
}
+ hdr = NULL;
+
incoming = process->incoming;
- if (nxt_slow_path(incoming == NULL)) {
- /* TODO add warning */
- return NULL;
- }
- if (nxt_slow_path(incoming->nelts <= id)) {
- /* TODO add warning */
- return NULL;
+ if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) {
+ port_mmap = incoming->elts;
+ hdr = port_mmap[id].hdr;
+ } else {
+ nxt_log(task, NXT_LOG_WARN,
+ "failed to get incoming mmap #%d for process %PI", id, spid);
}
- port_mmap = incoming->elts;
-
- return port_mmap + id;
+ return hdr;
}
@@ -463,11 +375,15 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
size_t nchunks;
nxt_buf_t *b;
nxt_chunk_id_t c;
- nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
nxt_debug(task, "request %z bytes shm buffer", size);
+ if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
+ nxt_debug(task, "requested size (%z bytes) too big", size);
+ return NULL;
+ }
+
b = nxt_mp_zalloc(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE);
if (nxt_slow_path(b == NULL)) {
return NULL;
@@ -479,22 +395,18 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
nxt_buf_set_port_mmap(b);
- port_mmap = nxt_port_mmap_get(task, port, &c, size);
- if (nxt_slow_path(port_mmap == NULL)) {
+ hdr = nxt_port_mmap_get(task, port, &c, size);
+ if (nxt_slow_path(hdr == NULL)) {
nxt_buf_free(port->mem_pool, b);
return NULL;
}
- hdr = port_mmap->u.hdr;
-
- b->parent = port_mmap;
- b->mem.start = nxt_port_mmap_chunk_start(port_mmap, c);
+ b->parent = hdr;
+ b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
- nxt_port_mmap_set_chunk_busy(hdr, c);
-
nchunks = size / PORT_MMAP_CHUNK_SIZE;
if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
nchunks++;
@@ -506,10 +418,9 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
/* Try to acquire as much chunks as required. */
while (nchunks > 0) {
- if (nxt_port_mmap_get_chunk_busy(hdr, c)) {
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
break;
}
- nxt_port_mmap_set_chunk_busy(hdr, c);
b->mem.end += PORT_MMAP_CHUNK_SIZE;
c++;
@@ -520,13 +431,79 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
}
+nxt_int_t
+nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size)
+{
+ size_t nchunks;
+ nxt_chunk_id_t c, start;
+ nxt_port_mmap_header_t *hdr;
+
+ nxt_debug(task, "request increase %z bytes shm buffer", size);
+
+ if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) {
+ nxt_log(task, NXT_LOG_WARN,
+ "failed to increase, not a mmap buffer");
+ return NXT_ERROR;
+ }
+
+ if (nxt_slow_path(size <= (size_t) nxt_buf_mem_free_size(&b->mem))) {
+ return NXT_OK;
+ }
+
+ hdr = b->parent;
+
+ start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
+
+ size -= nxt_buf_mem_free_size(&b->mem);
+
+ nchunks = size / PORT_MMAP_CHUNK_SIZE;
+ if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
+ nchunks++;
+ }
+
+ c = start;
+
+ /* Try to acquire as much chunks as required. */
+ while (nchunks > 0) {
+
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
+ break;
+ }
+
+ c++;
+ nchunks--;
+ }
+
+ if (nchunks != 0) {
+ c--;
+ while (c >= start) {
+ nxt_port_mmap_set_chunk_free(hdr, c);
+ c--;
+ }
+
+ nxt_debug(task, "failed to increase, %d chunks busy", nchunks);
+
+ return NXT_ERROR;
+ } else {
+ b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
+
+ return NXT_OK;
+ }
+}
+
+
static nxt_buf_t *
nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
{
size_t nchunks;
nxt_buf_t *b;
- nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
+
+ hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
+ if (nxt_slow_path(hdr == NULL)) {
+ return NULL;
+ }
b = nxt_mp_zalloc(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE);
if (nxt_slow_path(b == NULL)) {
@@ -539,23 +516,17 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
nxt_buf_set_port_mmap(b);
- port_mmap = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
- if (nxt_slow_path(port_mmap == NULL)) {
- nxt_buf_free(port->mem_pool, b);
- return NULL;
- }
-
nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
nchunks++;
}
- b->mem.start = nxt_port_mmap_chunk_start(port_mmap, mmap_msg->chunk_id);
+ b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start + mmap_msg->size;
b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
- b->parent = port_mmap;
+ b->parent = hdr;
return b;
}
@@ -565,11 +536,11 @@ void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
{
- size_t bsize;
- nxt_buf_t *b, *bmem;
- nxt_uint_t i;
- nxt_port_mmap_t *port_mmap;
- nxt_port_mmap_msg_t *mmap_msg;
+ size_t bsize;
+ nxt_buf_t *b, *bmem;
+ nxt_uint_t i;
+ nxt_port_mmap_msg_t *mmap_msg;
+ nxt_port_mmap_header_t *hdr;
nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
"via shared memory", sb->size, port->pid);
@@ -598,10 +569,10 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
/* TODO clear b and exit */
}
- port_mmap = (nxt_port_mmap_t *) bmem->parent;
+ hdr = bmem->parent;
- mmap_msg->mmap_id = port_mmap->id;
- mmap_msg->chunk_id = nxt_port_mmap_chunk_id(port_mmap, bmem->mem.pos);
+ mmap_msg->mmap_id = hdr->id;
+ mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
mmap_msg->size = sb->iobuf[i].iov_len;
nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
@@ -660,8 +631,8 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
{
- nxt_port_mmap_t *port_mmap;
- nxt_port_method_t m;
+ nxt_port_method_t m;
+ nxt_port_mmap_header_t *hdr;
m = NXT_PORT_METHOD_ANY;
@@ -672,7 +643,7 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
}
if (nxt_buf_is_port_mmap(b)) {
- port_mmap = (nxt_port_mmap_t *) b->parent;
+ hdr = b->parent;
if (m == NXT_PORT_METHOD_PLAIN) {
nxt_log_error(NXT_LOG_ERR, task->log,
@@ -682,10 +653,10 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
break;
}
- if (port->pid != port_mmap->pid) {
+ if (port->pid != hdr->pid) {
nxt_log_error(NXT_LOG_ERR, task->log,
"send mmap buffer for %PI to %PI, "
- "using plain mode", port_mmap->pid, port->pid);
+ "using plain mode", hdr->pid, port->pid);
m = NXT_PORT_METHOD_PLAIN;
diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h
index 9ad4e2a4..379201d0 100644
--- a/src/nxt_port_memory.h
+++ b/src/nxt_port_memory.h
@@ -1,7 +1,13 @@
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
#ifndef _NXT_PORT_MEMORY_H_INCLUDED_
#define _NXT_PORT_MEMORY_H_INCLUDED_
+
#define PORT_MMAP_MIN_SIZE (3 * sizeof(uint32_t))
typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t;
@@ -19,7 +25,10 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap);
nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size);
-nxt_port_mmap_t *
+nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b,
+ size_t size);
+
+nxt_port_mmap_header_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_fd_t fd);
@@ -47,4 +56,5 @@ typedef enum nxt_port_method_e nxt_port_method_t;
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b);
+
#endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */
diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h
new file mode 100644
index 00000000..d82b6237
--- /dev/null
+++ b/src/nxt_port_memory_int.h
@@ -0,0 +1,179 @@
+
+/*
+ * Copyright (C) Max Romanov
+ * Copyright (C) NGINX, Inc.
+ */
+
+#ifndef _NXT_PORT_MEMORY_INT_H_INCLUDED_
+#define _NXT_PORT_MEMORY_INT_H_INCLUDED_
+
+
+#include <stdint.h>
+#include <nxt_atomic.h>
+
+
+#ifdef NXT_MMAP_TINY_CHUNK
+
+#define PORT_MMAP_CHUNK_SIZE 16
+#define PORT_MMAP_HEADER_SIZE 1024
+#define PORT_MMAP_DATA_SIZE 1024
+
+#else
+
+#define PORT_MMAP_CHUNK_SIZE (1024 * 16)
+#define PORT_MMAP_HEADER_SIZE (1024 * 4)
+#define PORT_MMAP_DATA_SIZE (1024 * 1024 * 10)
+
+#endif
+
+
+#define PORT_MMAP_SIZE (PORT_MMAP_HEADER_SIZE + PORT_MMAP_DATA_SIZE)
+#define PORT_MMAP_CHUNK_COUNT (PORT_MMAP_DATA_SIZE / PORT_MMAP_CHUNK_SIZE)
+
+
+typedef uint32_t nxt_chunk_id_t;
+
+typedef nxt_atomic_uint_t nxt_free_map_t;
+
+#define FREE_BITS (sizeof(nxt_free_map_t) * 8)
+
+#define FREE_IDX(nchunk) ((nchunk) / FREE_BITS)
+
+#define FREE_MASK(nchunk) \
+ ( 1ULL << ( (nchunk) % FREE_BITS ) )
+
+#define MAX_FREE_IDX FREE_IDX(PORT_MMAP_CHUNK_COUNT)
+
+
+/* Mapped at the start of shared memory segment. */
+struct nxt_port_mmap_header_s {
+ uint32_t id;
+ nxt_pid_t pid; /* For sanity check. */
+ nxt_free_map_t free_map[MAX_FREE_IDX];
+};
+
+
+/*
+ * Element of nxt_process_t.incoming/outgoing, shared memory segment
+ * descriptor.
+ */
+struct nxt_port_mmap_s {
+ nxt_port_mmap_header_t *hdr;
+};
+
+typedef struct nxt_port_mmap_msg_s nxt_port_mmap_msg_t;
+
+/* Passed as a second iov chunk when 'mmap' bit in nxt_port_msg_t is 1. */
+struct nxt_port_mmap_msg_s {
+ uint32_t mmap_id; /* Mmap index in nxt_process_t.outgoing. */
+ nxt_chunk_id_t chunk_id; /* Mmap chunk index. */
+ uint32_t size; /* Payload data size. */
+};
+
+
+static nxt_bool_t
+nxt_port_mmap_get_free_chunk(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t *c);
+
+#define nxt_port_mmap_get_chunk_busy(hdr, c) \
+ ((hdr->free_map[FREE_IDX(c)] & FREE_MASK(c)) == 0)
+
+nxt_inline void
+nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
+
+nxt_inline nxt_bool_t
+nxt_port_mmap_chk_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
+
+nxt_inline void
+nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
+
+nxt_inline nxt_chunk_id_t
+nxt_port_mmap_chunk_id(nxt_port_mmap_header_t *hdr, u_char *p)
+{
+ u_char *mm_start;
+
+ mm_start = (u_char *) hdr;
+
+ return ((p - mm_start) - PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE;
+}
+
+
+nxt_inline u_char *
+nxt_port_mmap_chunk_start(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
+{
+ u_char *mm_start;
+
+ mm_start = (u_char *) hdr;
+
+ return mm_start + PORT_MMAP_HEADER_SIZE + c * PORT_MMAP_CHUNK_SIZE;
+}
+
+
+static nxt_bool_t
+nxt_port_mmap_get_free_chunk(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t *c)
+{
+ int ffs;
+ size_t i;
+ nxt_chunk_id_t chunk;
+ nxt_free_map_t bits;
+ nxt_free_map_t *free_map;
+
+ free_map = hdr->free_map;
+
+ for (i = 0; i < MAX_FREE_IDX; i++) {
+ bits = free_map[i];
+ if (bits == 0) {
+ continue;
+ }
+
+ ffs = __builtin_ffsll(bits);
+ if (ffs != 0) {
+ chunk = i * FREE_BITS + ffs - 1;
+
+ if (nxt_port_mmap_chk_set_chunk_busy(hdr, chunk)) {
+ *c = chunk;
+ return 1;
+ }
+ }
+ }
+
+ return 0;
+}
+
+
+nxt_inline void
+nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
+{
+ nxt_atomic_and_fetch(hdr->free_map + FREE_IDX(c), ~FREE_MASK(c));
+}
+
+
+nxt_inline nxt_bool_t
+nxt_port_mmap_chk_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
+{
+ nxt_free_map_t *f;
+ nxt_free_map_t free_val, busy_val;
+
+ f = hdr->free_map + FREE_IDX(c);
+
+ while ( (*f & FREE_MASK(c)) != 0 ) {
+
+ free_val = *f | FREE_MASK(c);
+ busy_val = free_val & ~FREE_MASK(c);
+
+ if (nxt_atomic_cmp_set(f, free_val, busy_val) != 0) {
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+
+nxt_inline void
+nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
+{
+ nxt_atomic_or_fetch(hdr->free_map + FREE_IDX(c), FREE_MASK(c));
+}
+
+
+#endif /* _NXT_PORT_MEMORY_INT_H_INCLUDED_ */