diff options
Diffstat (limited to 'src/nxt_port_memory.c')
-rw-r--r-- | src/nxt_port_memory.c | 68 |
1 files changed, 39 insertions, 29 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index 5f730030..0c10cdd2 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -394,8 +394,9 @@ remove_fail: static nxt_port_mmap_handler_t * nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, - nxt_bool_t tracking) + nxt_int_t n, nxt_bool_t tracking) { + nxt_int_t i, res, nchunks; nxt_process_t *process; nxt_free_map_t *free_map; nxt_port_mmap_t *port_mmap; @@ -408,8 +409,6 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, return NULL; } - *c = 0; - nxt_thread_mutex_lock(&process->outgoing.mutex); end_port_mmap = process->outgoing.elts + process->outgoing.size; @@ -425,15 +424,38 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, continue; } + *c = 0; + free_map = tracking ? hdr->free_tracking_map : hdr->free_map; - if (nxt_port_mmap_get_free_chunk(free_map, c)) { - goto unlock_return; + while (nxt_port_mmap_get_free_chunk(free_map, c)) { + nchunks = 1; + + while (nchunks < n) { + res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks); + + if (res == 0) { + for (i = 0; i < nchunks; i++) { + nxt_port_mmap_set_chunk_free(free_map, *c + i); + } + + *c += nchunks + 1; + nchunks = 0; + break; + } + + nchunks++; + } + + if (nchunks == n) { + goto unlock_return; + } } } /* TODO introduce port_mmap limit and release wait. */ + *c = 0; mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking); unlock_return: @@ -482,7 +504,7 @@ nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, nxt_debug(task, "request tracking for stream #%uD", stream); - mmap_handler = nxt_port_mmap_get(task, port, &c, 1); + mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1); if (nxt_slow_path(mmap_handler == NULL)) { return NXT_ERROR; } @@ -606,15 +628,23 @@ nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_buf_t * nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) { - size_t nchunks; nxt_mp_t *mp; nxt_buf_t *b; + nxt_int_t nchunks; nxt_chunk_id_t c; nxt_port_mmap_header_t *hdr; nxt_port_mmap_handler_t *mmap_handler; nxt_debug(task, "request %z bytes shm buffer", size); + nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; + + if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) { + nxt_alert(task, "requested buffer (%z) too big", size); + + return NULL; + } + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); if (nxt_slow_path(b == NULL)) { return NULL; @@ -623,7 +653,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) b->completion_handler = nxt_port_mmap_buf_completion; nxt_buf_set_port_mmap(b); - mmap_handler = nxt_port_mmap_get(task, port, &c, 0); + mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0); if (nxt_slow_path(mmap_handler == NULL)) { mp = task->thread->engine->mem_pool; nxt_mp_free(mp, b); @@ -640,32 +670,12 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) b->mem.start = nxt_port_mmap_chunk_start(hdr, c); b->mem.pos = b->mem.start; b->mem.free = b->mem.start; - b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE; - - nchunks = size / PORT_MMAP_CHUNK_SIZE; - if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { - nchunks++; - } + b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start, hdr->src_pid, hdr->dst_pid, hdr->id, c); - c++; - nchunks--; - - /* Try to acquire as much chunks as required. */ - while (nchunks > 0) { - - if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { - break; - } - - b->mem.end += PORT_MMAP_CHUNK_SIZE; - c++; - nchunks--; - } - return b; } |