From 67af4202f21a41db1a354d319debcf94d0740cd5 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 19 Dec 2024 18:26:32 +1100 Subject: [PATCH] c --- .../src/cloud/credential_provider.rs | 11 +- crates/polars-io/src/cloud/glob.rs | 22 ++-- .../polars-io/src/cloud/object_store_setup.rs | 42 ++++--- crates/polars-io/src/cloud/options.rs | 117 +++++++++++++++++- .../src/plans/conversion/dsl_to_ir.rs | 18 ++- .../polars/io/cloud/credential_provider.py | 4 +- 6 files changed, 172 insertions(+), 42 deletions(-) diff --git a/crates/polars-io/src/cloud/credential_provider.rs b/crates/polars-io/src/cloud/credential_provider.rs index e6de837488c1..6dfbe880cc9e 100644 --- a/crates/polars-io/src/cloud/credential_provider.rs +++ b/crates/polars-io/src/cloud/credential_provider.rs @@ -547,6 +547,7 @@ mod python_impl { #[cfg(feature = "azure")] fn into_azure_provider(self) -> object_store::azure::AzureCredentialProvider { + use object_store::azure::AzureAccessKey; use polars_error::{to_compute_err, PolarsResult}; use crate::cloud::credential_provider::{ @@ -570,6 +571,12 @@ mod python_impl { // We only support bearer for now match k.as_ref() { + "account_key" => { + credentials = object_store::azure::AzureCredential::AccessKey( + AzureAccessKey::try_new(v.as_str()) + .map_err(|e| PyValueError::new_err(e.to_string()))?, + ) + }, "bearer_token" => { credentials = object_store::azure::AzureCredential::BearerToken(v) @@ -577,8 +584,8 @@ mod python_impl { v => { return pyo3::PyResult::Err(PyValueError::new_err(format!( "unknown configuration key for azure: {}, \ - valid configuration keys are: {}", - v, "bearer_token", + valid configuration keys are: {}, {}", + v, "account_key", "bearer_token", ))) }, } diff --git a/crates/polars-io/src/cloud/glob.rs b/crates/polars-io/src/cloud/glob.rs index c55d7767f39f..9ad4595faa53 100644 --- a/crates/polars-io/src/cloud/glob.rs +++ b/crates/polars-io/src/cloud/glob.rs @@ -1,8 +1,9 @@ use futures::TryStreamExt; use object_store::path::Path; use 
polars_core::error::to_compute_err; -use polars_core::prelude::{polars_ensure, polars_err}; -use polars_error::PolarsResult; +use polars_core::prelude::polars_ensure; +use polars_error::{polars_bail, PolarsResult}; +use polars_utils::format_pl_smallstr; use polars_utils::pl_str::PlSmallStr; use regex::Regex; use url::Url; @@ -98,13 +99,16 @@ impl CloudLocation { } let key = parsed.path(); - let bucket = parsed - .host() - .ok_or_else( - || polars_err!(ComputeError: "cannot parse bucket (host) from url: {}", parsed), - )? - .to_string() - .into(); + + let bucket = format_pl_smallstr!( + "{}", + &parsed[url::Position::BeforeUsername..url::Position::AfterPort] + ); + + if bucket.is_empty() { + polars_bail!(ComputeError: "CloudLocation::from_url(): empty bucket: {}", parsed); + } + (bucket, key) }; diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs index 22e666a8198b..85e61d721fd0 100644 --- a/crates/polars-io/src/cloud/object_store_setup.rs +++ b/crates/polars-io/src/cloud/object_store_setup.rs @@ -7,6 +7,7 @@ use polars_core::config; use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult}; use polars_utils::aliases::PlHashMap; use polars_utils::pl_str::PlSmallStr; +use polars_utils::{format_pl_smallstr, pl_serialize}; use tokio::sync::RwLock; use url::Url; @@ -17,7 +18,7 @@ use crate::cloud::CloudConfig; /// get rate limited when querying the DNS (can take up to 5s). /// Other reasons are connection pools that must be shared between as much as possible. 
#[allow(clippy::type_complexity)] -static OBJECT_STORE_CACHE: Lazy>> = +static OBJECT_STORE_CACHE: Lazy, PolarsObjectStore>>> = Lazy::new(Default::default); #[allow(dead_code)] @@ -29,10 +30,10 @@ fn err_missing_feature(feature: &str, scheme: &str) -> PolarsResult) -> String { +fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> Vec { #[derive(Clone, Debug, PartialEq, Hash, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] - struct S { + struct C { max_retries: usize, #[cfg(feature = "file_cache")] file_cache_ttl: u64, @@ -41,8 +42,15 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String { credential_provider: usize, } + #[derive(Clone, Debug, PartialEq, Hash, Eq)] + #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] + struct S { + url_base: PlSmallStr, + cloud_options: Option, + } + // We include credentials as they can expire, so users will send new credentials for the same url. - let creds = serde_json::to_string(&options.map( + let cloud_options = options.map( |CloudOptions { // Destructure to ensure this breaks if anything changes. 
max_retries, @@ -52,7 +60,7 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String { #[cfg(feature = "cloud")] credential_provider, }| { - S { + C { max_retries: *max_retries, #[cfg(feature = "file_cache")] file_cache_ttl: *file_cache_ttl, @@ -61,15 +69,21 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String { credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()), } }, - )) - .unwrap(); - - format!( - "{}://{}<\\creds\\>{}", - url.scheme(), - &url[url::Position::BeforeHost..url::Position::AfterPort], - creds - ) + ); + + let cache_key = S { + url_base: format_pl_smallstr!( + "{}", + &url[url::Position::BeforeScheme..url::Position::AfterPort] + ), + cloud_options, + }; + + if config::verbose() { + eprintln!("object store cache key: {} {:?}", url, &cache_key); + } + + pl_serialize::serialize_to_bytes(&cache_key).unwrap() } /// Construct an object_store `Path` from a string without any encoding/decoding. diff --git a/crates/polars-io/src/cloud/options.rs b/crates/polars-io/src/cloud/options.rs index de5f8903eabd..015a8cae2f18 100644 --- a/crates/polars-io/src/cloud/options.rs +++ b/crates/polars-io/src/cloud/options.rs @@ -406,16 +406,20 @@ impl CloudOptions { pub fn build_azure(&self, url: &str) -> PolarsResult { use super::credential_provider::IntoCredentialProvider; - let mut builder = if self.credential_provider.is_none() { - MicrosoftAzureBuilder::from_env() - } else { - MicrosoftAzureBuilder::new() - }; + let mut storage_account: Option = None; + + // The credential provider `self.credential_provider` is prioritized if it is set. We also + // need `from_env()` as it may source an environment-configured storage account name. 
+ let mut builder = MicrosoftAzureBuilder::from_env(); + if let Some(options) = &self.config { let CloudConfig::Azure(options) = options else { panic!("impl error: cloud type mismatch") }; for (key, value) in options.iter() { + if key == &AzureConfigKey::AccountName { + storage_account = Some(value.into()); + } builder = builder.with_config(*key, value); } } @@ -425,8 +429,18 @@ impl CloudOptions { .with_url(url) .with_retry(get_retry_config(self.max_retries)); + // Prefer the one embedded in the path + storage_account = extract_adls_uri_storage_account(url) + .map(|x| x.into()) + .or(storage_account); + let builder = if let Some(v) = self.credential_provider.clone() { builder.with_credentials(v.into_azure_provider()) + } else if let Some(v) = storage_account + .as_deref() + .and_then(get_azure_storage_account_key) + { + builder.with_access_key(v) } else { builder }; @@ -610,6 +624,99 @@ impl CloudOptions { } } +/// ```text +/// "abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net/" +/// ^^^^^^^^^^^^^^^^^ +/// ``` +#[cfg(feature = "azure")] +fn extract_adls_uri_storage_account(path: &str) -> Option<&str> { + Some( + path.split_once("://")? + .1 + .split_once('/')? + .0 + .split_once('@')? + .1 + .split_once(".dfs.core.windows.net")? + .0, + ) +} + +/// Attempt to retrieve the storage account key for this account using the Azure CLI. 
+#[cfg(feature = "azure")] +fn get_azure_storage_account_key(account_name: &str) -> Option { + if polars_core::config::verbose() { + eprintln!( + "get_azure_storage_account_key: storage_account_name: {}", + account_name + ); + } + + let mut cmd = if cfg!(target_family = "windows") { + // https://github.com/apache/arrow-rs/blob/565c24b8071269b02c3937e34c51eacf0f4cbad6/object_store/src/azure/credential.rs#L877-L894 + let mut v = std::process::Command::new("cmd"); + v.args([ + "/C", + "az", + "storage", + "account", + "keys", + "list", + "--output", + "json", + "--account-name", + account_name, + ]); + v + } else { + let mut v = std::process::Command::new("az"); + v.args([ + "storage", + "account", + "keys", + "list", + "--output", + "json", + "--account-name", + account_name, + ]); + v + }; + + let json_resp = cmd + .output() + .ok() + .filter(|x| x.status.success()) + .map(|x| String::from_utf8(x.stdout))? + .ok()?; + + // [ + // { + // "creationTime": "1970-01-01T00:00:00.000000+00:00", + // "keyName": "key1", + // "permissions": "FULL", + // "value": "..." + // }, + // { + // "creationTime": "1970-01-01T00:00:00.000000+00:00", + // "keyName": "key2", + // "permissions": "FULL", + // "value": "..." + // } + // ] + + #[derive(Debug, serde::Deserialize)] + struct S { + value: String, + } + + let resp: Vec = serde_json::from_str(&json_resp).ok()?; + + let access_key = resp.into_iter().next()?.value; + + Some(access_key) +} + #[cfg(feature = "cloud")] #[cfg(test)] mod tests { diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index 34f03e6debdd..0ad4bb6bb0ee 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -163,21 +163,19 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult let sources = match &scan_type { #[cfg(feature = "parquet")] - FileScan::Parquet { - ref cloud_options, .. 
- } => sources + FileScan::Parquet { cloud_options, .. } => sources .expand_paths_with_hive_update(&mut file_options, cloud_options.as_ref())?, #[cfg(feature = "ipc")] - FileScan::Ipc { - ref cloud_options, .. - } => sources + FileScan::Ipc { cloud_options, .. } => sources .expand_paths_with_hive_update(&mut file_options, cloud_options.as_ref())?, #[cfg(feature = "csv")] - FileScan::Csv { - ref cloud_options, .. - } => sources.expand_paths(&file_options, cloud_options.as_ref())?, + FileScan::Csv { cloud_options, .. } => { + sources.expand_paths(&file_options, cloud_options.as_ref())? + }, #[cfg(feature = "json")] - FileScan::NDJson { .. } => sources.expand_paths(&file_options, None)?, + FileScan::NDJson { cloud_options, .. } => { + sources.expand_paths(&file_options, cloud_options.as_ref())? + }, FileScan::Anonymous { .. } => sources, }; diff --git a/py-polars/polars/io/cloud/credential_provider.py b/py-polars/polars/io/cloud/credential_provider.py index dd98683f316e..832f8708f00d 100644 --- a/py-polars/polars/io/cloud/credential_provider.py +++ b/py-polars/polars/io/cloud/credential_provider.py @@ -165,7 +165,7 @@ def __init__( ---------- Parameters are passed to `google.auth.default()` """ - msg = "`CredentialProviderAWS` functionality is considered unstable" + msg = "`CredentialProviderGCP` functionality is considered unstable" issue_unstable_warning(msg) self._check_module_availability() @@ -194,7 +194,7 @@ def __init__( self.creds = creds def __call__(self) -> CredentialProviderFunctionReturn: - """Fetch the credentials for the configured profile name.""" + """Fetch the credentials.""" import google.auth.transport.requests self.creds.refresh(google.auth.transport.requests.__dict__["Request"]())