summaryrefslogtreecommitdiffhomepage
path: root/tools/unitctl/unit-client-rs
diff options
context:
space:
mode:
Diffstat (limited to 'tools/unitctl/unit-client-rs')
-rw-r--r--tools/unitctl/unit-client-rs/Cargo.toml2
-rw-r--r--tools/unitctl/unit-client-rs/src/lib.rs1
-rw-r--r--tools/unitctl/unit-client-rs/src/unit_client.rs146
-rw-r--r--tools/unitctl/unit-client-rs/src/unitd_cmd.rs5
-rw-r--r--tools/unitctl/unit-client-rs/src/unitd_docker.rs282
-rw-r--r--tools/unitctl/unit-client-rs/src/unitd_instance.rs77
-rw-r--r--tools/unitctl/unit-client-rs/src/unitd_process.rs34
7 files changed, 447 insertions, 100 deletions
diff --git a/tools/unitctl/unit-client-rs/Cargo.toml b/tools/unitctl/unit-client-rs/Cargo.toml
index d3b2f9cf..3e48ee23 100644
--- a/tools/unitctl/unit-client-rs/Cargo.toml
+++ b/tools/unitctl/unit-client-rs/Cargo.toml
@@ -27,6 +27,8 @@ which = "5.0"
unit-openapi = { path = "../unit-openapi" }
rustls = "0.23.5"
+bollard = "0.16.1"
+regex = "1.10.4"
[dev-dependencies]
rand = "0.8.5"
diff --git a/tools/unitctl/unit-client-rs/src/lib.rs b/tools/unitctl/unit-client-rs/src/lib.rs
index dca8a86f..a0933f42 100644
--- a/tools/unitctl/unit-client-rs/src/lib.rs
+++ b/tools/unitctl/unit-client-rs/src/lib.rs
@@ -10,6 +10,7 @@ mod runtime_flags;
pub mod unit_client;
mod unitd_cmd;
pub mod unitd_configure_options;
+pub mod unitd_docker;
pub mod unitd_instance;
pub mod unitd_process;
mod unitd_process_user;
diff --git a/tools/unitctl/unit-client-rs/src/unit_client.rs b/tools/unitctl/unit-client-rs/src/unit_client.rs
index b856fd20..7456b106 100644
--- a/tools/unitctl/unit-client-rs/src/unit_client.rs
+++ b/tools/unitctl/unit-client-rs/src/unit_client.rs
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::error::Error as StdError;
use std::fmt::Debug;
-use std::future::Future;
use std::rc::Rc;
use std::{fmt, io};
@@ -13,7 +12,6 @@ use hyper::{http, Body, Client, Request};
use hyper_tls::HttpsConnector;
use hyperlocal::{UnixClientExt, UnixConnector};
use serde::{Deserialize, Serialize};
-use tokio::runtime::Runtime;
use crate::control_socket_address::ControlSocket;
use unit_openapi::apis::configuration::Configuration;
@@ -168,51 +166,38 @@ where
#[derive(Debug)]
pub struct UnitClient {
pub control_socket: ControlSocket,
- /// A `current_thread` runtime for executing operations on the
- /// asynchronous client in a blocking manner.
- rt: Runtime,
/// Client for communicating with the control API over the UNIX domain socket
client: Box<RemoteClient<Body>>,
}
impl UnitClient {
- pub fn new_with_runtime(control_socket: ControlSocket, runtime: Runtime) -> Self {
+ pub fn new(control_socket: ControlSocket) -> Self {
if control_socket.is_local_socket() {
- Self::new_unix(control_socket, runtime)
+ Self::new_unix(control_socket)
} else {
- Self::new_http(control_socket, runtime)
+ Self::new_http(control_socket)
}
}
- pub fn new(control_socket: ControlSocket) -> Self {
- let runtime = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("Unable to create a current_thread runtime");
- Self::new_with_runtime(control_socket, runtime)
- }
-
- pub fn new_http(control_socket: ControlSocket, runtime: Runtime) -> Self {
+ pub fn new_http(control_socket: ControlSocket) -> Self {
let remote_client = Client::builder().build(HttpsConnector::new());
Self {
control_socket,
- rt: runtime,
client: Box::from(RemoteClient::Tcp { client: remote_client }),
}
}
- pub fn new_unix(control_socket: ControlSocket, runtime: Runtime) -> UnitClient {
+ pub fn new_unix(control_socket: ControlSocket) -> UnitClient {
let remote_client = Client::unix();
Self {
control_socket,
- rt: runtime,
client: Box::from(RemoteClient::Unix { client: remote_client }),
}
}
/// Sends a request to UNIT and deserializes the JSON response body into the value of type `RESPONSE`.
- pub fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>(
+ pub async fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>(
&self,
mut request: Request<Body>,
) -> Result<RESPONSE, UnitClientError> {
@@ -223,34 +208,32 @@ impl UnitClient {
let response_future = self.client.request(request);
- self.rt.block_on(async {
- let response = response_future
- .await
- .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?;
-
- let status = response.status();
- let body = hyper::body::aggregate(response)
- .await
- .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?;
- let reader = &mut body.reader();
- if !status.is_success() {
- let error: HashMap<String, String> =
- serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError {
- source: error,
- path: path.to_string(),
- })?;
-
- return Err(UnitClientError::HttpResponseJsonBodyError {
- status,
+ let response = response_future
+ .await
+ .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?;
+
+ let status = response.status();
+ let body = hyper::body::aggregate(response)
+ .await
+ .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?;
+ let reader = &mut body.reader();
+ if !status.is_success() {
+ let error: HashMap<String, String> =
+ serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError {
+ source: error,
path: path.to_string(),
- error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(),
- detail: error.get("detail").unwrap_or(&"".into()).to_string(),
- });
- }
- serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError {
- source: error,
+ })?;
+
+ return Err(UnitClientError::HttpResponseJsonBodyError {
+ status,
path: path.to_string(),
- })
+ error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(),
+ detail: error.get("detail").unwrap_or(&"".into()).to_string(),
+ });
+ }
+ serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError {
+ source: error,
+ path: path.to_string(),
})
}
@@ -258,23 +241,17 @@ impl UnitClient {
new_openapi_client!(self, ListenersApiClient, ListenersApi)
}
- pub fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> {
- let list_listeners = self.listeners_api().get_listeners();
- self.execute_openapi_future(list_listeners)
- }
-
- pub fn execute_openapi_future<F: Future<Output = Result<R, OpenAPIError>>, R: for<'de> serde::Deserialize<'de>>(
- &self,
- future: F,
- ) -> Result<R, Box<UnitClientError>> {
- self.rt.block_on(future).map_err(|error| {
- let remapped_error = if let OpenAPIError::Hyper(hyper_error) = error {
- UnitClientError::new(hyper_error, self.control_socket.to_string(), "".to_string())
+ pub async fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> {
+ self.listeners_api().get_listeners().await.or_else(|err| {
+ if let OpenAPIError::Hyper(hyper_error) = err {
+ Err(Box::new(UnitClientError::new(
+ hyper_error,
+ self.control_socket.to_string(),
+ "".to_string(),
+ )))
} else {
- UnitClientError::OpenAPIError { source: error }
- };
-
- Box::new(remapped_error)
+ Err(Box::new(UnitClientError::OpenAPIError { source: err }))
+ }
})
}
@@ -282,13 +259,22 @@ impl UnitClient {
new_openapi_client!(self, StatusApiClient, StatusApi)
}
- pub fn status(&self) -> Result<Status, Box<UnitClientError>> {
- let status = self.status_api().get_status();
- self.execute_openapi_future(status)
+ pub async fn status(&self) -> Result<Status, Box<UnitClientError>> {
+ self.status_api().get_status().await.or_else(|err| {
+ if let OpenAPIError::Hyper(hyper_error) = err {
+ Err(Box::new(UnitClientError::new(
+ hyper_error,
+ self.control_socket.to_string(),
+ "".to_string(),
+ )))
+ } else {
+ Err(Box::new(UnitClientError::OpenAPIError { source: err }))
+ }
+ })
}
- pub fn is_running(&self) -> bool {
- self.status().is_ok()
+ pub async fn is_running(&self) -> bool {
+ self.status().await.is_ok()
}
}
@@ -336,9 +322,9 @@ mod tests {
use super::*;
// Integration tests
- #[test]
- fn can_connect_to_unit_api() {
- match UnitdInstance::running_unitd_instances().first() {
+ #[tokio::test]
+ async fn can_connect_to_unit_api() {
+ match UnitdInstance::running_unitd_instances().await.first() {
Some(unit_instance) => {
let control_api_socket_address = unit_instance
.control_api_socket_address()
@@ -346,7 +332,7 @@ mod tests {
let control_socket = ControlSocket::try_from(control_api_socket_address)
.expect("Unable to parse control socket address");
let unit_client = UnitClient::new(control_socket);
- assert!(unit_client.is_running());
+ assert!(unit_client.is_running().await);
}
None => {
eprintln!("No running unitd instances found - skipping test");
@@ -354,9 +340,9 @@ mod tests {
}
}
- #[test]
- fn can_get_unit_status() {
- match UnitdInstance::running_unitd_instances().first() {
+ #[tokio::test]
+ async fn can_get_unit_status() {
+ match UnitdInstance::running_unitd_instances().await.first() {
Some(unit_instance) => {
let control_api_socket_address = unit_instance
.control_api_socket_address()
@@ -364,7 +350,7 @@ mod tests {
let control_socket = ControlSocket::try_from(control_api_socket_address)
.expect("Unable to parse control socket address");
let unit_client = UnitClient::new(control_socket);
- let status = unit_client.status().expect("Unable to get unit status");
+ let status = unit_client.status().await.expect("Unable to get unit status");
println!("Unit status: {:?}", status);
}
None => {
@@ -373,9 +359,9 @@ mod tests {
}
}
- #[test]
- fn can_get_unit_listeners() {
- match UnitdInstance::running_unitd_instances().first() {
+ #[tokio::test]
+ async fn can_get_unit_listeners() {
+ match UnitdInstance::running_unitd_instances().await.first() {
Some(unit_instance) => {
let control_api_socket_address = unit_instance
.control_api_socket_address()
@@ -383,7 +369,7 @@ mod tests {
let control_socket = ControlSocket::try_from(control_api_socket_address)
.expect("Unable to parse control socket address");
let unit_client = UnitClient::new(control_socket);
- unit_client.listeners().expect("Unable to get Unit listeners");
+ unit_client.listeners().await.expect("Unable to get Unit listeners");
}
None => {
eprintln!("No running unitd instances found - skipping test");
diff --git a/tools/unitctl/unit-client-rs/src/unitd_cmd.rs b/tools/unitctl/unit-client-rs/src/unitd_cmd.rs
index c4883ed5..17563cb0 100644
--- a/tools/unitctl/unit-client-rs/src/unitd_cmd.rs
+++ b/tools/unitctl/unit-client-rs/src/unitd_cmd.rs
@@ -28,11 +28,13 @@ impl UnitdCmd {
.expect("Unable to parse cmd")
.splitn(2, " [")
.collect::<Vec<&str>>();
+
if parts.len() != 2 {
let msg = format!("cmd does not have the expected format: {}", process_cmd);
return Err(IoError::new(ErrorKind::InvalidInput, msg).into());
}
- let version: Option<String> = Some(parts[0].to_string());
+
+ let version = Some(parts[0].to_string());
let executable_path = UnitdCmd::parse_executable_path_from_cmd(parts[1], binary_name);
let flags = UnitdCmd::parse_runtime_flags_from_cmd(parts[1]);
@@ -69,6 +71,7 @@ impl UnitdCmd {
if cmd.is_empty() {
return None;
}
+
// Split out everything in between the brackets [ and ]
let split = cmd.trim_end_matches(']').splitn(2, '[').collect::<Vec<&str>>();
if split.is_empty() {
diff --git a/tools/unitctl/unit-client-rs/src/unitd_docker.rs b/tools/unitctl/unit-client-rs/src/unitd_docker.rs
new file mode 100644
index 00000000..d5028afc
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/src/unitd_docker.rs
@@ -0,0 +1,282 @@
+use std::collections::HashMap;
+use std::fs::read_to_string;
+use std::path::PathBuf;
+
+use crate::unitd_process::UnitdProcess;
+
+use bollard::secret::ContainerInspectResponse;
+use regex::Regex;
+use serde::ser::SerializeMap;
+use serde::{Serialize, Serializer};
+
+use bollard::{models::ContainerSummary, Docker};
+
+#[derive(Clone, Debug)]
+pub struct UnitdContainer {
+ pub container_id: Option<String>,
+ pub container_image: String,
+ pub command: Option<String>,
+ pub mounts: HashMap<PathBuf, PathBuf>,
+ pub platform: String,
+ details: Option<ContainerInspectResponse>,
+}
+
+impl From<&ContainerSummary> for UnitdContainer {
+ fn from(ctr: &ContainerSummary) -> Self {
+ // we assume paths from the docker api are absolute
+ // they certainly have to be later...
+ let mut mounts = HashMap::new();
+ if let Some(mts) = &ctr.mounts {
+ for i in mts {
+ if let Some(ref src) = i.source {
+ if let Some(ref dest) = i.destination {
+ mounts.insert(PathBuf::from(dest.clone()), PathBuf::from(src.clone()));
+ }
+ }
+ }
+ }
+
+ UnitdContainer {
+ container_id: ctr.id.clone(),
+ container_image: format!(
+ "{} (docker)",
+ ctr.image.clone().unwrap_or(String::from("unknown container")),
+ ),
+ command: ctr.command.clone(),
+ mounts: mounts,
+ platform: String::from("Docker"),
+ details: None,
+ }
+ }
+}
+
+impl From<&UnitdContainer> for UnitdProcess {
+ fn from(ctr: &UnitdContainer) -> Self {
+ let version = ctr.details.as_ref().and_then(|details| {
+ details.config.as_ref().and_then(|conf| {
+ conf.labels.as_ref().and_then(|labels| {
+ labels
+ .get("org.opencontainers.image.version")
+ .and_then(|version| Some(version.clone()))
+ })
+ })
+ });
+ let command = ctr.command.clone().and_then(|cmd| {
+ Some(format!(
+ "{}{} [{}{}]",
+ "unit: main v",
+ version.or(Some(String::from(""))).unwrap(),
+ ctr.container_image,
+ ctr.rewrite_socket(
+ cmd.strip_prefix("/usr/local/bin/docker-entrypoint.sh")
+ .or_else(|| Some(""))
+ .unwrap()
+ .to_string())
+ ))
+ });
+ let mut cmds = vec![];
+ let _ = command.map_or((), |cmd| cmds.push(cmd));
+ UnitdProcess {
+ all_cmds: cmds,
+ binary_name: ctr.container_image.clone(),
+ process_id: ctr
+ .details
+ .as_ref()
+ .and_then(|details| {
+ details
+ .state
+ .as_ref()
+ .and_then(|state| state.pid.and_then(|pid| Some(pid.clone() as u64)))
+ })
+ .or(Some(0 as u64))
+ .unwrap(),
+ executable_path: None,
+ environ: vec![],
+ working_dir: ctr.details.as_ref().and_then(|details| {
+ details.config.as_ref().and_then(|conf| {
+ Some(
+ PathBuf::from(
+ conf.working_dir
+ .as_ref()
+ .map_or(String::new(), |dir| ctr.host_path(dir.clone())),
+ )
+ .into_boxed_path(),
+ )
+ })
+ }),
+ child_pids: vec![],
+ user: None,
+ effective_user: None,
+ container: Some(ctr.clone()),
+ }
+ }
+}
+
+impl Serialize for UnitdContainer {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ let mut state = serializer.serialize_map(Some(5))?;
+ state.serialize_entry("container_id", &self.container_id)?;
+ state.serialize_entry("container_image", &self.container_image)?;
+ state.serialize_entry("command", &self.command)?;
+ state.serialize_entry("mounts", &self.mounts)?;
+ state.serialize_entry("platform", &self.platform)?;
+ state.end()
+ }
+}
+
+impl UnitdContainer {
+ pub async fn find_unitd_containers() -> Vec<UnitdContainer> {
+ if let Ok(docker) = Docker::connect_with_local_defaults() {
+ match docker.list_containers::<String>(None).await {
+ Err(e) => {
+ eprintln!("{}", e);
+ vec![]
+ }
+ Ok(summary) => {
+ // cant do this functionally because of the async call
+ let mut mapped = vec![];
+ for ctr in summary {
+ if ctr.clone().image.or(Some(String::new())).unwrap().contains("unit") {
+ let mut c = UnitdContainer::from(&ctr);
+ if let Some(names) = ctr.names {
+ if names.len() > 0 {
+ let name = names[0].strip_prefix("/").or(Some(names[0].as_str())).unwrap();
+ if let Ok(cir) = docker.inspect_container(name, None).await {
+ c.details = Some(cir);
+ }
+ }
+ }
+ mapped.push(c);
+ }
+ }
+ mapped
+ }
+ }
+ } else {
+ vec![]
+ }
+ }
+
+ pub fn host_path(&self, container_path: String) -> String {
+ let cp = PathBuf::from(container_path);
+
+ // get only possible mount points
+ // sort to deepest mountpoint first
+ // assumed deepest possible mount point takes precedence
+ let mut keys = self
+ .mounts
+ .clone()
+ .into_keys()
+ .filter(|mp| cp.as_path().starts_with(mp))
+ .collect::<Vec<_>>();
+ keys.sort_by_key(|a| 0 as isize - a.ancestors().count() as isize);
+
+ // either return translated path or original prefixed with "container"
+ if keys.len() > 0 {
+ self.mounts[&keys[0]]
+ .clone()
+ .join(
+ cp.as_path()
+ .strip_prefix(keys[0].clone())
+ .expect("error checking path prefix"),
+ )
+ .to_string_lossy()
+ .to_string()
+ } else {
+ format!("<container>:{}", cp.display())
+ }
+ }
+
+ pub fn rewrite_socket(&self, command: String) -> String {
+ command
+ .split(" ")
+ .map(|tok| if tok.starts_with("unix:") {
+ format!("unix:{}", self.host_path(
+ tok.strip_prefix("unix:")
+ .unwrap()
+ .to_string()))
+ } else {
+ tok.to_string()
+ })
+ .collect::<Vec<_>>()
+ .join(" ")
+ }
+
+ pub fn container_is_running(&self) -> Option<bool> {
+ self.details
+ .as_ref()
+ .and_then(|details| details.state.as_ref().and_then(|state| state.running))
+ }
+}
+
+/* Returns either 64 char docker container ID or None */
+pub fn pid_is_dockerized(pid: u64) -> bool {
+ let cg_filepath = format!("/proc/{}/cgroup", pid);
+ match read_to_string(cg_filepath) {
+ Err(e) => {
+ eprintln!("{}", e);
+ false
+ }
+ Ok(contents) => {
+ let docker_re = Regex::new(r"docker-([a-zA-Z0-9]{64})").unwrap();
+ docker_re.is_match(contents.as_str())
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_path_translation() {
+ let mut mounts = HashMap::new();
+ mounts.insert("/1/2/3/4/5/6/7".into(), "/0".into());
+ mounts.insert("/root".into(), "/1".into());
+ mounts.insert("/root/mid".into(), "/2".into());
+ mounts.insert("/root/mid/child".into(), "/3".into());
+ mounts.insert("/mid/child".into(), "/4".into());
+ mounts.insert("/child".into(), "/5".into());
+
+ let ctr = UnitdContainer {
+ container_id: None,
+ container_image: String::from(""),
+ command: None,
+ platform: "test".to_string(),
+ details: None,
+ mounts: mounts,
+ };
+
+ assert_eq!(
+ "/3/c2/test".to_string(),
+ ctr.host_path("/root/mid/child/c2/test".to_string())
+ );
+ assert_eq!(
+ "<container>:/path/to/conf".to_string(),
+ ctr.host_path("/path/to/conf".to_string())
+ );
+ }
+
+ #[test]
+ fn test_unix_sock_path_translate() {
+ let mut mounts = HashMap::new();
+ mounts.insert("/var/run".into(), "/tmp".into());
+
+ let ctr = UnitdContainer {
+ container_id: None,
+ container_image: String::from(""),
+ command: None,
+ platform: "test".to_string(),
+ details: None,
+ mounts: mounts,
+ };
+
+ assert_eq!(
+ ctr.rewrite_socket("unitd --no-daemon --control unix:/var/run/control.unit.sock".to_string()),
+ "unitd --no-daemon --control unix:/tmp/control.unit.sock".to_string());
+
+ }
+}
diff --git a/tools/unitctl/unit-client-rs/src/unitd_instance.rs b/tools/unitctl/unit-client-rs/src/unitd_instance.rs
index 9467fcb7..86f8e73d 100644
--- a/tools/unitctl/unit-client-rs/src/unitd_instance.rs
+++ b/tools/unitctl/unit-client-rs/src/unitd_instance.rs
@@ -1,4 +1,5 @@
use crate::unit_client::UnitClientError;
+use crate::unitd_docker::UnitdContainer;
use serde::ser::SerializeMap;
use serde::{Serialize, Serializer};
use std::error::Error as StdError;
@@ -25,7 +26,7 @@ impl Serialize for UnitdInstance {
where
S: Serializer,
{
- let mut state = serializer.serialize_map(Some(15))?;
+ let mut state = serializer.serialize_map(Some(11))?;
let runtime_flags = self
.process
.cmd()
@@ -34,13 +35,9 @@ impl Serialize for UnitdInstance {
let configure_flags = self.configure_options.as_ref().map(|opts| opts.all_flags.clone());
- state.serialize_entry("pid", &self.process.process_id)?;
+ state.serialize_entry("process", &self.process)?;
state.serialize_entry("version", &self.version())?;
- state.serialize_entry("user", &self.process.user)?;
- state.serialize_entry("effective_user", &self.process.effective_user)?;
- state.serialize_entry("executable", &self.process.executable_path())?;
state.serialize_entry("control_socket", &self.control_api_socket_address())?;
- state.serialize_entry("child_pids", &self.process.child_pids)?;
state.serialize_entry("log_path", &self.log_path())?;
state.serialize_entry("pid_path", &self.pid_path())?;
state.serialize_entry("modules_directory", &self.modules_directory())?;
@@ -56,8 +53,19 @@ impl Serialize for UnitdInstance {
}
impl UnitdInstance {
- pub fn running_unitd_instances() -> Vec<UnitdInstance> {
- Self::collect_unitd_processes(UnitdProcess::find_unitd_processes())
+ pub async fn running_unitd_instances() -> Vec<UnitdInstance> {
+ Self::collect_unitd_processes(
+ UnitdProcess::find_unitd_processes()
+ .into_iter()
+ .chain(
+ UnitdContainer::find_unitd_containers()
+ .await
+ .into_iter()
+ .map(|x| UnitdProcess::from(&x))
+ .collect::<Vec<_>>(),
+ )
+ .collect(),
+ )
}
/// Find all running unitd processes and convert them into UnitdInstances and filter
@@ -91,11 +99,14 @@ impl UnitdInstance {
pid: process.process_id,
})?;
Ok(new_path)
- } else {
+ } else if process.container.is_none() {
Err(UnitClientError::UnitdProcessParseError {
message: "Unable to get absolute unitd executable path from process".to_string(),
pid: process.process_id,
})
+ } else {
+ // container case
+ Ok(PathBuf::from("/").into_boxed_path())
}
}
None => Err(UnitClientError::UnitdProcessParseError {
@@ -107,7 +118,30 @@ impl UnitdInstance {
fn map_process_to_unitd_instance(process: &UnitdProcess) -> UnitdInstance {
match unitd_path_from_process(process) {
- Ok(unitd_path) => match UnitdConfigureOptions::new(&unitd_path.clone().into_path_buf()) {
+ Ok(_) if process.container.is_some() => {
+ let mut err = vec![];
+ // double check that it is running
+ let running = process.container
+ .as_ref()
+ .unwrap()
+ .container_is_running();
+
+ if running.is_none() || !running.unwrap() {
+ err.push(UnitClientError::UnitdProcessParseError{
+ message: "process container is not running".to_string(),
+ pid: process.process_id,
+ });
+ }
+
+ UnitdInstance {
+ process: process.to_owned(),
+ configure_options: None,
+ errors: err,
+ }
+ },
+ Ok(unitd_path) => match UnitdConfigureOptions::new(
+ &unitd_path.clone()
+ .into_path_buf()) {
Ok(configure_options) => UnitdInstance {
process: process.to_owned(),
configure_options: Some(configure_options),
@@ -250,10 +284,22 @@ impl fmt::Display for UnitdInstance {
writeln!(f, " API control unix socket: {}", socket_address)?;
writeln!(f, " Child processes ids: {}", child_pids)?;
writeln!(f, " Runtime flags: {}", runtime_flags)?;
- write!(f, " Configure options: {}", configure_flags)?;
+ writeln!(f, " Configure options: {}", configure_flags)?;
+
+ if let Some(ctr) = &self.process.container {
+ writeln!(f, " Container:")?;
+ writeln!(f, " Platform: {}", ctr.platform)?;
+ if let Some(id) = ctr.container_id.clone() {
+ writeln!(f, " Container ID: {}", id)?;
+ }
+ writeln!(f, " Mounts:")?;
+ for (k, v) in &ctr.mounts {
+ writeln!(f, " {} => {}", k.to_string_lossy(), v.to_string_lossy())?;
+ }
+ }
if !self.errors.is_empty() {
- write!(f, "\n Errors:")?;
+ write!(f, " Errors:")?;
for error in &self.errors {
write!(f, "\n {}", error)?;
}
@@ -302,9 +348,9 @@ mod tests {
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
30, 31,
];
- #[test]
- fn can_find_unitd_instances() {
- UnitdInstance::running_unitd_instances().iter().for_each(|p| {
+ #[tokio::test]
+ async fn can_find_unitd_instances() {
+ UnitdInstance::running_unitd_instances().await.iter().for_each(|p| {
println!("{:?}", p);
println!("Runtime Flags: {:?}", p.process.cmd().map(|c| c.flags));
println!("Temp directory: {:?}", p.tmp_directory());
@@ -326,6 +372,7 @@ mod tests {
child_pids: vec![],
user: None,
effective_user: None,
+ container: None,
}
}
diff --git a/tools/unitctl/unit-client-rs/src/unitd_process.rs b/tools/unitctl/unit-client-rs/src/unitd_process.rs
index b8604e89..2a78bfc6 100644
--- a/tools/unitctl/unit-client-rs/src/unitd_process.rs
+++ b/tools/unitctl/unit-client-rs/src/unitd_process.rs
@@ -1,6 +1,9 @@
use crate::unitd_cmd::UnitdCmd;
+use crate::unitd_docker::{pid_is_dockerized, UnitdContainer};
use crate::unitd_instance::UNITD_BINARY_NAMES;
use crate::unitd_process_user::UnitdProcessUser;
+use serde::ser::SerializeMap;
+use serde::{Serialize, Serializer};
use std::collections::HashMap;
use std::path::Path;
use sysinfo::{Pid, Process, ProcessRefreshKind, System, UpdateKind, Users};
@@ -16,6 +19,23 @@ pub struct UnitdProcess {
pub child_pids: Vec<u64>,
pub user: Option<UnitdProcessUser>,
pub effective_user: Option<UnitdProcessUser>,
+ pub container: Option<UnitdContainer>,
+}
+
+impl Serialize for UnitdProcess {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ let mut state = serializer.serialize_map(Some(6))?;
+ state.serialize_entry("pid", &self.process_id)?;
+ state.serialize_entry("user", &self.user)?;
+ state.serialize_entry("effective_user", &self.effective_user)?;
+ state.serialize_entry("executable", &self.executable_path())?;
+ state.serialize_entry("child_pids", &self.child_pids)?;
+ state.serialize_entry("container", &self.container)?;
+ state.end()
+ }
}
impl UnitdProcess {
@@ -41,10 +61,15 @@ impl UnitdProcess {
.iter()
// Filter out child processes
.filter(|p| {
- let parent_pid = p.1.parent();
- match parent_pid {
- Some(pid) => !unitd_processes.contains_key(&pid),
- None => false,
+ #[cfg(target_os = "linux")]
+ if pid_is_dockerized(p.0.as_u32().into()) {
+ false
+ } else {
+ let parent_pid = p.1.parent();
+ match parent_pid {
+ Some(pid) => !unitd_processes.contains_key(&pid),
+ None => false,
+ }
}
})
.map(|p| {
@@ -85,6 +110,7 @@ impl UnitdProcess {
child_pids,
user,
effective_user,
+ container: None,
}
})
.collect::<Vec<UnitdProcess>>()