summaryrefslogtreecommitdiffhomepage
path: root/tools/unitctl/unit-client-rs/src/unit_client.rs
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tools/unitctl/unit-client-rs/src/unit_client.rs393
1 files changed, 393 insertions, 0 deletions
diff --git a/tools/unitctl/unit-client-rs/src/unit_client.rs b/tools/unitctl/unit-client-rs/src/unit_client.rs
new file mode 100644
index 00000000..b856fd20
--- /dev/null
+++ b/tools/unitctl/unit-client-rs/src/unit_client.rs
@@ -0,0 +1,393 @@
+use std::collections::HashMap;
+use std::error::Error as StdError;
+use std::fmt::Debug;
+use std::future::Future;
+use std::rc::Rc;
+use std::{fmt, io};
+
+use custom_error::custom_error;
+use hyper::body::{Buf, HttpBody};
+use hyper::client::{HttpConnector, ResponseFuture};
+use hyper::Error as HyperError;
+use hyper::{http, Body, Client, Request};
+use hyper_tls::HttpsConnector;
+use hyperlocal::{UnixClientExt, UnixConnector};
+use serde::{Deserialize, Serialize};
+use tokio::runtime::Runtime;
+
+use crate::control_socket_address::ControlSocket;
+use unit_openapi::apis::configuration::Configuration;
+use unit_openapi::apis::{Error as OpenAPIError, StatusApi};
+use unit_openapi::apis::{ListenersApi, ListenersApiClient, StatusApiClient};
+use unit_openapi::models::{ConfigListener, Status};
+
+const USER_AGENT: &str = concat!("UNIT CLI/", env!("CARGO_PKG_VERSION"), "/rust");
+
+custom_error! {pub UnitClientError
+ OpenAPIError { source: OpenAPIError } = "OpenAPI error",
+ JsonError { source: serde_json::Error,
+ path: String} = "JSON error [path={path}]",
+ HyperError { source: hyper::Error,
+ control_socket_address: String,
+ path: String} = "Communications error [control_socket_address={control_socket_address}, path={path}]: {source}",
+ HttpRequestError { source: http::Error,
+ path: String} = "HTTP error [path={path}]",
+ HttpResponseError { status: http::StatusCode,
+ path: String,
+ body: String} = "HTTP response error [path={path}, status={status}]:\n{body}",
+ HttpResponseJsonBodyError { status: http::StatusCode,
+ path: String,
+ error: String,
+ detail: String} = "HTTP response error [path={path}, status={status}]:\n Error: {error}\n Detail: {detail}",
+ IoError { source: io::Error, socket: String } = "IO error [socket={socket}]",
+ UnixSocketAddressError {
+ source: io::Error,
+ control_socket_address: String
+ } = "Invalid unix domain socket address [control_socket_address={control_socket_address}]",
+ SocketPermissionsError { control_socket_address: String } =
+ "Insufficient permissions to connect to control socket [control_socket_address={control_socket_address}]",
+ UnixSocketNotFound { control_socket_address: String } = "Unix socket not found [control_socket_address={control_socket_address}]",
+ TcpSocketAddressUriError {
+ source: http::uri::InvalidUri,
+ control_socket_address: String
+ } = "Invalid TCP socket address [control_socket_address={control_socket_address}]",
+ TcpSocketAddressParseError {
+ message: String,
+ control_socket_address: String
+ } = "Invalid TCP socket address [control_socket_address={control_socket_address}]: {message}",
+ TcpSocketAddressNoPortError {
+ control_socket_address: String
+ } = "TCP socket address does not have a port specified [control_socket_address={control_socket_address}]",
+ UnitdProcessParseError {
+ message: String,
+ pid: u64
+ } = "{message} for [pid={pid}]",
+ UnitdProcessExecError {
+ source: Box<dyn StdError>,
+ message: String,
+ executable_path: String,
+ pid: u64
+ } = "{message} for [pid={pid}, executable_path={executable_path}]: {source}",
+}
+
+impl UnitClientError {
+ fn new(error: HyperError, control_socket_address: String, path: String) -> Self {
+ if error.is_connect() {
+ if let Some(source) = error.source() {
+ if let Some(io_error) = source.downcast_ref::<io::Error>() {
+ if io_error.kind().eq(&io::ErrorKind::PermissionDenied) {
+ return UnitClientError::SocketPermissionsError { control_socket_address };
+ }
+ }
+ }
+ }
+
+ UnitClientError::HyperError {
+ source: error,
+ control_socket_address,
+ path,
+ }
+ }
+}
+
+macro_rules! new_openapi_client_from_hyper_client {
+ ($unit_client:expr, $hyper_client: ident, $api_client:ident, $api_trait:ident) => {{
+ let config = Configuration {
+ base_path: $unit_client.control_socket.create_uri_with_path("/").to_string(),
+ user_agent: Some(format!("{}/OpenAPI-Generator", USER_AGENT).to_owned()),
+ client: $hyper_client.clone(),
+ basic_auth: None,
+ oauth_access_token: None,
+ api_key: None,
+ };
+ let rc_config = Rc::new(config);
+ Box::new($api_client::new(rc_config)) as Box<dyn $api_trait>
+ }};
+}
+
+macro_rules! new_openapi_client {
+ ($unit_client:expr, $api_client:ident, $api_trait:ident) => {
+ match &*$unit_client.client {
+ RemoteClient::Tcp { client } => {
+ new_openapi_client_from_hyper_client!($unit_client, client, $api_client, $api_trait)
+ }
+ RemoteClient::Unix { client } => {
+ new_openapi_client_from_hyper_client!($unit_client, client, $api_client, $api_trait)
+ }
+ }
+ };
+}
+
+#[derive(Clone)]
+pub enum RemoteClient<B>
+where
+ B: HttpBody + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ Unix {
+ client: Client<UnixConnector, B>,
+ },
+ Tcp {
+ client: Client<HttpsConnector<HttpConnector>, B>,
+ },
+}
+
+impl<B> RemoteClient<B>
+where
+ B: HttpBody + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ fn client_name(&self) -> &str {
+ match self {
+ RemoteClient::Unix { .. } => "Client<UnixConnector, Body>",
+ RemoteClient::Tcp { .. } => "Client<HttpsConnector<HttpConnector>, Body>",
+ }
+ }
+
+ pub fn request(&self, req: Request<B>) -> ResponseFuture {
+ match self {
+ RemoteClient::Unix { client } => client.request(req),
+ RemoteClient::Tcp { client } => client.request(req),
+ }
+ }
+}
+
+impl<B> Debug for RemoteClient<B>
+where
+ B: HttpBody + Send + 'static,
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "{}", self.client_name())
+ }
+}
+
+#[derive(Debug)]
+pub struct UnitClient {
+ pub control_socket: ControlSocket,
+ /// A `current_thread` runtime for executing operations on the
+ /// asynchronous client in a blocking manner.
+ rt: Runtime,
+ /// Client for communicating with the control API over the UNIX domain socket
+ client: Box<RemoteClient<Body>>,
+}
+
+impl UnitClient {
+ pub fn new_with_runtime(control_socket: ControlSocket, runtime: Runtime) -> Self {
+ if control_socket.is_local_socket() {
+ Self::new_unix(control_socket, runtime)
+ } else {
+ Self::new_http(control_socket, runtime)
+ }
+ }
+
+ pub fn new(control_socket: ControlSocket) -> Self {
+ let runtime = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .expect("Unable to create a current_thread runtime");
+ Self::new_with_runtime(control_socket, runtime)
+ }
+
+ pub fn new_http(control_socket: ControlSocket, runtime: Runtime) -> Self {
+ let remote_client = Client::builder().build(HttpsConnector::new());
+ Self {
+ control_socket,
+ rt: runtime,
+ client: Box::from(RemoteClient::Tcp { client: remote_client }),
+ }
+ }
+
+ pub fn new_unix(control_socket: ControlSocket, runtime: Runtime) -> UnitClient {
+ let remote_client = Client::unix();
+
+ Self {
+ control_socket,
+ rt: runtime,
+ client: Box::from(RemoteClient::Unix { client: remote_client }),
+ }
+ }
+
+ /// Sends a request to UNIT and deserializes the JSON response body into the value of type `RESPONSE`.
+ pub fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>(
+ &self,
+ mut request: Request<Body>,
+ ) -> Result<RESPONSE, UnitClientError> {
+ let uri = request.uri().clone();
+ let path: &str = uri.path();
+
+ request.headers_mut().insert("User-Agent", USER_AGENT.parse().unwrap());
+
+ let response_future = self.client.request(request);
+
+ self.rt.block_on(async {
+ let response = response_future
+ .await
+ .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?;
+
+ let status = response.status();
+ let body = hyper::body::aggregate(response)
+ .await
+ .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?;
+ let reader = &mut body.reader();
+ if !status.is_success() {
+ let error: HashMap<String, String> =
+ serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError {
+ source: error,
+ path: path.to_string(),
+ })?;
+
+ return Err(UnitClientError::HttpResponseJsonBodyError {
+ status,
+ path: path.to_string(),
+ error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(),
+ detail: error.get("detail").unwrap_or(&"".into()).to_string(),
+ });
+ }
+ serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError {
+ source: error,
+ path: path.to_string(),
+ })
+ })
+ }
+
+ pub fn listeners_api(&self) -> Box<dyn ListenersApi + 'static> {
+ new_openapi_client!(self, ListenersApiClient, ListenersApi)
+ }
+
+ pub fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> {
+ let list_listeners = self.listeners_api().get_listeners();
+ self.execute_openapi_future(list_listeners)
+ }
+
+ pub fn execute_openapi_future<F: Future<Output = Result<R, OpenAPIError>>, R: for<'de> serde::Deserialize<'de>>(
+ &self,
+ future: F,
+ ) -> Result<R, Box<UnitClientError>> {
+ self.rt.block_on(future).map_err(|error| {
+ let remapped_error = if let OpenAPIError::Hyper(hyper_error) = error {
+ UnitClientError::new(hyper_error, self.control_socket.to_string(), "".to_string())
+ } else {
+ UnitClientError::OpenAPIError { source: error }
+ };
+
+ Box::new(remapped_error)
+ })
+ }
+
+ pub fn status_api(&self) -> Box<dyn StatusApi + 'static> {
+ new_openapi_client!(self, StatusApiClient, StatusApi)
+ }
+
+ pub fn status(&self) -> Result<Status, Box<UnitClientError>> {
+ let status = self.status_api().get_status();
+ self.execute_openapi_future(status)
+ }
+
+ pub fn is_running(&self) -> bool {
+ self.status().is_ok()
+ }
+}
+
+pub type UnitSerializableMap = HashMap<String, serde_json::Value>;
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct UnitStatus {
+ pub connections: UnitStatusConnections,
+ pub requests: UnitStatusRequests,
+ pub applications: HashMap<String, UnitStatusApplication>,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct UnitStatusConnections {
+ #[serde(default)]
+ pub closed: usize,
+ #[serde(default)]
+ pub idle: usize,
+ #[serde(default)]
+ pub active: usize,
+ #[serde(default)]
+ pub accepted: usize,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct UnitStatusRequests {
+ #[serde(default)]
+ pub active: usize,
+ #[serde(default)]
+ pub total: usize,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct UnitStatusApplication {
+ #[serde(default)]
+ pub processes: HashMap<String, usize>,
+ #[serde(default)]
+ pub requests: HashMap<String, usize>,
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::unitd_instance::UnitdInstance;
+
+ use super::*;
+ // Integration tests
+
+ #[test]
+ fn can_connect_to_unit_api() {
+ match UnitdInstance::running_unitd_instances().first() {
+ Some(unit_instance) => {
+ let control_api_socket_address = unit_instance
+ .control_api_socket_address()
+ .expect("No control API socket path found");
+ let control_socket = ControlSocket::try_from(control_api_socket_address)
+ .expect("Unable to parse control socket address");
+ let unit_client = UnitClient::new(control_socket);
+ assert!(unit_client.is_running());
+ }
+ None => {
+ eprintln!("No running unitd instances found - skipping test");
+ }
+ }
+ }
+
+ #[test]
+ fn can_get_unit_status() {
+ match UnitdInstance::running_unitd_instances().first() {
+ Some(unit_instance) => {
+ let control_api_socket_address = unit_instance
+ .control_api_socket_address()
+ .expect("No control API socket path found");
+ let control_socket = ControlSocket::try_from(control_api_socket_address)
+ .expect("Unable to parse control socket address");
+ let unit_client = UnitClient::new(control_socket);
+ let status = unit_client.status().expect("Unable to get unit status");
+ println!("Unit status: {:?}", status);
+ }
+ None => {
+ eprintln!("No running unitd instances found - skipping test");
+ }
+ }
+ }
+
+ #[test]
+ fn can_get_unit_listeners() {
+ match UnitdInstance::running_unitd_instances().first() {
+ Some(unit_instance) => {
+ let control_api_socket_address = unit_instance
+ .control_api_socket_address()
+ .expect("No control API socket path found");
+ let control_socket = ControlSocket::try_from(control_api_socket_address)
+ .expect("Unable to parse control socket address");
+ let unit_client = UnitClient::new(control_socket);
+ unit_client.listeners().expect("Unable to get Unit listeners");
+ }
+ None => {
+ eprintln!("No running unitd instances found - skipping test");
+ }
+ }
+ }
+}