summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_memory.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2017-05-12 20:32:41 +0300
committerMax Romanov <max.romanov@nginx.com>2017-05-12 20:32:41 +0300
commitf7b4bdfd892a0b479dc946896435a3ba7f9615dd (patch)
treea6f0c4ebaeed2d9f0fcb1c07178b52a684a53280 /src/nxt_port_memory.c
parent1782c771fab999b37a8c04ed72760e3528205be7 (diff)
downloadunit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.gz
unit-f7b4bdfd892a0b479dc946896435a3ba7f9615dd.tar.bz2
Using shared memory to send data via nxt_port.
Usage: b = nxt_port_mmap_get_buf(task, port, size); b->mem.free = nxt_cpymem(b->mem.free, data, size); nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, b);
Diffstat (limited to 'src/nxt_port_memory.c')
-rw-r--r--src/nxt_port_memory.c700
1 files changed, 700 insertions, 0 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
new file mode 100644
index 00000000..447ef138
--- /dev/null
+++ b/src/nxt_port_memory.c
@@ -0,0 +1,700 @@
+
+#include <nxt_main.h>
+
+#if (NXT_HAVE_MEMFD_CREATE)
+
+#include <linux/memfd.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+
+#endif
+
+#define PORT_MMAP_CHUNK_SIZE (1024 * 16)
+#define PORT_MMAP_HEADER_SIZE (1024 * 4)
+#define PORT_MMAP_SIZE (PORT_MMAP_HEADER_SIZE + 1024 * 1024 * 10)
+
+#define PORT_MMAP_CHUNK_COUNT \
+ ( (PORT_MMAP_SIZE - PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE )
+
+
+typedef uint32_t nxt_chunk_id_t;
+
+typedef nxt_atomic_uint_t nxt_free_map_t;
+
+#define FREE_BITS (sizeof(nxt_free_map_t) * 8)
+
+#define FREE_IDX(nchunk) ((nchunk) / FREE_BITS)
+
+#define FREE_MASK(nchunk) \
+ ( 1ULL << ( (nchunk) % FREE_BITS ) )
+
+#define MAX_FREE_IDX FREE_IDX(PORT_MMAP_CHUNK_COUNT)
+
+
+/* Mapped at the start of shared memory segment. */
+struct nxt_port_mmap_header_s {
+ nxt_free_map_t free_map[MAX_FREE_IDX];
+};
+
+
+/*
+ * Element of nxt_process_t.incoming/outgoing, shared memory segment
+ * descriptor.
+ */
+struct nxt_port_mmap_s {
+ uint32_t id;
+ nxt_fd_t fd;
+ nxt_pid_t pid; /* For sanity check. */
+ union {
+ void *mem;
+ nxt_port_mmap_header_t *hdr;
+ } u;
+};
+
+
+/* Passed as a second iov chunk when 'mmap' bit in nxt_port_msg_t is 1. */
+typedef struct {
+ uint32_t mmap_id; /* Mmap index in nxt_process_t.outgoing. */
+ nxt_chunk_id_t chunk_id; /* Mmap chunk index. */
+ uint32_t size; /* Payload data size. */
+} nxt_port_mmap_msg_t;
+
+
+static nxt_bool_t
+nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c);
+
+#define nxt_port_mmap_get_chunk_busy(hdr, c) \
+ ((hdr->free_map[FREE_IDX(c)] & FREE_MASK(c)) == 0)
+
+nxt_inline void
+nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
+
+nxt_inline void
+nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c);
+
+#define nxt_port_mmap_chunk_id(port_mmap, b) \
+ ((((u_char *) (b) - (u_char *) (port_mmap->u.mem)) - \
+ PORT_MMAP_HEADER_SIZE) / PORT_MMAP_CHUNK_SIZE)
+
+#define nxt_port_mmap_chunk_start(port_mmap, chunk) \
+ (((u_char *) (port_mmap->u.mem)) + PORT_MMAP_HEADER_SIZE + \
+ (chunk) * PORT_MMAP_CHUNK_SIZE)
+
+
+void
+nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
+{
+ if (port_mmap->u.mem != NULL) {
+ nxt_mem_munmap(port_mmap->u.mem, PORT_MMAP_SIZE);
+ port_mmap->u.mem = NULL;
+ }
+
+ if (port_mmap->fd != -1) {
+ close(port_mmap->fd);
+ port_mmap->fd = -1;
+ }
+}
+
+
+static void
+nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
+{
+ u_char *p;
+ nxt_buf_t *b;
+ nxt_chunk_id_t c;
+ nxt_mem_pool_t *mp;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
+
+ b = obj;
+
+ nxt_debug(task, "mmap buf completion: %p %p", b, b->mem.start);
+
+ mp = b->data;
+
+ port_mmap = (nxt_port_mmap_t *) b->parent;
+ hdr = port_mmap->u.hdr;
+
+ if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
+ /*
+ * Chunks until b->mem.pos has been sent to other side,
+ * let's release rest (if any).
+ */
+ p = b->mem.pos - 1;
+ c = nxt_port_mmap_chunk_id(port_mmap, p) + 1;
+ p = nxt_port_mmap_chunk_start(port_mmap, c);
+ } else {
+ p = b->mem.start;
+ c = nxt_port_mmap_chunk_id(port_mmap, p);
+ }
+
+ while (p < b->mem.end) {
+ nxt_port_mmap_set_chunk_free(hdr, c);
+
+ p += PORT_MMAP_CHUNK_SIZE;
+ c++;
+ }
+
+ nxt_buf_free(mp, b);
+}
+
+
+static nxt_bool_t
+nxt_port_mmap_get_free_chunk(nxt_port_mmap_t *port_mmap, nxt_chunk_id_t *c)
+{
+ int ffs;
+ size_t i;
+ nxt_free_map_t bits;
+ nxt_free_map_t *free_map;
+
+ free_map = port_mmap->u.hdr->free_map;
+
+ for (i = 0; i < MAX_FREE_IDX; i++) {
+ bits = free_map[i];
+ if (bits == 0) {
+ continue;
+ }
+
+ ffs = __builtin_ffsll(bits);
+ if (ffs != 0) {
+ *c = i * FREE_BITS + ffs - 1;
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
+
+nxt_inline void
+nxt_port_mmap_set_chunk_busy(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
+{
+ nxt_atomic_and_fetch(hdr->free_map + FREE_IDX(c), ~FREE_MASK(c));
+
+ nxt_thread_log_debug("set_chunk_busy: hdr %p; b %D", hdr, c);
+}
+
+
+nxt_inline void
+nxt_port_mmap_set_chunk_free(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
+{
+ nxt_atomic_or_fetch(hdr->free_map + FREE_IDX(c), FREE_MASK(c));
+
+ nxt_thread_log_debug("set_chunk_free: hdr %p; b %D", hdr, c);
+}
+
+
+nxt_port_mmap_t *
+nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
+ nxt_fd_t fd)
+{
+ struct stat mmap_stat;
+ nxt_port_mmap_t *port_mmap;
+
+ nxt_debug(task, "got new mmap fd #%FD from process %PI",
+ fd, process->pid);
+
+ if (fstat(fd, &mmap_stat) == -1) {
+ nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno);
+
+ return NULL;
+ }
+
+ if (process->incoming == NULL) {
+ process->incoming = nxt_array_create(process->mem_pool, 1,
+ sizeof(nxt_port_mmap_t));
+ }
+
+ if (nxt_slow_path(process->incoming == NULL)) {
+ nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array");
+
+ return NULL;
+ }
+
+ port_mmap = nxt_array_zero_add(process->incoming);
+ if (nxt_slow_path(port_mmap == NULL)) {
+ nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
+
+ return NULL;
+ }
+
+ port_mmap->id = process->incoming->nelts - 1;
+ port_mmap->fd = -1;
+ port_mmap->pid = process->pid;
+ port_mmap->u.mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+
+ if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) {
+ nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
+
+ port_mmap->u.mem = NULL;
+
+ return NULL;
+ }
+
+ return port_mmap;
+}
+
+
+static void
+nxt_port_mmap_send_fd_buf_completion(nxt_task_t *task, void *obj, void *data)
+{
+ nxt_buf_t *b;
+ nxt_port_t *port;
+ nxt_port_mmap_t *port_mmap;
+
+ b = obj;
+ port = b->data;
+ port_mmap = (nxt_port_mmap_t *) b->parent;
+
+ nxt_debug(task, "mmap fd %FD sent to %PI", port_mmap->fd, port->pid);
+
+ close(port_mmap->fd);
+ port_mmap->fd = -1;
+
+ nxt_buf_free(port->mem_pool, b);
+}
+
+
+static nxt_port_mmap_t *
+nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
+ nxt_port_t *port)
+{
+ u_char *p, name[64];
+ nxt_buf_t *b;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
+
+ if (process->outgoing == NULL) {
+ process->outgoing = nxt_array_create(process->mem_pool, 1,
+ sizeof(nxt_port_mmap_t));
+ }
+
+ if (nxt_slow_path(process->outgoing == NULL)) {
+ nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array");
+
+ return NULL;
+ }
+
+ port_mmap = nxt_array_zero_add(process->outgoing);
+ if (nxt_slow_path(port_mmap == NULL)) {
+ nxt_log(task, NXT_LOG_WARN,
+ "failed to add port mmap to outgoing array");
+
+ return NULL;
+ }
+
+ port_mmap->id = process->outgoing->nelts - 1;
+ port_mmap->pid = process->pid;
+
+ p = nxt_sprintf(name, name + sizeof(name), "/nginext.%PI.%PT.%D",
+ nxt_pid, task->thread->tid, nxt_random(&nxt_random_data));
+ *p = '\0';
+
+#if (NXT_HAVE_MEMFD_CREATE)
+ port_mmap->fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
+
+ if (nxt_slow_path(port_mmap->fd == -1)) {
+ nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E",
+ name, nxt_errno);
+
+ goto remove_fail;
+ }
+
+ nxt_debug(task, "memfd_create(%s): %FD", name, port_mmap->fd);
+
+#elif (NXT_HAVE_SHM_OPEN)
+ shm_unlink((char *) name); // just in case
+
+ port_mmap->fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR,
+ S_IRUSR | S_IWUSR);
+
+ nxt_debug(task, "shm_open(%s): %FD", name, port_mmap->fd);
+
+ if (nxt_slow_path(port_mmap->fd == -1)) {
+ nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno);
+
+ goto remove_fail;
+ }
+
+ if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
+ nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
+ nxt_errno);
+ }
+#endif
+
+ if (nxt_slow_path(ftruncate(port_mmap->fd, PORT_MMAP_SIZE) == -1)) {
+ nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
+
+ goto remove_fail;
+ }
+
+ port_mmap->u.mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE,
+ PROT_READ | PROT_WRITE, MAP_SHARED,
+ port_mmap->fd, 0);
+
+ if (nxt_slow_path(port_mmap->u.mem == MAP_FAILED)) {
+ goto remove_fail;
+ }
+
+ /* Init segment header. */
+ hdr = port_mmap->u.hdr;
+
+ nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
+
+ /* Mark as busy chunk followed the last available chunk. */
+ nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);
+
+ nxt_debug(task, "send mmap fd %FD to process %PI", port_mmap->fd,
+ port->pid);
+
+ b = nxt_buf_mem_alloc(port->mem_pool, 0, 0);
+ b->completion_handler = nxt_port_mmap_send_fd_buf_completion;
+ b->data = port;
+ b->parent = port_mmap;
+
+ /* TODO handle error */
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, port_mmap->fd,
+ 0, 0, b);
+
+ nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
+ port_mmap->id, nxt_pid, process->pid);
+
+ return port_mmap;
+
+remove_fail:
+
+ nxt_array_remove(process->outgoing, port_mmap);
+
+ return NULL;
+}
+
+
+static nxt_port_mmap_t *
+nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
+ size_t size)
+{
+ nxt_array_t *outgoing;
+ nxt_process_t *process;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_t *end_port_mmap;
+
+ process = nxt_runtime_process_get(task->thread->runtime, port->pid);
+ if (nxt_slow_path(process == NULL)) {
+ return NULL;
+ }
+
+ *c = 0;
+
+ if (process->outgoing == NULL) {
+ return nxt_port_new_port_mmap(task, process, port);
+ }
+
+ outgoing = process->outgoing;
+ port_mmap = outgoing->elts;
+ end_port_mmap = port_mmap + outgoing->nelts;
+
+ while (port_mmap < end_port_mmap) {
+
+ if (nxt_port_mmap_get_free_chunk(port_mmap, c)) {
+ return port_mmap;
+ }
+
+ port_mmap++;
+ }
+
+ /* TODO introduce port_mmap limit and release wait. */
+ return nxt_port_new_port_mmap(task, process, port);
+}
+
+
+static nxt_port_mmap_t *
+nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
+{
+ nxt_array_t *incoming;
+ nxt_process_t *process;
+ nxt_port_mmap_t *port_mmap;
+
+ process = nxt_runtime_process_get(task->thread->runtime, spid);
+ if (nxt_slow_path(process == NULL)) {
+ return NULL;
+ }
+
+ incoming = process->incoming;
+ if (nxt_slow_path(incoming == NULL)) {
+ /* TODO add warning */
+ return NULL;
+ }
+
+ if (nxt_slow_path(incoming->nelts <= id)) {
+ /* TODO add warning */
+ return NULL;
+ }
+
+ port_mmap = incoming->elts;
+
+ return port_mmap + id;
+}
+
+
+nxt_buf_t *
+nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
+{
+ size_t nchunks;
+ nxt_buf_t *b;
+ nxt_chunk_id_t c;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_header_t *hdr;
+
+ nxt_debug(task, "request %z bytes shm buffer", size);
+
+ b = nxt_mem_cache_zalloc0(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE);
+ if (nxt_slow_path(b == NULL)) {
+ return NULL;
+ }
+
+ b->data = port->mem_pool;
+ b->completion_handler = nxt_port_mmap_buf_completion;
+ b->size = NXT_BUF_PORT_MMAP_SIZE;
+
+ nxt_buf_set_port_mmap(b);
+
+ port_mmap = nxt_port_mmap_get(task, port, &c, size);
+ if (nxt_slow_path(port_mmap == NULL)) {
+ nxt_buf_free(port->mem_pool, b);
+ return NULL;
+ }
+
+ hdr = port_mmap->u.hdr;
+
+ b->parent = port_mmap;
+ b->mem.start = nxt_port_mmap_chunk_start(port_mmap, c);
+ b->mem.pos = b->mem.start;
+ b->mem.free = b->mem.start;
+ b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
+
+ nxt_port_mmap_set_chunk_busy(hdr, c);
+
+ nchunks = size / PORT_MMAP_CHUNK_SIZE;
+ if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
+ nchunks++;
+ }
+
+ c++;
+ nchunks--;
+
+ /* Try to acquire as much chunks as required. */
+ while (nchunks > 0) {
+
+ if (nxt_port_mmap_get_chunk_busy(hdr, c)) {
+ break;
+ }
+ nxt_port_mmap_set_chunk_busy(hdr, c);
+
+ b->mem.end += PORT_MMAP_CHUNK_SIZE;
+ c++;
+ nchunks--;
+ }
+
+ return b;
+}
+
+
+static nxt_buf_t *
+nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
+ nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg)
+{
+ size_t nchunks;
+ nxt_buf_t *b;
+ nxt_port_mmap_t *port_mmap;
+
+ b = nxt_mem_cache_zalloc0(port->mem_pool, NXT_BUF_PORT_MMAP_SIZE);
+ if (nxt_slow_path(b == NULL)) {
+ return NULL;
+ }
+
+ b->data = port->mem_pool;
+ b->completion_handler = nxt_port_mmap_buf_completion;
+ b->size = NXT_BUF_PORT_MMAP_SIZE;
+
+ nxt_buf_set_port_mmap(b);
+
+ port_mmap = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
+ if (nxt_slow_path(port_mmap == NULL)) {
+ nxt_buf_free(port->mem_pool, b);
+ return NULL;
+ }
+
+ nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE;
+ if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) {
+ nchunks++;
+ }
+
+ b->mem.start = nxt_port_mmap_chunk_start(port_mmap, mmap_msg->chunk_id);
+ b->mem.pos = b->mem.start;
+ b->mem.free = b->mem.start + mmap_msg->size;
+ b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
+
+ b->parent = port_mmap;
+
+ return b;
+}
+
+
+void
+nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
+{
+ size_t bsize;
+ nxt_buf_t *b, *bmem;
+ nxt_uint_t i;
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_mmap_msg_t *mmap_msg;
+
+ nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
+ "via shared memory", sb->size, port->pid);
+
+ bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
+
+ b = nxt_buf_mem_alloc(port->mem_pool, bsize, 0);
+ if (nxt_slow_path(b == NULL)) {
+ return;
+ }
+
+ mmap_msg = (nxt_port_mmap_msg_t *) b->mem.start;
+ bmem = msg->buf;
+
+ for (i = 0; i < sb->niov; i++, mmap_msg++) {
+
+ /* Lookup buffer which starts current iov_base. */
+ while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) {
+ bmem = bmem->next;
+ }
+
+ if (nxt_slow_path(bmem == NULL)) {
+ nxt_log_error(NXT_LOG_ERR, task->log, "failed to find buf for "
+ "iobuf[%d]", i);
+ return;
+ /* TODO clear b and exit */
+ }
+
+ port_mmap = (nxt_port_mmap_t *) bmem->parent;
+
+ mmap_msg->mmap_id = port_mmap->id;
+ mmap_msg->chunk_id = nxt_port_mmap_chunk_id(port_mmap, bmem->mem.pos);
+ mmap_msg->size = sb->iobuf[i].iov_len;
+
+ nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI",
+ mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
+ port->pid);
+ }
+
+ msg->buf = b;
+ b->mem.free += bsize;
+
+ sb->iobuf[0].iov_base = b->mem.pos;
+ sb->iobuf[0].iov_len = bsize;
+ sb->niov = 1;
+ sb->size = bsize;
+
+ msg->port_msg.mmap = 1;
+}
+
+
+void
+nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
+ nxt_port_recv_msg_t *msg, size_t size)
+{
+ nxt_buf_t *b, **pb;
+ nxt_port_mmap_msg_t *end, *mmap_msg;
+
+ b = msg->buf;
+
+ mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos;
+ end = (nxt_port_mmap_msg_t *) b->mem.free;
+
+ pb = &msg->buf;
+
+ while (mmap_msg < end) {
+ nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
+ mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size,
+ msg->port_msg.pid);
+
+ *pb = nxt_port_mmap_get_incoming_buf(task, port, msg->port_msg.pid,
+ mmap_msg);
+ if (nxt_slow_path(*pb == NULL)) {
+ nxt_log_error(NXT_LOG_ERR, task->log, "failed to get mmap buffer");
+
+ break;
+ }
+
+ pb = &(*pb)->next;
+ mmap_msg++;
+ }
+
+ /* Mark original buf as complete. */
+ b->mem.pos += nxt_buf_used_size(b);
+}
+
+
+nxt_port_method_t
+nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
+{
+ nxt_port_mmap_t *port_mmap;
+ nxt_port_method_t m;
+
+ m = NXT_PORT_METHOD_ANY;
+
+ for (; b != NULL; b = b->next) {
+ if (nxt_buf_used_size(b) == 0) {
+ /* empty buffers does not affect method */
+ continue;
+ }
+
+ if (nxt_buf_is_port_mmap(b)) {
+ port_mmap = (nxt_port_mmap_t *) b->parent;
+
+ if (m == NXT_PORT_METHOD_PLAIN) {
+ nxt_log_error(NXT_LOG_ERR, task->log,
+ "mixing plain and mmap buffers, "
+ "using plain mode");
+
+ break;
+ }
+
+ if (port->pid != port_mmap->pid) {
+ nxt_log_error(NXT_LOG_ERR, task->log,
+ "send mmap buffer for %PI to %PI, "
+ "using plain mode", port_mmap->pid, port->pid);
+
+ m = NXT_PORT_METHOD_PLAIN;
+
+ break;
+ }
+
+ if (m == NXT_PORT_METHOD_ANY) {
+ nxt_debug(task, "using mmap mode");
+
+ m = NXT_PORT_METHOD_MMAP;
+ }
+ } else {
+ if (m == NXT_PORT_METHOD_MMAP) {
+ nxt_log_error(NXT_LOG_ERR, task->log,
+ "mixing mmap and plain buffers, "
+ "switching to plain mode");
+
+ m = NXT_PORT_METHOD_PLAIN;
+
+ break;
+ }
+
+ if (m == NXT_PORT_METHOD_ANY) {
+ nxt_debug(task, "using plain mode");
+
+ m = NXT_PORT_METHOD_PLAIN;
+ }
+ }
+ }
+
+ return m;
+}