summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port_memory.c
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:13 +0300
committerMax Romanov <max.romanov@nginx.com>2020-08-11 19:20:13 +0300
commit6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7 (patch)
tree2ef1d993674825791636486a0929ead424c8dd7a /src/nxt_port_memory.c
parent3cbc22a6dc45abdeade4deb364601230ddca02c1 (diff)
downloadunit-6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7.tar.gz
unit-6e31d6cd39be9d3f4ee680fc13c3fe42f5cd39e7.tar.bz2
Changing router to application shared memory exchange protocol.
The application process needs to request the shared memory segment from the router instead of the latter pushing the segment before sending a request to the application. This is required to simplify the communication between the router and the application and to prepare the router for using the application shared port and then the queue.
Diffstat (limited to 'src/nxt_port_memory.c')
-rw-r--r--src/nxt_port_memory.c73
1 files changed, 23 insertions, 50 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index fd472cc6..1e01629e 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -282,8 +282,8 @@ fail:
static nxt_port_mmap_handler_t *
-nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
- nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n)
+nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
+ nxt_bool_t tracking, nxt_int_t n)
{
void *mem;
nxt_fd_t fd;
@@ -295,15 +295,14 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
if (nxt_slow_path(mmap_handler == NULL)) {
- nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
+ nxt_alert(task, "failed to allocate mmap_handler");
return NULL;
}
- port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
+ port_mmap = nxt_port_mmap_at(mmaps, mmaps->size);
if (nxt_slow_path(port_mmap == NULL)) {
- nxt_log(task, NXT_LOG_WARN,
- "failed to add port mmap to outgoing array");
+ nxt_alert(task, "failed to add port mmap to mmaps array");
nxt_free(mmap_handler);
return NULL;
@@ -322,6 +321,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
}
mmap_handler->hdr = mem;
+ mmap_handler->fd = fd;
port_mmap->mmap_handler = mmap_handler;
nxt_port_mmap_handler_use(mmap_handler, 1);
@@ -331,10 +331,9 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
- hdr->id = process->outgoing.size - 1;
+ hdr->id = mmaps->size - 1;
hdr->src_pid = nxt_pid;
- hdr->dst_pid = process->pid;
- hdr->sent_over = port->id;
+ hdr->sent_over = 0xFFFFu;
/* Mark first chunk as busy */
free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
@@ -347,13 +346,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
- nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
-
- /* TODO handle error */
- (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
-
- nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
- hdr->id, nxt_pid, process->pid);
+ nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...",
+ hdr->id, nxt_pid);
return mmap_handler;
@@ -361,7 +355,7 @@ remove_fail:
nxt_free(mmap_handler);
- process->outgoing.size--;
+ mmaps->size--;
return NULL;
}
@@ -445,34 +439,28 @@ nxt_shm_open(nxt_task_t *task, size_t size)
static nxt_port_mmap_handler_t *
-nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
+nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c,
nxt_int_t n, nxt_bool_t tracking)
{
nxt_int_t i, res, nchunks;
- nxt_process_t *process;
nxt_free_map_t *free_map;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_t *end_port_mmap;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
- process = port->process;
- if (nxt_slow_path(process == NULL)) {
- return NULL;
- }
-
- nxt_thread_mutex_lock(&process->outgoing.mutex);
+ nxt_thread_mutex_lock(&mmaps->mutex);
- end_port_mmap = process->outgoing.elts + process->outgoing.size;
+ end_port_mmap = mmaps->elts + mmaps->size;
- for (port_mmap = process->outgoing.elts;
+ for (port_mmap = mmaps->elts;
port_mmap < end_port_mmap;
port_mmap++)
{
mmap_handler = port_mmap->mmap_handler;
hdr = mmap_handler->hdr;
- if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
+ if (hdr->sent_over != 0xFFFFu) {
continue;
}
@@ -510,11 +498,11 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
/* TODO introduce port_mmap limit and release wait. */
*c = 0;
- mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n);
+ mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n);
unlock_return:
- nxt_thread_mutex_unlock(&process->outgoing.mutex);
+ nxt_thread_mutex_unlock(&mmaps->mutex);
return mmap_handler;
}
@@ -549,7 +537,7 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
nxt_int_t
-nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
+nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
nxt_port_mmap_tracking_t *tracking, uint32_t stream)
{
nxt_chunk_id_t c;
@@ -558,7 +546,7 @@ nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
nxt_debug(task, "request tracking for stream #%uD", stream);
- mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1);
+ mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1);
if (nxt_slow_path(mmap_handler == NULL)) {
return NXT_ERROR;
}
@@ -680,7 +668,7 @@ nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_buf_t *
-nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
+nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size)
{
nxt_mp_t *mp;
nxt_buf_t *b;
@@ -707,7 +695,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
b->completion_handler = nxt_port_mmap_buf_completion;
nxt_buf_set_port_mmap(b);
- mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0);
+ mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0);
if (nxt_slow_path(mmap_handler == NULL)) {
mp = task->thread->engine->mem_pool;
nxt_mp_free(mp, b);
@@ -943,9 +931,7 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
{
- nxt_port_method_t m;
- nxt_port_mmap_header_t *hdr;
- nxt_port_mmap_handler_t *mmap_handler;
+ nxt_port_method_t m;
m = NXT_PORT_METHOD_ANY;
@@ -956,9 +942,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
}
if (nxt_buf_is_port_mmap(b)) {
- mmap_handler = b->parent;
- hdr = mmap_handler->hdr;
-
if (m == NXT_PORT_METHOD_PLAIN) {
nxt_log_error(NXT_LOG_ERR, task->log,
"mixing plain and mmap buffers, "
@@ -967,16 +950,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
break;
}
- if (port->pid != hdr->dst_pid) {
- nxt_log_error(NXT_LOG_ERR, task->log,
- "send mmap buffer for %PI to %PI, "
- "using plain mode", hdr->dst_pid, port->pid);
-
- m = NXT_PORT_METHOD_PLAIN;
-
- break;
- }
-
if (m == NXT_PORT_METHOD_ANY) {
nxt_debug(task, "using mmap mode");