diff --git a/Cargo.lock b/Cargo.lock index 9cd2b8e2..e157880f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -205,6 +205,7 @@ dependencies = [ "rocket", "serde", "serde_json", + "tokio", "tokio-test", ] diff --git a/aw-client-rust/Cargo.toml b/aw-client-rust/Cargo.toml index e7633bf6..f41c3706 100644 --- a/aw-client-rust/Cargo.toml +++ b/aw-client-rust/Cargo.toml @@ -11,6 +11,7 @@ serde = "1.0" serde_json = "1.0" chrono = { version = "0.4", features = ["serde"] } aw-models = { path = "../aw-models" } +tokio = { version = "1.28.2", features = ["rt"] } [dev-dependencies] aw-datastore = { path = "../aw-datastore" } diff --git a/aw-client-rust/src/blocking.rs b/aw-client-rust/src/blocking.rs new file mode 100644 index 00000000..f5aab1c7 --- /dev/null +++ b/aw-client-rust/src/blocking.rs @@ -0,0 +1,77 @@ +use std::collections::HashMap; +use std::future::Future; +use std::vec::Vec; + +use chrono::{DateTime, Utc}; + +use aw_models::{Bucket, Event}; + +use super::AwClient as AsyncAwClient; + +pub struct AwClient { + client: AsyncAwClient, + pub baseurl: String, + pub name: String, + pub hostname: String, +} + +impl std::fmt::Debug for AwClient { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "AwClient(baseurl={:?})", self.client.baseurl) + } +} + +fn block_on(f: F) -> F::Output { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("build shell runtime") + .block_on(f) +} + +macro_rules! proxy_method +{ + ($name:tt, $ret:ty, $($v:ident: $t:ty),*) => { + pub fn $name(&self, $($v: $t),*) -> Result<$ret, reqwest::Error> + { block_on(self.client.$name($($v),*)) } + }; +} + +impl AwClient { + pub fn new(ip: &str, port: &str, name: &str) -> AwClient { + let async_client = AsyncAwClient::new(ip, port, name); + + AwClient { + baseurl: async_client.baseurl.clone(), + name: async_client.name.clone(), + hostname: async_client.hostname.clone(), + client: async_client, + } + } + + proxy_method!(get_bucket, Bucket, bucketname: &str); + proxy_method!(get_buckets, HashMap,); + proxy_method!(create_bucket, (), bucket: &Bucket); + proxy_method!(create_bucket_simple, (), bucketname: &str, buckettype: &str); + proxy_method!(delete_bucket, (), bucketname: &str); + proxy_method!( + get_events, + Vec, + bucketname: &str, + start: Option>, + stop: Option>, + limit: Option + ); + proxy_method!(insert_event, (), bucketname: &str, event: &Event); + proxy_method!(insert_events, (), bucketname: &str, events: Vec); + proxy_method!( + heartbeat, + (), + bucketname: &str, + event: &Event, + pulsetime: f64 + ); + proxy_method!(delete_event, (), bucketname: &str, event_id: i64); + proxy_method!(get_event_count, i64, bucketname: &str); + proxy_method!(get_info, aw_models::Info,); +} diff --git a/aw-client-rust/src/lib.rs b/aw-client-rust/src/lib.rs index 1bca1035..0c055f91 100644 --- a/aw-client-rust/src/lib.rs +++ b/aw-client-rust/src/lib.rs @@ -3,6 +3,9 @@ extern crate chrono; extern crate gethostname; extern crate reqwest; extern crate serde_json; +extern crate tokio; + +pub mod blocking; use std::collections::HashMap; use std::vec::Vec; @@ -13,7 +16,7 @@ use serde_json::Map; pub use aw_models::{Bucket, BucketMetadata, Event}; pub struct AwClient { - client: reqwest::blocking::Client, + client: reqwest::Client, pub baseurl: String, pub name: String, pub hostname: String, @@ -28,7 +31,7 @@ impl std::fmt::Debug for AwClient { impl AwClient { pub fn new(ip: &str, port: &str, name: &str) -> AwClient { let baseurl = format!("http://{ip}:{port}"); - let client = reqwest::blocking::Client::builder() + let client = reqwest::Client::builder() .timeout(std::time::Duration::from_secs(120)) .build() .unwrap(); @@ -41,24 +44,31 @@ impl AwClient { } } - pub fn get_bucket(&self, bucketname: &str) -> Result { + pub async fn get_bucket(&self, bucketname: &str) -> Result { let url = format!("{}/api/0/buckets/{}", self.baseurl, bucketname); - let bucket = self.client.get(url).send()?.error_for_status()?.json()?; + let bucket = self + .client + .get(url) + .send() + .await? + .error_for_status()? + .json() + .await?; Ok(bucket) } - pub fn get_buckets(&self) -> Result, reqwest::Error> { + pub async fn get_buckets(&self) -> Result, reqwest::Error> { let url = format!("{}/api/0/buckets/", self.baseurl); - self.client.get(url).send()?.json() + self.client.get(url).send().await?.json().await } - pub fn create_bucket(&self, bucket: &Bucket) -> Result<(), reqwest::Error> { + pub async fn create_bucket(&self, bucket: &Bucket) -> Result<(), reqwest::Error> { let url = format!("{}/api/0/buckets/{}", self.baseurl, bucket.id); - self.client.post(url).json(bucket).send()?; + self.client.post(url).json(bucket).send().await?; Ok(()) } - pub fn create_bucket_simple( + pub async fn create_bucket_simple( &self, bucketname: &str, buckettype: &str, @@ -75,16 +85,16 @@ impl AwClient { created: None, last_updated: None, }; - self.create_bucket(&bucket) + self.create_bucket(&bucket).await } - pub fn delete_bucket(&self, bucketname: &str) -> Result<(), reqwest::Error> { + pub async fn delete_bucket(&self, bucketname: &str) -> Result<(), reqwest::Error> { let url = format!("{}/api/0/buckets/{}", self.baseurl, bucketname); - self.client.delete(url).send()?; + self.client.delete(url).send().await?; Ok(()) } - pub fn get_events( + pub async fn get_events( &self, bucketname: &str, start: Option>, @@ -109,27 +119,31 @@ impl AwClient { url.query_pairs_mut() .append_pair("limit", s.to_string().as_str()); }; - self.client.get(url).send()?.json() + self.client.get(url).send().await?.json().await } - pub fn insert_event(&self, bucketname: &str, event: &Event) -> Result<(), reqwest::Error> { + pub async fn insert_event( + &self, + bucketname: &str, + event: &Event, + ) -> Result<(), reqwest::Error> { let url = format!("{}/api/0/buckets/{}/events", self.baseurl, bucketname); let eventlist = vec![event.clone()]; - self.client.post(url).json(&eventlist).send()?; + self.client.post(url).json(&eventlist).send().await?; Ok(()) } - pub fn insert_events( + pub async fn insert_events( &self, bucketname: &str, events: Vec, ) -> Result<(), reqwest::Error> { let url = format!("{}/api/0/buckets/{}/events", self.baseurl, bucketname); - self.client.post(url).json(&events).send()?; + self.client.post(url).json(&events).send().await?; Ok(()) } - pub fn heartbeat( + pub async fn heartbeat( &self, bucketname: &str, event: &Event, @@ -139,22 +153,33 @@ impl AwClient { "{}/api/0/buckets/{}/heartbeat?pulsetime={}", self.baseurl, bucketname, pulsetime ); - self.client.post(url).json(&event).send()?; + self.client.post(url).json(&event).send().await?; Ok(()) } - pub fn delete_event(&self, bucketname: &str, event_id: i64) -> Result<(), reqwest::Error> { + pub async fn delete_event( + &self, + bucketname: &str, + event_id: i64, + ) -> Result<(), reqwest::Error> { let url = format!( "{}/api/0/buckets/{}/events/{}", self.baseurl, bucketname, event_id ); - self.client.delete(url).send()?; + self.client.delete(url).send().await?; Ok(()) } - pub fn get_event_count(&self, bucketname: &str) -> Result { + pub async fn get_event_count(&self, bucketname: &str) -> Result { let url = format!("{}/api/0/buckets/{}/events/count", self.baseurl, bucketname); - let res = self.client.get(url).send()?.error_for_status()?.text()?; + let res = self + .client + .get(url) + .send() + .await? + .error_for_status()? + .text() + .await?; let count: i64 = match res.trim().parse() { Ok(count) => count, Err(err) => panic!("could not parse get_event_count response: {err:?}"), @@ -162,8 +187,8 @@ impl AwClient { Ok(count) } - pub fn get_info(&self) -> Result { + pub async fn get_info(&self) -> Result { let url = format!("{}/api/0/info", self.baseurl); - self.client.get(url).send()?.json() + self.client.get(url).send().await?.json().await } } diff --git a/aw-client-rust/tests/test.rs b/aw-client-rust/tests/test.rs index d7fe8c0e..f149cdca 100644 --- a/aw-client-rust/tests/test.rs +++ b/aw-client-rust/tests/test.rs @@ -8,7 +8,7 @@ extern crate tokio_test; #[cfg(test)] mod test { - use aw_client_rust::AwClient; + use aw_client_rust::blocking::AwClient; use aw_client_rust::Event; use chrono::{DateTime, Duration, Utc}; use serde_json::Map; @@ -51,7 +51,7 @@ mod test { let shutdown_handler = server.shutdown(); thread::spawn(move || { - let launch = block_on(server.launch()).unwrap(); + let _ = block_on(server.launch()).unwrap(); }); shutdown_handler diff --git a/aw-sync/src/accessmethod.rs b/aw-sync/src/accessmethod.rs index 95887317..91825140 100644 --- a/aw-sync/src/accessmethod.rs +++ b/aw-sync/src/accessmethod.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use aw_client_rust::AwClient; +use aw_client_rust::blocking::AwClient; use chrono::{DateTime, Utc}; use reqwest::StatusCode; diff --git a/aw-sync/src/main.rs b/aw-sync/src/main.rs index 931aa42b..518e705d 100644 --- a/aw-sync/src/main.rs +++ b/aw-sync/src/main.rs @@ -20,7 +20,7 @@ use std::path::PathBuf; use chrono::{DateTime, Datelike, TimeZone, Utc}; use clap::{Parser, Subcommand}; -use aw_client_rust::AwClient; +use aw_client_rust::blocking::AwClient; mod accessmethod; mod sync; diff --git a/aw-sync/src/sync.rs b/aw-sync/src/sync.rs index 5c6958a7..8c4cfbb6 100644 --- a/aw-sync/src/sync.rs +++ b/aw-sync/src/sync.rs @@ -13,7 +13,7 @@ use std::ffi::OsStr; use std::fs; use std::path::{Path, PathBuf}; -use aw_client_rust::AwClient; +use aw_client_rust::blocking::AwClient; use chrono::{DateTime, Utc}; use aw_datastore::{Datastore, DatastoreError};