Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Microsoft Fabric / OneLake #4573

Merged
merged 9 commits into from
Aug 11, 2023
101 changes: 100 additions & 1 deletion object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ pub struct MicrosoftAzureBuilder {
client_options: ClientOptions,
/// Credentials
credentials: Option<AzureCredentialProvider>,
/// When set to true, fabric url scheme will be used
///
/// i.e. https://{account_name}.dfs.fabric.microsoft.com
use_fabric_endpoint: ConfigValue<bool>,
}

/// Configuration keys for [`MicrosoftAzureBuilder`]
Expand Down Expand Up @@ -430,6 +434,13 @@ pub enum AzureConfigKey {
/// - `use_emulator`
UseEmulator,

/// Use object store with url scheme account.dfs.fabric.microsoft.com
///
/// Supported keys:
/// - `azure_use_fabric_endpoint`
/// - `use_fabric_endpoint`
UseFabricEndpoint,

/// Endpoint to request a imds managed identity token
///
/// Supported keys:
Expand Down Expand Up @@ -482,6 +493,7 @@ impl AsRef<str> for AzureConfigKey {
Self::SasKey => "azure_storage_sas_key",
Self::Token => "azure_storage_token",
Self::UseEmulator => "azure_storage_use_emulator",
Self::UseFabricEndpoint => "azure_use_fabric_endpoint",
Self::MsiEndpoint => "azure_msi_endpoint",
Self::ObjectId => "azure_object_id",
Self::MsiResourceId => "azure_msi_resource_id",
Expand Down Expand Up @@ -531,6 +543,9 @@ impl FromStr for AzureConfigKey {
"azure_federated_token_file" | "federated_token_file" => {
Ok(Self::FederatedTokenFile)
}
"azure_use_fabric_endpoint" | "use_fabric_endpoint" => {
Ok(Self::UseFabricEndpoint)
}
"azure_use_azure_cli" | "use_azure_cli" => Ok(Self::UseAzureCli),
// Backwards compatibility
"azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
Expand Down Expand Up @@ -600,11 +615,16 @@ impl MicrosoftAzureBuilder {
///
/// - `abfs[s]://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
/// - `abfs[s]://<file_system>@<account_name>.dfs.core.windows.net/<path>`
/// - `abfs[s]://<file_system>@<account_name>.dfs.fabric.microsoft.com/<path>`
/// - `az://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
/// - `adl://<container>/<path>` (according to [fsspec](https://github.com/fsspec/adlfs))
/// - `azure://<container>/<path>` (custom)
/// - `https://<account>.dfs.core.windows.net`
/// - `https://<account>.blob.core.windows.net`
/// - `https://<account>.dfs.fabric.microsoft.com`
/// - `https://<account>.dfs.fabric.microsoft.com/<container>`
/// - `https://<account>.blob.fabric.microsoft.com`
/// - `https://<account>.blob.fabric.microsoft.com/<container>`
///
/// Note: Settings derived from the URL will override any others set on this builder
///
Expand Down Expand Up @@ -639,6 +659,7 @@ impl MicrosoftAzureBuilder {
}
AzureConfigKey::UseAzureCli => self.use_azure_cli.parse(value),
AzureConfigKey::UseEmulator => self.use_emulator.parse(value),
AzureConfigKey::UseFabricEndpoint => self.use_fabric_endpoint.parse(value),
AzureConfigKey::Client(key) => {
self.client_options = self.client_options.with_config(key, value)
}
Expand Down Expand Up @@ -692,6 +713,9 @@ impl MicrosoftAzureBuilder {
AzureConfigKey::SasKey => self.sas_key.clone(),
AzureConfigKey::Token => self.bearer_token.clone(),
AzureConfigKey::UseEmulator => Some(self.use_emulator.to_string()),
AzureConfigKey::UseFabricEndpoint => {
Some(self.use_fabric_endpoint.to_string())
}
AzureConfigKey::MsiEndpoint => self.msi_endpoint.clone(),
AzureConfigKey::ObjectId => self.object_id.clone(),
AzureConfigKey::MsiResourceId => self.msi_resource_id.clone(),
Expand Down Expand Up @@ -724,6 +748,10 @@ impl MicrosoftAzureBuilder {
} else if let Some(a) = host.strip_suffix(".dfs.core.windows.net") {
self.container_name = Some(validate(parsed.username())?);
self.account_name = Some(validate(a)?);
} else if let Some(a) = host.strip_suffix(".dfs.fabric.microsoft.com") {
self.container_name = Some(validate(parsed.username())?);
self.account_name = Some(validate(a)?);
self.use_fabric_endpoint = true.into();
} else {
return Err(UrlNotRecognisedSnafu { url }.build().into());
}
Expand All @@ -733,6 +761,21 @@ impl MicrosoftAzureBuilder {
| Some((a, "blob.core.windows.net")) => {
self.account_name = Some(validate(a)?);
}
Some((a, "dfs.fabric.microsoft.com"))
| Some((a, "blob.fabric.microsoft.com")) => {
self.account_name = Some(validate(a)?);
// Attempt to infer the container name from the URL
// - https://onelake.dfs.fabric.microsoft.com/<workspaceGUID>/<itemGUID>/Files/test.csv
// - https://onelake.dfs.fabric.microsoft.com/<workspace>/<item>.<itemtype>/<path>/<fileName>
//
// See <https://learn.microsoft.com/en-us/fabric/onelake/onelake-access-api>
if let Some(workspace) = parsed.path_segments().unwrap().next() {
if !workspace.is_empty() {
self.container_name = Some(workspace.to_string())
}
}
self.use_fabric_endpoint = true.into();
}
_ => return Err(UrlNotRecognisedSnafu { url }.build().into()),
},
scheme => return Err(UnknownUrlSchemeSnafu { scheme }.build().into()),
Expand Down Expand Up @@ -819,6 +862,14 @@ impl MicrosoftAzureBuilder {
self
}

/// Set if Microsoft Fabric url scheme should be used (defaults to false)
/// When disabled the url scheme used is `https://{account}.blob.core.windows.net`
/// When enabled the url scheme used is `https://{account}.dfs.fabric.microsoft.com`
pub fn with_use_fabric_endpoint(mut self, use_fabric_endpoint: bool) -> Self {
self.use_fabric_endpoint = use_fabric_endpoint.into();
self
}

/// Sets what protocol is allowed. If `allow_http` is :
/// * false (default): Only HTTPS are allowed
/// * true: HTTP and HTTPS are allowed
Expand Down Expand Up @@ -885,6 +936,7 @@ impl MicrosoftAzureBuilder {
}

let container = self.container_name.ok_or(Error::MissingContainerName {})?;

let static_creds = |credential: AzureCredential| -> AzureCredentialProvider {
Arc::new(StaticCredentialProvider::new(credential))
};
Expand All @@ -906,7 +958,11 @@ impl MicrosoftAzureBuilder {
(true, url, credential, account_name)
} else {
let account_name = self.account_name.ok_or(Error::MissingAccount {})?;
let account_url = format!("https://{}.blob.core.windows.net", &account_name);
let account_url = match self.use_fabric_endpoint.get()? {
true => format!("https://{}.blob.fabric.microsoft.com", &account_name),
false => format!("https://{}.blob.core.windows.net", &account_name),
};

let url = Url::parse(&account_url)
.context(UnableToParseUrlSnafu { url: account_url })?;

Expand Down Expand Up @@ -1049,6 +1105,15 @@ mod tests {
.unwrap();
assert_eq!(builder.account_name, Some("account".to_string()));
assert_eq!(builder.container_name, Some("file_system".to_string()));
assert!(!builder.use_fabric_endpoint.get().unwrap());

let mut builder = MicrosoftAzureBuilder::new();
builder
.parse_url("abfss://file_system@account.dfs.fabric.microsoft.com/")
.unwrap();
assert_eq!(builder.account_name, Some("account".to_string()));
assert_eq!(builder.container_name, Some("file_system".to_string()));
assert!(builder.use_fabric_endpoint.get().unwrap());

let mut builder = MicrosoftAzureBuilder::new();
builder.parse_url("abfs://container/path").unwrap();
Expand All @@ -1067,12 +1132,46 @@ mod tests {
.parse_url("https://account.dfs.core.windows.net/")
.unwrap();
assert_eq!(builder.account_name, Some("account".to_string()));
assert!(!builder.use_fabric_endpoint.get().unwrap());

let mut builder = MicrosoftAzureBuilder::new();
builder
.parse_url("https://account.blob.core.windows.net/")
.unwrap();
assert_eq!(builder.account_name, Some("account".to_string()));
assert!(!builder.use_fabric_endpoint.get().unwrap());

let mut builder = MicrosoftAzureBuilder::new();
builder
.parse_url("https://account.dfs.fabric.microsoft.com/")
.unwrap();
assert_eq!(builder.account_name, Some("account".to_string()));
assert_eq!(builder.container_name, None);
assert!(builder.use_fabric_endpoint.get().unwrap());

let mut builder = MicrosoftAzureBuilder::new();
builder
.parse_url("https://account.dfs.fabric.microsoft.com/container")
.unwrap();
assert_eq!(builder.account_name, Some("account".to_string()));
assert_eq!(builder.container_name.as_deref(), Some("container"));
assert!(builder.use_fabric_endpoint.get().unwrap());

let mut builder = MicrosoftAzureBuilder::new();
builder
.parse_url("https://account.blob.fabric.microsoft.com/")
.unwrap();
assert_eq!(builder.account_name, Some("account".to_string()));
assert_eq!(builder.container_name, None);
assert!(builder.use_fabric_endpoint.get().unwrap());

let mut builder = MicrosoftAzureBuilder::new();
builder
.parse_url("https://account.blob.fabric.microsoft.com/container")
.unwrap();
assert_eq!(builder.account_name, Some("account".to_string()));
assert_eq!(builder.container_name.as_deref(), Some("container"));
assert!(builder.use_fabric_endpoint.get().unwrap());

let err_cases = [
"mailto://account.blob.core.windows.net/",
Expand Down
Loading