diff options
-rw-r--r-- | auto/sources | 1 | ||||
-rw-r--r-- | src/nxt_port_memory.c | 433 | ||||
-rw-r--r-- | src/nxt_port_memory.h | 12 | ||||
-rw-r--r-- | src/nxt_port_memory_int.h | 179 |
4 files changed, 393 insertions, 232 deletions
diff --git a/auto/sources b/auto/sources index df7c9959..33c97e1f 100644 --- a/auto/sources +++ b/auto/sources @@ -20,6 +20,7 @@ NXT_LIB_DEPS=" \ src/nxt_port.h \ src/nxt_port_hash.h \ src/nxt_port_memory.h \ + src/nxt_port_memory_int.h \ src/nxt_dyld.h \ src/nxt_thread.h \ src/nxt_thread_id.h \ diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index 2af03f67..5832ca25 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -1,4 +1,9 @@ +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + #include <nxt_main.h> #if (NXT_HAVE_MEMFD_CREATE) @@ -9,89 +14,14 @@ #endif -#define PORT_MMAP_CHUNK_SIZE (1024 * 16) -#define PORT_MMAP_HEADER_SIZE (1024 * 4) -#define PORT_MMAP_SIZE (PORT_MMAP_HEADER_SIZE + 1024 * 1024 * 10) - -#define PORT_MMAP_CHUNK_COUNT \ - ( (PORT_MMAP_SIZE - PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE ) - - -typedef uint32_t nxt_chunk_id_t; - -typedef nxt_atomic_uint_t nxt_free_map_t; - -#define FREE_BITS (sizeof(nxt_free_map_t) * 8) - -#define FREE_IDX(nchunk) ((nchunk) / FREE_BITS) - -#define FREE_MASK(nchunk) \ - ( 1ULL << ( (nchunk) % FREE_BITS ) ) - -#define MAX_FREE_IDX FREE_IDX(PORT_MMAP_CHUNK_COUNT) - - -/* Mapped at the start of shared memory segment. */ -struct nxt_port_mmap_header_s { - nxt_free_map_t free_map[MAX_FREE_IDX]; -}; - - -/* - * Element of nxt_process_t.incoming/outgoing, shared memory segment - * descriptor. - */ -struct nxt_port_mmap_s { - uint32_t id; - nxt_fd_t fd; - nxt_pid_t pid; /* For sanity check. */ - union { - void *mem; - nxt_port_mmap_header_t *hdr; - } u; -}; - - -/* Passed as a second iov chunk when 'mmap' bit in nxt_port_msg_t is 1. */ -typedef struct { - uint32_t mmap_id; /* Mmap index in nxt_process_t.outgoing. */ - nxt_chunk_id_t chunk_id; /* Mmap chunk index. */ - uint32_t size; /* Payload data size. */ -} nxt_port_mmap_msg_t; - - -static nxt_bool_t -nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c); - -#define nxt_port_mmap_get_chunk_busy(hdr, c) \ - ((hdr->free_map[FREE_IDX(c)] & FREE_MASK(c)) == 0) - -nxt_inline void -nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c); - -nxt_inline void -nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c); - -#define nxt_port_mmap_chunk_id(port_mmap, b) \ - ((((u_char *) (b) - (u_char *) (port_mmap->u.mem)) - \ - PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE) - -#define nxt_port_mmap_chunk_start(port_mmap, chunk) \ - (((u_char *) (port_mmap->u.mem)) + PORT_MMAP_HEADER_SIZE + \ - (chunk) * PORT_MMAP_CHUNK_SIZE) - +#include <nxt_port_memory_int.h> void nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) { - if (port_mmap->u.mem != NULL) { - nxt_mem_munmap(port_mmap->u.mem, PORT_MMAP_SIZE); - port_mmap->u.mem = NULL; - } - - if (port_mmap->fd != -1) { - nxt_fd_close(port_mmap->fd); - port_mmap->fd = -1; + if (port_mmap->hdr != NULL) { + nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE); + port_mmap->hdr = NULL; } } @@ -103,7 +33,6 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_mp_t *mp; nxt_buf_t *b; nxt_chunk_id_t c; - nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; b = obj; @@ -120,8 +49,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) } #endif - port_mmap = data; - hdr = port_mmap->u.hdr; + hdr = data; if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { /* @@ -129,11 +57,11 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) * let's release rest (if any). */ p = b->mem.pos - 1; - c = nxt_port_mmap_chunk_id(port_mmap, p) + 1; - p = nxt_port_mmap_chunk_start(port_mmap, c); + c = nxt_port_mmap_chunk_id(hdr, p) + 1; + p = nxt_port_mmap_chunk_start(hdr, c); } else { p = b->mem.start; - c = nxt_port_mmap_chunk_id(port_mmap, p); + c = nxt_port_mmap_chunk_id(hdr, p); } while (p < b->mem.end) { @@ -147,61 +75,21 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) } -static nxt_bool_t -nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c) -{ - int ffs; - size_t i; - nxt_free_map_t bits; - nxt_free_map_t *free_map; - - free_map = port_mmap->u.hdr->free_map; - - for (i = 0; i < MAX_FREE_IDX; i++) { - bits = free_map[i]; - if (bits == 0) { - continue; - } - - ffs = __builtin_ffsll(bits); - if (ffs != 0) { - *c = i * FREE_BITS + ffs - 1; - return 1; - } - } - - return 0; -} - - -nxt_inline void -nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) -{ - nxt_atomic_and_fetch(hdr->free_map + FREE_IDX(c), ~FREE_MASK(c)); - - nxt_thread_log_debug("set_chunk_busy: hdr %p; b %D", hdr, c); -} - - -nxt_inline void -nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) -{ - nxt_atomic_or_fetch(hdr->free_map + FREE_IDX(c), FREE_MASK(c)); - - nxt_thread_log_debug("set_chunk_free: hdr %p; b %D", hdr, c); -} - - -nxt_port_mmap_t * +nxt_port_mmap_header_t * nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_fd_t fd) { - struct stat mmap_stat; - nxt_port_mmap_t *port_mmap; + void *mem; + struct stat mmap_stat; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_header_t *hdr; nxt_debug(task, "got new mmap fd #%FD from process %PI", fd, process->pid); + port_mmap = NULL; + hdr = NULL; + if (fstat(fd, &mmap_stat) == -1) { nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno); @@ -216,44 +104,51 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, if (nxt_slow_path(process->incoming == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array"); - return NULL; + goto fail; } port_mmap = nxt_array_zero_add(process->incoming); if (nxt_slow_path(port_mmap == NULL)) { nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); - return NULL; + goto fail; } - port_mmap->id = process->incoming->nelts - 1; - port_mmap->fd = -1; - port_mmap->pid = process->pid; - port_mmap->u.mem = nxt_mem_mmap(NULL, mmap_stat.st_size, - PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + mem = nxt_mem_mmap(NULL, mmap_stat.st_size, + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) { + if (nxt_slow_path(mem == MAP_FAILED)) { nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); - port_mmap->u.mem = NULL; + port_mmap = NULL; - return NULL; + goto fail; } - return port_mmap; + port_mmap->hdr = mem; + hdr = port_mmap->hdr; + + if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) { + nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)", + port_mmap->hdr->id, process->incoming->nelts - 1); + } + +fail: + + return hdr; } static void nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data) { - nxt_buf_t *b; - nxt_mp_t *mp; - nxt_port_mmap_t *port_mmap; + nxt_fd_t fd; + nxt_buf_t *b; + nxt_mp_t *mp; b = obj; mp = b->data; - port_mmap = data; + fd = (nxt_fd_t) (intptr_t) data; #if (NXT_DEBUG) if (nxt_slow_path(data != b->parent)) { @@ -263,24 +158,27 @@ nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data) } #endif - nxt_debug(task, "mmap fd %FD has been sent", port_mmap->fd); + nxt_debug(task, "mmap fd %FD has been sent", fd); - nxt_fd_close(port_mmap->fd); - port_mmap->fd = -1; + nxt_fd_close(fd); nxt_buf_free(mp, b); } -static nxt_port_mmap_t * +static nxt_port_mmap_header_t * nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port) { + void *mem; u_char *p, name[64]; + nxt_fd_t fd; nxt_buf_t *b; nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; + port_mmap = NULL; + if (process->outgoing == NULL) { process->outgoing = nxt_array_create(process->mem_pool, 1, sizeof(nxt_port_mmap_t)); @@ -300,34 +198,30 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, return NULL; } - port_mmap->id = process->outgoing->nelts - 1; - port_mmap->pid = process->pid; - p = nxt_sprintf(name, name + sizeof(name), "/nginext.%PI.%uxD", nxt_pid, nxt_random(&nxt_random_data)); *p = '\0'; #if (NXT_HAVE_MEMFD_CREATE) - port_mmap->fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); + fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); - if (nxt_slow_path(port_mmap->fd == -1)) { + if (nxt_slow_path(fd == -1)) { nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E", name, nxt_errno); goto remove_fail; } - nxt_debug(task, "memfd_create(%s): %FD", name, port_mmap->fd); + nxt_debug(task, "memfd_create(%s): %FD", name, fd); #elif (NXT_HAVE_SHM_OPEN) shm_unlink((char *) name); // just in case - port_mmap->fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, - S_IRUSR | S_IWUSR); + fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); - nxt_debug(task, "shm_open(%s): %FD", name, port_mmap->fd); + nxt_debug(task, "shm_open(%s): %FD", name, fd); - if (nxt_slow_path(port_mmap->fd == -1)) { + if (nxt_slow_path(fd == -1)) { nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno); goto remove_fail; @@ -339,20 +233,21 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, } #endif - if (nxt_slow_path(ftruncate(port_mmap->fd, PORT_MMAP_SIZE) == -1)) { + if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno); goto remove_fail; } - port_mmap->u.mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, - PROT_READ | PROT_WRITE, MAP_SHARED, - port_mmap->fd, 0); + mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); - if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) { + if (nxt_slow_path(mem == MAP_FAILED)) { goto remove_fail; } + port_mmap->hdr = mem; + b = nxt_buf_mem_alloc(port->mem_pool, 0, 0); if (nxt_slow_path(b == NULL)) { goto remove_fail; @@ -360,27 +255,32 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, b->completion_handler = nxt_port_mmap_send_fd_buf_completion; b->data = port->mem_pool; - b->parent = port_mmap; + b->parent = (void *) (intptr_t) fd; /* Init segment header. */ - hdr = port_mmap->u.hdr; + hdr = port_mmap->hdr; nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); + hdr->id = process->outgoing->nelts - 1; + hdr->pid = process->pid; + + /* Mark first chunk as busy */ + nxt_port_mmap_set_chunk_busy(hdr, 0); + /* Mark as busy chunk followed the last available chunk. */ nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT); - nxt_debug(task, "send mmap fd %FD to process %PI", port_mmap->fd, + nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); /* TODO handle error */ - (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, port_mmap->fd, - 0, 0, b); + (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, b); nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", - port_mmap->id, nxt_pid, process->pid); + hdr->id, nxt_pid, process->pid); - return port_mmap; + return hdr; remove_fail: @@ -390,24 +290,29 @@ remove_fail: } -static nxt_port_mmap_t * +static nxt_port_mmap_header_t * nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, size_t size) { - nxt_array_t *outgoing; - nxt_process_t *process; - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_t *end_port_mmap; + nxt_array_t *outgoing; + nxt_process_t *process; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_t *end_port_mmap; + nxt_port_mmap_header_t *hdr; - process = nxt_runtime_process_get(task->thread->runtime, port->pid); + process = port->process; if (nxt_slow_path(process == NULL)) { return NULL; } *c = 0; + port_mmap = NULL; + hdr = NULL; if (process->outgoing == NULL) { - return nxt_port_new_port_mmap(task, process, port); + hdr = nxt_port_new_port_mmap(task, process, port); + + goto unlock_return; } outgoing = process->outgoing; @@ -416,44 +321,51 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, while (port_mmap < end_port_mmap) { - if (nxt_port_mmap_get_free_chunk(port_mmap, c)) { - return port_mmap; + if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) { + hdr = port_mmap->hdr; + + goto unlock_return; } port_mmap++; } /* TODO introduce port_mmap limit and release wait. */ - return nxt_port_new_port_mmap(task, process, port); + + hdr = nxt_port_new_port_mmap(task, process, port); + +unlock_return: + + return hdr; } -static nxt_port_mmap_t * +static nxt_port_mmap_header_t * nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) { - nxt_array_t *incoming; - nxt_process_t *process; - nxt_port_mmap_t *port_mmap; + nxt_array_t *incoming; + nxt_process_t *process; + nxt_port_mmap_t *port_mmap; + nxt_port_mmap_header_t *hdr; process = nxt_runtime_process_get(task->thread->runtime, spid); if (nxt_slow_path(process == NULL)) { return NULL; } + hdr = NULL; + incoming = process->incoming; - if (nxt_slow_path(incoming == NULL)) { - /* TODO add warning */ - return NULL; - } - if (nxt_slow_path(incoming->nelts <= id)) { - /* TODO add warning */ - return NULL; + if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) { + port_mmap = incoming->elts; + hdr = port_mmap[id].hdr; + } else { + nxt_log(task, NXT_LOG_WARN, + "failed to get incoming mmap #%d for process %PI", id, spid); } - port_mmap = incoming->elts; - - return port_mmap + id; + return hdr; } @@ -463,11 +375,15 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) size_t nchunks; nxt_buf_t *b; nxt_chunk_id_t c; - nxt_port_mmap_t *port_mmap; nxt_port_mmap_header_t *hdr; nxt_debug(task, "request %z bytes shm buffer", size); + if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) { + nxt_debug(task, "requested size (%z bytes) too big", size); + return NULL; + } + b = nxt_mp_zalloc(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE); if (nxt_slow_path(b == NULL)) { return NULL; @@ -479,22 +395,18 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) nxt_buf_set_port_mmap(b); - port_mmap = nxt_port_mmap_get(task, port, &c, size); - if (nxt_slow_path(port_mmap == NULL)) { + hdr = nxt_port_mmap_get(task, port, &c, size); + if (nxt_slow_path(hdr == NULL)) { nxt_buf_free(port->mem_pool, b); return NULL; } - hdr = port_mmap->u.hdr; - - b->parent = port_mmap; - b->mem.start = nxt_port_mmap_chunk_start(port_mmap, c); + b->parent = hdr; + b->mem.start = nxt_port_mmap_chunk_start(hdr, c); b->mem.pos = b->mem.start; b->mem.free = b->mem.start; b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE; - nxt_port_mmap_set_chunk_busy(hdr, c); - nchunks = size / PORT_MMAP_CHUNK_SIZE; if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { nchunks++; @@ -506,10 +418,9 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) /* Try to acquire as much chunks as required. */ while (nchunks > 0) { - if (nxt_port_mmap_get_chunk_busy(hdr, c)) { + if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { break; } - nxt_port_mmap_set_chunk_busy(hdr, c); b->mem.end += PORT_MMAP_CHUNK_SIZE; c++; @@ -520,13 +431,79 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) } +nxt_int_t +nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size) +{ + size_t nchunks; + nxt_chunk_id_t c, start; + nxt_port_mmap_header_t *hdr; + + nxt_debug(task, "request increase %z bytes shm buffer", size); + + if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { + nxt_log(task, NXT_LOG_WARN, + "failed to increase, not a mmap buffer"); + return NXT_ERROR; + } + + if (nxt_slow_path(size <= (size_t) nxt_buf_mem_free_size(&b->mem))) { + return NXT_OK; + } + + hdr = b->parent; + + start = nxt_port_mmap_chunk_id(hdr, b->mem.end); + + size -= nxt_buf_mem_free_size(&b->mem); + + nchunks = size / PORT_MMAP_CHUNK_SIZE; + if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { + nchunks++; + } + + c = start; + + /* Try to acquire as much chunks as required. */ + while (nchunks > 0) { + + if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { + break; + } + + c++; + nchunks--; + } + + if (nchunks != 0) { + c--; + while (c >= start) { + nxt_port_mmap_set_chunk_free(hdr, c); + c--; + } + + nxt_debug(task, "failed to increase, %d chunks busy", nchunks); + + return NXT_ERROR; + } else { + b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); + + return NXT_OK; + } +} + + static nxt_buf_t * nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) { size_t nchunks; nxt_buf_t *b; - nxt_port_mmap_t *port_mmap; + nxt_port_mmap_header_t *hdr; + + hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id); + if (nxt_slow_path(hdr == NULL)) { + return NULL; + } b = nxt_mp_zalloc(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE); if (nxt_slow_path(b == NULL)) { @@ -539,23 +516,17 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, nxt_buf_set_port_mmap(b); - port_mmap = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id); - if (nxt_slow_path(port_mmap == NULL)) { - nxt_buf_free(port->mem_pool, b); - return NULL; - } - nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { nchunks++; } - b->mem.start = nxt_port_mmap_chunk_start(port_mmap, mmap_msg->chunk_id); + b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); b->mem.pos = b->mem.start; b->mem.free = b->mem.start + mmap_msg->size; b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; - b->parent = port_mmap; + b->parent = hdr; return b; } @@ -565,11 +536,11 @@ void nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) { - size_t bsize; - nxt_buf_t *b, *bmem; - nxt_uint_t i; - nxt_port_mmap_t *port_mmap; - nxt_port_mmap_msg_t *mmap_msg; + size_t bsize; + nxt_buf_t *b, *bmem; + nxt_uint_t i; + nxt_port_mmap_msg_t *mmap_msg; + nxt_port_mmap_header_t *hdr; nxt_debug(task, "prepare %z bytes message for transfer to process %PI " "via shared memory", sb->size, port->pid); @@ -598,10 +569,10 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, /* TODO clear b and exit */ } - port_mmap = (nxt_port_mmap_t *) bmem->parent; + hdr = bmem->parent; - mmap_msg->mmap_id = port_mmap->id; - mmap_msg->chunk_id = nxt_port_mmap_chunk_id(port_mmap, bmem->mem.pos); + mmap_msg->mmap_id = hdr->id; + mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); mmap_msg->size = sb->iobuf[i].iov_len; nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", @@ -660,8 +631,8 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port, nxt_port_method_t nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) { - nxt_port_mmap_t *port_mmap; - nxt_port_method_t m; + nxt_port_method_t m; + nxt_port_mmap_header_t *hdr; m = NXT_PORT_METHOD_ANY; @@ -672,7 +643,7 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) } if (nxt_buf_is_port_mmap(b)) { - port_mmap = (nxt_port_mmap_t *) b->parent; + hdr = b->parent; if (m == NXT_PORT_METHOD_PLAIN) { nxt_log_error(NXT_LOG_ERR, task->log, @@ -682,10 +653,10 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) break; } - if (port->pid != port_mmap->pid) { + if (port->pid != hdr->pid) { nxt_log_error(NXT_LOG_ERR, task->log, "send mmap buffer for %PI to %PI, " - "using plain mode", port_mmap->pid, port->pid); + "using plain mode", hdr->pid, port->pid); m = NXT_PORT_METHOD_PLAIN; diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h index 9ad4e2a4..379201d0 100644 --- a/src/nxt_port_memory.h +++ b/src/nxt_port_memory.h @@ -1,7 +1,13 @@ +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + #ifndef _NXT_PORT_MEMORY_H_INCLUDED_ #define _NXT_PORT_MEMORY_H_INCLUDED_ + #define PORT_MMAP_MIN_SIZE (3 * sizeof(uint32_t)) typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t; @@ -19,7 +25,10 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap); nxt_buf_t * nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size); -nxt_port_mmap_t * +nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, + size_t size); + +nxt_port_mmap_header_t * nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_fd_t fd); @@ -47,4 +56,5 @@ typedef enum nxt_port_method_e nxt_port_method_t; nxt_port_method_t nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b); + #endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */ diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h new file mode 100644 index 00000000..d82b6237 --- /dev/null +++ b/src/nxt_port_memory_int.h @@ -0,0 +1,179 @@ + +/* + * Copyright (C) Max Romanov + * Copyright (C) NGINX, Inc. + */ + +#ifndef _NXT_PORT_MEMORY_INT_H_INCLUDED_ +#define _NXT_PORT_MEMORY_INT_H_INCLUDED_ + + +#include <stdint.h> +#include <nxt_atomic.h> + + +#ifdef NXT_MMAP_TINY_CHUNK + +#define PORT_MMAP_CHUNK_SIZE 16 +#define PORT_MMAP_HEADER_SIZE 1024 +#define PORT_MMAP_DATA_SIZE 1024 + +#else + +#define PORT_MMAP_CHUNK_SIZE (1024 * 16) +#define PORT_MMAP_HEADER_SIZE (1024 * 4) +#define PORT_MMAP_DATA_SIZE (1024 * 1024 * 10) + +#endif + + +#define PORT_MMAP_SIZE (PORT_MMAP_HEADER_SIZE + PORT_MMAP_DATA_SIZE) +#define PORT_MMAP_CHUNK_COUNT (PORT_MMAP_DATA_SIZE / PORT_MMAP_CHUNK_SIZE) + + +typedef uint32_t nxt_chunk_id_t; + +typedef nxt_atomic_uint_t nxt_free_map_t; + +#define FREE_BITS (sizeof(nxt_free_map_t) * 8) + +#define FREE_IDX(nchunk) ((nchunk) / FREE_BITS) + +#define FREE_MASK(nchunk) \ + ( 1ULL << ( (nchunk) % FREE_BITS ) ) + +#define MAX_FREE_IDX FREE_IDX(PORT_MMAP_CHUNK_COUNT) + + +/* Mapped at the start of shared memory segment. */ +struct nxt_port_mmap_header_s { + uint32_t id; + nxt_pid_t pid; /* For sanity check. */ + nxt_free_map_t free_map[MAX_FREE_IDX]; +}; + + +/* + * Element of nxt_process_t.incoming/outgoing, shared memory segment + * descriptor. + */ +struct nxt_port_mmap_s { + nxt_port_mmap_header_t *hdr; +}; + +typedef struct nxt_port_mmap_msg_s nxt_port_mmap_msg_t; + +/* Passed as a second iov chunk when 'mmap' bit in nxt_port_msg_t is 1. */ +struct nxt_port_mmap_msg_s { + uint32_t mmap_id; /* Mmap index in nxt_process_t.outgoing. */ + nxt_chunk_id_t chunk_id; /* Mmap chunk index. */ + uint32_t size; /* Payload data size. */ +}; + + +static nxt_bool_t +nxt_port_mmap_get_free_chunk(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t *c); + +#define nxt_port_mmap_get_chunk_busy(hdr, c) \ + ((hdr->free_map[FREE_IDX(c)] & FREE_MASK(c)) == 0) + +nxt_inline void +nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c); + +nxt_inline nxt_bool_t +nxt_port_mmap_chk_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c); + +nxt_inline void +nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c); + +nxt_inline nxt_chunk_id_t +nxt_port_mmap_chunk_id(nxt_port_mmap_header_t *hdr, u_char *p) +{ + u_char *mm_start; + + mm_start = (u_char *) hdr; + + return ((p - mm_start) - PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE; +} + + +nxt_inline u_char * +nxt_port_mmap_chunk_start(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) +{ + u_char *mm_start; + + mm_start = (u_char *) hdr; + + return mm_start + PORT_MMAP_HEADER_SIZE + c * PORT_MMAP_CHUNK_SIZE; +} + + +static nxt_bool_t +nxt_port_mmap_get_free_chunk(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t *c) +{ + int ffs; + size_t i; + nxt_chunk_id_t chunk; + nxt_free_map_t bits; + nxt_free_map_t *free_map; + + free_map = hdr->free_map; + + for (i = 0; i < MAX_FREE_IDX; i++) { + bits = free_map[i]; + if (bits == 0) { + continue; + } + + ffs = __builtin_ffsll(bits); + if (ffs != 0) { + chunk = i * FREE_BITS + ffs - 1; + + if (nxt_port_mmap_chk_set_chunk_busy(hdr, chunk)) { + *c = chunk; + return 1; + } + } + } + + return 0; +} + + +nxt_inline void +nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) +{ + nxt_atomic_and_fetch(hdr->free_map + FREE_IDX(c), ~FREE_MASK(c)); +} + + +nxt_inline nxt_bool_t +nxt_port_mmap_chk_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) +{ + nxt_free_map_t *f; + nxt_free_map_t free_val, busy_val; + + f = hdr->free_map + FREE_IDX(c); + + while ( (*f & FREE_MASK(c)) != 0 ) { + + free_val = *f | FREE_MASK(c); + busy_val = free_val & ~FREE_MASK(c); + + if (nxt_atomic_cmp_set(f, free_val, busy_val) != 0) { + return 1; + } + } + + return 0; +} + + +nxt_inline void +nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c) +{ + nxt_atomic_or_fetch(hdr->free_map + FREE_IDX(c), FREE_MASK(c)); +} + + +#endif /* _NXT_PORT_MEMORY_INT_H_INCLUDED_ */ |