diff options
author | Andrei Belov <defan@nginx.com> | 2021-11-18 17:04:04 +0300 |
---|---|---|
committer | Andrei Belov <defan@nginx.com> | 2021-11-18 17:04:04 +0300 |
commit | b400ccd1aa8eeb6a5de1707e0bb8c3d417fe69b7 (patch) | |
tree | 60ff49ffc16ef7cb3aad1beb2d78f051a8794cdf /src/nxt_port.c | |
parent | fafd44166d9e835e91a4c5668048308ce99a62bd (diff) | |
parent | b77895d1c7d6cd4826ac7427c91baa95b998a912 (diff) | |
download | unit-b400ccd1aa8eeb6a5de1707e0bb8c3d417fe69b7.tar.gz unit-b400ccd1aa8eeb6a5de1707e0bb8c3d417fe69b7.tar.bz2 |
Merged with the default branch.1.26.0-1
Diffstat (limited to 'src/nxt_port.c')
-rw-r--r-- | src/nxt_port.c | 75 |
1 files changed, 67 insertions, 8 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c index d4e46564..1e8fa28a 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -12,6 +12,8 @@ #include <nxt_port_queue.h> +static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg, + nxt_pid_t pid); static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_atomic_uint_t nxt_port_last_id = 1; @@ -274,6 +276,8 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, new_port_msg->id); + msg->u.new_port = port; + nxt_fd_close(msg->fd[0]); msg->fd[0] = -1; return; @@ -384,14 +388,13 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, port = nxt_process_port_first(process); - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(nxt_port_data_t)); + b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, + sizeof(nxt_uint_t), 0); if (nxt_slow_path(b == NULL)) { continue; } - *(nxt_uint_t *) b->mem.pos = slot; - b->mem.free += sizeof(nxt_uint_t); + b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t)); (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, fd, 0, 0, b); @@ -448,18 +451,74 @@ nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void -nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process) { - nxt_buf_t *buf; nxt_pid_t pid; + nxt_buf_t *buf; + nxt_port_t *port; nxt_runtime_t *rt; - nxt_process_t *process; + nxt_process_t *p; + nxt_process_type_t ptype; + + pid = process->pid; + + ptype = nxt_process_type(process); + + rt = task->thread->runtime; + + nxt_runtime_process_each(rt, p) { + + if (p->pid == nxt_pid + || p->pid == pid + || nxt_queue_is_empty(&p->ports)) + { + continue; + } + + port = nxt_process_port_first(p); + + if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) { + continue; + } + + buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(pid)); + + if (nxt_slow_path(buf == NULL)) { + continue; + } + + buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, + process->stream, 0, buf); + + } nxt_runtime_process_loop; +} + + +void +nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_pid_t pid; + nxt_buf_t *buf; buf = msg->buf; nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); - nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); + nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t)); + + nxt_port_remove_pid(task, msg, pid); +} + + +static void +nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg, + nxt_pid_t pid) +{ + nxt_runtime_t *rt; + nxt_process_t *process; msg->u.removed_pid = pid; |