diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index 154b9de5bc..c879724c29 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -38,6 +38,7 @@ use std::{ sync::Arc, time::{Duration, SystemTime}, }; +use storage::S3StorageOptionsConversion; use storage::{S3ObjectStoreFactory, S3StorageOptions}; use tracing::debug; use tracing::warn; @@ -46,6 +47,8 @@ use url::Url; #[derive(Clone, Debug, Default)] pub struct S3LogStoreFactory {} +impl S3StorageOptionsConversion for S3LogStoreFactory {} + impl LogStoreFactory for S3LogStoreFactory { fn with_options( &self, @@ -54,7 +57,7 @@ impl LogStoreFactory for S3LogStoreFactory { options: &StorageOptions, ) -> DeltaResult<Arc<dyn LogStore>> { let store = url_prefix_handler(store, Path::parse(location.path())?); - + let options = self.with_env_s3(options); if options.0.keys().any(|key| { let key = key.to_ascii_lowercase(); [ @@ -65,7 +68,7 @@ impl LogStoreFactory for S3LogStoreFactory { }) { debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required"); warn!("Most S3 object store support conditional put, remove copy_if_not_exists parameter to use a more performant conditional put."); - return Ok(logstore::default_s3_logstore(store, location, options)); + return Ok(logstore::default_s3_logstore(store, location, &options)); } let s3_options = S3StorageOptions::from_map(&options.0)?; @@ -78,7 +81,7 @@ impl LogStoreFactory for S3LogStoreFactory { store, )?)); } - Ok(default_logstore(store, location, options)) + Ok(default_logstore(store, location, &options)) } } diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index 9c6473e234..87882f63bf 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -32,52 +32,7 @@ const STORE_NAME: &str = "DeltaS3ObjectStore"; #[derive(Clone, Default, Debug)] pub struct S3ObjectStoreFactory {} -impl S3ObjectStoreFactory { - fn with_env_s3(&self, options: &StorageOptions) 
-> StorageOptions { - let mut options = StorageOptions( - options - .0 - .clone() - .into_iter() - .map(|(k, v)| { - if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) { - (config_key.as_ref().to_string(), v) - } else { - (k, v) - } - }) - .collect(), - ); - - for (os_key, os_value) in std::env::vars_os() { - if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) { - if let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) { - if !options.0.contains_key(config_key.as_ref()) { - options - .0 - .insert(config_key.as_ref().to_string(), value.to_string()); - } - } - } - } - - // All S3-like Object Stores use conditional put, object-store crate however still requires you to explicitly - // set this behaviour. We will however assume, when a locking provider/copy-if-not-exists keys are not provided - // that PutIfAbsent is supported. - // With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent - if !options.0.keys().any(|key| { - let key = key.to_ascii_lowercase(); - [ - AmazonS3ConfigKey::ConditionalPut.as_ref(), - "conditional_put", - ] - .contains(&key.as_str()) - }) { - options.0.insert("conditional_put".into(), "etag".into()); - } - options - } -} +impl S3StorageOptionsConversion for S3ObjectStoreFactory {} impl ObjectStoreFactory for S3ObjectStoreFactory { fn parse_url_opts( @@ -102,19 +57,17 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { })?; let prefix = Path::parse(path)?; - if is_aws(storage_options) { - debug!("Detected AWS S3, resolving credentials"); - let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials( - storage_options.clone(), - ))??; + let s3_options: S3StorageOptions = S3StorageOptions::from_map(&options.0)?; + + if let Some(ref sdk_config) = s3_options.sdk_config { builder = builder.with_credentials(Arc::new( - crate::credentials::AWSForObjectStore::new(sdk_config), + 
crate::credentials::AWSForObjectStore::new(sdk_config.clone()), )); } let inner = builder.build()?; - let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?; + let store = aws_storage_handler(limit_store_handler(inner, &options), &s3_options)?; debug!("Initialized the object store: {store:?}"); Ok((store, prefix)) @@ -123,9 +76,8 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { fn aws_storage_handler( store: ObjectStoreRef, - options: &StorageOptions, + s3_options: &S3StorageOptions, ) -> DeltaResult<ObjectStoreRef> { - let s3_options = S3StorageOptions::from_map(&options.0)?; // Nearly all S3 Object stores support conditional put, so we change the default to always returning an S3 Object store // unless explicitly passing a locking provider key or allow_unsafe_rename. Then we will pass it to the old S3StorageBackend. if s3_options.locking_provider.as_deref() == Some("dynamodb") || s3_options.allow_unsafe_rename @@ -154,7 +106,8 @@ fn is_aws(options: &StorageOptions) -> bool { } // Options at this stage should only contain 'aws_endpoint' in lowercase // due to with_env_s3 - !(options.0.contains_key("aws_endpoint") || options.0.contains_key(constants::AWS_ENDPOINT_URL)) + !(options.0.contains_key("aws_endpoint") + || options.0.contains_key(constants::AWS_ENDPOINT_URL)) } /// Options used to configure the [S3StorageBackend]. @@ -240,7 +193,7 @@ impl S3StorageOptions { let sdk_config = match is_aws(&storage_options) { false => None, true => { - debug!("Detected AWS S3, resolving credentials"); + debug!("Detected AWS S3 Storage options, resolving AWS credentials"); Some(execute_sdk_future( crate::credentials::resolve_credentials(storage_options.clone()), )??) 
@@ -477,6 +430,53 @@ pub(crate) fn str_option(map: &HashMap<String, String>, key: &str) -> Option<String> { + +pub(crate) trait S3StorageOptionsConversion { + fn with_env_s3(&self, options: &StorageOptions) -> StorageOptions { + let mut options = StorageOptions( + options + .0 + .clone() + .into_iter() + .map(|(k, v)| { + if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) { + (config_key.as_ref().to_string(), v) + } else { + (k, v) + } + }) + .collect(), + ); + + for (os_key, os_value) in std::env::vars_os() { + if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) { + if let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) { + if !options.0.contains_key(config_key.as_ref()) { + options + .0 + .insert(config_key.as_ref().to_string(), value.to_string()); + } + } + } + } + + // All S3-like Object Stores use conditional put, object-store crate however still requires you to explicitly + // set this behaviour. We will however assume, when a locking provider/copy-if-not-exists keys are not provided + // that PutIfAbsent is supported. + // With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent + if !options.0.keys().any(|key| { + let key = key.to_ascii_lowercase(); + [ + AmazonS3ConfigKey::ConditionalPut.as_ref(), + "conditional_put", + ] + .contains(&key.as_str()) + }) { + options.0.insert("conditional_put".into(), "etag".into()); + } + options + } +} + #[cfg(test)] mod tests { use super::*;