improve impl and tests
xushiyan committed Oct 14, 2024
1 parent abf8f8a commit caf1233
Showing 1 changed file with 116 additions and 115 deletions.
231 changes: 116 additions & 115 deletions crates/datafusion/src/lib.rs
@@ -18,6 +18,7 @@
*/

use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::thread;
@@ -143,38 +144,62 @@ impl TableProvider for HudiDataSource {
}
}

pub struct HudiTableFactory;
pub struct HudiTableFactory {}

impl Default for HudiTableFactory {
fn default() -> Self {
Self::new()
}
}

impl HudiTableFactory {
pub fn new() -> Self {
Self {}
}
fn resolve_options(
state: &dyn Session,
cmd: &CreateExternalTable,
) -> Result<HashMap<String, String>> {
let mut options: HashMap<_, _> = state
.config_options()
.entries()
.iter()
.filter_map(|e| {
let value = e.value.as_ref().filter(|v| !v.is_empty())?;
Some((e.key.clone(), value.clone()))
})
.collect();

// options from the command take precedence
options.extend(cmd.options.iter().map(|(k, v)| (k.clone(), v.clone())));

Ok(options)
}
}

#[async_trait]
impl TableProviderFactory for HudiTableFactory {
async fn create(
&self,
_state: &dyn Session,
state: &dyn Session,
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>> {
let table_provider = match cmd.options.is_empty() {
true => HudiDataSource::new(cmd.location.to_owned().as_str()).await?,
false => {
HudiDataSource::new_with_options(
cmd.location.to_owned().as_str(),
cmd.options.to_owned(),
)
.await?
}
};
let options = HudiTableFactory::resolve_options(state, cmd)?;
let base_uri = cmd.location.as_str();
let table_provider = HudiDataSource::new_with_options(base_uri, options).await?;
Ok(Arc::new(table_provider))
}
}
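
For orientation, the sketch below shows how this new factory path is meant to be exercised end to end, mirroring the test setup later in this diff: register HudiTableFactory under the "HUDI" format key, then create and query a table via SQL. The crate path hudi_datafusion, the table name, and the location are assumptions for illustration, not part of this commit.

use std::sync::Arc;

use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use hudi_datafusion::HudiTableFactory; // assumed crate path for this crate's public export

#[tokio::main]
async fn main() -> datafusion_common::Result<()> {
    // Register the factory under the "HUDI" format key, as `create_test_session` does below.
    let mut state = SessionStateBuilder::new().with_default_features().build();
    state
        .table_factories_mut()
        .insert("HUDI".to_string(), Arc::new(HudiTableFactory::new()));
    let ctx = SessionContext::new_with_state(state);

    // CREATE EXTERNAL TABLE ... STORED AS HUDI now routes through `HudiTableFactory::create`,
    // which merges session config entries with the OPTIONS clause (OPTIONS take precedence).
    // The table name and location below are placeholders.
    ctx.sql(
        "CREATE EXTERNAL TABLE trips STORED AS HUDI LOCATION '/tmp/trips' \
         OPTIONS ('hoodie.read.input.partitions' '2')",
    )
    .await?;

    ctx.sql("SELECT * FROM trips LIMIT 10").await?.show().await?;
    Ok(())
}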

#[cfg(test)]
mod tests {
use super::*;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{DataFusionError, ScalarValue};
use std::fs::canonicalize;
use std::path::Path;
use std::sync::Arc;

use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::ScalarValue;
use url::Url;

use hudi_core::config::read::HudiReadConfig::InputPartitions;
@@ -198,42 +223,83 @@ mod tests {
assert_eq!(hudi.get_input_partitions(), 0)
}

async fn prepare_session_context(
async fn register_test_table_with_session<I, K, V>(
test_table: &TestTable,
options: Vec<(&str, &str)>,
) -> SessionContext {
let config = SessionConfig::new().set(
"datafusion.sql_parser.enable_ident_normalization",
&ScalarValue::from(false),
);
let ctx = SessionContext::new_with_config(config);
let base_url = test_table.url();
let hudi = HudiDataSource::new_with_options(base_url.as_str(), options)
.await
.unwrap();
ctx.register_table(test_table.as_ref(), Arc::new(hudi))
.unwrap();
ctx
options: I,
use_sql: bool,
) -> Result<SessionContext, DataFusionError>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
let ctx = create_test_session().await;
if use_sql {
let create_table_sql = format!(
"CREATE EXTERNAL TABLE {} STORED AS HUDI LOCATION '{}' {}",
test_table.as_ref(),
test_table.path(),
concat_as_sql_options(options)
);
ctx.sql(create_table_sql.as_str()).await?;
} else {
let base_url = test_table.url();
let hudi = HudiDataSource::new_with_options(base_url.as_str(), options).await?;
ctx.register_table(test_table.as_ref(), Arc::new(hudi))?;
}
Ok(ctx)
}

async fn prepare_session_context_with_table_factory() -> SessionContext {
async fn create_test_session() -> SessionContext {
let config = SessionConfig::new().set(
"datafusion.sql_parser.enable_ident_normalization",
&ScalarValue::from(false),
);

let mut session_state = SessionStateBuilder::new()
.with_default_features()
.with_config(config)
.build();

session_state
.table_factories_mut()
.insert("HUDITABLE".to_string(), Arc::new(HudiTableFactory {}));
.insert("HUDI".to_string(), Arc::new(HudiTableFactory::new()));

SessionContext::new_with_state(session_state)
}

fn concat_as_sql_options<I, K, V>(options: I) -> String
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
let kv_pairs: Vec<String> = options
.into_iter()
.map(|(k, v)| format!("'{}' '{}'", k.as_ref(), v.into()))
.collect();

if kv_pairs.is_empty() {
String::new()
} else {
format!("OPTIONS ({})", kv_pairs.join(", "))
}
}

#[tokio::test]
async fn test_create_table_with_unknown_format() {
let test_table = V6Nonpartitioned;
let invalid_format = "UNKNOWN_FORMAT";
let create_table_sql = format!(
"CREATE EXTERNAL TABLE {} STORED AS {} LOCATION '{}'",
test_table.as_ref(),
invalid_format,
test_table.path()
);

let ctx = create_test_session().await;
let result = ctx.sql(create_table_sql.as_str()).await;
assert!(result.is_err());
}

async fn verify_plan(
ctx: &SessionContext,
sql: &str,
@@ -272,16 +338,18 @@ mod tests {

#[tokio::test]
async fn datafusion_read_hudi_table() {
for (test_table, planned_input_partitions) in &[
(V6ComplexkeygenHivestyle, 2),
(V6Nonpartitioned, 1),
(V6SimplekeygenNonhivestyle, 2),
(V6SimplekeygenHivestyleNoMetafields, 2),
(V6TimebasedkeygenNonhivestyle, 2),
for (test_table, use_sql, planned_input_partitions) in &[
(V6ComplexkeygenHivestyle, true, 2),
(V6Nonpartitioned, true, 1),
(V6SimplekeygenNonhivestyle, false, 2),
(V6SimplekeygenHivestyleNoMetafields, true, 2),
(V6TimebasedkeygenNonhivestyle, false, 2),
] {
println!(">>> testing for {}", test_table.as_ref());
let options = vec![(InputPartitions.as_ref(), "2")];
let ctx = prepare_session_context(test_table, options).await;
let options = [(InputPartitions, "2")];
let ctx = register_test_table_with_session(test_table, options, *use_sql)
.await
.unwrap();

let sql = format!(
r#"
@@ -311,81 +379,14 @@ mod tests {

#[tokio::test]
async fn datafusion_read_hudi_table_with_replacecommits() {
for (test_table, planned_input_partitions) in
&[(V6SimplekeygenNonhivestyleOverwritetable, 1)]
for (test_table, use_sql, planned_input_partitions) in
&[(V6SimplekeygenNonhivestyleOverwritetable, true, 1)]
{
println!(">>> testing for {}", test_table.as_ref());
let ctx =
prepare_session_context(test_table, vec![(InputPartitions.as_ref(), "2")]).await;

let sql = format!(
r#"
SELECT id, name, isActive, structField.field2
FROM {} WHERE id % 2 = 0
AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
test_table.as_ref()
);

verify_plan(&ctx, &sql, test_table.as_ref(), planned_input_partitions).await;
verify_data_with_replacecommits(&ctx, &sql, test_table.as_ref()).await
}
}

#[tokio::test]
async fn datafusion_read_external_hudi_table() {
for test_table in &[
V6ComplexkeygenHivestyle,
V6Nonpartitioned,
V6SimplekeygenNonhivestyle,
V6SimplekeygenHivestyleNoMetafields,
V6TimebasedkeygenNonhivestyle,
] {
println!(">>> testing for {}", test_table.as_ref());
let ctx = prepare_session_context_with_table_factory().await;
let base_path = test_table.path();

let create_table_sql = format!(
"CREATE EXTERNAL TABLE {} STORED AS HUDITABLE LOCATION '{}'",
test_table.as_ref(),
base_path
);

let _ = ctx
.sql(create_table_sql.as_str())
.await
.expect("Failed to register table");

let sql = format!(
r#"
SELECT id, name, isActive, structField.field2
FROM {} WHERE id % 2 = 0
AND structField.field2 > 30 ORDER BY name LIMIT 10"#,
test_table.as_ref()
);

verify_data(&ctx, &sql, test_table.as_ref()).await
}
}

#[tokio::test]
async fn datafusion_read_external_hudi_table_with_replacecommits() {
for (test_table, planned_input_partitions) in
&[(V6SimplekeygenNonhivestyleOverwritetable, 1)]
{
println!(">>> testing for {}", test_table.as_ref());
let ctx = prepare_session_context_with_table_factory().await;
let base_path = test_table.path();

let create_table_sql = format!(
"CREATE EXTERNAL TABLE {} STORED AS HUDITABLE LOCATION '{}' OPTIONS ('hoodie.read.input.partitions' '2');",
test_table.as_ref(),
base_path
);

let _ = ctx
.sql(create_table_sql.as_str())
.await
.expect("Failed to register table");
register_test_table_with_session(test_table, [(InputPartitions, "2")], *use_sql)
.await
.unwrap();

let sql = format!(
r#"
