diff options
author | oxpa <iippolitov@gmail.com> | 2024-09-17 14:21:10 +0100 |
---|---|---|
committer | oxpa <iippolitov@gmail.com> | 2024-09-17 14:21:10 +0100 |
commit | 2417826d8bebf921ee1be102ef8ce702f0683d66 (patch) | |
tree | 76d29a1705415ed7368870826dbb2f04942ee794 /tools/unitctl/unit-client-rs/src/unit_client.rs | |
parent | 0e79d961bb1ea68674961da1703ffedb1ddf6e43 (diff) | |
parent | 24ed91f40634372d99f67f0e4e3c2ac0abde81bd (diff) | |
download | unit-2417826d8bebf921ee1be102ef8ce702f0683d66.tar.gz unit-2417826d8bebf921ee1be102ef8ce702f0683d66.tar.bz2 |
Merge tag '1.33.0' into packaging.
Unit 1.33.0 release.
Diffstat (limited to 'tools/unitctl/unit-client-rs/src/unit_client.rs')
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unit_client.rs | 424 |
1 files changed, 424 insertions, 0 deletions
diff --git a/tools/unitctl/unit-client-rs/src/unit_client.rs b/tools/unitctl/unit-client-rs/src/unit_client.rs new file mode 100644 index 00000000..3d09e67a --- /dev/null +++ b/tools/unitctl/unit-client-rs/src/unit_client.rs @@ -0,0 +1,424 @@ +use std::collections::HashMap; +use std::error::Error as StdError; +use std::fmt::Debug; +use std::rc::Rc; +use std::{fmt, io}; + +use custom_error::custom_error; +use hyper::body::{Buf, HttpBody}; +use hyper::client::{HttpConnector, ResponseFuture}; +use hyper::Error as HyperError; +use hyper::{http, Body, Client, Request}; +use hyper_tls::HttpsConnector; +use hyperlocal::{UnixClientExt, UnixConnector}; +use serde::{Deserialize, Serialize}; + +use crate::control_socket_address::ControlSocket; +use unit_openapi::apis::configuration::Configuration; +use unit_openapi::apis::{ + ApplicationsApi, ApplicationsApiClient, AppsApi, AppsApiClient, Error as OpenAPIError, ListenersApi, + ListenersApiClient, StatusApi, StatusApiClient, +}; +use unit_openapi::models::{ConfigApplication, ConfigListener, Status}; + +const USER_AGENT: &str = concat!("Unit CLI/", env!("CARGO_PKG_VERSION"), "/rust"); + +custom_error! {pub UnitClientError + OpenAPIError { source: OpenAPIError } = "OpenAPI error", + JsonError { source: serde_json::Error, + path: String} = "JSON error [path={path}]", + HyperError { source: hyper::Error, + control_socket_address: String, + path: String} = "Communications error [control_socket_address={control_socket_address}, path={path}]: {source}", + HttpRequestError { source: http::Error, + path: String} = "HTTP error [path={path}]", + HttpResponseError { status: http::StatusCode, + path: String, + body: String} = "HTTP response error [path={path}, status={status}]:\n{body}", + HttpResponseJsonBodyError { status: http::StatusCode, + path: String, + error: String, + detail: String} = "HTTP response error [path={path}, status={status}]:\n Error: {error}\n Detail: {detail}", + IoError { source: io::Error, socket: String } = "IO error [socket={socket}]", + UnixSocketAddressError { + source: io::Error, + control_socket_address: String + } = "Invalid unix domain socket address [control_socket_address={control_socket_address}]", + SocketPermissionsError { control_socket_address: String } = + "Insufficient permissions to connect to control socket [control_socket_address={control_socket_address}]", + UnixSocketNotFound { control_socket_address: String } = "Unix socket not found [control_socket_address={control_socket_address}]", + TcpSocketAddressUriError { + source: http::uri::InvalidUri, + control_socket_address: String + } = "Invalid TCP socket address [control_socket_address={control_socket_address}]", + TcpSocketAddressParseError { + message: String, + control_socket_address: String + } = "Invalid TCP socket address [control_socket_address={control_socket_address}]: {message}", + TcpSocketAddressNoPortError { + control_socket_address: String + } = "TCP socket address does not have a port specified [control_socket_address={control_socket_address}]", + UnitdProcessParseError { + message: String, + pid: u64 + } = "{message} for [pid={pid}]", + UnitdProcessExecError { + source: Box<dyn StdError>, + message: String, + executable_path: String, + pid: u64 + } = "{message} for [pid={pid}, executable_path={executable_path}]: {source}", + UnitdDockerError { + message: String + } = "Failed to communicate with docker daemon: {message}", +} + +impl UnitClientError { + fn new(error: HyperError, control_socket_address: String, path: String) -> Self { + if error.is_connect() { + if let Some(source) = error.source() { + if let Some(io_error) = source.downcast_ref::<io::Error>() { + if io_error.kind().eq(&io::ErrorKind::PermissionDenied) { + return UnitClientError::SocketPermissionsError { control_socket_address }; + } + } + } + } + + UnitClientError::HyperError { + source: error, + control_socket_address, + path, + } + } +} + +macro_rules! new_openapi_client_from_hyper_client { + ($unit_client:expr, $hyper_client: ident, $api_client:ident, $api_trait:ident) => {{ + let config = Configuration { + base_path: $unit_client.control_socket.create_uri_with_path("/").to_string(), + user_agent: Some(format!("{}/OpenAPI-Generator", USER_AGENT).to_owned()), + client: $hyper_client.clone(), + basic_auth: None, + oauth_access_token: None, + api_key: None, + }; + let rc_config = Rc::new(config); + Box::new($api_client::new(rc_config)) as Box<dyn $api_trait> + }}; +} + +macro_rules! new_openapi_client { + ($unit_client:expr, $api_client:ident, $api_trait:ident) => { + match &*$unit_client.client { + RemoteClient::Tcp { client } => { + new_openapi_client_from_hyper_client!($unit_client, client, $api_client, $api_trait) + } + RemoteClient::Unix { client } => { + new_openapi_client_from_hyper_client!($unit_client, client, $api_client, $api_trait) + } + } + }; +} + +#[derive(Clone)] +pub enum RemoteClient<B> +where + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + Unix { + client: Client<UnixConnector, B>, + }, + Tcp { + client: Client<HttpsConnector<HttpConnector>, B>, + }, +} + +impl<B> RemoteClient<B> +where + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + fn client_name(&self) -> &str { + match self { + RemoteClient::Unix { .. } => "Client<UnixConnector, Body>", + RemoteClient::Tcp { .. } => "Client<HttpsConnector<HttpConnector>, Body>", + } + } + + pub fn request(&self, req: Request<B>) -> ResponseFuture { + match self { + RemoteClient::Unix { client } => client.request(req), + RemoteClient::Tcp { client } => client.request(req), + } + } +} + +impl<B> Debug for RemoteClient<B> +where + B: HttpBody + Send + 'static, + B::Data: Send, + B::Error: Into<Box<dyn StdError + Send + Sync>>, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.client_name()) + } +} + +#[derive(Debug)] +pub struct UnitClient { + pub control_socket: ControlSocket, + /// Client for communicating with the control API over the UNIX domain socket + client: Box<RemoteClient<Body>>, +} + +impl UnitClient { + pub fn new(control_socket: ControlSocket) -> Self { + if control_socket.is_local_socket() { + Self::new_unix(control_socket) + } else { + Self::new_http(control_socket) + } + } + + pub fn new_http(control_socket: ControlSocket) -> Self { + let remote_client = Client::builder().build(HttpsConnector::new()); + Self { + control_socket, + client: Box::from(RemoteClient::Tcp { client: remote_client }), + } + } + + pub fn new_unix(control_socket: ControlSocket) -> UnitClient { + let remote_client = Client::unix(); + + Self { + control_socket, + client: Box::from(RemoteClient::Unix { client: remote_client }), + } + } + + /// Sends a request to Unit and deserializes the JSON response body into the value of type `RESPONSE`. + pub async fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>( + &self, + mut request: Request<Body>, + ) -> Result<RESPONSE, UnitClientError> { + let uri = request.uri().clone(); + let path: &str = uri.path(); + + request.headers_mut().insert("User-Agent", USER_AGENT.parse().unwrap()); + + let response_future = self.client.request(request); + + let response = response_future + .await + .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; + + let status = response.status(); + let body = hyper::body::aggregate(response) + .await + .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; + let reader = &mut body.reader(); + if !status.is_success() { + let error: HashMap<String, String> = + serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { + source: error, + path: path.to_string(), + })?; + + return Err(UnitClientError::HttpResponseJsonBodyError { + status, + path: path.to_string(), + error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(), + detail: error.get("detail").unwrap_or(&"".into()).to_string(), + }); + } + serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { + source: error, + path: path.to_string(), + }) + } + + pub fn listeners_api(&self) -> Box<dyn ListenersApi + 'static> { + new_openapi_client!(self, ListenersApiClient, ListenersApi) + } + + pub async fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> { + self.listeners_api().get_listeners().await.or_else(|err| { + if let OpenAPIError::Hyper(hyper_error) = err { + Err(Box::new(UnitClientError::new( + hyper_error, + self.control_socket.to_string(), + "/listeners".to_string(), + ))) + } else { + Err(Box::new(UnitClientError::OpenAPIError { source: err })) + } + }) + } + + pub fn status_api(&self) -> Box<dyn StatusApi + 'static> { + new_openapi_client!(self, StatusApiClient, StatusApi) + } + + pub async fn status(&self) -> Result<Status, Box<UnitClientError>> { + self.status_api().get_status().await.or_else(|err| { + if let OpenAPIError::Hyper(hyper_error) = err { + Err(Box::new(UnitClientError::new( + hyper_error, + self.control_socket.to_string(), + "/status".to_string(), + ))) + } else { + Err(Box::new(UnitClientError::OpenAPIError { source: err })) + } + }) + } + + pub fn applications_api(&self) -> Box<dyn ApplicationsApi + 'static> { + new_openapi_client!(self, ApplicationsApiClient, ApplicationsApi) + } + + pub async fn applications(&self) -> Result<HashMap<String, ConfigApplication>, Box<UnitClientError>> { + self.applications_api().get_applications().await.or_else(|err| { + if let OpenAPIError::Hyper(hyper_error) = err { + Err(Box::new(UnitClientError::new( + hyper_error, + self.control_socket.to_string(), + "/applications".to_string(), + ))) + } else { + Err(Box::new(UnitClientError::OpenAPIError { source: err })) + } + }) + } + + pub async fn per_application_api(&self) -> Box<dyn AppsApi + 'static> { + new_openapi_client!(self, AppsApiClient, AppsApi) + } + + pub async fn restart_application(&self, name: &String) -> Result<HashMap<String, String>, Box<UnitClientError>> { + self.per_application_api() + .await + .get_app_restart(name.as_str()) + .await + .or_else(|err| { + if let OpenAPIError::Hyper(hyper_error) = err { + Err(Box::new(UnitClientError::new( + hyper_error, + self.control_socket.to_string(), + format!("/control/applications/{}/restart", name), + ))) + } else { + Err(Box::new(UnitClientError::OpenAPIError { source: err })) + } + }) + } + + pub async fn is_running(&self) -> bool { + self.status().await.is_ok() + } +} + +pub type UnitSerializableMap = HashMap<String, serde_json::Value>; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UnitStatus { + pub connections: UnitStatusConnections, + pub requests: UnitStatusRequests, + pub applications: HashMap<String, UnitStatusApplication>, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UnitStatusConnections { + #[serde(default)] + pub closed: usize, + #[serde(default)] + pub idle: usize, + #[serde(default)] + pub active: usize, + #[serde(default)] + pub accepted: usize, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UnitStatusRequests { + #[serde(default)] + pub active: usize, + #[serde(default)] + pub total: usize, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct UnitStatusApplication { + #[serde(default)] + pub processes: HashMap<String, usize>, + #[serde(default)] + pub requests: HashMap<String, usize>, +} + +#[cfg(test)] +mod tests { + use crate::unitd_instance::UnitdInstance; + + use super::*; + // Integration tests + + #[tokio::test] + async fn can_connect_to_unit_api() { + match UnitdInstance::running_unitd_instances().await.first() { + Some(unit_instance) => { + let control_api_socket_address = unit_instance + .control_api_socket_address() + .expect("No control API socket path found"); + let control_socket = ControlSocket::try_from(control_api_socket_address) + .expect("Unable to parse control socket address"); + let unit_client = UnitClient::new(control_socket); + assert!(unit_client.is_running().await); + } + None => { + eprintln!("No running unitd instances found - skipping test"); + } + } + } + + #[tokio::test] + async fn can_get_unit_status() { + match UnitdInstance::running_unitd_instances().await.first() { + Some(unit_instance) => { + let control_api_socket_address = unit_instance + .control_api_socket_address() + .expect("No control API socket path found"); + let control_socket = ControlSocket::try_from(control_api_socket_address) + .expect("Unable to parse control socket address"); + let unit_client = UnitClient::new(control_socket); + let status = unit_client.status().await.expect("Unable to get unit status"); + println!("Unit status: {:?}", status); + } + None => { + eprintln!("No running unitd instances found - skipping test"); + } + } + } + + #[tokio::test] + async fn can_get_unit_listeners() { + match UnitdInstance::running_unitd_instances().await.first() { + Some(unit_instance) => { + let control_api_socket_address = unit_instance + .control_api_socket_address() + .expect("No control API socket path found"); + let control_socket = ControlSocket::try_from(control_api_socket_address) + .expect("Unable to parse control socket address"); + let unit_client = UnitClient::new(control_socket); + unit_client.listeners().await.expect("Unable to get Unit listeners"); + } + None => { + eprintln!("No running unitd instances found - skipping test"); + } + } + } +} |