Skip to content

Commit

Permalink
refactor: make putifabsent default for s3
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored and rtyler committed Jan 1, 2025
1 parent 336aeb2 commit dae29bc
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 34 deletions.
55 changes: 32 additions & 23 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use std::{
};
use storage::{S3ObjectStoreFactory, S3StorageOptions};
use tracing::debug;
use tracing::warn;
use url::Url;

#[derive(Clone, Debug, Default)]
Expand All @@ -54,19 +55,6 @@ impl LogStoreFactory for S3LogStoreFactory {
) -> DeltaResult<Arc<dyn LogStore>> {
let store = url_prefix_handler(store, Path::parse(location.path())?);

// With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent
if options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
[
AmazonS3ConfigKey::ConditionalPut.as_ref(),
"conditional_put",
]
.contains(&key.as_str())
}) {
debug!("S3LogStoreFactory has been asked to create a default LogStore where the underlying store has Conditonal Put enabled - no locking provider required");
return Ok(default_logstore(store, location, options));
}

if options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
[
Expand All @@ -76,22 +64,43 @@ impl LogStoreFactory for S3LogStoreFactory {
.contains(&key.as_str())
}) {
debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
warn!("Most S3 object store support conditional put, remove copy_if_not_exists parameter to use a more performant conditional put.");
return Ok(logstore::default_s3_logstore(store, location, options));
}

let s3_options = S3StorageOptions::from_map(&options.0)?;

if s3_options.locking_provider.as_deref() != Some("dynamodb") {
debug!("S3LogStoreFactory has been asked to create a LogStore without the dynamodb locking provider");
return Ok(logstore::default_s3_logstore(store, location, options));
if s3_options.locking_provider.as_deref() == Some("dynamodb") {
debug!("S3LogStoreFactory has been asked to create a LogStore with the dynamodb locking provider");
return Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new(
location.clone(),
options.clone(),
&s3_options,
store,
)?));
}

Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new(
location.clone(),
options.clone(),
&s3_options,
store,
)?))
// All S3-like Object Stores use conditional put, object-store crate however still requires you to explicitly
// set this behaviour. We will however assume, when a locking provider/copy-if-not-exists keys are not provided
// that PutIfAbsent is supported.
// With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent
let inner = if !options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
[
AmazonS3ConfigKey::ConditionalPut.as_ref(),
"conditional_put",
]
.contains(&key.as_str())
}) {
let mut inner = options.0.clone();
inner.insert("aws_conditional_put".to_string(), "etag".to_string());
inner
} else {
options.0.clone()
};

let options = StorageOptions::from(inner);
debug!("S3LogStoreFactory has been asked to create a default LogStore where the underlying store has Conditonal Put enabled - no locking provider required");
Ok(default_logstore(store, location, &options))
}
}

Expand Down
17 changes: 6 additions & 11 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,24 +110,19 @@ fn aws_storage_handler(
store: ObjectStoreRef,
options: &StorageOptions,
) -> DeltaResult<ObjectStoreRef> {
// If the copy-if-not-exists env var is set or ConditionalPut is set, we don't need to instantiate a locking client or check for allow-unsafe-rename.
if options
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
|| options
.0
.contains_key(AmazonS3ConfigKey::ConditionalPut.as_ref())
let s3_options = S3StorageOptions::from_map(&options.0)?;
// Nearly all S3 Object stores support conditional put, so we change the default to always returning an S3 Object store
// unless explicitly passing a locking provider key or allow_unsafe_rename. Then we will pass it to the old S3StorageBackend.
if s3_options.locking_provider.as_deref() == Some("dynamodb") || s3_options.allow_unsafe_rename
{
Ok(store)
} else {
let s3_options = S3StorageOptions::from_map(&options.0)?;

let store = S3StorageBackend::try_new(
store,
Some("dynamodb") == s3_options.locking_provider.as_deref()
|| s3_options.allow_unsafe_rename,
)?;
Ok(Arc::new(store))
} else {
Ok(store)
}
}

Expand Down

0 comments on commit dae29bc

Please sign in to comment.