diff --git a/.gitignore b/.gitignore index 4ad9711..a6bd56b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ local-* /target Cargo.lock +.vscode \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 0e89c90..a09fea0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,8 +23,8 @@ blocking = ["reqwest/blocking"] [dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread",] } -parquet = "14.0.0" -reqwest = { version = "0.11", features = ["json"] } +parquet = "52.1.0" +reqwest = { version = "0.12", features = ["json"] } url = "2.2" rustc_version_runtime = "0.1" serde = { version = "1.0", features = ["derive"] } @@ -32,7 +32,7 @@ serde_json = "1.0" anyhow = "1.0" log = "0.4" env_logger = "0.9" -polars = { version = "0.22.8", features = ["lazy", "parquet"] } +polars = { version = "0.41.3", features = ["lazy", "parquet"] } [dev-dependencies] wiremock = "0.5" diff --git a/examples/async.rs b/examples/async.rs index 9f2f83a..27a63b3 100644 --- a/examples/async.rs +++ b/examples/async.rs @@ -11,7 +11,7 @@ async fn main() { let conf_str = &fs::read_to_string("./config.json").unwrap(); let config: ProviderConfig = serde_json::from_str(conf_str).expect("Invalid configuration"); - let mut app = Client::new(config, None).await.unwrap(); + let mut app = Client::new(config, None, None).await.unwrap(); let shares = app.list_shares().await.unwrap(); if shares.len() == 0 { println!("At least 1 Delta Share is required"); @@ -42,7 +42,7 @@ async fn main() { ); } else { let res = app - .get_dataframe(&tables[0]) + .get_dataframe(&tables[0], None) .await .unwrap() .collect() diff --git a/examples/blocking.rs b/examples/blocking.rs index c994b52..b6bd757 100644 --- a/examples/blocking.rs +++ b/examples/blocking.rs @@ -10,7 +10,7 @@ fn main() { let conf_str = &fs::read_to_string("./config.json").unwrap(); let config: ProviderConfig = serde_json::from_str(conf_str).expect("Invalid configuration"); - let mut app = Client::new(config, None).unwrap(); + let mut app = Client::new(config, None, None).unwrap(); let shares = app.list_shares().unwrap(); if shares.len() == 0 { println!("At least 1 Delta Share is required"); @@ -22,7 +22,11 @@ fn main() { shares[0].name ); } else { - let res = app.get_dataframe(&tables[0]).unwrap().collect().unwrap(); + let res = app + .get_dataframe(&tables[0], None) + .unwrap() + .collect() + .unwrap(); println!("Dataframe:\n {}", res); } } diff --git a/src/blocking/client.rs b/src/blocking/client.rs index a558767..36be608 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -2,7 +2,7 @@ use crate::protocol::*; use crate::reader::*; use crate::utils::*; use parquet::data_type::AsBytes; -use polars::prelude::{LazyFrame, Result as PolarResult}; +use polars::prelude::LazyFrame; use reqwest::{header, header::HeaderValue}; use serde_json::{Map, Number, Value}; use std::collections::HashMap; @@ -30,45 +30,63 @@ impl Client { pub fn new( provider_config: ProviderConfig, data_root: Option, + capabilities: Option>, ) -> Result { if provider_config.share_credentials_version > CREDENTIALS_VERSION { - panic!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ + return Err(anyhow::anyhow!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ version {} supported by the current release. 
Please upgrade to a newer release.", provider_config.share_credentials_version, - CREDENTIALS_VERSION); + CREDENTIALS_VERSION)); } let cache: HashMap = HashMap::new(); Ok(Self { - http_client: Self::get_client(&provider_config)?, + http_client: Self::get_client(&provider_config, capabilities.unwrap_or_default())?, base_url: Self::build_base_url(&provider_config.endpoint)?, data_root: data_root.unwrap_or( env::temp_dir() .as_path() .join("delta_sharing") .to_str() - .unwrap() + .ok_or(anyhow::anyhow!("Error selecting data root folder"))? .to_string(), ), cache: cache, }) } - fn get_client(config: &ProviderConfig) -> Result { + fn get_client( + config: &ProviderConfig, + capabilities: HashMap, + ) -> Result { let rust_version: &str = &format!("{}", rustc_version_runtime::version()); let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}"); let bearer_token = &format!("Bearer {}", config.bearer_token); let mut headers = header::HeaderMap::new(); headers.insert( header::AUTHORIZATION, - header::HeaderValue::from_str(bearer_token).unwrap(), + header::HeaderValue::from_str(bearer_token) + .map_err(|e| anyhow::anyhow!("Error setting authorization header:{e}"))?, ); headers.insert( header::USER_AGENT, - header::HeaderValue::from_str(user_agent).unwrap(), + header::HeaderValue::from_str(user_agent) + .map_err(|e| anyhow::anyhow!("Error setting user agent header:{e}"))?, + ); + headers.insert( + header::HeaderName::from_static("delta-sharing-capabilities"), + header::HeaderValue::from_str( + &capabilities + .iter() + .map(|(k, v)| format!("{k}={v}")) + .collect::>() + .join(";"), + ) + .map_err(|e| anyhow::anyhow!("Error setting delta-sharing-capabilities header:{e}"))?, ); reqwest::blocking::Client::builder() .default_headers(headers) .build() + .map_err(|e| anyhow::anyhow!("Error building Http client: {e}")) } fn build_base_url(endpoint: &String) -> Result { @@ -77,8 +95,11 @@ impl Client { Url::parse(&root_path) } - fn get(&self, target: &str) -> Result { - let url = self.base_url.join(target).unwrap(); + fn get(&self, target: &str) -> Result { + let url = self + .base_url + .join(target) + .map_err(|e| anyhow::anyhow!("Error creating GET url: {e}"))?; debug!("--> HTTP GET to: {}", &url); let resp = self.http_client.get(url.as_str()).send()?; let resp_text = resp.text()?; @@ -86,23 +107,29 @@ impl Client { return Ok(resp_text); } - fn head(&self, target: &str, key: &str) -> Option { - let url = self.base_url.join(target).unwrap(); + fn head(&self, target: &str, key: &str) -> Result, anyhow::Error> { + let url = self + .base_url + .join(target) + .map_err(|e| anyhow::anyhow!("Error creating HEAD url: {e}"))?; debug!("HTTP HEAD to: {}", &url); let resp = self .http_client .head(url.as_str()) .send() - .expect("Invalid request"); + .map_err(|e| anyhow::anyhow!("Invalid HEAD request: {e}"))?; let version = resp.headers().get(key); match version { - Some(h) => Some(h.clone()), - None => None, + Some(h) => Ok(Some(h.clone())), + None => Ok(None), } } - fn post(&self, target: &str, json: &Map) -> Result { - let url = self.base_url.join(target).unwrap(); + fn post(&self, target: &str, json: &Map) -> Result { + let url = self + .base_url + .join(target) + .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; debug!("--> HTTP POST to: {}", &url); let resp = self.http_client.post(url.as_str()).json(json).send()?; let resp_text = resp.text()?; @@ -110,24 +137,30 @@ impl Client { return Ok(resp_text); } - fn download(&self, url: String, dest_path: &Path) { + fn 
download(&self, url: String, dest_path: &Path) -> Result { debug!("--> Download {} to {}", &url, dest_path.display()); - let resp = reqwest::blocking::get(url).unwrap(); - let mut out = fs::File::create(dest_path).expect("Failed to create an output file"); - let content = resp.bytes().unwrap(); + let resp = reqwest::blocking::get(url) + .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; + let mut out = fs::File::create(dest_path) + .map_err(|e| anyhow::anyhow!("Failed to create an output file: {e}"))?; + let content = resp + .bytes() + .map_err(|e| anyhow::anyhow!("Failed to read download bytes: {e}"))?; io::copy(&mut content.as_bytes(), &mut out) - .expect("Failed to save the content to output file"); + .map_err(|e| anyhow::anyhow!("Failed to save the content to output file: {e}")) } pub fn list_shares(&self) -> Result, anyhow::Error> { let shares = self.get("shares")?; - let parsed: ShareResponse = serde_json::from_str(&shares).expect("Invalid response"); + let parsed: ShareResponse = serde_json::from_str(&shares) + .map_err(|e| anyhow::anyhow!("Invalid list shares response: {e}"))?; return Ok(parsed.items.clone()); } pub fn list_schemas(&self, share: &Share) -> Result, anyhow::Error> { let schemas = self.get(&format!("shares/{}/schemas", share.name))?; - let parsed: SchemaResponse = serde_json::from_str(&schemas).expect("Invalid response"); + let parsed: SchemaResponse = serde_json::from_str(&schemas) + .map_err(|e| anyhow::anyhow!("Invalid list schemas response: {e}"))?; return Ok(parsed.items.clone()); } @@ -136,13 +169,15 @@ impl Client { "shares/{}/schemas/{}/tables", schema.share, schema.name ))?; - let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response"); + let parsed: TableResponse = serde_json::from_str(&tables) + .map_err(|e| anyhow::anyhow!("Invalid list tables response: {e}"))?; return Ok(parsed.items.clone()); } pub fn list_all_tables(&self, share: &Share) -> Result, anyhow::Error> { let tables = self.get(&format!("shares/{}/all-tables", share.name))?; - let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response"); + let parsed: TableResponse = serde_json::from_str(&tables) + .map_err(|e| anyhow::anyhow!("Invalid list all tables response: {e}"))?; return Ok(parsed.items.clone()); } @@ -152,12 +187,20 @@ impl Client { table.share, table.schema, table.name ))?; let mut meta_lines = meta.lines(); - let protocol: ProtocolResponse = - serde_json::from_str(meta_lines.next().expect("Invalid response")) - .expect("Invalid protocol"); - let metadata: MetadataResponse = - serde_json::from_str(meta_lines.next().expect("Invalid response")) - .expect("Invalid metadata"); + let protocol: ProtocolResponse = meta_lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; + let metadata: MetadataResponse = meta_lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; Ok(TableMetadata { protocol: protocol.protocol, metadata: metadata.metadata, @@ -173,46 +216,40 @@ impl Client { "delta-table-version", ); match version { - Some(v) => v + Ok(Some(v)) => v .to_str() - .expect("Invalid version number") - .parse::() - .expect("Invalid version number"), - None => -1, + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(-1), + 
_ => -1, } } pub fn list_table_files( &self, table: &Table, - predicate_hints: Option>, - limit_hint: Option, - version: Option, + request: Option, ) -> Result { let mut map = Map::new(); - if predicate_hints.is_some() { + if let Some(predicate_hints) = request.as_ref().and_then(|r| r.predicate_hints.as_ref()) { map.insert( "predicateHints".to_string(), Value::Array( predicate_hints - .unwrap() .iter() .map(|s| Value::String(s.to_string())) .collect::>(), ), ); } - if limit_hint.is_some() { + if let Some(limit_hint) = request.as_ref().and_then(|r| r.limit_hint) { map.insert( "limitHint".to_string(), - Value::Number(Number::from(limit_hint.unwrap())), + Value::Number(Number::from(limit_hint)), ); } - if version.is_some() { - map.insert( - "version".to_string(), - Value::Number(Number::from(version.unwrap())), - ); + if let Some(version) = request.as_ref().and_then(|r| r.version) { + map.insert("version".to_string(), Value::Number(Number::from(version))); } let response = self.post( &format!( @@ -222,15 +259,24 @@ impl Client { &map, )?; let mut lines = response.lines(); - let protocol: ProtocolResponse = - serde_json::from_str(lines.next().expect("Invalid response")) - .expect("Invalid protocol"); - let metadata: MetadataResponse = - serde_json::from_str(lines.next().expect("Invalid response")) - .expect("Invalid metadata"); + let protocol: ProtocolResponse = lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; + let metadata: MetadataResponse = lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; let mut files: Vec = Vec::new(); for l in lines { - let file: FileResponse = serde_json::from_str(l).expect("Invalid file info"); + let file: FileResponse = + serde_json::from_str(l).map_err(|e| anyhow::anyhow!("Invalid file info: {e}"))?; files.push(file.file.clone()); } Ok(TableFiles { @@ -242,59 +288,110 @@ impl Client { }) } - fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Vec { + fn download_files( + &self, + table_path: &PathBuf, + table_files: &TableFiles, + ) -> Result, anyhow::Error> { if Path::exists(&table_path) { - fs::remove_dir_all(&table_path).unwrap(); + fs::remove_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error cleaning table path: {e}"))?; } - fs::create_dir_all(&table_path).unwrap(); + fs::create_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; let mut file_paths: Vec = Vec::new(); - for file in table_files.files.clone() { - let dst_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); - self.download(file.url, &dst_path); - file_paths.push(dst_path.clone()); + let count = table_files.files.len(); + for (index, file) in table_files.files.clone().into_iter().enumerate() { + match file { + File::Parquet(ParquetFile { id, url, .. 
}) => { + let dst_path = &table_path.join(format!("{}.snappy.parquet", &id)); + let bytes = self.download(url, &dst_path)?; + debug!( + "Downloaded {}/{} {} ({} bytes)", + index + 1, + count, + dst_path.display(), + bytes + ); + file_paths.push(dst_path.clone()); + } + File::Delta(delta_file) => { + if let Some(url) = delta_file.get_url() { + let dst_path = + &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); + let bytes = self.download(url, &dst_path)?; + debug!( + "Downloaded {}/{} {} ({} bytes)", + index + 1, + count, + dst_path.display(), + bytes + ); + file_paths.push(dst_path.clone()); + } + } + } } - file_paths.clone() + Ok(file_paths.clone()) } - fn load_cached(&self, table_path: &PathBuf, table_files: &TableFiles) -> Option> { + fn load_cached( + &self, + table_path: &PathBuf, + table_files: &TableFiles, + ) -> Result>, anyhow::Error> { // Check if the files exist, load and compare the files. let metadata_path = &table_path.join(METADATA_FILE); if Path::exists(&metadata_path) { - let metadata_str = &fs::read_to_string(&metadata_path).unwrap(); - let metadata: TableMetadata = serde_json::from_str(&metadata_str).expect(&format!( - "Invalid configuration in {}", - metadata_path.display() - )); + let metadata_str = &fs::read_to_string(&metadata_path).map_err(|e| { + anyhow::anyhow!("Error reading file path {}: {}", metadata_path.display(), e) + })?; + let metadata: TableMetadata = serde_json::from_str(&metadata_str).map_err(|e| { + anyhow::anyhow!( + "Invalid configuration in {}: {}", + metadata_path.display(), + e + ) + })?; let mut download = metadata != table_files.metadata; if !download { let mut file_paths: Vec = Vec::new(); for file in &table_files.files { - let file_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); + let file_id = match file { + File::Parquet(ParquetFile { id, .. }) => id, + File::Delta(DeltaFile { id, .. }) => id, + }; + let file_path = &table_path.join(format!("{}.snappy.parquet", &file_id)); if !Path::exists(&file_path) { // File is missing, invalidate cache download = true; - fs::remove_dir(&table_path).unwrap(); + fs::remove_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; break; } file_paths.push(file_path.clone()); } if !download { - return Some(file_paths.clone()); + return Ok(Some(file_paths.clone())); } } } - None + Ok(None) } - pub fn get_files(&mut self, table: &Table) -> Result, anyhow::Error> { + pub fn get_files( + &mut self, + table: &Table, + request: Option, + ) -> Result, anyhow::Error> { let key = table.fully_qualified_name(); let mut download = true; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); - let table_files = self.list_table_files(table, None, None, None).unwrap(); + let table_files = self.list_table_files(table, request)?; if let Some(cached) = self.cache.get(&key) { download = cached.table_files.metadata != table_files.metadata; - } else if let Some(cached) = self.load_cached(&table_path, &table_files) { + } else if let Some(cached) = self.load_cached(&table_path, &table_files)? 
{ download = false; self.cache.insert( key.clone(), @@ -306,7 +403,7 @@ impl Client { } if download { info!("--> Downloading data files to {}", &table_path.display()); - let paths = self.download_files(&table_path, &table_files); + let paths = self.download_files(&table_path, &table_files)?; serde_json::to_writer( &fs::File::create(&table_path.join(METADATA_FILE))?, &table_files.metadata, @@ -319,12 +416,22 @@ impl Client { }, ); } - Ok(self.cache.get(&key).unwrap().file_paths.clone()) + Ok(self + .cache + .get(&key) + .ok_or(anyhow::anyhow!("Error reading {key} from cache"))? + .file_paths + .clone()) } - pub fn get_dataframe(&mut self, table: &Table) -> PolarResult { - self.get_files(&table)?; + pub fn get_dataframe( + &mut self, + table: &Table, + request: Option, + ) -> Result { + self.get_files(&table, request)?; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); load_parquet_files_as_dataframe(&table_path) + .map_err(|e| anyhow::anyhow!("Error loading parquet files: {e}")) } } diff --git a/src/blocking/mod.rs b/src/blocking/mod.rs index 1f3f67f..e855e48 100644 --- a/src/blocking/mod.rs +++ b/src/blocking/mod.rs @@ -26,7 +26,7 @@ //! endpoint: "".to_string(), //! bearer_token: "".to_string(), //! }; -//! let mut app = Client::new(config, None).unwrap(); +//! let mut app = Client::new(config, None, None).unwrap(); //! let shares = app.list_shares().unwrap(); //! if shares.len() == 0 { //! println!("At least 1 Delta Share is required"); @@ -38,7 +38,7 @@ //! shares[0].name //! ); //! } else { -//! let res = app.get_dataframe(&tables[0]).unwrap().collect().unwrap(); +//! let res = app.get_dataframe(&tables[0], None).unwrap().collect().unwrap(); //! println!("Dataframe:\n {}", res); //! } //! } diff --git a/src/client.rs b/src/client.rs index 3c0ba1a..6e14bea 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,7 +2,7 @@ use crate::protocol::*; use crate::reader::*; use crate::utils::*; use parquet::data_type::AsBytes; -use polars::prelude::{LazyFrame, Result as PolarResult}; +use polars::prelude::LazyFrame; use reqwest::{header, header::HeaderValue}; use serde_json::{Map, Number, Value}; use std::collections::HashMap; @@ -30,43 +30,63 @@ impl Client { pub async fn new( provider_config: ProviderConfig, data_root: Option, + capabilities: Option>, ) -> Result { if provider_config.share_credentials_version > CREDENTIALS_VERSION { - panic!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ + return Err(anyhow::anyhow!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ version {} supported by the current release. Please upgrade to a newer release.", provider_config.share_credentials_version, - CREDENTIALS_VERSION); + CREDENTIALS_VERSION)); } let cache: HashMap = HashMap::new(); Ok(Self { - http_client: Self::get_client(&provider_config)?, + http_client: Self::get_client(&provider_config, capabilities.unwrap_or_default())?, base_url: Self::build_base_url(&provider_config.endpoint)?, data_root: data_root.unwrap_or( env::temp_dir() .as_path() .join("delta_sharing") .to_str() - .unwrap() + .ok_or(anyhow::anyhow!("Error selecting data root folder"))? 
.to_string(), ), cache: cache, }) } - fn get_client(config: &ProviderConfig) -> Result { + fn get_client( + config: &ProviderConfig, + capabilities: HashMap, + ) -> Result { let rust_version: &str = &format!("{}", rustc_version_runtime::version()); let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}"); let bearer_token = &format!("Bearer {}", config.bearer_token); let mut headers = header::HeaderMap::new(); headers.insert( header::AUTHORIZATION, - header::HeaderValue::from_str(bearer_token).unwrap(), + header::HeaderValue::from_str(bearer_token) + .map_err(|e| anyhow::anyhow!("Error setting authorization header:{e}"))?, ); headers.insert( header::USER_AGENT, - header::HeaderValue::from_str(user_agent).unwrap(), + header::HeaderValue::from_str(user_agent) + .map_err(|e| anyhow::anyhow!("Error setting user agent header:{e}"))?, ); - reqwest::Client::builder().default_headers(headers).build() + headers.insert( + header::HeaderName::from_static("delta-sharing-capabilities"), + header::HeaderValue::from_str( + &capabilities + .iter() + .map(|(k, v)| format!("{k}={v}")) + .collect::>() + .join(";"), + ) + .map_err(|e| anyhow::anyhow!("Error setting delta-sharing-capabilities header:{e}"))?, + ); + reqwest::Client::builder() + .default_headers(headers) + .build() + .map_err(|e| anyhow::anyhow!("Error building Http client: {e}")) } fn build_base_url(endpoint: &String) -> Result { @@ -75,8 +95,11 @@ impl Client { Url::parse(&root_path) } - async fn get(&self, target: &str) -> Result { - let url = self.base_url.join(target).unwrap(); + async fn get(&self, target: &str) -> Result { + let url = self + .base_url + .join(target) + .map_err(|e| anyhow::anyhow!("Error creating GET url: {e}"))?; debug!("--> HTTP GET to: {}", &url); let resp = self.http_client.get(url.as_str()).send().await?; let resp_text = resp.text().await?; @@ -84,28 +107,30 @@ impl Client { return Ok(resp_text); } - async fn head(&self, target: &str, key: &str) -> Option { - let url = self.base_url.join(target).unwrap(); + async fn head(&self, target: &str, key: &str) -> Result, anyhow::Error> { + let url = self + .base_url + .join(target) + .map_err(|e| anyhow::anyhow!("Error creating HEAD url: {e}"))?; debug!("HTTP HEAD to: {}", &url); let resp = self .http_client .head(url.as_str()) .send() .await - .expect("Invalid request"); + .map_err(|e| anyhow::anyhow!("Invalid HEAD request: {e}"))?; let version = resp.headers().get(key); match version { - Some(h) => Some(h.clone()), - None => None, + Some(h) => Ok(Some(h.clone())), + None => Ok(None), } } - async fn post( - &self, - target: &str, - json: &Map, - ) -> Result { - let url = self.base_url.join(target).unwrap(); + async fn post(&self, target: &str, json: &Map) -> Result { + let url = self + .base_url + .join(target) + .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; debug!("--> HTTP POST to: {}", &url); let resp = self .http_client @@ -118,24 +143,32 @@ impl Client { return Ok(resp_text); } - async fn download(&self, url: String, dest_path: &Path) { + async fn download(&self, url: String, dest_path: &Path) -> Result { debug!("--> Download {} to {}", &url, dest_path.display()); - let resp = reqwest::get(url).await.unwrap(); - let mut out = fs::File::create(dest_path).expect("Failed to create an output file"); - let content = resp.bytes().await.unwrap(); + let resp = reqwest::get(url) + .await + .map_err(|e| anyhow::anyhow!("Error creating POST url: {e}"))?; + let mut out = fs::File::create(dest_path) + .map_err(|e| anyhow::anyhow!("Failed 
to create an output file: {e}"))?; + let content = resp + .bytes() + .await + .map_err(|e| anyhow::anyhow!("Failed to read download bytes: {e}"))?; io::copy(&mut content.as_bytes(), &mut out) - .expect("Failed to save the content to output file"); + .map_err(|e| anyhow::anyhow!("Failed to save the content to output file: {e}")) } pub async fn list_shares(&self) -> Result, anyhow::Error> { let shares = self.get("shares").await?; - let parsed: ShareResponse = serde_json::from_str(&shares).expect("Invalid response"); + let parsed: ShareResponse = serde_json::from_str(&shares) + .map_err(|e| anyhow::anyhow!("Invalid list shares response: {e}"))?; return Ok(parsed.items.clone()); } pub async fn list_schemas(&self, share: &Share) -> Result, anyhow::Error> { let schemas = self.get(&format!("shares/{}/schemas", share.name)).await?; - let parsed: SchemaResponse = serde_json::from_str(&schemas).expect("Invalid response"); + let parsed: SchemaResponse = serde_json::from_str(&schemas) + .map_err(|e| anyhow::anyhow!("Invalid list schemas response: {e}"))?; return Ok(parsed.items.clone()); } @@ -146,7 +179,8 @@ impl Client { schema.share, schema.name )) .await?; - let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response"); + let parsed: TableResponse = serde_json::from_str(&tables) + .map_err(|e| anyhow::anyhow!("Invalid list tables response: {e}"))?; return Ok(parsed.items.clone()); } @@ -154,7 +188,8 @@ impl Client { let tables = self .get(&format!("shares/{}/all-tables", share.name)) .await?; - let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response"); + let parsed: TableResponse = serde_json::from_str(&tables) + .map_err(|e| anyhow::anyhow!("Invalid list all tables response: {e}"))?; return Ok(parsed.items.clone()); } @@ -166,12 +201,20 @@ impl Client { )) .await?; let mut meta_lines = meta.lines(); - let protocol: ProtocolResponse = - serde_json::from_str(meta_lines.next().expect("Invalid response")) - .expect("Invalid protocol"); - let metadata: MetadataResponse = - serde_json::from_str(meta_lines.next().expect("Invalid response")) - .expect("Invalid metadata"); + let protocol: ProtocolResponse = meta_lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; + let metadata: MetadataResponse = meta_lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; Ok(TableMetadata { protocol: protocol.protocol, metadata: metadata.metadata, @@ -189,46 +232,40 @@ impl Client { ) .await; match version { - Some(v) => v + Ok(Some(v)) => v .to_str() - .expect("Invalid version number") - .parse::() - .expect("Invalid version number"), - None => -1, + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(-1), + _ => -1, } } pub async fn list_table_files( &self, table: &Table, - predicate_hints: Option>, - limit_hint: Option, - version: Option, + request: Option, ) -> Result { let mut map = Map::new(); - if predicate_hints.is_some() { + if let Some(predicate_hints) = request.as_ref().and_then(|r| r.predicate_hints.as_ref()) { map.insert( "predicateHints".to_string(), Value::Array( predicate_hints - .unwrap() .iter() .map(|s| Value::String(s.to_string())) .collect::>(), ), ); } - if limit_hint.is_some() { + if let Some(limit_hint) = 
request.as_ref().and_then(|r| r.limit_hint) { map.insert( "limitHint".to_string(), - Value::Number(Number::from(limit_hint.unwrap())), + Value::Number(Number::from(limit_hint)), ); } - if version.is_some() { - map.insert( - "version".to_string(), - Value::Number(Number::from(version.unwrap())), - ); + if let Some(version) = request.as_ref().and_then(|r| r.version) { + map.insert("version".to_string(), Value::Number(Number::from(version))); } let response = self .post( @@ -240,15 +277,24 @@ impl Client { ) .await?; let mut lines = response.lines(); - let protocol: ProtocolResponse = - serde_json::from_str(lines.next().expect("Invalid response")) - .expect("Invalid protocol"); - let metadata: MetadataResponse = - serde_json::from_str(lines.next().expect("Invalid response")) - .expect("Invalid metadata"); + let protocol: ProtocolResponse = lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid protocol response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty protocol response")))?; + let metadata: MetadataResponse = lines + .next() + .map(|lines| { + serde_json::from_str::(lines) + .map_err(|e| anyhow::anyhow!("Invalid metadata response - {lines}: {e}")) + }) + .unwrap_or(Err(anyhow::anyhow!("Empty metadata response")))?; let mut files: Vec = Vec::new(); for l in lines { - let file: FileResponse = serde_json::from_str(l).expect("Invalid file info"); + let file: FileResponse = + serde_json::from_str(l).map_err(|e| anyhow::anyhow!("Invalid file info: {e}"))?; files.push(file.file.clone()); } Ok(TableFiles { @@ -260,63 +306,110 @@ impl Client { }) } - async fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Vec { + async fn download_files( + &self, + table_path: &PathBuf, + table_files: &TableFiles, + ) -> Result, anyhow::Error> { if Path::exists(&table_path) { - fs::remove_dir_all(&table_path).unwrap(); + fs::remove_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error cleaning table path: {e}"))?; } - fs::create_dir_all(&table_path).unwrap(); + fs::create_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error creating table path: {e}"))?; let mut file_paths: Vec = Vec::new(); - for file in table_files.files.clone() { - let dst_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); - self.download(file.url, &dst_path).await; - file_paths.push(dst_path.clone()); + let count = table_files.files.len(); + for (index, file) in table_files.files.clone().into_iter().enumerate() { + match file { + File::Parquet(ParquetFile { id, url, .. }) => { + let dst_path = &table_path.join(format!("{}.snappy.parquet", &id)); + let bytes = self.download(url, &dst_path).await?; + debug!( + "Downloaded {}/{} {} ({} bytes)", + index + 1, + count, + dst_path.display(), + bytes + ); + file_paths.push(dst_path.clone()); + } + File::Delta(delta_file) => { + if let Some(url) = delta_file.get_url() { + let dst_path = + &table_path.join(format!("{}.snappy.parquet", &delta_file.id)); + let bytes = self.download(url, &dst_path).await?; + debug!( + "Downloaded {}/{} {} ({} bytes)", + index + 1, + count, + dst_path.display(), + bytes + ); + file_paths.push(dst_path.clone()) + } + } + } } - file_paths.clone() + Ok(file_paths.clone()) } async fn load_cached( &self, table_path: &PathBuf, table_files: &TableFiles, - ) -> Option> { + ) -> Result>, anyhow::Error> { // Check if the files exist, load and compare the files. 
let metadata_path = &table_path.join(METADATA_FILE); if Path::exists(&metadata_path) { - let metadata_str = &fs::read_to_string(&metadata_path).unwrap(); - let metadata: TableMetadata = serde_json::from_str(&metadata_str).expect(&format!( - "Invalid configuration in {}", - metadata_path.display() - )); + let metadata_str = &fs::read_to_string(&metadata_path).map_err(|e| { + anyhow::anyhow!("Error reading file path {}: {}", metadata_path.display(), e) + })?; + let metadata: TableMetadata = serde_json::from_str(&metadata_str).map_err(|e| { + anyhow::anyhow!( + "Invalid configuration in {}: {}", + metadata_path.display(), + e + ) + })?; let mut download = metadata != table_files.metadata; if !download { let mut file_paths: Vec = Vec::new(); for file in &table_files.files { - let file_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); + let file_id = match file { + File::Parquet(ParquetFile { id, .. }) => id, + File::Delta(DeltaFile { id, .. }) => id, + }; + let file_path = &table_path.join(format!("{}.snappy.parquet", &file_id)); if !Path::exists(&file_path) { // File is missing, invalidate cache download = true; - fs::remove_dir(&table_path).unwrap(); + fs::remove_dir_all(&table_path) + .map_err(|e| anyhow::anyhow!("Error invalidating cache: {e}"))?; break; } file_paths.push(file_path.clone()); } if !download { - return Some(file_paths.clone()); + return Ok(Some(file_paths.clone())); } } } - None + Ok(None) } - pub async fn get_files(&mut self, table: &Table) -> Result, anyhow::Error> { + pub async fn get_files( + &mut self, + table: &Table, + request: Option, + ) -> Result, anyhow::Error> { let key = table.fully_qualified_name(); let mut download = true; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); - let table_files = self.list_table_files(table, None, None, None).await?; + let table_files = self.list_table_files(table, request).await?; if let Some(cached) = self.cache.get(&key) { download = cached.table_files.metadata != table_files.metadata; - } else if let Some(cached) = self.load_cached(&table_path, &table_files).await { + } else if let Some(cached) = self.load_cached(&table_path, &table_files).await? { download = false; self.cache.insert( key.clone(), @@ -328,7 +421,7 @@ impl Client { } if download { info!("--> Downloading data files to {}", &table_path.display()); - let paths = self.download_files(&table_path, &table_files).await; + let paths = self.download_files(&table_path, &table_files).await?; serde_json::to_writer( &fs::File::create(&table_path.join(METADATA_FILE))?, &table_files.metadata, @@ -341,13 +434,23 @@ impl Client { }, ); } - Ok(self.cache.get(&key).unwrap().file_paths.clone()) + Ok(self + .cache + .get(&key) + .ok_or(anyhow::anyhow!("Error reading {key} from cache"))? 
+ .file_paths + .clone()) } - pub async fn get_dataframe(&mut self, table: &Table) -> PolarResult { - self.get_files(&table).await?; + pub async fn get_dataframe( + &mut self, + table: &Table, + request: Option, + ) -> Result { + self.get_files(&table, request).await?; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); load_parquet_files_as_dataframe(&table_path) + .map_err(|e| anyhow::anyhow!("Error loading parquet files: {e}")) } } @@ -364,7 +467,6 @@ mod tests { endpoint: "https://sharing.delta.io/delta-sharing/".to_string(), bearer_token: "token".to_string(), }; - let c = super::Client::new(config, None).await; - drop(c); + let _ = super::Client::new(config, None, None).await.unwrap(); } } diff --git a/src/lib.rs b/src/lib.rs index 5a37b9f..2bd2716 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,7 +50,7 @@ //! endpoint: "".to_string(), //! bearer_token: "".to_string(), //! }; -//! let mut app = Client::new(config, None).await.unwrap(); +//! let mut app = Client::new(config, None, None).await.unwrap(); //! let shares = app.list_shares().await.unwrap(); //! if shares.len() == 0 { //! println!("At least 1 Delta Share is required"); @@ -63,7 +63,7 @@ //! ); //! } else { //! let res = app -//! .get_dataframe(&tables[0]) +//! .get_dataframe(&tables[0], None) //! .await //! .unwrap() //! .collect() diff --git a/src/protocol.rs b/src/protocol.rs index d8e69ed..f01ed94 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -38,8 +38,24 @@ impl Table { #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] -pub struct Protocol { +pub struct DeltaProtocol { pub min_reader_version: i32, + pub min_writer_version: i32, + pub reader_features: Vec, + pub writer_features: Vec, +} + +#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] +#[serde(untagged)] +pub enum Protocol { + Parquet { + #[serde(rename = "minReaderVersion")] + min_reader_version: i32, + }, + Delta { + #[serde(rename = "deltaProtocol")] + delta_protocol: DeltaProtocol, + }, } #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] @@ -50,7 +66,7 @@ pub struct Format { #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] -pub struct Metadata { +pub struct ParquetMetadata { pub id: String, pub name: Option, pub description: Option, @@ -60,6 +76,21 @@ pub struct Metadata { pub partition_columns: Vec, } +#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DeltaMetadata { + pub size: usize, + pub num_files: usize, + pub delta_metadata: ParquetMetadata, +} + +#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] +#[serde(untagged)] +pub enum Metadata { + Parquet(ParquetMetadata), + Delta(DeltaMetadata), +} + #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] pub struct TableMetadata { pub protocol: Protocol, @@ -68,7 +99,7 @@ pub struct TableMetadata { #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] -pub struct File { +pub struct ParquetFile { pub id: String, pub url: String, pub partition_values: Map, @@ -76,8 +107,46 @@ pub struct File { pub stats: Option, } +#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DeltaFile { + pub id: String, + pub deletion_vector_file_id: Option, + pub version: Option, + pub timestamp: Option, + pub expiration_timestamp: Option, + pub delta_single_action: Map, +} + +impl DeltaFile { + pub fn get_url(&self) -> Option { + if let 
Some(value) = self + .delta_single_action + .get("add") + .and_then(|add| add.get("path")) + { + return value.as_str().map(|v| v.to_string()); + } else { + return None; + } + } +} + +#[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] +#[serde(untagged)] +pub enum File { + Parquet(ParquetFile), + Delta(DeltaFile), +} + #[derive(Deserialize, Debug, Clone, PartialEq, Serialize)] pub struct TableFiles { pub metadata: TableMetadata, pub files: Vec, } + +pub struct FilesRequest { + pub predicate_hints: Option>, + pub limit_hint: Option, + pub version: Option, +} diff --git a/src/reader.rs b/src/reader.rs index 6adbdb2..1b7ec90 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,12 +1,13 @@ -use polars::prelude::Result as PolarResult; use polars::prelude::*; use std::path::PathBuf; -pub fn load_parquet_files_as_dataframe(parquet_root_dir_path: &PathBuf) -> PolarResult { +pub fn load_parquet_files_as_dataframe( + parquet_root_dir_path: &PathBuf, +) -> Result { let search_pattern = parquet_root_dir_path .join("*.parquet") .display() .to_string(); - let res = LazyFrame::scan_parquet(search_pattern.into(), Default::default()); + let res = LazyFrame::scan_parquet(search_pattern, Default::default()); res } diff --git a/src/utils.rs b/src/utils.rs index fbd3821..a93f2f7 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -36,11 +36,6 @@ pub struct FileResponse { pub file: File, } -#[derive(Deserialize)] -pub struct FileActionResponse { - pub file: File, -} - #[derive(Deserialize, PartialEq, Serialize)] pub struct FileCache { pub table_files: TableFiles, diff --git a/tests/blocking.rs b/tests/blocking.rs index 3827211..99e699a 100644 --- a/tests/blocking.rs +++ b/tests/blocking.rs @@ -21,7 +21,7 @@ fn create_blocking_test_app() -> BlockingTestApp { endpoint: server.uri(), bearer_token: Uuid::new_v4().to_string(), }; - let client = Client::new(config, None).unwrap(); + let client = Client::new(config, None, None).unwrap(); let test_app = BlockingTestApp { client, server }; test_app } @@ -74,7 +74,7 @@ fn get_dataframe() { "shares/{}/schemas/{}/tables/{}/query", table.share, table.schema, table.name ); - let mut file: File = + let mut file: ParquetFile = serde_json::from_str(common::TEST_FILE_RESPONSE).expect("Invalid file info"); let file_url_path = "/shares/test.parquet"; file.url = format!("{}{}", &app.server.uri(), &file_url_path); @@ -117,15 +117,15 @@ fn get_dataframe() { .unwrap() .to_string(); - let df = c.get_dataframe(&table).unwrap().collect().unwrap(); + let df = c.get_dataframe(&table, None).unwrap().collect().unwrap(); assert_eq!(df.shape(), (5, 3), "Dataframe shape mismatch"); // Get the data again, this time it should be served from the local cache (enforced by Expections set on Mocks) - let df1 = c.get_dataframe(&table).unwrap().collect().unwrap(); + let df1 = c.get_dataframe(&table, None).unwrap().collect().unwrap(); assert_eq!(df1.shape(), (5, 3), "Dataframe shape mismatch"); assert_eq!( - df1.get_row(0).0[1], - polars::datatypes::AnyValue::Utf8("One"), + df1.get_row(0).unwrap().0[1], + polars::datatypes::AnyValue::String("One"), "Row value mismatch" ); } diff --git a/tests/client.rs b/tests/client.rs index a8346ba..0ca5dcf 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -112,28 +112,38 @@ async fn get_table_metadata() { let app = create_mocked_test_app(body, &url, method("GET")).await; let meta = app.client.get_table_metadata(&table).await.unwrap(); - assert_eq!(meta.protocol.min_reader_version, 1, "Protocol mismatch"); - assert_eq!( - meta.metadata.id, 
"cf9c9342-b773-4c7b-a217-037d02ffe5d8", - "Metadata ID mismatch" - ); - assert_eq!( - meta.metadata.format.provider, "parquet", - "Metadata format provider mismatch" - ); - assert_eq!( - meta.metadata.name, None, - "Metadata name value should be missing" - ); - assert_eq!( - meta.metadata.partition_columns.len(), - 0, - "There should be no partitions" - ); - assert_eq!( - meta.metadata.configuration["conf_1_name"], "conf_1_value", - "Configuration value expected" - ); + match meta.protocol { + Protocol::Delta { .. } => assert!(false, "Wrong protocol deserialization"), + Protocol::Parquet { min_reader_version } => { + assert_eq!(min_reader_version, 1, "Protocol mismatch") + } + }; + match meta.metadata { + Metadata::Delta { .. } => assert!(false, "Wrong metadata deserialization"), + Metadata::Parquet(ParquetMetadata { + id, + format, + name, + partition_columns, + configuration, + .. + }) => { + assert_eq!( + id, "cf9c9342-b773-4c7b-a217-037d02ffe5d8", + "Metadata ID mismatch" + ); + assert_eq!( + format.provider, "parquet", + "Metadata format provider mismatch" + ); + assert_eq!(name, None, "Metadata name value should be missing"); + assert_eq!(partition_columns.len(), 0, "There should be no partitions"); + assert_eq!( + configuration["conf_1_name"], "conf_1_value", + "Configuration value expected" + ); + } + }; } #[tokio::test] @@ -191,14 +201,13 @@ async fn list_all_table_files() { table.share, table.schema, table.name ); let app = create_mocked_test_app(body, &url, method("POST")).await; - let files = app - .client - .list_table_files(&table, None, None, None) - .await - .unwrap(); + let files = app.client.list_table_files(&table, None).await.unwrap(); assert_eq!(files.files.len(), 2, "File count mismatch"); - assert_eq!(files.files[1].id, "2", "File id mismatch"); + match &files.files[1] { + File::Parquet(ParquetFile { id, .. }) => assert_eq!(id, "2", "File id mismatch"), + File::Delta(DeltaFile { .. 
}) => assert!(false, "Wrong file deserialization"), + }; } #[tokio::test] @@ -219,7 +228,7 @@ async fn get_files() { "shares/{}/schemas/{}/tables/{}/query", table.share, table.schema, table.name ); - let mut file: File = + let mut file: ParquetFile = serde_json::from_str(common::TEST_FILE_RESPONSE).expect("Invalid file info"); let file_url_path = "/shares/test.parquet"; file.url = format!("{}{}", &app.server.uri(), &file_url_path); @@ -264,7 +273,7 @@ async fn get_files() { assert!(!Path::exists(&expected_path), "File should not exist"); - let files = c.get_files(&table).await.unwrap(); + let files = c.get_files(&table, None).await.unwrap(); assert_eq!(files.len(), 1, "File count mismatch"); assert_eq!(files[0], expected_path, "File path mismatch"); @@ -287,7 +296,7 @@ async fn get_dataframe() { "shares/{}/schemas/{}/tables/{}/query", table.share, table.schema, table.name ); - let mut file: File = + let mut file: ParquetFile = serde_json::from_str(common::TEST_FILE_RESPONSE).expect("Invalid file info"); let file_url_path = "/shares/test.parquet"; file.url = format!("{}{}", &app.server.uri(), &file_url_path); @@ -330,15 +339,25 @@ async fn get_dataframe() { .unwrap() .to_string(); - let df = c.get_dataframe(&table).await.unwrap().collect().unwrap(); + let df = c + .get_dataframe(&table, None) + .await + .unwrap() + .collect() + .unwrap(); assert_eq!(df.shape(), (5, 3), "Dataframe shape mismatch"); // Get the data again, this time it should be served from the local cache (enforced by Expections set on Mocks) - let df1 = c.get_dataframe(&table).await.unwrap().collect().unwrap(); + let df1 = c + .get_dataframe(&table, None) + .await + .unwrap() + .collect() + .unwrap(); assert_eq!(df1.shape(), (5, 3), "Dataframe shape mismatch"); assert_eq!( - df1.get_row(0).0[1], - polars::datatypes::AnyValue::Utf8("One"), + df1.get_row(0).unwrap().0[1], + polars::datatypes::AnyValue::String("One"), "Row value mismatch" ); } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 8331554..3f1e2a9 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -9,6 +9,7 @@ use wiremock::matchers::{path, MethodExactMatcher}; use delta_sharing::protocol::*; use delta_sharing::Client; +#[allow(dead_code)] pub struct TestApp { pub client: Client, pub server: MockServer, @@ -18,6 +19,7 @@ pub const TEST_PROTOCOL_RESPONSE: &str = r#"{ "minReaderVersion": 1 }"#; pub const TEST_METADATA_RESPONSE: &str = r#"{ "id": "cf9c9342-b773-4c7b-a217-037d02ffe5d8", "format": { "provider": "parquet" }, "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"int_field_1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"double_field_1\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}", "partitionColumns": [], "configuration": {"conf_1_name": "conf_1_value"} }"#; pub const TEST_FILE_RESPONSE: &str = r#"{ "url": "", "id": "1", "partitionValues": {}, "size": 2350, "stats": "{\"numRecords\":1}" }"#; +#[allow(dead_code)] pub async fn create_test_app() -> TestApp { let _ = env_logger::try_init(); @@ -28,11 +30,12 @@ pub async fn create_test_app() -> TestApp { endpoint: server.uri(), bearer_token: Uuid::new_v4().to_string(), }; - let client = Client::new(config, None).await.unwrap(); + let client = Client::new(config, None, None).await.unwrap(); let app = TestApp { client, server }; app } +#[allow(dead_code)] pub async fn create_mocked_test_app( body: &str, url: &str,
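
For reviewers trying out the new call shapes, here is a minimal blocking-mode sketch that exercises both additions in this patch at once: the optional `capabilities` map (emitted as the `delta-sharing-capabilities` header in `key=value;key=value` form) and the optional `FilesRequest` hints passed to `get_dataframe`. It assumes the crate is built with its `blocking` feature, that `./config.json` holds a valid provider profile, and that at least one share and table are visible; the `delta_sharing::blocking::Client` import path and the `responseformat=parquet` capability value are illustrative assumptions, not taken from this patch.

```rust
use std::collections::HashMap;
use std::fs;

use delta_sharing::blocking::Client; // assumed re-export path for the blocking client
use delta_sharing::protocol::{FilesRequest, ProviderConfig};

fn main() -> anyhow::Result<()> {
    let conf_str = fs::read_to_string("./config.json")?;
    let config: ProviderConfig = serde_json::from_str(&conf_str)?;

    // New third argument to Client::new: an optional capability map, sent as the
    // `delta-sharing-capabilities` header. The key/value below is illustrative.
    let mut capabilities = HashMap::new();
    capabilities.insert("responseformat".to_string(), "parquet".to_string());

    let mut app = Client::new(config, None, Some(capabilities))?;
    let shares = app.list_shares()?;
    let tables = app.list_all_tables(&shares[0])?; // assumes at least one share and table

    // New second argument to get_dataframe: optional request hints; any field may be None.
    let request = FilesRequest {
        predicate_hints: Some(vec!["date >= '2021-01-01'".to_string()]),
        limit_hint: Some(1000),
        version: None,
    };

    let df = app.get_dataframe(&tables[0], Some(request))?.collect()?;
    println!("Dataframe:\n {}", df);
    Ok(())
}
```

Passing `None` for both new parameters keeps the previous behaviour (an empty capability map and an unfiltered files query), which is exactly what the updated examples, doc comments, and tests in this diff do.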