Skip to content

Commit

Permalink
feat: Retry with reloaded credentials on cloud error (#20185)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Dec 7, 2024
1 parent 19939ae commit fdaf517
Show file tree
Hide file tree
Showing 12 changed files with 527 additions and 274 deletions.
5 changes: 4 additions & 1 deletion crates/polars-io/src/cloud/adaptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ impl CloudWriter {

let (cloud_location, object_store) =
crate::cloud::build_object_store(uri, cloud_options, false).await?;
Self::new_with_object_store(object_store, object_path_from_str(&cloud_location.prefix)?)
Self::new_with_object_store(
object_store.to_dyn_object_store().await,
object_path_from_str(&cloud_location.prefix)?,
)
}

pub fn close(&mut self) -> PolarsResult<()> {
Expand Down
46 changes: 30 additions & 16 deletions crates/polars-io/src/cloud/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use object_store::path::Path;
use polars_core::error::to_compute_err;
use polars_core::prelude::{polars_ensure, polars_err};
use polars_error::PolarsResult;
use polars_utils::pl_str::PlSmallStr;
use regex::Regex;
use url::Url;

Expand Down Expand Up @@ -74,13 +75,13 @@ pub(crate) fn extract_prefix_expansion(url: &str) -> PolarsResult<(String, Optio
#[derive(PartialEq, Debug, Default)]
pub struct CloudLocation {
/// The scheme (s3, ...).
pub scheme: String,
pub scheme: PlSmallStr,
/// The bucket name.
pub bucket: String,
pub bucket: PlSmallStr,
/// The prefix inside the bucket, this will be the full key when wildcards are not used.
pub prefix: String,
/// The path components that need to be expanded.
pub expansion: Option<String>,
pub expansion: Option<PlSmallStr>,
}

impl CloudLocation {
Expand All @@ -102,7 +103,8 @@ impl CloudLocation {
.ok_or_else(
|| polars_err!(ComputeError: "cannot parse bucket (host) from url: {}", parsed),
)?
.to_string();
.to_string()
.into();
(bucket, key)
};

Expand All @@ -114,9 +116,9 @@ impl CloudLocation {
if is_local && key.starts_with(DELIMITER) {
prefix.insert(0, DELIMITER);
}
(prefix, expansion)
(prefix, expansion.map(|x| x.into()))
} else {
(key.to_string(), None)
(key.as_ref().into(), None)
};

Ok(CloudLocation {
Expand Down Expand Up @@ -155,7 +157,7 @@ impl Matcher {
}

pub(crate) fn is_matching(&self, key: &str) -> bool {
if !key.starts_with(&self.prefix) {
if !key.starts_with(self.prefix.as_str()) {
// Prefix does not match, should not happen.
return false;
}
Expand Down Expand Up @@ -183,23 +185,35 @@ pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResu
let matcher = &Matcher::new(
if scheme == "file" {
// For local paths the returned location has the leading slash stripped.
prefix[1..].to_string()
prefix[1..].into()
} else {
prefix.clone()
},
expansion.as_deref(),
)?;

let path = Path::from(prefix.as_str());
let path = Some(&path);

let mut locations = store
.list(Some(&Path::from(prefix)))
.try_filter_map(|x| async move {
let out =
(x.size > 0 && matcher.is_matching(x.location.as_ref())).then_some(x.location);
Ok(out)
.try_exec_rebuild_on_err(|store| {
let st = store.clone();

async {
let store = st;
store
.list(path)
.try_filter_map(|x| async move {
let out = (x.size > 0 && matcher.is_matching(x.location.as_ref()))
.then_some(x.location);
Ok(out)
})
.try_collect::<Vec<_>>()
.await
.map_err(to_compute_err)
}
})
.try_collect::<Vec<_>>()
.await
.map_err(to_compute_err)?;
.await?;

locations.sort_unstable();
Ok(locations
Expand Down
217 changes: 135 additions & 82 deletions crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,22 @@ use once_cell::sync::Lazy;
use polars_core::config;
use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
use polars_utils::aliases::PlHashMap;
use polars_utils::pl_str::PlSmallStr;
use tokio::sync::RwLock;
use url::Url;

use super::{parse_url, CloudLocation, CloudOptions, CloudType};
use super::{parse_url, CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
use crate::cloud::CloudConfig;

/// Object stores must be cached. Every object-store will do DNS lookups and
/// get rate limited when querying the DNS (can take up to 5s).
/// Another reason is that connection pools should be shared as much as possible.
#[allow(clippy::type_complexity)]
static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<String, Arc<dyn ObjectStore>>>> =
static OBJECT_STORE_CACHE: Lazy<RwLock<PlHashMap<String, PolarsObjectStore>>> =
Lazy::new(Default::default);

type BuildResult = PolarsResult<(CloudLocation, Arc<dyn ObjectStore>)>;

#[allow(dead_code)]
fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {
fn err_missing_feature(feature: &str, scheme: &str) -> PolarsResult<Arc<dyn ObjectStore>> {
polars_bail!(
ComputeError:
"feature '{}' must be enabled in order to use '{}' cloud urls", feature, scheme,
Expand Down Expand Up @@ -64,6 +63,7 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
},
))
.unwrap();

format!(
"{}://{}<\\creds\\>{}",
url.scheme(),
Expand All @@ -77,6 +77,126 @@ pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path
object_store::path::Path::parse(path).map_err(to_compute_err)
}

/// Everything needed to (re)construct an object store for a URL.
///
/// Kept alongside the built store (see `PolarsObjectStore::new_from_inner` in
/// `build`) so the store can be rebuilt later — e.g. via
/// `try_exec_rebuild_on_err` — presumably to retry with reloaded credentials
/// after a cloud error (per the commit title; confirm against `PolarsObjectStore`).
#[derive(Debug, Clone)]
pub(crate) struct PolarsObjectStoreBuilder {
    // Original URL string the store was requested for.
    url: PlSmallStr,
    // Parsed form of `url`; used to derive the credential-aware cache key.
    parsed_url: Url,
    // URL scheme (e.g. "s3"). Only read by the missing-feature error paths,
    // hence unused when all cloud features are enabled.
    #[allow(unused)]
    scheme: PlSmallStr,
    // Which cloud backend the URL resolved to.
    cloud_type: CloudType,
    // Per-store options; `None` falls back to `CloudOptions::default_static_ref()`.
    options: Option<CloudOptions>,
}

impl PolarsObjectStoreBuilder {
    /// Build the backend-specific [`ObjectStore`] for `self.cloud_type`,
    /// without consulting or updating the global store cache.
    ///
    /// # Errors
    /// Fails if the backend-specific construction fails, or if the cargo
    /// feature required for the URL's backend (`aws` / `gcp` / `azure` /
    /// `http`) is not enabled.
    ///
    /// # Panics
    /// Panics on `CloudType::Hf` — `hf://` paths must be resolved before
    /// reaching this point.
    pub(super) async fn build_impl(&self) -> PolarsResult<Arc<dyn ObjectStore>> {
        let options = self
            .options
            .as_ref()
            .unwrap_or_else(|| CloudOptions::default_static_ref());

        let store = match self.cloud_type {
            CloudType::Aws => {
                #[cfg(feature = "aws")]
                {
                    let store = options.build_aws(&self.url).await?;
                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
                }
                #[cfg(not(feature = "aws"))]
                return err_missing_feature("aws", &self.scheme);
            },
            CloudType::Gcp => {
                #[cfg(feature = "gcp")]
                {
                    let store = options.build_gcp(&self.url)?;
                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
                }
                #[cfg(not(feature = "gcp"))]
                return err_missing_feature("gcp", &self.scheme);
            },
            CloudType::Azure => {
                {
                    #[cfg(feature = "azure")]
                    {
                        let store = options.build_azure(&self.url)?;
                        Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
                    }
                }
                #[cfg(not(feature = "azure"))]
                return err_missing_feature("azure", &self.scheme);
            },
            CloudType::File => {
                let local = LocalFileSystem::new();
                Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
            },
            CloudType::Http => {
                {
                    #[cfg(feature = "http")]
                    {
                        let store = options.build_http(&self.url)?;
                        PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
                    }
                }
                // BUGFIX: was `&cloud_location.scheme`, but `cloud_location`
                // is not in scope in this method (copied from the old
                // `build_object_store`); use the scheme stored on the builder.
                #[cfg(not(feature = "http"))]
                return err_missing_feature("http", &self.scheme);
            },
            CloudType::Hf => panic!("impl error: unresolved hf:// path"),
        }?;

        Ok(store)
    }

    /// Build a [`PolarsObjectStore`], consulting the global cache for
    /// AWS/GCP/Azure URLs (keyed on URL + credentials). `file://`, `http(s)`
    /// and `hf://` stores are never cached.
    ///
    /// Note: Use `build_impl` for a non-caching version.
    pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
        let opt_cache_key = match &self.cloud_type {
            CloudType::Aws | CloudType::Gcp | CloudType::Azure => Some(url_and_creds_to_key(
                &self.parsed_url,
                self.options.as_ref(),
            )),
            CloudType::File | CloudType::Http | CloudType::Hf => None,
        };

        // Double-checked locking: probe under the read lock first so the
        // common cache-hit path never serializes writers, then re-check under
        // the write lock because another task may have inserted in between.
        let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
            let cache = OBJECT_STORE_CACHE.read().await;

            if let Some(store) = cache.get(cache_key) {
                return Ok(store.clone());
            }

            drop(cache);

            let cache = OBJECT_STORE_CACHE.write().await;

            if let Some(store) = cache.get(cache_key) {
                return Ok(store.clone());
            }

            // Hold the write guard across the build so concurrent callers for
            // the same key don't each build their own store.
            Some(cache)
        } else {
            None
        };

        let store = self.build_impl().await?;
        // Attach the builder so the store can be rebuilt on cloud errors.
        let store = PolarsObjectStore::new_from_inner(store, self);

        if let Some(mut cache) = opt_cache_write_guard {
            // Clear the cache if we surpass a certain amount of buckets.
            if cache.len() >= 8 {
                if config::verbose() {
                    eprintln!(
                        "build_object_store: clearing store cache (cache.len(): {})",
                        cache.len()
                    );
                }
                cache.clear()
            }

            cache.insert(opt_cache_key.unwrap(), store.clone());
        }

        Ok(store)
    }
}

/// Build an [`ObjectStore`] based on the URL and passed in options. Return the cloud location and an implementation of the object store.
pub async fn build_object_store(
url: &str,
Expand All @@ -86,88 +206,21 @@ pub async fn build_object_store(
)]
options: Option<&CloudOptions>,
glob: bool,
) -> BuildResult {
) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
let parsed = parse_url(url).map_err(to_compute_err)?;
let cloud_location = CloudLocation::from_url(&parsed, glob)?;
let cloud_type = CloudType::from_url(&parsed)?;

// FIXME: `credential_provider` is currently serializing the entire Python function here
// into a string with pickle for this cache key because we are using `serde_json::to_string`
let key = url_and_creds_to_key(&parsed, options);
let mut allow_cache = true;

{
let cache = OBJECT_STORE_CACHE.read().await;
if let Some(store) = cache.get(&key) {
return Ok((cloud_location, store.clone()));
}
let store = PolarsObjectStoreBuilder {
url: url.into(),
parsed_url: parsed,
scheme: cloud_location.scheme.as_str().into(),
cloud_type,
options: options.cloned(),
}
.build()
.await?;

let options = options.map(std::borrow::Cow::Borrowed).unwrap_or_default();

let cloud_type = CloudType::from_url(&parsed)?;
let store = match cloud_type {
CloudType::Aws => {
#[cfg(feature = "aws")]
{
let store = options.build_aws(url).await?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
}
#[cfg(not(feature = "aws"))]
return err_missing_feature("aws", &cloud_location.scheme);
},
CloudType::Gcp => {
#[cfg(feature = "gcp")]
{
let store = options.build_gcp(url)?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
}
#[cfg(not(feature = "gcp"))]
return err_missing_feature("gcp", &cloud_location.scheme);
},
CloudType::Azure => {
{
#[cfg(feature = "azure")]
{
let store = options.build_azure(url)?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
}
}
#[cfg(not(feature = "azure"))]
return err_missing_feature("azure", &cloud_location.scheme);
},
CloudType::File => {
allow_cache = false;
let local = LocalFileSystem::new();
Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
},
CloudType::Http => {
{
allow_cache = false;
#[cfg(feature = "http")]
{
let store = options.build_http(url)?;
PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
}
}
#[cfg(not(feature = "http"))]
return err_missing_feature("http", &cloud_location.scheme);
},
CloudType::Hf => panic!("impl error: unresolved hf:// path"),
}?;
if allow_cache {
let mut cache = OBJECT_STORE_CACHE.write().await;
// Clear the cache if we surpass a certain amount of buckets.
if cache.len() > 8 {
if config::verbose() {
eprintln!(
"build_object_store: clearing store cache (cache.len(): {})",
cache.len()
);
}
cache.clear()
}
cache.insert(key, store.clone());
}
Ok((cloud_location, store))
}

Expand Down
Loading

0 comments on commit fdaf517

Please sign in to comment.