Skip to content

Commit

Permalink
add ut
Browse files Browse the repository at this point in the history
  • Loading branch information
xushiyan committed Oct 5, 2024
1 parent 2ecd30c commit f1ce54d
Showing 1 changed file with 193 additions and 152 deletions.
345 changes: 193 additions & 152 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ impl Table {

#[cfg(test)]
mod tests {
use arrow_array::StringArray;
use std::collections::HashSet;
use std::fs::canonicalize;
use std::path::PathBuf;
Expand Down Expand Up @@ -551,158 +552,6 @@ mod tests {
assert_eq!(fields, vec!["ts_str"]);
}

#[tokio::test]
async fn hudi_table_read_file_slice() {
let base_url = TestTable::V6Nonpartitioned.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let batches = hudi_table
.read_file_slice_by_path(
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
)
.await
.unwrap();
assert_eq!(batches.num_rows(), 4);
assert_eq!(batches.num_columns(), 21);
}

#[tokio::test]
async fn hudi_table_get_file_paths_for_complex_keygen_and_hive_style() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.instants.len(), 2);

let partition_filters = &[];
let actual: HashSet<String> = HashSet::from_iter(
hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap(),
);
let expected: HashSet<String> = HashSet::from_iter(vec![
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
"byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet",
]
.into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
.collect::<Vec<_>>());
assert_eq!(actual, expected);

let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"];
let actual: HashSet<String> = HashSet::from_iter(
hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap(),
);
let expected: HashSet<String> = HashSet::from_iter(vec![
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
]
.into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
.collect::<Vec<_>>());
assert_eq!(actual, expected);
}

#[tokio::test]
async fn hudi_table_get_file_paths_for_simple_key_and_non_hive_style() {
let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.instants.len(), 2);

let partition_filters = &[];
let actual: HashSet<String> = HashSet::from_iter(
hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap(),
);
let expected: HashSet<String> = HashSet::from_iter(
vec![
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
"30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet",
]
.into_iter()
.map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
.collect::<Vec<_>>(),
);
assert_eq!(actual, expected);

let partition_filters = &["byteField >= 10", "byteField < 30"];
let actual: HashSet<String> = HashSet::from_iter(
hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap(),
);
let expected: HashSet<String> = HashSet::from_iter(
vec![
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
]
.into_iter()
.map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
.collect::<Vec<_>>(),
);
assert_eq!(actual, expected);
}

#[tokio::test]
async fn hudi_table_get_file_slices_as_of_timestamps() {
let base_url = TestTable::V6Nonpartitioned.url();

let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
assert_eq!(
file_slices
.iter()
.map(|f| f.base_file_relative_path())
.collect::<Vec<_>>(),
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
);

// as of the latest timestamp
let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")];
let hudi_table = Table::new_with_options(base_url.path(), opts)
.await
.unwrap();
let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
assert_eq!(
file_slices
.iter()
.map(|f| f.base_file_relative_path())
.collect::<Vec<_>>(),
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
);

// as of just smaller than the latest timestamp
let opts = [(AsOfTimestamp.as_ref(), "20240418173551905")];
let hudi_table = Table::new_with_options(base_url.path(), opts)
.await
.unwrap();
let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
assert_eq!(
file_slices
.iter()
.map(|f| f.base_file_relative_path())
.collect::<Vec<_>>(),
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",]
);

// as of non-exist old timestamp
let opts = [(AsOfTimestamp.as_ref(), "0")];
let hudi_table = Table::new_with_options(base_url.path(), opts)
.await
.unwrap();
let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
assert_eq!(
file_slices
.iter()
.map(|f| f.base_file_relative_path())
.collect::<Vec<_>>(),
Vec::<String>::new()
);
}

#[tokio::test]
async fn validate_invalid_table_props() {
let table = get_test_table_without_validation("table_props_invalid").await;
Expand Down Expand Up @@ -871,4 +720,196 @@ mod tests {
assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
env::remove_var(HUDI_CONF_DIR)
}

#[tokio::test]
async fn hudi_table_read_file_slice() {
let base_url = TestTable::V6Nonpartitioned.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let batches = hudi_table
.read_file_slice_by_path(
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
)
.await
.unwrap();
assert_eq!(batches.num_rows(), 4);
assert_eq!(batches.num_columns(), 21);
}

#[tokio::test]
async fn hudi_table_get_file_slices_as_of_timestamps() {
let base_url = TestTable::V6Nonpartitioned.url();

let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
assert_eq!(
file_slices
.iter()
.map(|f| f.base_file_relative_path())
.collect::<Vec<_>>(),
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
);

// as of the latest timestamp
let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")];
let hudi_table = Table::new_with_options(base_url.path(), opts)
.await
.unwrap();
let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
assert_eq!(
file_slices
.iter()
.map(|f| f.base_file_relative_path())
.collect::<Vec<_>>(),
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
);

// as of just smaller than the latest timestamp
let opts = [(AsOfTimestamp.as_ref(), "20240418173551905")];
let hudi_table = Table::new_with_options(base_url.path(), opts)
.await
.unwrap();
let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
assert_eq!(
file_slices
.iter()
.map(|f| f.base_file_relative_path())
.collect::<Vec<_>>(),
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",]
);

// as of non-exist old timestamp
let opts = [(AsOfTimestamp.as_ref(), "0")];
let hudi_table = Table::new_with_options(base_url.path(), opts)
.await
.unwrap();
let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
assert_eq!(
file_slices
.iter()
.map(|f| f.base_file_relative_path())
.collect::<Vec<_>>(),
Vec::<String>::new()
);
}

#[tokio::test]
async fn hudi_table_get_file_paths_for_simple_key_and_non_hive_style() {
let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.instants.len(), 2);

let partition_filters = &[];
let actual: HashSet<String> = HashSet::from_iter(
hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap(),
);
let expected: HashSet<String> = HashSet::from_iter(
vec![
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
"30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet",
]
.into_iter()
.map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
.collect::<Vec<_>>(),
);
assert_eq!(actual, expected);

let partition_filters = &["byteField >= 10", "byteField < 30"];
let actual: HashSet<String> = HashSet::from_iter(
hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap(),
);
let expected: HashSet<String> = HashSet::from_iter(
vec![
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
]
.into_iter()
.map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
.collect::<Vec<_>>(),
);
assert_eq!(actual, expected);
}

#[tokio::test]
async fn hudi_table_get_file_paths_for_complex_keygen_and_hive_style() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.instants.len(), 2);

let partition_filters = &[];
let actual: HashSet<String> = HashSet::from_iter(
hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap(),
);
let expected: HashSet<String> = HashSet::from_iter(vec![
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
"byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet",
]
.into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
.collect::<Vec<_>>());
assert_eq!(actual, expected);

let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"];
let actual: HashSet<String> = HashSet::from_iter(
hudi_table
.get_file_paths_with_filters(partition_filters)
.await
.unwrap(),
);
let expected: HashSet<String> = HashSet::from_iter(vec![
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
]
.into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
.collect::<Vec<_>>());
assert_eq!(actual, expected);
}

#[tokio::test]
async fn hudi_table_read_snapshot_for_complex_keygen_and_hive_style() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"];
let records = hudi_table.read_snapshot(partition_filters).await.unwrap();
assert_eq!(records.len(), 1);
assert_eq!(records[0].num_rows(), 2);
let actual_partition_paths: HashSet<&str> = HashSet::from_iter(
records[0]
.column_by_name("_hoodie_partition_path")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.iter()
.map(|s| s.unwrap())
.collect::<Vec<_>>(),
);
let expected_partition_paths: HashSet<&str> =
HashSet::from_iter(vec!["byteField=10/shortField=300"]);
assert_eq!(actual_partition_paths, expected_partition_paths);

let actual_file_names: HashSet<&str> = HashSet::from_iter(
records[0]
.column_by_name("_hoodie_file_name")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.iter()
.map(|s| s.unwrap())
.collect::<Vec<_>>(),
);
let expected_file_names: HashSet<&str> = HashSet::from_iter(vec![
"a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
]);
assert_eq!(actual_file_names, expected_file_names);
}
}

0 comments on commit f1ce54d

Please sign in to comment.