diff options
Diffstat (limited to 'tools/unitctl/unit-client-rs/src')
-rw-r--r-- | tools/unitctl/unit-client-rs/src/control_socket_address.rs | 571 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/lib.rs | 15 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/runtime_flags.rs | 90 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unit_client.rs | 393 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_cmd.rs | 85 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_configure_options.rs | 235 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_instance.rs | 360 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_process.rs | 170 | ||||
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unitd_process_user.rs | 36 |
9 files changed, 1955 insertions, 0 deletions
diff --git a/tools/unitctl/unit-client-rs/src/control_socket_address.rs b/tools/unitctl/unit-client-rs/src/control_socket_address.rs new file mode 100644 index 00000000..b9ae5afc --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/control_socket_address.rs @@ -0,0 +1,571 @@ +use crate::control_socket_address::ControlSocket::{TcpSocket, UnixLocalAbstractSocket, UnixLocalSocket}; +use crate::control_socket_address::ControlSocketScheme::{HTTP, HTTPS}; +use crate::unit_client::UnitClientError; +use hyper::http::uri::{Authority, PathAndQuery}; +use hyper::Uri; +use std::fmt::{Display, Formatter}; +use std::fs; +use std::os::unix::fs::FileTypeExt; +use std::path::{PathBuf, MAIN_SEPARATOR}; + +type AbstractSocketName = String; +type UnixSocketPath = PathBuf; +type Port = u16; + +#[derive(Debug, Clone)] +pub enum ControlSocket { + UnixLocalAbstractSocket(AbstractSocketName), + UnixLocalSocket(UnixSocketPath), + TcpSocket(Uri), +} + +#[derive(Debug)] +pub enum ControlSocketScheme { + HTTP, + HTTPS, +} + +impl ControlSocketScheme { + fn port(&self) -> Port { + match self { + HTTP => 80, + HTTPS => 443, + } + } +} + +impl ControlSocket { + pub fn socket_scheme(&self) -> ControlSocketScheme { + match self { + UnixLocalAbstractSocket(_) => ControlSocketScheme::HTTP, + UnixLocalSocket(_) => ControlSocketScheme::HTTP, + TcpSocket(uri) => match uri.scheme_str().expect("Scheme should not be None") { + "http" => ControlSocketScheme::HTTP, + "https" => ControlSocketScheme::HTTPS, + _ => unreachable!("Scheme should be http or https"), + }, + } + } + + pub fn create_uri_with_path(&self, str_path: &str) -> Uri { + match self { + UnixLocalAbstractSocket(name) => { + let socket_path = PathBuf::from(format!("@{}", name)); + hyperlocal::Uri::new(socket_path, str_path).into() + } + UnixLocalSocket(socket_path) => hyperlocal::Uri::new(socket_path, str_path).into(), + TcpSocket(uri) => { + if str_path.is_empty() { + uri.clone() + } else { + let authority = uri.authority().expect("Authority should not be None"); + Uri::builder() + .scheme(uri.scheme_str().expect("Scheme should not be None")) + .authority(authority.clone()) + .path_and_query(str_path) + .build() + .expect("URI should be valid") + } + } + } + } +} + +impl Display for ControlSocket { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + UnixLocalAbstractSocket(name) => f.write_fmt(format_args!("unix:@{}", name)), + UnixLocalSocket(path) => f.write_fmt(format_args!("unix:{}", path.to_string_lossy())), + TcpSocket(uri) => uri.fmt(f), + } + } +} + +impl From<ControlSocket> for String { + fn from(val: ControlSocket) -> Self { + val.to_string() + } +} + +impl From<ControlSocket> for PathBuf { + fn from(val: ControlSocket) -> Self { + match val { + UnixLocalAbstractSocket(socket_name) => PathBuf::from(format!("@{}", socket_name)), + UnixLocalSocket(socket_path) => socket_path, + TcpSocket(_) => PathBuf::default(), + } + } +} + +impl From<ControlSocket> for Uri { + fn from(val: ControlSocket) -> Self { + val.create_uri_with_path("") + } +} + +impl ControlSocket { + pub fn validate_http_address(uri: Uri) -> Result<(), UnitClientError> { + let http_address = uri.to_string(); + if uri.authority().is_none() { + return Err(UnitClientError::TcpSocketAddressParseError { + message: "No authority found in socket address".to_string(), + control_socket_address: http_address, + }); + } + if uri.port_u16().is_none() { + return Err(UnitClientError::TcpSocketAddressNoPortError { + control_socket_address: http_address, + }); + } + if !(uri.path().is_empty() || uri.path().eq("/")) { + return Err(UnitClientError::TcpSocketAddressParseError { + message: format!("Path is not empty or is not / [path={}]", uri.path()), + control_socket_address: http_address, + }); + } + + Ok(()) + } + + pub fn validate_unix_address(socket: PathBuf) -> Result<(), UnitClientError> { + if !socket.exists() { + return Err(UnitClientError::UnixSocketNotFound { + control_socket_address: socket.to_string_lossy().to_string(), + }); + } + let metadata = fs::metadata(&socket).map_err(|error| UnitClientError::UnixSocketAddressError { + source: error, + control_socket_address: socket.to_string_lossy().to_string(), + })?; + let file_type = metadata.file_type(); + if !file_type.is_socket() { + return Err(UnitClientError::UnixSocketAddressError { + source: std::io::Error::new(std::io::ErrorKind::Other, "Control socket path is not a socket"), + control_socket_address: socket.to_string_lossy().to_string(), + }); + } + + Ok(()) + } + + pub fn validate(&self) -> Result<Self, UnitClientError> { + match self { + UnixLocalAbstractSocket(socket_name) => { + let socket_path = PathBuf::from(format!("@{}", socket_name)); + Self::validate_unix_address(socket_path.clone()) + } + UnixLocalSocket(socket_path) => Self::validate_unix_address(socket_path.clone()), + TcpSocket(socket_uri) => Self::validate_http_address(socket_uri.clone()), + } + .map(|_| self.to_owned()) + } + + fn normalize_and_parse_http_address(http_address: String) -> Result<Uri, UnitClientError> { + // Convert *:1 style network addresses to URI format + let address = if http_address.starts_with("*:") { + http_address.replacen("*:", "http://127.0.0.1:", 1) + // Add scheme if not present + } else if !(http_address.starts_with("http://") || http_address.starts_with("https://")) { + format!("http://{}", http_address) + } else { + http_address.to_owned() + }; + + let is_https = address.starts_with("https://"); + + let parsed_uri = + Uri::try_from(address.as_str()).map_err(|error| UnitClientError::TcpSocketAddressUriError { + source: error, + control_socket_address: address, + })?; + let authority = parsed_uri.authority().expect("Authority should not be None"); + let expected_port = if is_https { HTTPS.port() } else { HTTP.port() }; + let normalized_authority = match authority.port_u16() { + Some(_) => authority.to_owned(), + None => { + let host = format!("{}:{}", authority.host(), expected_port); + Authority::try_from(host.as_str()).expect("Authority should be valid") + } + }; + + let normalized_uri = Uri::builder() + .scheme(parsed_uri.scheme_str().expect("Scheme should not be None")) + .authority(normalized_authority) + .path_and_query(PathAndQuery::from_static("")) + .build() + .map_err(|error| UnitClientError::TcpSocketAddressParseError { + message: error.to_string(), + control_socket_address: http_address.clone(), + })?; + + Ok(normalized_uri) + } + + /// Flexibly parse a textual representation of a socket address + fn parse_address<S: Into<String>>(socket_address: S) -> Result<Self, UnitClientError> { + let full_socket_address: String = socket_address.into(); + let socket_prefix = "unix:"; + let socket_uri_prefix = "unix://"; + let mut buf = String::with_capacity(socket_prefix.len()); + for (i, c) in full_socket_address.char_indices() { + // Abstract unix socket with no prefix + if i == 0 && c == '@' { + return Ok(UnixLocalAbstractSocket(full_socket_address[1..].to_string())); + } + buf.push(c); + // Unix socket with prefix + if i == socket_prefix.len() - 1 && buf.eq(socket_prefix) { + let path_text = full_socket_address[socket_prefix.len()..].to_string(); + // Return here if this URI does not have a scheme followed by double slashes + if !path_text.starts_with("//") { + return match path_text.strip_prefix('@') { + Some(name) => Ok(UnixLocalAbstractSocket(name.to_string())), + None => { + let path = PathBuf::from(path_text); + Ok(UnixLocalSocket(path)) + } + }; + } + } + + // Unix socket with URI prefix + if i == socket_uri_prefix.len() - 1 && buf.eq(socket_uri_prefix) { + let uri = Uri::try_from(full_socket_address.as_str()).map_err(|error| { + UnitClientError::TcpSocketAddressParseError { + message: error.to_string(), + control_socket_address: full_socket_address.clone(), + } + })?; + return ControlSocket::try_from(uri); + } + } + + /* Sockets on Windows are not supported, so there is no need to check + * if the socket address is a valid path, so we can do this shortcut + * here to see if a path was specified without a unix: prefix. */ + if buf.starts_with(MAIN_SEPARATOR) { + let path = PathBuf::from(buf); + return Ok(UnixLocalSocket(path)); + } + + let uri = Self::normalize_and_parse_http_address(buf)?; + Ok(TcpSocket(uri)) + } + + pub fn is_local_socket(&self) -> bool { + match self { + UnixLocalAbstractSocket(_) | UnixLocalSocket(_) => true, + TcpSocket(_) => false, + } + } +} + +impl TryFrom<String> for ControlSocket { + type Error = UnitClientError; + + fn try_from(socket_address: String) -> Result<Self, Self::Error> { + ControlSocket::parse_address(socket_address.as_str()) + } +} + +impl TryFrom<&str> for ControlSocket { + type Error = UnitClientError; + + fn try_from(socket_address: &str) -> Result<Self, Self::Error> { + ControlSocket::parse_address(socket_address) + } +} + +impl TryFrom<Uri> for ControlSocket { + type Error = UnitClientError; + + fn try_from(socket_uri: Uri) -> Result<Self, Self::Error> { + match socket_uri.scheme_str() { + // URIs with the unix scheme will have a hostname that is a hex encoded string + // representing the path to the socket + Some("unix") => { + let host = match socket_uri.host() { + Some(host) => host, + None => { + return Err(UnitClientError::TcpSocketAddressParseError { + message: "No host found in socket address".to_string(), + control_socket_address: socket_uri.to_string(), + }) + } + }; + let bytes = hex::decode(host).map_err(|error| UnitClientError::TcpSocketAddressParseError { + message: error.to_string(), + control_socket_address: socket_uri.to_string(), + })?; + let path = String::from_utf8_lossy(&bytes); + ControlSocket::parse_address(path) + } + Some("http") | Some("https") => Ok(TcpSocket(socket_uri)), + Some(unknown) => Err(UnitClientError::TcpSocketAddressParseError { + message: format!("Unsupported scheme found in socket address: {}", unknown).to_string(), + control_socket_address: socket_uri.to_string(), + }), + None => Err(UnitClientError::TcpSocketAddressParseError { + message: "No scheme found in socket address".to_string(), + control_socket_address: socket_uri.to_string(), + }), + } + } +} + +#[cfg(test)] +mod tests { + use rand::distributions::{Alphanumeric, DistString}; + use std::env::temp_dir; + use std::fmt::Display; + use std::io; + use std::os::unix::net::UnixListener; + + use super::*; + + struct TempSocket { + socket_path: PathBuf, + _listener: UnixListener, + } + + impl TempSocket { + fn shutdown(&mut self) -> io::Result<()> { + fs::remove_file(&self.socket_path) + } + } + + impl Display for TempSocket { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "unix:{}", self.socket_path.to_string_lossy().to_string()) + } + } + + impl Drop for TempSocket { + fn drop(&mut self) { + self.shutdown() + .expect(format!("Unable to shutdown socket {}", self.socket_path.to_string_lossy()).as_str()); + } + } + + #[test] + fn will_error_with_nonexistent_unix_socket() { + let socket_address = "unix:/tmp/some_random_filename_that_doesnt_exist.sock"; + let control_socket = + ControlSocket::try_from(socket_address).expect("No error should be returned until validate() is called"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + assert!(control_socket.validate().is_err(), "Socket should not be valid"); + } + + #[test] + fn can_parse_socket_with_prefix() { + let temp_socket = create_file_socket().expect("Unable to create socket"); + let control_socket = ControlSocket::try_from(temp_socket.to_string()).expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + + #[test] + fn can_parse_socket_from_uri() { + let temp_socket = create_file_socket().expect("Unable to create socket"); + let uri: Uri = hyperlocal::Uri::new(temp_socket.socket_path.clone(), "").into(); + let control_socket = ControlSocket::try_from(uri).expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + + #[test] + fn can_parse_socket_from_uri_text() { + let temp_socket = create_file_socket().expect("Unable to create socket"); + let uri: Uri = hyperlocal::Uri::new(temp_socket.socket_path.clone(), "").into(); + let control_socket = ControlSocket::parse_address(uri.to_string()).expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket for input text should be valid: {}", e); + } + } + + #[test] + #[cfg(target_os = "linux")] + fn can_parse_abstract_socket_from_uri() { + let temp_socket = create_abstract_socket().expect("Unable to create socket"); + let uri: Uri = hyperlocal::Uri::new(temp_socket.socket_path.clone(), "").into(); + let control_socket = ControlSocket::try_from(uri).expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + + #[test] + #[cfg(target_os = "linux")] + fn can_parse_abstract_socket_from_uri_text() { + let temp_socket = create_abstract_socket().expect("Unable to create socket"); + let uri: Uri = hyperlocal::Uri::new(temp_socket.socket_path.clone(), "").into(); + let control_socket = ControlSocket::parse_address(uri.to_string()).expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + + #[test] + fn can_parse_socket_without_prefix() { + let temp_socket = create_file_socket().expect("Unable to create socket"); + let control_socket = ControlSocket::try_from(temp_socket.socket_path.to_string_lossy().to_string()) + .expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + + #[cfg(target_os = "linux")] + #[test] + fn can_parse_abstract_socket() { + let temp_socket = create_abstract_socket().expect("Unable to create socket"); + let control_socket = ControlSocket::try_from(temp_socket.to_string()).expect("Error parsing good socket path"); + assert!(control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + + #[test] + fn can_normalize_good_http_socket_addresses() { + let valid_socket_addresses = vec![ + "http://127.0.0.1:8080", + "https://127.0.0.1:8080", + "http://127.0.0.1:8080/", + "127.0.0.1:8080", + "http://0.0.0.0:8080", + "https://0.0.0.0:8080", + "http://0.0.0.0:8080/", + "0.0.0.0:8080", + "http://localhost:8080", + "https://localhost:8080", + "http://localhost:8080/", + "localhost:8080", + "http://[::1]:8080", + "https://[::1]:8080", + "http://[::1]:8080/", + "[::1]:8080", + "http://[0000:0000:0000:0000:0000:0000:0000:0000]:8080", + "https://[0000:0000:0000:0000:0000:0000:0000:0000]:8080", + "http://[0000:0000:0000:0000:0000:0000:0000:0000]:8080/", + "[0000:0000:0000:0000:0000:0000:0000:0000]:8080", + ]; + for socket_address in valid_socket_addresses { + let mut expected = if socket_address.starts_with("http") { + socket_address.to_string().trim_end_matches('/').to_string() + } else { + format!("http://{}", socket_address).trim_end_matches('/').to_string() + }; + expected.push('/'); + + let control_socket = ControlSocket::try_from(socket_address).expect("Error parsing good socket path"); + assert!(!control_socket.is_local_socket(), "Not parsed as a local socket"); + if let Err(e) = control_socket.validate() { + panic!("Socket should be valid: {}", e); + } + } + } + + #[test] + fn can_normalize_wildcard_http_socket_address() { + let socket_address = "*:8080"; + let expected = "http://127.0.0.1:8080/"; + let normalized_result = ControlSocket::normalize_and_parse_http_address(socket_address.to_string()); + let normalized = normalized_result + .expect("Unable to normalize socket address") + .to_string(); + assert_eq!(normalized, expected); + } + + #[test] + fn can_normalize_http_socket_address_with_no_port() { + let socket_address = "http://localhost"; + let expected = "http://localhost:80/"; + let normalized_result = ControlSocket::normalize_and_parse_http_address(socket_address.to_string()); + let normalized = normalized_result + .expect("Unable to normalize socket address") + .to_string(); + assert_eq!(normalized, expected); + } + + #[test] + fn can_normalize_https_socket_address_with_no_port() { + let socket_address = "https://localhost"; + let expected = "https://localhost:443/"; + let normalized_result = ControlSocket::normalize_and_parse_http_address(socket_address.to_string()); + let normalized = normalized_result + .expect("Unable to normalize socket address") + .to_string(); + assert_eq!(normalized, expected); + } + + #[test] + fn can_parse_http_addresses() { + let valid_socket_addresses = vec![ + "http://127.0.0.1:8080", + "https://127.0.0.1:8080", + "http://127.0.0.1:8080/", + "127.0.0.1:8080", + "http://0.0.0.0:8080", + "https://0.0.0.0:8080", + "http://0.0.0.0:8080/", + "0.0.0.0:8080", + "http://localhost:8080", + "https://localhost:8080", + "http://localhost:8080/", + "localhost:8080", + "http://[::1]:8080", + "https://[::1]:8080", + "http://[::1]:8080/", + "[::1]:8080", + "http://[0000:0000:0000:0000:0000:0000:0000:0000]:8080", + "https://[0000:0000:0000:0000:0000:0000:0000:0000]:8080", + "http://[0000:0000:0000:0000:0000:0000:0000:0000]:8080/", + "[0000:0000:0000:0000:0000:0000:0000:0000]:8080", + ]; + for socket_address in valid_socket_addresses { + let mut expected = if socket_address.starts_with("http") { + socket_address.to_string().trim_end_matches('/').to_string() + } else { + format!("http://{}", socket_address).trim_end_matches('/').to_string() + }; + expected.push('/'); + + let normalized = ControlSocket::normalize_and_parse_http_address(socket_address.to_string()) + .expect("Unable to normalize socket address") + .to_string(); + assert_eq!(normalized, expected); + } + } + + fn create_file_socket() -> Result<TempSocket, io::Error> { + let random = Alphanumeric.sample_string(&mut rand::thread_rng(), 10); + let socket_name = format!("unit-client-socket-test-{}.sock", random); + let socket_path = temp_dir().join(socket_name); + let listener = UnixListener::bind(&socket_path)?; + Ok(TempSocket { + socket_path, + _listener: listener, + }) + } + + #[cfg(target_os = "linux")] + fn create_abstract_socket() -> Result<TempSocket, io::Error> { + let random = Alphanumeric.sample_string(&mut rand::thread_rng(), 10); + let socket_name = format!("@unit-client-socket-test-{}.sock", random); + let socket_path = PathBuf::from(socket_name); + let listener = UnixListener::bind(&socket_path)?; + Ok(TempSocket { + socket_path, + _listener: listener, + }) + } +} diff --git a/tools/unitctl/unit-client-rs/src/lib.rs b/tools/unitctl/unit-client-rs/src/lib.rs new file mode 100644 index 00000000..dca8a86f --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/lib.rs @@ -0,0 +1,15 @@ +extern crate custom_error; +extern crate futures; +extern crate hyper; +extern crate hyper_tls; +extern crate hyperlocal; +extern crate serde; +extern crate serde_json; +pub mod control_socket_address; +mod runtime_flags; +pub mod unit_client; +mod unitd_cmd; +pub mod unitd_configure_options; +pub mod unitd_instance; +pub mod unitd_process; +mod unitd_process_user; diff --git a/tools/unitctl/unit-client-rs/src/runtime_flags.rs b/tools/unitctl/unit-client-rs/src/runtime_flags.rs new file mode 100644 index 00000000..7b31274d --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/runtime_flags.rs @@ -0,0 +1,90 @@ +use std::borrow::Cow; +use std::fmt; +use std::fmt::Display; +use std::path::{Path, PathBuf}; + +#[derive(Debug, Clone)] +pub struct RuntimeFlags { + pub flags: Cow<'static, str>, +} + +impl Display for RuntimeFlags { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.flags) + } +} + +impl RuntimeFlags { + pub fn new<S>(flags: S) -> RuntimeFlags + where + S: Into<String>, + { + RuntimeFlags { + flags: Cow::from(flags.into()), + } + } + + pub fn has_flag(&self, flag_name: &str) -> bool { + self.flags.contains(format!("--{}", flag_name).as_str()) + } + + pub fn get_flag_value(&self, flag_name: &str) -> Option<String> { + let flag_parts = self.flags.split_ascii_whitespace().collect::<Vec<&str>>(); + for (i, flag) in flag_parts.iter().enumerate() { + if let Some(name) = flag.strip_prefix("--") { + /* If there is no flag value after the current one, there is by definition no + * flag value for the current flag. */ + let index_lt_len = flag_parts.len() > i + 1; + if index_lt_len { + let next_value_isnt_flag = !flag_parts[i + 1].starts_with("--"); + if name.eq(flag_name) && next_value_isnt_flag { + return Some(flag_parts[i + 1].to_string()); + } + } + } + } + None + } + + pub fn control_api_socket_address(&self) -> Option<String> { + self.get_flag_value("control") + } + + pub fn pid_path(&self) -> Option<Box<Path>> { + self.get_flag_value("pid") + .map(PathBuf::from) + .map(PathBuf::into_boxed_path) + } + + pub fn log_path(&self) -> Option<Box<Path>> { + self.get_flag_value("log") + .map(PathBuf::from) + .map(PathBuf::into_boxed_path) + } + + pub fn modules_directory(&self) -> Option<Box<Path>> { + self.get_flag_value("modules") + .map(PathBuf::from) + .map(PathBuf::into_boxed_path) + } + + pub fn state_directory(&self) -> Option<Box<Path>> { + self.get_flag_value("state") + .map(PathBuf::from) + .map(PathBuf::into_boxed_path) + } + + pub fn tmp_directory(&self) -> Option<Box<Path>> { + self.get_flag_value("tmp") + .map(PathBuf::from) + .map(PathBuf::into_boxed_path) + } + + pub fn user(&self) -> Option<String> { + self.get_flag_value("user").map(String::from) + } + + pub fn group(&self) -> Option<String> { + self.get_flag_value("group").map(String::from) + } +} diff --git a/tools/unitctl/unit-client-rs/src/unit_client.rs b/tools/unitctl/unit-client-rs/src/unit_client.rs new file mode 100644 index 00000000..b856fd20 --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unit_client.rs @@ -0,0 +1,393 @@ +use std::collections::HashMap; +use std::error::Error as StdError; +use std::fmt::Debug; +use std::future::Future; +use std::rc::Rc; +use std::{fmt, io}; + +use custom_error::custom_error; +use hyper::body::{Buf, HttpBody}; +use hyper::client::{HttpConnector, ResponseFuture}; +use hyper::Error as HyperError; +use hyper::{http, Body, Client, Request}; +use hyper_tls::HttpsConnector; +use hyperlocal::{UnixClientExt, UnixConnector}; +use serde::{Deserialize, Serialize}; +use tokio::runtime::Runtime; + +use crate::control_socket_address::ControlSocket; +use unit_openapi::apis::configuration::Configuration; +use unit_openapi::apis::{Error as OpenAPIError, StatusApi}; +use unit_openapi::apis::{ListenersApi, ListenersApiClient, StatusApiClient}; +use unit_openapi::models::{ConfigListener, Status}; + +const USER_AGENT: &str = concat!("UNIT CLI/", env!("CARGO_PKG_VERSION"), "/rust"); + +custom_error! {pub UnitClientError + OpenAPIError { source: OpenAPIError } = "OpenAPI error", + JsonError { source: serde_json::Error, + path: String} = "JSON error [path={path}]", + HyperError { source: hyper::Error, + control_socket_address: String, + path: String} = "Communications error [control_socket_address={control_socket_address}, path={path}]: {source}", + HttpRequestError { source: http::Error, + path: String} = "HTTP error [path={path}]", + HttpResponseError { status: http::StatusCode, + path: String, + body: String} = "HTTP response error [path={path}, status={status}]:\n{body}", + HttpResponseJsonBodyError { status: http::StatusCode, + path: String, + error: String, + detail: String} = "HTTP response error [path={path}, status={status}]:\n Error: {error}\n Detail: {detail}", + IoError { source: io::Error, socket: String } = "IO error [socket={socket}]", + UnixSocketAddressError { + source: io::Error, + control_socket_address: String + } = "Invalid unix domain socket address [control_socket_address={control_socket_address}]", + SocketPermissionsError { control_socket_address: String } = + "Insufficient permissions to connect to control socket [control_socket_address={control_socket_address}]", + UnixSocketNotFound { control_socket_address: String } = "Unix socket not found [control_socket_address={control_socket_address}]", + TcpSocketAddressUriError { + source: http::uri::InvalidUri, + control_socket_address: String + } = "Invalid TCP socket address [control_socket_address={control_socket_address}]", + TcpSocketAddressParseError { + message: String, + control_socket_address: String + } = "Invalid TCP socket address [control_socket_address={control_socket_address}]: {message}", + TcpSocketAddressNoPortError { + control_socket_address: String + } = "TCP socket address does not have a port specified [control_socket_address={control_socket_address}]", + UnitdProcessParseError { + message: String, + pid: u64 + } = "{message} for [pid={pid}]", + UnitdProcessExecError { + source: Box<dyn StdError>, + message: String, + executable_path: String, + pid: u64 + } = "{message} for [pid={pid}, executable_path={executable_path}]: {source}", +} + +impl UnitClientError { + fn new(error: HyperError, control_socket_address: String, path: String) -> Self { + if error.is_connect() { + if let Some(source) = error.source() { + if let Some(io_error) = source.downcast_ref::<io::Error>() { + if io_error.kind().eq(&io::ErrorKind::PermissionDenied) { + return UnitClientError::SocketPermissionsError { control_socket_address }; + } + } + } + } + + UnitClientError::HyperError { + source: error, + control_socket_address, + path, + } + } +} + +macro_rules! new_openapi_client_from_hyper_client { + ($unit_client:expr, $hyper_client: ident, $api_client:ident, $api_trait:ident) => {{ + let config = Configuration { + base_path: $unit_client.control_socket.create_uri_with_path("/").to_string(), + user_agent: Some(format!("{}/OpenAPI-Generator", USER_AGENT).to_owned()), + client: $hyper_client.clone(), + basic_auth: None, + oauth_access_token: None, + api_key: None, + }; + let rc_config = Rc::new(config); + Box::new($api_client::new(rc_config)) as Box<dyn $api_trait> + }}; +} + +macro_rules! new_openapi_client { + ($unit_client:expr, $api_client:ident, $api_trait:ident) => { + match &*$unit_client.client { + RemoteClient::Tcp { client } => { + new_openapi_client_from_hyper_client!($unit_client, client, $api_client, $api_trait) + } + RemoteClient::Unix { client } => { + new_openapi_client_from_hyper_client!($unit_client, client, $api_client, $api_trait) + } + } + }; +} + +#[derive(Clone)] +pub enum RemoteClient<B> +where + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + Unix { + client: Client<UnixConnector, B>, + }, + Tcp { + client: Client<HttpsConnector<HttpConnector>, B>, + }, +} + +impl<B> RemoteClient<B> +where + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + fn client_name(&self) -> &str { + match self { + RemoteClient::Unix { .. } => "Client<UnixConnector, Body>", + RemoteClient::Tcp { .. } => "Client<HttpsConnector<HttpConnector>, Body>", + } + } + + pub fn request(&self, req: Request<B>) -> ResponseFuture { + match self { + RemoteClient::Unix { client } => client.request(req), + RemoteClient::Tcp { client } => client.request(req), + } + } +} + +impl<B> Debug for RemoteClient<B> +where + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.client_name()) + } +} + +#[derive(Debug)] +pub struct UnitClient { + pub control_socket: ControlSocket, + /// A `current_thread` runtime for executing operations on the + /// asynchronous client in a blocking manner. + rt: Runtime, + /// Client for communicating with the control API over the UNIX domain socket + client: Box<RemoteClient<Body>>, +} + +impl UnitClient { + pub fn new_with_runtime(control_socket: ControlSocket, runtime: Runtime) -> Self { + if control_socket.is_local_socket() { + Self::new_unix(control_socket, runtime) + } else { + Self::new_http(control_socket, runtime) + } + } + + pub fn new(control_socket: ControlSocket) -> Self { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Unable to create a current_thread runtime"); + Self::new_with_runtime(control_socket, runtime) + } + + pub fn new_http(control_socket: ControlSocket, runtime: Runtime) -> Self { + let remote_client = Client::builder().build(HttpsConnector::new()); + Self { + control_socket, + rt: runtime, + client: Box::from(RemoteClient::Tcp { client: remote_client }), + } + } + + pub fn new_unix(control_socket: ControlSocket, runtime: Runtime) -> UnitClient { + let remote_client = Client::unix(); + + Self { + control_socket, + rt: runtime, + client: Box::from(RemoteClient::Unix { client: remote_client }), + } + } + + /// Sends a request to UNIT and deserializes the JSON response body into the value of type `RESPONSE`. + pub fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>( + &self, + mut request: Request<Body>, + ) -> Result<RESPONSE, UnitClientError> { + let uri = request.uri().clone(); + let path: &str = uri.path(); + + request.headers_mut().insert("User-Agent", USER_AGENT.parse().unwrap()); + + let response_future = self.client.request(request); + + self.rt.block_on(async { + let response = response_future + .await + .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; + + let status = response.status(); + let body = hyper::body::aggregate(response) + .await + .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; + let reader = &mut body.reader(); + if !status.is_success() { + let error: HashMap<String, String> = + serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { + source: error, + path: path.to_string(), + })?; + + return Err(UnitClientError::HttpResponseJsonBodyError { + status, + path: path.to_string(), + error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(), + detail: error.get("detail").unwrap_or(&"".into()).to_string(), + }); + } + serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { + source: error, + path: path.to_string(), + }) + }) + } + + pub fn listeners_api(&self) -> Box<dyn ListenersApi + 'static> { + new_openapi_client!(self, ListenersApiClient, ListenersApi) + } + + pub fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> { + let list_listeners = self.listeners_api().get_listeners(); + self.execute_openapi_future(list_listeners) + } + + pub fn execute_openapi_future<F: Future<Output = Result<R, OpenAPIError>>, R: for<'de> serde::Deserialize<'de>>( + &self, + future: F, + ) -> Result<R, Box<UnitClientError>> { + self.rt.block_on(future).map_err(|error| { + let remapped_error = if let OpenAPIError::Hyper(hyper_error) = error { + UnitClientError::new(hyper_error, self.control_socket.to_string(), "".to_string()) + } else { + UnitClientError::OpenAPIError { source: error } + }; + + Box::new(remapped_error) + }) + } + + pub fn status_api(&self) -> Box<dyn StatusApi + 'static> { + new_openapi_client!(self, StatusApiClient, StatusApi) + } + + pub fn status(&self) -> Result<Status, Box<UnitClientError>> { + let status = self.status_api().get_status(); + self.execute_openapi_future(status) + } + + pub fn is_running(&self) -> bool { + self.status().is_ok() + } +} + +pub type UnitSerializableMap = HashMap<String, serde_json::Value>; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UnitStatus { + pub connections: UnitStatusConnections, + pub requests: UnitStatusRequests, + pub applications: HashMap<String, UnitStatusApplication>, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UnitStatusConnections { + #[serde(default)] + pub closed: usize, + #[serde(default)] + pub idle: usize, + #[serde(default)] + pub active: usize, + #[serde(default)] + pub accepted: usize, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UnitStatusRequests { + #[serde(default)] + pub active: usize, + #[serde(default)] + pub total: usize, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UnitStatusApplication { + #[serde(default)] + pub processes: HashMap<String, usize>, + #[serde(default)] + pub requests: HashMap<String, usize>, +} + +#[cfg(test)] +mod tests { + use crate::unitd_instance::UnitdInstance; + + use super::*; + // Integration tests + + #[test] + fn can_connect_to_unit_api() { + match UnitdInstance::running_unitd_instances().first() { + Some(unit_instance) => { + let control_api_socket_address = unit_instance + .control_api_socket_address() + .expect("No control API socket path found"); + let control_socket = ControlSocket::try_from(control_api_socket_address) + .expect("Unable to parse control socket address"); + let unit_client = UnitClient::new(control_socket); + assert!(unit_client.is_running()); + } + None => { + eprintln!("No running unitd instances found - skipping test"); + } + } + } + + #[test] + fn can_get_unit_status() { + match UnitdInstance::running_unitd_instances().first() { + Some(unit_instance) => { + let control_api_socket_address = unit_instance + .control_api_socket_address() + .expect("No control API socket path found"); + let control_socket = ControlSocket::try_from(control_api_socket_address) + .expect("Unable to parse control socket address"); + let unit_client = UnitClient::new(control_socket); + let status = unit_client.status().expect("Unable to get unit status"); + println!("Unit status: {:?}", status); + } + None => { + eprintln!("No running unitd instances found - skipping test"); + } + } + } + + #[test] + fn can_get_unit_listeners() { + match UnitdInstance::running_unitd_instances().first() { + Some(unit_instance) => { + let control_api_socket_address = unit_instance + .control_api_socket_address() + .expect("No control API socket path found"); + let control_socket = ControlSocket::try_from(control_api_socket_address) + .expect("Unable to parse control socket address"); + let unit_client = UnitClient::new(control_socket); + unit_client.listeners().expect("Unable to get Unit listeners"); + } + None => { + eprintln!("No running unitd instances found - skipping test"); + } + } + } +} diff --git a/tools/unitctl/unit-client-rs/src/unitd_cmd.rs b/tools/unitctl/unit-client-rs/src/unitd_cmd.rs new file mode 100644 index 00000000..c4883ed5 --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unitd_cmd.rs @@ -0,0 +1,85 @@ +use std::error::Error as StdError; +use std::io::{Error as IoError, ErrorKind}; + +use crate::runtime_flags::RuntimeFlags; +use std::path::{Path, PathBuf}; + +#[derive(Debug, Clone)] +pub struct UnitdCmd { + pub(crate) process_executable_path: Option<Box<Path>>, + pub version: Option<String>, + pub flags: Option<RuntimeFlags>, +} + +impl UnitdCmd { + pub(crate) fn new<S>(full_cmd: S, binary_name: &str) -> Result<UnitdCmd, Box<dyn StdError>> + where + S: Into<String>, + { + let process_cmd: String = full_cmd.into(); + let parsable = process_cmd + .strip_prefix("unit: main v") + .and_then(|s| s.strip_suffix(']')); + if parsable.is_none() { + let msg = format!("cmd does not have the expected format: {}", process_cmd); + return Err(IoError::new(ErrorKind::InvalidInput, msg).into()); + } + let parts = parsable + .expect("Unable to parse cmd") + .splitn(2, " [") + .collect::<Vec<&str>>(); + if parts.len() != 2 { + let msg = format!("cmd does not have the expected format: {}", process_cmd); + return Err(IoError::new(ErrorKind::InvalidInput, msg).into()); + } + let version: Option<String> = Some(parts[0].to_string()); + let executable_path = UnitdCmd::parse_executable_path_from_cmd(parts[1], binary_name); + let flags = UnitdCmd::parse_runtime_flags_from_cmd(parts[1]); + + Ok(UnitdCmd { + process_executable_path: executable_path, + version, + flags, + }) + } + + fn parse_executable_path_from_cmd<S>(full_cmd: S, binary_name: &str) -> Option<Box<Path>> + where + S: Into<String>, + { + let cmd = full_cmd.into(); + if cmd.is_empty() { + return None; + } + + let split = cmd.splitn(2, binary_name).collect::<Vec<&str>>(); + if split.is_empty() { + return None; + } + + let path = format!("{}{}", split[0], binary_name); + Some(PathBuf::from(path).into_boxed_path()) + } + + fn parse_runtime_flags_from_cmd<S>(full_cmd: S) -> Option<RuntimeFlags> + where + S: Into<String>, + { + let cmd = full_cmd.into(); + if cmd.is_empty() { + return None; + } + // Split out everything in between the brackets [ and ] + let split = cmd.trim_end_matches(']').splitn(2, '[').collect::<Vec<&str>>(); + if split.is_empty() { + return None; + } + /* Now we need to parse a string like this: + * ./sbin/unitd --no-daemon --tmp /tmp + * and only return what is after the invoking command */ + split[0] + .find("--") + .map(|index| cmd[index..].to_string()) + .map(RuntimeFlags::new) + } +} diff --git a/tools/unitctl/unit-client-rs/src/unitd_configure_options.rs b/tools/unitctl/unit-client-rs/src/unitd_configure_options.rs new file mode 100644 index 00000000..88ab1101 --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unitd_configure_options.rs @@ -0,0 +1,235 @@ +use custom_error::custom_error; +use std::borrow::Cow; +use std::error::Error as stdError; +use std::io::{BufRead, BufReader, Lines}; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; + +custom_error! {UnitdStderrParseError + VersionNotFound = "Version string output not found", + BuildSettingsNotFound = "Build settings not found" +} + +#[derive(Debug, Clone)] +pub struct UnitdConfigureOptions { + pub version: Cow<'static, str>, + pub all_flags: Cow<'static, str>, +} + +impl UnitdConfigureOptions { + pub fn new(unitd_path: &Path) -> Result<UnitdConfigureOptions, Box<dyn stdError>> { + fn parse_configure_settings_from_unitd_stderr_output<B: BufRead>( + lines: &mut Lines<B>, + ) -> Result<UnitdConfigureOptions, Box<dyn stdError>> { + const VERSION_PREFIX: &str = "unit version: "; + const CONFIGURED_AS_PREFIX: &str = "configured as "; + const CONFIGURE_PREFIX: &str = "configured as ./configure "; + + fn aggregate_parsable_lines( + mut accum: (Option<String>, Option<String>), + line: String, + ) -> (Option<String>, Option<String>) { + if line.starts_with(VERSION_PREFIX) { + accum.0 = line.strip_prefix(VERSION_PREFIX).map(|l| l.to_string()); + } else if line.starts_with(CONFIGURED_AS_PREFIX) { + accum.1 = line.strip_prefix(CONFIGURE_PREFIX).map(|l| l.to_string()); + } + + accum + } + + let options_lines = lines + .filter_map(|line| line.ok()) + .fold((None, None), aggregate_parsable_lines); + + if options_lines.0.is_none() { + return Err(Box::new(UnitdStderrParseError::VersionNotFound) as Box<dyn stdError>); + } else if options_lines.1.is_none() { + return Err(Box::new(UnitdStderrParseError::BuildSettingsNotFound) as Box<dyn stdError>); + } + + Ok(UnitdConfigureOptions { + version: options_lines.0.unwrap().into(), + all_flags: options_lines.1.unwrap().into(), + }) + } + + let program = unitd_path.as_os_str(); + let child = Command::new(program) + .arg("--version") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + let output = child.wait_with_output()?; + let err = BufReader::new(&*output.stderr); + parse_configure_settings_from_unitd_stderr_output(&mut err.lines()) + } + + pub fn has_flag(&self, flag_name: &str) -> bool { + self.all_flags + .split_ascii_whitespace() + .any(|flag| flag.starts_with(format!("--{}", flag_name).as_str())) + } + + pub fn get_flag_value(&self, flag_name: &str) -> Option<String> { + self.all_flags + .split_ascii_whitespace() + .find(|flag| flag.starts_with(format!("--{}", flag_name).as_str())) + .and_then(|flag| { + let parts: Vec<&str> = flag.split('=').collect(); + if parts.len() >= 2 { + Some(parts[1].to_owned()) + } else { + None + } + }) + } + + pub fn debug_enabled(&self) -> bool { + self.has_flag("debug") + } + + pub fn openssl_enabled(&self) -> bool { + self.has_flag("openssl") + } + + pub fn prefix_path(&self) -> Option<Box<Path>> { + self.get_flag_value("prefix") + .map(PathBuf::from) + .map(PathBuf::into_boxed_path) + } + + fn join_to_prefix_path<S>(&self, sub_path: S) -> Option<Box<Path>> + where + S: Into<String>, + { + self.prefix_path() + .map(|path| path.join(sub_path.into()).into_boxed_path()) + } + + pub fn default_control_api_socket_address(&self) -> Option<String> { + // If the socket address is specific configured in the configure options, we use + // that. Otherwise, we use the default path as assumed to be unix:$prefix/control.unit.sock. + match self.get_flag_value("control") { + Some(socket_address) => Some(socket_address), + None => { + // Give up if the unitd is compiled with unix sockets disabled + if self.has_flag("no-unix-sockets") { + return None; + } + let socket_path = self.join_to_prefix_path("control.unit.sock"); + socket_path.map(|path| format!("unix:{}", path.to_string_lossy())) + } + } + } + + pub fn default_pid_path(&self) -> Option<Box<Path>> { + match self.get_flag_value("pid") { + Some(pid_path) => self.join_to_prefix_path(pid_path), + None => self.join_to_prefix_path("unit.pid"), + } + } + + pub fn default_log_path(&self) -> Option<Box<Path>> { + match self.get_flag_value("log") { + Some(pid_path) => self.join_to_prefix_path(pid_path), + None => self.join_to_prefix_path("unit.log"), + } + } + + pub fn default_modules_directory(&self) -> Option<Box<Path>> { + match self.get_flag_value("modules") { + Some(modules_dir_name) => self.join_to_prefix_path(modules_dir_name), + None => self.join_to_prefix_path("modules"), + } + } + + pub fn default_state_directory(&self) -> Option<Box<Path>> { + match self.get_flag_value("state") { + Some(state_dir_name) => self.join_to_prefix_path(state_dir_name), + None => self.join_to_prefix_path("state"), + } + } + + pub fn default_tmp_directory(&self) -> Option<Box<Path>> { + match self.get_flag_value("tmp") { + Some(tmp_dir_name) => self.join_to_prefix_path(tmp_dir_name), + None => self.join_to_prefix_path("tmp"), + } + } + pub fn default_user(&self) -> Option<String> { + self.get_flag_value("user").map(String::from) + } + pub fn default_group(&self) -> Option<String> { + self.get_flag_value("group").map(String::from) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::unitd_instance; + use crate::unitd_instance::UNITD_PATH_ENV_KEY; + + #[test] + fn can_detect_key() { + let options = UnitdConfigureOptions { + version: Default::default(), + all_flags: Cow::from("--debug --openssl --prefix=/opt/unit"), + }; + assert!(options.has_flag("debug")); + assert!(options.has_flag("openssl")); + assert!(options.has_flag("prefix")); + assert!(!options.has_flag("fobar")); + } + + #[test] + fn can_get_flag_value_by_key() { + let expected = "/opt/unit"; + let options = UnitdConfigureOptions { + version: Default::default(), + all_flags: Cow::from("--debug --openssl --prefix=/opt/unit"), + }; + + let actual = options.get_flag_value("prefix"); + assert_eq!(expected, actual.unwrap()) + } + + #[test] + fn can_get_prefix_path() { + let expected: Box<Path> = Path::new("/opt/unit").into(); + let options = UnitdConfigureOptions { + version: Default::default(), + all_flags: Cow::from("--debug --openssl --prefix=/opt/unit"), + }; + + let actual = options.prefix_path(); + assert_eq!(expected, actual.unwrap()) + } + + #[test] + fn can_parse_complicated_configure_options() { + let expected: Box<Path> = Path::new("/usr").into(); + let options = UnitdConfigureOptions { + version: Default::default(), + all_flags: Cow::from("--prefix=/usr --state=/var/lib/unit --control=unix:/var/run/control.unit.sock --pid=/var/run/unit.pid --log=/var/log/unit.log --tmp=/var/tmp --user=unit --group=unit --tests --openssl --modules=/usr/lib/unit/modules --libdir=/usr/lib/x86_64-linux-gnu --cc-opt='-g -O2 -fdebug-prefix-map=/data/builder/debuild/unit-1.28.0/pkg/deb/debuild/unit-1.28.0=. -specs=/usr/share/dpkg/no-pie-compile.specs -fstack-protector-strong -Wformat -Werror=format-security -Wp,-D_FORTIFY_SOURCE=2 -fPIC' --ld-opt='-Wl,-Bsymbolic-functions -specs=/usr/share/dpkg/no-pie-link.specs -Wl,-z,relro -Wl,-z,now -Wl,--as-needed -pie' +"), + }; + + let actual = options.prefix_path(); + assert_eq!(expected, actual.unwrap()) + } + + #[test] + fn can_run_unitd() { + let specific_path = std::env::var(UNITD_PATH_ENV_KEY).map_err(|error| Box::new(error) as Box<dyn stdError>); + let unitd_path = unitd_instance::find_executable_path(specific_path); + let config_options = UnitdConfigureOptions::new(&unitd_path.unwrap()); + match config_options { + Ok(options) => { + println!("{:?}", options) + } + Err(error) => panic!("{}", error), + }; + } +} diff --git a/tools/unitctl/unit-client-rs/src/unitd_instance.rs b/tools/unitctl/unit-client-rs/src/unitd_instance.rs new file mode 100644 index 00000000..9467fcb7 --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unitd_instance.rs @@ -0,0 +1,360 @@ +use crate::unit_client::UnitClientError; +use serde::ser::SerializeMap; +use serde::{Serialize, Serializer}; +use std::error::Error as StdError; +use std::path::{Path, PathBuf}; +use std::{fmt, io}; +use which::which; + +use crate::runtime_flags::RuntimeFlags; +use crate::unitd_configure_options::UnitdConfigureOptions; +use crate::unitd_process::UnitdProcess; + +pub const UNITD_PATH_ENV_KEY: &str = "UNITD_PATH"; +pub const UNITD_BINARY_NAMES: [&str; 2] = ["unitd", "unitd-debug"]; + +#[derive(Debug)] +pub struct UnitdInstance { + pub process: UnitdProcess, + pub configure_options: Option<UnitdConfigureOptions>, + pub errors: Vec<UnitClientError>, +} + +impl Serialize for UnitdInstance { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut state = serializer.serialize_map(Some(15))?; + let runtime_flags = self + .process + .cmd() + .and_then(|cmd| cmd.flags) + .map(|flags| flags.to_string()); + + let configure_flags = self.configure_options.as_ref().map(|opts| opts.all_flags.clone()); + + state.serialize_entry("pid", &self.process.process_id)?; + state.serialize_entry("version", &self.version())?; + state.serialize_entry("user", &self.process.user)?; + state.serialize_entry("effective_user", &self.process.effective_user)?; + state.serialize_entry("executable", &self.process.executable_path())?; + state.serialize_entry("control_socket", &self.control_api_socket_address())?; + state.serialize_entry("child_pids", &self.process.child_pids)?; + state.serialize_entry("log_path", &self.log_path())?; + state.serialize_entry("pid_path", &self.pid_path())?; + state.serialize_entry("modules_directory", &self.modules_directory())?; + state.serialize_entry("state_directory", &self.state_directory())?; + state.serialize_entry("tmp_directory", &self.tmp_directory())?; + state.serialize_entry("runtime_flags", &runtime_flags)?; + state.serialize_entry("configure_flags", &configure_flags)?; + let string_errors = &self.errors.iter().map(|e| e.to_string()).collect::<Vec<String>>(); + state.serialize_entry("errors", string_errors)?; + + state.end() + } +} + +impl UnitdInstance { + pub fn running_unitd_instances() -> Vec<UnitdInstance> { + Self::collect_unitd_processes(UnitdProcess::find_unitd_processes()) + } + + /// Find all running unitd processes and convert them into UnitdInstances and filter + /// out all errors by printing them to stderr and leaving errored instances out of + /// the returned vector. + fn collect_unitd_processes(processes: Vec<UnitdProcess>) -> Vec<UnitdInstance> { + Self::map_processes_to_instances(processes).into_iter().collect() + } + + fn map_processes_to_instances(processes: Vec<UnitdProcess>) -> Vec<UnitdInstance> { + fn unitd_path_from_process(process: &UnitdProcess) -> Result<Box<Path>, UnitClientError> { + match process.executable_path() { + Some(executable_path) => { + let is_absolute_working_dir = process + .working_dir + .as_ref() + .map(|p| p.is_absolute()) + .unwrap_or_default(); + if executable_path.is_absolute() { + Ok(executable_path.to_owned()) + } else if executable_path.is_relative() && is_absolute_working_dir { + let new_path = process + .working_dir + .as_ref() + .unwrap() + .join(executable_path) + .canonicalize() + .map(|path| path.into_boxed_path()) + .map_err(|error| UnitClientError::UnitdProcessParseError { + message: format!("Error canonicalizing unitd executable path: {}", error), + pid: process.process_id, + })?; + Ok(new_path) + } else { + Err(UnitClientError::UnitdProcessParseError { + message: "Unable to get absolute unitd executable path from process".to_string(), + pid: process.process_id, + }) + } + } + None => Err(UnitClientError::UnitdProcessParseError { + message: "Unable to get unitd executable path from process".to_string(), + pid: process.process_id, + }), + } + } + + fn map_process_to_unitd_instance(process: &UnitdProcess) -> UnitdInstance { + match unitd_path_from_process(process) { + Ok(unitd_path) => match UnitdConfigureOptions::new(&unitd_path.clone().into_path_buf()) { + Ok(configure_options) => UnitdInstance { + process: process.to_owned(), + configure_options: Some(configure_options), + errors: vec![], + }, + Err(error) => { + let error = UnitClientError::UnitdProcessExecError { + source: error, + executable_path: unitd_path.to_string_lossy().parse().unwrap_or_default(), + message: "Error running unitd binary to get configure options".to_string(), + pid: process.process_id, + }; + UnitdInstance { + process: process.to_owned(), + configure_options: None, + errors: vec![error], + } + } + }, + Err(err) => UnitdInstance { + process: process.to_owned(), + configure_options: None, + errors: vec![err], + }, + } + } + + processes + .iter() + // This converts processes into a UnitdInstance + .map(map_process_to_unitd_instance) + .collect() + } + + fn version(&self) -> Option<String> { + match self.process.cmd()?.version { + Some(version) => Some(version), + None => self.configure_options.as_ref().map(|opts| opts.version.to_string()), + } + } + + fn flag_or_default_option<R>( + &self, + read_flag: fn(RuntimeFlags) -> Option<R>, + read_opts: fn(UnitdConfigureOptions) -> Option<R>, + ) -> Option<R> { + self.process + .cmd()? + .flags + .and_then(read_flag) + .or_else(|| self.configure_options.to_owned().and_then(read_opts)) + } + + pub fn control_api_socket_address(&self) -> Option<String> { + self.flag_or_default_option( + |flags| flags.control_api_socket_address(), + |opts| opts.default_control_api_socket_address(), + ) + } + + pub fn pid_path(&self) -> Option<Box<Path>> { + self.flag_or_default_option(|flags| flags.pid_path(), |opts| opts.default_pid_path()) + } + + pub fn log_path(&self) -> Option<Box<Path>> { + self.flag_or_default_option(|flags| flags.log_path(), |opts| opts.default_log_path()) + } + + pub fn modules_directory(&self) -> Option<Box<Path>> { + self.flag_or_default_option( + |flags| flags.modules_directory(), + |opts| opts.default_modules_directory(), + ) + } + + pub fn state_directory(&self) -> Option<Box<Path>> { + self.flag_or_default_option(|flags| flags.state_directory(), |opts| opts.default_state_directory()) + } + + pub fn tmp_directory(&self) -> Option<Box<Path>> { + self.flag_or_default_option(|flags| flags.tmp_directory(), |opts| opts.default_tmp_directory()) + } +} + +impl fmt::Display for UnitdInstance { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + const UNKNOWN: &str = "[unknown]"; + let version = self.version().unwrap_or_else(|| String::from("[unknown]")); + let runtime_flags = self + .process + .cmd() + .and_then(|cmd| cmd.flags) + .map(|flags| flags.to_string()) + .unwrap_or_else(|| UNKNOWN.into()); + let configure_flags = self + .configure_options + .as_ref() + .map(|opts| opts.all_flags.clone()) + .unwrap_or_else(|| UNKNOWN.into()); + let unitd_path: String = self + .process + .executable_path() + .map(|p| p.to_string_lossy().into()) + .unwrap_or_else(|| UNKNOWN.into()); + let working_dir: String = self + .process + .working_dir + .as_ref() + .map(|p| p.to_string_lossy().into()) + .unwrap_or_else(|| UNKNOWN.into()); + let socket_address = self.control_api_socket_address().unwrap_or_else(|| UNKNOWN.to_string()); + let child_pids = self + .process + .child_pids + .iter() + .map(u64::to_string) + .collect::<Vec<String>>() + .join(", "); + + writeln!( + f, + "{} instance [pid: {}, version: {}]:", + self.process.binary_name, self.process.process_id, version + )?; + writeln!(f, " Executable: {}", unitd_path)?; + writeln!(f, " Process working directory: {}", working_dir)?; + write!(f, " Process ownership: ")?; + if let Some(user) = &self.process.user { + writeln!(f, "name: {}, uid: {}, gid: {}", user.name, user.uid, user.gid)?; + } else { + writeln!(f, "{}", UNKNOWN)?; + } + write!(f, " Process effective ownership: ")?; + if let Some(user) = &self.process.effective_user { + writeln!(f, "name: {}, uid: {}, gid: {}", user.name, user.uid, user.gid)?; + } else { + writeln!(f, "{}", UNKNOWN)?; + } + + writeln!(f, " API control unix socket: {}", socket_address)?; + writeln!(f, " Child processes ids: {}", child_pids)?; + writeln!(f, " Runtime flags: {}", runtime_flags)?; + write!(f, " Configure options: {}", configure_flags)?; + + if !self.errors.is_empty() { + write!(f, "\n Errors:")?; + for error in &self.errors { + write!(f, "\n {}", error)?; + } + } + + Ok(()) + } +} + +pub fn find_executable_path(specific_path: Result<String, Box<dyn StdError>>) -> Result<PathBuf, Box<dyn StdError>> { + fn find_unitd_in_system_path() -> Vec<PathBuf> { + UNITD_BINARY_NAMES + .iter() + .map(which) + .filter_map(Result::ok) + .collect::<Vec<PathBuf>>() + } + + match specific_path { + Ok(path) => Ok(PathBuf::from(path)), + Err(_) => { + let unitd_paths = find_unitd_in_system_path(); + if unitd_paths.is_empty() { + let err_msg = format!( + "Could not find unitd in system path or in UNITD_PATH environment variable. Searched for: {:?}", + UNITD_BINARY_NAMES + ); + let err = io::Error::new(io::ErrorKind::NotFound, err_msg); + Err(Box::from(err)) + } else { + Ok(unitd_paths[0].clone()) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::rngs::StdRng; + use rand::{RngCore, SeedableRng}; + + // We don't need a secure seed for testing, in fact it is better that we have a + // predictable value + const SEED: [u8; 32] = [ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, + 30, 31, + ]; + #[test] + fn can_find_unitd_instances() { + UnitdInstance::running_unitd_instances().iter().for_each(|p| { + println!("{:?}", p); + println!("Runtime Flags: {:?}", p.process.cmd().map(|c| c.flags)); + println!("Temp directory: {:?}", p.tmp_directory()); + }) + } + + fn mock_process<S: Into<String>>( + rng: &mut StdRng, + binary_name: S, + executable_path: Option<String>, + ) -> UnitdProcess { + UnitdProcess { + process_id: rng.next_u32() as u64, + binary_name: binary_name.into(), + executable_path: executable_path.map(|p| Box::from(Path::new(&p))), + environ: vec![], + all_cmds: vec![], + working_dir: Some(Box::from(Path::new("/opt/unit"))), + child_pids: vec![], + user: None, + effective_user: None, + } + } + + #[test] + fn will_list_without_errors_valid_processes() { + let specific_path = std::env::var(UNITD_PATH_ENV_KEY).map_err(|error| Box::new(error) as Box<dyn StdError>); + let binding = match find_executable_path(specific_path) { + Ok(path) => path, + Err(error) => { + eprintln!("Could not find unitd executable path: {} - skipping test", error); + return; + } + }; + let binary_name = binding + .file_name() + .expect("Could not get binary name") + .to_string_lossy() + .to_string(); + let unitd_path = binding.to_string_lossy(); + let mut rng: StdRng = SeedableRng::from_seed(SEED); + + let processes = vec![ + mock_process(&mut rng, &binary_name, Some(unitd_path.to_string())), + mock_process(&mut rng, &binary_name, Some(unitd_path.to_string())), + ]; + let instances = UnitdInstance::collect_unitd_processes(processes); + // assert_eq!(instances.len(), 3); + instances.iter().for_each(|p| { + assert_eq!(p.errors.len(), 0, "Expected no errors, got: {:?}", p.errors); + }) + } +} diff --git a/tools/unitctl/unit-client-rs/src/unitd_process.rs b/tools/unitctl/unit-client-rs/src/unitd_process.rs new file mode 100644 index 00000000..b8604e89 --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unitd_process.rs @@ -0,0 +1,170 @@ +use crate::unitd_cmd::UnitdCmd; +use crate::unitd_instance::UNITD_BINARY_NAMES; +use crate::unitd_process_user::UnitdProcessUser; +use std::collections::HashMap; +use std::path::Path; +use sysinfo::{Pid, Process, ProcessRefreshKind, System, UpdateKind, Users}; + +#[derive(Debug, Clone)] +pub struct UnitdProcess { + pub binary_name: String, + pub process_id: u64, + pub executable_path: Option<Box<Path>>, + pub environ: Vec<String>, + pub all_cmds: Vec<String>, + pub working_dir: Option<Box<Path>>, + pub child_pids: Vec<u64>, + pub user: Option<UnitdProcessUser>, + pub effective_user: Option<UnitdProcessUser>, +} + +impl UnitdProcess { + pub fn find_unitd_processes() -> Vec<UnitdProcess> { + let process_refresh_kind = ProcessRefreshKind::new() + .with_cmd(UpdateKind::Always) + .with_cwd(UpdateKind::Always) + .with_exe(UpdateKind::Always) + .with_user(UpdateKind::Always); + let refresh_kind = sysinfo::RefreshKind::new().with_processes(process_refresh_kind); + let sys = System::new_with_specifics(refresh_kind); + let unitd_processes: HashMap<&Pid, &Process> = sys + .processes() + .iter() + .filter(|p| { + let process_name = p.1.name(); + UNITD_BINARY_NAMES.contains(&process_name) + }) + .collect::<HashMap<&Pid, &Process>>(); + let users = Users::new_with_refreshed_list(); + + unitd_processes + .iter() + // Filter out child processes + .filter(|p| { + let parent_pid = p.1.parent(); + match parent_pid { + Some(pid) => !unitd_processes.contains_key(&pid), + None => false, + } + }) + .map(|p| { + let tuple = p.to_owned(); + /* The sysinfo library only supports 32-bit pids, yet larger values are possible + * if the OS is configured to support it, thus we use 64-bit integers internally + * because it is just a matter of time until the library changes to larger values. */ + let pid = *tuple.0; + let process = *tuple.1; + let process_id: u64 = pid.as_u32().into(); + let executable_path: Option<Box<Path>> = process.exe().map(|p| p.to_path_buf().into_boxed_path()); + let environ: Vec<String> = process.environ().into(); + let cmd: Vec<String> = process.cmd().into(); + let working_dir: Option<Box<Path>> = process.cwd().map(|p| p.to_path_buf().into_boxed_path()); + let child_pids = unitd_processes + .iter() + .filter_map(|p| p.to_owned().1.parent()) + .filter(|parent_pid| parent_pid == pid) + .map(|p| p.as_u32() as u64) + .collect::<Vec<u64>>(); + + let user = process + .user_id() + .and_then(|uid| users.get_user_by_id(uid)) + .map(UnitdProcessUser::from); + let effective_user = process + .effective_user_id() + .and_then(|uid| users.get_user_by_id(uid)) + .map(UnitdProcessUser::from); + + UnitdProcess { + binary_name: process.name().to_string(), + process_id, + executable_path, + environ, + all_cmds: cmd, + working_dir, + child_pids, + user, + effective_user, + } + }) + .collect::<Vec<UnitdProcess>>() + } + + pub fn cmd(&self) -> Option<UnitdCmd> { + if self.all_cmds.is_empty() { + return None; + } + + match UnitdCmd::new(self.all_cmds[0].clone(), self.binary_name.as_ref()) { + Ok(cmd) => Some(cmd), + Err(error) => { + eprintln!("Failed to parse process cmd: {}", error); + None + } + } + } + + pub fn executable_path(&self) -> Option<Box<Path>> { + if self.executable_path.is_some() { + return self.executable_path.clone(); + } + self.cmd().and_then(|cmd| cmd.process_executable_path) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn can_parse_runtime_cmd_absolute_path(binary_name: &str) { + let cmd = format!( + "unit: main v1.28.0 [/usr/sbin/{} --log /var/log/unit.log --pid /var/run/unit.pid]", + binary_name + ); + let unitd_cmd = UnitdCmd::new(cmd, binary_name).expect("Failed to parse unitd cmd"); + assert_eq!(unitd_cmd.version.unwrap(), "1.28.0"); + assert_eq!( + unitd_cmd.process_executable_path.unwrap().to_string_lossy(), + format!("/usr/sbin/{}", binary_name) + ); + let flags = unitd_cmd.flags.unwrap(); + assert_eq!(flags.get_flag_value("log").unwrap(), "/var/log/unit.log"); + assert_eq!(flags.get_flag_value("pid").unwrap(), "/var/run/unit.pid"); + } + + fn can_parse_runtime_cmd_relative_path(binary_name: &str) { + let cmd = format!( + "unit: main v1.29.0 [./sbin/{} --no-daemon --tmp /tmp --something]", + binary_name + ); + let unitd_cmd = UnitdCmd::new(cmd, binary_name).expect("Failed to parse unitd cmd"); + assert_eq!(unitd_cmd.version.unwrap(), "1.29.0"); + assert_eq!( + unitd_cmd.process_executable_path.unwrap().to_string_lossy(), + format!("./sbin/{}", binary_name) + ); + let flags = unitd_cmd.flags.unwrap(); + assert_eq!(flags.get_flag_value("tmp").unwrap(), "/tmp"); + assert!(flags.has_flag("something")); + } + + #[test] + fn can_parse_runtime_cmd_unitd_absolute_path() { + can_parse_runtime_cmd_absolute_path("unitd"); + } + + #[test] + fn can_parse_runtime_cmd_unitd_debug_absolute_path() { + can_parse_runtime_cmd_absolute_path("unitd-debug"); + } + + #[test] + fn can_parse_runtime_cmd_unitd_relative_path() { + can_parse_runtime_cmd_relative_path("unitd"); + } + + #[test] + fn can_parse_runtime_cmd_unitd_debug_relative_path() { + can_parse_runtime_cmd_relative_path("unitd-debug"); + } +} diff --git a/tools/unitctl/unit-client-rs/src/unitd_process_user.rs b/tools/unitctl/unit-client-rs/src/unitd_process_user.rs new file mode 100644 index 00000000..c4f9be22 --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unitd_process_user.rs @@ -0,0 +1,36 @@ +use serde::Serialize; +use std::fmt; +use std::fmt::Display; +use sysinfo::User; + +#[derive(Debug, Clone, Serialize)] +pub struct UnitdProcessUser { + pub name: String, + pub uid: u32, + pub gid: u32, + pub groups: Vec<String>, +} + +impl Display for UnitdProcessUser { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "name: {}, uid: {}, gid: {}, groups: {}", + self.name, + self.uid, + self.gid, + self.groups.join(", ") + ) + } +} + +impl From<&User> for UnitdProcessUser { + fn from(user: &User) -> Self { + UnitdProcessUser { + name: user.name().into(), + uid: *user.id().clone(), + gid: *user.group_id(), + groups: user.groups().iter().map(|g| g.name().into()).collect(), + } + } +} |