From d52b1cec806949eadfdf933bcec537f3aee72d6f Mon Sep 17 00:00:00 2001 From: Mohammed Muddassir Date: Thu, 27 Jul 2023 02:03:56 +0530 Subject: [PATCH 1/8] Changes required for onelake-fix --- object_store/src/azure/mod.rs | 39 +++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index d2735038321b..ff0c9e9ba610 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -726,7 +726,9 @@ impl MicrosoftAzureBuilder { // or the convention for the hadoop driver abfs[s]://@.dfs.core.windows.net/ if parsed.username().is_empty() { self.container_name = Some(validate(host)?); - } else if let Some(a) = host.strip_suffix(".dfs.core.windows.net") { + } else if let Some(a) = host.strip_suffix(".dfs.core.windows.net") + .or_else(|| host.strip_suffix(".dfs.fabric.microsoft.com")) + { self.container_name = Some(validate(parsed.username())?); self.account_name = Some(validate(a)?); } else { @@ -735,7 +737,12 @@ impl MicrosoftAzureBuilder { } "https" => match host.split_once('.') { Some((a, "dfs.core.windows.net")) - | Some((a, "blob.core.windows.net")) => { + | Some((a, "blob.core.windows.net")) + => { + self.account_name = Some(validate(a)?); + } + Some((a, "dfs.fabric.microsoft.com")) + | Some((a, "blob.fabric.microsoft.com")) =>{ self.account_name = Some(validate(a)?); } _ => return Err(UrlNotRecognisedSnafu { url }.build().into()), @@ -889,7 +896,17 @@ impl MicrosoftAzureBuilder { self.parse_url(&url)?; } - let container = self.container_name.ok_or(Error::MissingContainerName {})?; + let use_ok_or = match &self.account_name { + Some(account_name) => !account_name.contains("onelake"), + None => true, + }; + + let container = if use_ok_or { + self.container_name.ok_or(Error::MissingContainerName {})? + } else { + self.container_name.unwrap_or_default() + }; + let static_creds = |credential: AzureCredential| -> AzureCredentialProvider { Arc::new(StaticCredentialProvider::new(credential)) }; @@ -911,7 +928,13 @@ impl MicrosoftAzureBuilder { (true, url, credential, account_name) } else { let account_name = self.account_name.ok_or(Error::MissingAccount {})?; - let account_url = format!("https://{}.blob.core.windows.net", &account_name); + let account_url = if account_name.contains("onelake") { + format!("https://{}.blob.fabric.microsoft.com", &account_name) + } else { + format!("https://{}.blob.core.windows.net", &account_name) + }; + + let url = Url::parse(&account_url) .context(UnableToParseUrlSnafu { url: account_url })?; @@ -1168,6 +1191,14 @@ mod tests { .unwrap(); assert_eq!(builder.account_name, Some("account".to_string())); + let mut builder = MicrosoftAzureBuilder::new(); + builder + .parse_url("https://daily-onelake.dfs.fabric.microsoft.com/86bc63cf-5086-42e0-b16d-6bc580d1dc87/17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv") + .unwrap(); + assert_eq!(builder.account_name, Some("daily-onelake".to_string())); + assert_eq!(builder.container_name, Some("86bc63cf-5086-42e0-b16d-6bc580d1dc87".to_string())); + + let err_cases = [ "mailto://account.blob.core.windows.net/", "az://blob.mydomain/", From bf2b828d795d067e86ece9023ec3c35a23c8ca52 Mon Sep 17 00:00:00 2001 From: Mohammed Muddassir Date: Thu, 27 Jul 2023 19:17:39 +0530 Subject: [PATCH 2/8] Fix Unit tests --- object_store/src/azure/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index ff0c9e9ba610..8debdd07fb38 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -1196,9 +1196,7 @@ mod tests { .parse_url("https://daily-onelake.dfs.fabric.microsoft.com/86bc63cf-5086-42e0-b16d-6bc580d1dc87/17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv") .unwrap(); assert_eq!(builder.account_name, Some("daily-onelake".to_string())); - assert_eq!(builder.container_name, Some("86bc63cf-5086-42e0-b16d-6bc580d1dc87".to_string())); - - + let err_cases = [ "mailto://account.blob.core.windows.net/", "az://blob.mydomain/", From 02ed7c599e945ec936780b6fc103f3fff0356cc8 Mon Sep 17 00:00:00 2001 From: Mohammed Muddassir Date: Tue, 8 Aug 2023 15:35:29 +0530 Subject: [PATCH 3/8] Add Unit Tests --- object_store/src/azure/mod.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 8debdd07fb38..9e0b809c1061 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -1167,9 +1167,16 @@ mod tests { assert_eq!(builder.account_name, Some("account".to_string())); assert_eq!(builder.container_name, Some("file_system".to_string())); + let mut builder = MicrosoftAzureBuilder::new(); + builder + .parse_url("abfss://file_system@account.dfs.fabric.microsoft.com/") + .unwrap(); + assert_eq!(builder.account_name, Some("account".to_string())); + assert_eq!(builder.container_name, Some("file_system".to_string())); + let mut builder = MicrosoftAzureBuilder::new(); builder.parse_url("abfs://container/path").unwrap(); - assert_eq!(builder.container_name, Some("container".to_string())); + assert_eq!(builder.container_name, Some("container".to_string())); let mut builder = MicrosoftAzureBuilder::new(); builder.parse_url("az://container").unwrap(); @@ -1193,9 +1200,15 @@ mod tests { let mut builder = MicrosoftAzureBuilder::new(); builder - .parse_url("https://daily-onelake.dfs.fabric.microsoft.com/86bc63cf-5086-42e0-b16d-6bc580d1dc87/17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv") + .parse_url("https://account.dfs.fabric.microsoft.com/") .unwrap(); - assert_eq!(builder.account_name, Some("daily-onelake".to_string())); + assert_eq!(builder.account_name, Some("account".to_string())); + + let mut builder = MicrosoftAzureBuilder::new(); + builder + .parse_url("https://account.blob.fabric.microsoft.com/") + .unwrap(); + assert_eq!(builder.account_name, Some("account".to_string())); let err_cases = [ "mailto://account.blob.core.windows.net/", From 5e253beeee16fde171e0b505476a0446ae86b786 Mon Sep 17 00:00:00 2001 From: Mohammed Muddassir Date: Wed, 9 Aug 2023 22:16:16 +0530 Subject: [PATCH 4/8] Add onelake read/write test --- object_store/tests/azure_object_store.rs | 114 +++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 object_store/tests/azure_object_store.rs diff --git a/object_store/tests/azure_object_store.rs b/object_store/tests/azure_object_store.rs new file mode 100644 index 000000000000..bcdc1f84ac41 --- /dev/null +++ b/object_store/tests/azure_object_store.rs @@ -0,0 +1,114 @@ +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::BoxStream; +use object_store::azure::{MicrosoftAzureBuilder, MicrosoftAzure}; +use object_store::path::Path; +use object_store::{ + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, +}; +use std::fmt::Formatter; +use tokio::io::AsyncWrite; + +#[derive(Debug)] +struct AzureStore(MicrosoftAzure); + +impl std::fmt::Display for AzureStore { + fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } +} + +#[async_trait] +impl ObjectStore for AzureStore { + async fn put(&self, path: &Path, data: Bytes) -> object_store::Result<()> { + self.0.put(path, data).await + } + + async fn put_multipart( + &self, + _: &Path, + ) -> object_store::Result<(MultipartId, Box)> { + todo!() + } + + async fn abort_multipart( + &self, + _: &Path, + _: &MultipartId, + ) -> object_store::Result<()> { + todo!() + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> object_store::Result { + self.0.get_opts(location, options).await + } + + async fn head(&self, _: &Path) -> object_store::Result { + todo!() + } + + async fn delete(&self, _: &Path) -> object_store::Result<()> { + todo!() + } + + async fn list( + &self, + _: Option<&Path>, + ) -> object_store::Result>> { + todo!() + } + + async fn list_with_delimiter( + &self, + _: Option<&Path>, + ) -> object_store::Result { + todo!() + } + + async fn copy(&self, _: &Path, _: &Path) -> object_store::Result<()> { + todo!() + } + + async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> object_store::Result<()> { + todo!() + } +} + + +#[tokio::test] +async fn test_fabric() { + //Format:https://onelake.dfs.fabric.microsoft.com///Files/test.csv + //Example:https://onelake.dfs.fabric.microsoft.com/86bc63cf-5086-42e0-b16d-6bc580d1dc87/17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv + //Account Name : onelake + //Container Name : workspaceGUID + + let daily_store = AzureStore( + MicrosoftAzureBuilder::new() + .with_container_name("86bc63cf-5086-42e0-b16d-6bc580d1dc87") + .with_account("onelake") + .with_bearer_token_authorization("jwt-token") + .build() + .unwrap()); + + let path = Path::from("17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv"); + + read_write_test(&daily_store, &path).await; +} + + +async fn read_write_test(store: &AzureStore, path: &Path) { + let expected = Bytes::from_static(b"hello world"); + store.put(path, expected.clone()).await.unwrap(); + let fetched = store.get(path).await.unwrap().bytes().await.unwrap(); + assert_eq!(expected, fetched); + + for range in [0..10, 3..5, 0..expected.len()] { + let data = store.get_range(path, range.clone()).await.unwrap(); + assert_eq!(&data[..], &expected[range]) + } +} + From a612d8f7891eb53df4a18d2b82c8f37efe507773 Mon Sep 17 00:00:00 2001 From: Mohammed Muddassir Date: Fri, 11 Aug 2023 00:05:14 +0530 Subject: [PATCH 5/8] Add with_use_fabric , for fabric url check --- object_store/src/azure/mod.rs | 53 ++++++++++++++++++------ object_store/tests/azure_object_store.rs | 23 +++++++++- 2 files changed, 62 insertions(+), 14 deletions(-) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 9e0b809c1061..a20791a5faa5 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -346,6 +346,8 @@ pub struct MicrosoftAzureBuilder { client_options: ClientOptions, /// Credentials credentials: Option, + /// When set to true , fabric url scheme https://onelake.dfs.fabric.microsoft.com will be used + use_fabric: ConfigValue, } /// Configuration keys for [`MicrosoftAzureBuilder`] @@ -435,6 +437,13 @@ pub enum AzureConfigKey { /// - `use_emulator` UseEmulator, + /// Use object store with url scheme account.dfs.fabric.microsoft.com + /// + /// Supported keys: + /// - `object_store_use_fabric` + /// - `use_fabric` + UseFabric, + /// Endpoint to request a imds managed identity token /// /// Supported keys: @@ -487,6 +496,7 @@ impl AsRef for AzureConfigKey { Self::SasKey => "azure_storage_sas_key", Self::Token => "azure_storage_token", Self::UseEmulator => "azure_storage_use_emulator", + Self::UseFabric => "object_store_use_fabric", Self::MsiEndpoint => "azure_msi_endpoint", Self::ObjectId => "azure_object_id", Self::MsiResourceId => "azure_msi_resource_id", @@ -536,6 +546,9 @@ impl FromStr for AzureConfigKey { "azure_federated_token_file" | "federated_token_file" => { Ok(Self::FederatedTokenFile) } + "use_fabric" | "object_store_use_fabric" => { + Ok(Self::UseFabric) + } "azure_use_azure_cli" | "use_azure_cli" => Ok(Self::UseAzureCli), // Backwards compatibility "azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)), @@ -605,12 +618,13 @@ impl MicrosoftAzureBuilder { /// /// - `abfs[s]:///` (according to [fsspec](https://github.com/fsspec/adlfs)) /// - `abfs[s]://@.dfs.core.windows.net/` + /// - `abfs[s]://@.dfs.fabric.microsoft.com/` /// - `az:///` (according to [fsspec](https://github.com/fsspec/adlfs)) /// - `adl:///` (according to [fsspec](https://github.com/fsspec/adlfs)) /// - `azure:///` (custom) /// - `https://.dfs.core.windows.net` /// - `https://.blob.core.windows.net` - /// + /// - `https://.dfs.fabric.microsoft.com` /// Note: Settings derived from the URL will override any others set on this builder /// /// # Example @@ -644,6 +658,7 @@ impl MicrosoftAzureBuilder { } AzureConfigKey::UseAzureCli => self.use_azure_cli.parse(value), AzureConfigKey::UseEmulator => self.use_emulator.parse(value), + AzureConfigKey::UseFabric => self.use_fabric.parse(value), AzureConfigKey::Client(key) => { self.client_options = self.client_options.with_config(key, value) } @@ -697,6 +712,7 @@ impl MicrosoftAzureBuilder { AzureConfigKey::SasKey => self.sas_key.clone(), AzureConfigKey::Token => self.bearer_token.clone(), AzureConfigKey::UseEmulator => Some(self.use_emulator.to_string()), + AzureConfigKey::UseFabric => Some(self.use_fabric.to_string()), AzureConfigKey::MsiEndpoint => self.msi_endpoint.clone(), AzureConfigKey::ObjectId => self.object_id.clone(), AzureConfigKey::MsiResourceId => self.msi_resource_id.clone(), @@ -744,6 +760,18 @@ impl MicrosoftAzureBuilder { Some((a, "dfs.fabric.microsoft.com")) | Some((a, "blob.fabric.microsoft.com")) =>{ self.account_name = Some(validate(a)?); + // if the container_name i.e the workspaceGUID is not set for fabric try to infer this + // from the url scheme https://onelake.dfs.fabric.microsoft.com///Files/test.csv + if self.container_name.is_none() { + let path_segments: Vec<_> = parsed.path_segments().unwrap().collect(); + if let Some(storage_name) = path_segments.first() { + self.container_name = if !storage_name.is_empty() { + Some(storage_name.to_string()) + } else { + None + }; + } + } } _ => return Err(UrlNotRecognisedSnafu { url }.build().into()), }, @@ -831,6 +859,15 @@ impl MicrosoftAzureBuilder { self } + /// Set if Fabric url should be used (defaults to false) + /// Default http://mystorageaccount.blob.core.windows.net + /// When set to true url scheme https://onelake.dfs.fabric.microsoft.com/ will be used + pub fn with_use_fabric(mut self, use_fabric: bool) -> Self { + self.use_fabric = use_fabric.into(); + self + } + + /// Sets what protocol is allowed. If `allow_http` is : /// * false (default): Only HTTPS are allowed /// * true: HTTP and HTTPS are allowed @@ -896,16 +933,7 @@ impl MicrosoftAzureBuilder { self.parse_url(&url)?; } - let use_ok_or = match &self.account_name { - Some(account_name) => !account_name.contains("onelake"), - None => true, - }; - - let container = if use_ok_or { - self.container_name.ok_or(Error::MissingContainerName {})? - } else { - self.container_name.unwrap_or_default() - }; + let container = self.container_name.ok_or(Error::MissingContainerName {})?; let static_creds = |credential: AzureCredential| -> AzureCredentialProvider { Arc::new(StaticCredentialProvider::new(credential)) @@ -928,13 +956,12 @@ impl MicrosoftAzureBuilder { (true, url, credential, account_name) } else { let account_name = self.account_name.ok_or(Error::MissingAccount {})?; - let account_url = if account_name.contains("onelake") { + let account_url = if self.use_fabric.get()? { format!("https://{}.blob.fabric.microsoft.com", &account_name) } else { format!("https://{}.blob.core.windows.net", &account_name) }; - let url = Url::parse(&account_url) .context(UnableToParseUrlSnafu { url: account_url })?; diff --git a/object_store/tests/azure_object_store.rs b/object_store/tests/azure_object_store.rs index bcdc1f84ac41..e1ff7e3ccd31 100644 --- a/object_store/tests/azure_object_store.rs +++ b/object_store/tests/azure_object_store.rs @@ -89,7 +89,28 @@ async fn test_fabric() { let daily_store = AzureStore( MicrosoftAzureBuilder::new() .with_container_name("86bc63cf-5086-42e0-b16d-6bc580d1dc87") - .with_account("onelake") + .with_account("daily-onelake") + .with_use_fabric(true) + .with_bearer_token_authorization("jwt-token") + .build() + .unwrap()); + + let path = Path::from("17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv"); + + read_write_test(&daily_store, &path).await; +} + +#[tokio::test] +async fn test_fabric_url() { + //Format:https://onelake.dfs.fabric.microsoft.com///Files/test.csv + //Example:https://onelake.dfs.fabric.microsoft.com/86bc63cf-5086-42e0-b16d-6bc580d1dc87/17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv + //Account Name : onelake + //Container Name : workspaceGUID + + let daily_store = AzureStore( + MicrosoftAzureBuilder::new() + .with_url("https://daily-onelake.dfs.fabric.microsoft.com/86bc63cf-5086-42e0-b16d-6bc580d1dc87") + .with_use_fabric(true) .with_bearer_token_authorization("jwt-token") .build() .unwrap()); From 65089e8056b3fe900b31b3345b18acd79d08b2f3 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 11 Aug 2023 13:56:12 +0100 Subject: [PATCH 6/8] Final tweaks --- object_store/src/azure/mod.rs | 78 ++++++++----- object_store/tests/azure_object_store.rs | 135 ----------------------- 2 files changed, 48 insertions(+), 165 deletions(-) delete mode 100644 object_store/tests/azure_object_store.rs diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 559d1b9548af..8f6fd97cd1df 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -341,7 +341,9 @@ pub struct MicrosoftAzureBuilder { client_options: ClientOptions, /// Credentials credentials: Option, - /// When set to true , fabric url scheme https://onelake.dfs.fabric.microsoft.com will be used + /// When set to true , fabric url scheme will be used + /// + /// i.e. https://{account_name}.dfs.fabric.microsoft.com use_fabric: ConfigValue, } @@ -435,7 +437,7 @@ pub enum AzureConfigKey { /// Use object store with url scheme account.dfs.fabric.microsoft.com /// /// Supported keys: - /// - `object_store_use_fabric` + /// - `azure_use_fabric` /// - `use_fabric` UseFabric, @@ -491,7 +493,7 @@ impl AsRef for AzureConfigKey { Self::SasKey => "azure_storage_sas_key", Self::Token => "azure_storage_token", Self::UseEmulator => "azure_storage_use_emulator", - Self::UseFabric => "object_store_use_fabric", + Self::UseFabric => "azure_use_fabric", Self::MsiEndpoint => "azure_msi_endpoint", Self::ObjectId => "azure_object_id", Self::MsiResourceId => "azure_msi_resource_id", @@ -541,9 +543,7 @@ impl FromStr for AzureConfigKey { "azure_federated_token_file" | "federated_token_file" => { Ok(Self::FederatedTokenFile) } - "use_fabric" | "object_store_use_fabric" => { - Ok(Self::UseFabric) - } + "use_fabric" | "azure_use_fabric" => Ok(Self::UseFabric), "azure_use_azure_cli" | "use_azure_cli" => Ok(Self::UseAzureCli), // Backwards compatibility "azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)), @@ -620,6 +620,9 @@ impl MicrosoftAzureBuilder { /// - `https://.dfs.core.windows.net` /// - `https://.blob.core.windows.net` /// - `https://.dfs.fabric.microsoft.com` + /// - `https://.dfs.fabric.microsoft.com/` + /// - `https://.blob.fabric.microsoft.com` + /// - `https://.blob.fabric.microsoft.com/` /// Note: Settings derived from the URL will override any others set on this builder /// /// # Example @@ -737,8 +740,9 @@ impl MicrosoftAzureBuilder { // or the convention for the hadoop driver abfs[s]://@.dfs.core.windows.net/ if parsed.username().is_empty() { self.container_name = Some(validate(host)?); - } else if let Some(a) = host.strip_suffix(".dfs.core.windows.net") - .or_else(|| host.strip_suffix(".dfs.fabric.microsoft.com")) + } else if let Some(a) = host + .strip_suffix(".dfs.core.windows.net") + .or_else(|| host.strip_suffix(".dfs.fabric.microsoft.com")) { self.container_name = Some(validate(parsed.username())?); self.account_name = Some(validate(a)?); @@ -748,23 +752,23 @@ impl MicrosoftAzureBuilder { } "https" => match host.split_once('.') { Some((a, "dfs.core.windows.net")) - | Some((a, "blob.core.windows.net")) - => { + | Some((a, "blob.core.windows.net")) => { self.account_name = Some(validate(a)?); } Some((a, "dfs.fabric.microsoft.com")) - | Some((a, "blob.fabric.microsoft.com")) =>{ + | Some((a, "blob.fabric.microsoft.com")) => { self.account_name = Some(validate(a)?); - // if the container_name i.e the workspaceGUID is not set for fabric try to infer this + // if the container_name i.e the workspaceGUID is not set for fabric try to infer this // from the url scheme https://onelake.dfs.fabric.microsoft.com///Files/test.csv - if self.container_name.is_none() { - let path_segments: Vec<_> = parsed.path_segments().unwrap().collect(); - if let Some(storage_name) = path_segments.first() { - self.container_name = if !storage_name.is_empty() { - Some(storage_name.to_string()) - } else { - None - }; + // or https://onelake.dfs.fabric.microsoft.com//.// + // + // See + + if self.container_name.is_none() { + if let Some(workspace) = parsed.path_segments().unwrap().next() { + if !workspace.is_empty() { + self.container_name = Some(workspace.to_string()) + } } } } @@ -854,15 +858,14 @@ impl MicrosoftAzureBuilder { self } - /// Set if Fabric url should be used (defaults to false) - /// Default http://mystorageaccount.blob.core.windows.net - /// When set to true url scheme https://onelake.dfs.fabric.microsoft.com/ will be used + /// Set if Microsoft Fabric url scheme should be used (defaults to false) + /// When disabled the url scheme used is `https://{account}.blob.core.windows.net` + /// When enabled the url scheme used is `https://{account}.dfs.fabric.microsoft.com` pub fn with_use_fabric(mut self, use_fabric: bool) -> Self { self.use_fabric = use_fabric.into(); self } - /// Sets what protocol is allowed. If `allow_http` is : /// * false (default): Only HTTPS are allowed /// * true: HTTP and HTTPS are allowed @@ -951,10 +954,9 @@ impl MicrosoftAzureBuilder { (true, url, credential, account_name) } else { let account_name = self.account_name.ok_or(Error::MissingAccount {})?; - let account_url = if self.use_fabric.get()? { - format!("https://{}.blob.fabric.microsoft.com", &account_name) - } else { - format!("https://{}.blob.core.windows.net", &account_name) + let account_url = match self.use_fabric.get()? { + true => format!("https://{}.blob.fabric.microsoft.com", &account_name), + false => format!("https://{}.blob.core.windows.net", &account_name), }; let url = Url::parse(&account_url) @@ -1109,7 +1111,7 @@ mod tests { let mut builder = MicrosoftAzureBuilder::new(); builder.parse_url("abfs://container/path").unwrap(); - assert_eq!(builder.container_name, Some("container".to_string())); + assert_eq!(builder.container_name, Some("container".to_string())); let mut builder = MicrosoftAzureBuilder::new(); builder.parse_url("az://container").unwrap(); @@ -1136,13 +1138,29 @@ mod tests { .parse_url("https://account.dfs.fabric.microsoft.com/") .unwrap(); assert_eq!(builder.account_name, Some("account".to_string())); + assert_eq!(builder.container_name, None); + + let mut builder = MicrosoftAzureBuilder::new(); + builder + .parse_url("https://account.dfs.fabric.microsoft.com/container") + .unwrap(); + assert_eq!(builder.account_name, Some("account".to_string())); + assert_eq!(builder.container_name.as_deref(), Some("container")); let mut builder = MicrosoftAzureBuilder::new(); builder .parse_url("https://account.blob.fabric.microsoft.com/") .unwrap(); assert_eq!(builder.account_name, Some("account".to_string())); - + assert_eq!(builder.container_name, None); + + let mut builder = MicrosoftAzureBuilder::new(); + builder + .parse_url("https://account.blob.fabric.microsoft.com/container") + .unwrap(); + assert_eq!(builder.account_name, Some("account".to_string())); + assert_eq!(builder.container_name.as_deref(), Some("container")); + let err_cases = [ "mailto://account.blob.core.windows.net/", "az://blob.mydomain/", diff --git a/object_store/tests/azure_object_store.rs b/object_store/tests/azure_object_store.rs deleted file mode 100644 index e1ff7e3ccd31..000000000000 --- a/object_store/tests/azure_object_store.rs +++ /dev/null @@ -1,135 +0,0 @@ -use async_trait::async_trait; -use bytes::Bytes; -use futures::stream::BoxStream; -use object_store::azure::{MicrosoftAzureBuilder, MicrosoftAzure}; -use object_store::path::Path; -use object_store::{ - GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, -}; -use std::fmt::Formatter; -use tokio::io::AsyncWrite; - -#[derive(Debug)] -struct AzureStore(MicrosoftAzure); - -impl std::fmt::Display for AzureStore { - fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result { - todo!() - } -} - -#[async_trait] -impl ObjectStore for AzureStore { - async fn put(&self, path: &Path, data: Bytes) -> object_store::Result<()> { - self.0.put(path, data).await - } - - async fn put_multipart( - &self, - _: &Path, - ) -> object_store::Result<(MultipartId, Box)> { - todo!() - } - - async fn abort_multipart( - &self, - _: &Path, - _: &MultipartId, - ) -> object_store::Result<()> { - todo!() - } - - async fn get_opts( - &self, - location: &Path, - options: GetOptions, - ) -> object_store::Result { - self.0.get_opts(location, options).await - } - - async fn head(&self, _: &Path) -> object_store::Result { - todo!() - } - - async fn delete(&self, _: &Path) -> object_store::Result<()> { - todo!() - } - - async fn list( - &self, - _: Option<&Path>, - ) -> object_store::Result>> { - todo!() - } - - async fn list_with_delimiter( - &self, - _: Option<&Path>, - ) -> object_store::Result { - todo!() - } - - async fn copy(&self, _: &Path, _: &Path) -> object_store::Result<()> { - todo!() - } - - async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> object_store::Result<()> { - todo!() - } -} - - -#[tokio::test] -async fn test_fabric() { - //Format:https://onelake.dfs.fabric.microsoft.com///Files/test.csv - //Example:https://onelake.dfs.fabric.microsoft.com/86bc63cf-5086-42e0-b16d-6bc580d1dc87/17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv - //Account Name : onelake - //Container Name : workspaceGUID - - let daily_store = AzureStore( - MicrosoftAzureBuilder::new() - .with_container_name("86bc63cf-5086-42e0-b16d-6bc580d1dc87") - .with_account("daily-onelake") - .with_use_fabric(true) - .with_bearer_token_authorization("jwt-token") - .build() - .unwrap()); - - let path = Path::from("17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv"); - - read_write_test(&daily_store, &path).await; -} - -#[tokio::test] -async fn test_fabric_url() { - //Format:https://onelake.dfs.fabric.microsoft.com///Files/test.csv - //Example:https://onelake.dfs.fabric.microsoft.com/86bc63cf-5086-42e0-b16d-6bc580d1dc87/17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv - //Account Name : onelake - //Container Name : workspaceGUID - - let daily_store = AzureStore( - MicrosoftAzureBuilder::new() - .with_url("https://daily-onelake.dfs.fabric.microsoft.com/86bc63cf-5086-42e0-b16d-6bc580d1dc87") - .with_use_fabric(true) - .with_bearer_token_authorization("jwt-token") - .build() - .unwrap()); - - let path = Path::from("17d3977c-d46e-4bae-8fed-ff467e674aed/Files/SampleCustomerList.csv"); - - read_write_test(&daily_store, &path).await; -} - - -async fn read_write_test(store: &AzureStore, path: &Path) { - let expected = Bytes::from_static(b"hello world"); - store.put(path, expected.clone()).await.unwrap(); - let fetched = store.get(path).await.unwrap().bytes().await.unwrap(); - assert_eq!(expected, fetched); - - for range in [0..10, 3..5, 0..expected.len()] { - let data = store.get_range(path, range.clone()).await.unwrap(); - assert_eq!(&data[..], &expected[range]) - } -} - From 042f00583e426f887014290f025e69d4c2e2d87c Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 11 Aug 2023 14:03:15 +0100 Subject: [PATCH 7/8] Further tweaks --- object_store/src/azure/mod.rs | 37 +++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 8f6fd97cd1df..3765c11e66c5 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -341,10 +341,10 @@ pub struct MicrosoftAzureBuilder { client_options: ClientOptions, /// Credentials credentials: Option, - /// When set to true , fabric url scheme will be used + /// When set to true, fabric url scheme will be used /// /// i.e. https://{account_name}.dfs.fabric.microsoft.com - use_fabric: ConfigValue, + use_fabric_endpoint: ConfigValue, } /// Configuration keys for [`MicrosoftAzureBuilder`] @@ -437,9 +437,9 @@ pub enum AzureConfigKey { /// Use object store with url scheme account.dfs.fabric.microsoft.com /// /// Supported keys: - /// - `azure_use_fabric` - /// - `use_fabric` - UseFabric, + /// - `azure_use_fabric_endpoint` + /// - `use_fabric_endpoint` + UseFabricEndpoint, /// Endpoint to request a imds managed identity token /// @@ -493,7 +493,7 @@ impl AsRef for AzureConfigKey { Self::SasKey => "azure_storage_sas_key", Self::Token => "azure_storage_token", Self::UseEmulator => "azure_storage_use_emulator", - Self::UseFabric => "azure_use_fabric", + Self::UseFabricEndpoint => "azure_use_fabric_endpoint", Self::MsiEndpoint => "azure_msi_endpoint", Self::ObjectId => "azure_object_id", Self::MsiResourceId => "azure_msi_resource_id", @@ -543,7 +543,9 @@ impl FromStr for AzureConfigKey { "azure_federated_token_file" | "federated_token_file" => { Ok(Self::FederatedTokenFile) } - "use_fabric" | "azure_use_fabric" => Ok(Self::UseFabric), + "azure_use_fabric_endpoint" | "use_fabric_endpoint" => { + Ok(Self::UseFabricEndpoint) + } "azure_use_azure_cli" | "use_azure_cli" => Ok(Self::UseAzureCli), // Backwards compatibility "azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)), @@ -623,6 +625,7 @@ impl MicrosoftAzureBuilder { /// - `https://.dfs.fabric.microsoft.com/` /// - `https://.blob.fabric.microsoft.com` /// - `https://.blob.fabric.microsoft.com/` + /// /// Note: Settings derived from the URL will override any others set on this builder /// /// # Example @@ -656,7 +659,7 @@ impl MicrosoftAzureBuilder { } AzureConfigKey::UseAzureCli => self.use_azure_cli.parse(value), AzureConfigKey::UseEmulator => self.use_emulator.parse(value), - AzureConfigKey::UseFabric => self.use_fabric.parse(value), + AzureConfigKey::UseFabricEndpoint => self.use_fabric_endpoint.parse(value), AzureConfigKey::Client(key) => { self.client_options = self.client_options.with_config(key, value) } @@ -710,7 +713,9 @@ impl MicrosoftAzureBuilder { AzureConfigKey::SasKey => self.sas_key.clone(), AzureConfigKey::Token => self.bearer_token.clone(), AzureConfigKey::UseEmulator => Some(self.use_emulator.to_string()), - AzureConfigKey::UseFabric => Some(self.use_fabric.to_string()), + AzureConfigKey::UseFabricEndpoint => { + Some(self.use_fabric_endpoint.to_string()) + } AzureConfigKey::MsiEndpoint => self.msi_endpoint.clone(), AzureConfigKey::ObjectId => self.object_id.clone(), AzureConfigKey::MsiResourceId => self.msi_resource_id.clone(), @@ -764,11 +769,9 @@ impl MicrosoftAzureBuilder { // // See - if self.container_name.is_none() { - if let Some(workspace) = parsed.path_segments().unwrap().next() { - if !workspace.is_empty() { - self.container_name = Some(workspace.to_string()) - } + if let Some(workspace) = parsed.path_segments().unwrap().next() { + if !workspace.is_empty() { + self.container_name = Some(workspace.to_string()) } } } @@ -861,8 +864,8 @@ impl MicrosoftAzureBuilder { /// Set if Microsoft Fabric url scheme should be used (defaults to false) /// When disabled the url scheme used is `https://{account}.blob.core.windows.net` /// When enabled the url scheme used is `https://{account}.dfs.fabric.microsoft.com` - pub fn with_use_fabric(mut self, use_fabric: bool) -> Self { - self.use_fabric = use_fabric.into(); + pub fn with_use_fabric_endpoint(mut self, use_fabric_endpoint: bool) -> Self { + self.use_fabric_endpoint = use_fabric_endpoint.into(); self } @@ -954,7 +957,7 @@ impl MicrosoftAzureBuilder { (true, url, credential, account_name) } else { let account_name = self.account_name.ok_or(Error::MissingAccount {})?; - let account_url = match self.use_fabric.get()? { + let account_url = match self.use_fabric_endpoint.get()? { true => format!("https://{}.blob.fabric.microsoft.com", &account_name), false => format!("https://{}.blob.core.windows.net", &account_name), }; From 81b630b26484608c9e83d53ac15fd80d6449636e Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 11 Aug 2023 14:09:25 +0100 Subject: [PATCH 8/8] Automatically set use_fabric_endpoint --- object_store/src/azure/mod.rs | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 3765c11e66c5..6bb4cdad1bb0 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -745,12 +745,13 @@ impl MicrosoftAzureBuilder { // or the convention for the hadoop driver abfs[s]://@.dfs.core.windows.net/ if parsed.username().is_empty() { self.container_name = Some(validate(host)?); - } else if let Some(a) = host - .strip_suffix(".dfs.core.windows.net") - .or_else(|| host.strip_suffix(".dfs.fabric.microsoft.com")) - { + } else if let Some(a) = host.strip_suffix(".dfs.core.windows.net") { self.container_name = Some(validate(parsed.username())?); self.account_name = Some(validate(a)?); + } else if let Some(a) = host.strip_suffix(".dfs.fabric.microsoft.com") { + self.container_name = Some(validate(parsed.username())?); + self.account_name = Some(validate(a)?); + self.use_fabric_endpoint = true.into(); } else { return Err(UrlNotRecognisedSnafu { url }.build().into()); } @@ -763,17 +764,17 @@ impl MicrosoftAzureBuilder { Some((a, "dfs.fabric.microsoft.com")) | Some((a, "blob.fabric.microsoft.com")) => { self.account_name = Some(validate(a)?); - // if the container_name i.e the workspaceGUID is not set for fabric try to infer this - // from the url scheme https://onelake.dfs.fabric.microsoft.com///Files/test.csv - // or https://onelake.dfs.fabric.microsoft.com//.// + // Attempt to infer the container name from the URL + // - https://onelake.dfs.fabric.microsoft.com///Files/test.csv + // - https://onelake.dfs.fabric.microsoft.com//.// // // See - if let Some(workspace) = parsed.path_segments().unwrap().next() { if !workspace.is_empty() { self.container_name = Some(workspace.to_string()) } } + self.use_fabric_endpoint = true.into(); } _ => return Err(UrlNotRecognisedSnafu { url }.build().into()), }, @@ -1104,6 +1105,7 @@ mod tests { .unwrap(); assert_eq!(builder.account_name, Some("account".to_string())); assert_eq!(builder.container_name, Some("file_system".to_string())); + assert!(!builder.use_fabric_endpoint.get().unwrap()); let mut builder = MicrosoftAzureBuilder::new(); builder @@ -1111,6 +1113,7 @@ mod tests { .unwrap(); assert_eq!(builder.account_name, Some("account".to_string())); assert_eq!(builder.container_name, Some("file_system".to_string())); + assert!(builder.use_fabric_endpoint.get().unwrap()); let mut builder = MicrosoftAzureBuilder::new(); builder.parse_url("abfs://container/path").unwrap(); @@ -1129,12 +1132,14 @@ mod tests { .parse_url("https://account.dfs.core.windows.net/") .unwrap(); assert_eq!(builder.account_name, Some("account".to_string())); + assert!(!builder.use_fabric_endpoint.get().unwrap()); let mut builder = MicrosoftAzureBuilder::new(); builder .parse_url("https://account.blob.core.windows.net/") .unwrap(); assert_eq!(builder.account_name, Some("account".to_string())); + assert!(!builder.use_fabric_endpoint.get().unwrap()); let mut builder = MicrosoftAzureBuilder::new(); builder @@ -1142,6 +1147,7 @@ mod tests { .unwrap(); assert_eq!(builder.account_name, Some("account".to_string())); assert_eq!(builder.container_name, None); + assert!(builder.use_fabric_endpoint.get().unwrap()); let mut builder = MicrosoftAzureBuilder::new(); builder @@ -1149,6 +1155,7 @@ mod tests { .unwrap(); assert_eq!(builder.account_name, Some("account".to_string())); assert_eq!(builder.container_name.as_deref(), Some("container")); + assert!(builder.use_fabric_endpoint.get().unwrap()); let mut builder = MicrosoftAzureBuilder::new(); builder @@ -1156,6 +1163,7 @@ mod tests { .unwrap(); assert_eq!(builder.account_name, Some("account".to_string())); assert_eq!(builder.container_name, None); + assert!(builder.use_fabric_endpoint.get().unwrap()); let mut builder = MicrosoftAzureBuilder::new(); builder @@ -1163,6 +1171,7 @@ mod tests { .unwrap(); assert_eq!(builder.account_name, Some("account".to_string())); assert_eq!(builder.container_name.as_deref(), Some("container")); + assert!(builder.use_fabric_endpoint.get().unwrap()); let err_cases = [ "mailto://account.blob.core.windows.net/",