diff options
Diffstat (limited to 'tools/unitctl/unit-client-rs')
-rw-r--r-- | tools/unitctl/unit-client-rs/Cargo.toml | 2 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/lib.rs | 1 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unit_client.rs | 146 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_cmd.rs | 5 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_docker.rs | 282 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_instance.rs | 77 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_process.rs | 34 |
7 files changed, 447 insertions, 100 deletions
diff --git a/tools/unitctl/unit-client-rs/Cargo.toml b/tools/unitctl/unit-client-rs/Cargo.toml index d3b2f9cf..3e48ee23 100644 --- a/tools/unitctl/unit-client-rs/Cargo.toml +++ b/tools/unitctl/unit-client-rs/Cargo.toml @@ -27,6 +27,8 @@ which = "5.0" unit-openapi = { path = "../unit-openapi" } rustls = "0.23.5" +bollard = "0.16.1" +regex = "1.10.4" [dev-dependencies] rand = "0.8.5" diff --git a/tools/unitctl/unit-client-rs/src/lib.rs b/tools/unitctl/unit-client-rs/src/lib.rs index dca8a86f..a0933f42 100644 --- a/tools/unitctl/unit-client-rs/src/lib.rs +++ b/tools/unitctl/unit-client-rs/src/lib.rs @@ -10,6 +10,7 @@ mod runtime_flags; pub mod unit_client; mod unitd_cmd; pub mod unitd_configure_options; +pub mod unitd_docker; pub mod unitd_instance; pub mod unitd_process; mod unitd_process_user; diff --git a/tools/unitctl/unit-client-rs/src/unit_client.rs b/tools/unitctl/unit-client-rs/src/unit_client.rs index b856fd20..7456b106 100644 --- a/tools/unitctl/unit-client-rs/src/unit_client.rs +++ b/tools/unitctl/unit-client-rs/src/unit_client.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::error::Error as StdError; use std::fmt::Debug; -use std::future::Future; use std::rc::Rc; use std::{fmt, io}; @@ -13,7 +12,6 @@ use hyper::{http, Body, Client, Request}; use hyper_tls::HttpsConnector; use hyperlocal::{UnixClientExt, UnixConnector}; use serde::{Deserialize, Serialize}; -use tokio::runtime::Runtime; use crate::control_socket_address::ControlSocket; use unit_openapi::apis::configuration::Configuration; @@ -168,51 +166,38 @@ where #[derive(Debug)] pub struct UnitClient { pub control_socket: ControlSocket, - /// A `current_thread` runtime for executing operations on the - /// asynchronous client in a blocking manner. - rt: Runtime, /// Client for communicating with the control API over the UNIX domain socket client: Box<RemoteClient<Body>>, } impl UnitClient { - pub fn new_with_runtime(control_socket: ControlSocket, runtime: Runtime) -> Self { + pub fn new(control_socket: ControlSocket) -> Self { if control_socket.is_local_socket() { - Self::new_unix(control_socket, runtime) + Self::new_unix(control_socket) } else { - Self::new_http(control_socket, runtime) + Self::new_http(control_socket) } } - pub fn new(control_socket: ControlSocket) -> Self { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("Unable to create a current_thread runtime"); - Self::new_with_runtime(control_socket, runtime) - } - - pub fn new_http(control_socket: ControlSocket, runtime: Runtime) -> Self { + pub fn new_http(control_socket: ControlSocket) -> Self { let remote_client = Client::builder().build(HttpsConnector::new()); Self { control_socket, - rt: runtime, client: Box::from(RemoteClient::Tcp { client: remote_client }), } } - pub fn new_unix(control_socket: ControlSocket, runtime: Runtime) -> UnitClient { + pub fn new_unix(control_socket: ControlSocket) -> UnitClient { let remote_client = Client::unix(); Self { control_socket, - rt: runtime, client: Box::from(RemoteClient::Unix { client: remote_client }), } } /// Sends a request to UNIT and deserializes the JSON response body into the value of type `RESPONSE`. - pub fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>( + pub async fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>( &self, mut request: Request<Body>, ) -> Result<RESPONSE, UnitClientError> { @@ -223,34 +208,32 @@ impl UnitClient { let response_future = self.client.request(request); - self.rt.block_on(async { - let response = response_future - .await - .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; - - let status = response.status(); - let body = hyper::body::aggregate(response) - .await - .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; - let reader = &mut body.reader(); - if !status.is_success() { - let error: HashMap<String, String> = - serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { - source: error, - path: path.to_string(), - })?; - - return Err(UnitClientError::HttpResponseJsonBodyError { - status, + let response = response_future + .await + .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; + + let status = response.status(); + let body = hyper::body::aggregate(response) + .await + .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; + let reader = &mut body.reader(); + if !status.is_success() { + let error: HashMap<String, String> = + serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { + source: error, path: path.to_string(), - error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(), - detail: error.get("detail").unwrap_or(&"".into()).to_string(), - }); - } - serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { - source: error, + })?; + + return Err(UnitClientError::HttpResponseJsonBodyError { + status, path: path.to_string(), - }) + error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(), + detail: error.get("detail").unwrap_or(&"".into()).to_string(), + }); + } + serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { + source: error, + path: path.to_string(), }) } @@ -258,23 +241,17 @@ impl UnitClient { new_openapi_client!(self, ListenersApiClient, ListenersApi) } - pub fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> { - let list_listeners = self.listeners_api().get_listeners(); - self.execute_openapi_future(list_listeners) - } - - pub fn execute_openapi_future<F: Future<Output = Result<R, OpenAPIError>>, R: for<'de> serde::Deserialize<'de>>( - &self, - future: F, - ) -> Result<R, Box<UnitClientError>> { - self.rt.block_on(future).map_err(|error| { - let remapped_error = if let OpenAPIError::Hyper(hyper_error) = error { - UnitClientError::new(hyper_error, self.control_socket.to_string(), "".to_string()) + pub async fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> { + self.listeners_api().get_listeners().await.or_else(|err| { + if let OpenAPIError::Hyper(hyper_error) = err { + Err(Box::new(UnitClientError::new( + hyper_error, + self.control_socket.to_string(), + "".to_string(), + ))) } else { - UnitClientError::OpenAPIError { source: error } - }; - - Box::new(remapped_error) + Err(Box::new(UnitClientError::OpenAPIError { source: err })) + } }) } @@ -282,13 +259,22 @@ impl UnitClient { new_openapi_client!(self, StatusApiClient, StatusApi) } - pub fn status(&self) -> Result<Status, Box<UnitClientError>> { - let status = self.status_api().get_status(); - self.execute_openapi_future(status) + pub async fn status(&self) -> Result<Status, Box<UnitClientError>> { + self.status_api().get_status().await.or_else(|err| { + if let OpenAPIError::Hyper(hyper_error) = err { + Err(Box::new(UnitClientError::new( + hyper_error, + self.control_socket.to_string(), + "".to_string(), + ))) + } else { + Err(Box::new(UnitClientError::OpenAPIError { source: err })) + } + }) } - pub fn is_running(&self) -> bool { - self.status().is_ok() + pub async fn is_running(&self) -> bool { + self.status().await.is_ok() } } @@ -336,9 +322,9 @@ mod tests { use super::*; // Integration tests - #[test] - fn can_connect_to_unit_api() { - match UnitdInstance::running_unitd_instances().first() { + #[tokio::test] + async fn can_connect_to_unit_api() { + match UnitdInstance::running_unitd_instances().await.first() { Some(unit_instance) => { let control_api_socket_address = unit_instance .control_api_socket_address() @@ -346,7 +332,7 @@ mod tests { let control_socket = ControlSocket::try_from(control_api_socket_address) .expect("Unable to parse control socket address"); let unit_client = UnitClient::new(control_socket); - assert!(unit_client.is_running()); + assert!(unit_client.is_running().await); } None => { eprintln!("No running unitd instances found - skipping test"); @@ -354,9 +340,9 @@ mod tests { } } - #[test] - fn can_get_unit_status() { - match UnitdInstance::running_unitd_instances().first() { + #[tokio::test] + async fn can_get_unit_status() { + match UnitdInstance::running_unitd_instances().await.first() { Some(unit_instance) => { let control_api_socket_address = unit_instance .control_api_socket_address() @@ -364,7 +350,7 @@ mod tests { let control_socket = ControlSocket::try_from(control_api_socket_address) .expect("Unable to parse control socket address"); let unit_client = UnitClient::new(control_socket); - let status = unit_client.status().expect("Unable to get unit status"); + let status = unit_client.status().await.expect("Unable to get unit status"); println!("Unit status: {:?}", status); } None => { @@ -373,9 +359,9 @@ mod tests { } } - #[test] - fn can_get_unit_listeners() { - match UnitdInstance::running_unitd_instances().first() { + #[tokio::test] + async fn can_get_unit_listeners() { + match UnitdInstance::running_unitd_instances().await.first() { Some(unit_instance) => { let control_api_socket_address = unit_instance .control_api_socket_address() @@ -383,7 +369,7 @@ mod tests { let control_socket = ControlSocket::try_from(control_api_socket_address) .expect("Unable to parse control socket address"); let unit_client = UnitClient::new(control_socket); - unit_client.listeners().expect("Unable to get Unit listeners"); + unit_client.listeners().await.expect("Unable to get Unit listeners"); } None => { eprintln!("No running unitd instances found - skipping test"); diff --git a/tools/unitctl/unit-client-rs/src/unitd_cmd.rs b/tools/unitctl/unit-client-rs/src/unitd_cmd.rs index c4883ed5..17563cb0 100644 --- a/tools/unitctl/unit-client-rs/src/unitd_cmd.rs +++ b/tools/unitctl/unit-client-rs/src/unitd_cmd.rs @@ -28,11 +28,13 @@ impl UnitdCmd { .expect("Unable to parse cmd") .splitn(2, " [") .collect::<Vec<&str>>(); + if parts.len() != 2 { let msg = format!("cmd does not have the expected format: {}", process_cmd); return Err(IoError::new(ErrorKind::InvalidInput, msg).into()); } - let version: Option<String> = Some(parts[0].to_string()); + + let version = Some(parts[0].to_string()); let executable_path = UnitdCmd::parse_executable_path_from_cmd(parts[1], binary_name); let flags = UnitdCmd::parse_runtime_flags_from_cmd(parts[1]); @@ -69,6 +71,7 @@ impl UnitdCmd { if cmd.is_empty() { return None; } + // Split out everything in between the brackets [ and ] let split = cmd.trim_end_matches(']').splitn(2, '[').collect::<Vec<&str>>(); if split.is_empty() { diff --git a/tools/unitctl/unit-client-rs/src/unitd_docker.rs b/tools/unitctl/unit-client-rs/src/unitd_docker.rs new file mode 100644 index 00000000..d5028afc --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unitd_docker.rs @@ -0,0 +1,282 @@ +use std::collections::HashMap; +use std::fs::read_to_string; +use std::path::PathBuf; + +use crate::unitd_process::UnitdProcess; + +use bollard::secret::ContainerInspectResponse; +use regex::Regex; +use serde::ser::SerializeMap; +use serde::{Serialize, Serializer}; + +use bollard::{models::ContainerSummary, Docker}; + +#[derive(Clone, Debug)] +pub struct UnitdContainer { + pub container_id: Option<String>, + pub container_image: String, + pub command: Option<String>, + pub mounts: HashMap<PathBuf, PathBuf>, + pub platform: String, + details: Option<ContainerInspectResponse>, +} + +impl From<&ContainerSummary> for UnitdContainer { + fn from(ctr: &ContainerSummary) -> Self { + // we assume paths from the docker api are absolute + // they certainly have to be later... + let mut mounts = HashMap::new(); + if let Some(mts) = &ctr.mounts { + for i in mts { + if let Some(ref src) = i.source { + if let Some(ref dest) = i.destination { + mounts.insert(PathBuf::from(dest.clone()), PathBuf::from(src.clone())); + } + } + } + } + + UnitdContainer { + container_id: ctr.id.clone(), + container_image: format!( + "{} (docker)", + ctr.image.clone().unwrap_or(String::from("unknown container")), + ), + command: ctr.command.clone(), + mounts: mounts, + platform: String::from("Docker"), + details: None, + } + } +} + +impl From<&UnitdContainer> for UnitdProcess { + fn from(ctr: &UnitdContainer) -> Self { + let version = ctr.details.as_ref().and_then(|details| { + details.config.as_ref().and_then(|conf| { + conf.labels.as_ref().and_then(|labels| { + labels + .get("org.opencontainers.image.version") + .and_then(|version| Some(version.clone())) + }) + }) + }); + let command = ctr.command.clone().and_then(|cmd| { + Some(format!( + "{}{} [{}{}]", + "unit: main v", + version.or(Some(String::from(""))).unwrap(), + ctr.container_image, + ctr.rewrite_socket( + cmd.strip_prefix("/usr/local/bin/docker-entrypoint.sh") + .or_else(|| Some("")) + .unwrap() + .to_string()) + )) + }); + let mut cmds = vec![]; + let _ = command.map_or((), |cmd| cmds.push(cmd)); + UnitdProcess { + all_cmds: cmds, + binary_name: ctr.container_image.clone(), + process_id: ctr + .details + .as_ref() + .and_then(|details| { + details + .state + .as_ref() + .and_then(|state| state.pid.and_then(|pid| Some(pid.clone() as u64))) + }) + .or(Some(0 as u64)) + .unwrap(), + executable_path: None, + environ: vec![], + working_dir: ctr.details.as_ref().and_then(|details| { + details.config.as_ref().and_then(|conf| { + Some( + PathBuf::from( + conf.working_dir + .as_ref() + .map_or(String::new(), |dir| ctr.host_path(dir.clone())), + ) + .into_boxed_path(), + ) + }) + }), + child_pids: vec![], + user: None, + effective_user: None, + container: Some(ctr.clone()), + } + } +} + +impl Serialize for UnitdContainer { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut state = serializer.serialize_map(Some(5))?; + state.serialize_entry("container_id", &self.container_id)?; + state.serialize_entry("container_image", &self.container_image)?; + state.serialize_entry("command", &self.command)?; + state.serialize_entry("mounts", &self.mounts)?; + state.serialize_entry("platform", &self.platform)?; + state.end() + } +} + +impl UnitdContainer { + pub async fn find_unitd_containers() -> Vec<UnitdContainer> { + if let Ok(docker) = Docker::connect_with_local_defaults() { + match docker.list_containers::<String>(None).await { + Err(e) => { + eprintln!("{}", e); + vec![] + } + Ok(summary) => { + // cant do this functionally because of the async call + let mut mapped = vec![]; + for ctr in summary { + if ctr.clone().image.or(Some(String::new())).unwrap().contains("unit") { + let mut c = UnitdContainer::from(&ctr); + if let Some(names) = ctr.names { + if names.len() > 0 { + let name = names[0].strip_prefix("/").or(Some(names[0].as_str())).unwrap(); + if let Ok(cir) = docker.inspect_container(name, None).await { + c.details = Some(cir); + } + } + } + mapped.push(c); + } + } + mapped + } + } + } else { + vec![] + } + } + + pub fn host_path(&self, container_path: String) -> String { + let cp = PathBuf::from(container_path); + + // get only possible mount points + // sort to deepest mountpoint first + // assumed deepest possible mount point takes precedence + let mut keys = self + .mounts + .clone() + .into_keys() + .filter(|mp| cp.as_path().starts_with(mp)) + .collect::<Vec<_>>(); + keys.sort_by_key(|a| 0 as isize - a.ancestors().count() as isize); + + // either return translated path or original prefixed with "container" + if keys.len() > 0 { + self.mounts[&keys[0]] + .clone() + .join( + cp.as_path() + .strip_prefix(keys[0].clone()) + .expect("error checking path prefix"), + ) + .to_string_lossy() + .to_string() + } else { + format!("<container>:{}", cp.display()) + } + } + + pub fn rewrite_socket(&self, command: String) -> String { + command + .split(" ") + .map(|tok| if tok.starts_with("unix:") { + format!("unix:{}", self.host_path( + tok.strip_prefix("unix:") + .unwrap() + .to_string())) + } else { + tok.to_string() + }) + .collect::<Vec<_>>() + .join(" ") + } + + pub fn container_is_running(&self) -> Option<bool> { + self.details + .as_ref() + .and_then(|details| details.state.as_ref().and_then(|state| state.running)) + } +} + +/* Returns either 64 char docker container ID or None */ +pub fn pid_is_dockerized(pid: u64) -> bool { + let cg_filepath = format!("/proc/{}/cgroup", pid); + match read_to_string(cg_filepath) { + Err(e) => { + eprintln!("{}", e); + false + } + Ok(contents) => { + let docker_re = Regex::new(r"docker-([a-zA-Z0-9]{64})").unwrap(); + docker_re.is_match(contents.as_str()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_path_translation() { + let mut mounts = HashMap::new(); + mounts.insert("/1/2/3/4/5/6/7".into(), "/0".into()); + mounts.insert("/root".into(), "/1".into()); + mounts.insert("/root/mid".into(), "/2".into()); + mounts.insert("/root/mid/child".into(), "/3".into()); + mounts.insert("/mid/child".into(), "/4".into()); + mounts.insert("/child".into(), "/5".into()); + + let ctr = UnitdContainer { + container_id: None, + container_image: String::from(""), + command: None, + platform: "test".to_string(), + details: None, + mounts: mounts, + }; + + assert_eq!( + "/3/c2/test".to_string(), + ctr.host_path("/root/mid/child/c2/test".to_string()) + ); + assert_eq!( + "<container>:/path/to/conf".to_string(), + ctr.host_path("/path/to/conf".to_string()) + ); + } + + #[test] + fn test_unix_sock_path_translate() { + let mut mounts = HashMap::new(); + mounts.insert("/var/run".into(), "/tmp".into()); + + let ctr = UnitdContainer { + container_id: None, + container_image: String::from(""), + command: None, + platform: "test".to_string(), + details: None, + mounts: mounts, + }; + + assert_eq!( + ctr.rewrite_socket("unitd --no-daemon --control unix:/var/run/control.unit.sock".to_string()), + "unitd --no-daemon --control unix:/tmp/control.unit.sock".to_string()); + + } +} diff --git a/tools/unitctl/unit-client-rs/src/unitd_instance.rs b/tools/unitctl/unit-client-rs/src/unitd_instance.rs index 9467fcb7..86f8e73d 100644 --- a/tools/unitctl/unit-client-rs/src/unitd_instance.rs +++ b/tools/unitctl/unit-client-rs/src/unitd_instance.rs @@ -1,4 +1,5 @@ use crate::unit_client::UnitClientError; +use crate::unitd_docker::UnitdContainer; use serde::ser::SerializeMap; use serde::{Serialize, Serializer}; use std::error::Error as StdError; @@ -25,7 +26,7 @@ impl Serialize for UnitdInstance { where S: Serializer, { - let mut state = serializer.serialize_map(Some(15))?; + let mut state = serializer.serialize_map(Some(11))?; let runtime_flags = self .process .cmd() @@ -34,13 +35,9 @@ impl Serialize for UnitdInstance { let configure_flags = self.configure_options.as_ref().map(|opts| opts.all_flags.clone()); - state.serialize_entry("pid", &self.process.process_id)?; + state.serialize_entry("process", &self.process)?; state.serialize_entry("version", &self.version())?; - state.serialize_entry("user", &self.process.user)?; - state.serialize_entry("effective_user", &self.process.effective_user)?; - state.serialize_entry("executable", &self.process.executable_path())?; state.serialize_entry("control_socket", &self.control_api_socket_address())?; - state.serialize_entry("child_pids", &self.process.child_pids)?; state.serialize_entry("log_path", &self.log_path())?; state.serialize_entry("pid_path", &self.pid_path())?; state.serialize_entry("modules_directory", &self.modules_directory())?; @@ -56,8 +53,19 @@ impl Serialize for UnitdInstance { } impl UnitdInstance { - pub fn running_unitd_instances() -> Vec<UnitdInstance> { - Self::collect_unitd_processes(UnitdProcess::find_unitd_processes()) + pub async fn running_unitd_instances() -> Vec<UnitdInstance> { + Self::collect_unitd_processes( + UnitdProcess::find_unitd_processes() + .into_iter() + .chain( + UnitdContainer::find_unitd_containers() + .await + .into_iter() + .map(|x| UnitdProcess::from(&x)) + .collect::<Vec<_>>(), + ) + .collect(), + ) } /// Find all running unitd processes and convert them into UnitdInstances and filter @@ -91,11 +99,14 @@ impl UnitdInstance { pid: process.process_id, })?; Ok(new_path) - } else { + } else if process.container.is_none() { Err(UnitClientError::UnitdProcessParseError { message: "Unable to get absolute unitd executable path from process".to_string(), pid: process.process_id, }) + } else { + // container case + Ok(PathBuf::from("/").into_boxed_path()) } } None => Err(UnitClientError::UnitdProcessParseError { @@ -107,7 +118,30 @@ impl UnitdInstance { fn map_process_to_unitd_instance(process: &UnitdProcess) -> UnitdInstance { match unitd_path_from_process(process) { - Ok(unitd_path) => match UnitdConfigureOptions::new(&unitd_path.clone().into_path_buf()) { + Ok(_) if process.container.is_some() => { + let mut err = vec![]; + // double check that it is running + let running = process.container + .as_ref() + .unwrap() + .container_is_running(); + + if running.is_none() || !running.unwrap() { + err.push(UnitClientError::UnitdProcessParseError{ + message: "process container is not running".to_string(), + pid: process.process_id, + }); + } + + UnitdInstance { + process: process.to_owned(), + configure_options: None, + errors: err, + } + }, + Ok(unitd_path) => match UnitdConfigureOptions::new( + &unitd_path.clone() + .into_path_buf()) { Ok(configure_options) => UnitdInstance { process: process.to_owned(), configure_options: Some(configure_options), @@ -250,10 +284,22 @@ impl fmt::Display for UnitdInstance { writeln!(f, " API control unix socket: {}", socket_address)?; writeln!(f, " Child processes ids: {}", child_pids)?; writeln!(f, " Runtime flags: {}", runtime_flags)?; - write!(f, " Configure options: {}", configure_flags)?; + writeln!(f, " Configure options: {}", configure_flags)?; + + if let Some(ctr) = &self.process.container { + writeln!(f, " Container:")?; + writeln!(f, " Platform: {}", ctr.platform)?; + if let Some(id) = ctr.container_id.clone() { + writeln!(f, " Container ID: {}", id)?; + } + writeln!(f, " Mounts:")?; + for (k, v) in &ctr.mounts { + writeln!(f, " {} => {}", k.to_string_lossy(), v.to_string_lossy())?; + } + } if !self.errors.is_empty() { - write!(f, "\n Errors:")?; + write!(f, " Errors:")?; for error in &self.errors { write!(f, "\n {}", error)?; } @@ -302,9 +348,9 @@ mod tests { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, ]; - #[test] - fn can_find_unitd_instances() { - UnitdInstance::running_unitd_instances().iter().for_each(|p| { + #[tokio::test] + async fn can_find_unitd_instances() { + UnitdInstance::running_unitd_instances().await.iter().for_each(|p| { println!("{:?}", p); println!("Runtime Flags: {:?}", p.process.cmd().map(|c| c.flags)); println!("Temp directory: {:?}", p.tmp_directory()); @@ -326,6 +372,7 @@ mod tests { child_pids: vec![], user: None, effective_user: None, + container: None, } } diff --git a/tools/unitctl/unit-client-rs/src/unitd_process.rs b/tools/unitctl/unit-client-rs/src/unitd_process.rs index b8604e89..2a78bfc6 100644 --- a/tools/unitctl/unit-client-rs/src/unitd_process.rs +++ b/tools/unitctl/unit-client-rs/src/unitd_process.rs @@ -1,6 +1,9 @@ use crate::unitd_cmd::UnitdCmd; +use crate::unitd_docker::{pid_is_dockerized, UnitdContainer}; use crate::unitd_instance::UNITD_BINARY_NAMES; use crate::unitd_process_user::UnitdProcessUser; +use serde::ser::SerializeMap; +use serde::{Serialize, Serializer}; use std::collections::HashMap; use std::path::Path; use sysinfo::{Pid, Process, ProcessRefreshKind, System, UpdateKind, Users}; @@ -16,6 +19,23 @@ pub struct UnitdProcess { pub child_pids: Vec<u64>, pub user: Option<UnitdProcessUser>, pub effective_user: Option<UnitdProcessUser>, + pub container: Option<UnitdContainer>, +} + +impl Serialize for UnitdProcess { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut state = serializer.serialize_map(Some(6))?; + state.serialize_entry("pid", &self.process_id)?; + state.serialize_entry("user", &self.user)?; + state.serialize_entry("effective_user", &self.effective_user)?; + state.serialize_entry("executable", &self.executable_path())?; + state.serialize_entry("child_pids", &self.child_pids)?; + state.serialize_entry("container", &self.container)?; + state.end() + } } impl UnitdProcess { @@ -41,10 +61,15 @@ impl UnitdProcess { .iter() // Filter out child processes .filter(|p| { - let parent_pid = p.1.parent(); - match parent_pid { - Some(pid) => !unitd_processes.contains_key(&pid), - None => false, + #[cfg(target_os = "linux")] + if pid_is_dockerized(p.0.as_u32().into()) { + false + } else { + let parent_pid = p.1.parent(); + match parent_pid { + Some(pid) => !unitd_processes.contains_key(&pid), + None => false, + } } }) .map(|p| { @@ -85,6 +110,7 @@ impl UnitdProcess { child_pids, user, effective_user, + container: None, } }) .collect::<Vec<UnitdProcess>>() |