summaryrefslogtreecommitdiffhomepage
path: root/tools/unitctl/unit-client-rs
diff options
context:
space:
mode:
Diffstat (limited to 'tools/unitctl/unit-client-rs')
-rw-r--r--tools/unitctl/unit-client-rs/Cargo.toml35
-rw-r--r--tools/unitctl/unit-client-rs/src/control_socket_address.rs569
-rw-r--r--tools/unitctl/unit-client-rs/src/lib.rs16
-rw-r--r--tools/unitctl/unit-client-rs/src/runtime_flags.rs90
-rw-r--r--tools/unitctl/unit-client-rs/src/unit_client.rs424
-rw-r--r--tools/unitctl/unit-client-rs/src/unitd_cmd.rs88
-rw-r--r--tools/unitctl/unit-client-rs/src/unitd_configure_options.rs236
-rw-r--r--tools/unitctl/unit-client-rs/src/unitd_docker.rs456
-rw-r--r--tools/unitctl/unit-client-rs/src/unitd_instance.rs403
-rw-r--r--tools/unitctl/unit-client-rs/src/unitd_process.rs196
-rw-r--r--tools/unitctl/unit-client-rs/src/unitd_process_user.rs36
11 files changed, 2549 insertions, 0 deletions
diff --git a/tools/unitctl/unit-client-rs/Cargo.toml b/tools/unitctl/unit-client-rs/Cargo.toml
new file mode 100644
index 00000000..6d873417
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/Cargo.toml
@@ -0,0 +1,35 @@
+[package]
+name = "unit-client-rs"
+version = "1.33.0"
+authors = ["Elijah Zupancic"]
+edition = "2021"
+license = "Apache-2.0"
+
+[lib]
+name = "unit_client_rs"
+
+[features]
+# this preserves the ordering of json
+default = ["serde_json/preserve_order"]
+
+[dependencies]
+custom_error = "1.9"
+hyper = { version = "0.14", features = ["stream"] }
+hyper-tls = "0.5"
+hyperlocal = "0.8"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+sysinfo = "0.30.5"
+tokio = { version = "1.34", features = ["macros"] }
+futures = "0.3"
+hex = "0.4"
+which = "5.0"
+
+unit-openapi = { path = "../unit-openapi" }
+rustls = "0.23.5"
+bollard = "0.16.1"
+regex = "1.10.4"
+pbr = "1.1.1"
+
+[dev-dependencies]
+rand = "0.8.5"
diff --git a/tools/unitctl/unit-client-rs/src/control_socket_address.rs b/tools/unitctl/unit-client-rs/src/control_socket_address.rs
new file mode 100644
index 00000000..438ab0ad
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/src/control_socket_address.rs
@@ -0,0 +1,569 @@
+use crate::control_socket_address::ControlSocket::{TcpSocket, UnixLocalAbstractSocket, UnixLocalSocket};
+use crate::control_socket_address::ControlSocketScheme::{HTTP, HTTPS};
+use crate::unit_client::UnitClientError;
+use hyper::http::uri::{Authority, PathAndQuery};
+use hyper::Uri;
+use std::fmt::{Display, Formatter};
+use std::fs;
+use std::os::unix::fs::FileTypeExt;
+use std::path::{PathBuf, MAIN_SEPARATOR};
+
+type AbstractSocketName = String;
+type UnixSocketPath = PathBuf;
+type Port = u16;
+
+#[derive(Debug, Clone)]
+pub enum ControlSocket {
+ UnixLocalAbstractSocket(AbstractSocketName),
+ UnixLocalSocket(UnixSocketPath),
+ TcpSocket(Uri),
+}
+
+#[derive(Debug)]
+pub enum ControlSocketScheme {
+ HTTP,
+ HTTPS,
+}
+
+impl ControlSocketScheme {
+ fn port(&self) -> Port {
+ match self {
+ HTTP => 80,
+ HTTPS => 443,
+ }
+ }
+}
+
+impl Display for ControlSocket {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ UnixLocalAbstractSocket(name) => f.write_fmt(format_args!("unix:@{}", name)),
+ UnixLocalSocket(path) => f.write_fmt(format_args!("unix:{}", path.to_string_lossy())),
+ TcpSocket(uri) => uri.fmt(f),
+ }
+ }
+}
+
+impl From<ControlSocket> for String {
+ fn from(val: ControlSocket) -> Self {
+ val.to_string()
+ }
+}
+
+impl From<ControlSocket> for PathBuf {
+ fn from(val: ControlSocket) -> Self {
+ match val {
+ UnixLocalAbstractSocket(socket_name) => PathBuf::from(format!("@{}", socket_name)),
+ UnixLocalSocket(socket_path) => socket_path,
+ TcpSocket(_) => PathBuf::default(),
+ }
+ }
+}
+
+impl From<ControlSocket> for Uri {
+ fn from(val: ControlSocket) -> Self {
+ val.create_uri_with_path("")
+ }
+}
+
+impl TryFrom<String> for ControlSocket {
+ type Error = UnitClientError;
+
+ fn try_from(socket_address: String) -> Result<Self, Self::Error> {
+ ControlSocket::parse_address(socket_address.as_str())
+ }
+}
+
+impl TryFrom<&str> for ControlSocket {
+ type Error = UnitClientError;
+
+ fn try_from(socket_address: &str) -> Result<Self, Self::Error> {
+ ControlSocket::parse_address(socket_address)
+ }
+}
+
+impl TryFrom<Uri> for ControlSocket {
+ type Error = UnitClientError;
+
+ fn try_from(socket_uri: Uri) -> Result<Self, Self::Error> {
+ match socket_uri.scheme_str() {
+ // URIs with the unix scheme will have a hostname that is a hex encoded string
+ // representing the path to the socket
+ Some("unix") => {
+ let host = match socket_uri.host() {
+ Some(host) => host,
+ None => {
+ return Err(UnitClientError::TcpSocketAddressParseError {
+ message: "No host found in socket address".to_string(),
+ control_socket_address: socket_uri.to_string(),
+ })
+ }
+ };
+ let bytes = hex::decode(host).map_err(|error| UnitClientError::TcpSocketAddressParseError {
+ message: error.to_string(),
+ control_socket_address: socket_uri.to_string(),
+ })?;
+ let path = String::from_utf8_lossy(&bytes);
+ ControlSocket::parse_address(path)
+ }
+ Some("http") | Some("https") => Ok(TcpSocket(socket_uri)),
+ Some(unknown) => Err(UnitClientError::TcpSocketAddressParseError {
+ message: format!("Unsupported scheme found in socket address: {}", unknown).to_string(),
+ control_socket_address: socket_uri.to_string(),
+ }),
+ None => Err(UnitClientError::TcpSocketAddressParseError {
+ message: "No scheme found in socket address".to_string(),
+ control_socket_address: socket_uri.to_string(),
+ }),
+ }
+ }
+}
+
+impl ControlSocket {
+ pub fn socket_scheme(&self) -> ControlSocketScheme {
+ match self {
+ UnixLocalAbstractSocket(_) => ControlSocketScheme::HTTP,
+ UnixLocalSocket(_) => ControlSocketScheme::HTTP,
+ TcpSocket(uri) => match uri.scheme_str().expect("Scheme should not be None") {
+ "http" => ControlSocketScheme::HTTP,
+ "https" => ControlSocketScheme::HTTPS,
+ _ => unreachable!("Scheme should be http or https"),
+ },
+ }
+ }
+
+ pub fn create_uri_with_path(&self, str_path: &str) -> Uri {
+ match self {
+ UnixLocalAbstractSocket(name) => {
+ let socket_path = PathBuf::from(format!("@{}", name));
+ hyperlocal::Uri::new(socket_path, str_path).into()
+ }
+ UnixLocalSocket(socket_path) => hyperlocal::Uri::new(socket_path, str_path).into(),
+ TcpSocket(uri) => {
+ if str_path.is_empty() {
+ uri.clone()
+ } else {
+ let authority = uri.authority().expect("Authority should not be None");
+ Uri::builder()
+ .scheme(uri.scheme_str().expect("Scheme should not be None"))
+ .authority(authority.clone())
+ .path_and_query(str_path)
+ .build()
+ .expect("URI should be valid")
+ }
+ }
+ }
+ }
+
+ pub fn validate_http_address(uri: Uri) -> Result<(), UnitClientError> {
+ let http_address = uri.to_string();
+ if uri.authority().is_none() {
+ return Err(UnitClientError::TcpSocketAddressParseError {
+ message: "No authority found in socket address".to_string(),
+ control_socket_address: http_address,
+ });
+ }
+ if uri.port_u16().is_none() {
+ return Err(UnitClientError::TcpSocketAddressNoPortError {
+ control_socket_address: http_address,
+ });
+ }
+ if !(uri.path().is_empty() || uri.path().eq("/")) {
+ return Err(UnitClientError::TcpSocketAddressParseError {
+ message: format!("Path is not empty or is not / [path={}]", uri.path()),
+ control_socket_address: http_address,
+ });
+ }
+
+ Ok(())
+ }
+
+ pub fn validate_unix_address(socket: PathBuf) -> Result<(), UnitClientError> {
+ if !socket.exists() {
+ return Err(UnitClientError::UnixSocketNotFound {
+ control_socket_address: socket.to_string_lossy().to_string(),
+ });
+ }
+ let metadata = fs::metadata(&socket).map_err(|error| UnitClientError::UnixSocketAddressError {
+ source: error,
+ control_socket_address: socket.to_string_lossy().to_string(),
+ })?;
+ let file_type = metadata.file_type();
+ if !file_type.is_socket() {
+ return Err(UnitClientError::UnixSocketAddressError {
+ source: std::io::Error::new(std::io::ErrorKind::Other, "Control socket path is not a socket"),
+ control_socket_address: socket.to_string_lossy().to_string(),
+ });
+ }
+
+ Ok(())
+ }
+
+ pub fn validate(&self) -> Result<Self, UnitClientError> {
+ match self {
+ UnixLocalAbstractSocket(socket_name) => {
+ let socket_path = PathBuf::from(format!("@{}", socket_name));
+ Self::validate_unix_address(socket_path.clone())
+ }
+ UnixLocalSocket(socket_path) => Self::validate_unix_address(socket_path.clone()),
+ TcpSocket(socket_uri) => Self::validate_http_address(socket_uri.clone()),
+ }
+ .map(|_| self.to_owned())
+ }
+
+ fn normalize_and_parse_http_address(http_address: String) -> Result<Uri, UnitClientError> {
+ // Convert *:1 style network addresses to URI format
+ let address = if http_address.starts_with("*:") {
+ http_address.replacen("*:", "http://127.0.0.1:", 1)
+ // Add scheme if not present
+ } else if !(http_address.starts_with("http://") || http_address.starts_with("https://")) {
+ format!("http://{}", http_address)
+ } else {
+ http_address.to_owned()
+ };
+
+ let is_https = address.starts_with("https://");
+
+ let parsed_uri =
+ Uri::try_from(address.as_str()).map_err(|error| UnitClientError::TcpSocketAddressUriError {
+ source: error,
+ control_socket_address: address,
+ })?;
+ let authority = parsed_uri.authority().expect("Authority should not be None");
+ let expected_port = if is_https { HTTPS.port() } else { HTTP.port() };
+ let normalized_authority = match authority.port_u16() {
+ Some(_) => authority.to_owned(),
+ None => {
+ let host = format!("{}:{}", authority.host(), expected_port);
+ Authority::try_from(host.as_str()).expect("Authority should be valid")
+ }
+ };
+
+ let normalized_uri = Uri::builder()
+ .scheme(parsed_uri.scheme_str().expect("Scheme should not be None"))
+ .authority(normalized_authority)
+ .path_and_query(PathAndQuery::from_static(""))
+ .build()
+ .map_err(|error| UnitClientError::TcpSocketAddressParseError {
+ message: error.to_string(),
+ control_socket_address: http_address.clone(),
+ })?;
+
+ Ok(normalized_uri)
+ }
+
+ /// Flexibly parse a textual representation of a socket address
+ pub fn parse_address<S: Into<String>>(socket_address: S) -> Result<Self, UnitClientError> {
+ let full_socket_address: String = socket_address.into();
+ let socket_prefix = "unix:";
+ let socket_uri_prefix = "unix://";
+ let mut buf = String::with_capacity(socket_prefix.len());
+ for (i, c) in full_socket_address.char_indices() {
+ // Abstract unix socket with no prefix
+ if i == 0 && c == '@' {
+ return Ok(UnixLocalAbstractSocket(full_socket_address[1..].to_string()));
+ }
+ buf.push(c);
+ // Unix socket with prefix
+ if i == socket_prefix.len() - 1 && buf.eq(socket_prefix) {
+ let path_text = full_socket_address[socket_prefix.len()..].to_string();
+ // Return here if this URI does not have a scheme followed by double slashes
+ if !path_text.starts_with("//") {
+ return match path_text.strip_prefix('@') {
+ Some(name) => Ok(UnixLocalAbstractSocket(name.to_string())),
+ None => {
+ let path = PathBuf::from(path_text);
+ Ok(UnixLocalSocket(path))
+ }
+ };
+ }
+ }
+
+ // Unix socket with URI prefix
+ if i == socket_uri_prefix.len() - 1 && buf.eq(socket_uri_prefix) {
+ let uri = Uri::try_from(full_socket_address.as_str()).map_err(|error| {
+ UnitClientError::TcpSocketAddressParseError {
+ message: error.to_string(),
+ control_socket_address: full_socket_address.clone(),
+ }
+ })?;
+ return ControlSocket::try_from(uri);
+ }
+ }
+
+ /* Sockets on Windows are not supported, so there is no need to check
+ * if the socket address is a valid path, so we can do this shortcut
+ * here to see if a path was specified without a unix: prefix. */
+ if buf.starts_with(MAIN_SEPARATOR) {
+ let path = PathBuf::from(buf);
+ return Ok(UnixLocalSocket(path));
+ }
+
+ let uri = Self::normalize_and_parse_http_address(buf)?;
+ Ok(TcpSocket(uri))
+ }
+
+ pub fn is_local_socket(&self) -> bool {
+ match self {
+ UnixLocalAbstractSocket(_) | UnixLocalSocket(_) => true,
+ TcpSocket(_) => false,
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use rand::distributions::{Alphanumeric, DistString};
+ use std::env::temp_dir;
+ use std::fmt::Display;
+ use std::io;
+ use std::os::unix::net::UnixListener;
+
+ use super::*;
+
+ struct TempSocket {
+ socket_path: PathBuf,
+ _listener: UnixListener,
+ }
+
+ impl TempSocket {
+ fn shutdown(&mut self) -> io::Result<()> {
+ fs::remove_file(&self.socket_path)
+ }
+ }
+
+ impl Display for TempSocket {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "unix:{}", self.socket_path.to_string_lossy().to_string())
+ }
+ }
+
+ impl Drop for TempSocket {
+ fn drop(&mut self) {
+ self.shutdown()
+ .expect(format!("Unable to shutdown socket {}", self.socket_path.to_string_lossy()).as_str());
+ }
+ }
+
+ #[test]
+ fn will_error_with_nonexistent_unix_socket() {
+ let socket_address = "unix:/tmp/some_random_filename_that_doesnt_exist.sock";
+ let control_socket =
+ ControlSocket::try_from(socket_address).expect("No error should be returned until validate() is called");
+ assert!(control_socket.is_local_socket(), "Not parsed as a local socket");
+ assert!(control_socket.validate().is_err(), "Socket should not be valid");
+ }
+
+ #[test]
+ fn can_parse_socket_with_prefix() {
+ let temp_socket = create_file_socket().expect("Unable to create socket");
+ let control_socket = ControlSocket::try_from(temp_socket.to_string()).expect("Error parsing good socket path");
+ assert!(control_socket.is_local_socket(), "Not parsed as a local socket");
+ if let Err(e) = control_socket.validate() {
+ panic!("Socket should be valid: {}", e);
+ }
+ }
+
+ #[test]
+ fn can_parse_socket_from_uri() {
+ let temp_socket = create_file_socket().expect("Unable to create socket");
+ let uri: Uri = hyperlocal::Uri::new(temp_socket.socket_path.clone(), "").into();
+ let control_socket = ControlSocket::try_from(uri).expect("Error parsing good socket path");
+ assert!(control_socket.is_local_socket(), "Not parsed as a local socket");
+ if let Err(e) = control_socket.validate() {
+ panic!("Socket should be valid: {}", e);
+ }
+ }
+
+ #[test]
+ fn can_parse_socket_from_uri_text() {
+ let temp_socket = create_file_socket().expect("Unable to create socket");
+ let uri: Uri = hyperlocal::Uri::new(temp_socket.socket_path.clone(), "").into();
+ let control_socket = ControlSocket::parse_address(uri.to_string()).expect("Error parsing good socket path");
+ assert!(control_socket.is_local_socket(), "Not parsed as a local socket");
+ if let Err(e) = control_socket.validate() {
+ panic!("Socket for input text should be valid: {}", e);
+ }
+ }
+
+ #[test]
+ #[cfg(target_os = "linux")]
+ fn can_parse_abstract_socket_from_uri() {
+ let temp_socket = create_abstract_socket().expect("Unable to create socket");
+ let uri: Uri = hyperlocal::Uri::new(temp_socket.socket_path.clone(), "").into();
+ let control_socket = ControlSocket::try_from(uri).expect("Error parsing good socket path");
+ assert!(control_socket.is_local_socket(), "Not parsed as a local socket");
+ if let Err(e) = control_socket.validate() {
+ panic!("Socket should be valid: {}", e);
+ }
+ }
+
+ #[test]
+ #[cfg(target_os = "linux")]
+ fn can_parse_abstract_socket_from_uri_text() {
+ let temp_socket = create_abstract_socket().expect("Unable to create socket");
+ let uri: Uri = hyperlocal::Uri::new(temp_socket.socket_path.clone(), "").into();
+ let control_socket = ControlSocket::parse_address(uri.to_string()).expect("Error parsing good socket path");
+ assert!(control_socket.is_local_socket(), "Not parsed as a local socket");
+ if let Err(e) = control_socket.validate() {
+ panic!("Socket should be valid: {}", e);
+ }
+ }
+
+ #[test]
+ fn can_parse_socket_without_prefix() {
+ let temp_socket = create_file_socket().expect("Unable to create socket");
+ let control_socket = ControlSocket::try_from(temp_socket.socket_path.to_string_lossy().to_string())
+ .expect("Error parsing good socket path");
+ assert!(control_socket.is_local_socket(), "Not parsed as a local socket");
+ if let Err(e) = control_socket.validate() {
+ panic!("Socket should be valid: {}", e);
+ }
+ }
+
+ #[cfg(target_os = "linux")]
+ #[test]
+ fn can_parse_abstract_socket() {
+ let temp_socket = create_abstract_socket().expect("Unable to create socket");
+ let control_socket = ControlSocket::try_from(temp_socket.to_string()).expect("Error parsing good socket path");
+ assert!(control_socket.is_local_socket(), "Not parsed as a local socket");
+ if let Err(e) = control_socket.validate() {
+ panic!("Socket should be valid: {}", e);
+ }
+ }
+
+ #[test]
+ fn can_normalize_good_http_socket_addresses() {
+ let valid_socket_addresses = vec![
+ "http://127.0.0.1:8080",
+ "https://127.0.0.1:8080",
+ "http://127.0.0.1:8080/",
+ "127.0.0.1:8080",
+ "http://0.0.0.0:8080",
+ "https://0.0.0.0:8080",
+ "http://0.0.0.0:8080/",
+ "0.0.0.0:8080",
+ "http://localhost:8080",
+ "https://localhost:8080",
+ "http://localhost:8080/",
+ "localhost:8080",
+ "http://[::1]:8080",
+ "https://[::1]:8080",
+ "http://[::1]:8080/",
+ "[::1]:8080",
+ "http://[0000:0000:0000:0000:0000:0000:0000:0000]:8080",
+ "https://[0000:0000:0000:0000:0000:0000:0000:0000]:8080",
+ "http://[0000:0000:0000:0000:0000:0000:0000:0000]:8080/",
+ "[0000:0000:0000:0000:0000:0000:0000:0000]:8080",
+ ];
+ for socket_address in valid_socket_addresses {
+ let mut expected = if socket_address.starts_with("http") {
+ socket_address.to_string().trim_end_matches('/').to_string()
+ } else {
+ format!("http://{}", socket_address).trim_end_matches('/').to_string()
+ };
+ expected.push('/');
+
+ let control_socket = ControlSocket::try_from(socket_address).expect("Error parsing good socket path");
+ assert!(!control_socket.is_local_socket(), "Not parsed as a local socket");
+ if let Err(e) = control_socket.validate() {
+ panic!("Socket should be valid: {}", e);
+ }
+ }
+ }
+
+ #[test]
+ fn can_normalize_wildcard_http_socket_address() {
+ let socket_address = "*:8080";
+ let expected = "http://127.0.0.1:8080/";
+ let normalized_result = ControlSocket::normalize_and_parse_http_address(socket_address.to_string());
+ let normalized = normalized_result
+ .expect("Unable to normalize socket address")
+ .to_string();
+ assert_eq!(normalized, expected);
+ }
+
+ #[test]
+ fn can_normalize_http_socket_address_with_no_port() {
+ let socket_address = "http://localhost";
+ let expected = "http://localhost:80/";
+ let normalized_result = ControlSocket::normalize_and_parse_http_address(socket_address.to_string());
+ let normalized = normalized_result
+ .expect("Unable to normalize socket address")
+ .to_string();
+ assert_eq!(normalized, expected);
+ }
+
+ #[test]
+ fn can_normalize_https_socket_address_with_no_port() {
+ let socket_address = "https://localhost";
+ let expected = "https://localhost:443/";
+ let normalized_result = ControlSocket::normalize_and_parse_http_address(socket_address.to_string());
+ let normalized = normalized_result
+ .expect("Unable to normalize socket address")
+ .to_string();
+ assert_eq!(normalized, expected);
+ }
+
+ #[test]
+ fn can_parse_http_addresses() {
+ let valid_socket_addresses = vec![
+ "http://127.0.0.1:8080",
+ "https://127.0.0.1:8080",
+ "http://127.0.0.1:8080/",
+ "127.0.0.1:8080",
+ "http://0.0.0.0:8080",
+ "https://0.0.0.0:8080",
+ "http://0.0.0.0:8080/",
+ "0.0.0.0:8080",
+ "http://localhost:8080",
+ "https://localhost:8080",
+ "http://localhost:8080/",
+ "localhost:8080",
+ "http://[::1]:8080",
+ "https://[::1]:8080",
+ "http://[::1]:8080/",
+ "[::1]:8080",
+ "http://[0000:0000:0000:0000:0000:0000:0000:0000]:8080",
+ "https://[0000:0000:0000:0000:0000:0000:0000:0000]:8080",
+ "http://[0000:0000:0000:0000:0000:0000:0000:0000]:8080/",
+ "[0000:0000:0000:0000:0000:0000:0000:0000]:8080",
+ ];
+ for socket_address in valid_socket_addresses {
+ let mut expected = if socket_address.starts_with("http") {
+ socket_address.to_string().trim_end_matches('/').to_string()
+ } else {
+ format!("http://{}", socket_address).trim_end_matches('/').to_string()
+ };
+ expected.push('/');
+
+ let normalized = ControlSocket::normalize_and_parse_http_address(socket_address.to_string())
+ .expect("Unable to normalize socket address")
+ .to_string();
+ assert_eq!(normalized, expected);
+ }
+ }
+
+ fn create_file_socket() -> Result<TempSocket, io::Error> {
+ let random = Alphanumeric.sample_string(&mut rand::thread_rng(), 10);
+ let socket_name = format!("unit-client-socket-test-{}.sock", random);
+ let socket_path = temp_dir().join(socket_name);
+ let listener = UnixListener::bind(&socket_path)?;
+ Ok(TempSocket {
+ socket_path,
+ _listener: listener,
+ })
+ }
+
+ #[cfg(target_os = "linux")]
+ fn create_abstract_socket() -> Result<TempSocket, io::Error> {
+ let random = Alphanumeric.sample_string(&mut rand::thread_rng(), 10);
+ let socket_name = format!("@unit-client-socket-test-{}.sock", random);
+ let socket_path = PathBuf::from(socket_name);
+ let listener = UnixListener::bind(&socket_path)?;
+ Ok(TempSocket {
+ socket_path,
+ _listener: listener,
+ })
+ }
+}
diff --git a/tools/unitctl/unit-client-rs/src/lib.rs b/tools/unitctl/unit-client-rs/src/lib.rs
new file mode 100644
index 00000000..a0933f42
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/src/lib.rs
@@ -0,0 +1,16 @@
+extern crate custom_error;
+extern crate futures;
+extern crate hyper;
+extern crate hyper_tls;
+extern crate hyperlocal;
+extern crate serde;
+extern crate serde_json;
+pub mod control_socket_address;
+mod runtime_flags;
+pub mod unit_client;
+mod unitd_cmd;
+pub mod unitd_configure_options;
+pub mod unitd_docker;
+pub mod unitd_instance;
+pub mod unitd_process;
+mod unitd_process_user;
diff --git a/tools/unitctl/unit-client-rs/src/runtime_flags.rs b/tools/unitctl/unit-client-rs/src/runtime_flags.rs
new file mode 100644
index 00000000..7b31274d
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/src/runtime_flags.rs
@@ -0,0 +1,90 @@
+use std::borrow::Cow;
+use std::fmt;
+use std::fmt::Display;
+use std::path::{Path, PathBuf};
+
+#[derive(Debug, Clone)]
+pub struct RuntimeFlags {
+ pub flags: Cow<'static, str>,
+}
+
+impl Display for RuntimeFlags {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "{}", self.flags)
+ }
+}
+
+impl RuntimeFlags {
+ pub fn new<S>(flags: S) -> RuntimeFlags
+ where
+ S: Into<String>,
+ {
+ RuntimeFlags {
+ flags: Cow::from(flags.into()),
+ }
+ }
+
+ pub fn has_flag(&self, flag_name: &str) -> bool {
+ self.flags.contains(format!("--{}", flag_name).as_str())
+ }
+
+ pub fn get_flag_value(&self, flag_name: &str) -> Option<String> {
+ let flag_parts = self.flags.split_ascii_whitespace().collect::<Vec<&str>>();
+ for (i, flag) in flag_parts.iter().enumerate() {
+ if let Some(name) = flag.strip_prefix("--") {
+ /* If there is no flag value after the current one, there is by definition no
+ * flag value for the current flag. */
+ let index_lt_len = flag_parts.len() > i + 1;
+ if index_lt_len {
+ let next_value_isnt_flag = !flag_parts[i + 1].starts_with("--");
+ if name.eq(flag_name) && next_value_isnt_flag {
+ return Some(flag_parts[i + 1].to_string());
+ }
+ }
+ }
+ }
+ None
+ }
+
+ pub fn control_api_socket_address(&self) -> Option<String> {
+ self.get_flag_value("control")
+ }
+
+ pub fn pid_path(&self) -> Option<Box<Path>> {
+ self.get_flag_value("pid")
+ .map(PathBuf::from)
+ .map(PathBuf::into_boxed_path)
+ }
+
+ pub fn log_path(&self) -> Option<Box<Path>> {
+ self.get_flag_value("log")
+ .map(PathBuf::from)
+ .map(PathBuf::into_boxed_path)
+ }
+
+ pub fn modules_directory(&self) -> Option<Box<Path>> {
+ self.get_flag_value("modules")
+ .map(PathBuf::from)
+ .map(PathBuf::into_boxed_path)
+ }
+
+ pub fn state_directory(&self) -> Option<Box<Path>> {
+ self.get_flag_value("state")
+ .map(PathBuf::from)
+ .map(PathBuf::into_boxed_path)
+ }
+
+ pub fn tmp_directory(&self) -> Option<Box<Path>> {
+ self.get_flag_value("tmp")
+ .map(PathBuf::from)
+ .map(PathBuf::into_boxed_path)
+ }
+
+ pub fn user(&self) -> Option<String> {
+ self.get_flag_value("user").map(String::from)
+ }
+
+ pub fn group(&self) -> Option<String> {
+ self.get_flag_value("group").map(String::from)
+ }
+}
diff --git a/tools/unitctl/unit-client-rs/src/unit_client.rs b/tools/unitctl/unit-client-rs/src/unit_client.rs
new file mode 100644
index 00000000..3d09e67a
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/src/unit_client.rs
@@ -0,0 +1,424 @@
+use std::collections::HashMap;
+use std::error::Error as StdError;
+use std::fmt::Debug;
+use std::rc::Rc;
+use std::{fmt, io};
+
+use custom_error::custom_error;
+use hyper::body::{Buf, HttpBody};
+use hyper::client::{HttpConnector, ResponseFuture};
+use hyper::Error as HyperError;
+use hyper::{http, Body, Client, Request};
+use hyper_tls::HttpsConnector;
+use hyperlocal::{UnixClientExt, UnixConnector};
+use serde::{Deserialize, Serialize};
+
+use crate::control_socket_address::ControlSocket;
+use unit_openapi::apis::configuration::Configuration;
+use unit_openapi::apis::{
+ ApplicationsApi, ApplicationsApiClient, AppsApi, AppsApiClient, Error as OpenAPIError, ListenersApi,
+ ListenersApiClient, StatusApi, StatusApiClient,
+};
+use unit_openapi::models::{ConfigApplication, ConfigListener, Status};
+
+const USER_AGENT: &str = concat!("Unit CLI/", env!("CARGO_PKG_VERSION"), "/rust");
+
+custom_error! {pub UnitClientError
+ OpenAPIError { source: OpenAPIError } = "OpenAPI error",
+ JsonError { source: serde_json::Error,
+ path: String} = "JSON error [path={path}]",
+ HyperError { source: hyper::Error,
+ control_socket_address: String,
+ path: String} = "Communications error [control_socket_address={control_socket_address}, path={path}]: {source}",
+ HttpRequestError { source: http::Error,
+ path: String} = "HTTP error [path={path}]",
+ HttpResponseError { status: http::StatusCode,
+ path: String,
+ body: String} = "HTTP response error [path={path}, status={status}]:\n{body}",
+ HttpResponseJsonBodyError { status: http::StatusCode,
+ path: String,
+ error: String,
+ detail: String} = "HTTP response error [path={path}, status={status}]:\n Error: {error}\n Detail: {detail}",
+ IoError { source: io::Error, socket: String } = "IO error [socket={socket}]",
+ UnixSocketAddressError {
+ source: io::Error,
+ control_socket_address: String
+ } = "Invalid unix domain socket address [control_socket_address={control_socket_address}]",
+ SocketPermissionsError { control_socket_address: String } =
+ "Insufficient permissions to connect to control socket [control_socket_address={control_socket_address}]",
+ UnixSocketNotFound { control_socket_address: String } = "Unix socket not found [control_socket_address={control_socket_address}]",
+ TcpSocketAddressUriError {
+ source: http::uri::InvalidUri,
+ control_socket_address: String
+ } = "Invalid TCP socket address [control_socket_address={control_socket_address}]",
+ TcpSocketAddressParseError {
+ message: String,
+ control_socket_address: String
+ } = "Invalid TCP socket address [control_socket_address={control_socket_address}]: {message}",
+ TcpSocketAddressNoPortError {
+ control_socket_address: String
+ } = "TCP socket address does not have a port specified [control_socket_address={control_socket_address}]",
+ UnitdProcessParseError {
+ message: String,
+ pid: u64
+ } = "{message} for [pid={pid}]",
+ UnitdProcessExecError {
+ source: Box<dyn StdError>,
+ message: String,
+ executable_path: String,
+ pid: u64
+ } = "{message} for [pid={pid}, executable_path={executable_path}]: {source}",
+ UnitdDockerError {
+ message: String
+ } = "Failed to communicate with docker daemon: {message}",
+}
+
+impl UnitClientError {
+ fn new(error: HyperError, control_socket_address: String, path: String) -> Self {
+ if error.is_connect() {
+ if let Some(source) = error.source() {
+ if let Some(io_error) = source.downcast_ref::<io::Error>() {
+ if io_error.kind().eq(&io::ErrorKind::PermissionDenied) {
+ return UnitClientError::SocketPermissionsError { control_socket_address };
+ }
+ }
+ }
+ }
+
+ UnitClientError::HyperError {
+ source: error,
+ control_socket_address,
+ path,
+ }
+ }
+}
+
+macro_rules! new_openapi_client_from_hyper_client {
+ ($unit_client:expr, $hyper_client: ident, $api_client:ident, $api_trait:ident) => {{
+ let config = Configuration {
+ base_path: $unit_client.control_socket.create_uri_with_path("/").to_string(),
+ user_agent: Some(format!("{}/OpenAPI-Generator", USER_AGENT).to_owned()),
+ client: $hyper_client.clone(),
+ basic_auth: None,
+ oauth_access_token: None,
+ api_key: None,
+ };
+ let rc_config = Rc::new(config);
+ Box::new($api_client::new(rc_config)) as Box<dyn $api_trait>
+ }};
+}
+
+macro_rules! new_openapi_client {
+ ($unit_client:expr, $api_client:ident, $api_trait:ident) => {
+ match &*$unit_client.client {
+ RemoteClient::Tcp { client } => {
+ new_openapi_client_from_hyper_client!($unit_client, client, $api_client, $api_trait)
+ }
+ RemoteClient::Unix { client } => {
+ new_openapi_client_from_hyper_client!($unit_client, client, $api_client, $api_trait)
+ }
+ }
+ };
+}
+
+#[derive(Clone)]
+pub enum RemoteClient<B>
+where
+ B: HttpBody + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ Unix {
+ client: Client<UnixConnector, B>,
+ },
+ Tcp {
+ client: Client<HttpsConnector<HttpConnector>, B>,
+ },
+}
+
+impl<B> RemoteClient<B>
+where
+ B: HttpBody + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ fn client_name(&self) -> &str {
+ match self {
+ RemoteClient::Unix { .. } => "Client<UnixConnector, Body>",
+ RemoteClient::Tcp { .. } => "Client<HttpsConnector<HttpConnector>, Body>",
+ }
+ }
+
+ pub fn request(&self, req: Request<B>) -> ResponseFuture {
+ match self {
+ RemoteClient::Unix { client } => client.request(req),
+ RemoteClient::Tcp { client } => client.request(req),
+ }
+ }
+}
+
+impl<B> Debug for RemoteClient<B>
+where
+ B: HttpBody + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "{}", self.client_name())
+ }
+}
+
+#[derive(Debug)]
+pub struct UnitClient {
+ pub control_socket: ControlSocket,
+ /// Client for communicating with the control API over the UNIX domain socket
+ client: Box<RemoteClient<Body>>,
+}
+
+impl UnitClient {
+ pub fn new(control_socket: ControlSocket) -> Self {
+ if control_socket.is_local_socket() {
+ Self::new_unix(control_socket)
+ } else {
+ Self::new_http(control_socket)
+ }
+ }
+
+ pub fn new_http(control_socket: ControlSocket) -> Self {
+ let remote_client = Client::builder().build(HttpsConnector::new());
+ Self {
+ control_socket,
+ client: Box::from(RemoteClient::Tcp { client: remote_client }),
+ }
+ }
+
+ pub fn new_unix(control_socket: ControlSocket) -> UnitClient {
+ let remote_client = Client::unix();
+
+ Self {
+ control_socket,
+ client: Box::from(RemoteClient::Unix { client: remote_client }),
+ }
+ }
+
+ /// Sends a request to Unit and deserializes the JSON response body into the value of type `RESPONSE`.
+ pub async fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>(
+ &self,
+ mut request: Request<Body>,
+ ) -> Result<RESPONSE, UnitClientError> {
+ let uri = request.uri().clone();
+ let path: &str = uri.path();
+
+ request.headers_mut().insert("User-Agent", USER_AGENT.parse().unwrap());
+
+ let response_future = self.client.request(request);
+
+ let response = response_future
+ .await
+ .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?;
+
+ let status = response.status();
+ let body = hyper::body::aggregate(response)
+ .await
+ .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?;
+ let reader = &mut body.reader();
+ if !status.is_success() {
+ let error: HashMap<String, String> =
+ serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError {
+ source: error,
+ path: path.to_string(),
+ })?;
+
+ return Err(UnitClientError::HttpResponseJsonBodyError {
+ status,
+ path: path.to_string(),
+ error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(),
+ detail: error.get("detail").unwrap_or(&"".into()).to_string(),
+ });
+ }
+ serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError {
+ source: error,
+ path: path.to_string(),
+ })
+ }
+
+ pub fn listeners_api(&self) -> Box<dyn ListenersApi + 'static> {
+ new_openapi_client!(self, ListenersApiClient, ListenersApi)
+ }
+
+ pub async fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> {
+ self.listeners_api().get_listeners().await.or_else(|err| {
+ if let OpenAPIError::Hyper(hyper_error) = err {
+ Err(Box::new(UnitClientError::new(
+ hyper_error,
+ self.control_socket.to_string(),
+ "/listeners".to_string(),
+ )))
+ } else {
+ Err(Box::new(UnitClientError::OpenAPIError { source: err }))
+ }
+ })
+ }
+
+ pub fn status_api(&self) -> Box<dyn StatusApi + 'static> {
+ new_openapi_client!(self, StatusApiClient, StatusApi)
+ }
+
+ pub async fn status(&self) -> Result<Status, Box<UnitClientError>> {
+ self.status_api().get_status().await.or_else(|err| {
+ if let OpenAPIError::Hyper(hyper_error) = err {
+ Err(Box::new(UnitClientError::new(
+ hyper_error,
+ self.control_socket.to_string(),
+ "/status".to_string(),
+ )))
+ } else {
+ Err(Box::new(UnitClientError::OpenAPIError { source: err }))
+ }
+ })
+ }
+
+ pub fn applications_api(&self) -> Box<dyn ApplicationsApi + 'static> {
+ new_openapi_client!(self, ApplicationsApiClient, ApplicationsApi)
+ }
+
+ pub async fn applications(&self) -> Result<HashMap<String, ConfigApplication>, Box<UnitClientError>> {
+ self.applications_api().get_applications().await.or_else(|err| {
+ if let OpenAPIError::Hyper(hyper_error) = err {
+ Err(Box::new(UnitClientError::new(
+ hyper_error,
+ self.control_socket.to_string(),
+ "/applications".to_string(),
+ )))
+ } else {
+ Err(Box::new(UnitClientError::OpenAPIError { source: err }))
+ }
+ })
+ }
+
+ pub async fn per_application_api(&self) -> Box<dyn AppsApi + 'static> {
+ new_openapi_client!(self, AppsApiClient, AppsApi)
+ }
+
+ pub async fn restart_application(&self, name: &String) -> Result<HashMap<String, String>, Box<UnitClientError>> {
+ self.per_application_api()
+ .await
+ .get_app_restart(name.as_str())
+ .await
+ .or_else(|err| {
+ if let OpenAPIError::Hyper(hyper_error) = err {
+ Err(Box::new(UnitClientError::new(
+ hyper_error,
+ self.control_socket.to_string(),
+ format!("/control/applications/{}/restart", name),
+ )))
+ } else {
+ Err(Box::new(UnitClientError::OpenAPIError { source: err }))
+ }
+ })
+ }
+
+ pub async fn is_running(&self) -> bool {
+ self.status().await.is_ok()
+ }
+}
+
+pub type UnitSerializableMap = HashMap<String, serde_json::Value>;
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct UnitStatus {
+ pub connections: UnitStatusConnections,
+ pub requests: UnitStatusRequests,
+ pub applications: HashMap<String, UnitStatusApplication>,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct UnitStatusConnections {
+ #[serde(default)]
+ pub closed: usize,
+ #[serde(default)]
+ pub idle: usize,
+ #[serde(default)]
+ pub active: usize,
+ #[serde(default)]
+ pub accepted: usize,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct UnitStatusRequests {
+ #[serde(default)]
+ pub active: usize,
+ #[serde(default)]
+ pub total: usize,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct UnitStatusApplication {
+ #[serde(default)]
+ pub processes: HashMap<String, usize>,
+ #[serde(default)]
+ pub requests: HashMap<String, usize>,
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::unitd_instance::UnitdInstance;
+
+ use super::*;
+ // Integration tests
+
+ #[tokio::test]
+ async fn can_connect_to_unit_api() {
+ match UnitdInstance::running_unitd_instances().await.first() {
+ Some(unit_instance) => {
+ let control_api_socket_address = unit_instance
+ .control_api_socket_address()
+ .expect("No control API socket path found");
+ let control_socket = ControlSocket::try_from(control_api_socket_address)
+ .expect("Unable to parse control socket address");
+ let unit_client = UnitClient::new(control_socket);
+ assert!(unit_client.is_running().await);
+ }
+ None => {
+ eprintln!("No running unitd instances found - skipping test");
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn can_get_unit_status() {
+ match UnitdInstance::running_unitd_instances().await.first() {
+ Some(unit_instance) => {
+ let control_api_socket_address = unit_instance
+ .control_api_socket_address()
+ .expect("No control API socket path found");
+ let control_socket = ControlSocket::try_from(control_api_socket_address)
+ .expect("Unable to parse control socket address");
+ let unit_client = UnitClient::new(control_socket);
+ let status = unit_client.status().await.expect("Unable to get unit status");
+ println!("Unit status: {:?}", status);
+ }
+ None => {
+ eprintln!("No running unitd instances found - skipping test");
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn can_get_unit_listeners() {
+ match UnitdInstance::running_unitd_instances().await.first() {
+ Some(unit_instance) => {
+ let control_api_socket_address = unit_instance
+ .control_api_socket_address()
+ .expect("No control API socket path found");
+ let control_socket = ControlSocket::try_from(control_api_socket_address)
+ .expect("Unable to parse control socket address");
+ let unit_client = UnitClient::new(control_socket);
+ unit_client.listeners().await.expect("Unable to get Unit listeners");
+ }
+ None => {
+ eprintln!("No running unitd instances found - skipping test");
+ }
+ }
+ }
+}
diff --git a/tools/unitctl/unit-client-rs/src/unitd_cmd.rs b/tools/unitctl/unit-client-rs/src/unitd_cmd.rs
new file mode 100644
index 00000000..17563cb0
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/src/unitd_cmd.rs
@@ -0,0 +1,88 @@
+use std::error::Error as StdError;
+use std::io::{Error as IoError, ErrorKind};
+
+use crate::runtime_flags::RuntimeFlags;
+use std::path::{Path, PathBuf};
+
+#[derive(Debug, Clone)]
+pub struct UnitdCmd {
+ pub(crate) process_executable_path: Option<Box<Path>>,
+ pub version: Option<String>,
+ pub flags: Option<RuntimeFlags>,
+}
+
+impl UnitdCmd {
+ pub(crate) fn new<S>(full_cmd: S, binary_name: &str) -> Result<UnitdCmd, Box<dyn StdError>>
+ where
+ S: Into<String>,
+ {
+ let process_cmd: String = full_cmd.into();
+ let parsable = process_cmd
+ .strip_prefix("unit: main v")
+ .and_then(|s| s.strip_suffix(']'));
+ if parsable.is_none() {
+ let msg = format!("cmd does not have the expected format: {}", process_cmd);
+ return Err(IoError::new(ErrorKind::InvalidInput, msg).into());
+ }
+ let parts = parsable
+ .expect("Unable to parse cmd")
+ .splitn(2, " [")
+ .collect::<Vec<&str>>();
+
+ if parts.len() != 2 {
+ let msg = format!("cmd does not have the expected format: {}", process_cmd);
+ return Err(IoError::new(ErrorKind::InvalidInput, msg).into());
+ }
+
+ let version = Some(parts[0].to_string());
+ let executable_path = UnitdCmd::parse_executable_path_from_cmd(parts[1], binary_name);
+ let flags = UnitdCmd::parse_runtime_flags_from_cmd(parts[1]);
+
+ Ok(UnitdCmd {
+ process_executable_path: executable_path,
+ version,
+ flags,
+ })
+ }
+
+ fn parse_executable_path_from_cmd<S>(full_cmd: S, binary_name: &str) -> Option<Box<Path>>
+ where
+ S: Into<String>,
+ {
+ let cmd = full_cmd.into();
+ if cmd.is_empty() {
+ return None;
+ }
+
+ let split = cmd.splitn(2, binary_name).collect::<Vec<&str>>();
+ if split.is_empty() {
+ return None;
+ }
+
+ let path = format!("{}{}", split[0], binary_name);
+ Some(PathBuf::from(path).into_boxed_path())
+ }
+
+ fn parse_runtime_flags_from_cmd<S>(full_cmd: S) -> Option<RuntimeFlags>
+ where
+ S: Into<String>,
+ {
+ let cmd = full_cmd.into();
+ if cmd.is_empty() {
+ return None;
+ }
+
+ // Split out everything in between the brackets [ and ]
+ let split = cmd.trim_end_matches(']').splitn(2, '[').collect::<Vec<&str>>();
+ if split.is_empty() {
+ return None;
+ }
+ /* Now we need to parse a string like this:
+ * ./sbin/unitd --no-daemon --tmp /tmp
+ * and only return what is after the invoking command */
+ split[0]
+ .find("--")
+ .map(|index| cmd[index..].to_string())
+ .map(RuntimeFlags::new)
+ }
+}
diff --git a/tools/unitctl/unit-client-rs/src/unitd_configure_options.rs b/tools/unitctl/unit-client-rs/src/unitd_configure_options.rs
new file mode 100644
index 00000000..00ee22a3
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/src/unitd_configure_options.rs
@@ -0,0 +1,236 @@
+use custom_error::custom_error;
+use std::borrow::Cow;
+use std::error::Error as stdError;
+use std::io::{BufRead, BufReader, Lines};
+use std::path::{Path, PathBuf};
+use std::process::{Command, Stdio};
+
+custom_error! {UnitdStderrParseError
+ VersionNotFound = "Version string output not found",
+ BuildSettingsNotFound = "Build settings not found"
+}
+
+#[derive(Debug, Clone)]
+pub struct UnitdConfigureOptions {
+ pub version: Cow<'static, str>,
+ pub all_flags: Cow<'static, str>,
+}
+
+impl UnitdConfigureOptions {
+ pub fn new(unitd_path: &Path) -> Result<UnitdConfigureOptions, Box<dyn stdError>> {
+ fn parse_configure_settings_from_unitd_stderr_output<B: BufRead>(
+ lines: &mut Lines<B>,
+ ) -> Result<UnitdConfigureOptions, Box<dyn stdError>> {
+ const VERSION_PREFIX: &str = "unit version: ";
+ const CONFIGURED_AS_PREFIX: &str = "configured as ";
+ const CONFIGURE_PREFIX: &str = "configured as ./configure ";
+
+ fn aggregate_parsable_lines(
+ mut accum: (Option<String>, Option<String>),
+ line: String,
+ ) -> (Option<String>, Option<String>) {
+ if line.starts_with(VERSION_PREFIX) {
+ accum.0 = line.strip_prefix(VERSION_PREFIX).map(|l| l.to_string());
+ } else if line.starts_with(CONFIGURED_AS_PREFIX) {
+ accum.1 = line.strip_prefix(CONFIGURE_PREFIX).map(|l| l.to_string());
+ }
+
+ accum
+ }
+
+ let options_lines = lines
+ .filter_map(|line| line.ok())
+ .fold((None, None), aggregate_parsable_lines);
+
+ if options_lines.0.is_none() {
+ return Err(Box::new(UnitdStderrParseError::VersionNotFound) as Box<dyn stdError>);
+ } else if options_lines.1.is_none() {
+ return Err(Box::new(UnitdStderrParseError::BuildSettingsNotFound) as Box<dyn stdError>);
+ }
+
+ Ok(UnitdConfigureOptions {
+ version: options_lines.0.unwrap().into(),
+ all_flags: options_lines.1.unwrap().into(),
+ })
+ }
+
+ let program = unitd_path.as_os_str();
+ let child = Command::new(program)
+ .arg("--version")
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .spawn()?;
+ let output = child.wait_with_output()?;
+ let err = BufReader::new(&*output.stderr);
+ parse_configure_settings_from_unitd_stderr_output(&mut err.lines())
+ }
+
+ pub fn has_flag(&self, flag_name: &str) -> bool {
+ self.all_flags
+ .split_ascii_whitespace()
+ .any(|flag| flag.starts_with(format!("--{}", flag_name).as_str()))
+ }
+
+ pub fn get_flag_value(&self, flag_name: &str) -> Option<String> {
+ self.all_flags
+ .split_ascii_whitespace()
+ .find(|flag| flag.starts_with(format!("--{}", flag_name).as_str()))
+ .and_then(|flag| {
+ let parts: Vec<&str> = flag.split('=').collect();
+ if parts.len() >= 2 {
+ Some(parts[1].to_owned())
+ } else {
+ None
+ }
+ })
+ }
+
+ pub fn debug_enabled(&self) -> bool {
+ self.has_flag("debug")
+ }
+
+ pub fn openssl_enabled(&self) -> bool {
+ self.has_flag("openssl")
+ }
+
+ pub fn prefix_path(&self) -> Option<Box<Path>> {
+ self.get_flag_value("prefix")
+ .map(PathBuf::from)
+ .map(PathBuf::into_boxed_path)
+ }
+
+ fn join_to_prefix_path<S>(&self, sub_path: S) -> Option<Box<Path>>
+ where
+ S: Into<String>,
+ {
+ self.prefix_path()
+ .map(|path| path.join(sub_path.into()).into_boxed_path())
+ }
+
+ pub fn default_control_api_socket_address(&self) -> Option<String> {
+ // If the socket address is specific configured in the configure options, we use
+ // that. Otherwise, we use the default path as assumed to be unix:$prefix/control.unit.sock.
+ match self.get_flag_value("control") {
+ Some(socket_address) => Some(socket_address),
+ None => {
+ // Give up if the unitd is compiled with unix sockets disabled
+ if self.has_flag("no-unix-sockets") {
+ return None;
+ }
+ let socket_path = self.join_to_prefix_path("control.unit.sock");
+ socket_path.map(|path| format!("unix:{}", path.to_string_lossy()))
+ }
+ }
+ }
+
+ pub fn default_pid_path(&self) -> Option<Box<Path>> {
+ match self.get_flag_value("pid") {
+ Some(pid_path) => self.join_to_prefix_path(pid_path),
+ None => self.join_to_prefix_path("unit.pid"),
+ }
+ }
+
+ pub fn default_log_path(&self) -> Option<Box<Path>> {
+ match self.get_flag_value("log") {
+ Some(pid_path) => self.join_to_prefix_path(pid_path),
+ None => self.join_to_prefix_path("unit.log"),
+ }
+ }
+
+ pub fn default_modules_directory(&self) -> Option<Box<Path>> {
+ match self.get_flag_value("modules") {
+ Some(modules_dir_name) => self.join_to_prefix_path(modules_dir_name),
+ None => self.join_to_prefix_path("modules"),
+ }
+ }
+
+ pub fn default_state_directory(&self) -> Option<Box<Path>> {
+ match self.get_flag_value("state") {
+ Some(state_dir_name) => self.join_to_prefix_path(state_dir_name),
+ None => self.join_to_prefix_path("state"),
+ }
+ }
+
+ pub fn default_tmp_directory(&self) -> Option<Box<Path>> {
+ match self.get_flag_value("tmp") {
+ Some(tmp_dir_name) => self.join_to_prefix_path(tmp_dir_name),
+ None => self.join_to_prefix_path("tmp"),
+ }
+ }
+ pub fn default_user(&self) -> Option<String> {
+ self.get_flag_value("user").map(String::from)
+ }
+ pub fn default_group(&self) -> Option<String> {
+ self.get_flag_value("group").map(String::from)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::unitd_instance;
+ use crate::unitd_instance::UNITD_PATH_ENV_KEY;
+
+ #[test]
+ fn can_detect_key() {
+ let options = UnitdConfigureOptions {
+ version: Default::default(),
+ all_flags: Cow::from("--debug --openssl --prefix=/opt/unit"),
+ };
+ assert!(options.has_flag("debug"));
+ assert!(options.has_flag("openssl"));
+ assert!(options.has_flag("prefix"));
+ assert!(!options.has_flag("fobar"));
+ }
+
+ #[test]
+ fn can_get_flag_value_by_key() {
+ let expected = "/opt/unit";
+ let options = UnitdConfigureOptions {
+ version: Default::default(),
+ all_flags: Cow::from("--debug --openssl --prefix=/opt/unit"),
+ };
+
+ let actual = options.get_flag_value("prefix");
+ assert_eq!(expected, actual.unwrap())
+ }
+
+ #[test]
+ fn can_get_prefix_path() {
+ let expected: Box<Path> = Path::new("/opt/unit").into();
+ let options = UnitdConfigureOptions {
+ version: Default::default(),
+ all_flags: Cow::from("--debug --openssl --prefix=/opt/unit"),
+ };
+
+ let actual = options.prefix_path();
+ assert_eq!(expected, actual.unwrap())
+ }
+
+ #[test]
+ fn can_parse_complicated_configure_options() {
+ let expected: Box<Path> = Path::new("/usr").into();
+ let options = UnitdConfigureOptions {
+ version: Default::default(),
+ all_flags: Cow::from("--prefix=/usr --state=/var/lib/unit --control=unix:/var/run/control.unit.sock --pid=/var/run/unit.pid --log=/var/log/unit.log --tmp=/var/tmp --user=unit --group=unit --tests --openssl --modules=/usr/lib/unit/modules --libdir=/usr/lib/x86_64-linux-gnu --cc-opt='-g -O2 -fdebug-prefix-map=/data/builder/debuild/unit-1.28.0/pkg/deb/debuild/unit-1.28.0=. -specs=/usr/share/dpkg/no-pie-compile.specs -fstack-protector-strong -Wformat -Werror=format-security -Wp,-D_FORTIFY_SOURCE=2 -fPIC' --ld-opt='-Wl,-Bsymbolic-functions -specs=/usr/share/dpkg/no-pie-link.specs -Wl,-z,relro -Wl,-z,now -Wl,--as-needed -pie'
+"),
+ };
+
+ let actual = options.prefix_path();
+ assert_eq!(expected, actual.unwrap())
+ }
+
+ #[test]
+ #[ignore] // run this one manually - not in CI
+ fn can_run_unitd() {
+ let specific_path = std::env::var(UNITD_PATH_ENV_KEY).map_err(|error| Box::new(error) as Box<dyn stdError>);
+ let unitd_path = unitd_instance::find_executable_path(specific_path);
+ let config_options = UnitdConfigureOptions::new(&unitd_path.unwrap());
+ match config_options {
+ Ok(options) => {
+ println!("{:?}", options)
+ }
+ Err(error) => panic!("{}", error),
+ };
+ }
+}
diff --git a/tools/unitctl/unit-client-rs/src/unitd_docker.rs b/tools/unitctl/unit-client-rs/src/unitd_docker.rs
new file mode 100644
index 00000000..2b9e0c7d
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/src/unitd_docker.rs
@@ -0,0 +1,456 @@
+use std::collections::HashMap;
+use std::fs::read_to_string;
+use std::io::stderr;
+use std::path::{PathBuf, MAIN_SEPARATOR};
+
+use crate::control_socket_address::ControlSocket;
+use crate::futures::StreamExt;
+use crate::unit_client::UnitClientError;
+use crate::unitd_process::UnitdProcess;
+
+use bollard::container::{Config, ListContainersOptions, StartContainerOptions};
+use bollard::image::CreateImageOptions;
+use bollard::models::{ContainerCreateResponse, ContainerSummary, HostConfig, Mount, MountTypeEnum};
+use bollard::secret::ContainerInspectResponse;
+use bollard::Docker;
+
+use regex::Regex;
+
+use serde::ser::SerializeMap;
+use serde::{Serialize, Serializer};
+
+use pbr::ProgressBar;
+
+#[derive(Clone, Debug)]
+pub struct UnitdContainer {
+ pub container_id: Option<String>,
+ pub container_image: String,
+ pub command: Option<String>,
+ pub mounts: HashMap<PathBuf, PathBuf>,
+ pub platform: String,
+ details: Option<ContainerInspectResponse>,
+}
+
+impl From<&ContainerSummary> for UnitdContainer {
+ fn from(ctr: &ContainerSummary) -> Self {
+ // we assume paths from the docker api are absolute
+ // they certainly have to be later...
+ let mut mounts = HashMap::new();
+ if let Some(mts) = &ctr.mounts {
+ for i in mts {
+ if let Some(ref src) = i.source {
+ if let Some(ref dest) = i.destination {
+ mounts.insert(PathBuf::from(dest.clone()), PathBuf::from(src.clone()));
+ }
+ }
+ }
+ }
+
+ UnitdContainer {
+ container_id: ctr.id.clone(),
+ container_image: format!(
+ "{} (docker)",
+ ctr.image.clone().unwrap_or(String::from("unknown container")),
+ ),
+ command: ctr.command.clone(),
+ mounts: mounts,
+ platform: String::from("Docker"),
+ details: None,
+ }
+ }
+}
+
+impl From<&UnitdContainer> for UnitdProcess {
+ fn from(ctr: &UnitdContainer) -> Self {
+ let version = ctr.details.as_ref().and_then(|details| {
+ details.config.as_ref().and_then(|conf| {
+ conf.labels.as_ref().and_then(|labels| {
+ labels
+ .get("org.opencontainers.image.version")
+ .and_then(|version| Some(version.clone()))
+ })
+ })
+ });
+ let command = ctr.command.clone().and_then(|cmd| {
+ Some(format!(
+ "{}{} [{}{}]",
+ "unit: main v",
+ version.or(Some(String::from(""))).unwrap(),
+ ctr.container_image,
+ ctr.rewrite_socket(
+ cmd.strip_prefix("/usr/local/bin/docker-entrypoint.sh")
+ .or_else(|| Some(""))
+ .unwrap()
+ .to_string()
+ )
+ ))
+ });
+ let mut cmds = vec![];
+ let _ = command.map_or((), |cmd| cmds.push(cmd));
+ UnitdProcess {
+ all_cmds: cmds,
+ binary_name: ctr.container_image.clone(),
+ process_id: ctr
+ .details
+ .as_ref()
+ .and_then(|details| {
+ details
+ .state
+ .as_ref()
+ .and_then(|state| state.pid.and_then(|pid| Some(pid.clone() as u64)))
+ })
+ .or(Some(0 as u64))
+ .unwrap(),
+ executable_path: None,
+ environ: vec![],
+ working_dir: ctr.details.as_ref().and_then(|details| {
+ details.config.as_ref().and_then(|conf| {
+ Some(
+ PathBuf::from(
+ conf.working_dir
+ .as_ref()
+ .map_or(String::new(), |dir| ctr.host_path(dir.clone())),
+ )
+ .into_boxed_path(),
+ )
+ })
+ }),
+ child_pids: vec![],
+ user: None,
+ effective_user: None,
+ container: Some(ctr.clone()),
+ }
+ }
+}
+
+impl Serialize for UnitdContainer {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ // 5 = fields to serialize
+ let mut state = serializer.serialize_map(Some(5))?;
+ state.serialize_entry("container_id", &self.container_id)?;
+ state.serialize_entry("container_image", &self.container_image)?;
+ state.serialize_entry("command", &self.command)?;
+ state.serialize_entry("mounts", &self.mounts)?;
+ state.serialize_entry("platform", &self.platform)?;
+ state.end()
+ }
+}
+
+impl UnitdContainer {
+ pub async fn find_unitd_containers() -> Vec<UnitdContainer> {
+ if let Ok(docker) = Docker::connect_with_local_defaults() {
+ match docker.list_containers::<String>(None).await {
+ Err(e) => {
+ eprintln!("{}", e);
+ vec![]
+ }
+ Ok(summary) => {
+ let unitd_command_re = Regex::new(r"^(.* )?unitd( .*)?$").unwrap();
+
+ // cant do this functionally because of the async call
+ let mut mapped = vec![];
+ for ctr in summary {
+ if unitd_command_re.is_match(&ctr.clone().command.or(Some(String::new())).unwrap()) {
+ let mut c = UnitdContainer::from(&ctr);
+ if let Some(names) = ctr.names {
+ if names.len() > 0 {
+ let name = names[0].strip_prefix("/").or(Some(names[0].as_str())).unwrap();
+ if let Ok(cir) = docker.inspect_container(name, None).await {
+ c.details = Some(cir);
+ }
+ }
+ }
+ mapped.push(c);
+ }
+ }
+ mapped
+ }
+ }
+ } else {
+ vec![]
+ }
+ }
+
+ pub fn host_path(&self, container_path: String) -> String {
+ let cp = PathBuf::from(container_path);
+ // get only possible mount points
+ // sort to deepest mountpoint first
+ // assumed deepest possible mount point takes precedence
+ let mut keys = self
+ .mounts
+ .clone()
+ .into_keys()
+ .filter(|mp| cp.as_path().starts_with(mp))
+ .collect::<Vec<_>>();
+ keys.sort_by_key(|a| 0 as isize - a.ancestors().count() as isize);
+
+ // either return translated path or original prefixed with "container"
+ if keys.len() > 0 {
+ let mut matches = self.mounts[&keys[0]].clone().join(
+ cp.as_path()
+ .strip_prefix(keys[0].clone())
+ .expect("error checking path prefix"),
+ );
+ /* Observed on M1 Mac that Docker on OSX
+ * adds a bunch of garbage to the mount path
+ * converting it into a useless directory
+ * that doesnt actually exist
+ */
+ if cfg!(target_os = "macos") {
+ let mut abs = PathBuf::from(String::from(MAIN_SEPARATOR));
+ let m = matches
+ .strip_prefix("/host_mnt/private")
+ .unwrap_or(matches.strip_prefix("/host_mnt").unwrap_or(matches.as_path()));
+ // make it absolute again
+ abs.push(m);
+ matches = abs;
+ }
+ matches.to_string_lossy().to_string()
+ } else {
+ format!("<container>:{}", cp.display())
+ }
+ }
+
+ pub fn rewrite_socket(&self, command: String) -> String {
+ command
+ .split(" ")
+ .map(|tok| {
+ if tok.starts_with("unix:") {
+ format!(
+ "unix:{}",
+ self.host_path(tok.strip_prefix("unix:").unwrap().to_string())
+ )
+ } else {
+ tok.to_string()
+ }
+ })
+ .collect::<Vec<_>>()
+ .join(" ")
+ }
+
+ pub fn container_is_running(&self) -> Option<bool> {
+ self.details
+ .as_ref()
+ .and_then(|details| details.state.as_ref().and_then(|state| state.running))
+ }
+}
+
+/* deploys a new docker image of tag $image_tag.
+ * mounts $socket to /var/run in the new container.
+ * mounts $application read only to /www.
+ * new container is on host network.
+ *
+ * ON SUCCESS returns vector of warnings from Docker API
+ * ON FAILURE returns wrapped error from Docker API
+ */
+pub async fn deploy_new_container(
+ socket: ControlSocket,
+ application: &String,
+ application_read_only: bool,
+ image: &String,
+) -> Result<Vec<String>, UnitClientError> {
+ match Docker::connect_with_local_defaults() {
+ Ok(docker) => {
+ let mut mounts = vec![];
+ // if a unix socket is specified, mounts its directory
+ if socket.is_local_socket() {
+ let mount_path = PathBuf::from(socket.clone()).as_path().to_string_lossy().to_string();
+ mounts.push(Mount {
+ typ: Some(MountTypeEnum::BIND),
+ source: Some(mount_path),
+ target: Some("/var/run".to_string()),
+ ..Default::default()
+ });
+ }
+ // mount application dir
+ mounts.push(Mount {
+ typ: Some(MountTypeEnum::BIND),
+ source: Some(application.clone()),
+ target: Some("/www".to_string()),
+ read_only: Some(application_read_only),
+ ..Default::default()
+ });
+
+ let mut pb = ProgressBar::on(stderr(), 10);
+ let mut totals = HashMap::new();
+ let mut stream = docker.create_image(
+ Some(CreateImageOptions {
+ from_image: image.as_str(),
+ ..Default::default()
+ }),
+ None,
+ None,
+ );
+ while let Some(res) = stream.next().await {
+ if let Ok(info) = res {
+ if let Some(id) = info.id {
+ if let Some(_) = totals.get_mut(&id) {
+ if let Some(delta) = info.progress_detail.and_then(|detail| detail.current) {
+ pb.add(delta as u64);
+ }
+ } else {
+ if let Some(total) = info.progress_detail.and_then(|detail| detail.total) {
+ totals.insert(id, total);
+ pb.total += total as u64;
+ }
+ }
+ }
+ }
+ }
+ pb.finish();
+
+ // create the new unit container
+ let resp: ContainerCreateResponse;
+ let host_conf = HostConfig {
+ mounts: Some(mounts),
+ network_mode: Some("host".to_string()),
+ ..Default::default()
+ };
+ let mut container_conf = Config {
+ image: Some(image.clone()),
+ ..Default::default()
+ };
+ if let ControlSocket::TcpSocket(ref uri) = socket {
+ let port = uri.port_u16().or(Some(80)).unwrap();
+ // override port
+ container_conf.cmd = Some(vec![
+ "unitd".to_string(),
+ "--no-daemon".to_string(),
+ "--control".to_string(),
+ format!("{}:{}", uri.host().unwrap(), port),
+ ]);
+ }
+ container_conf.host_config = Some(host_conf);
+ match docker.create_container::<String, String>(None, container_conf).await {
+ Err(err) => {
+ return Err(UnitClientError::UnitdDockerError {
+ message: err.to_string(),
+ })
+ }
+ Ok(response) => resp = response,
+ }
+
+ // create container gives us an ID
+ // but start container requires a name
+ let mut list_container_filters = HashMap::new();
+ list_container_filters.insert("id".to_string(), vec![resp.id]);
+ match docker
+ .list_containers::<String>(Some(ListContainersOptions {
+ all: true,
+ limit: None,
+ size: false,
+ filters: list_container_filters,
+ }))
+ .await
+ {
+ // somehow our container doesnt exist
+ Err(e) => Err(UnitClientError::UnitdDockerError { message: e.to_string() }),
+ // here it is!
+ Ok(info) => {
+ if info.len() < 1 {
+ return Err(UnitClientError::UnitdDockerError {
+ message: "couldnt find new container".to_string(),
+ });
+ } else if info[0].names.is_none() || info[0].names.clone().unwrap().len() < 1 {
+ return Err(UnitClientError::UnitdDockerError {
+ message: "new container has no name".to_string(),
+ });
+ }
+
+ // start our container
+ match docker
+ .start_container(
+ info[0].names.clone().unwrap()[0].strip_prefix(MAIN_SEPARATOR).unwrap(),
+ None::<StartContainerOptions<String>>,
+ )
+ .await
+ {
+ Err(err) => Err(UnitClientError::UnitdDockerError {
+ message: err.to_string(),
+ }),
+ Ok(_) => Ok(resp.warnings),
+ }
+ }
+ }
+ }
+ Err(e) => Err(UnitClientError::UnitdDockerError { message: e.to_string() }),
+ }
+}
+
+/* Returns either 64 char docker container ID or None */
+pub fn pid_is_dockerized(pid: u64) -> bool {
+ let cg_filepath = format!("/proc/{}/cgroup", pid);
+ match read_to_string(cg_filepath) {
+ Err(e) => {
+ eprintln!("{}", e);
+ false
+ }
+ Ok(contents) => {
+ let docker_re = Regex::new(r"docker-([a-zA-Z0-9]{64})").unwrap();
+ docker_re.is_match(contents.as_str())
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_path_translation() {
+ let mut mounts = HashMap::new();
+ mounts.insert("/1/2/3/4/5/6/7".into(), "/0".into());
+ mounts.insert("/root".into(), "/1".into());
+ mounts.insert("/root/mid".into(), "/2".into());
+ mounts.insert("/root/mid/child".into(), "/3".into());
+ mounts.insert("/mid/child".into(), "/4".into());
+ mounts.insert("/child".into(), "/5".into());
+ mounts.insert("/var".into(), "/host_mnt/private/6".into());
+ mounts.insert("/var/var".into(), "/host_mnt/7".into());
+
+ let ctr = UnitdContainer {
+ container_id: None,
+ container_image: String::from(""),
+ command: None,
+ platform: "test".to_string(),
+ details: None,
+ mounts: mounts,
+ };
+
+ assert_eq!(
+ "/3/c2/test".to_string(),
+ ctr.host_path("/root/mid/child/c2/test".to_string())
+ );
+ assert_eq!(
+ "<container>:/path/to/conf".to_string(),
+ ctr.host_path("/path/to/conf".to_string())
+ );
+ if cfg!(target_os = "macos") {
+ assert_eq!("/6/test".to_string(), ctr.host_path("/var/test".to_string()));
+ assert_eq!("/7/test".to_string(), ctr.host_path("/var/var/test".to_string()));
+ }
+ }
+
+ #[test]
+ fn test_unix_sock_path_translate() {
+ let mut mounts = HashMap::new();
+ mounts.insert("/var/run".into(), "/tmp".into());
+
+ let ctr = UnitdContainer {
+ container_id: None,
+ container_image: String::from(""),
+ command: None,
+ platform: "test".to_string(),
+ details: None,
+ mounts: mounts,
+ };
+
+ assert_eq!(
+ ctr.rewrite_socket("unitd --no-daemon --control unix:/var/run/control.unit.sock".to_string()),
+ "unitd --no-daemon --control unix:/tmp/control.unit.sock".to_string()
+ );
+ }
+}
diff --git a/tools/unitctl/unit-client-rs/src/unitd_instance.rs b/tools/unitctl/unit-client-rs/src/unitd_instance.rs
new file mode 100644
index 00000000..ace8e858
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/src/unitd_instance.rs
@@ -0,0 +1,403 @@
+use crate::unit_client::UnitClientError;
+use crate::unitd_docker::UnitdContainer;
+use serde::ser::SerializeMap;
+use serde::{Serialize, Serializer};
+use std::error::Error as StdError;
+use std::path::{Path, PathBuf};
+use std::{fmt, io};
+use which::which;
+
+use crate::runtime_flags::RuntimeFlags;
+use crate::unitd_configure_options::UnitdConfigureOptions;
+use crate::unitd_process::UnitdProcess;
+
+pub const UNITD_PATH_ENV_KEY: &str = "UNITD_PATH";
+pub const UNITD_BINARY_NAMES: [&str; 2] = ["unitd", "unitd-debug"];
+
+#[derive(Debug)]
+pub struct UnitdInstance {
+ pub process: UnitdProcess,
+ pub configure_options: Option<UnitdConfigureOptions>,
+ pub errors: Vec<UnitClientError>,
+}
+
+impl Serialize for UnitdInstance {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ // 11 = fields to serialize
+ let mut state = serializer.serialize_map(Some(11))?;
+ let runtime_flags = self
+ .process
+ .cmd()
+ .and_then(|cmd| cmd.flags)
+ .map(|flags| flags.to_string());
+
+ let configure_flags = self.configure_options.as_ref().map(|opts| opts.all_flags.clone());
+
+ state.serialize_entry("process", &self.process)?;
+ state.serialize_entry("version", &self.version())?;
+ state.serialize_entry("control_socket", &self.control_api_socket_address())?;
+ state.serialize_entry("log_path", &self.log_path())?;
+ state.serialize_entry("pid_path", &self.pid_path())?;
+ state.serialize_entry("modules_directory", &self.modules_directory())?;
+ state.serialize_entry("state_directory", &self.state_directory())?;
+ state.serialize_entry("tmp_directory", &self.tmp_directory())?;
+ state.serialize_entry("runtime_flags", &runtime_flags)?;
+ state.serialize_entry("configure_flags", &configure_flags)?;
+ let string_errors = &self.errors.iter().map(|e| e.to_string()).collect::<Vec<String>>();
+ state.serialize_entry("errors", string_errors)?;
+
+ state.end()
+ }
+}
+
+impl UnitdInstance {
+ pub async fn running_unitd_instances() -> Vec<UnitdInstance> {
+ Self::collect_unitd_processes(
+ UnitdProcess::find_unitd_processes()
+ .into_iter()
+ .chain(
+ UnitdContainer::find_unitd_containers()
+ .await
+ .into_iter()
+ .map(|x| UnitdProcess::from(&x))
+ .collect::<Vec<_>>(),
+ )
+ .collect(),
+ )
+ }
+
+ /// Find all running unitd processes and convert them into UnitdInstances and filter
+ /// out all errors by printing them to stderr and leaving errored instances out of
+ /// the returned vector.
+ fn collect_unitd_processes(processes: Vec<UnitdProcess>) -> Vec<UnitdInstance> {
+ Self::map_processes_to_instances(processes).into_iter().collect()
+ }
+
+ fn map_processes_to_instances(processes: Vec<UnitdProcess>) -> Vec<UnitdInstance> {
+ fn unitd_path_from_process(process: &UnitdProcess) -> Result<Box<Path>, UnitClientError> {
+ match process.executable_path() {
+ Some(executable_path) => {
+ let is_absolute_working_dir = process
+ .working_dir
+ .as_ref()
+ .map(|p| p.is_absolute())
+ .unwrap_or_default();
+ if executable_path.is_absolute() {
+ Ok(executable_path.to_owned())
+ } else if executable_path.is_relative() && is_absolute_working_dir {
+ let new_path = process
+ .working_dir
+ .as_ref()
+ .unwrap()
+ .join(executable_path)
+ .canonicalize()
+ .map(|path| path.into_boxed_path())
+ .map_err(|error| UnitClientError::UnitdProcessParseError {
+ message: format!("Error canonicalizing unitd executable path: {}", error),
+ pid: process.process_id,
+ })?;
+ Ok(new_path)
+ } else if process.container.is_none() {
+ Err(UnitClientError::UnitdProcessParseError {
+ message: "Unable to get absolute unitd executable path from process".to_string(),
+ pid: process.process_id,
+ })
+ } else {
+ // container case
+ Ok(PathBuf::from("/").into_boxed_path())
+ }
+ }
+ None => Err(UnitClientError::UnitdProcessParseError {
+ message: "Unable to get unitd executable path from process".to_string(),
+ pid: process.process_id,
+ }),
+ }
+ }
+
+ fn map_process_to_unitd_instance(process: &UnitdProcess) -> UnitdInstance {
+ match unitd_path_from_process(process) {
+ Ok(_) if process.container.is_some() => {
+ let mut err = vec![];
+ // double check that it is running
+ let running = process.container.as_ref().unwrap().container_is_running();
+
+ if running.is_none() || !running.unwrap() {
+ err.push(UnitClientError::UnitdProcessParseError {
+ message: "process container is not running".to_string(),
+ pid: process.process_id,
+ });
+ }
+
+ UnitdInstance {
+ process: process.to_owned(),
+ configure_options: None,
+ errors: err,
+ }
+ }
+ Ok(unitd_path) => match UnitdConfigureOptions::new(&unitd_path.clone().into_path_buf()) {
+ Ok(configure_options) => UnitdInstance {
+ process: process.to_owned(),
+ configure_options: Some(configure_options),
+ errors: vec![],
+ },
+ Err(error) => {
+ let error = UnitClientError::UnitdProcessExecError {
+ source: error,
+ executable_path: unitd_path.to_string_lossy().parse().unwrap_or_default(),
+ message: "Error running unitd binary to get configure options".to_string(),
+ pid: process.process_id,
+ };
+ UnitdInstance {
+ process: process.to_owned(),
+ configure_options: None,
+ errors: vec![error],
+ }
+ }
+ },
+ Err(err) => UnitdInstance {
+ process: process.to_owned(),
+ configure_options: None,
+ errors: vec![err],
+ },
+ }
+ }
+
+ processes
+ .iter()
+ // This converts processes into a UnitdInstance
+ .map(map_process_to_unitd_instance)
+ .collect()
+ }
+
+ fn version(&self) -> Option<String> {
+ match self.process.cmd()?.version {
+ Some(version) => Some(version),
+ None => self.configure_options.as_ref().map(|opts| opts.version.to_string()),
+ }
+ }
+
+ fn flag_or_default_option<R>(
+ &self,
+ read_flag: fn(RuntimeFlags) -> Option<R>,
+ read_opts: fn(UnitdConfigureOptions) -> Option<R>,
+ ) -> Option<R> {
+ self.process
+ .cmd()?
+ .flags
+ .and_then(read_flag)
+ .or_else(|| self.configure_options.to_owned().and_then(read_opts))
+ }
+
+ pub fn control_api_socket_address(&self) -> Option<String> {
+ self.flag_or_default_option(
+ |flags| flags.control_api_socket_address(),
+ |opts| opts.default_control_api_socket_address(),
+ )
+ }
+
+ pub fn pid_path(&self) -> Option<Box<Path>> {
+ self.flag_or_default_option(|flags| flags.pid_path(), |opts| opts.default_pid_path())
+ }
+
+ pub fn log_path(&self) -> Option<Box<Path>> {
+ self.flag_or_default_option(|flags| flags.log_path(), |opts| opts.default_log_path())
+ }
+
+ pub fn modules_directory(&self) -> Option<Box<Path>> {
+ self.flag_or_default_option(
+ |flags| flags.modules_directory(),
+ |opts| opts.default_modules_directory(),
+ )
+ }
+
+ pub fn state_directory(&self) -> Option<Box<Path>> {
+ self.flag_or_default_option(|flags| flags.state_directory(), |opts| opts.default_state_directory())
+ }
+
+ pub fn tmp_directory(&self) -> Option<Box<Path>> {
+ self.flag_or_default_option(|flags| flags.tmp_directory(), |opts| opts.default_tmp_directory())
+ }
+}
+
+impl fmt::Display for UnitdInstance {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ const UNKNOWN: &str = "[unknown]";
+ let version = self.version().unwrap_or_else(|| String::from("[unknown]"));
+ let runtime_flags = self
+ .process
+ .cmd()
+ .and_then(|cmd| cmd.flags)
+ .map(|flags| flags.to_string())
+ .unwrap_or_else(|| UNKNOWN.into());
+ let configure_flags = self
+ .configure_options
+ .as_ref()
+ .map(|opts| opts.all_flags.clone())
+ .unwrap_or_else(|| UNKNOWN.into());
+ let unitd_path: String = self
+ .process
+ .executable_path()
+ .map(|p| p.to_string_lossy().into())
+ .unwrap_or_else(|| UNKNOWN.into());
+ let working_dir: String = self
+ .process
+ .working_dir
+ .as_ref()
+ .map(|p| p.to_string_lossy().into())
+ .unwrap_or_else(|| UNKNOWN.into());
+ let socket_address = self.control_api_socket_address().unwrap_or_else(|| UNKNOWN.to_string());
+ let child_pids = self
+ .process
+ .child_pids
+ .iter()
+ .map(u64::to_string)
+ .collect::<Vec<String>>()
+ .join(", ");
+
+ writeln!(
+ f,
+ "{} instance [pid: {}, version: {}]:",
+ self.process.binary_name, self.process.process_id, version
+ )?;
+ writeln!(f, " Executable: {}", unitd_path)?;
+ writeln!(f, " Process working directory: {}", working_dir)?;
+ write!(f, " Process ownership: ")?;
+ if let Some(user) = &self.process.user {
+ writeln!(f, "name: {}, uid: {}, gid: {}", user.name, user.uid, user.gid)?;
+ } else {
+ writeln!(f, "{}", UNKNOWN)?;
+ }
+ write!(f, " Process effective ownership: ")?;
+ if let Some(user) = &self.process.effective_user {
+ writeln!(f, "name: {}, uid: {}, gid: {}", user.name, user.uid, user.gid)?;
+ } else {
+ writeln!(f, "{}", UNKNOWN)?;
+ }
+
+ writeln!(f, " API control unix socket: {}", socket_address)?;
+ writeln!(f, " Child processes ids: {}", child_pids)?;
+ writeln!(f, " Runtime flags: {}", runtime_flags)?;
+ writeln!(f, " Configure options: {}", configure_flags)?;
+
+ if let Some(ctr) = &self.process.container {
+ writeln!(f, " Container:")?;
+ writeln!(f, " Platform: {}", ctr.platform)?;
+ if let Some(id) = ctr.container_id.clone() {
+ writeln!(f, " Container ID: {}", id)?;
+ }
+ writeln!(f, " Mounts:")?;
+ for (k, v) in &ctr.mounts {
+ writeln!(f, " {} => {}", k.to_string_lossy(), v.to_string_lossy())?;
+ }
+ }
+
+ if !self.errors.is_empty() {
+ write!(f, " Errors:")?;
+ for error in &self.errors {
+ write!(f, "\n {}", error)?;
+ }
+ }
+
+ Ok(())
+ }
+}
+
+pub fn find_executable_path(specific_path: Result<String, Box<dyn StdError>>) -> Result<PathBuf, Box<dyn StdError>> {
+ fn find_unitd_in_system_path() -> Vec<PathBuf> {
+ UNITD_BINARY_NAMES
+ .iter()
+ .map(which)
+ .filter_map(Result::ok)
+ .collect::<Vec<PathBuf>>()
+ }
+
+ match specific_path {
+ Ok(path) => Ok(PathBuf::from(path)),
+ Err(_) => {
+ let unitd_paths = find_unitd_in_system_path();
+ if unitd_paths.is_empty() {
+ let err_msg = format!(
+ "Could not find unitd in system path or in UNITD_PATH environment variable. Searched for: {:?}",
+ UNITD_BINARY_NAMES
+ );
+ let err = io::Error::new(io::ErrorKind::NotFound, err_msg);
+ Err(Box::from(err))
+ } else {
+ Ok(unitd_paths[0].clone())
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use rand::rngs::StdRng;
+ use rand::{RngCore, SeedableRng};
+
+ // We don't need a secure seed for testing, in fact it is better that we have a
+ // predictable value
+ const SEED: [u8; 32] = [
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
+ 30, 31,
+ ];
+ #[tokio::test]
+ async fn can_find_unitd_instances() {
+ UnitdInstance::running_unitd_instances().await.iter().for_each(|p| {
+ println!("{:?}", p);
+ println!("Runtime Flags: {:?}", p.process.cmd().map(|c| c.flags));
+ println!("Temp directory: {:?}", p.tmp_directory());
+ })
+ }
+
+ fn mock_process<S: Into<String>>(
+ rng: &mut StdRng,
+ binary_name: S,
+ executable_path: Option<String>,
+ ) -> UnitdProcess {
+ UnitdProcess {
+ process_id: rng.next_u32() as u64,
+ binary_name: binary_name.into(),
+ executable_path: executable_path.map(|p| Box::from(Path::new(&p))),
+ environ: vec![],
+ all_cmds: vec![],
+ working_dir: Some(Box::from(Path::new("/opt/unit"))),
+ child_pids: vec![],
+ user: None,
+ effective_user: None,
+ container: None,
+ }
+ }
+
+ #[test]
+ fn will_list_without_errors_valid_processes() {
+ let specific_path = std::env::var(UNITD_PATH_ENV_KEY).map_err(|error| Box::new(error) as Box<dyn StdError>);
+ let binding = match find_executable_path(specific_path) {
+ Ok(path) => path,
+ Err(error) => {
+ eprintln!("Could not find unitd executable path: {} - skipping test", error);
+ return;
+ }
+ };
+ let binary_name = binding
+ .file_name()
+ .expect("Could not get binary name")
+ .to_string_lossy()
+ .to_string();
+ let unitd_path = binding.to_string_lossy();
+ let mut rng: StdRng = SeedableRng::from_seed(SEED);
+
+ let processes = vec![
+ mock_process(&mut rng, &binary_name, Some(unitd_path.to_string())),
+ mock_process(&mut rng, &binary_name, Some(unitd_path.to_string())),
+ ];
+ let instances = UnitdInstance::collect_unitd_processes(processes);
+ // assert_eq!(instances.len(), 3);
+ instances.iter().for_each(|p| {
+ assert_eq!(p.errors.len(), 0, "Expected no errors, got: {:?}", p.errors);
+ })
+ }
+}
diff --git a/tools/unitctl/unit-client-rs/src/unitd_process.rs b/tools/unitctl/unit-client-rs/src/unitd_process.rs
new file mode 100644
index 00000000..3dc0c3af
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/src/unitd_process.rs
@@ -0,0 +1,196 @@
+use crate::unitd_cmd::UnitdCmd;
+use crate::unitd_docker::{pid_is_dockerized, UnitdContainer};
+use crate::unitd_instance::UNITD_BINARY_NAMES;
+use crate::unitd_process_user::UnitdProcessUser;
+use serde::ser::SerializeMap;
+use serde::{Serialize, Serializer};
+use std::collections::HashMap;
+use std::path::Path;
+use sysinfo::{Pid, Process, ProcessRefreshKind, System, UpdateKind, Users};
+
+#[derive(Debug, Clone)]
+pub struct UnitdProcess {
+ pub binary_name: String,
+ pub process_id: u64,
+ pub executable_path: Option<Box<Path>>,
+ pub environ: Vec<String>,
+ pub all_cmds: Vec<String>,
+ pub working_dir: Option<Box<Path>>,
+ pub child_pids: Vec<u64>,
+ pub user: Option<UnitdProcessUser>,
+ pub effective_user: Option<UnitdProcessUser>,
+ pub container: Option<UnitdContainer>,
+}
+
+impl Serialize for UnitdProcess {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ // 6 = fields to serialize
+ let mut state = serializer.serialize_map(Some(6))?;
+ state.serialize_entry("pid", &self.process_id)?;
+ state.serialize_entry("user", &self.user)?;
+ state.serialize_entry("effective_user", &self.effective_user)?;
+ state.serialize_entry("executable", &self.executable_path())?;
+ state.serialize_entry("child_pids", &self.child_pids)?;
+ state.serialize_entry("container", &self.container)?;
+ state.end()
+ }
+}
+
+impl UnitdProcess {
+ pub fn find_unitd_processes() -> Vec<UnitdProcess> {
+ let process_refresh_kind = ProcessRefreshKind::new()
+ .with_cmd(UpdateKind::Always)
+ .with_cwd(UpdateKind::Always)
+ .with_exe(UpdateKind::Always)
+ .with_user(UpdateKind::Always);
+ let refresh_kind = sysinfo::RefreshKind::new().with_processes(process_refresh_kind);
+ let sys = System::new_with_specifics(refresh_kind);
+ let unitd_processes: HashMap<&Pid, &Process> = sys
+ .processes()
+ .iter()
+ .filter(|p| {
+ let process_name = p.1.name();
+ UNITD_BINARY_NAMES.contains(&process_name)
+ })
+ .collect::<HashMap<&Pid, &Process>>();
+ let users = Users::new_with_refreshed_list();
+
+ unitd_processes
+ .iter()
+ // Filter out child processes
+ .filter(|p| {
+ #[cfg(target_os = "linux")]
+ if pid_is_dockerized(p.0.as_u32().into()) {
+ return false;
+ }
+ let parent_pid = p.1.parent();
+ match parent_pid {
+ Some(pid) => !unitd_processes.contains_key(&pid),
+ None => false,
+ }
+ })
+ .map(|p| {
+ let tuple = p.to_owned();
+ /* The sysinfo library only supports 32-bit pids, yet larger values are possible
+ * if the OS is configured to support it, thus we use 64-bit integers internally
+ * because it is just a matter of time until the library changes to larger values. */
+ let pid = *tuple.0;
+ let process = *tuple.1;
+ let process_id: u64 = pid.as_u32().into();
+ let executable_path: Option<Box<Path>> = process.exe().map(|p| p.to_path_buf().into_boxed_path());
+ let environ: Vec<String> = process.environ().into();
+ let cmd: Vec<String> = process.cmd().into();
+ let working_dir: Option<Box<Path>> = process.cwd().map(|p| p.to_path_buf().into_boxed_path());
+ let child_pids = unitd_processes
+ .iter()
+ .filter_map(|p| p.to_owned().1.parent())
+ .filter(|parent_pid| parent_pid == pid)
+ .map(|p| p.as_u32() as u64)
+ .collect::<Vec<u64>>();
+
+ let user = process
+ .user_id()
+ .and_then(|uid| users.get_user_by_id(uid))
+ .map(UnitdProcessUser::from);
+ let effective_user = process
+ .effective_user_id()
+ .and_then(|uid| users.get_user_by_id(uid))
+ .map(UnitdProcessUser::from);
+
+ UnitdProcess {
+ binary_name: process.name().to_string(),
+ process_id,
+ executable_path,
+ environ,
+ all_cmds: cmd,
+ working_dir,
+ child_pids,
+ user,
+ effective_user,
+ container: None,
+ }
+ })
+ .collect::<Vec<UnitdProcess>>()
+ }
+
+ pub fn cmd(&self) -> Option<UnitdCmd> {
+ if self.all_cmds.is_empty() {
+ return None;
+ }
+
+ match UnitdCmd::new(self.all_cmds[0].clone(), self.binary_name.as_ref()) {
+ Ok(cmd) => Some(cmd),
+ Err(error) => {
+ eprintln!("Failed to parse process cmd: {}", error);
+ None
+ }
+ }
+ }
+
+ pub fn executable_path(&self) -> Option<Box<Path>> {
+ if self.executable_path.is_some() {
+ return self.executable_path.clone();
+ }
+ self.cmd().and_then(|cmd| cmd.process_executable_path)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn can_parse_runtime_cmd_absolute_path(binary_name: &str) {
+ let cmd = format!(
+ "unit: main v1.28.0 [/usr/sbin/{} --log /var/log/unit.log --pid /var/run/unit.pid]",
+ binary_name
+ );
+ let unitd_cmd = UnitdCmd::new(cmd, binary_name).expect("Failed to parse unitd cmd");
+ assert_eq!(unitd_cmd.version.unwrap(), "1.28.0");
+ assert_eq!(
+ unitd_cmd.process_executable_path.unwrap().to_string_lossy(),
+ format!("/usr/sbin/{}", binary_name)
+ );
+ let flags = unitd_cmd.flags.unwrap();
+ assert_eq!(flags.get_flag_value("log").unwrap(), "/var/log/unit.log");
+ assert_eq!(flags.get_flag_value("pid").unwrap(), "/var/run/unit.pid");
+ }
+
+ fn can_parse_runtime_cmd_relative_path(binary_name: &str) {
+ let cmd = format!(
+ "unit: main v1.29.0 [./sbin/{} --no-daemon --tmp /tmp --something]",
+ binary_name
+ );
+ let unitd_cmd = UnitdCmd::new(cmd, binary_name).expect("Failed to parse unitd cmd");
+ assert_eq!(unitd_cmd.version.unwrap(), "1.29.0");
+ assert_eq!(
+ unitd_cmd.process_executable_path.unwrap().to_string_lossy(),
+ format!("./sbin/{}", binary_name)
+ );
+ let flags = unitd_cmd.flags.unwrap();
+ assert_eq!(flags.get_flag_value("tmp").unwrap(), "/tmp");
+ assert!(flags.has_flag("something"));
+ }
+
+ #[test]
+ fn can_parse_runtime_cmd_unitd_absolute_path() {
+ can_parse_runtime_cmd_absolute_path("unitd");
+ }
+
+ #[test]
+ fn can_parse_runtime_cmd_unitd_debug_absolute_path() {
+ can_parse_runtime_cmd_absolute_path("unitd-debug");
+ }
+
+ #[test]
+ fn can_parse_runtime_cmd_unitd_relative_path() {
+ can_parse_runtime_cmd_relative_path("unitd");
+ }
+
+ #[test]
+ fn can_parse_runtime_cmd_unitd_debug_relative_path() {
+ can_parse_runtime_cmd_relative_path("unitd-debug");
+ }
+}
diff --git a/tools/unitctl/unit-client-rs/src/unitd_process_user.rs b/tools/unitctl/unit-client-rs/src/unitd_process_user.rs
new file mode 100644
index 00000000..c4f9be22
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/src/unitd_process_user.rs
@@ -0,0 +1,36 @@
+use serde::Serialize;
+use std::fmt;
+use std::fmt::Display;
+use sysinfo::User;
+
+#[derive(Debug, Clone, Serialize)]
+pub struct UnitdProcessUser {
+ pub name: String,
+ pub uid: u32,
+ pub gid: u32,
+ pub groups: Vec<String>,
+}
+
+impl Display for UnitdProcessUser {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(
+ f,
+ "name: {}, uid: {}, gid: {}, groups: {}",
+ self.name,
+ self.uid,
+ self.gid,
+ self.groups.join(", ")
+ )
+ }
+}
+
+impl From<&User> for UnitdProcessUser {
+ fn from(user: &User) -> Self {
+ UnitdProcessUser {
+ name: user.name().into(),
+ uid: *user.id().clone(),
+ gid: *user.group_id(),
+ groups: user.groups().iter().map(|g| g.name().into()).collect(),
+ }
+ }
+}