summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMax Romanov <max.romanov@nginx.com>2020-10-28 00:01:46 +0300
committerMax Romanov <max.romanov@nginx.com>2020-10-28 00:01:46 +0300
commitccee391ab28d2742a98c612c42a37d6dcdbcd5f7 (patch)
tree1636eeb079cb22307c4048c05d0db6d61d1e65af
parent735bb2f1276a7d768cffd1e780114a10018980cb (diff)
downloadunit-ccee391ab28d2742a98c612c42a37d6dcdbcd5f7.tar.gz
unit-ccee391ab28d2742a98c612c42a37d6dcdbcd5f7.tar.bz2
Router: broadcasting the SHM_ACK message to all process ports.
-rw-r--r--src/nxt_port_memory.c46
-rw-r--r--src/nxt_port_memory.h2
-rw-r--r--src/nxt_router.c3
3 files changed, 40 insertions, 11 deletions
diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c
index 1e01629e..ae9f079c 100644
--- a/src/nxt_port_memory.c
+++ b/src/nxt_port_memory.c
@@ -17,6 +17,10 @@
#include <nxt_port_memory_int.h>
+static void nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port,
+ void *data);
+
+
nxt_inline void
nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
{
@@ -112,7 +116,6 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
u_char *p;
nxt_mp_t *mp;
nxt_buf_t *b, *next;
- nxt_port_t *port;
nxt_process_t *process;
nxt_chunk_id_t c;
nxt_port_mmap_header_t *hdr;
@@ -171,14 +174,7 @@ complete_buf:
{
process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid);
- if (process != NULL && !nxt_queue_is_empty(&process->ports)) {
- port = nxt_process_port_first(process);
-
- if (port->type == NXT_PROCESS_APP) {
- (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
- -1, 0, 0, NULL);
- }
- }
+ nxt_process_broadcast_shm_ack(task, process);
}
release_buf:
@@ -976,3 +972,35 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
return m;
}
+
+
+void
+nxt_process_broadcast_shm_ack(nxt_task_t *task, nxt_process_t *process)
+{
+ nxt_port_t *port;
+
+ if (nxt_slow_path(process == NULL || nxt_queue_is_empty(&process->ports)))
+ {
+ return;
+ }
+
+ port = nxt_process_port_first(process);
+
+ if (port->type == NXT_PROCESS_APP) {
+ nxt_port_post(task, port, nxt_port_broadcast_shm_ack, process);
+ }
+}
+
+
+static void
+nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port, void *data)
+{
+ nxt_process_t *process;
+
+ process = data;
+
+ nxt_queue_each(port, &process->ports, nxt_port_t, link) {
+ (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
+ -1, 0, 0, NULL);
+ } nxt_queue_loop;
+}
diff --git a/src/nxt_port_memory.h b/src/nxt_port_memory.h
index 8e71af3d..a2cdf5dd 100644
--- a/src/nxt_port_memory.h
+++ b/src/nxt_port_memory.h
@@ -71,4 +71,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b);
nxt_int_t nxt_shm_open(nxt_task_t *task, size_t size);
+void nxt_process_broadcast_shm_ack(nxt_task_t *task, nxt_process_t *process);
+
#endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */
diff --git a/src/nxt_router.c b/src/nxt_router.c
index 15706428..cf627746 100644
--- a/src/nxt_router.c
+++ b/src/nxt_router.c
@@ -5389,8 +5389,7 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_thread_mutex_unlock(&process->incoming.mutex);
if (ack) {
- (void) nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_SHM_ACK,
- -1, 0, 0, NULL);
+ nxt_process_broadcast_shm_ack(task, process);
}
}