diff options
Diffstat (limited to 'tools/unitctl/unit-client-rs/src/unit_client.rs')
-rw-r--r-- | tools/unitctl/unit-client-rs/src/unit_client.rs | 146 |
1 files changed, 66 insertions, 80 deletions
diff --git a/tools/unitctl/unit-client-rs/src/unit_client.rs b/tools/unitctl/unit-client-rs/src/unit_client.rs index b856fd20..7456b106 100644 --- a/tools/unitctl/unit-client-rs/src/unit_client.rs +++ b/tools/unitctl/unit-client-rs/src/unit_client.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::error::Error as StdError; use std::fmt::Debug; -use std::future::Future; use std::rc::Rc; use std::{fmt, io}; @@ -13,7 +12,6 @@ use hyper::{http, Body, Client, Request}; use hyper_tls::HttpsConnector; use hyperlocal::{UnixClientExt, UnixConnector}; use serde::{Deserialize, Serialize}; -use tokio::runtime::Runtime; use crate::control_socket_address::ControlSocket; use unit_openapi::apis::configuration::Configuration; @@ -168,51 +166,38 @@ where #[derive(Debug)] pub struct UnitClient { pub control_socket: ControlSocket, - /// A `current_thread` runtime for executing operations on the - /// asynchronous client in a blocking manner. - rt: Runtime, /// Client for communicating with the control API over the UNIX domain socket client: Box<RemoteClient<Body>>, } impl UnitClient { - pub fn new_with_runtime(control_socket: ControlSocket, runtime: Runtime) -> Self { + pub fn new(control_socket: ControlSocket) -> Self { if control_socket.is_local_socket() { - Self::new_unix(control_socket, runtime) + Self::new_unix(control_socket) } else { - Self::new_http(control_socket, runtime) + Self::new_http(control_socket) } } - pub fn new(control_socket: ControlSocket) -> Self { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("Unable to create a current_thread runtime"); - Self::new_with_runtime(control_socket, runtime) - } - - pub fn new_http(control_socket: ControlSocket, runtime: Runtime) -> Self { + pub fn new_http(control_socket: ControlSocket) -> Self { let remote_client = Client::builder().build(HttpsConnector::new()); Self { control_socket, - rt: runtime, client: Box::from(RemoteClient::Tcp { client: remote_client }), } } - pub fn new_unix(control_socket: ControlSocket, runtime: Runtime) -> UnitClient { + pub fn new_unix(control_socket: ControlSocket) -> UnitClient { let remote_client = Client::unix(); Self { control_socket, - rt: runtime, client: Box::from(RemoteClient::Unix { client: remote_client }), } } /// Sends a request to UNIT and deserializes the JSON response body into the value of type `RESPONSE`. - pub fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>( + pub async fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>( &self, mut request: Request<Body>, ) -> Result<RESPONSE, UnitClientError> { @@ -223,34 +208,32 @@ impl UnitClient { let response_future = self.client.request(request); - self.rt.block_on(async { - let response = response_future - .await - .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; - - let status = response.status(); - let body = hyper::body::aggregate(response) - .await - .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; - let reader = &mut body.reader(); - if !status.is_success() { - let error: HashMap<String, String> = - serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { - source: error, - path: path.to_string(), - })?; - - return Err(UnitClientError::HttpResponseJsonBodyError { - status, + let response = response_future + .await + .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; + + let status = response.status(); + let body = hyper::body::aggregate(response) + .await + .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?; + let reader = &mut body.reader(); + if !status.is_success() { + let error: HashMap<String, String> = + serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { + source: error, path: path.to_string(), - error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(), - detail: error.get("detail").unwrap_or(&"".into()).to_string(), - }); - } - serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { - source: error, + })?; + + return Err(UnitClientError::HttpResponseJsonBodyError { + status, path: path.to_string(), - }) + error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(), + detail: error.get("detail").unwrap_or(&"".into()).to_string(), + }); + } + serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError { + source: error, + path: path.to_string(), }) } @@ -258,23 +241,17 @@ impl UnitClient { new_openapi_client!(self, ListenersApiClient, ListenersApi) } - pub fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> { - let list_listeners = self.listeners_api().get_listeners(); - self.execute_openapi_future(list_listeners) - } - - pub fn execute_openapi_future<F: Future<Output = Result<R, OpenAPIError>>, R: for<'de> serde::Deserialize<'de>>( - &self, - future: F, - ) -> Result<R, Box<UnitClientError>> { - self.rt.block_on(future).map_err(|error| { - let remapped_error = if let OpenAPIError::Hyper(hyper_error) = error { - UnitClientError::new(hyper_error, self.control_socket.to_string(), "".to_string()) + pub async fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> { + self.listeners_api().get_listeners().await.or_else(|err| { + if let OpenAPIError::Hyper(hyper_error) = err { + Err(Box::new(UnitClientError::new( + hyper_error, + self.control_socket.to_string(), + "".to_string(), + ))) } else { - UnitClientError::OpenAPIError { source: error } - }; - - Box::new(remapped_error) + Err(Box::new(UnitClientError::OpenAPIError { source: err })) + } }) } @@ -282,13 +259,22 @@ impl UnitClient { new_openapi_client!(self, StatusApiClient, StatusApi) } - pub fn status(&self) -> Result<Status, Box<UnitClientError>> { - let status = self.status_api().get_status(); - self.execute_openapi_future(status) + pub async fn status(&self) -> Result<Status, Box<UnitClientError>> { + self.status_api().get_status().await.or_else(|err| { + if let OpenAPIError::Hyper(hyper_error) = err { + Err(Box::new(UnitClientError::new( + hyper_error, + self.control_socket.to_string(), + "".to_string(), + ))) + } else { + Err(Box::new(UnitClientError::OpenAPIError { source: err })) + } + }) } - pub fn is_running(&self) -> bool { - self.status().is_ok() + pub async fn is_running(&self) -> bool { + self.status().await.is_ok() } } @@ -336,9 +322,9 @@ mod tests { use super::*; // Integration tests - #[test] - fn can_connect_to_unit_api() { - match UnitdInstance::running_unitd_instances().first() { + #[tokio::test] + async fn can_connect_to_unit_api() { + match UnitdInstance::running_unitd_instances().await.first() { Some(unit_instance) => { let control_api_socket_address = unit_instance .control_api_socket_address() @@ -346,7 +332,7 @@ mod tests { let control_socket = ControlSocket::try_from(control_api_socket_address) .expect("Unable to parse control socket address"); let unit_client = UnitClient::new(control_socket); - assert!(unit_client.is_running()); + assert!(unit_client.is_running().await); } None => { eprintln!("No running unitd instances found - skipping test"); @@ -354,9 +340,9 @@ mod tests { } } - #[test] - fn can_get_unit_status() { - match UnitdInstance::running_unitd_instances().first() { + #[tokio::test] + async fn can_get_unit_status() { + match UnitdInstance::running_unitd_instances().await.first() { Some(unit_instance) => { let control_api_socket_address = unit_instance .control_api_socket_address() @@ -364,7 +350,7 @@ mod tests { let control_socket = ControlSocket::try_from(control_api_socket_address) .expect("Unable to parse control socket address"); let unit_client = UnitClient::new(control_socket); - let status = unit_client.status().expect("Unable to get unit status"); + let status = unit_client.status().await.expect("Unable to get unit status"); println!("Unit status: {:?}", status); } None => { @@ -373,9 +359,9 @@ mod tests { } } - #[test] - fn can_get_unit_listeners() { - match UnitdInstance::running_unitd_instances().first() { + #[tokio::test] + async fn can_get_unit_listeners() { + match UnitdInstance::running_unitd_instances().await.first() { Some(unit_instance) => { let control_api_socket_address = unit_instance .control_api_socket_address() @@ -383,7 +369,7 @@ mod tests { let control_socket = ControlSocket::try_from(control_api_socket_address) .expect("Unable to parse control socket address"); let unit_client = UnitClient::new(control_socket); - unit_client.listeners().expect("Unable to get Unit listeners"); + unit_client.listeners().await.expect("Unable to get Unit listeners"); } None => { eprintln!("No running unitd instances found - skipping test"); |