summaryrefslogtreecommitdiffhomepage
path: root/tools/unitctl/unit-client-rs/src/unit_client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tools/unitctl/unit-client-rs/src/unit_client.rs')
-rw-r--r--tools/unitctl/unit-client-rs/src/unit_client.rs146
1 files changed, 66 insertions, 80 deletions
diff --git a/tools/unitctl/unit-client-rs/src/unit_client.rs b/tools/unitctl/unit-client-rs/src/unit_client.rs
index b856fd20..7456b106 100644
--- a/tools/unitctl/unit-client-rs/src/unit_client.rs
+++ b/tools/unitctl/unit-client-rs/src/unit_client.rs
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::error::Error as StdError;
use std::fmt::Debug;
-use std::future::Future;
use std::rc::Rc;
use std::{fmt, io};
@@ -13,7 +12,6 @@ use hyper::{http, Body, Client, Request};
use hyper_tls::HttpsConnector;
use hyperlocal::{UnixClientExt, UnixConnector};
use serde::{Deserialize, Serialize};
-use tokio::runtime::Runtime;
use crate::control_socket_address::ControlSocket;
use unit_openapi::apis::configuration::Configuration;
@@ -168,51 +166,38 @@ where
#[derive(Debug)]
pub struct UnitClient {
pub control_socket: ControlSocket,
- /// A `current_thread` runtime for executing operations on the
- /// asynchronous client in a blocking manner.
- rt: Runtime,
/// Client for communicating with the control API over the UNIX domain socket
client: Box<RemoteClient<Body>>,
}
impl UnitClient {
- pub fn new_with_runtime(control_socket: ControlSocket, runtime: Runtime) -> Self {
+ pub fn new(control_socket: ControlSocket) -> Self {
if control_socket.is_local_socket() {
- Self::new_unix(control_socket, runtime)
+ Self::new_unix(control_socket)
} else {
- Self::new_http(control_socket, runtime)
+ Self::new_http(control_socket)
}
}
- pub fn new(control_socket: ControlSocket) -> Self {
- let runtime = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .expect("Unable to create a current_thread runtime");
- Self::new_with_runtime(control_socket, runtime)
- }
-
- pub fn new_http(control_socket: ControlSocket, runtime: Runtime) -> Self {
+ pub fn new_http(control_socket: ControlSocket) -> Self {
let remote_client = Client::builder().build(HttpsConnector::new());
Self {
control_socket,
- rt: runtime,
client: Box::from(RemoteClient::Tcp { client: remote_client }),
}
}
- pub fn new_unix(control_socket: ControlSocket, runtime: Runtime) -> UnitClient {
+ pub fn new_unix(control_socket: ControlSocket) -> UnitClient {
let remote_client = Client::unix();
Self {
control_socket,
- rt: runtime,
client: Box::from(RemoteClient::Unix { client: remote_client }),
}
}
/// Sends a request to UNIT and deserializes the JSON response body into the value of type `RESPONSE`.
- pub fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>(
+ pub async fn send_request_and_deserialize_response<RESPONSE: for<'de> serde::Deserialize<'de>>(
&self,
mut request: Request<Body>,
) -> Result<RESPONSE, UnitClientError> {
@@ -223,34 +208,32 @@ impl UnitClient {
let response_future = self.client.request(request);
- self.rt.block_on(async {
- let response = response_future
- .await
- .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?;
-
- let status = response.status();
- let body = hyper::body::aggregate(response)
- .await
- .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?;
- let reader = &mut body.reader();
- if !status.is_success() {
- let error: HashMap<String, String> =
- serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError {
- source: error,
- path: path.to_string(),
- })?;
-
- return Err(UnitClientError::HttpResponseJsonBodyError {
- status,
+ let response = response_future
+ .await
+ .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?;
+
+ let status = response.status();
+ let body = hyper::body::aggregate(response)
+ .await
+ .map_err(|error| UnitClientError::new(error, self.control_socket.to_string(), path.to_string()))?;
+ let reader = &mut body.reader();
+ if !status.is_success() {
+ let error: HashMap<String, String> =
+ serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError {
+ source: error,
path: path.to_string(),
- error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(),
- detail: error.get("detail").unwrap_or(&"".into()).to_string(),
- });
- }
- serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError {
- source: error,
+ })?;
+
+ return Err(UnitClientError::HttpResponseJsonBodyError {
+ status,
path: path.to_string(),
- })
+ error: error.get("error").unwrap_or(&"Unknown error".into()).to_string(),
+ detail: error.get("detail").unwrap_or(&"".into()).to_string(),
+ });
+ }
+ serde_json::from_reader(reader).map_err(|error| UnitClientError::JsonError {
+ source: error,
+ path: path.to_string(),
})
}
@@ -258,23 +241,17 @@ impl UnitClient {
new_openapi_client!(self, ListenersApiClient, ListenersApi)
}
- pub fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> {
- let list_listeners = self.listeners_api().get_listeners();
- self.execute_openapi_future(list_listeners)
- }
-
- pub fn execute_openapi_future<F: Future<Output = Result<R, OpenAPIError>>, R: for<'de> serde::Deserialize<'de>>(
- &self,
- future: F,
- ) -> Result<R, Box<UnitClientError>> {
- self.rt.block_on(future).map_err(|error| {
- let remapped_error = if let OpenAPIError::Hyper(hyper_error) = error {
- UnitClientError::new(hyper_error, self.control_socket.to_string(), "".to_string())
+ pub async fn listeners(&self) -> Result<HashMap<String, ConfigListener>, Box<UnitClientError>> {
+ self.listeners_api().get_listeners().await.or_else(|err| {
+ if let OpenAPIError::Hyper(hyper_error) = err {
+ Err(Box::new(UnitClientError::new(
+ hyper_error,
+ self.control_socket.to_string(),
+ "".to_string(),
+ )))
} else {
- UnitClientError::OpenAPIError { source: error }
- };
-
- Box::new(remapped_error)
+ Err(Box::new(UnitClientError::OpenAPIError { source: err }))
+ }
})
}
@@ -282,13 +259,22 @@ impl UnitClient {
new_openapi_client!(self, StatusApiClient, StatusApi)
}
- pub fn status(&self) -> Result<Status, Box<UnitClientError>> {
- let status = self.status_api().get_status();
- self.execute_openapi_future(status)
+ pub async fn status(&self) -> Result<Status, Box<UnitClientError>> {
+ self.status_api().get_status().await.or_else(|err| {
+ if let OpenAPIError::Hyper(hyper_error) = err {
+ Err(Box::new(UnitClientError::new(
+ hyper_error,
+ self.control_socket.to_string(),
+ "".to_string(),
+ )))
+ } else {
+ Err(Box::new(UnitClientError::OpenAPIError { source: err }))
+ }
+ })
}
- pub fn is_running(&self) -> bool {
- self.status().is_ok()
+ pub async fn is_running(&self) -> bool {
+ self.status().await.is_ok()
}
}
@@ -336,9 +322,9 @@ mod tests {
use super::*;
// Integration tests
- #[test]
- fn can_connect_to_unit_api() {
- match UnitdInstance::running_unitd_instances().first() {
+ #[tokio::test]
+ async fn can_connect_to_unit_api() {
+ match UnitdInstance::running_unitd_instances().await.first() {
Some(unit_instance) => {
let control_api_socket_address = unit_instance
.control_api_socket_address()
@@ -346,7 +332,7 @@ mod tests {
let control_socket = ControlSocket::try_from(control_api_socket_address)
.expect("Unable to parse control socket address");
let unit_client = UnitClient::new(control_socket);
- assert!(unit_client.is_running());
+ assert!(unit_client.is_running().await);
}
None => {
eprintln!("No running unitd instances found - skipping test");
@@ -354,9 +340,9 @@ mod tests {
}
}
- #[test]
- fn can_get_unit_status() {
- match UnitdInstance::running_unitd_instances().first() {
+ #[tokio::test]
+ async fn can_get_unit_status() {
+ match UnitdInstance::running_unitd_instances().await.first() {
Some(unit_instance) => {
let control_api_socket_address = unit_instance
.control_api_socket_address()
@@ -364,7 +350,7 @@ mod tests {
let control_socket = ControlSocket::try_from(control_api_socket_address)
.expect("Unable to parse control socket address");
let unit_client = UnitClient::new(control_socket);
- let status = unit_client.status().expect("Unable to get unit status");
+ let status = unit_client.status().await.expect("Unable to get unit status");
println!("Unit status: {:?}", status);
}
None => {
@@ -373,9 +359,9 @@ mod tests {
}
}
- #[test]
- fn can_get_unit_listeners() {
- match UnitdInstance::running_unitd_instances().first() {
+ #[tokio::test]
+ async fn can_get_unit_listeners() {
+ match UnitdInstance::running_unitd_instances().await.first() {
Some(unit_instance) => {
let control_api_socket_address = unit_instance
.control_api_socket_address()
@@ -383,7 +369,7 @@ mod tests {
let control_socket = ControlSocket::try_from(control_api_socket_address)
.expect("Unable to parse control socket address");
let unit_client = UnitClient::new(control_socket);
- unit_client.listeners().expect("Unable to get Unit listeners");
+ unit_client.listeners().await.expect("Unable to get Unit listeners");
}
None => {
eprintln!("No running unitd instances found - skipping test");