Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support partition prune api #119

Merged
merged 6 commits into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ datafusion-common = { version = "= 41.0.0" }
datafusion-physical-expr = { version = "= 41.0.0" }

# serde
percent-encoding = { version = "2.3.1" }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = { version = "1" }

# "stdlib"
anyhow = { version = "1.0.86" }
bytes = { version = "1" }
once_cell = { version = "1.19.0" }
strum = { version = "0.26.3", features = ["derive"] }
strum_macros = "0.26.4"
url = { version = "2" }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ datafusion = { workspace = true, optional = true }
datafusion-expr = { workspace = true, optional = true }
datafusion-common = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }
percent-encoding = { workspace = true }
once_cell = { workspace = true }

[dev-dependencies]
hudi-tests = { path = "../tests" }
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/config/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl ConfigParser for HudiTableConfig {
match self {
Self::DatabaseName => Some(HudiConfigValue::String("default".to_string())),
Self::DropsPartitionFields => Some(HudiConfigValue::Boolean(false)),
Self::PartitionFields => Some(HudiConfigValue::List(vec![])),
Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
_ => None,
}
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/file_group/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ mod tests {
.base_file
.commit_time,
"20240402123035233"
)
);
assert!(fg.get_file_slice_as_of("-1").is_none());
}

#[test]
Expand Down
175 changes: 131 additions & 44 deletions crates/core/src/table/fs_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::record_batch::RecordBatch;
use dashmap::DashMap;
use url::Url;

use crate::config::HudiConfigs;
use crate::file_group::{BaseFile, FileGroup, FileSlice};
use crate::storage::file_info::FileInfo;
use crate::storage::{get_leaf_dirs, Storage};
use crate::table::partition::PartitionPruner;
use anyhow::{anyhow, Result};
use arrow::record_batch::RecordBatch;
use dashmap::DashMap;
use url::Url;

#[derive(Clone, Debug)]
#[allow(dead_code)]
Expand All @@ -45,18 +45,18 @@
configs: Arc<HudiConfigs>,
) -> Result<Self> {
let storage = Storage::new(base_url, &storage_options)?;
let partition_paths = Self::load_partition_paths(&storage).await?;
let partition_to_file_groups =
Self::load_file_groups_for_partitions(&storage, partition_paths).await?;
let partition_to_file_groups = Arc::new(DashMap::from_iter(partition_to_file_groups));
let partition_to_file_groups = Arc::new(DashMap::new());

Check warning on line 48 in crates/core/src/table/fs_view.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/fs_view.rs#L48

Added line #L48 was not covered by tests
Ok(FileSystemView {
configs,
storage,
partition_to_file_groups,
})
}

async fn load_partition_paths(storage: &Storage) -> Result<Vec<String>> {
async fn load_partition_paths(
storage: &Storage,
partition_pruner: &PartitionPruner,
) -> Result<Vec<String>> {
let top_level_dirs: Vec<String> = storage
.list_dirs(None)
.await?
Expand All @@ -70,7 +70,14 @@
if partition_paths.is_empty() {
partition_paths.push("".to_string())
}
Ok(partition_paths)
if partition_pruner.is_empty() {
return Ok(partition_paths);
}

Ok(partition_paths
xushiyan marked this conversation as resolved.
Show resolved Hide resolved
.into_iter()
.filter(|path_str| partition_pruner.should_include(path_str))
.collect())
}

async fn load_file_groups_for_partitions(
Expand Down Expand Up @@ -121,55 +128,50 @@
Ok(file_groups)
}

pub fn get_file_slices_as_of(
pub async fn get_file_slices_as_of(
&self,
timestamp: &str,
partition_pruner: &PartitionPruner,
excluding_file_groups: &HashSet<FileGroup>,
) -> Result<Vec<FileSlice>> {
let mut file_slices = Vec::new();
for fgs in self.partition_to_file_groups.iter() {
let fgs_ref = fgs.value();
if self.partition_to_file_groups.is_empty() {
let partition_paths =
Self::load_partition_paths(&self.storage, partition_pruner).await?;
let partition_to_file_groups =
Self::load_file_groups_for_partitions(&self.storage, partition_paths).await?;
partition_to_file_groups.into_iter().for_each(|pair| {
self.partition_to_file_groups.insert(pair.0, pair.1);
});
}
for mut fgs in self
.partition_to_file_groups
.iter_mut()
.filter(|item| partition_pruner.should_include(item.key()))
{
let fgs_ref = fgs.value_mut();
for fg in fgs_ref {
if excluding_file_groups.contains(fg) {
continue;
}
if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
// TODO: pass ref instead of copying
file_slices.push(fsl.clone());
fsl.load_stats(&self.storage).await?;
let immut_fsl: &FileSlice = fsl;
file_slices.push(immut_fsl.clone());
}
}
}
Ok(file_slices)
}

pub async fn load_file_slices_stats_as_of(
&self,
timestamp: &str,
exclude_file_groups: &HashSet<FileGroup>,
) -> Result<()> {
for mut fgs in self.partition_to_file_groups.iter_mut() {
let fgs_ref = fgs.value_mut();
for fg in fgs_ref {
if exclude_file_groups.contains(fg) {
continue;
}
if let Some(file_slice) = fg.get_file_slice_mut_as_of(timestamp) {
file_slice
.load_stats(&self.storage)
.await
.expect("Successful loading file stats.");
}
}
}
Ok(())
}

pub async fn read_file_slice_by_path_unchecked(
&self,
relative_path: &str,
) -> Result<RecordBatch> {
self.storage.get_parquet_file_data(relative_path).await
}

pub async fn read_file_slice_unchecked(&self, file_slice: &FileSlice) -> Result<RecordBatch> {
self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
.await
Expand All @@ -178,20 +180,22 @@

#[cfg(test)]
mod tests {
use hudi_tests::TestTable;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use hudi_tests::TestTable;

use crate::config::HudiConfigs;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
use crate::table::Table;

#[tokio::test]
async fn get_partition_paths_for_nonpartitioned_table() {
let base_url = TestTable::V6Nonpartitioned.url();
let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
let partition_paths = FileSystemView::load_partition_paths(&storage)
let partition_pruner = PartitionPruner::empty();
let partition_paths = FileSystemView::load_partition_paths(&storage, &partition_pruner)
.await
.unwrap();
let partition_path_set: HashSet<&str> =
Expand All @@ -203,7 +207,8 @@
async fn get_partition_paths_for_complexkeygen_table() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
let partition_paths = FileSystemView::load_partition_paths(&storage)
let partition_pruner = PartitionPruner::empty();
let partition_paths = FileSystemView::load_partition_paths(&storage, &partition_pruner)
.await
.unwrap();
let partition_path_set: HashSet<&str> =
Expand All @@ -229,15 +234,97 @@
.await
.unwrap();

assert!(fs_view.partition_to_file_groups.is_empty());
let partition_pruner = PartitionPruner::empty();
let excludes = HashSet::new();
let file_slices = fs_view
.get_file_slices_as_of("20240418173551906", &excludes)
.get_file_slices_as_of("20240418173551906", &partition_pruner, &excludes)
.await
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 1);
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
.map(|fsl| fsl.file_group_id())
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
for fsl in file_slices.iter() {
assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 4);
}
}

#[tokio::test]
async fn fs_view_get_latest_file_slices_with_replace_commit() {
let base_url = TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let fs_view = FileSystemView::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
Arc::new(HudiConfigs::empty()),
)
.await
.unwrap();

assert_eq!(fs_view.partition_to_file_groups.len(), 0);
let partition_pruner = PartitionPruner::empty();
let excludes = &hudi_table
.timeline
.get_replaced_file_groups()
.await
.unwrap();
let file_slices = fs_view
.get_file_slices_as_of("20240707001303088", &partition_pruner, excludes)
.await
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 3);
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
.map(|fsl| fsl.file_group_id())
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"])
assert_eq!(fg_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
for fsl in file_slices.iter() {
assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 1);
}
}

#[tokio::test]
async fn fs_view_get_latest_file_slices_with_partition_filters() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let fs_view = FileSystemView::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
Arc::new(HudiConfigs::empty()),
)
.await
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 0);
let excludes = &hudi_table
.timeline
.get_replaced_file_groups()
.await
.unwrap();
let partition_schema = hudi_table.get_partition_schema().await.unwrap();
let partition_pruner = PartitionPruner::new(
&["byteField < 20", "shortField = 300"],
&partition_schema,
hudi_table.configs.as_ref(),
)
.unwrap();
let file_slices = fs_view
.get_file_slices_as_of("20240418173235694", &partition_pruner, excludes)
.await
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 1);
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
.map(|fsl| fsl.file_group_id())
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
for fsl in file_slices.iter() {
assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 2);
}
}
}
Loading
Loading