diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 5cb51c7..0d802e4 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -446,6 +446,7 @@ impl Table { #[cfg(test)] mod tests { + use arrow_array::StringArray; use std::collections::HashSet; use std::fs::canonicalize; use std::path::PathBuf; @@ -551,158 +552,6 @@ mod tests { assert_eq!(fields, vec!["ts_str"]); } - #[tokio::test] - async fn hudi_table_read_file_slice() { - let base_url = TestTable::V6Nonpartitioned.url(); - let hudi_table = Table::new(base_url.path()).await.unwrap(); - let batches = hudi_table - .read_file_slice_by_path( - "a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet", - ) - .await - .unwrap(); - assert_eq!(batches.num_rows(), 4); - assert_eq!(batches.num_columns(), 21); - } - - #[tokio::test] - async fn hudi_table_get_file_paths_for_complex_keygen_and_hive_style() { - let base_url = TestTable::V6ComplexkeygenHivestyle.url(); - let hudi_table = Table::new(base_url.path()).await.unwrap(); - assert_eq!(hudi_table.timeline.instants.len(), 2); - - let partition_filters = &[]; - let actual: HashSet = HashSet::from_iter( - hudi_table - .get_file_paths_with_filters(partition_filters) - .await - .unwrap(), - ); - let expected: HashSet = HashSet::from_iter(vec![ - "byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet", - "byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet", - "byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet", - ] - .into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() }) - .collect::>()); - assert_eq!(actual, expected); - - let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"]; - let actual: HashSet = HashSet::from_iter( - hudi_table - .get_file_paths_with_filters(partition_filters) - .await - .unwrap(), - ); - let expected: HashSet = HashSet::from_iter(vec![ - "byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet", - ] - .into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() }) - .collect::>()); - assert_eq!(actual, expected); - } - - #[tokio::test] - async fn hudi_table_get_file_paths_for_simple_key_and_non_hive_style() { - let base_url = TestTable::V6SimplekeygenNonhivestyle.url(); - let hudi_table = Table::new(base_url.path()).await.unwrap(); - assert_eq!(hudi_table.timeline.instants.len(), 2); - - let partition_filters = &[]; - let actual: HashSet = HashSet::from_iter( - hudi_table - .get_file_paths_with_filters(partition_filters) - .await - .unwrap(), - ); - let expected: HashSet = HashSet::from_iter( - vec![ - "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet", - "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet", - "30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet", - ] - .into_iter() - .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string()) - .collect::>(), - ); - assert_eq!(actual, expected); - - let partition_filters = &["byteField >= 10", "byteField < 30"]; - let actual: HashSet = HashSet::from_iter( - hudi_table - .get_file_paths_with_filters(partition_filters) - .await - .unwrap(), - ); - let expected: HashSet = HashSet::from_iter( - vec![ - "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet", - "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet", - ] - .into_iter() - .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string()) - .collect::>(), - ); - assert_eq!(actual, expected); - } - - #[tokio::test] - async fn hudi_table_get_file_slices_as_of_timestamps() { - let base_url = TestTable::V6Nonpartitioned.url(); - - let hudi_table = Table::new(base_url.path()).await.unwrap(); - let file_slices = hudi_table.get_file_slices(&[]).await.unwrap(); - assert_eq!( - file_slices - .iter() - .map(|f| f.base_file_relative_path()) - .collect::>(), - vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",] - ); - - // as of the latest timestamp - let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")]; - let hudi_table = Table::new_with_options(base_url.path(), opts) - .await - .unwrap(); - let file_slices = hudi_table.get_file_slices(&[]).await.unwrap(); - assert_eq!( - file_slices - .iter() - .map(|f| f.base_file_relative_path()) - .collect::>(), - vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",] - ); - - // as of just smaller than the latest timestamp - let opts = [(AsOfTimestamp.as_ref(), "20240418173551905")]; - let hudi_table = Table::new_with_options(base_url.path(), opts) - .await - .unwrap(); - let file_slices = hudi_table.get_file_slices(&[]).await.unwrap(); - assert_eq!( - file_slices - .iter() - .map(|f| f.base_file_relative_path()) - .collect::>(), - vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",] - ); - - // as of non-exist old timestamp - let opts = [(AsOfTimestamp.as_ref(), "0")]; - let hudi_table = Table::new_with_options(base_url.path(), opts) - .await - .unwrap(); - let file_slices = hudi_table.get_file_slices(&[]).await.unwrap(); - assert_eq!( - file_slices - .iter() - .map(|f| f.base_file_relative_path()) - .collect::>(), - Vec::::new() - ); - } - #[tokio::test] async fn validate_invalid_table_props() { let table = get_test_table_without_validation("table_props_invalid").await; @@ -871,4 +720,196 @@ mod tests { assert_eq!(configs.get(TableName).unwrap().to::(), "trips"); env::remove_var(HUDI_CONF_DIR) } + + #[tokio::test] + async fn hudi_table_read_file_slice() { + let base_url = TestTable::V6Nonpartitioned.url(); + let hudi_table = Table::new(base_url.path()).await.unwrap(); + let batches = hudi_table + .read_file_slice_by_path( + "a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet", + ) + .await + .unwrap(); + assert_eq!(batches.num_rows(), 4); + assert_eq!(batches.num_columns(), 21); + } + + #[tokio::test] + async fn hudi_table_get_file_slices_as_of_timestamps() { + let base_url = TestTable::V6Nonpartitioned.url(); + + let hudi_table = Table::new(base_url.path()).await.unwrap(); + let file_slices = hudi_table.get_file_slices(&[]).await.unwrap(); + assert_eq!( + file_slices + .iter() + .map(|f| f.base_file_relative_path()) + .collect::>(), + vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",] + ); + + // as of the latest timestamp + let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")]; + let hudi_table = Table::new_with_options(base_url.path(), opts) + .await + .unwrap(); + let file_slices = hudi_table.get_file_slices(&[]).await.unwrap(); + assert_eq!( + file_slices + .iter() + .map(|f| f.base_file_relative_path()) + .collect::>(), + vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",] + ); + + // as of just smaller than the latest timestamp + let opts = [(AsOfTimestamp.as_ref(), "20240418173551905")]; + let hudi_table = Table::new_with_options(base_url.path(), opts) + .await + .unwrap(); + let file_slices = hudi_table.get_file_slices(&[]).await.unwrap(); + assert_eq!( + file_slices + .iter() + .map(|f| f.base_file_relative_path()) + .collect::>(), + vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",] + ); + + // as of non-exist old timestamp + let opts = [(AsOfTimestamp.as_ref(), "0")]; + let hudi_table = Table::new_with_options(base_url.path(), opts) + .await + .unwrap(); + let file_slices = hudi_table.get_file_slices(&[]).await.unwrap(); + assert_eq!( + file_slices + .iter() + .map(|f| f.base_file_relative_path()) + .collect::>(), + Vec::::new() + ); + } + + #[tokio::test] + async fn hudi_table_get_file_paths_for_simple_key_and_non_hive_style() { + let base_url = TestTable::V6SimplekeygenNonhivestyle.url(); + let hudi_table = Table::new(base_url.path()).await.unwrap(); + assert_eq!(hudi_table.timeline.instants.len(), 2); + + let partition_filters = &[]; + let actual: HashSet = HashSet::from_iter( + hudi_table + .get_file_paths_with_filters(partition_filters) + .await + .unwrap(), + ); + let expected: HashSet = HashSet::from_iter( + vec![ + "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet", + "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet", + "30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet", + ] + .into_iter() + .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string()) + .collect::>(), + ); + assert_eq!(actual, expected); + + let partition_filters = &["byteField >= 10", "byteField < 30"]; + let actual: HashSet = HashSet::from_iter( + hudi_table + .get_file_paths_with_filters(partition_filters) + .await + .unwrap(), + ); + let expected: HashSet = HashSet::from_iter( + vec![ + "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet", + "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet", + ] + .into_iter() + .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string()) + .collect::>(), + ); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn hudi_table_get_file_paths_for_complex_keygen_and_hive_style() { + let base_url = TestTable::V6ComplexkeygenHivestyle.url(); + let hudi_table = Table::new(base_url.path()).await.unwrap(); + assert_eq!(hudi_table.timeline.instants.len(), 2); + + let partition_filters = &[]; + let actual: HashSet = HashSet::from_iter( + hudi_table + .get_file_paths_with_filters(partition_filters) + .await + .unwrap(), + ); + let expected: HashSet = HashSet::from_iter(vec![ + "byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet", + "byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet", + "byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet", + ] + .into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() }) + .collect::>()); + assert_eq!(actual, expected); + + let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"]; + let actual: HashSet = HashSet::from_iter( + hudi_table + .get_file_paths_with_filters(partition_filters) + .await + .unwrap(), + ); + let expected: HashSet = HashSet::from_iter(vec![ + "byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet", + ] + .into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() }) + .collect::>()); + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn hudi_table_read_snapshot_for_complex_keygen_and_hive_style() { + let base_url = TestTable::V6ComplexkeygenHivestyle.url(); + let hudi_table = Table::new(base_url.path()).await.unwrap(); + let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"]; + let records = hudi_table.read_snapshot(partition_filters).await.unwrap(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].num_rows(), 2); + let actual_partition_paths: HashSet<&str> = HashSet::from_iter( + records[0] + .column_by_name("_hoodie_partition_path") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|s| s.unwrap()) + .collect::>(), + ); + let expected_partition_paths: HashSet<&str> = + HashSet::from_iter(vec!["byteField=10/shortField=300"]); + assert_eq!(actual_partition_paths, expected_partition_paths); + + let actual_file_names: HashSet<&str> = HashSet::from_iter( + records[0] + .column_by_name("_hoodie_file_name") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|s| s.unwrap()) + .collect::>(), + ); + let expected_file_names: HashSet<&str> = HashSet::from_iter(vec![ + "a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet", + ]); + assert_eq!(actual_file_names, expected_file_names); + } }