From 34e68fd120a233b73810a3cfe12ade57081b5168 Mon Sep 17 00:00:00 2001 From: kazdy Date: Mon, 30 Sep 2024 22:50:07 +0200 Subject: [PATCH 1/9] feat: implement datafusion TableProviderFactory --- crates/datafusion/src/lib.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index b090696..ac76c5c 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -24,7 +24,7 @@ use std::thread; use arrow_schema::SchemaRef; use async_trait::async_trait; -use datafusion::catalog::Session; +use datafusion::catalog::{Session, TableProviderFactory}; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder; @@ -35,7 +35,7 @@ use datafusion_common::config::TableParquetOptions; use datafusion_common::DFSchema; use datafusion_common::DataFusionError::Execution; use datafusion_common::Result; -use datafusion_expr::{Expr, TableType}; +use datafusion_expr::{CreateExternalTable, Expr, TableType}; use datafusion_physical_expr::create_physical_expr; use hudi_core::config::read::HudiReadConfig::InputPartitions; @@ -150,6 +150,19 @@ impl TableProvider for HudiDataSource { } } +pub struct HudiTableProvider; + +#[async_trait] +impl TableProviderFactory for HudiTableProvider { + async fn create(&self, _state: &dyn Session, cmd: &CreateExternalTable) -> Result> { + let table_provider = match cmd.options.is_empty() { + true => HudiTable::new(cmd.location()).await?, + false => HudiTable::new_with_options(cmd.location(), cmd.options()).await?, + }; + Ok(Arc::new(table_provider)) + } +} + #[cfg(test)] mod tests { use std::fs::canonicalize; From 70b6b1416c74cf27b59110a91d1f4e2c4f923a9d Mon Sep 17 00:00:00 2001 From: kazdy Date: Mon, 30 Sep 2024 23:04:39 +0200 Subject: [PATCH 2/9] feat: HudiTableProvider uses HudiDataSource instead HudiTable --- crates/datafusion/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index ac76c5c..3f18b55 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -156,8 +156,8 @@ pub struct HudiTableProvider; impl TableProviderFactory for HudiTableProvider { async fn create(&self, _state: &dyn Session, cmd: &CreateExternalTable) -> Result> { let table_provider = match cmd.options.is_empty() { - true => HudiTable::new(cmd.location()).await?, - false => HudiTable::new_with_options(cmd.location(), cmd.options()).await?, + true => HudiDataSource::new(&*cmd.location).await?, + false => HudiDataSource::new_with_options(&*cmd.location, &cmd.options).await?, }; Ok(Arc::new(table_provider)) } From 416f3c9e7f0b2a0f51e2c54d2d050c57ff7e1ea4 Mon Sep 17 00:00:00 2001 From: kazdy Date: Mon, 30 Sep 2024 23:10:53 +0200 Subject: [PATCH 3/9] chore: tidy up --- crates/datafusion/src/lib.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index 3f18b55..4947f4a 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -154,10 +154,14 @@ pub struct HudiTableProvider; #[async_trait] impl TableProviderFactory for HudiTableProvider { - async fn create(&self, _state: &dyn Session, cmd: &CreateExternalTable) -> Result> { + async fn create( + &self, + _state: &dyn Session, + cmd: &CreateExternalTable, + ) -> Result> { let table_provider = match cmd.options.is_empty() { - true => HudiDataSource::new(&*cmd.location).await?, - false => HudiDataSource::new_with_options(&*cmd.location, &cmd.options).await?, + true => HudiDataSource::new(&cmd.location).await?, + false => HudiDataSource::new_with_options(&cmd.location, &cmd.options).await?, }; Ok(Arc::new(table_provider)) } From 431f8e130b6c6ef2577990ad9b1e98a85146f607 Mon Sep 17 00:00:00 2001 From: kazdy Date: Mon, 30 Sep 2024 23:28:44 +0200 Subject: [PATCH 4/9] chore make HudiDataSource own location and options --- crates/datafusion/src/lib.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index 4947f4a..8277caf 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -160,8 +160,14 @@ impl TableProviderFactory for HudiTableProvider { cmd: &CreateExternalTable, ) -> Result> { let table_provider = match cmd.options.is_empty() { - true => HudiDataSource::new(&cmd.location).await?, - false => HudiDataSource::new_with_options(&cmd.location, &cmd.options).await?, + true => HudiDataSource::new(cmd.to_owned().location.as_str()).await?, + false => { + HudiDataSource::new_with_options( + cmd.to_owned().location.as_str(), + cmd.to_owned().options, + ) + .await? + } }; Ok(Arc::new(table_provider)) } From 773174330522cc2889ba94d324138681b0488387 Mon Sep 17 00:00:00 2001 From: kazdy Date: Mon, 30 Sep 2024 23:35:49 +0200 Subject: [PATCH 5/9] chore: make HudiDataSource own location and options --- crates/datafusion/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index 8277caf..bff8915 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -160,11 +160,11 @@ impl TableProviderFactory for HudiTableProvider { cmd: &CreateExternalTable, ) -> Result> { let table_provider = match cmd.options.is_empty() { - true => HudiDataSource::new(cmd.to_owned().location.as_str()).await?, + true => HudiDataSource::new(cmd.location.to_owned().as_str()).await?, false => { HudiDataSource::new_with_options( - cmd.to_owned().location.as_str(), - cmd.to_owned().options, + cmd.location.to_owned().as_str(), + cmd.options.to_owned(), ) .await? } From b8617beeba7a6b0d474a2b7f6b4dd51f8cd5284d Mon Sep 17 00:00:00 2001 From: kazdy Date: Thu, 10 Oct 2024 19:59:08 +0200 Subject: [PATCH 6/9] feat: implement tests for datafusion TableProviderFactory --- crates/datafusion/src/lib.rs | 63 ++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index bff8915..48517b3 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -150,10 +150,10 @@ impl TableProvider for HudiDataSource { } } -pub struct HudiTableProvider; +pub struct HudiTableFactory; #[async_trait] -impl TableProviderFactory for HudiTableProvider { +impl TableProviderFactory for HudiTableFactory { async fn create( &self, _state: &dyn Session, @@ -179,6 +179,10 @@ mod tests { use std::path::Path; use std::sync::Arc; + // use datafusion::catalog::TableProviderFactory; + // use datafusion::execution::context::SessionState; + // use datafusion::execution::runtime_env::RuntimeEnv; + use datafusion::execution::session_state::SessionStateBuilder; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::ScalarValue; use url::Url; @@ -193,6 +197,7 @@ mod tests { use utils::{get_bool_column, get_i32_column, get_str_column}; use crate::HudiDataSource; + use crate::HudiTableFactory; #[tokio::test] async fn get_default_input_partitions() { @@ -221,6 +226,24 @@ mod tests { ctx } + async fn prepare_session_context_with_table_factory() -> SessionContext { + let config = SessionConfig::new().set( + "datafusion.sql_parser.enable_ident_normalization", + &ScalarValue::from(false), + ); + + let mut session_state = SessionStateBuilder::new() + .with_default_features() + .with_config(config) + .build(); + + session_state + .table_factories_mut() + .insert("HUDITABLE".to_string(), Arc::new(HudiTableFactory {})); + + SessionContext::new_with_state(session_state) + } + async fn verify_plan( ctx: &SessionContext, sql: &str, @@ -317,4 +340,40 @@ mod tests { verify_data_with_replacecommits(&ctx, &sql, test_table.as_ref()).await } } + + #[tokio::test] + async fn datafusion_read_external_hudi_table() { + for test_table in &[ + V6ComplexkeygenHivestyle, + V6Nonpartitioned, + V6SimplekeygenNonhivestyle, + V6SimplekeygenHivestyleNoMetafields, + V6TimebasedkeygenNonhivestyle, + ] { + println!(">>> testing for {}", test_table.as_ref()); + let ctx = prepare_session_context_with_table_factory().await; + let base_path = test_table.path(); + + let create_table_sql = format!( + "CREATE EXTERNAL TABLE {} STORED AS HUDITABLE LOCATION '{}'", + test_table.as_ref(), + base_path + ); + + let _ = ctx + .sql(create_table_sql.as_str()) + .await + .expect("Failed to register table"); + + let sql = format!( + r#" + SELECT id, name, isActive, structField.field2 + FROM {} WHERE id % 2 = 0 + AND structField.field2 > 30 ORDER BY name LIMIT 10"#, + test_table.as_ref() + ); + + verify_data(&ctx, &sql, test_table.as_ref()).await + } + } } From 1ac4b42dde9187a73bc6f0e51736167a8fe28550 Mon Sep 17 00:00:00 2001 From: kazdy Date: Thu, 10 Oct 2024 20:03:04 +0200 Subject: [PATCH 7/9] chore: remove unused imports --- crates/datafusion/src/lib.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index 48517b3..0b8103e 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -179,9 +179,6 @@ mod tests { use std::path::Path; use std::sync::Arc; - // use datafusion::catalog::TableProviderFactory; - // use datafusion::execution::context::SessionState; - // use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::ScalarValue; From 73b3ccd75cfffbcfd9aa5914b9c45848c2508745 Mon Sep 17 00:00:00 2001 From: kazdy Date: Sun, 13 Oct 2024 15:43:34 +0200 Subject: [PATCH 8/9] test if can pass options when creating datafusion external table --- crates/datafusion/src/lib.rs | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index 0b8103e..0bc5191 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -373,4 +373,37 @@ mod tests { verify_data(&ctx, &sql, test_table.as_ref()).await } } + + #[tokio::test] + async fn datafusion_read_external_hudi_table_with_replacecommits() { + for (test_table, planned_input_partitions) in + &[(V6SimplekeygenNonhivestyleOverwritetable, 1)] + { + println!(">>> testing for {}", test_table.as_ref()); + let ctx = prepare_session_context_with_table_factory().await; + let base_path = test_table.path(); + + let create_table_sql = format!( + "CREATE EXTERNAL TABLE {} STORED AS HUDITABLE LOCATION '{}' OPTIONS ('hoodie.read.input.partitions' '2');", + test_table.as_ref(), + base_path + ); + + let _ = ctx + .sql(create_table_sql.as_str()) + .await + .expect("Failed to register table"); + + let sql = format!( + r#" + SELECT id, name, isActive, structField.field2 + FROM {} WHERE id % 2 = 0 + AND structField.field2 > 30 ORDER BY name LIMIT 10"#, + test_table.as_ref() + ); + + verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await; + verify_data_with_replacecommits(&ctx, &sql, test_table.as_ref()).await + } + } } From cbd1c99b49593e6cce2c45c6c06ec1d61c5787c3 Mon Sep 17 00:00:00 2001 From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com> Date: Mon, 14 Oct 2024 16:22:18 -0500 Subject: [PATCH 9/9] improve impl and tests --- crates/datafusion/src/lib.rs | 232 ++++++++++++++++++----------------- 1 file changed, 117 insertions(+), 115 deletions(-) diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index 0bc5191..9ef4070 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -18,6 +18,7 @@ */ use std::any::Any; +use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; use std::thread; @@ -150,38 +151,63 @@ impl TableProvider for HudiDataSource { } } -pub struct HudiTableFactory; +pub struct HudiTableFactory {} + +impl Default for HudiTableFactory { + fn default() -> Self { + Self::new() + } +} + +impl HudiTableFactory { + pub fn new() -> Self { + Self {} + } + + fn resolve_options( + state: &dyn Session, + cmd: &CreateExternalTable, + ) -> Result> { + let mut options: HashMap<_, _> = state + .config_options() + .entries() + .iter() + .filter_map(|e| { + let value = e.value.as_ref().filter(|v| !v.is_empty())?; + Some((e.key.clone(), value.clone())) + }) + .collect(); + + // options from the command take precedence + options.extend(cmd.options.iter().map(|(k, v)| (k.clone(), v.clone()))); + + Ok(options) + } +} #[async_trait] impl TableProviderFactory for HudiTableFactory { async fn create( &self, - _state: &dyn Session, + state: &dyn Session, cmd: &CreateExternalTable, ) -> Result> { - let table_provider = match cmd.options.is_empty() { - true => HudiDataSource::new(cmd.location.to_owned().as_str()).await?, - false => { - HudiDataSource::new_with_options( - cmd.location.to_owned().as_str(), - cmd.options.to_owned(), - ) - .await? - } - }; + let options = HudiTableFactory::resolve_options(state, cmd)?; + let base_uri = cmd.location.as_str(); + let table_provider = HudiDataSource::new_with_options(base_uri, options).await?; Ok(Arc::new(table_provider)) } } #[cfg(test)] mod tests { + use super::*; + use datafusion::execution::session_state::SessionStateBuilder; + use datafusion::prelude::{SessionConfig, SessionContext}; + use datafusion_common::{DataFusionError, ScalarValue}; use std::fs::canonicalize; use std::path::Path; use std::sync::Arc; - - use datafusion::execution::session_state::SessionStateBuilder; - use datafusion::prelude::{SessionConfig, SessionContext}; - use datafusion_common::ScalarValue; use url::Url; use hudi_core::config::read::HudiReadConfig::InputPartitions; @@ -205,42 +231,83 @@ mod tests { assert_eq!(hudi.get_input_partitions(), 0) } - async fn prepare_session_context( + async fn register_test_table_with_session( test_table: &TestTable, - options: Vec<(&str, &str)>, - ) -> SessionContext { - let config = SessionConfig::new().set( - "datafusion.sql_parser.enable_ident_normalization", - &ScalarValue::from(false), - ); - let ctx = SessionContext::new_with_config(config); - let base_url = test_table.url(); - let hudi = HudiDataSource::new_with_options(base_url.as_str(), options) - .await - .unwrap(); - ctx.register_table(test_table.as_ref(), Arc::new(hudi)) - .unwrap(); - ctx + options: I, + use_sql: bool, + ) -> Result + where + I: IntoIterator, + K: AsRef, + V: Into, + { + let ctx = create_test_session().await; + if use_sql { + let create_table_sql = format!( + "CREATE EXTERNAL TABLE {} STORED AS HUDI LOCATION '{}' {}", + test_table.as_ref(), + test_table.path(), + concat_as_sql_options(options) + ); + ctx.sql(create_table_sql.as_str()).await?; + } else { + let base_url = test_table.url(); + let hudi = HudiDataSource::new_with_options(base_url.as_str(), options).await?; + ctx.register_table(test_table.as_ref(), Arc::new(hudi))?; + } + Ok(ctx) } - async fn prepare_session_context_with_table_factory() -> SessionContext { + async fn create_test_session() -> SessionContext { let config = SessionConfig::new().set( "datafusion.sql_parser.enable_ident_normalization", &ScalarValue::from(false), ); - let mut session_state = SessionStateBuilder::new() .with_default_features() .with_config(config) .build(); - session_state .table_factories_mut() - .insert("HUDITABLE".to_string(), Arc::new(HudiTableFactory {})); + .insert("HUDI".to_string(), Arc::new(HudiTableFactory::new())); SessionContext::new_with_state(session_state) } + fn concat_as_sql_options(options: I) -> String + where + I: IntoIterator, + K: AsRef, + V: Into, + { + let kv_pairs: Vec = options + .into_iter() + .map(|(k, v)| format!("'{}' '{}'", k.as_ref(), v.into())) + .collect(); + + if kv_pairs.is_empty() { + String::new() + } else { + format!("OPTIONS ({})", kv_pairs.join(", ")) + } + } + + #[tokio::test] + async fn test_create_table_with_unknown_format() { + let test_table = V6Nonpartitioned; + let invalid_format = "UNKNOWN_FORMAT"; + let create_table_sql = format!( + "CREATE EXTERNAL TABLE {} STORED AS {} LOCATION '{}'", + test_table.as_ref(), + invalid_format, + test_table.path() + ); + + let ctx = create_test_session().await; + let result = ctx.sql(create_table_sql.as_str()).await; + assert!(result.is_err()); + } + async fn verify_plan( ctx: &SessionContext, sql: &str, @@ -279,16 +346,18 @@ mod tests { #[tokio::test] async fn datafusion_read_hudi_table() { - for (test_table, planned_input_partitions) in &[ - (V6ComplexkeygenHivestyle, 2), - (V6Nonpartitioned, 1), - (V6SimplekeygenNonhivestyle, 2), - (V6SimplekeygenHivestyleNoMetafields, 2), - (V6TimebasedkeygenNonhivestyle, 2), + for (test_table, use_sql, planned_input_partitions) in &[ + (V6ComplexkeygenHivestyle, true, 2), + (V6Nonpartitioned, true, 1), + (V6SimplekeygenNonhivestyle, false, 2), + (V6SimplekeygenHivestyleNoMetafields, true, 2), + (V6TimebasedkeygenNonhivestyle, false, 2), ] { println!(">>> testing for {}", test_table.as_ref()); - let options = vec![(InputPartitions.as_ref(), "2")]; - let ctx = prepare_session_context(test_table, options).await; + let options = [(InputPartitions, "2")]; + let ctx = register_test_table_with_session(test_table, options, *use_sql) + .await + .unwrap(); let sql = format!( r#" @@ -318,81 +387,14 @@ mod tests { #[tokio::test] async fn datafusion_read_hudi_table_with_replacecommits() { - for (test_table, planned_input_partitions) in - &[(V6SimplekeygenNonhivestyleOverwritetable, 1)] + for (test_table, use_sql, planned_input_partitions) in + &[(V6SimplekeygenNonhivestyleOverwritetable, true, 1)] { println!(">>> testing for {}", test_table.as_ref()); let ctx = - prepare_session_context(test_table, vec![(InputPartitions.as_ref(), "2")]).await; - - let sql = format!( - r#" - SELECT id, name, isActive, structField.field2 - FROM {} WHERE id % 2 = 0 - AND structField.field2 > 30 ORDER BY name LIMIT 10"#, - test_table.as_ref() - ); - - verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await; - verify_data_with_replacecommits(&ctx, &sql, test_table.as_ref()).await - } - } - - #[tokio::test] - async fn datafusion_read_external_hudi_table() { - for test_table in &[ - V6ComplexkeygenHivestyle, - V6Nonpartitioned, - V6SimplekeygenNonhivestyle, - V6SimplekeygenHivestyleNoMetafields, - V6TimebasedkeygenNonhivestyle, - ] { - println!(">>> testing for {}", test_table.as_ref()); - let ctx = prepare_session_context_with_table_factory().await; - let base_path = test_table.path(); - - let create_table_sql = format!( - "CREATE EXTERNAL TABLE {} STORED AS HUDITABLE LOCATION '{}'", - test_table.as_ref(), - base_path - ); - - let _ = ctx - .sql(create_table_sql.as_str()) - .await - .expect("Failed to register table"); - - let sql = format!( - r#" - SELECT id, name, isActive, structField.field2 - FROM {} WHERE id % 2 = 0 - AND structField.field2 > 30 ORDER BY name LIMIT 10"#, - test_table.as_ref() - ); - - verify_data(&ctx, &sql, test_table.as_ref()).await - } - } - - #[tokio::test] - async fn datafusion_read_external_hudi_table_with_replacecommits() { - for (test_table, planned_input_partitions) in - &[(V6SimplekeygenNonhivestyleOverwritetable, 1)] - { - println!(">>> testing for {}", test_table.as_ref()); - let ctx = prepare_session_context_with_table_factory().await; - let base_path = test_table.path(); - - let create_table_sql = format!( - "CREATE EXTERNAL TABLE {} STORED AS HUDITABLE LOCATION '{}' OPTIONS ('hoodie.read.input.partitions' '2');", - test_table.as_ref(), - base_path - ); - - let _ = ctx - .sql(create_table_sql.as_str()) - .await - .expect("Failed to register table"); + register_test_table_with_session(test_table, [(InputPartitions, "2")], *use_sql) + .await + .unwrap(); let sql = format!( r#"