summaryrefslogtreecommitdiffhomepage
path: root/src/nxt_port.c
diff options
context:
space:
mode:
authorAndrei Belov <defan@nginx.com>2021-11-18 17:04:04 +0300
committerAndrei Belov <defan@nginx.com>2021-11-18 17:04:04 +0300
commitb400ccd1aa8eeb6a5de1707e0bb8c3d417fe69b7 (patch)
tree60ff49ffc16ef7cb3aad1beb2d78f051a8794cdf /src/nxt_port.c
parentfafd44166d9e835e91a4c5668048308ce99a62bd (diff)
parentb77895d1c7d6cd4826ac7427c91baa95b998a912 (diff)
downloadunit-b400ccd1aa8eeb6a5de1707e0bb8c3d417fe69b7.tar.gz
unit-b400ccd1aa8eeb6a5de1707e0bb8c3d417fe69b7.tar.bz2
Merged with the default branch.1.26.0-1
Diffstat (limited to 'src/nxt_port.c')
-rw-r--r--src/nxt_port.c75
1 files changed, 67 insertions, 8 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c
index d4e46564..1e8fa28a 100644
--- a/src/nxt_port.c
+++ b/src/nxt_port.c
@@ -12,6 +12,8 @@
#include <nxt_port_queue.h>
+static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ nxt_pid_t pid);
static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
static nxt_atomic_uint_t nxt_port_last_id = 1;
@@ -274,6 +276,8 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
new_port_msg->id);
+ msg->u.new_port = port;
+
nxt_fd_close(msg->fd[0]);
msg->fd[0] = -1;
return;
@@ -384,14 +388,13 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
port = nxt_process_port_first(process);
- b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
- sizeof(nxt_port_data_t));
+ b = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
+ sizeof(nxt_uint_t), 0);
if (nxt_slow_path(b == NULL)) {
continue;
}
- *(nxt_uint_t *) b->mem.pos = slot;
- b->mem.free += sizeof(nxt_uint_t);
+ b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t));
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
fd, 0, 0, b);
@@ -448,18 +451,74 @@ nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
-nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process)
{
- nxt_buf_t *buf;
nxt_pid_t pid;
+ nxt_buf_t *buf;
+ nxt_port_t *port;
nxt_runtime_t *rt;
- nxt_process_t *process;
+ nxt_process_t *p;
+ nxt_process_type_t ptype;
+
+ pid = process->pid;
+
+ ptype = nxt_process_type(process);
+
+ rt = task->thread->runtime;
+
+ nxt_runtime_process_each(rt, p) {
+
+ if (p->pid == nxt_pid
+ || p->pid == pid
+ || nxt_queue_is_empty(&p->ports))
+ {
+ continue;
+ }
+
+ port = nxt_process_port_first(p);
+
+ if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) {
+ continue;
+ }
+
+ buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
+ sizeof(pid));
+
+ if (nxt_slow_path(buf == NULL)) {
+ continue;
+ }
+
+ buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
+
+ nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
+ process->stream, 0, buf);
+
+ } nxt_runtime_process_loop;
+}
+
+
+void
+nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
+{
+ nxt_pid_t pid;
+ nxt_buf_t *buf;
buf = msg->buf;
nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
- nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
+ nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t));
+
+ nxt_port_remove_pid(task, msg, pid);
+}
+
+
+static void
+nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
+ nxt_pid_t pid)
+{
+ nxt_runtime_t *rt;
+ nxt_process_t *process;
msg->u.removed_pid = pid;