summaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/nxt_port_memory.c221
-rw-r--r--src/nxt_port_memory.h5
-rw-r--r--src/nxt_port_memory_int.h7
3 files changed, 146 insertions, 87 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index db2ad2df..fa2ce449 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -16,12 +16,21 @@
#include <nxt_port_memory_int.h>
+
nxt_inline void
-nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
+nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
{
- if (port_mmap->hdr != NULL) {
- nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE);
- port_mmap->hdr = NULL;
+ int c;
+
+ c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);
+
+ if (i < 0 && c == -i) {
+ if (mmap_handler->hdr != NULL) {
+ nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
+ mmap_handler->hdr = NULL;
+ }
+
+ nxt_free(mmap_handler);
}
}
@@ -70,7 +79,7 @@ nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
void
-nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free)
+nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
{
uint32_t i;
nxt_port_mmap_t *port_mmap;
@@ -82,12 +91,12 @@ nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free)
port_mmap = port_mmaps->elts;
for (i = 0; i < port_mmaps->size; i++) {
- nxt_port_mmap_destroy(port_mmap + i);
+ nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
}
port_mmaps->size = 0;
- if (free != 0) {
+ if (free_elts != 0) {
nxt_free(port_mmaps->elts);
}
}
@@ -100,11 +109,12 @@ nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free)
static void
nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
{
- u_char *p;
- nxt_mp_t *mp;
- nxt_buf_t *b;
- nxt_chunk_id_t c;
- nxt_port_mmap_header_t *hdr;
+ u_char *p;
+ nxt_mp_t *mp;
+ nxt_buf_t *b;
+ nxt_chunk_id_t c;
+ nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
if (nxt_buf_ts_handle(task, obj, data)) {
return;
@@ -122,7 +132,8 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
}
#endif
- hdr = data;
+ mmap_handler = data;
+ hdr = mmap_handler->hdr;
if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
nxt_debug(task, "mmap buf completion: mmap for other process pair "
@@ -160,18 +171,21 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
release_buf:
+ nxt_port_mmap_handler_use(mmap_handler, -1);
+
nxt_mp_release(mp, b);
}
-nxt_port_mmap_header_t *
+nxt_port_mmap_handler_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_fd_t fd)
{
- void *mem;
- struct stat mmap_stat;
- nxt_port_mmap_t *port_mmap;
- nxt_port_mmap_header_t *hdr;
+ void *mem;
+ struct stat mmap_stat;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
nxt_debug(task, "got new mmap fd #%FD from process %PI",
fd, process->pid);
@@ -196,6 +210,15 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
hdr = mem;
+ mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
+ if (nxt_slow_path(mmap_handler == NULL)) {
+ nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
+
+ return NULL;
+ }
+
+ mmap_handler->hdr = hdr;
+
nxt_thread_mutex_lock(&process->incoming.mutex);
port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
@@ -205,13 +228,17 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_mem_munmap(mem, PORT_MMAP_SIZE);
hdr = NULL;
+ nxt_free(mmap_handler);
+ mmap_handler = NULL;
+
goto fail;
}
nxt_assert(hdr->src_pid == process->pid);
nxt_assert(hdr->dst_pid == nxt_pid);
- port_mmap->hdr = hdr;
+ port_mmap->mmap_handler = mmap_handler;
+ nxt_port_mmap_handler_use(mmap_handler, 1);
hdr->sent_over = 0xFFFFu;
@@ -219,25 +246,34 @@ fail:
nxt_thread_mutex_unlock(&process->incoming.mutex);
- return hdr;
+ return mmap_handler;
}
-static nxt_port_mmap_header_t *
+static nxt_port_mmap_handler_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_t *port)
{
- void *mem;
- u_char *p, name[64];
- nxt_fd_t fd;
- nxt_port_mmap_t *port_mmap;
- nxt_port_mmap_header_t *hdr;
+ void *mem;
+ u_char *p, name[64];
+ nxt_fd_t fd;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
+
+ mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
+ if (nxt_slow_path(mmap_handler == NULL)) {
+ nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
+
+ return NULL;
+ }
port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
if (nxt_slow_path(port_mmap == NULL)) {
nxt_log(task, NXT_LOG_WARN,
"failed to add port mmap to outgoing array");
+ nxt_free(mmap_handler);
return NULL;
}
@@ -293,10 +329,12 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
goto remove_fail;
}
- port_mmap->hdr = mem;
+ mmap_handler->hdr = mem;
+ port_mmap->mmap_handler = mmap_handler;
+ nxt_port_mmap_handler_use(mmap_handler, 1);
/* Init segment header. */
- hdr = port_mmap->hdr;
+ hdr = mmap_handler->hdr;
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
@@ -319,24 +357,27 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
hdr->id, nxt_pid, process->pid);
- return hdr;
+ return mmap_handler;
remove_fail:
+ nxt_free(mmap_handler);
+
process->outgoing.size--;
return NULL;
}
-static nxt_port_mmap_header_t *
+static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
size_t size)
{
- nxt_process_t *process;
- nxt_port_mmap_t *port_mmap;
- nxt_port_mmap_t *end_port_mmap;
- nxt_port_mmap_header_t *hdr;
+ nxt_process_t *process;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_t *end_port_mmap;
+ nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
process = port->process;
if (nxt_slow_path(process == NULL)) {
@@ -344,75 +385,72 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
}
*c = 0;
- port_mmap = NULL;
- hdr = NULL;
nxt_thread_mutex_lock(&process->outgoing.mutex);
- port_mmap = process->outgoing.elts;
- end_port_mmap = port_mmap + process->outgoing.size;
+ end_port_mmap = process->outgoing.elts + process->outgoing.size;
- while (port_mmap < end_port_mmap) {
+ for (port_mmap = process->outgoing.elts;
+ port_mmap < end_port_mmap;
+ port_mmap++)
+ {
+ mmap_handler = port_mmap->mmap_handler;
+ hdr = mmap_handler->hdr;
- if ( (port_mmap->hdr->sent_over == 0xFFFFu ||
- port_mmap->hdr->sent_over == port->id) &&
- nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) {
- hdr = port_mmap->hdr;
+ if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
+ continue;
+ }
+ if (nxt_port_mmap_get_free_chunk(hdr, c)) {
goto unlock_return;
}
-
- port_mmap++;
}
/* TODO introduce port_mmap limit and release wait. */
- hdr = nxt_port_new_port_mmap(task, process, port);
+ mmap_handler = nxt_port_new_port_mmap(task, process, port);
unlock_return:
nxt_thread_mutex_unlock(&process->outgoing.mutex);
- return hdr;
+ return mmap_handler;
}
-static nxt_port_mmap_header_t *
+static nxt_port_mmap_handler_t *
nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
{
- nxt_process_t *process;
- nxt_port_mmap_header_t *hdr;
+ nxt_process_t *process;
+ nxt_port_mmap_handler_t *mmap_handler;
process = nxt_runtime_process_find(task->thread->runtime, spid);
if (nxt_slow_path(process == NULL)) {
return NULL;
}
- hdr = NULL;
+ mmap_handler = NULL;
nxt_thread_mutex_lock(&process->incoming.mutex);
if (nxt_fast_path(process->incoming.size > id)) {
- hdr = process->incoming.elts[id].hdr;
-
- } else {
- nxt_log(task, NXT_LOG_WARN,
- "failed to get incoming mmap #%d for process %PI", id, spid);
+ mmap_handler = process->incoming.elts[id].mmap_handler;
}
nxt_thread_mutex_unlock(&process->incoming.mutex);
- return hdr;
+ return mmap_handler;
}
nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
{
- size_t nchunks;
- nxt_buf_t *b;
- nxt_chunk_id_t c;
- nxt_port_mmap_header_t *hdr;
+ size_t nchunks;
+ nxt_buf_t *b;
+ nxt_chunk_id_t c;
+ nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
nxt_debug(task, "request %z bytes shm buffer", size);
@@ -424,13 +462,17 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
b->completion_handler = nxt_port_mmap_buf_completion;
nxt_buf_set_port_mmap(b);
- hdr = nxt_port_mmap_get(task, port, &c, size);
- if (nxt_slow_path(hdr == NULL)) {
+ mmap_handler = nxt_port_mmap_get(task, port, &c, size);
+ if (nxt_slow_path(mmap_handler == NULL)) {
nxt_mp_release(task->thread->engine->mem_pool, b);
return NULL;
}
- b->parent = hdr;
+ b->parent = mmap_handler;
+
+ nxt_port_mmap_handler_use(mmap_handler, 1);
+
+ hdr = mmap_handler->hdr;
b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
b->mem.pos = b->mem.start;
@@ -469,9 +511,10 @@ nxt_int_t
nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
size_t min_size)
{
- size_t nchunks, free_size;
- nxt_chunk_id_t c, start;
- nxt_port_mmap_header_t *hdr;
+ size_t nchunks, free_size;
+ nxt_chunk_id_t c, start;
+ nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
nxt_debug(task, "request increase %z bytes shm buffer", size);
@@ -487,7 +530,8 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
return NXT_OK;
}
- hdr = b->parent;
+ mmap_handler = b->parent;
+ hdr = mmap_handler->hdr;
start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
@@ -536,12 +580,14 @@ static nxt_buf_t *
nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
{
- size_t nchunks;
- nxt_buf_t *b;
- nxt_port_mmap_header_t *hdr;
-
- hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
- if (nxt_slow_path(hdr == NULL)) {
+ size_t nchunks;
+ nxt_buf_t *b;
+ nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
+
+ mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
+ mmap_msg->mmap_id);
+ if (nxt_slow_path(mmap_handler == NULL)) {
return NULL;
}
@@ -559,12 +605,15 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
nchunks++;
}
+ hdr = mmap_handler->hdr;
+
b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start + mmap_msg->size;
b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
- b->parent = hdr;
+ b->parent = mmap_handler;
+ nxt_port_mmap_handler_use(mmap_handler, 1);
nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d",
b, b->mem.start, b->mem.end - b->mem.start,
@@ -578,11 +627,12 @@ void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
{
- size_t bsize;
- nxt_buf_t *bmem;
- nxt_uint_t i;
- nxt_port_mmap_msg_t *mmap_msg;
- nxt_port_mmap_header_t *hdr;
+ size_t bsize;
+ nxt_buf_t *bmem;
+ nxt_uint_t i;
+ nxt_port_mmap_msg_t *mmap_msg;
+ nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
"via shared memory", sb->size, port->pid);
@@ -606,7 +656,8 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
/* TODO clear b and exit */
}
- hdr = bmem->parent;
+ mmap_handler = bmem->parent;
+ hdr = mmap_handler->hdr;
mmap_msg->mmap_id = hdr->id;
mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
@@ -667,8 +718,9 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
{
- nxt_port_method_t m;
- nxt_port_mmap_header_t *hdr;
+ nxt_port_method_t m;
+ nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
m = NXT_PORT_METHOD_ANY;
@@ -679,7 +731,8 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
}
if (nxt_buf_is_port_mmap(b)) {
- hdr = b->parent;
+ mmap_handler = b->parent;
+ hdr = mmap_handler->hdr;
if (m == NXT_PORT_METHOD_PLAIN) {
nxt_log_error(NXT_LOG_ERR, task->log,
diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h
index 51c40411..d1f29df8 100644
--- a/src/nxt_port_memory.h
+++ b/src/nxt_port_memory.h
@@ -11,8 +11,9 @@
#define PORT_MMAP_MIN_SIZE (3 * sizeof(uint32_t))
typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t;
+typedef struct nxt_port_mmap_handler_s nxt_port_mmap_handler_t;
-void nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free);
+void nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts);
/*
* Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem'
@@ -27,7 +28,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size);
nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b,
size_t size, size_t min_size);
-nxt_port_mmap_header_t *
+nxt_port_mmap_handler_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_fd_t fd);
diff --git a/src/nxt_port_memory_int.h b/src/nxt_port_memory_int.h
index 6ccc3d83..0168e1ae 100644
--- a/src/nxt_port_memory_int.h
+++ b/src/nxt_port_memory_int.h
@@ -55,12 +55,17 @@ struct nxt_port_mmap_header_s {
};
+struct nxt_port_mmap_handler_s {
+ nxt_port_mmap_header_t *hdr;
+ nxt_atomic_t use_count;
+};
+
/*
* Element of nxt_process_t.incoming/outgoing, shared memory segment
* descriptor.
*/
struct nxt_port_mmap_s {
- nxt_port_mmap_header_t *hdr;
+ nxt_port_mmap_handler_t *mmap_handler;
};
typedef struct nxt_port_mmap_msg_s nxt_port_mmap_msg_t;