summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_memory.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_port_memory.c')
-rw-r--r--src/nxt_port_memory.c68
1 files changed, 39 insertions, 29 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index 5f730030..0c10cdd2 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -394,8 +394,9 @@ remove_fail:
static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
- nxt_bool_t tracking)
+ nxt_int_t n, nxt_bool_t tracking)
{
+ nxt_int_t i, res, nchunks;
nxt_process_t *process;
nxt_free_map_t *free_map;
nxt_port_mmap_t *port_mmap;
@@ -408,8 +409,6 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
return NULL;
}
- *c = 0;
-
nxt_thread_mutex_lock(&process->outgoing.mutex);
end_port_mmap = process->outgoing.elts + process->outgoing.size;
@@ -425,15 +424,38 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
continue;
}
+ *c = 0;
+
free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
- if (nxt_port_mmap_get_free_chunk(free_map, c)) {
- goto unlock_return;
+ while (nxt_port_mmap_get_free_chunk(free_map, c)) {
+ nchunks = 1;
+
+ while (nchunks < n) {
+ res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks);
+
+ if (res == 0) {
+ for (i = 0; i < nchunks; i++) {
+ nxt_port_mmap_set_chunk_free(free_map, *c + i);
+ }
+
+ *c += nchunks + 1;
+ nchunks = 0;
+ break;
+ }
+
+ nchunks++;
+ }
+
+ if (nchunks == n) {
+ goto unlock_return;
+ }
}
}
/* TODO introduce port_mmap limit and release wait. */
+ *c = 0;
mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking);
unlock_return:
@@ -482,7 +504,7 @@ nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
nxt_debug(task, "request tracking for stream #%uD", stream);
- mmap_handler = nxt_port_mmap_get(task, port, &c, 1);
+ mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1);
if (nxt_slow_path(mmap_handler == NULL)) {
return NXT_ERROR;
}
@@ -606,15 +628,23 @@ nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
{
- size_t nchunks;
nxt_mp_t *mp;
nxt_buf_t *b;
+ nxt_int_t nchunks;
nxt_chunk_id_t c;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
nxt_debug(task, "request %z bytes shm buffer", size);
+ nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
+
+ if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) {
+ nxt_alert(task, "requested buffer (%z) too big", size);
+
+ return NULL;
+ }
+
b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
if (nxt_slow_path(b == NULL)) {
return NULL;
@@ -623,7 +653,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
b->completion_handler = nxt_port_mmap_buf_completion;
nxt_buf_set_port_mmap(b);
- mmap_handler = nxt_port_mmap_get(task, port, &c, 0);
+ mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0);
if (nxt_slow_path(mmap_handler == NULL)) {
mp = task->thread->engine->mem_pool;
nxt_mp_free(mp, b);
@@ -640,32 +670,12 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
- b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
-
- nchunks = size / PORT_MMAP_CHUNK_SIZE;
- if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
- nchunks++;
- }
+ b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
b, b->mem.start, b->mem.end - b->mem.start,
hdr->src_pid, hdr->dst_pid, hdr->id, c);
- c++;
- nchunks--;
-
- /* Try to acquire as much chunks as required. */
- while (nchunks > 0) {
-
- if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
- break;
- }
-
- b->mem.end += PORT_MMAP_CHUNK_SIZE;
- c++;
- nchunks--;
- }
-
return b;
}