diff options
author | oxpa <iippolitov@gmail.com> | 2024-09-17 14:21:10 +0100 |
---|---|---|
committer | oxpa <iippolitov@gmail.com> | 2024-09-17 14:21:10 +0100 |
commit | 2417826d8bebf921ee1be102ef8ce702f0683d66 (patch) | |
tree | 76d29a1705415ed7368870826dbb2f04942ee794 /tools/unitctl/unit-client-rs | |
parent | 0e79d961bb1ea68674961da1703ffedb1ddf6e43 (diff) | |
parent | 24ed91f40634372d99f67f0e4e3c2ac0abde81bd (diff) | |
download | unit-2417826d8bebf921ee1be102ef8ce702f0683d66.tar.gz unit-2417826d8bebf921ee1be102ef8ce702f0683d66.tar.bz2 |
Merge tag '1.33.0' into packaging.
Unit 1.33.0 release.
Diffstat (limited to 'tools/unitctl/unit-client-rs')
-rw-r--r-- | tools/unitctl/unit-client-rs/Cargo.toml | 35 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/control_socket_address.rs | 569 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/lib.rs | 16 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/runtime_flags.rs | 90 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unit_client.rs | 424 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_cmd.rs | 88 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_configure_options.rs | 236 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_docker.rs | 456 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_instance.rs | 403 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_process.rs | 196 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_process_user.rs | 36 |
11 files changed, 2549 insertions, 0 deletions
diff --git a/tools/unitctl/unit-client-rs/Cargo.toml b/tools/unitctl/unit-client-rs/Cargo.toml new file mode 100644 index 00000000..6d873417 --- /dev/null +++ b/tools/unitctl/unit-client-rs/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "unit-client-rs" +version = "1.33.0" +authors = ["Elijah Zupancic"] +edition = "2021" +license = "Apache-2.0" + +[lib] +name = "unit_client_rs" + +[features] +# this preserves the ordering of json +default = ["serde_json/preserve_order"] + +[dependencies] +custom_error = "1.9" +hyper = { version = "0.14", features = ["stream"] } +hyper-tls = "0.5" +hyperlocal = "0.8" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +sysinfo = "0.30.5" +tokio = { version = "1.34", features = ["macros"] } +futures = "0.3" +hex = "0.4" +which = "5.0" + +unit-openapi = { path = "../unit-openapi" } +rustls = "0.23.5" +bollard = "0.16.1" +regex = "1.10.4" +pbr = "1.1.1" + +[dev-dependencies] +rand = "0.8.5" diff --git a/tools/unitctl/unit-client-rs/src/control_socket_address.rs b/tools/unitctl/unit-client-rs/src/control_socket_address.rs new file mode 100644 index 00000000..438ab0ad --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/control_socket_address.rs @@ -0,0 +1,569 @@ +use crate::control_socket_address::ControlSocket::{TcpSocket, UnixLocalAbstractSocket, UnixLocalSocket}; +use crate::control_socket_address::ControlSocketScheme::{HTTP, HTTPS}; +use crate::unit_client::UnitClientError; +use hyper::http::uri::{Authority, PathAndQuery}; +use hyper::Uri; +use std::fmt::{Display, Formatter}; +use std::fs; +use std::os::unix::fs::FileTypeExt; +use std::path::{PathBuf, MAIN_SEPARATOR}; + +type AbstractSocketName = String; +type UnixSocketPath = PathBuf; +type Port = u16; + +#[derive(Debug, Clone)] +pub enum ControlSocket { + UnixLocalAbstractSocket(AbstractSocketName), + UnixLocalSocket(UnixSocketPath), + TcpSocket(Uri), +} + +#[derive(Debug)] +pub enum ControlSocketScheme { + HTTP, + HTTPS, +} + +impl ControlSocketScheme { + fn port(&self) -> Port { + match self { + HTTP => 80, + HTTPS => 443, + } + } +} + +impl Display for ControlSocket { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + UnixLocalAbstractSocket(name) => f.write_fmt(format_args!("unix:@{}", name)), + UnixLocalSocket(path) => f.write_fmt(format_args!("unix:{}", path.to_string_lossy())), + TcpSocket(uri) => uri.fmt(f), + } + } +} + +impl From<ControlSocket> for String { + fn from(val: ControlSocket) -> Self { + val.to_string() + } +} + +impl From<ControlSocket> for PathBuf { + fn from(val: ControlSocket) -> Self { + match val { + UnixLocalAbstractSocket(socket_name) => PathBuf::from(format!("@{}", socket_name)), + UnixLocalSocket(socket_path) => socket_path, + TcpSocket(_) => PathBuf::default(), + } + } +} + +impl From<ControlSocket> for Uri { + fn from(val: ControlSocket) -> Self { + val.create_uri_with_path("") + } +} + +impl TryFrom<String> for ControlSocket { + type Error = UnitClientError; + + fn try_from(socket_address: String) -> Result<Self, Self::Error> { + ControlSocket::parse_address(socket_address.as_str()) + } +} + +impl TryFrom<&str> for ControlSocket { + type Error = UnitClientError; + + fn try_from(socket_address: &str) -> Result<Self, Self::Error> { + ControlSocket::parse_address(socket_address) + } +} + +impl TryFrom<Uri> for ControlSocket { + type Error = UnitClientError; + + fn try_from(socket_uri: Uri) -> Result<Self, Self::Error> { + match socket_uri.scheme_str() { + // URIs with the unix scheme will have a hostname that is a hex encoded string + // representing the path to the socket + Some("unix") => { + let host = match socket_uri.host() { + Some(host) => host, + None => { + return Err(UnitClientError::TcpSocketAddressParseError { + message: "No host found in socket address".to_string(), + control_socket_address: socket_uri.to_string(), + }) + } + }; + let bytes = hex::decode(host).map_err(|error| UnitClientError::TcpSocketAddressParseError { + message: error.to_string(), + control_socket_address: socket_uri.to_string(), + })?; + let path = String::from_utf8_lossy(&bytes); + ControlSocket::parse_address(path) + } + Some("http") | Some("https") => Ok(TcpSocket(socket_uri)), + Some(unknown) => Err(UnitClientError::TcpSocketAddressParseError { + message: format!("Unsupported scheme found in socket address: {}", unknown).to_string(), + control_socket_address: socket_uri.to_string(), + }), + None => Err(UnitClientError::TcpSocketAddressParseError { + message: "No scheme found in socket address".to_string(), + control_socket_address: socket_uri.to_string(), + }), + } + } +} + +impl ControlSocket { + pub fn socket_scheme(&self) -> ControlSocketScheme { + match self { + UnixLocalAbstractSocket(_) => ControlSocketScheme::HTTP, + UnixLocalSocket(_) => ControlSocketScheme::HTTP, + TcpSocket(uri) => match uri.scheme_str().expect("Scheme should not be None") { + "http" => ControlSocketScheme::HTTP, + "https" => ControlSocketScheme::HTTPS, + _ => unreachable!("Scheme should be http or https"), + }, + } + } + + pub fn create_uri_with_path(&self, str_path: &str) -> Uri { + match self { + UnixLocalAbstractSocket(name) => { + let socket_path = PathBuf::from(format!("@{}", name)); + hyperlocal::Uri::new(socket_path, str_path).into() + } + UnixLocalSocket(socket_path) => hyperlocal::Uri::new(socket_path, str_path).into(), + TcpSocket(uri) => { + if str_path.is_empty() { + uri.clone() + } else { + let authority = uri.authority().expect("Authority should not be None"); + Uri::builder() + .scheme(uri.scheme_str().expect("Scheme should not be None")) + .authority(authority.clone()) + .path_and_query(str_path) + .build() + .expect("URI should be valid") + } + } + } + } + + pub fn validate_http_address(uri: Uri) -> Result<(), UnitClientError> { + let http_address = uri.to_string(); + if uri.authority().is_none() { + return Err(UnitClientError::TcpSocketAddressParseError { + message: "No authority found in socket address".to_string(), + control_socket_address: http_address, + }); + } + if uri.port_u16().is_none() { + return Err(UnitClientError::TcpSocketAddressNoPortError { + control_socket_address: http_address, + }); + } + if !(uri.path().is_empty() || uri.path().eq("/")) { + return Err(UnitClientError::TcpSocketAddressParseError { + message: format!("Path is not empty or is not / [path={}]", uri.path()), + control_socket_address: http_address, + }); + } + + Ok(()) + } + + pub fn validate_unix_address(socket: PathBuf) -> Result<(), UnitClientError> { + if !socket.exists() { + return Err(UnitClientError::UnixSocketNotFound { + control_socket_address: socket.to_string_lossy().to_string(), + }); + } + let metadata = fs::metadata(&socket).map_err(|error| UnitClientError::UnixSocketAddressError { + source: error, + control_socket_address: socket.to_string_lossy().to_string(), + })?; + let file_type = metadata.file_type(); + if !file_type.is_socket() { + return Err(UnitClientError::UnixSocketAddressError { + source: std::io::Error::new(std::io::ErrorKind::Other, "Control socket path is not a socket"), + control_socket_address: socket.to_string_lossy().to_string(), + }); + } + + Ok(()) + } + + pub fn validate(&self) -> Result<Self, UnitClientError> { + match self { + UnixLocalAbstractSocket(socket_name) => { + let socket_path = PathBuf::from(format!("@{}", socket_name)); + Self::validate_unix_address(socket_path.clone()) + } + UnixLocalSocket(socket_path) => Self::validate_unix_address(socket_path.clone()), + TcpSocket(socket_uri) => Self::validate_http_address(socket_uri.clone()), + } + .map(|_| self.to_owned()) + } + + fn normalize_and_parse_http_address(http_address: String) -> Result<Uri, UnitClientError> { + // Convert *:1 style network addresses to URI format + let address = if http_address.starts_with("*:") { + http_address.replacen("*:", "http://127.0.0.1:", 1) + // Add scheme if not present + } else if !(http_address.starts_with("http://") || http_address.starts_with("https://")) { + format!("http://{}", http_address) + } else { + http_address.to_owned() + }; + + let is_https = address.starts_with("https://"); + + let parsed_uri = + Uri::try_from(address.as_str()).map_err(|error| UnitClientError::TcpSocketAddressUriError { + source: error, + control_socket_address: address, + })?; + let authority = parsed_uri.authority().expect("Authority should not be None"); + let expected_port = if is_https { HTTPS.port() } else { HTTP.port() }; + let normalized_authority = match authority.port_u16() { + Some(_) => authority.to_owned(), + None => { + let host = format!("{}:{}", authority.host(), expected_port); + Authority::try_from(host.as_str()).expect("Authority should be valid") + } + }; + + let normalized_uri = Uri::builder() + .scheme(parsed_uri.scheme_str().expect("Scheme should not be None")) + .authority(normalized_authority) + .path_and_query(PathAndQuery::from_static("")) + .build() + .map_err(|error| UnitClientError::TcpSocketAddressParseError { + message: error.to_string(), + control_socket_address: http_address.clone(), + })?; + + Ok(normalized_uri) + } + + /// Flexibly parse a textual representation of a socket address + pub fn parse_address<S: Into<String>>(socket_address: S) -> Result<Self, UnitClientError> { + let full_socket_address: String = socket_address.into(); + let socket_prefix = "unix:"; + let socket_uri_prefix = "unix://"; + let mut buf = String::with_capacity(socket_prefix.len()); + for (i, c) in full_socket_address.char_indices() { + // Abstract unix socket with no prefix + if i == 0 && c == '@' { + return Ok(UnixLocalAbstractSocket(full_socket_address[1..].to_string())); + } + buf.push(c); + // Unix socket with prefix + if i == socket_prefix.len() - 1 && buf.eq(socket_prefix) { + let path_text = full_socket_address[socket_prefix.len()..].to_string(); + // Return here if this URI does not have a scheme followed by double slashes + if !path_text.starts_with("//") { + return match path_text.strip_prefix('@') { + Some(name) => Ok(UnixLocalAbstractSocket(name.to_string())), + None => { + let path = PathBuf::from(path_text); + Ok(UnixLocalSocket(path)) + } + }; + } + } + + // Unix socket with URI prefix + if i == socket_uri_prefix.len() - 1 && buf.eq(socket_uri_prefix) { + let uri = Uri::try_from(full_socket_address.as_str()).map_err(|error| { + UnitClientError::TcpSocketAddressParseError { + message: error.to_string(), + control_socket_address: full_socket_address.clone(), + } + })?; + return ControlSocket::try_from(uri); + } + } + + /* Sockets on Windows are not supported, so there is no need to check + * if the socket address is a valid path, so we can do this shortcut + * here to see if a path was specified without a unix: prefix. */ + if buf.starts_with(MAIN_SEPARATOR) { + let path = PathBuf::from(buf); + return Ok(UnixLocalSocket(path)); + } + + let uri = Self::normalize_and_parse_http_address(buf)?; + Ok(TcpSocket(uri)) + } + + pub fn is_local_socket(&self) -> bool { + match self { + UnixLocalAbstractSocket(_) | UnixLocalSocket(_) => true, + TcpSocket(_) => false, + } + } +} + +#[cfg(test)] +mod tests { + use rand::distributions::{Alphanumeric, DistString}; + use std::env::temp_dir; + use std::fmt::Display; + use std::io; + use std::os::unix::net::UnixListener; + + use super::*; + + struct TempSocket { + socket_path: PathBuf, + _listener: UnixListener, + } + + impl TempSocket { + fn shutdown(&mut self) -> io::Result<()> { + fs::remove_file(&self.socket_path) + } + } + + impl Display for TempSocket { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "unix:{}", self.socket_path.to_string_lossy().to_string()) + } + } + + impl Drop for TempSocket { + fn drop(&mut self) { + self.shutdown() + .expect(format!("Unable to shutdown socket {}", self.socket_path.to_string_lossy()).as_str()); + } + } + + #[test] + fn will_error_with_nonexistent_unix_socket() { + let socket_address = "unix:/tmp/some_random_filename_that_doesnt_exist.sock"; + let control_socket = + ControlSocket::try_from(socket_address).expect("No error should be returned until validate() is called"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + assert!(control_socket.validate().is_err(), "Socket should not be valid"); + } + + #[test] + fn can_parse_socket_with_prefix() { + let temp_socket = create_file_socket().expect("Unable to create socket"); + let control_socket = ControlSocket::try_from(temp_socket.to_string()).expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + + #[test] + fn can_parse_socket_from_uri() { + let temp_socket = create_file_socket().expect("Unable to create socket"); + let uri: Uri = hyperlocal::Uri::new(temp_socket.socket_path.clone(), "").into(); + let control_socket = ControlSocket::try_from(uri).expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + + #[test] + fn can_parse_socket_from_uri_text() { + let temp_socket = create_file_socket().expect("Unable to create socket"); + let uri: Uri = hyperlocal::Uri::new(temp_socket.socket_path.clone(), "").into(); + let control_socket = ControlSocket::parse_address(uri.to_string()).expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket for input text should be valid: {}", e); + } + } + + #[test] + #[cfg(target_os = "linux")] + fn can_parse_abstract_socket_from_uri() { + let temp_socket = create_abstract_socket().expect("Unable to create socket"); + let uri: Uri = hyperlocal::Uri::new(temp_socket.socket_path.clone(), "").into(); + let control_socket = ControlSocket::try_from(uri).expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + + #[test] + #[cfg(target_os = "linux")] + fn can_parse_abstract_socket_from_uri_text() { + let temp_socket = create_abstract_socket().expect("Unable to create socket"); + let uri: Uri = hyperlocal::Uri::new(temp_socket.socket_path.clone(), "").into(); + let control_socket = ControlSocket::parse_address(uri.to_string()).expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + + #[test] + fn can_parse_socket_without_prefix() { + let temp_socket = create_file_socket().expect("Unable to create socket"); + let control_socket = ControlSocket::try_from(temp_socket.socket_path.to_string_lossy().to_string()) + .expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + + #[cfg(target_os = "linux")] + #[test] + fn can_parse_abstract_socket() { + let temp_socket = create_abstract_socket().expect("Unable to create socket"); + let control_socket = ControlSocket::try_from(temp_socket.to_string()).expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + + #[test] + fn can_normalize_good_http_socket_addresses() { + let valid_socket_addresses = vec![ + "http://127.0.0.1:8080", + "https://127.0.0.1:8080", + "http://127.0.0.1:8080/", + "127.0.0.1:8080", + "http://0.0.0.0:8080", + "https://0.0.0.0:8080", + "http://0.0.0.0:8080/", + "0.0.0.0:8080", + "http://localhost:8080", + "https://localhost:8080", + "http://localhost:8080/", + "localhost:8080", + "http://[::1]:8080", + "https://[::1]:8080", + "http://[::1]:8080/", + "[::1]:8080", + "http://[0000:0000:0000:0000:0000:0000:0000:0000]:8080", + "https://[0000:0000:0000:0000:0000:0000:0000:0000]:8080", + "http://[0000:0000:0000:0000:0000:0000:0000:0000]:8080/", + "[0000:0000:0000:0000:0000:0000:0000:0000]:8080", + ]; + for socket_address in valid_socket_addresses { + let mut expected = if socket_address.starts_with("http") { + socket_address.to_string().trim_end_matches('/').to_string() + } else { + format!("http://{}", socket_address).trim_end_matches('/').to_string() + }; + expected.push('/'); + + let control_socket = ControlSocket::try_from(socket_address).expect("Error parsing good socket path"); + assert!(!control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + } + + #[test] + fn can_normalize_wildcard_http_socket_address() { + let socket_address = "*:8080"; + let expected = "http://127.0.0.1:8080/"; + let normalized_result = ControlSocket::normalize_and_parse_http_address(socket_address.to_string()); + let normalized = normalized_result + .expect("Unable to normalize socket address") + .to_string(); + assert_eq!(normalized, expected); + } + + #[test] + fn can_normalize_http_socket_address_with_no_port() { + let socket_address = "http://localhost"; + let expected = "http://localhost:80/"; + let normalized_result = ControlSocket::normalize_and_parse_http_address(socket_address.to_string()); + let normalized = normalized_result + .expect("Unable to normalize socket address") + .to_string(); + assert_eq!(normalized, expected); + } + + #[test] + fn can_normalize_https_socket_address_with_no_port() { + let socket_address = "https://localhost"; + let expected = "https://localhost:443/"; + let normalized_result = ControlSocket::normalize_and_parse_http_address(socket_address.to_string()); + let normalized = normalized_result + .expect("Unable to normalize socket address") + .to_string(); + assert_eq!(normalized, expected); + } + + #[test] + fn can_parse_http_addresses() { + let valid_socket_addresses = vec![ + "http://127.0.0.1:8080", + "https://127.0.0.1:8080", + "http://127.0.0.1:8080/", + "127.0.0.1:8080", + "http://0.0.0.0:8080", + "https://0.0.0.0:8080", + "http://0.0.0.0:8080/", + "0.0.0.0:8080", + "http://localhost:8080", + "https://localhost:8080", + "http://localhost:8080/", + "localhost:8080", + "http://[::1]:8080", + "https://[::1]:8080", + "http://[::1]:8080/", + "[::1]:8080", + "http://[0000:0000:0000:0000:0000:0000:0000:0000]:8080", + "https://[0000:0000:0000:0000:0000:0000:0000:0000]:8080", + "http://[0000:0000:0000:0000:0000:0000:0000:0000]:8080/", + "[0000:0000:0000:0000:0000:0000:0000:0000]:8080", + ]; + for socket_address in valid_socket_addresses { + let mut expected = if socket_address.starts_with("http") { + socket_address.to_string().trim_end_matches('/').to_string() + } else { + format!("http://{}", socket_address).trim_end_matches('/').to_string() + }; + expected.push('/'); + + let normalized = ControlSocket::normalize_and_parse_http_address(socket_address.to_string()) + .expect("Unable to normalize socket address") + .to_string(); + assert_eq!(normalized, expected); + } + } + + fn create_file_socket() -> Result<TempSocket, io::Error> { + let random = Alphanumeric.sample_string(&mut rand::thread_rng(), 10); + let socket_name = format!("unit-client-socket-test-{}.sock", random); + let socket_path = temp_dir().join(socket_name); + let listener = UnixListener::bind(&socket_path)?; + Ok(TempSocket { + socket_path, + _listener: listener, + }) + } + + #[cfg(target_os = "linux")] + fn create_abstract_socket() -> Result<TempSocket, io::Error> { + let random = Alphanumeric.sample_string(&mut rand::thread_rng(), 10); + let socket_name = format!("@unit-client-socket-test-{}.sock", random); + let socket_path = PathBuf::from(socket_name); + let listener = UnixListener::bind(&socket_path)?; + Ok(TempSocket { + socket_path, + _listener: listener, + }) + } +} diff --git a/tools/unitctl/unit-client-rs/src/lib.rs b/tools/unitctl/unit-client-rs/src/lib.rs new file mode 100644 index 00000000..a0933f42 --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/lib.rs @@ -0,0 +1,16 @@ +extern crate custom_error; +extern crate futures; +extern crate hyper; +extern crate hyper_tls; +extern crate hyperlocal; +extern crate serde; +extern crate serde_json; +pub mod control_socket_address; +mod runtime_flags; +pub mod unit_client; +mod unitd_cmd; +pub mod unitd_configure_options; +pub mod unitd_docker; +pub mod unitd_instance; +pub mod unitd_process; +mod unitd_process_user; diff --git a/tools/unitctl/unit-client-rs/src/runtime_flags.rs b/tools/unitctl/unit-client-rs/src/runtime_flags.rs new file mode 100644 index 00000000..7b31274d --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/runtime_flags.rs @@ -0,0 +1,90 @@ +use std::borrow::Cow; +use std::fmt; +use std::fmt::Display; +use std::path::{Path, PathBuf}; + +#[derive(Debug, Clone)] +pub struct RuntimeFlags { + pub flags: Cow<'static, str>, +} + +impl Display for RuntimeFlags { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.flags) + } +} + +impl RuntimeFlags { + pub fn new<S>(flags: S) -> RuntimeFlags + where + S: Into<String>, + { + RuntimeFlags { + flags: Cow::from(flags.into()), + } + } + + pub fn has_flag(&self, flag_name: &str) -> bool { + self.flags.contains(format!("--{}", flag_name).as_str()) + } + + pub fn get_flag_value(&self, flag_name: &str) -> Option<String> { + let flag_parts = self.flags.split_ascii_whitespace().collect::<Vec<&str>>(); + for (i, flag) in flag_parts.iter().enumerate() { + if let Some(name) = flag.strip_prefix("--") { + /* If there is no flag value after the current one, there is by definition no + * flag value for the current flag. */ + let index_lt_len = flag_parts.len() > i + 1; + if index_lt_len { + let next_value_isnt_flag = !flag_parts[i + 1].starts_with("--"); + if name.eq(flag_name) && next_value_isnt_flag { + return Some(flag_parts[i + 1].to_string()); + } + } + } + } + None + } + + pub fn control_api_socket_address(&self) -> Option<String> { + self.get_flag_value("control") + } + + pub fn pid_path(&self) -> Option<Box<Path>> { + self.get_flag_value("pid") + .map(PathBuf::from) + .map(PathBuf::into_boxed_path) + } + + pub fn log_path(&self) -> Option<Box<Path>> { + self.get_flag_value("log") + .map(PathBuf::from) + .map(PathBuf::into_boxed_path) + } + + pub fn modules_directory(&self) -> Option<Box<Path>> { + self.get_flag_value("modules") + .map(PathBuf::from) + .map(PathBuf::into_boxed_path) + } + + pub fn state_directory(&self) -> Option<Box<Path>> { + self.get_flag_value("state") + .map(PathBuf::from) + .map(PathBuf::into_boxed_path) + } + + pub fn tmp_directory(&self) -> Option<Box<Path>> { + self.get_flag_value("tmp") + .map(PathBuf::from) + .map(PathBuf::into_boxed_path) + } + + pub fn user(&self) -> Option<String> { + self.get_flag_value("user").map(String::from) + } + + pub fn group(&self) -> Option<String> { + self.get_flag_value("group").map(String::from) + } +} diff --git a/tools/unitctl/unit-client-rs/src/unit_client.rs b/tools/unitctl/unit-client-rs/src/unit_client.rs new file mode 100644 index 00000000..3d09e67a --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unit_client.rs @@ -0,0 +1,424 @@ +use std::collections::HashMap; +use std::error::Error as StdError; +use std::fmt::Debug; +use std::rc::Rc; +use std::{fmt, io}; + +use custom_error::custom_error; +use hyper::body::{Buf, HttpBody}; +use hyper::client::{HttpConnector, ResponseFuture}; +use hyper::Error as HyperError; +use hyper::{http, Body, Client, Request}; +use hyper_tls::HttpsConnector; +use hyperlocal::{UnixClientExt, UnixConnector}; +use serde::{Deserialize, Serialize}; + +use crate::control_socket_address::ControlSocket; +use unit_openapi::apis::configuration::Configuration; +use unit_openapi::apis::{ + ApplicationsApi, ApplicationsApiClient, AppsApi, AppsApiClient, Error as OpenAPIError, ListenersApi, + ListenersApiClient, StatusApi, StatusApiClient, +}; +use unit_openapi::models::{ConfigApplication, ConfigListener, Status}; + +const USER_AGENT: &str = concat!("Unit CLI/", env!("CARGO_PKG_VERSION"), "/rust"); + +custom_error! {pub UnitClientError + OpenAPIError { source: OpenAPIError } = "OpenAPI error", + JsonError { source: serde_json::Error, + path: String} = "JSON error [path={path}]", + HyperError { source: hyper::Error, + control_socket_address: String, + path: String} = "Communications error [control_socket_address={control_socket_address}, path={path}]: {source}", + HttpRequestError { source: http::Error, + path: String} = "HTTP error [path={path}]", + HttpResponseError { status: http::StatusCode, + path: String, + body: String} = "HTTP response error [path={path}, status={status}]:\n{body}", + HttpResponseJsonBodyError { status: http::StatusCode, + path: String, + error: String, + detail: String} = "HTTP response error [path={path}, status={status}]:\n Error: {error}\n Detail: {detail}", + IoError { source: io::Error, socket: String } = "IO error [socket={socket}]", + UnixSocketAddressError { + source: io::Error, + control_socket_address: String + } = "Invalid unix domain socket address [control_socket_address={control_socket_address}]", + SocketPermissionsError { control_socket_address: String } = + "Insufficient permissions to connect to control socket [control_socket_address={control_socket_address}]", + UnixSocketNotFound { control_socket_address: String } = "Unix socket not found [control_socket_address={control_socket_address}]", + TcpSocketAddressUriError { + source: http::uri::InvalidUri, + control_socket_address: String + } = "Invalid TCP socket address [control_socket_address={control_socket_address}]", + TcpSocketAddressParseError { + message: String, + control_socket_address: String + } = "Invalid TCP socket address [control_socket_address={control_socket_address}]: {message}", + TcpSocketAddressNoPortError { + control_socket_address: String + } = "TCP socket address does not have a port specified [control_socket_address={control_socket_address}]", + UnitdProcessParseError { + message: String, + pid: u64 + } = "{message} for [pid={pid}]", + UnitdProcessExecError { + source: Box<dyn StdError>, + message: String, + executable_path: String, + pid: u64 + } = "{message} for [pid={pid}, executable_path={executable_path}]: {source}", + UnitdDockerError { + message: String + } = "Failed to communicate with docker daemon: {message}", +} + +impl UnitClientError { + fn new(error: HyperError, control_socket_address: String, path: String) -> Self { + if error.is_connect() { + if let Some(source) = error.source() { + if let Some(io_error) = source.downcast_ref::<io::Error>() { + if io_error.kind().eq(&io::ErrorKind::PermissionDenied) { + return UnitClientError::SocketPermissionsError { control_socket_address }; + } + } + } + } + + UnitClientError::HyperError { + source: error, + control_socket_address, + path, + } + } +} + +macro_rules! new_openapi_client_from_hyper_client { + ($unit_client:expr, $hyper_client: ident, $api_client:ident, $api_trait:ident) => {{ + let config = Configuration { + base_path: $unit_client.control_socket.create_uri_with_path("/").to_string(), + user_agent: Some(format!("{}/OpenAPI-Generator", USER_AGENT).to_owned()), + client: $hyper_client.clone(), + basic_auth: None, + oauth_access_token: None, + api_key: None, + }; + let rc_config = Rc::new(config); + Box::new($api_client::new(rc_config)) as Box<dyn $api_trait> + }}; +} + +macro_rules! new_openapi_client { + ($unit_client:expr, $api_client:ident, $api_trait:ident) => { + match &*$unit_client.client { + RemoteClient::Tcp { client } => { + new_openapi_client_from_hyper_client!($unit_client, client, $api_client, $api_trait) + } + RemoteClient::Unix { client } => { + new_openapi_client_from_hyper_client!($unit_client, client, $api_client, $api_trait) + } + } + }; +} + +#[derive(Clone)] +pub enum RemoteClient<B> +where + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + Unix { + client: Client<UnixConnector, B>, + }, + Tcp { + client: Client<HttpsConnector<HttpConnector>, B>, + }, +} + +impl<B> RemoteClient<B> +where + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + fn client_name(&self) -> &str { + match self { + RemoteClient::Unix { .. } => "Client<UnixConnector, Body>", + RemoteClient::Tcp { .. } => "Client<HttpsConnector<HttpConnector>, Body>", + } + } + + pub fn request(&self, req: Request<B>) -> ResponseFuture { + match self { + RemoteClient::Unix { client } => client.request(req), + RemoteClient::Tcp { client } => client.request(req), + } + } +} + +impl<B> Debug for RemoteClient<B> +where + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.client_name()) + } +} + +#[derive(Debug)] +pub struct UnitClient { + pub control_socket: ControlSocket, + /// Client for communicating with the control API over the UNIX domain socket + client: Box<RemoteClient<Body>>, +} + +impl UnitClient { + pub fn new(control_socket: ControlSocket) -> Self { + if control_socket.is_local_socket() { + Self::new_unix(control_socket) + } else { + Self::new_http(control_socket) + } + } + + pub fn new_http(control_socket: ControlSocket) -> Self { + let remote_client = Client::builder().build(HttpsConnector::new()); + Self { + control_socket, + client: Box::from(RemoteClient::Tcp { client: remote_client }), + } + } + + pub fn new_unix(control_socket: ControlSocket) -> UnitClient { + let remote_client = Client::unix(); + + Self { + control_socket, + client: Box::from(RemoteClient::Unix { client: remote_client }), + } + } + + /// Sends a request to Unit and deserializes the JSON response body into the value of type `RESPONSE`. + pub async fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>( + &self, + mut request: Request<Body>, + ) -> Result<RESPONSE, UnitClientError> { + let uri = request.uri().clone(); + let path: &str = uri.path(); + + request.headers_mut().insert("User-Agent", USER_AGENT.parse().unwrap()); + + let response_future = self.client.request(request); + + let response = response_future + .await + .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; + + let status = response.status(); + let body = hyper::body::aggregate(response) + .await + .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; + let reader = &mut body.reader(); + if !status.is_success() { + let error: HashMap<String, String> = + serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { + source: error, + path: path.to_string(), + })?; + + return Err(UnitClientError::HttpResponseJsonBodyError { + status, + path: path.to_string(), + error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(), + detail: error.get("detail").unwrap_or(&"".into()).to_string(), + }); + } + serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { + source: error, + path: path.to_string(), + }) + } + + pub fn listeners_api(&self) -> Box<dyn ListenersApi + 'static> { + new_openapi_client!(self, ListenersApiClient, ListenersApi) + } + + pub async fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> { + self.listeners_api().get_listeners().await.or_else(|err| { + if let OpenAPIError::Hyper(hyper_error) = err { + Err(Box::new(UnitClientError::new( + hyper_error, + self.control_socket.to_string(), + "/listeners".to_string(), + ))) + } else { + Err(Box::new(UnitClientError::OpenAPIError { source: err })) + } + }) + } + + pub fn status_api(&self) -> Box<dyn StatusApi + 'static> { + new_openapi_client!(self, StatusApiClient, StatusApi) + } + + pub async fn status(&self) -> Result<Status, Box<UnitClientError>> { + self.status_api().get_status().await.or_else(|err| { + if let OpenAPIError::Hyper(hyper_error) = err { + Err(Box::new(UnitClientError::new( + hyper_error, + self.control_socket.to_string(), + "/status".to_string(), + ))) + } else { + Err(Box::new(UnitClientError::OpenAPIError { source: err })) + } + }) + } + + pub fn applications_api(&self) -> Box<dyn ApplicationsApi + 'static> { + new_openapi_client!(self, ApplicationsApiClient, ApplicationsApi) + } + + pub async fn applications(&self) -> Result<HashMap<String, ConfigApplication>, Box<UnitClientError>> { + self.applications_api().get_applications().await.or_else(|err| { + if let OpenAPIError::Hyper(hyper_error) = err { + Err(Box::new(UnitClientError::new( + hyper_error, + self.control_socket.to_string(), + "/applications".to_string(), + ))) + } else { + Err(Box::new(UnitClientError::OpenAPIError { source: err })) + } + }) + } + + pub async fn per_application_api(&self) -> Box<dyn AppsApi + 'static> { + new_openapi_client!(self, AppsApiClient, AppsApi) + } + + pub async fn restart_application(&self, name: &String) -> Result<HashMap<String, String>, Box<UnitClientError>> { + self.per_application_api() + .await + .get_app_restart(name.as_str()) + .await + .or_else(|err| { + if let OpenAPIError::Hyper(hyper_error) = err { + Err(Box::new(UnitClientError::new( + hyper_error, + self.control_socket.to_string(), + format!("/control/applications/{}/restart", name), + ))) + } else { + Err(Box::new(UnitClientError::OpenAPIError { source: err })) + } + }) + } + + pub async fn is_running(&self) -> bool { + self.status().await.is_ok() + } +} + +pub type UnitSerializableMap = HashMap<String, serde_json::Value>; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UnitStatus { + pub connections: UnitStatusConnections, + pub requests: UnitStatusRequests, + pub applications: HashMap<String, UnitStatusApplication>, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UnitStatusConnections { + #[serde(default)] + pub closed: usize, + #[serde(default)] + pub idle: usize, + #[serde(default)] + pub active: usize, + #[serde(default)] + pub accepted: usize, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UnitStatusRequests { + #[serde(default)] + pub active: usize, + #[serde(default)] + pub total: usize, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UnitStatusApplication { + #[serde(default)] + pub processes: HashMap<String, usize>, + #[serde(default)] + pub requests: HashMap<String, usize>, +} + +#[cfg(test)] +mod tests { + use crate::unitd_instance::UnitdInstance; + + use super::*; + // Integration tests + + #[tokio::test] + async fn can_connect_to_unit_api() { + match UnitdInstance::running_unitd_instances().await.first() { + Some(unit_instance) => { + let control_api_socket_address = unit_instance + .control_api_socket_address() + .expect("No control API socket path found"); + let control_socket = ControlSocket::try_from(control_api_socket_address) + .expect("Unable to parse control socket address"); + let unit_client = UnitClient::new(control_socket); + assert!(unit_client.is_running().await); + } + None => { + eprintln!("No running unitd instances found - skipping test"); + } + } + } + + #[tokio::test] + async fn can_get_unit_status() { + match UnitdInstance::running_unitd_instances().await.first() { + Some(unit_instance) => { + let control_api_socket_address = unit_instance + .control_api_socket_address() + .expect("No control API socket path found"); + let control_socket = ControlSocket::try_from(control_api_socket_address) + .expect("Unable to parse control socket address"); + let unit_client = UnitClient::new(control_socket); + let status = unit_client.status().await.expect("Unable to get unit status"); + println!("Unit status: {:?}", status); + } + None => { + eprintln!("No running unitd instances found - skipping test"); + } + } + } + + #[tokio::test] + async fn can_get_unit_listeners() { + match UnitdInstance::running_unitd_instances().await.first() { + Some(unit_instance) => { + let control_api_socket_address = unit_instance + .control_api_socket_address() + .expect("No control API socket path found"); + let control_socket = ControlSocket::try_from(control_api_socket_address) + .expect("Unable to parse control socket address"); + let unit_client = UnitClient::new(control_socket); + unit_client.listeners().await.expect("Unable to get Unit listeners"); + } + None => { + eprintln!("No running unitd instances found - skipping test"); + } + } + } +} diff --git a/tools/unitctl/unit-client-rs/src/unitd_cmd.rs b/tools/unitctl/unit-client-rs/src/unitd_cmd.rs new file mode 100644 index 00000000..17563cb0 --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unitd_cmd.rs @@ -0,0 +1,88 @@ +use std::error::Error as StdError; +use std::io::{Error as IoError, ErrorKind}; + +use crate::runtime_flags::RuntimeFlags; +use std::path::{Path, PathBuf}; + +#[derive(Debug, Clone)] +pub struct UnitdCmd { + pub(crate) process_executable_path: Option<Box<Path>>, + pub version: Option<String>, + pub flags: Option<RuntimeFlags>, +} + +impl UnitdCmd { + pub(crate) fn new<S>(full_cmd: S, binary_name: &str) -> Result<UnitdCmd, Box<dyn StdError>> + where + S: Into<String>, + { + let process_cmd: String = full_cmd.into(); + let parsable = process_cmd + .strip_prefix("unit: main v") + .and_then(|s| s.strip_suffix(']')); + if parsable.is_none() { + let msg = format!("cmd does not have the expected format: {}", process_cmd); + return Err(IoError::new(ErrorKind::InvalidInput, msg).into()); + } + let parts = parsable + .expect("Unable to parse cmd") + .splitn(2, " [") + .collect::<Vec<&str>>(); + + if parts.len() != 2 { + let msg = format!("cmd does not have the expected format: {}", process_cmd); + return Err(IoError::new(ErrorKind::InvalidInput, msg).into()); + } + + let version = Some(parts[0].to_string()); + let executable_path = UnitdCmd::parse_executable_path_from_cmd(parts[1], binary_name); + let flags = UnitdCmd::parse_runtime_flags_from_cmd(parts[1]); + + Ok(UnitdCmd { + process_executable_path: executable_path, + version, + flags, + }) + } + + fn parse_executable_path_from_cmd<S>(full_cmd: S, binary_name: &str) -> Option<Box<Path>> + where + S: Into<String>, + { + let cmd = full_cmd.into(); + if cmd.is_empty() { + return None; + } + + let split = cmd.splitn(2, binary_name).collect::<Vec<&str>>(); + if split.is_empty() { + return None; + } + + let path = format!("{}{}", split[0], binary_name); + Some(PathBuf::from(path).into_boxed_path()) + } + + fn parse_runtime_flags_from_cmd<S>(full_cmd: S) -> Option<RuntimeFlags> + where + S: Into<String>, + { + let cmd = full_cmd.into(); + if cmd.is_empty() { + return None; + } + + // Split out everything in between the brackets [ and ] + let split = cmd.trim_end_matches(']').splitn(2, '[').collect::<Vec<&str>>(); + if split.is_empty() { + return None; + } + /* Now we need to parse a string like this: + * ./sbin/unitd --no-daemon --tmp /tmp + * and only return what is after the invoking command */ + split[0] + .find("--") + .map(|index| cmd[index..].to_string()) + .map(RuntimeFlags::new) + } +} diff --git a/tools/unitctl/unit-client-rs/src/unitd_configure_options.rs b/tools/unitctl/unit-client-rs/src/unitd_configure_options.rs new file mode 100644 index 00000000..00ee22a3 --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unitd_configure_options.rs @@ -0,0 +1,236 @@ +use custom_error::custom_error; +use std::borrow::Cow; +use std::error::Error as stdError; +use std::io::{BufRead, BufReader, Lines}; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; + +custom_error! {UnitdStderrParseError + VersionNotFound = "Version string output not found", + BuildSettingsNotFound = "Build settings not found" +} + +#[derive(Debug, Clone)] +pub struct UnitdConfigureOptions { + pub version: Cow<'static, str>, + pub all_flags: Cow<'static, str>, +} + +impl UnitdConfigureOptions { + pub fn new(unitd_path: &Path) -> Result<UnitdConfigureOptions, Box<dyn stdError>> { + fn parse_configure_settings_from_unitd_stderr_output<B: BufRead>( + lines: &mut Lines<B>, + ) -> Result<UnitdConfigureOptions, Box<dyn stdError>> { + const VERSION_PREFIX: &str = "unit version: "; + const CONFIGURED_AS_PREFIX: &str = "configured as "; + const CONFIGURE_PREFIX: &str = "configured as ./configure "; + + fn aggregate_parsable_lines( + mut accum: (Option<String>, Option<String>), + line: String, + ) -> (Option<String>, Option<String>) { + if line.starts_with(VERSION_PREFIX) { + accum.0 = line.strip_prefix(VERSION_PREFIX).map(|l| l.to_string()); + } else if line.starts_with(CONFIGURED_AS_PREFIX) { + accum.1 = line.strip_prefix(CONFIGURE_PREFIX).map(|l| l.to_string()); + } + + accum + } + + let options_lines = lines + .filter_map(|line| line.ok()) + .fold((None, None), aggregate_parsable_lines); + + if options_lines.0.is_none() { + return Err(Box::new(UnitdStderrParseError::VersionNotFound) as Box<dyn stdError>); + } else if options_lines.1.is_none() { + return Err(Box::new(UnitdStderrParseError::BuildSettingsNotFound) as Box<dyn stdError>); + } + + Ok(UnitdConfigureOptions { + version: options_lines.0.unwrap().into(), + all_flags: options_lines.1.unwrap().into(), + }) + } + + let program = unitd_path.as_os_str(); + let child = Command::new(program) + .arg("--version") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + let output = child.wait_with_output()?; + let err = BufReader::new(&*output.stderr); + parse_configure_settings_from_unitd_stderr_output(&mut err.lines()) + } + + pub fn has_flag(&self, flag_name: &str) -> bool { + self.all_flags + .split_ascii_whitespace() + .any(|flag| flag.starts_with(format!("--{}", flag_name).as_str())) + } + + pub fn get_flag_value(&self, flag_name: &str) -> Option<String> { + self.all_flags + .split_ascii_whitespace() + .find(|flag| flag.starts_with(format!("--{}", flag_name).as_str())) + .and_then(|flag| { + let parts: Vec<&str> = flag.split('=').collect(); + if parts.len() >= 2 { + Some(parts[1].to_owned()) + } else { + None + } + }) + } + + pub fn debug_enabled(&self) -> bool { + self.has_flag("debug") + } + + pub fn openssl_enabled(&self) -> bool { + self.has_flag("openssl") + } + + pub fn prefix_path(&self) -> Option<Box<Path>> { + self.get_flag_value("prefix") + .map(PathBuf::from) + .map(PathBuf::into_boxed_path) + } + + fn join_to_prefix_path<S>(&self, sub_path: S) -> Option<Box<Path>> + where + S: Into<String>, + { + self.prefix_path() + .map(|path| path.join(sub_path.into()).into_boxed_path()) + } + + pub fn default_control_api_socket_address(&self) -> Option<String> { + // If the socket address is specific configured in the configure options, we use + // that. Otherwise, we use the default path as assumed to be unix:$prefix/control.unit.sock. + match self.get_flag_value("control") { + Some(socket_address) => Some(socket_address), + None => { + // Give up if the unitd is compiled with unix sockets disabled + if self.has_flag("no-unix-sockets") { + return None; + } + let socket_path = self.join_to_prefix_path("control.unit.sock"); + socket_path.map(|path| format!("unix:{}", path.to_string_lossy())) + } + } + } + + pub fn default_pid_path(&self) -> Option<Box<Path>> { + match self.get_flag_value("pid") { + Some(pid_path) => self.join_to_prefix_path(pid_path), + None => self.join_to_prefix_path("unit.pid"), + } + } + + pub fn default_log_path(&self) -> Option<Box<Path>> { + match self.get_flag_value("log") { + Some(pid_path) => self.join_to_prefix_path(pid_path), + None => self.join_to_prefix_path("unit.log"), + } + } + + pub fn default_modules_directory(&self) -> Option<Box<Path>> { + match self.get_flag_value("modules") { + Some(modules_dir_name) => self.join_to_prefix_path(modules_dir_name), + None => self.join_to_prefix_path("modules"), + } + } + + pub fn default_state_directory(&self) -> Option<Box<Path>> { + match self.get_flag_value("state") { + Some(state_dir_name) => self.join_to_prefix_path(state_dir_name), + None => self.join_to_prefix_path("state"), + } + } + + pub fn default_tmp_directory(&self) -> Option<Box<Path>> { + match self.get_flag_value("tmp") { + Some(tmp_dir_name) => self.join_to_prefix_path(tmp_dir_name), + None => self.join_to_prefix_path("tmp"), + } + } + pub fn default_user(&self) -> Option<String> { + self.get_flag_value("user").map(String::from) + } + pub fn default_group(&self) -> Option<String> { + self.get_flag_value("group").map(String::from) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::unitd_instance; + use crate::unitd_instance::UNITD_PATH_ENV_KEY; + + #[test] + fn can_detect_key() { + let options = UnitdConfigureOptions { + version: Default::default(), + all_flags: Cow::from("--debug --openssl --prefix=/opt/unit"), + }; + assert!(options.has_flag("debug")); + assert!(options.has_flag("openssl")); + assert!(options.has_flag("prefix")); + assert!(!options.has_flag("fobar")); + } + + #[test] + fn can_get_flag_value_by_key() { + let expected = "/opt/unit"; + let options = UnitdConfigureOptions { + version: Default::default(), + all_flags: Cow::from("--debug --openssl --prefix=/opt/unit"), + }; + + let actual = options.get_flag_value("prefix"); + assert_eq!(expected, actual.unwrap()) + } + + #[test] + fn can_get_prefix_path() { + let expected: Box<Path> = Path::new("/opt/unit").into(); + let options = UnitdConfigureOptions { + version: Default::default(), + all_flags: Cow::from("--debug --openssl --prefix=/opt/unit"), + }; + + let actual = options.prefix_path(); + assert_eq!(expected, actual.unwrap()) + } + + #[test] + fn can_parse_complicated_configure_options() { + let expected: Box<Path> = Path::new("/usr").into(); + let options = UnitdConfigureOptions { + version: Default::default(), + all_flags: Cow::from("--prefix=/usr --state=/var/lib/unit --control=unix:/var/run/control.unit.sock --pid=/var/run/unit.pid --log=/var/log/unit.log --tmp=/var/tmp --user=unit --group=unit --tests --openssl --modules=/usr/lib/unit/modules --libdir=/usr/lib/x86_64-linux-gnu --cc-opt='-g -O2 -fdebug-prefix-map=/data/builder/debuild/unit-1.28.0/pkg/deb/debuild/unit-1.28.0=. -specs=/usr/share/dpkg/no-pie-compile.specs -fstack-protector-strong -Wformat -Werror=format-security -Wp,-D_FORTIFY_SOURCE=2 -fPIC' --ld-opt='-Wl,-Bsymbolic-functions -specs=/usr/share/dpkg/no-pie-link.specs -Wl,-z,relro -Wl,-z,now -Wl,--as-needed -pie' +"), + }; + + let actual = options.prefix_path(); + assert_eq!(expected, actual.unwrap()) + } + + #[test] + #[ignore] // run this one manually - not in CI + fn can_run_unitd() { + let specific_path = std::env::var(UNITD_PATH_ENV_KEY).map_err(|error| Box::new(error) as Box<dyn stdError>); + let unitd_path = unitd_instance::find_executable_path(specific_path); + let config_options = UnitdConfigureOptions::new(&unitd_path.unwrap()); + match config_options { + Ok(options) => { + println!("{:?}", options) + } + Err(error) => panic!("{}", error), + }; + } +} diff --git a/tools/unitctl/unit-client-rs/src/unitd_docker.rs b/tools/unitctl/unit-client-rs/src/unitd_docker.rs new file mode 100644 index 00000000..2b9e0c7d --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unitd_docker.rs @@ -0,0 +1,456 @@ +use std::collections::HashMap; +use std::fs::read_to_string; +use std::io::stderr; +use std::path::{PathBuf, MAIN_SEPARATOR}; + +use crate::control_socket_address::ControlSocket; +use crate::futures::StreamExt; +use crate::unit_client::UnitClientError; +use crate::unitd_process::UnitdProcess; + +use bollard::container::{Config, ListContainersOptions, StartContainerOptions}; +use bollard::image::CreateImageOptions; +use bollard::models::{ContainerCreateResponse, ContainerSummary, HostConfig, Mount, MountTypeEnum}; +use bollard::secret::ContainerInspectResponse; +use bollard::Docker; + +use regex::Regex; + +use serde::ser::SerializeMap; +use serde::{Serialize, Serializer}; + +use pbr::ProgressBar; + +#[derive(Clone, Debug)] +pub struct UnitdContainer { + pub container_id: Option<String>, + pub container_image: String, + pub command: Option<String>, + pub mounts: HashMap<PathBuf, PathBuf>, + pub platform: String, + details: Option<ContainerInspectResponse>, +} + +impl From<&ContainerSummary> for UnitdContainer { + fn from(ctr: &ContainerSummary) -> Self { + // we assume paths from the docker api are absolute + // they certainly have to be later... + let mut mounts = HashMap::new(); + if let Some(mts) = &ctr.mounts { + for i in mts { + if let Some(ref src) = i.source { + if let Some(ref dest) = i.destination { + mounts.insert(PathBuf::from(dest.clone()), PathBuf::from(src.clone())); + } + } + } + } + + UnitdContainer { + container_id: ctr.id.clone(), + container_image: format!( + "{} (docker)", + ctr.image.clone().unwrap_or(String::from("unknown container")), + ), + command: ctr.command.clone(), + mounts: mounts, + platform: String::from("Docker"), + details: None, + } + } +} + +impl From<&UnitdContainer> for UnitdProcess { + fn from(ctr: &UnitdContainer) -> Self { + let version = ctr.details.as_ref().and_then(|details| { + details.config.as_ref().and_then(|conf| { + conf.labels.as_ref().and_then(|labels| { + labels + .get("org.opencontainers.image.version") + .and_then(|version| Some(version.clone())) + }) + }) + }); + let command = ctr.command.clone().and_then(|cmd| { + Some(format!( + "{}{} [{}{}]", + "unit: main v", + version.or(Some(String::from(""))).unwrap(), + ctr.container_image, + ctr.rewrite_socket( + cmd.strip_prefix("/usr/local/bin/docker-entrypoint.sh") + .or_else(|| Some("")) + .unwrap() + .to_string() + ) + )) + }); + let mut cmds = vec![]; + let _ = command.map_or((), |cmd| cmds.push(cmd)); + UnitdProcess { + all_cmds: cmds, + binary_name: ctr.container_image.clone(), + process_id: ctr + .details + .as_ref() + .and_then(|details| { + details + .state + .as_ref() + .and_then(|state| state.pid.and_then(|pid| Some(pid.clone() as u64))) + }) + .or(Some(0 as u64)) + .unwrap(), + executable_path: None, + environ: vec![], + working_dir: ctr.details.as_ref().and_then(|details| { + details.config.as_ref().and_then(|conf| { + Some( + PathBuf::from( + conf.working_dir + .as_ref() + .map_or(String::new(), |dir| ctr.host_path(dir.clone())), + ) + .into_boxed_path(), + ) + }) + }), + child_pids: vec![], + user: None, + effective_user: None, + container: Some(ctr.clone()), + } + } +} + +impl Serialize for UnitdContainer { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + // 5 = fields to serialize + let mut state = serializer.serialize_map(Some(5))?; + state.serialize_entry("container_id", &self.container_id)?; + state.serialize_entry("container_image", &self.container_image)?; + state.serialize_entry("command", &self.command)?; + state.serialize_entry("mounts", &self.mounts)?; + state.serialize_entry("platform", &self.platform)?; + state.end() + } +} + +impl UnitdContainer { + pub async fn find_unitd_containers() -> Vec<UnitdContainer> { + if let Ok(docker) = Docker::connect_with_local_defaults() { + match docker.list_containers::<String>(None).await { + Err(e) => { + eprintln!("{}", e); + vec![] + } + Ok(summary) => { + let unitd_command_re = Regex::new(r"^(.* )?unitd( .*)?$").unwrap(); + + // cant do this functionally because of the async call + let mut mapped = vec![]; + for ctr in summary { + if unitd_command_re.is_match(&ctr.clone().command.or(Some(String::new())).unwrap()) { + let mut c = UnitdContainer::from(&ctr); + if let Some(names) = ctr.names { + if names.len() > 0 { + let name = names[0].strip_prefix("/").or(Some(names[0].as_str())).unwrap(); + if let Ok(cir) = docker.inspect_container(name, None).await { + c.details = Some(cir); + } + } + } + mapped.push(c); + } + } + mapped + } + } + } else { + vec![] + } + } + + pub fn host_path(&self, container_path: String) -> String { + let cp = PathBuf::from(container_path); + // get only possible mount points + // sort to deepest mountpoint first + // assumed deepest possible mount point takes precedence + let mut keys = self + .mounts + .clone() + .into_keys() + .filter(|mp| cp.as_path().starts_with(mp)) + .collect::<Vec<_>>(); + keys.sort_by_key(|a| 0 as isize - a.ancestors().count() as isize); + + // either return translated path or original prefixed with "container" + if keys.len() > 0 { + let mut matches = self.mounts[&keys[0]].clone().join( + cp.as_path() + .strip_prefix(keys[0].clone()) + .expect("error checking path prefix"), + ); + /* Observed on M1 Mac that Docker on OSX + * adds a bunch of garbage to the mount path + * converting it into a useless directory + * that doesnt actually exist + */ + if cfg!(target_os = "macos") { + let mut abs = PathBuf::from(String::from(MAIN_SEPARATOR)); + let m = matches + .strip_prefix("/host_mnt/private") + .unwrap_or(matches.strip_prefix("/host_mnt").unwrap_or(matches.as_path())); + // make it absolute again + abs.push(m); + matches = abs; + } + matches.to_string_lossy().to_string() + } else { + format!("<container>:{}", cp.display()) + } + } + + pub fn rewrite_socket(&self, command: String) -> String { + command + .split(" ") + .map(|tok| { + if tok.starts_with("unix:") { + format!( + "unix:{}", + self.host_path(tok.strip_prefix("unix:").unwrap().to_string()) + ) + } else { + tok.to_string() + } + }) + .collect::<Vec<_>>() + .join(" ") + } + + pub fn container_is_running(&self) -> Option<bool> { + self.details + .as_ref() + .and_then(|details| details.state.as_ref().and_then(|state| state.running)) + } +} + +/* deploys a new docker image of tag $image_tag. + * mounts $socket to /var/run in the new container. + * mounts $application read only to /www. + * new container is on host network. + * + * ON SUCCESS returns vector of warnings from Docker API + * ON FAILURE returns wrapped error from Docker API + */ +pub async fn deploy_new_container( + socket: ControlSocket, + application: &String, + application_read_only: bool, + image: &String, +) -> Result<Vec<String>, UnitClientError> { + match Docker::connect_with_local_defaults() { + Ok(docker) => { + let mut mounts = vec![]; + // if a unix socket is specified, mounts its directory + if socket.is_local_socket() { + let mount_path = PathBuf::from(socket.clone()).as_path().to_string_lossy().to_string(); + mounts.push(Mount { + typ: Some(MountTypeEnum::BIND), + source: Some(mount_path), + target: Some("/var/run".to_string()), + ..Default::default() + }); + } + // mount application dir + mounts.push(Mount { + typ: Some(MountTypeEnum::BIND), + source: Some(application.clone()), + target: Some("/www".to_string()), + read_only: Some(application_read_only), + ..Default::default() + }); + + let mut pb = ProgressBar::on(stderr(), 10); + let mut totals = HashMap::new(); + let mut stream = docker.create_image( + Some(CreateImageOptions { + from_image: image.as_str(), + ..Default::default() + }), + None, + None, + ); + while let Some(res) = stream.next().await { + if let Ok(info) = res { + if let Some(id) = info.id { + if let Some(_) = totals.get_mut(&id) { + if let Some(delta) = info.progress_detail.and_then(|detail| detail.current) { + pb.add(delta as u64); + } + } else { + if let Some(total) = info.progress_detail.and_then(|detail| detail.total) { + totals.insert(id, total); + pb.total += total as u64; + } + } + } + } + } + pb.finish(); + + // create the new unit container + let resp: ContainerCreateResponse; + let host_conf = HostConfig { + mounts: Some(mounts), + network_mode: Some("host".to_string()), + ..Default::default() + }; + let mut container_conf = Config { + image: Some(image.clone()), + ..Default::default() + }; + if let ControlSocket::TcpSocket(ref uri) = socket { + let port = uri.port_u16().or(Some(80)).unwrap(); + // override port + container_conf.cmd = Some(vec![ + "unitd".to_string(), + "--no-daemon".to_string(), + "--control".to_string(), + format!("{}:{}", uri.host().unwrap(), port), + ]); + } + container_conf.host_config = Some(host_conf); + match docker.create_container::<String, String>(None, container_conf).await { + Err(err) => { + return Err(UnitClientError::UnitdDockerError { + message: err.to_string(), + }) + } + Ok(response) => resp = response, + } + + // create container gives us an ID + // but start container requires a name + let mut list_container_filters = HashMap::new(); + list_container_filters.insert("id".to_string(), vec![resp.id]); + match docker + .list_containers::<String>(Some(ListContainersOptions { + all: true, + limit: None, + size: false, + filters: list_container_filters, + })) + .await + { + // somehow our container doesnt exist + Err(e) => Err(UnitClientError::UnitdDockerError { message: e.to_string() }), + // here it is! + Ok(info) => { + if info.len() < 1 { + return Err(UnitClientError::UnitdDockerError { + message: "couldnt find new container".to_string(), + }); + } else if info[0].names.is_none() || info[0].names.clone().unwrap().len() < 1 { + return Err(UnitClientError::UnitdDockerError { + message: "new container has no name".to_string(), + }); + } + + // start our container + match docker + .start_container( + info[0].names.clone().unwrap()[0].strip_prefix(MAIN_SEPARATOR).unwrap(), + None::<StartContainerOptions<String>>, + ) + .await + { + Err(err) => Err(UnitClientError::UnitdDockerError { + message: err.to_string(), + }), + Ok(_) => Ok(resp.warnings), + } + } + } + } + Err(e) => Err(UnitClientError::UnitdDockerError { message: e.to_string() }), + } +} + +/* Returns either 64 char docker container ID or None */ +pub fn pid_is_dockerized(pid: u64) -> bool { + let cg_filepath = format!("/proc/{}/cgroup", pid); + match read_to_string(cg_filepath) { + Err(e) => { + eprintln!("{}", e); + false + } + Ok(contents) => { + let docker_re = Regex::new(r"docker-([a-zA-Z0-9]{64})").unwrap(); + docker_re.is_match(contents.as_str()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_path_translation() { + let mut mounts = HashMap::new(); + mounts.insert("/1/2/3/4/5/6/7".into(), "/0".into()); + mounts.insert("/root".into(), "/1".into()); + mounts.insert("/root/mid".into(), "/2".into()); + mounts.insert("/root/mid/child".into(), "/3".into()); + mounts.insert("/mid/child".into(), "/4".into()); + mounts.insert("/child".into(), "/5".into()); + mounts.insert("/var".into(), "/host_mnt/private/6".into()); + mounts.insert("/var/var".into(), "/host_mnt/7".into()); + + let ctr = UnitdContainer { + container_id: None, + container_image: String::from(""), + command: None, + platform: "test".to_string(), + details: None, + mounts: mounts, + }; + + assert_eq!( + "/3/c2/test".to_string(), + ctr.host_path("/root/mid/child/c2/test".to_string()) + ); + assert_eq!( + "<container>:/path/to/conf".to_string(), + ctr.host_path("/path/to/conf".to_string()) + ); + if cfg!(target_os = "macos") { + assert_eq!("/6/test".to_string(), ctr.host_path("/var/test".to_string())); + assert_eq!("/7/test".to_string(), ctr.host_path("/var/var/test".to_string())); + } + } + + #[test] + fn test_unix_sock_path_translate() { + let mut mounts = HashMap::new(); + mounts.insert("/var/run".into(), "/tmp".into()); + + let ctr = UnitdContainer { + container_id: None, + container_image: String::from(""), + command: None, + platform: "test".to_string(), + details: None, + mounts: mounts, + }; + + assert_eq!( + ctr.rewrite_socket("unitd --no-daemon --control unix:/var/run/control.unit.sock".to_string()), + "unitd --no-daemon --control unix:/tmp/control.unit.sock".to_string() + ); + } +} diff --git a/tools/unitctl/unit-client-rs/src/unitd_instance.rs b/tools/unitctl/unit-client-rs/src/unitd_instance.rs new file mode 100644 index 00000000..ace8e858 --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unitd_instance.rs @@ -0,0 +1,403 @@ +use crate::unit_client::UnitClientError; +use crate::unitd_docker::UnitdContainer; +use serde::ser::SerializeMap; +use serde::{Serialize, Serializer}; +use std::error::Error as StdError; +use std::path::{Path, PathBuf}; +use std::{fmt, io}; +use which::which; + +use crate::runtime_flags::RuntimeFlags; +use crate::unitd_configure_options::UnitdConfigureOptions; +use crate::unitd_process::UnitdProcess; + +pub const UNITD_PATH_ENV_KEY: &str = "UNITD_PATH"; +pub const UNITD_BINARY_NAMES: [&str; 2] = ["unitd", "unitd-debug"]; + +#[derive(Debug)] +pub struct UnitdInstance { + pub process: UnitdProcess, + pub configure_options: Option<UnitdConfigureOptions>, + pub errors: Vec<UnitClientError>, +} + +impl Serialize for UnitdInstance { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + // 11 = fields to serialize + let mut state = serializer.serialize_map(Some(11))?; + let runtime_flags = self + .process + .cmd() + .and_then(|cmd| cmd.flags) + .map(|flags| flags.to_string()); + + let configure_flags = self.configure_options.as_ref().map(|opts| opts.all_flags.clone()); + + state.serialize_entry("process", &self.process)?; + state.serialize_entry("version", &self.version())?; + state.serialize_entry("control_socket", &self.control_api_socket_address())?; + state.serialize_entry("log_path", &self.log_path())?; + state.serialize_entry("pid_path", &self.pid_path())?; + state.serialize_entry("modules_directory", &self.modules_directory())?; + state.serialize_entry("state_directory", &self.state_directory())?; + state.serialize_entry("tmp_directory", &self.tmp_directory())?; + state.serialize_entry("runtime_flags", &runtime_flags)?; + state.serialize_entry("configure_flags", &configure_flags)?; + let string_errors = &self.errors.iter().map(|e| e.to_string()).collect::<Vec<String>>(); + state.serialize_entry("errors", string_errors)?; + + state.end() + } +} + +impl UnitdInstance { + pub async fn running_unitd_instances() -> Vec<UnitdInstance> { + Self::collect_unitd_processes( + UnitdProcess::find_unitd_processes() + .into_iter() + .chain( + UnitdContainer::find_unitd_containers() + .await + .into_iter() + .map(|x| UnitdProcess::from(&x)) + .collect::<Vec<_>>(), + ) + .collect(), + ) + } + + /// Find all running unitd processes and convert them into UnitdInstances and filter + /// out all errors by printing them to stderr and leaving errored instances out of + /// the returned vector. + fn collect_unitd_processes(processes: Vec<UnitdProcess>) -> Vec<UnitdInstance> { + Self::map_processes_to_instances(processes).into_iter().collect() + } + + fn map_processes_to_instances(processes: Vec<UnitdProcess>) -> Vec<UnitdInstance> { + fn unitd_path_from_process(process: &UnitdProcess) -> Result<Box<Path>, UnitClientError> { + match process.executable_path() { + Some(executable_path) => { + let is_absolute_working_dir = process + .working_dir + .as_ref() + .map(|p| p.is_absolute()) + .unwrap_or_default(); + if executable_path.is_absolute() { + Ok(executable_path.to_owned()) + } else if executable_path.is_relative() && is_absolute_working_dir { + let new_path = process + .working_dir + .as_ref() + .unwrap() + .join(executable_path) + .canonicalize() + .map(|path| path.into_boxed_path()) + .map_err(|error| UnitClientError::UnitdProcessParseError { + message: format!("Error canonicalizing unitd executable path: {}", error), + pid: process.process_id, + })?; + Ok(new_path) + } else if process.container.is_none() { + Err(UnitClientError::UnitdProcessParseError { + message: "Unable to get absolute unitd executable path from process".to_string(), + pid: process.process_id, + }) + } else { + // container case + Ok(PathBuf::from("/").into_boxed_path()) + } + } + None => Err(UnitClientError::UnitdProcessParseError { + message: "Unable to get unitd executable path from process".to_string(), + pid: process.process_id, + }), + } + } + + fn map_process_to_unitd_instance(process: &UnitdProcess) -> UnitdInstance { + match unitd_path_from_process(process) { + Ok(_) if process.container.is_some() => { + let mut err = vec![]; + // double check that it is running + let running = process.container.as_ref().unwrap().container_is_running(); + + if running.is_none() || !running.unwrap() { + err.push(UnitClientError::UnitdProcessParseError { + message: "process container is not running".to_string(), + pid: process.process_id, + }); + } + + UnitdInstance { + process: process.to_owned(), + configure_options: None, + errors: err, + } + } + Ok(unitd_path) => match UnitdConfigureOptions::new(&unitd_path.clone().into_path_buf()) { + Ok(configure_options) => UnitdInstance { + process: process.to_owned(), + configure_options: Some(configure_options), + errors: vec![], + }, + Err(error) => { + let error = UnitClientError::UnitdProcessExecError { + source: error, + executable_path: unitd_path.to_string_lossy().parse().unwrap_or_default(), + message: "Error running unitd binary to get configure options".to_string(), + pid: process.process_id, + }; + UnitdInstance { + process: process.to_owned(), + configure_options: None, + errors: vec![error], + } + } + }, + Err(err) => UnitdInstance { + process: process.to_owned(), + configure_options: None, + errors: vec![err], + }, + } + } + + processes + .iter() + // This converts processes into a UnitdInstance + .map(map_process_to_unitd_instance) + .collect() + } + + fn version(&self) -> Option<String> { + match self.process.cmd()?.version { + Some(version) => Some(version), + None => self.configure_options.as_ref().map(|opts| opts.version.to_string()), + } + } + + fn flag_or_default_option<R>( + &self, + read_flag: fn(RuntimeFlags) -> Option<R>, + read_opts: fn(UnitdConfigureOptions) -> Option<R>, + ) -> Option<R> { + self.process + .cmd()? + .flags + .and_then(read_flag) + .or_else(|| self.configure_options.to_owned().and_then(read_opts)) + } + + pub fn control_api_socket_address(&self) -> Option<String> { + self.flag_or_default_option( + |flags| flags.control_api_socket_address(), + |opts| opts.default_control_api_socket_address(), + ) + } + + pub fn pid_path(&self) -> Option<Box<Path>> { + self.flag_or_default_option(|flags| flags.pid_path(), |opts| opts.default_pid_path()) + } + + pub fn log_path(&self) -> Option<Box<Path>> { + self.flag_or_default_option(|flags| flags.log_path(), |opts| opts.default_log_path()) + } + + pub fn modules_directory(&self) -> Option<Box<Path>> { + self.flag_or_default_option( + |flags| flags.modules_directory(), + |opts| opts.default_modules_directory(), + ) + } + + pub fn state_directory(&self) -> Option<Box<Path>> { + self.flag_or_default_option(|flags| flags.state_directory(), |opts| opts.default_state_directory()) + } + + pub fn tmp_directory(&self) -> Option<Box<Path>> { + self.flag_or_default_option(|flags| flags.tmp_directory(), |opts| opts.default_tmp_directory()) + } +} + +impl fmt::Display for UnitdInstance { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + const UNKNOWN: &str = "[unknown]"; + let version = self.version().unwrap_or_else(|| String::from("[unknown]")); + let runtime_flags = self + .process + .cmd() + .and_then(|cmd| cmd.flags) + .map(|flags| flags.to_string()) + .unwrap_or_else(|| UNKNOWN.into()); + let configure_flags = self + .configure_options + .as_ref() + .map(|opts| opts.all_flags.clone()) + .unwrap_or_else(|| UNKNOWN.into()); + let unitd_path: String = self + .process + .executable_path() + .map(|p| p.to_string_lossy().into()) + .unwrap_or_else(|| UNKNOWN.into()); + let working_dir: String = self + .process + .working_dir + .as_ref() + .map(|p| p.to_string_lossy().into()) + .unwrap_or_else(|| UNKNOWN.into()); + let socket_address = self.control_api_socket_address().unwrap_or_else(|| UNKNOWN.to_string()); + let child_pids = self + .process + .child_pids + .iter() + .map(u64::to_string) + .collect::<Vec<String>>() + .join(", "); + + writeln!( + f, + "{} instance [pid: {}, version: {}]:", + self.process.binary_name, self.process.process_id, version + )?; + writeln!(f, " Executable: {}", unitd_path)?; + writeln!(f, " Process working directory: {}", working_dir)?; + write!(f, " Process ownership: ")?; + if let Some(user) = &self.process.user { + writeln!(f, "name: {}, uid: {}, gid: {}", user.name, user.uid, user.gid)?; + } else { + writeln!(f, "{}", UNKNOWN)?; + } + write!(f, " Process effective ownership: ")?; + if let Some(user) = &self.process.effective_user { + writeln!(f, "name: {}, uid: {}, gid: {}", user.name, user.uid, user.gid)?; + } else { + writeln!(f, "{}", UNKNOWN)?; + } + + writeln!(f, " API control unix socket: {}", socket_address)?; + writeln!(f, " Child processes ids: {}", child_pids)?; + writeln!(f, " Runtime flags: {}", runtime_flags)?; + writeln!(f, " Configure options: {}", configure_flags)?; + + if let Some(ctr) = &self.process.container { + writeln!(f, " Container:")?; + writeln!(f, " Platform: {}", ctr.platform)?; + if let Some(id) = ctr.container_id.clone() { + writeln!(f, " Container ID: {}", id)?; + } + writeln!(f, " Mounts:")?; + for (k, v) in &ctr.mounts { + writeln!(f, " {} => {}", k.to_string_lossy(), v.to_string_lossy())?; + } + } + + if !self.errors.is_empty() { + write!(f, " Errors:")?; + for error in &self.errors { + write!(f, "\n {}", error)?; + } + } + + Ok(()) + } +} + +pub fn find_executable_path(specific_path: Result<String, Box<dyn StdError>>) -> Result<PathBuf, Box<dyn StdError>> { + fn find_unitd_in_system_path() -> Vec<PathBuf> { + UNITD_BINARY_NAMES + .iter() + .map(which) + .filter_map(Result::ok) + .collect::<Vec<PathBuf>>() + } + + match specific_path { + Ok(path) => Ok(PathBuf::from(path)), + Err(_) => { + let unitd_paths = find_unitd_in_system_path(); + if unitd_paths.is_empty() { + let err_msg = format!( + "Could not find unitd in system path or in UNITD_PATH environment variable. Searched for: {:?}", + UNITD_BINARY_NAMES + ); + let err = io::Error::new(io::ErrorKind::NotFound, err_msg); + Err(Box::from(err)) + } else { + Ok(unitd_paths[0].clone()) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::rngs::StdRng; + use rand::{RngCore, SeedableRng}; + + // We don't need a secure seed for testing, in fact it is better that we have a + // predictable value + const SEED: [u8; 32] = [ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, + 30, 31, + ]; + #[tokio::test] + async fn can_find_unitd_instances() { + UnitdInstance::running_unitd_instances().await.iter().for_each(|p| { + println!("{:?}", p); + println!("Runtime Flags: {:?}", p.process.cmd().map(|c| c.flags)); + println!("Temp directory: {:?}", p.tmp_directory()); + }) + } + + fn mock_process<S: Into<String>>( + rng: &mut StdRng, + binary_name: S, + executable_path: Option<String>, + ) -> UnitdProcess { + UnitdProcess { + process_id: rng.next_u32() as u64, + binary_name: binary_name.into(), + executable_path: executable_path.map(|p| Box::from(Path::new(&p))), + environ: vec![], + all_cmds: vec![], + working_dir: Some(Box::from(Path::new("/opt/unit"))), + child_pids: vec![], + user: None, + effective_user: None, + container: None, + } + } + + #[test] + fn will_list_without_errors_valid_processes() { + let specific_path = std::env::var(UNITD_PATH_ENV_KEY).map_err(|error| Box::new(error) as Box<dyn StdError>); + let binding = match find_executable_path(specific_path) { + Ok(path) => path, + Err(error) => { + eprintln!("Could not find unitd executable path: {} - skipping test", error); + return; + } + }; + let binary_name = binding + .file_name() + .expect("Could not get binary name") + .to_string_lossy() + .to_string(); + let unitd_path = binding.to_string_lossy(); + let mut rng: StdRng = SeedableRng::from_seed(SEED); + + let processes = vec![ + mock_process(&mut rng, &binary_name, Some(unitd_path.to_string())), + mock_process(&mut rng, &binary_name, Some(unitd_path.to_string())), + ]; + let instances = UnitdInstance::collect_unitd_processes(processes); + // assert_eq!(instances.len(), 3); + instances.iter().for_each(|p| { + assert_eq!(p.errors.len(), 0, "Expected no errors, got: {:?}", p.errors); + }) + } +} diff --git a/tools/unitctl/unit-client-rs/src/unitd_process.rs b/tools/unitctl/unit-client-rs/src/unitd_process.rs new file mode 100644 index 00000000..3dc0c3af --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unitd_process.rs @@ -0,0 +1,196 @@ +use crate::unitd_cmd::UnitdCmd; +use crate::unitd_docker::{pid_is_dockerized, UnitdContainer}; +use crate::unitd_instance::UNITD_BINARY_NAMES; +use crate::unitd_process_user::UnitdProcessUser; +use serde::ser::SerializeMap; +use serde::{Serialize, Serializer}; +use std::collections::HashMap; +use std::path::Path; +use sysinfo::{Pid, Process, ProcessRefreshKind, System, UpdateKind, Users}; + +#[derive(Debug, Clone)] +pub struct UnitdProcess { + pub binary_name: String, + pub process_id: u64, + pub executable_path: Option<Box<Path>>, + pub environ: Vec<String>, + pub all_cmds: Vec<String>, + pub working_dir: Option<Box<Path>>, + pub child_pids: Vec<u64>, + pub user: Option<UnitdProcessUser>, + pub effective_user: Option<UnitdProcessUser>, + pub container: Option<UnitdContainer>, +} + +impl Serialize for UnitdProcess { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + // 6 = fields to serialize + let mut state = serializer.serialize_map(Some(6))?; + state.serialize_entry("pid", &self.process_id)?; + state.serialize_entry("user", &self.user)?; + state.serialize_entry("effective_user", &self.effective_user)?; + state.serialize_entry("executable", &self.executable_path())?; + state.serialize_entry("child_pids", &self.child_pids)?; + state.serialize_entry("container", &self.container)?; + state.end() + } +} + +impl UnitdProcess { + pub fn find_unitd_processes() -> Vec<UnitdProcess> { + let process_refresh_kind = ProcessRefreshKind::new() + .with_cmd(UpdateKind::Always) + .with_cwd(UpdateKind::Always) + .with_exe(UpdateKind::Always) + .with_user(UpdateKind::Always); + let refresh_kind = sysinfo::RefreshKind::new().with_processes(process_refresh_kind); + let sys = System::new_with_specifics(refresh_kind); + let unitd_processes: HashMap<&Pid, &Process> = sys + .processes() + .iter() + .filter(|p| { + let process_name = p.1.name(); + UNITD_BINARY_NAMES.contains(&process_name) + }) + .collect::<HashMap<&Pid, &Process>>(); + let users = Users::new_with_refreshed_list(); + + unitd_processes + .iter() + // Filter out child processes + .filter(|p| { + #[cfg(target_os = "linux")] + if pid_is_dockerized(p.0.as_u32().into()) { + return false; + } + let parent_pid = p.1.parent(); + match parent_pid { + Some(pid) => !unitd_processes.contains_key(&pid), + None => false, + } + }) + .map(|p| { + let tuple = p.to_owned(); + /* The sysinfo library only supports 32-bit pids, yet larger values are possible + * if the OS is configured to support it, thus we use 64-bit integers internally + * because it is just a matter of time until the library changes to larger values. */ + let pid = *tuple.0; + let process = *tuple.1; + let process_id: u64 = pid.as_u32().into(); + let executable_path: Option<Box<Path>> = process.exe().map(|p| p.to_path_buf().into_boxed_path()); + let environ: Vec<String> = process.environ().into(); + let cmd: Vec<String> = process.cmd().into(); + let working_dir: Option<Box<Path>> = process.cwd().map(|p| p.to_path_buf().into_boxed_path()); + let child_pids = unitd_processes + .iter() + .filter_map(|p| p.to_owned().1.parent()) + .filter(|parent_pid| parent_pid == pid) + .map(|p| p.as_u32() as u64) + .collect::<Vec<u64>>(); + + let user = process + .user_id() + .and_then(|uid| users.get_user_by_id(uid)) + .map(UnitdProcessUser::from); + let effective_user = process + .effective_user_id() + .and_then(|uid| users.get_user_by_id(uid)) + .map(UnitdProcessUser::from); + + UnitdProcess { + binary_name: process.name().to_string(), + process_id, + executable_path, + environ, + all_cmds: cmd, + working_dir, + child_pids, + user, + effective_user, + container: None, + } + }) + .collect::<Vec<UnitdProcess>>() + } + + pub fn cmd(&self) -> Option<UnitdCmd> { + if self.all_cmds.is_empty() { + return None; + } + + match UnitdCmd::new(self.all_cmds[0].clone(), self.binary_name.as_ref()) { + Ok(cmd) => Some(cmd), + Err(error) => { + eprintln!("Failed to parse process cmd: {}", error); + None + } + } + } + + pub fn executable_path(&self) -> Option<Box<Path>> { + if self.executable_path.is_some() { + return self.executable_path.clone(); + } + self.cmd().and_then(|cmd| cmd.process_executable_path) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn can_parse_runtime_cmd_absolute_path(binary_name: &str) { + let cmd = format!( + "unit: main v1.28.0 [/usr/sbin/{} --log /var/log/unit.log --pid /var/run/unit.pid]", + binary_name + ); + let unitd_cmd = UnitdCmd::new(cmd, binary_name).expect("Failed to parse unitd cmd"); + assert_eq!(unitd_cmd.version.unwrap(), "1.28.0"); + assert_eq!( + unitd_cmd.process_executable_path.unwrap().to_string_lossy(), + format!("/usr/sbin/{}", binary_name) + ); + let flags = unitd_cmd.flags.unwrap(); + assert_eq!(flags.get_flag_value("log").unwrap(), "/var/log/unit.log"); + assert_eq!(flags.get_flag_value("pid").unwrap(), "/var/run/unit.pid"); + } + + fn can_parse_runtime_cmd_relative_path(binary_name: &str) { + let cmd = format!( + "unit: main v1.29.0 [./sbin/{} --no-daemon --tmp /tmp --something]", + binary_name + ); + let unitd_cmd = UnitdCmd::new(cmd, binary_name).expect("Failed to parse unitd cmd"); + assert_eq!(unitd_cmd.version.unwrap(), "1.29.0"); + assert_eq!( + unitd_cmd.process_executable_path.unwrap().to_string_lossy(), + format!("./sbin/{}", binary_name) + ); + let flags = unitd_cmd.flags.unwrap(); + assert_eq!(flags.get_flag_value("tmp").unwrap(), "/tmp"); + assert!(flags.has_flag("something")); + } + + #[test] + fn can_parse_runtime_cmd_unitd_absolute_path() { + can_parse_runtime_cmd_absolute_path("unitd"); + } + + #[test] + fn can_parse_runtime_cmd_unitd_debug_absolute_path() { + can_parse_runtime_cmd_absolute_path("unitd-debug"); + } + + #[test] + fn can_parse_runtime_cmd_unitd_relative_path() { + can_parse_runtime_cmd_relative_path("unitd"); + } + + #[test] + fn can_parse_runtime_cmd_unitd_debug_relative_path() { + can_parse_runtime_cmd_relative_path("unitd-debug"); + } +} diff --git a/tools/unitctl/unit-client-rs/src/unitd_process_user.rs b/tools/unitctl/unit-client-rs/src/unitd_process_user.rs new file mode 100644 index 00000000..c4f9be22 --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unitd_process_user.rs @@ -0,0 +1,36 @@ +use serde::Serialize; +use std::fmt; +use std::fmt::Display; +use sysinfo::User; + +#[derive(Debug, Clone, Serialize)] +pub struct UnitdProcessUser { + pub name: String, + pub uid: u32, + pub gid: u32, + pub groups: Vec<String>, +} + +impl Display for UnitdProcessUser { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "name: {}, uid: {}, gid: {}, groups: {}", + self.name, + self.uid, + self.gid, + self.groups.join(", ") + ) + } +} + +impl From<&User> for UnitdProcessUser { + fn from(user: &User) -> Self { + UnitdProcessUser { + name: user.name().into(), + uid: *user.id().clone(), + gid: *user.group_id(), + groups: user.groups().iter().map(|g| g.name().into()).collect(), + } + } +} |