From 75019f501cbe7c58d8191890d81fed1417e260ad Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Sat, 20 May 2023 11:11:16 +0200 Subject: [PATCH] feat: add datafusion storage catalog --- rust/Cargo.toml | 12 +- rust/src/data_catalog/mod.rs | 6 +- rust/src/data_catalog/storage/mod.rs | 194 +++++++++++++++++++++++++++ 3 files changed, 205 insertions(+), 7 deletions(-) create mode 100644 rust/src/data_catalog/storage/mod.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index fbd7f0df6e..b8e7e8c3e5 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -21,6 +21,7 @@ async-trait = "0.1" bytes = "1" chrono = { version = "0.4.22", default-features = false, features = ["clock"] } cfg-if = "1" +dashmap = "5" datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [ "hdfs3", "try_spawn_blocking", @@ -60,7 +61,10 @@ rusoto_dynamodb = { version = "0.47", default-features = false, optional = true rusoto_glue = { version = "0.47", default-features = false, optional = true } # Unity -reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "json"], optional = true } +reqwest = { version = "0.11", default-features = false, features = [ + "rustls-tls", + "json", +], optional = true } reqwest-middleware = { version = "0.2.1", optional = true } reqwest-retry = { version = "0.2.2", optional = true } @@ -142,11 +146,7 @@ s3 = [ "object_store/aws", "object_store/aws_profile", ] -unity-experimental = [ - "reqwest", - "reqwest-middleware", - "reqwest-retry", -] +unity-experimental = ["reqwest", "reqwest-middleware", "reqwest-retry"] [[bench]] name = "read_checkpoint" diff --git a/rust/src/data_catalog/mod.rs b/rust/src/data_catalog/mod.rs index cfe4ea8cf1..354a6ec1bd 100644 --- a/rust/src/data_catalog/mod.rs +++ b/rust/src/data_catalog/mod.rs @@ -4,10 +4,14 @@ use std::fmt::Debug; #[cfg(feature = "glue")] pub mod glue; - +#[cfg(feature = "datafusion")] +pub mod storage; #[cfg(feature = "unity-experimental")] pub mod unity; +/// A 
result type for data catalog implementations +pub type DataCatalogResult<T> = Result<T, DataCatalogError>; + /// Error enum that represents a CatalogError. #[derive(thiserror::Error, Debug)] pub enum DataCatalogError { diff --git a/rust/src/data_catalog/storage/mod.rs b/rust/src/data_catalog/storage/mod.rs new file mode 100644 index 0000000000..e961506e8a --- /dev/null +++ b/rust/src/data_catalog/storage/mod.rs @@ -0,0 +1,194 @@ +//! listing_schema contains a SchemaProvider that scans ObjectStores for tables automatically +use std::any::Any; +use std::collections::{HashMap, HashSet}; +use std::path::Path; +use std::sync::Arc; + +use async_trait::async_trait; +use dashmap::DashMap; +use datafusion::catalog::schema::SchemaProvider; +use datafusion::datasource::TableProvider; +use datafusion_common::DataFusionError; +use futures::TryStreamExt; +use itertools::Itertools; +use object_store::ObjectStore; + +use crate::storage::config::{configure_store, StorageOptions}; +use crate::{ensure_table_uri, open_table_with_storage_options, DeltaResult}; + +const DELTA_LOG_FOLDER: &str = "_delta_log"; + +/// A `SchemaProvider` that scans an `ObjectStore` to automatically discover delta tables. +/// +/// A subfolder relationship is assumed, i.e. 
given: +/// authority = s3://host.example.com:3000 +/// path = /data/tpch +/// +/// A table called "customer" will be registered for the folder: +/// s3://host.example.com:3000/data/tpch/customer +/// +/// assuming it contains valid deltalake data, i.e. a `_delta_log` folder: +/// s3://host.example.com:3000/data/tpch/customer/_delta_log/ +pub struct ListingSchemaProvider { + authority: String, + path: object_store::path::Path, + /// Underlying object store + store: Arc<dyn ObjectStore>, + /// A map of table names to a fully qualified storage location + tables: DashMap<String, String>, + /// Options used to create underlying object stores + storage_options: StorageOptions, +} + +impl ListingSchemaProvider { + /// Create a new [`ListingSchemaProvider`] + pub fn try_new( + root_uri: impl AsRef<str>, + storage_options: Option<HashMap<String, String>>, + ) -> DeltaResult<Self> { + let uri = ensure_table_uri(root_uri)?; + let mut authority = uri.clone(); + authority.set_path(""); + let authority = authority.to_string(); + let path = object_store::path::Path::from(uri.path()); + let storage_options = storage_options.unwrap_or_default().into(); + // We already parsed the url, so unwrapping is safe. + let store = configure_store(&url::Url::parse(&authority).unwrap(), &storage_options)?; + Ok(Self { + authority, + path, + store, + tables: DashMap::new(), + storage_options, + }) + } + + /// Reload table information from ObjectStore + pub async fn refresh(&self) -> datafusion_common::Result<()> { + let entries: Vec<_> = self + .store + .list(Some(&self.path)) + .await? 
+ .try_collect() + .await?; + let base = Path::new(self.path.as_ref()); + let mut tables = HashSet::new(); + for file in entries.iter() { + let mut parent = Path::new(file.location.as_ref()); + while let Some(p) = parent.parent() { + if parent.ends_with(DELTA_LOG_FOLDER) { + tables.insert(p); + break; + } + if p == base { + break; + } + parent = p; + } + } + for table in tables.into_iter() { + let file_name = table + .file_name() + .ok_or_else(|| DataFusionError::Internal("Cannot parse file name!".to_string()))? + .to_str() + .ok_or_else(|| DataFusionError::Internal("Cannot parse file name!".to_string()))? + .replace(['-', '.'], "_") + .to_ascii_lowercase(); + let table_name = file_name.split('.').collect_vec()[0]; + let table_path = table + .to_str() + .ok_or_else(|| DataFusionError::Internal("Cannot parse file name!".to_string()))? + .to_string(); + + if !self.table_exist(table_name) { + let table_url = format!("{}/{}", self.authority, table_path); + self.tables.insert(table_name.to_string(), table_url); + } + } + Ok(()) + } +} + +#[async_trait] +impl SchemaProvider for ListingSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec<String> { + self.tables.iter().map(|t| t.key().clone()).collect() + } + + async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> { + let location = self.tables.get(name).map(|t| t.clone())?; + let provider = open_table_with_storage_options(location, self.storage_options.0.clone()) + .await + .ok()?; + Some(Arc::new(provider) as Arc<dyn TableProvider>) + } + + fn register_table( + &self, + _name: String, + _table: Arc<dyn TableProvider>, + ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> { + Err(DataFusionError::Execution( + "schema provider does not support registering tables".to_owned(), + )) + } + + fn deregister_table( + &self, + _name: &str, + ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> { + Err(DataFusionError::Execution( + "schema provider does not support deregistering tables".to_owned(), + )) + } + + fn table_exist(&self, name: &str) -> bool { + 
self.tables.contains_key(name) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::assert_batches_sorted_eq; + use datafusion::catalog::catalog::{CatalogProvider, MemoryCatalogProvider}; + use datafusion::execution::context::SessionContext; + + #[tokio::test] + async fn test_table_names() { + let fs = ListingSchemaProvider::try_new("./tests/data/", None).unwrap(); + fs.refresh().await.unwrap(); + let table_names = fs.table_names(); + assert!(table_names.len() > 20); + assert!(table_names.contains(&"simple_table".to_string())) + } + + #[tokio::test] + async fn test_query_table() { + let schema = Arc::new(ListingSchemaProvider::try_new("./tests/data/", None).unwrap()); + schema.refresh().await.unwrap(); + + let ctx = SessionContext::new(); + let catalog = Arc::new(MemoryCatalogProvider::default()); + catalog.register_schema("test", schema).unwrap(); + ctx.register_catalog("delta", catalog); + + let data = ctx + .sql("SELECT * FROM delta.test.simple_table") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let expected = vec![ + "+----+", "| id |", "+----+", "| 5 |", "| 7 |", "| 9 |", "+----+", + ]; + + assert_batches_sorted_eq!(&expected, &data); + } +}