diff --git a/rust/src/builder.rs b/rust/src/builder.rs
index ed2f207ddf..ab2ae8e2d7 100644
--- a/rust/src/builder.rs
+++ b/rust/src/builder.rs
@@ -1,6 +1,7 @@
 //! Create or load DeltaTables
 
 use std::collections::HashMap;
+use std::path::PathBuf;
 use std::sync::Arc;
 
 use crate::delta::{DeltaResult, DeltaTable, DeltaTableError};
@@ -345,34 +346,75 @@ pub(crate) fn str_option(map: &HashMap<String, String>, key: &str) -> Option<String>
 pub(crate) fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
     let table_uri = table_uri.as_ref();
-    if let Ok(path) = std::fs::canonicalize(table_uri) {
-        return Url::from_directory_path(path)
-            .map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()));
+
+    enum UriType {
+        LocalPath(PathBuf),
+        Url(Url),
     }
-    if let Ok(url) = Url::parse(table_uri) {
-        return Ok(match url.scheme() {
-            "file" => url,
-            _ => {
-                let mut new_url = url.clone();
-                new_url.set_path(url.path().trim_end_matches('/'));
-                new_url
+
+    let uri_type: UriType = if let Ok(url) = Url::parse(table_uri) {
+        if url.scheme() == "file" {
+            UriType::LocalPath(url.to_file_path().map_err(|err| {
+                let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err);
+                DeltaTableError::InvalidTableLocation(msg)
+            })?)
+        } else {
+            UriType::Url(url)
+        }
+    } else {
+        UriType::LocalPath(PathBuf::from(table_uri))
+    };
+
+    // If it is a local path, we need to create it if it does not exist.
+    let mut url = match uri_type {
+        UriType::LocalPath(path) => {
+            if !path.exists() {
+                std::fs::create_dir_all(&path).map_err(|err| {
+                    let msg = format!(
+                        "Could not create local directory: {}\nError: {:?}",
+                        table_uri, err
+                    );
+                    DeltaTableError::InvalidTableLocation(msg)
+                })?;
             }
-        });
-    }
-    // The table uri still might be a relative paths that does not exist.
-    std::fs::create_dir_all(table_uri)
-        .map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?;
-    let path = std::fs::canonicalize(table_uri)
-        .map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))?;
-    Url::from_directory_path(path)
-        .map_err(|_| DeltaTableError::InvalidTableLocation(table_uri.to_string()))
+            let path = std::fs::canonicalize(path).map_err(|err| {
+                let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err);
+                DeltaTableError::InvalidTableLocation(msg)
+            })?;
+            Url::from_directory_path(path).map_err(|_| {
+                let msg = format!(
+                    "Could not construct a URL from canonicalized path: {}.\n\
+                    Something must be very wrong with the table path.",
+                    table_uri
+                );
+                DeltaTableError::InvalidTableLocation(msg)
+            })?
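+            // Note: `from_directory_path` percent-encodes special characters
+            // (spaces, unicode, etc.) and appends a trailing '/', which is
+            // trimmed below before the URL is returned.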
+        }
+        UriType::Url(url) => url,
+    };
+
+    let trimmed_path = url.path().trim_end_matches('/').to_owned();
+    url.set_path(&trimmed_path);
+    Ok(url)
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use std::path::Path;
 
     #[test]
     fn test_ensure_table_uri() {
@@ -383,13 +425,74 @@ mod tests {
         assert!(uri.is_ok());
         let uri = ensure_table_uri("s3://container/path");
         assert!(uri.is_ok());
-        let uri = ensure_table_uri("file:///").unwrap();
-        assert_eq!("file:///", uri.as_str());
-        let uri = ensure_table_uri("memory://").unwrap();
-        assert_eq!("memory://", uri.as_str());
-        let uri = ensure_table_uri("s3://tests/data/delta-0.8.0/").unwrap();
-        assert_eq!("s3://tests/data/delta-0.8.0", uri.as_str());
-        let _uri = ensure_table_uri("s3://tests/data/delta-0.8.0//").unwrap();
-        assert_eq!("s3://tests/data/delta-0.8.0", uri.as_str())
+
+        // These cases should all roundtrip to themselves
+        let roundtrip_cases = &[
+            "s3://tests/data/delta-0.8.0",
+            "memory://",
+            "file:///",
+            "s3://bucket/my%20table", // Doesn't double-encode
+        ];
+
+        for case in roundtrip_cases {
+            let uri = ensure_table_uri(case).unwrap();
+            assert_eq!(case, &uri.as_str());
+        }
+
+        // Other cases
+        let map_cases = &[
+            // extra slashes are removed
+            (
+                "s3://tests/data/delta-0.8.0//",
+                "s3://tests/data/delta-0.8.0",
+            ),
+            ("s3://bucket/my table", "s3://bucket/my%20table"),
+        ];
+
+        for (case, expected) in map_cases {
+            let uri = ensure_table_uri(case).unwrap();
+            assert_eq!(expected, &uri.as_str());
+        }
+    }
+
+    #[test]
+    fn test_ensure_table_uri_path() {
+        let tmp_dir = tempdir::TempDir::new("test").unwrap();
+        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
+        let paths = &[
+            tmp_path.join("data/delta-0.8.0"),
+            tmp_path.join("space in path"),
+            tmp_path.join("special&chars/你好/😊"),
+        ];
+
+        for path in paths {
+            assert!(!path.exists());
+            let expected = Url::parse(&format!("file://{}", path.to_str().unwrap())).unwrap();
+            let uri = ensure_table_uri(path.as_os_str().to_str().unwrap()).unwrap();
+            assert_eq!(expected, uri);
+            assert!(path.exists());
+        }
+
+        // Creates non-existent relative directories
+        let relative_path = Path::new("_tmp/test %3F");
+        assert!(!relative_path.exists());
+        ensure_table_uri(relative_path.as_os_str().to_str().unwrap()).unwrap();
+        assert!(relative_path.exists());
+        std::fs::remove_dir_all(relative_path).unwrap();
+    }
+
+    #[test]
+    fn test_ensure_table_uri_url() {
+        // URLs should round-trip as-is
+        let expected = Url::parse("s3://tests/data/delta-0.8.0").unwrap();
+        let url = ensure_table_uri(&expected).unwrap();
+        assert_eq!(expected, url);
+
+        let tmp_dir = tempdir::TempDir::new("test").unwrap();
+        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
+        let path = tmp_path.join("data/delta-0.8.0");
+        let expected = Url::parse(&format!("file://{}", path.to_str().unwrap())).unwrap();
+        let url = ensure_table_uri(&expected).unwrap();
+        assert_eq!(expected, url);
     }
 }
diff --git a/rust/src/delta.rs b/rust/src/delta.rs
index 300da985dd..b7d6cb66aa 100644
--- a/rust/src/delta.rs
+++ b/rust/src/delta.rs
@@ -232,7 +232,7 @@ pub enum DeltaTableError {
     /// A Feature is missing to perform operation
     #[error("Delta-rs must be built with feature '{feature}' to support loading from: {url}.")]
     MissingFeature {
-        /// Name of the missiing feature
+        /// Name of the missing feature
        feature: &'static str,
        /// Storage location url
        url: String,
@@ -263,6 +263,14 @@ pub enum DeltaTableError {
     },
 }
 
+impl From<object_store::path::Error> for DeltaTableError {
+    fn from(err: object_store::path::Error) -> Self {
+        Self::GenericError {
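+            // Boxing the concrete `object_store` path error lets it travel
+            // through the catch-all `GenericError` variant via `?`.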
+            source: Box::new(err),
+        }
+    }
+}
+
 /// Delta table metadata
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
 pub struct DeltaTableMetaData {
diff --git a/rust/src/storage/config.rs b/rust/src/storage/config.rs
index 8f32747022..882b89e97a 100644
--- a/rust/src/storage/config.rs
+++ b/rust/src/storage/config.rs
@@ -30,7 +30,7 @@ use object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey};
 use std::str::FromStr;
 
 /// Options used for configuring backend storage
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize, Default)]
 pub struct StorageOptions(pub HashMap<String, String>);
 
 impl StorageOptions {
@@ -98,128 +98,192 @@ impl From<HashMap<String, String>> for StorageOptions {
         Self::new(value)
     }
 }
-
-pub(crate) enum ObjectStoreKind {
-    Local,
-    InMemory,
-    S3,
-    Google,
-    Azure,
-    Hdfs,
-}
-
-impl ObjectStoreKind {
-    pub fn parse_url(url: &Url) -> DeltaResult<Self> {
-        match url.scheme() {
-            "file" => Ok(ObjectStoreKind::Local),
-            "memory" => Ok(ObjectStoreKind::InMemory),
-            "az" | "abfs" | "abfss" | "azure" | "wasb" | "adl" => Ok(ObjectStoreKind::Azure),
-            "s3" | "s3a" => Ok(ObjectStoreKind::S3),
-            "gs" => Ok(ObjectStoreKind::Google),
-            "hdfs" => Ok(ObjectStoreKind::Hdfs),
-            "https" => {
-                let host = url.host_str().unwrap_or_default();
-                if host.contains("amazonaws.com") {
-                    Ok(ObjectStoreKind::S3)
-                } else if host.contains("dfs.core.windows.net")
-                    || host.contains("blob.core.windows.net")
-                {
-                    Ok(ObjectStoreKind::Azure)
-                } else {
-                    Err(DeltaTableError::Generic(format!(
-                        "unsupported url: {}",
-                        url.as_str()
-                    )))
-                }
+pub(crate) fn configure_store(
+    url: &Url,
+    options: &StorageOptions,
+) -> DeltaResult<Arc<DynObjectStore>> {
+    match url.scheme() {
+        "file" => try_configure_local(
+            url.to_file_path()
+                .map_err(|_| DeltaTableError::InvalidTableLocation(url.to_string()))?
+                .to_str()
+                .ok_or_else(|| DeltaTableError::InvalidTableLocation(url.to_string()))?,
+        ),
+        "memory" => try_configure_memory(url),
+        "az" | "abfs" | "abfss" | "azure" | "wasb" | "adl" => try_configure_azure(url, options),
+        "s3" | "s3a" => try_configure_s3(url, options),
+        "gs" => try_configure_gcs(url, options),
+        "hdfs" => try_configure_hdfs(url, options),
+        "https" => {
+            let host = url.host_str().unwrap_or_default();
+            if host.contains("amazonaws.com") {
+                try_configure_s3(url, options)
+            } else if host.contains("dfs.core.windows.net")
+                || host.contains("blob.core.windows.net")
+            {
+                try_configure_azure(url, options)
+            } else {
+                Err(DeltaTableError::Generic(format!(
+                    "unsupported url: {}",
+                    url.as_str()
+                )))
             }
-            _ => Err(DeltaTableError::Generic(format!(
-                "unsupported url: {}",
-                url.as_str()
-            ))),
         }
+        _ => Err(DeltaTableError::Generic(format!(
+            "unsupported url: {}",
+            url.as_str()
+        ))),
     }
+}
 
-    pub fn into_impl(
-        self,
-        storage_url: &Url,
-        options: impl Into<StorageOptions>,
-    ) -> DeltaResult<Arc<DynObjectStore>> {
-        let _options = options.into();
-        match self {
-            ObjectStoreKind::Local => Ok(Self::url_prefix_handler(
-                FileStorageBackend::new(),
-                storage_url,
-            )),
-            ObjectStoreKind::InMemory => Ok(Self::url_prefix_handler(InMemory::new(), storage_url)),
-            #[cfg(any(feature = "s3", feature = "s3-native-tls"))]
-            ObjectStoreKind::S3 => {
-                let amazon_s3 = AmazonS3Builder::from_env()
-                    .with_url(storage_url.as_ref())
-                    .try_with_options(&_options.as_s3_options())?
-                    .with_allow_http(_options.allow_http())
-                    .build()?;
-                let store = S3StorageBackend::try_new(
-                    Arc::new(amazon_s3),
-                    S3StorageOptions::from_map(&_options.0),
-                )?;
-                Ok(Self::url_prefix_handler(store, storage_url))
-            }
-            #[cfg(not(any(feature = "s3", feature = "s3-native-tls")))]
-            ObjectStoreKind::S3 => Err(DeltaTableError::MissingFeature {
-                feature: "s3",
-                url: storage_url.as_ref().into(),
-            }),
-            #[cfg(feature = "azure")]
-            ObjectStoreKind::Azure => {
-                let store = MicrosoftAzureBuilder::from_env()
-                    .with_url(storage_url.as_ref())
-                    .try_with_options(&_options.as_azure_options())?
-                    .with_allow_http(_options.allow_http())
-                    .build()?;
-                Ok(Self::url_prefix_handler(store, storage_url))
-            }
-            #[cfg(not(feature = "azure"))]
-            ObjectStoreKind::Azure => Err(DeltaTableError::MissingFeature {
-                feature: "azure",
-                url: storage_url.as_ref().into(),
-            }),
-            #[cfg(feature = "gcs")]
-            ObjectStoreKind::Google => {
-                let store = GoogleCloudStorageBuilder::from_env()
-                    .with_url(storage_url.as_ref())
-                    .try_with_options(&_options.as_gcs_options())?
-                    .build()?;
-                Ok(Self::url_prefix_handler(store, storage_url))
-            }
-            #[cfg(not(feature = "gcs"))]
-            ObjectStoreKind::Google => Err(DeltaTableError::MissingFeature {
-                feature: "gcs",
-                url: storage_url.as_ref().into(),
-            }),
-            #[cfg(feature = "hdfs")]
-            ObjectStoreKind::Hdfs => {
-                let store = HadoopFileSystem::new(storage_url.as_ref()).ok_or_else(|| {
-                    DeltaTableError::Generic(format!(
-                        "failed to create HadoopFileSystem for {}",
-                        storage_url.as_ref()
-                    ))
-                })?;
-                Ok(Self::url_prefix_handler(store, storage_url))
-            }
-            #[cfg(not(feature = "hdfs"))]
-            ObjectStoreKind::Hdfs => Err(DeltaTableError::MissingFeature {
-                feature: "hdfs",
-                url: storage_url.as_ref().into(),
-            }),
-        }
+fn try_configure_local<P: AsRef<str>>(path: P) -> Result<Arc<DynObjectStore>, DeltaTableError> {
+    Ok(Arc::new(FileStorageBackend::try_new(path.as_ref())?))
+}
+
+fn try_configure_memory(storage_url: &Url) -> DeltaResult<Arc<DynObjectStore>> {
+    url_prefix_handler(InMemory::new(), storage_url)
+}
+
+#[cfg(feature = "gcs")]
+fn try_configure_gcs(
+    storage_url: &Url,
+    options: &StorageOptions,
+) -> DeltaResult<Arc<DynObjectStore>> {
+    let store = GoogleCloudStorageBuilder::from_env()
+        .with_url(storage_url.as_ref())
+        .try_with_options(&options.as_gcs_options())?
+        .build()?;
+    url_prefix_handler(store, storage_url)
+}
+
+#[cfg(not(feature = "gcs"))]
+fn try_configure_gcs(
+    storage_url: &Url,
+    _options: &StorageOptions,
+) -> DeltaResult<Arc<DynObjectStore>> {
+    Err(DeltaTableError::MissingFeature {
+        feature: "gcs",
+        url: storage_url.as_ref().into(),
+    })
+}
+
+#[cfg(feature = "azure")]
+fn try_configure_azure(
+    storage_url: &Url,
+    options: &StorageOptions,
+) -> DeltaResult<Arc<DynObjectStore>> {
+    let store = MicrosoftAzureBuilder::from_env()
+        .with_url(storage_url.as_ref())
+        .try_with_options(&options.as_azure_options())?
+        .with_allow_http(options.allow_http())
+        .build()?;
+    url_prefix_handler(store, storage_url)
+}
+
+#[cfg(not(feature = "azure"))]
+fn try_configure_azure(
+    storage_url: &Url,
+    _options: &StorageOptions,
+) -> DeltaResult<Arc<DynObjectStore>> {
+    Err(DeltaTableError::MissingFeature {
+        feature: "azure",
+        url: storage_url.as_ref().into(),
+    })
+}
+
+#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
+fn try_configure_s3(
+    storage_url: &Url,
+    options: &StorageOptions,
+) -> DeltaResult<Arc<DynObjectStore>> {
+    let amazon_s3 = AmazonS3Builder::from_env()
+        .with_url(storage_url.as_ref())
+        .try_with_options(&options.as_s3_options())?
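+        // `allow_http` lets the integration tests talk to local S3 emulators
+        // over plain HTTP instead of TLS.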
+        .with_allow_http(options.allow_http())
+        .build()?;
+    let store =
+        S3StorageBackend::try_new(Arc::new(amazon_s3), S3StorageOptions::from_map(&options.0))?;
+    url_prefix_handler(store, storage_url)
+}
+
+#[cfg(not(any(feature = "s3", feature = "s3-native-tls")))]
+fn try_configure_s3(
+    storage_url: &Url,
+    _options: &StorageOptions,
+) -> DeltaResult<Arc<DynObjectStore>> {
+    Err(DeltaTableError::MissingFeature {
+        feature: "s3",
+        url: storage_url.as_ref().into(),
+    })
+}
+
+#[cfg(feature = "hdfs")]
+fn try_configure_hdfs(
+    storage_url: &Url,
+    _options: &StorageOptions,
+) -> DeltaResult<Arc<DynObjectStore>> {
+    let store = HadoopFileSystem::new(storage_url.as_ref()).ok_or_else(|| {
+        DeltaTableError::Generic(format!(
+            "failed to create HadoopFileSystem for {}",
+            storage_url.as_ref()
+        ))
+    })?;
+    url_prefix_handler(store, storage_url)
+}
+
+#[cfg(not(feature = "hdfs"))]
+fn try_configure_hdfs(
+    storage_url: &Url,
+    _options: &StorageOptions,
+) -> DeltaResult<Arc<DynObjectStore>> {
+    Err(DeltaTableError::MissingFeature {
+        feature: "hdfs",
+        url: storage_url.as_ref().into(),
+    })
+}
+
+fn url_prefix_handler<T: ObjectStore>(
+    store: T,
+    storage_url: &Url,
+) -> DeltaResult<Arc<DynObjectStore>> {
+    let prefix = Path::parse(storage_url.path())?;
+    if prefix != Path::from("/") {
+        Ok(Arc::new(PrefixStore::new(store, prefix)))
+    } else {
+        Ok(Arc::new(store))
     }
+}
 
-    fn url_prefix_handler<T: ObjectStore>(store: T, storage_url: &Url) -> Arc<DynObjectStore> {
-        let prefix = Path::from(storage_url.path());
-        if prefix != Path::from("/") {
-            Arc::new(PrefixStore::new(store, prefix))
-        } else {
-            Arc::new(store)
-        }
-    }
+#[cfg(test)]
+mod test {
+    use crate::ensure_table_uri;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_configure_store_local() -> Result<(), Box<dyn std::error::Error>> {
+        let temp_dir = tempfile::tempdir().unwrap();
+        let temp_dir_path = temp_dir.path();
+        let path = temp_dir_path.join("test space 😁");
+
+        let table_uri = ensure_table_uri(path.as_os_str().to_str().unwrap()).unwrap();
+
+        let store = configure_store(&table_uri, &StorageOptions::default()).unwrap();
+
+        let contents = b"test";
+        let key = "test.txt";
+        let file_path = path.join(key);
+        std::fs::write(&file_path, contents).unwrap();
+
+        let res = store
+            .get(&object_store::path::Path::from(key))
+            .await
+            .unwrap()
+            .bytes()
+            .await
+            .unwrap();
+        assert_eq!(res.as_ref(), contents);
+
+        Ok(())
     }
 }
diff --git a/rust/src/storage/file.rs b/rust/src/storage/file.rs
index c276cd781f..b0775416b4 100644
--- a/rust/src/storage/file.rs
+++ b/rust/src/storage/file.rs
@@ -12,6 +12,7 @@ use object_store::{
 use std::ops::Range;
 use std::sync::Arc;
 use tokio::io::AsyncWrite;
+use url::Url;
 
 const STORE_NAME: &str = "DeltaLocalObjectStore";
 
@@ -93,17 +94,46 @@ impl From<LocalFileSystemError> for ObjectStoreError {
 /// * Darwin is supported but not fully tested.
 ///   Patches welcome.
 /// * Support for other platforms is not implemented at the moment.
-#[derive(Debug, Default)]
+#[derive(Debug)]
 pub struct FileStorageBackend {
     inner: Arc<LocalFileSystem>,
+    root_url: Arc<Url>,
 }
 
 impl FileStorageBackend {
     /// Creates a new FileStorageBackend.
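+    ///
+    /// A minimal usage sketch (the path is illustrative and must already
+    /// exist, since it is canonicalized internally):
+    ///
+    /// ```no_run
+    /// use deltalake::storage::file::FileStorageBackend;
+    /// let backend = FileStorageBackend::try_new("/tmp/my-delta-table").unwrap();
+    /// ```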
-    pub fn new() -> Self {
-        Self {
-            inner: Arc::new(LocalFileSystem::default()),
-        }
+    pub fn try_new(path: impl AsRef<std::path::Path>) -> ObjectStoreResult<Self> {
+        Ok(Self {
+            root_url: Arc::new(Self::path_to_root_url(path.as_ref())?),
+            inner: Arc::new(LocalFileSystem::new_with_prefix(path)?),
+        })
+    }
+
+    fn path_to_root_url(path: &std::path::Path) -> ObjectStoreResult<Url> {
+        let root_path =
+            std::fs::canonicalize(path).map_err(|e| object_store::Error::InvalidPath {
+                source: object_store::path::Error::Canonicalize {
+                    path: path.into(),
+                    source: e,
+                },
+            })?;
+
+        Url::from_file_path(root_path).map_err(|_| object_store::Error::InvalidPath {
+            source: object_store::path::Error::InvalidPath { path: path.into() },
+        })
+    }
+
+    /// Return an absolute filesystem path of the given location
+    fn path_to_filesystem(&self, location: &ObjectStorePath) -> String {
+        let mut url = self.root_url.as_ref().clone();
+        url.path_segments_mut()
+            .expect("url path")
+            // technically not necessary as Path ignores empty segments
+            // but avoids creating paths with "//" which look odd in error messages.
+            .pop_if_empty()
+            .extend(location.parts());
+
+        url.to_file_path().unwrap().to_str().unwrap().to_owned()
     }
 }
 
@@ -113,19 +143,6 @@ impl std::fmt::Display for FileStorageBackend {
     }
 }
 
-/// Return an absolute filesystem path of the given location
-fn path_to_filesystem(location: &ObjectStorePath) -> String {
-    let mut url = url::Url::parse("file:///").unwrap();
-    url.path_segments_mut()
-        .expect("url path")
-        // technically not necessary as Path ignores empty segments
-        // but avoids creating paths with "//" which look odd in error messages.
-        .pop_if_empty()
-        .extend(location.parts());
-
-    url.to_file_path().unwrap().to_str().unwrap().to_owned()
-}
-
 #[async_trait::async_trait]
 impl ObjectStore for FileStorageBackend {
     async fn put(&self, location: &ObjectStorePath, bytes: Bytes) -> ObjectStoreResult<()> {
@@ -183,8 +200,8 @@ impl ObjectStore for FileStorageBackend {
         from: &ObjectStorePath,
         to: &ObjectStorePath,
     ) -> ObjectStoreResult<()> {
-        let path_from = path_to_filesystem(from);
-        let path_to = path_to_filesystem(to);
+        let path_from = self.path_to_filesystem(from);
+        let path_to = self.path_to_filesystem(to);
         Ok(rename_noreplace(path_from.as_ref(), path_to.as_ref()).await?)
     }
 
diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs
index c0768bc9d8..c9fe842f93 100644
--- a/rust/src/storage/mod.rs
+++ b/rust/src/storage/mod.rs
@@ -4,7 +4,7 @@
 pub mod config;
 pub mod file;
 pub mod utils;
 
-use self::config::{ObjectStoreKind, StorageOptions};
+use self::config::StorageOptions;
 use crate::{DeltaDataTypeVersion, DeltaResult};
 
 use bytes::Bytes;
@@ -89,12 +89,12 @@ impl DeltaObjectStore {
     /// * `location` - A url pointing to the root of the delta table.
     /// * `options` - Options passed to underlying builders. See [`with_storage_options`](crate::builder::DeltaTableBuilder::with_storage_options)
     pub fn try_new(location: Url, options: impl Into<StorageOptions> + Clone) -> DeltaResult<Self> {
-        let storage =
-            ObjectStoreKind::parse_url(&location)?.into_impl(&location, options.clone())?;
+        let options = options.into();
+        let storage = config::configure_store(&location, &options)?;
         Ok(Self {
             storage,
             location,
-            options: options.into(),
+            options,
         })
     }
 
diff --git a/rust/src/test_utils.rs b/rust/src/test_utils.rs
index 865e16cb5f..a11bff8c17 100644
--- a/rust/src/test_utils.rs
+++ b/rust/src/test_utils.rs
@@ -118,6 +118,14 @@ impl IntegrationContext {
                 std::fs::create_dir_all(&dest_path)?;
                 copy(table.as_path(), &dest_path, &options)?;
             }
+            StorageIntegration::Amazon => {
+                let dest = format!("{}/{}", self.root_uri(), name.as_ref());
+                s3_cli::copy_directory(table.as_path(), dest)?;
+            }
+            StorageIntegration::Microsoft => {
+                let dest = format!("{}/{}", self.bucket, name.as_ref());
+                az_cli::copy_directory(table.as_path(), dest)?;
+            }
             _ => {
                 let from = table.as_path().as_str().to_owned();
                 let to = format!("{}/{}", self.root_uri(), name.as_ref());
@@ -203,6 +211,7 @@ pub enum TestTables {
     SimpleCommit,
     Golden,
     Delta0_8_0Partitioned,
+    Delta0_8_0SpecialPartitioned,
     Custom(String),
 }
 
@@ -226,6 +235,11 @@ impl TestTables {
                 .to_str()
                 .unwrap()
                 .to_owned(),
+            Self::Delta0_8_0SpecialPartitioned => data_path
+                .join("delta-0.8.0-special-partition")
+                .to_str()
+                .unwrap()
+                .to_owned(),
             // the data path for upload does not apply to custom tables.
             Self::Custom(_) => todo!(),
         }
@@ -237,6 +251,7 @@ impl TestTables {
             Self::SimpleCommit => "simple_commit".into(),
             Self::Golden => "golden".into(),
             Self::Delta0_8_0Partitioned => "delta-0.8.0-partitioned".into(),
+            Self::Delta0_8_0SpecialPartitioned => "delta-0.8.0-special-partition".into(),
             Self::Custom(name) => name.to_owned(),
         }
     }
@@ -284,6 +299,26 @@ pub mod az_cli {
         child.wait()
     }
 
+    /// copy directory
+    pub fn copy_directory(
+        source: impl AsRef<str>,
+        destination: impl AsRef<str>,
+    ) -> std::io::Result<ExitStatus> {
+        let mut child = Command::new("az")
+            .args([
+                "storage",
+                "blob",
+                "upload-batch",
+                "-s",
+                source.as_ref(),
+                "-d",
+                destination.as_ref(),
+            ])
+            .spawn()
+            .expect("az command is installed");
+        child.wait()
+    }
+
     /// prepare_env
     pub fn prepare_env() {
         set_env_if_not_set("AZURE_STORAGE_USE_EMULATOR", "1");
@@ -341,6 +376,28 @@
         child.wait()
     }
 
+    /// copy directory
+    pub fn copy_directory(
+        source: impl AsRef<str>,
+        destination: impl AsRef<str>,
+    ) -> std::io::Result<ExitStatus> {
+        let endpoint = std::env::var(s3_storage_options::AWS_ENDPOINT_URL)
+            .expect("variable ENDPOINT must be set to connect to S3");
+        let mut child = Command::new("aws")
+            .args([
+                "s3",
+                "cp",
+                source.as_ref(),
+                destination.as_ref(),
+                "--endpoint-url",
+                &endpoint,
+                "--recursive",
+            ])
+            .spawn()
+            .expect("aws command is installed");
+        child.wait()
+    }
+
     /// prepare_env
     pub fn prepare_env() {
         set_env_if_not_set(
diff --git a/rust/tests/integration_concurrent_writes.rs b/rust/tests/integration_concurrent_writes.rs
index ca91654ad8..739d8bffde 100644
--- a/rust/tests/integration_concurrent_writes.rs
+++ b/rust/tests/integration_concurrent_writes.rs
@@ -15,6 +15,7 @@ async fn test_concurrent_writes_local() -> TestResult {
     Ok(())
 }
 
+#[cfg(feature = "s3")]
 #[tokio::test]
 async fn concurrent_writes_s3() -> TestResult {
     test_concurrent_writes(StorageIntegration::Amazon).await?;
diff --git a/rust/tests/integration_object_store.rs b/rust/tests/integration_object_store.rs
index 2dd96a5cda..daf702d291 100644
---
a/rust/tests/integration_object_store.rs +++ b/rust/tests/integration_object_store.rs @@ -3,7 +3,7 @@ use bytes::Bytes; use deltalake::storage::utils::flatten_list_stream; use deltalake::test_utils::{IntegrationContext, StorageIntegration, TestResult}; -use deltalake::DeltaTableBuilder; +use deltalake::{DeltaTableBuilder, ObjectStore}; use object_store::{path::Path, DynObjectStore, Error as ObjectStoreError}; use serial_test::serial; @@ -22,6 +22,7 @@ async fn test_object_store_azure() -> TestResult { Ok(()) } +#[cfg(feature = "s3")] #[tokio::test] #[serial] async fn test_object_store_aws() -> TestResult { @@ -415,3 +416,29 @@ async fn delete_fixtures(storage: &DynObjectStore) -> TestResult { } Ok(()) } + +#[tokio::test] +#[serial] +async fn test_object_store_prefixes_local() -> TestResult { + test_object_store_prefixes(StorageIntegration::Local).await?; + Ok(()) +} + +async fn test_object_store_prefixes(integration: StorageIntegration) -> TestResult { + let context = IntegrationContext::new(integration)?; + let prefixes = &["table path", "table path/hello%3F", "你好/😊"]; + for prefix in prefixes { + let rooturi = format!("{}/{}", context.root_uri(), prefix); + let delta_store = DeltaTableBuilder::from_uri(&rooturi) + .with_allow_http(true) + .build_storage()?; + + let contents = Bytes::from("cats"); + let path = Path::from("test"); + delta_store.put(&path, contents.clone()).await?; + let data = delta_store.get(&path).await?.bytes().await?; + assert_eq!(&data, &contents); + } + + Ok(()) +} diff --git a/rust/tests/integration_read.rs b/rust/tests/integration_read.rs index b2a499ad6f..67d135d084 100644 --- a/rust/tests/integration_read.rs +++ b/rust/tests/integration_read.rs @@ -1,7 +1,7 @@ #![cfg(feature = "integration_test")] use deltalake::test_utils::{IntegrationContext, StorageIntegration, TestResult, TestTables}; -use deltalake::DeltaTableBuilder; +use deltalake::{DeltaTableBuilder, ObjectStore}; #[cfg(any(feature = "s3", feature = "s3-native-tls"))] use dynamodb_lock::dynamo_lock_options; #[cfg(any(feature = "s3", feature = "s3-native-tls"))] @@ -9,17 +9,34 @@ use maplit::hashmap; use object_store::path::Path; use serial_test::serial; +static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"]; + +/// TEST_PREFIXES as they should appear in object stores. +static TEST_PREFIXES_ENCODED: &[&str] = &["my%20table", "%E4%BD%A0%E5%A5%BD/%F0%9F%98%8A"]; + #[tokio::test] #[serial] async fn test_read_tables_local() -> TestResult { - Ok(read_tables(StorageIntegration::Local).await?) + read_tables(StorageIntegration::Local).await?; + + for prefix in TEST_PREFIXES { + read_table_paths(StorageIntegration::Local, prefix, prefix).await?; + } + + Ok(()) } #[cfg(feature = "azure")] #[tokio::test] #[serial] async fn test_read_tables_azure() -> TestResult { - Ok(read_tables(StorageIntegration::Microsoft).await?) + read_tables(StorageIntegration::Microsoft).await?; + + for (prefix, prefix_encoded) in TEST_PREFIXES.iter().zip(TEST_PREFIXES_ENCODED.iter()) { + read_table_paths(StorageIntegration::Microsoft, prefix, prefix_encoded).await?; + } + + Ok(()) } #[cfg(feature = "hdfs")] @@ -33,13 +50,22 @@ async fn test_read_tables_hdfs() -> TestResult { #[tokio::test] #[serial] async fn test_read_tables_aws() -> TestResult { - Ok(read_tables(StorageIntegration::Amazon).await?) 
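+    // In addition to the standard tables, exercise prefixes containing
+    // spaces and non-ASCII characters (see TEST_PREFIXES above).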
+ read_tables(StorageIntegration::Amazon).await?; + + for (prefix, prefix_encoded) in TEST_PREFIXES.iter().zip(TEST_PREFIXES_ENCODED.iter()) { + read_table_paths(StorageIntegration::Amazon, prefix, prefix_encoded).await?; + } + + Ok(()) } async fn read_tables(storage: StorageIntegration) -> TestResult { let context = IntegrationContext::new(storage)?; context.load_table(TestTables::Simple).await?; context.load_table(TestTables::Golden).await?; + context + .load_table(TestTables::Delta0_8_0SpecialPartitioned) + .await?; read_simple_table(&context).await?; read_simple_table_with_version(&context).await?; @@ -48,6 +74,56 @@ async fn read_tables(storage: StorageIntegration) -> TestResult { Ok(()) } +async fn read_table_paths( + storage: StorageIntegration, + table_root: &str, + upload_path: &str, +) -> TestResult { + let context = IntegrationContext::new(storage)?; + context + .load_table_with_name(TestTables::Delta0_8_0SpecialPartitioned, upload_path) + .await?; + + verify_store(&context, table_root).await?; + + read_encoded_table(&context, table_root).await?; + + Ok(()) +} + +async fn verify_store(integration: &IntegrationContext, root_path: &str) -> TestResult { + let table_uri = format!("{}/{}", integration.root_uri(), root_path); + let storage = DeltaTableBuilder::from_uri(table_uri.clone()) + .with_allow_http(true) + .build_storage()?; + + let files = storage.list_with_delimiter(None).await?; + assert_eq!( + vec![ + Path::parse("_delta_log").unwrap(), + Path::parse("x=A%2FA").unwrap(), + Path::parse("x=B%20B").unwrap(), + ], + files.common_prefixes + ); + + Ok(()) +} + +async fn read_encoded_table(integration: &IntegrationContext, root_path: &str) -> TestResult { + let table_uri = format!("{}/{}", integration.root_uri(), root_path); + + let table = DeltaTableBuilder::from_uri(table_uri) + .with_allow_http(true) + .load() + .await?; + + assert_eq!(table.version(), 0); + assert_eq!(table.get_files().len(), 2); + + Ok(()) +} + async fn read_simple_table(integration: &IntegrationContext) -> TestResult { let table_uri = integration.uri_for_table(TestTables::Simple); // the s3 options don't hurt us for other integrations ...
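A hedged end-to-end sketch of what this change enables (the table path is illustrative, and a Delta table is assumed to already exist there; `from_uri`, `load`, and `version` are the same calls the tests above exercise):

```rust
use deltalake::DeltaTableBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A raw local path containing a space and non-ASCII characters;
    // `ensure_table_uri` normalizes it to a percent-encoded `file://` URL
    // before the object store is configured.
    let table = DeltaTableBuilder::from_uri("./data/my table/你好")
        .load()
        .await?;
    println!("loaded table at version {}", table.version());
    Ok(())
}
```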