Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: aws credential resolving, reduce api calls #3107

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.6.0"
version = "0.6.1"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand Down
10 changes: 7 additions & 3 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use storage::S3StorageOptionsConversion;
use storage::{S3ObjectStoreFactory, S3StorageOptions};
use tracing::debug;
use tracing::warn;
Expand All @@ -46,6 +47,8 @@ use url::Url;
#[derive(Clone, Debug, Default)]
pub struct S3LogStoreFactory {}

impl S3StorageOptionsConversion for S3LogStoreFactory {}

impl LogStoreFactory for S3LogStoreFactory {
fn with_options(
&self,
Expand All @@ -54,7 +57,7 @@ impl LogStoreFactory for S3LogStoreFactory {
options: &StorageOptions,
) -> DeltaResult<Arc<dyn LogStore>> {
let store = url_prefix_handler(store, Path::parse(location.path())?);

let options = self.with_env_s3(options);
if options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
[
Expand All @@ -65,7 +68,7 @@ impl LogStoreFactory for S3LogStoreFactory {
}) {
debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
warn!("Most S3 object store support conditional put, remove copy_if_not_exists parameter to use a more performant conditional put.");
return Ok(logstore::default_s3_logstore(store, location, options));
return Ok(logstore::default_s3_logstore(store, location, &options));
}

let s3_options = S3StorageOptions::from_map(&options.0)?;
Expand All @@ -78,7 +81,7 @@ impl LogStoreFactory for S3LogStoreFactory {
store,
)?));
}
Ok(default_logstore(store, location, options))
Ok(default_logstore(store, location, &options))
}
}

Expand Down Expand Up @@ -777,6 +780,7 @@ mod tests {
unsafe {
std::env::set_var(crate::constants::AWS_S3_LOCKING_PROVIDER, "dynamodb");
}

let logstore = factory
.with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
.unwrap();
Expand Down
120 changes: 62 additions & 58 deletions crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,52 +32,7 @@ const STORE_NAME: &str = "DeltaS3ObjectStore";
#[derive(Clone, Default, Debug)]
pub struct S3ObjectStoreFactory {}

impl S3ObjectStoreFactory {
fn with_env_s3(&self, options: &StorageOptions) -> StorageOptions {
let mut options = StorageOptions(
options
.0
.clone()
.into_iter()
.map(|(k, v)| {
if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) {
(config_key.as_ref().to_string(), v)
} else {
(k, v)
}
})
.collect(),
);

for (os_key, os_value) in std::env::vars_os() {
if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
if let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) {
if !options.0.contains_key(config_key.as_ref()) {
options
.0
.insert(config_key.as_ref().to_string(), value.to_string());
}
}
}
}

// All S3-like Object Stores use conditional put, object-store crate however still requires you to explicitly
// set this behaviour. We will however assume, when a locking provider/copy-if-not-exists keys are not provided
// that PutIfAbsent is supported.
// With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent
if !options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
[
AmazonS3ConfigKey::ConditionalPut.as_ref(),
"conditional_put",
]
.contains(&key.as_str())
}) {
options.0.insert("conditional_put".into(), "etag".into());
}
options
}
}
impl S3StorageOptionsConversion for S3ObjectStoreFactory {}

impl ObjectStoreFactory for S3ObjectStoreFactory {
fn parse_url_opts(
Expand All @@ -102,19 +57,17 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
})?;
let prefix = Path::parse(path)?;

if is_aws(storage_options) {
debug!("Detected AWS S3, resolving credentials");
let sdk_config = execute_sdk_future(crate::credentials::resolve_credentials(
storage_options.clone(),
))??;
let s3_options: S3StorageOptions = S3StorageOptions::from_map(&options.0)?;

if let Some(ref sdk_config) = s3_options.sdk_config {
builder = builder.with_credentials(Arc::new(
crate::credentials::AWSForObjectStore::new(sdk_config),
crate::credentials::AWSForObjectStore::new(sdk_config.clone()),
));
}

let inner = builder.build()?;

let store = aws_storage_handler(limit_store_handler(inner, &options), &options)?;
let store = aws_storage_handler(limit_store_handler(inner, &options), &s3_options)?;
debug!("Initialized the object store: {store:?}");

Ok((store, prefix))
Expand All @@ -123,9 +76,8 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {

fn aws_storage_handler(
store: ObjectStoreRef,
options: &StorageOptions,
s3_options: &S3StorageOptions,
) -> DeltaResult<ObjectStoreRef> {
let s3_options = S3StorageOptions::from_map(&options.0)?;
// Nearly all S3 Object stores support conditional put, so we change the default to always returning an S3 Object store
// unless explicitly passing a locking provider key or allow_unsafe_rename. Then we will pass it to the old S3StorageBackend.
if s3_options.locking_provider.as_deref() == Some("dynamodb") || s3_options.allow_unsafe_rename
Expand All @@ -146,12 +98,18 @@ fn aws_storage_handler(
// This function will return true in the default case since it's most likely that the absence of
// options will mean default/S3 configuration
fn is_aws(options: &StorageOptions) -> bool {
if options.0.contains_key(constants::AWS_FORCE_CREDENTIAL_LOAD) {
// Checks storage option first then env var for existence of aws force credential load
// .from_s3_env never inserts these into the options because they are delta-rs specific
if str_option(&options.0, constants::AWS_FORCE_CREDENTIAL_LOAD).is_some() {
return true;
}
if options.0.contains_key(constants::AWS_S3_LOCKING_PROVIDER) {

// Checks storage option first then env var for existence of locking provider
// .from_s3_env never inserts these into the options because they are delta-rs specific
if str_option(&options.0, constants::AWS_S3_LOCKING_PROVIDER).is_some() {
return true;
}

// Options at this stage should only contain 'aws_endpoint' in lowercase
// due to with_env_s3
!(options.0.contains_key("aws_endpoint") || options.0.contains_key(constants::AWS_ENDPOINT_URL))
Expand Down Expand Up @@ -240,7 +198,7 @@ impl S3StorageOptions {
let sdk_config = match is_aws(&storage_options) {
false => None,
true => {
debug!("Detected AWS S3, resolving credentials");
debug!("Detected AWS S3 Storage options, resolving AWS credentials");
Some(execute_sdk_future(
crate::credentials::resolve_credentials(storage_options.clone()),
)??)
Expand Down Expand Up @@ -477,6 +435,52 @@ pub(crate) fn str_option(map: &HashMap<String, String>, key: &str) -> Option<Str
std::env::var(key).ok()
}

pub(crate) trait S3StorageOptionsConversion {
fn with_env_s3(&self, options: &StorageOptions) -> StorageOptions {
let mut options = StorageOptions(
options
.0
.clone()
.into_iter()
.map(|(k, v)| {
if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) {
(config_key.as_ref().to_string(), v)
} else {
(k, v)
}
})
.collect(),
);

for (os_key, os_value) in std::env::vars_os() {
if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) {
if let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) {
options
.0
.entry(config_key.as_ref().to_string())
.or_insert(value.to_string());
}
}
}

// All S3-like Object Stores use conditional put, object-store crate however still requires you to explicitly
// set this behaviour. We will however assume, when a locking provider/copy-if-not-exists keys are not provided
// that PutIfAbsent is supported.
// With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent
if !options.0.keys().any(|key| {
let key = key.to_ascii_lowercase();
[
AmazonS3ConfigKey::ConditionalPut.as_ref(),
"conditional_put",
]
.contains(&key.as_str())
}) {
options.0.insert("conditional_put".into(), "etag".into());
}
options
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-python"
version = "0.23.2"
version = "0.23.3"
authors = ["Qingping Hou <dave2008713@gmail.com>", "Will Jones <willjones127@gmail.com>"]
homepage = "https://github.com/delta-io/delta-rs"
license = "Apache-2.0"
Expand Down
Loading