From dae29bc22e3fa781c394765fe31a80b38e2e1e75 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 29 Dec 2024 17:40:22 +0100 Subject: [PATCH] refactor: make putifabsent default for s3 --- crates/aws/src/lib.rs | 55 +++++++++++++++++++++++---------------- crates/aws/src/storage.rs | 17 +++++------- 2 files changed, 38 insertions(+), 34 deletions(-) diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index c062e47334..aaee393ec9 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -40,6 +40,7 @@ use std::{ }; use storage::{S3ObjectStoreFactory, S3StorageOptions}; use tracing::debug; +use tracing::warn; use url::Url; #[derive(Clone, Debug, Default)] @@ -54,19 +55,6 @@ impl LogStoreFactory for S3LogStoreFactory { ) -> DeltaResult> { let store = url_prefix_handler(store, Path::parse(location.path())?); - // With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent - if options.0.keys().any(|key| { - let key = key.to_ascii_lowercase(); - [ - AmazonS3ConfigKey::ConditionalPut.as_ref(), - "conditional_put", - ] - .contains(&key.as_str()) - }) { - debug!("S3LogStoreFactory has been asked to create a default LogStore where the underlying store has Conditonal Put enabled - no locking provider required"); - return Ok(default_logstore(store, location, options)); - } - if options.0.keys().any(|key| { let key = key.to_ascii_lowercase(); [ @@ -76,22 +64,43 @@ impl LogStoreFactory for S3LogStoreFactory { .contains(&key.as_str()) }) { debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required"); + warn!("Most S3 object store support conditional put, remove copy_if_not_exists parameter to use a more performant conditional put."); return Ok(logstore::default_s3_logstore(store, location, options)); } let s3_options = S3StorageOptions::from_map(&options.0)?; - - if s3_options.locking_provider.as_deref() != Some("dynamodb") { - debug!("S3LogStoreFactory has been asked to create a LogStore without the dynamodb locking provider"); - return Ok(logstore::default_s3_logstore(store, location, options)); + if s3_options.locking_provider.as_deref() == Some("dynamodb") { + debug!("S3LogStoreFactory has been asked to create a LogStore with the dynamodb locking provider"); + return Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new( + location.clone(), + options.clone(), + &s3_options, + store, + )?)); } - Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new( - location.clone(), - options.clone(), - &s3_options, - store, - )?)) + // All S3-like Object Stores use conditional put, object-store crate however still requires you to explicitly + // set this behaviour. We will however assume, when a locking provider/copy-if-not-exists keys are not provided + // that PutIfAbsent is supported. + // With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent + let inner = if !options.0.keys().any(|key| { + let key = key.to_ascii_lowercase(); + [ + AmazonS3ConfigKey::ConditionalPut.as_ref(), + "conditional_put", + ] + .contains(&key.as_str()) + }) { + let mut inner = options.0.clone(); + inner.insert("aws_conditional_put".to_string(), "etag".to_string()); + inner + } else { + options.0.clone() + }; + + let options = StorageOptions::from(inner); + debug!("S3LogStoreFactory has been asked to create a default LogStore where the underlying store has Conditonal Put enabled - no locking provider required"); + Ok(default_logstore(store, location, &options)) } } diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index 84d5533f82..329ef6023b 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -110,24 +110,19 @@ fn aws_storage_handler( store: ObjectStoreRef, options: &StorageOptions, ) -> DeltaResult { - // If the copy-if-not-exists env var is set or ConditionalPut is set, we don't need to instantiate a locking client or check for allow-unsafe-rename. - if options - .0 - .contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref()) - || options - .0 - .contains_key(AmazonS3ConfigKey::ConditionalPut.as_ref()) + let s3_options = S3StorageOptions::from_map(&options.0)?; + // Nearly all S3 Object stores support conditional put, so we change the default to always returning an S3 Object store + // unless explicitly passing a locking provider key or allow_unsafe_rename. Then we will pass it to the old S3StorageBackend. + if s3_options.locking_provider.as_deref() == Some("dynamodb") || s3_options.allow_unsafe_rename { - Ok(store) - } else { - let s3_options = S3StorageOptions::from_map(&options.0)?; - let store = S3StorageBackend::try_new( store, Some("dynamodb") == s3_options.locking_provider.as_deref() || s3_options.allow_unsafe_rename, )?; Ok(Arc::new(store)) + } else { + Ok(store) } }