diff options
Diffstat (limited to 'src/nxt_port.c')
-rw-r--r-- | src/nxt_port.c | 75 |
1 files changed, 67 insertions, 8 deletions
diff --git a/src/nxt_port.c b/src/nxt_port.c index d4e46564..1e8fa28a 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -12,6 +12,8 @@ #include <nxt_port_queue.h> +static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg, + nxt_pid_t pid); static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static nxt_atomic_uint_t nxt_port_last_id = 1; @@ -274,6 +276,8 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, new_port_msg->id); + msg->u.new_port = port; + nxt_fd_close(msg->fd[0]); msg->fd[0] = -1; return; @@ -384,14 +388,13 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, port = nxt_process_port_first(process); - b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, - sizeof(nxt_port_data_t)); + b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, + sizeof(nxt_uint_t), 0); if (nxt_slow_path(b == NULL)) { continue; } - *(nxt_uint_t *) b->mem.pos = slot; - b->mem.free += sizeof(nxt_uint_t); + b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t)); (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, fd, 0, 0, b); @@ -448,18 +451,74 @@ nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) void -nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process) { - nxt_buf_t *buf; nxt_pid_t pid; + nxt_buf_t *buf; + nxt_port_t *port; nxt_runtime_t *rt; - nxt_process_t *process; + nxt_process_t *p; + nxt_process_type_t ptype; + + pid = process->pid; + + ptype = nxt_process_type(process); + + rt = task->thread->runtime; + + nxt_runtime_process_each(rt, p) { + + if (p->pid == nxt_pid + || p->pid == pid + || nxt_queue_is_empty(&p->ports)) + { + continue; + } + + port = nxt_process_port_first(p); + + if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) { + continue; + } + + buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(pid)); + + if (nxt_slow_path(buf == NULL)) { + continue; + } + + buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); + + nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1, + process->stream, 0, buf); + + } nxt_runtime_process_loop; +} + + +void +nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_pid_t pid; + nxt_buf_t *buf; buf = msg->buf; nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); - nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); + nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t)); + + nxt_port_remove_pid(task, msg, pid); +} + + +static void +nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg, + nxt_pid_t pid) +{ + nxt_runtime_t *rt; + nxt_process_t *process; msg->u.removed_pid = pid; |