summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_memory.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nxt_port_memory.c')
-rw-r--r--src/nxt_port_memory.c73
1 files changed, 23 insertions, 50 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index fd472cc6..1e01629e 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -282,8 +282,8 @@ fail:
static nxt_port_mmap_handler_t *
-nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
- nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n)
+nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
+ nxt_bool_t tracking, nxt_int_t n)
{
void *mem;
nxt_fd_t fd;
@@ -295,15 +295,14 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
if (nxt_slow_path(mmap_handler == NULL)) {
- nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
+ nxt_alert(task, "failed to allocate mmap_handler");
return NULL;
}
- port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
+ port_mmap = nxt_port_mmap_at(mmaps, mmaps->size);
if (nxt_slow_path(port_mmap == NULL)) {
- nxt_log(task, NXT_LOG_WARN,
- "failed to add port mmap to outgoing array");
+ nxt_alert(task, "failed to add port mmap to mmaps array");
nxt_free(mmap_handler);
return NULL;
@@ -322,6 +321,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
}
mmap_handler->hdr = mem;
+ mmap_handler->fd = fd;
port_mmap->mmap_handler = mmap_handler;
nxt_port_mmap_handler_use(mmap_handler, 1);
@@ -331,10 +331,9 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
- hdr->id = process->outgoing.size - 1;
+ hdr->id = mmaps->size - 1;
hdr->src_pid = nxt_pid;
- hdr->dst_pid = process->pid;
- hdr->sent_over = port->id;
+ hdr->sent_over = 0xFFFFu;
/* Mark first chunk as busy */
free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
@@ -347,13 +346,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
- nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
-
- /* TODO handle error */
- (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
-
- nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
- hdr->id, nxt_pid, process->pid);
+ nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...",
+ hdr->id, nxt_pid);
return mmap_handler;
@@ -361,7 +355,7 @@ remove_fail:
nxt_free(mmap_handler);
- process->outgoing.size--;
+ mmaps->size--;
return NULL;
}
@@ -445,34 +439,28 @@ nxt_shm_open(nxt_task_t *task, size_t size)
static nxt_port_mmap_handler_t *
-nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
+nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c,
nxt_int_t n, nxt_bool_t tracking)
{
nxt_int_t i, res, nchunks;
- nxt_process_t *process;
nxt_free_map_t *free_map;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_t *end_port_mmap;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
- process = port->process;
- if (nxt_slow_path(process == NULL)) {
- return NULL;
- }
-
- nxt_thread_mutex_lock(&process->outgoing.mutex);
+ nxt_thread_mutex_lock(&mmaps->mutex);
- end_port_mmap = process->outgoing.elts + process->outgoing.size;
+ end_port_mmap = mmaps->elts + mmaps->size;
- for (port_mmap = process->outgoing.elts;
+ for (port_mmap = mmaps->elts;
port_mmap < end_port_mmap;
port_mmap++)
{
mmap_handler = port_mmap->mmap_handler;
hdr = mmap_handler->hdr;
- if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
+ if (hdr->sent_over != 0xFFFFu) {
continue;
}
@@ -510,11 +498,11 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
/* TODO introduce port_mmap limit and release wait. */
*c = 0;
- mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n);
+ mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n);
unlock_return:
- nxt_thread_mutex_unlock(&process->outgoing.mutex);
+ nxt_thread_mutex_unlock(&mmaps->mutex);
return mmap_handler;
}
@@ -549,7 +537,7 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
nxt_int_t
-nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
+nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
nxt_port_mmap_tracking_t *tracking, uint32_t stream)
{
nxt_chunk_id_t c;
@@ -558,7 +546,7 @@ nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
nxt_debug(task, "request tracking for stream #%uD", stream);
- mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1);
+ mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1);
if (nxt_slow_path(mmap_handler == NULL)) {
return NXT_ERROR;
}
@@ -680,7 +668,7 @@ nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_buf_t *
-nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
+nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size)
{
nxt_mp_t *mp;
nxt_buf_t *b;
@@ -707,7 +695,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
b->completion_handler = nxt_port_mmap_buf_completion;
nxt_buf_set_port_mmap(b);
- mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0);
+ mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0);
if (nxt_slow_path(mmap_handler == NULL)) {
mp = task->thread->engine->mem_pool;
nxt_mp_free(mp, b);
@@ -943,9 +931,7 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
{
- nxt_port_method_t m;
- nxt_port_mmap_header_t *hdr;
- nxt_port_mmap_handler_t *mmap_handler;
+ nxt_port_method_t m;
m = NXT_PORT_METHOD_ANY;
@@ -956,9 +942,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
}
if (nxt_buf_is_port_mmap(b)) {
- mmap_handler = b->parent;
- hdr = mmap_handler->hdr;
-
if (m == NXT_PORT_METHOD_PLAIN) {
nxt_log_error(NXT_LOG_ERR, task->log,
"mixing plain and mmap buffers, "
@@ -967,16 +950,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
break;
}
- if (port->pid != hdr->dst_pid) {
- nxt_log_error(NXT_LOG_ERR, task->log,
- "send mmap buffer for %PI to %PI, "
- "using plain mode", hdr->dst_pid, port->pid);
-
- m = NXT_PORT_METHOD_PLAIN;
-
- break;
- }
-
if (m == NXT_PORT_METHOD_ANY) {
nxt_debug(task, "using mmap mode");