feat: add datafusion storage catalog
roeap committed May 20, 2023
1 parent dde2f7e commit 75019f5
Showing 3 changed files with 205 additions and 7 deletions.
12 changes: 6 additions & 6 deletions rust/Cargo.toml
@@ -21,6 +21,7 @@ async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
cfg-if = "1"
+dashmap = "5"
datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [
"hdfs3",
"try_spawn_blocking",
@@ -60,7 +61,10 @@ rusoto_dynamodb = { version = "0.47", default-features = false, optional = true
rusoto_glue = { version = "0.47", default-features = false, optional = true }

# Unity
-reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "json"], optional = true }
+reqwest = { version = "0.11", default-features = false, features = [
+    "rustls-tls",
+    "json",
+], optional = true }
reqwest-middleware = { version = "0.2.1", optional = true }
reqwest-retry = { version = "0.2.2", optional = true }

@@ -142,11 +146,7 @@ s3 = [
"object_store/aws",
"object_store/aws_profile",
]
-unity-experimental = [
-    "reqwest",
-    "reqwest-middleware",
-    "reqwest-retry",
-]
+unity-experimental = ["reqwest", "reqwest-middleware", "reqwest-retry"]

[[bench]]
name = "read_checkpoint"
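The new module is compiled only when the crate's existing `datafusion` feature is enabled (see the `#[cfg(feature = "datafusion")]` gate in `data_catalog/mod.rs` below), so a downstream manifest would opt in along these lines; the version number here is a placeholder:

[dependencies]
deltalake = { version = "0.12", features = ["datafusion"] }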
6 changes: 5 additions & 1 deletion rust/src/data_catalog/mod.rs
@@ -4,10 +4,14 @@ use std::fmt::Debug;

#[cfg(feature = "glue")]
pub mod glue;

#[cfg(feature = "datafusion")]
pub mod storage;
#[cfg(feature = "unity-experimental")]
pub mod unity;

/// A result type for data catalog implementations
pub type DataCatalogResult<T> = Result<T, DataCatalogError>;

/// Error enum that represents a CatalogError.
#[derive(thiserror::Error, Debug)]
pub enum DataCatalogError {
194 changes: 194 additions & 0 deletions rust/src/data_catalog/storage/mod.rs
@@ -0,0 +1,194 @@
//! A `SchemaProvider` implementation that scans `ObjectStore`s to discover Delta tables automatically
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;

use async_trait::async_trait;
use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::TableProvider;
use datafusion_common::DataFusionError;
use futures::TryStreamExt;
use itertools::Itertools;
use object_store::ObjectStore;

use crate::storage::config::{configure_store, StorageOptions};
use crate::{ensure_table_uri, open_table_with_storage_options, DeltaResult};

const DELTA_LOG_FOLDER: &str = "_delta_log";

/// A `SchemaProvider` that scans an `ObjectStore` to automatically discover Delta tables.
///
/// A subfolder relationship is assumed, i.e. given:
/// authority = s3://host.example.com:3000
/// path = /data/tpch
///
/// A table called "customer" will be registered for the folder:
/// s3://host.example.com:3000/data/tpch/customer
///
/// assuming it contains valid Delta Lake data, i.e. a `_delta_log` folder:
/// s3://host.example.com:3000/data/tpch/customer/_delta_log/
pub struct ListingSchemaProvider {
authority: String,
path: object_store::path::Path,
/// Underlying object store
store: Arc<dyn ObjectStore>,
/// A map of table names to a fully qualified storage location
tables: DashMap<String, String>,
/// Options used to create underlying object stores
storage_options: StorageOptions,
}

impl ListingSchemaProvider {
/// Create a new [`ListingSchemaProvider`]
pub fn try_new(
root_uri: impl AsRef<str>,
storage_options: Option<HashMap<String, String>>,
) -> DeltaResult<Self> {
let uri = ensure_table_uri(root_uri)?;
let mut authority = uri.clone();
authority.set_path("");
let authority = authority.to_string();
let path = object_store::path::Path::from(uri.path());
let storage_options = storage_options.unwrap_or_default().into();
// We already parsed the url, so unwrapping is safe.
let store = configure_store(&url::Url::parse(&authority).unwrap(), &storage_options)?;
Ok(Self {
authority,
path,
store,
tables: DashMap::new(),
storage_options,
})
}

/// Reload table information from ObjectStore
pub async fn refresh(&self) -> datafusion_common::Result<()> {
let entries: Vec<_> = self
.store
.list(Some(&self.path))
.await?
.try_collect()
.await?;
let base = Path::new(self.path.as_ref());
let mut tables = HashSet::new();
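        // Walk each listed object's path upwards: the parent of a `_delta_log`
        // folder is a Delta table root; stop once we reach the listing base.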
for file in entries.iter() {
let mut parent = Path::new(file.location.as_ref());
while let Some(p) = parent.parent() {
if parent.ends_with(DELTA_LOG_FOLDER) {
tables.insert(p);
break;
}
if p == base {
break;
}
parent = p;
}
}
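        // Turn each table folder into a SQL-friendly name (lowercased, with
        // `-` and `.` mapped to `_`) and record its fully qualified URL.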
for table in tables.into_iter() {
let file_name = table
.file_name()
.ok_or_else(|| DataFusionError::Internal("Cannot parse file name!".to_string()))?
.to_str()
.ok_or_else(|| DataFusionError::Internal("Cannot parse file name!".to_string()))?
.replace(['-', '.'], "_")
.to_ascii_lowercase();
let table_name = file_name.split('.').collect_vec()[0];
let table_path = table
.to_str()
.ok_or_else(|| DataFusionError::Internal("Cannot parse file name!".to_string()))?
.to_string();

if !self.table_exist(table_name) {
let table_url = format!("{}/{}", self.authority, table_path);
self.tables.insert(table_name.to_string(), table_url);
}
}
Ok(())
}
}

#[async_trait]
impl SchemaProvider for ListingSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
self.tables.iter().map(|t| t.key().clone()).collect()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
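        // The map stores only URLs; the Delta table itself is loaded lazily
        // from object storage on each lookup.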
let location = self.tables.get(name).map(|t| t.clone())?;
let provider = open_table_with_storage_options(location, self.storage_options.0.clone())
.await
.ok()?;
Some(Arc::new(provider) as Arc<dyn TableProvider>)
}

fn register_table(
&self,
_name: String,
_table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(
"schema provider does not support registering tables".to_owned(),
))
}

fn deregister_table(
&self,
_name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(
"schema provider does not support deregistering tables".to_owned(),
))
}

fn table_exist(&self, name: &str) -> bool {
self.tables.contains_key(name)
}
}

#[cfg(test)]
mod tests {
use super::*;
use datafusion::assert_batches_sorted_eq;
use datafusion::catalog::catalog::{CatalogProvider, MemoryCatalogProvider};
use datafusion::execution::context::SessionContext;

#[tokio::test]
async fn test_table_names() {
let fs = ListingSchemaProvider::try_new("./tests/data/", None).unwrap();
fs.refresh().await.unwrap();
let table_names = fs.table_names();
assert!(table_names.len() > 20);
assert!(table_names.contains(&"simple_table".to_string()))
}

#[tokio::test]
async fn test_query_table() {
let schema = Arc::new(ListingSchemaProvider::try_new("./tests/data/", None).unwrap());
schema.refresh().await.unwrap();

let ctx = SessionContext::new();
let catalog = Arc::new(MemoryCatalogProvider::default());
catalog.register_schema("test", schema).unwrap();
ctx.register_catalog("delta", catalog);

let data = ctx
.sql("SELECT * FROM delta.test.simple_table")
.await
.unwrap()
.collect()
.await
.unwrap();

let expected = vec![
"+----+", "| id |", "+----+", "| 5 |", "| 7 |", "| 9 |", "+----+",
];

assert_batches_sorted_eq!(&expected, &data);
}
}
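
Beyond the local-filesystem tests above, here is a minimal sketch of wiring the provider against a cloud store. The import path is assumed from the module layout above; the bucket, prefix, table name, and storage option key are hypothetical, and the exact option keys depend on the object store backend:

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::catalog::catalog::{CatalogProvider, MemoryCatalogProvider};
use datafusion::execution::context::SessionContext;
use deltalake::data_catalog::storage::ListingSchemaProvider;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical bucket and prefix; option keys depend on the backend.
    let storage_options =
        HashMap::from([("AWS_REGION".to_string(), "us-east-2".to_string())]);
    let schema = Arc::new(ListingSchemaProvider::try_new(
        "s3://my-bucket/data/tpch",
        Some(storage_options),
    )?);
    // One listing pass registers every folder containing a `_delta_log`.
    schema.refresh().await?;

    let ctx = SessionContext::new();
    let catalog = Arc::new(MemoryCatalogProvider::default());
    catalog.register_schema("tpch", schema)?;
    ctx.register_catalog("delta", catalog);

    // Discovered tables are addressable as delta.tpch.<table_name>.
    ctx.sql("SELECT * FROM delta.tpch.customer LIMIT 10")
        .await?
        .show()
        .await?;
    Ok(())
}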
