Skip to content

Commit

Permalink
feat: support partition pruning API
Browse files Browse the repository at this point in the history
  • Loading branch information
KnightChess committed Sep 12, 2024
1 parent 01ef2fc commit 085fbba
Show file tree
Hide file tree
Showing 5 changed files with 705 additions and 30 deletions.
3 changes: 1 addition & 2 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ tokio = { workspace = true }
# datafusion
datafusion = { workspace = true, optional = true }
datafusion-expr = { workspace = true, optional = true }
datafusion-common = { workspace = true, optional = true }
datafusion-common = { workspace = true }
datafusion-physical-expr = { workspace = true, optional = true }

[dev-dependencies]
Expand All @@ -73,6 +73,5 @@ hudi-tests = { path = "../tests" }
datafusion = [
"dep:datafusion",
"datafusion-expr",
"datafusion-common",
"datafusion-physical-expr",
]
1 change: 1 addition & 0 deletions crates/core/src/config/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ impl ConfigParser for HudiTableConfig {
Self::DatabaseName => Some(HudiConfigValue::String("default".to_string())),
Self::DropsPartitionFields => Some(HudiConfigValue::Boolean(false)),
Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
Self::PartitionFields => Some(HudiConfigValue::List(vec!["".to_string()])),
_ => None,
}
}
Expand Down
110 changes: 89 additions & 21 deletions crates/core/src/table/fs_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use dashmap::DashMap;
use url::Url;

use crate::config::HudiConfigs;
use crate::file_group::{BaseFile, FileGroup, FileSlice};
use crate::storage::file_info::FileInfo;
use crate::storage::{get_leaf_dirs, Storage};
use crate::table::partitions::{HudiTablePartition, PartitionFilter};

#[derive(Clone, Debug)]
#[allow(dead_code)]
Expand All @@ -45,18 +47,19 @@ impl FileSystemView {
configs: Arc<HudiConfigs>,
) -> Result<Self> {
let storage = Storage::new(base_url, &storage_options)?;
let partition_paths = Self::load_partition_paths(&storage).await?;
let partition_to_file_groups =
Self::load_file_groups_for_partitions(&storage, partition_paths).await?;
let partition_to_file_groups = Arc::new(DashMap::from_iter(partition_to_file_groups));
let partition_to_file_groups = Arc::new(DashMap::new());

Check warning on line 50 in crates/core/src/table/fs_view.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/fs_view.rs#L50

Added line #L50 was not covered by tests
Ok(FileSystemView {
configs,
storage,
partition_to_file_groups,
})
}

async fn load_partition_paths(storage: &Storage) -> Result<Vec<String>> {
async fn load_partition_paths(
storage: &Storage,
partition_filters: &[PartitionFilter],
partition_schema: Schema,
) -> Result<Vec<String>> {
let top_level_dirs: Vec<String> = storage
.list_dirs(None)
.await?
Expand All @@ -70,7 +73,29 @@ impl FileSystemView {
if partition_paths.is_empty() {
partition_paths.push("".to_string())
}
Ok(partition_paths)
if partition_filters.is_empty() {
Ok(partition_paths)
} else {
// fields data type
let field_data_type: HashMap<_, _> = partition_schema
.fields
.iter()
.map(|field| (field.name().clone(), field.data_type().clone()))
.collect();

Ok(partition_paths
.into_iter()
.filter(|path_str| {
let partitions: Vec<HudiTablePartition> = path_str
.split('/')
.map(|s| HudiTablePartition::try_from((s, &field_data_type)).unwrap())
.collect();
partition_filters
.iter()
.all(|filter| filter.match_partitions(&partitions))
})
.collect())
}
}

async fn load_file_groups_for_partitions(
Expand Down Expand Up @@ -121,21 +146,40 @@ impl FileSystemView {
Ok(file_groups)
}

pub fn get_file_slices_as_of(
pub async fn get_file_slices_as_of(
&self,
timestamp: &str,
excluding_file_groups: &HashSet<FileGroup>,
partition_filter: &[PartitionFilter],
partition_schema: Schema,
) -> Result<Vec<FileSlice>> {
let mut file_slices = Vec::new();
for fgs in self.partition_to_file_groups.iter() {
let fgs_ref = fgs.value();
if self.partition_to_file_groups.is_empty() {
let partition_paths =
Self::load_partition_paths(&self.storage, partition_filter, partition_schema)
.await
.unwrap();
let partition_to_file_groups =
Self::load_file_groups_for_partitions(&self.storage, partition_paths)
.await
.unwrap();
partition_to_file_groups.into_iter().for_each(|pair| {
self.partition_to_file_groups.insert(pair.0, pair.1);
});
}
for mut fgs in self.partition_to_file_groups.iter_mut() {
let fgs_ref = fgs.value_mut();
for fg in fgs_ref {
if excluding_file_groups.contains(fg) {
continue;
}
if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
// TODO: pass ref instead of copying
file_slices.push(fsl.clone());
fsl.load_stats(&self.storage)

Check warning on line 178 in crates/core/src/table/fs_view.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/fs_view.rs#L178

Added line #L178 was not covered by tests
.await
.expect("Fail to load file slice stats.");
let immut_fsl: &FileSlice = fsl;
file_slices.push(immut_fsl.clone());
}
}
}
Expand Down Expand Up @@ -174,26 +218,35 @@ impl FileSystemView {
self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
.await
}

/// Discards the cached partition-to-file-group mapping by installing a
/// fresh, empty map. The cache is repopulated lazily on the next call to
/// `get_file_slices_as_of`, which reloads partitions when the map is empty.
pub fn reset(&mut self) {
    let empty_map = DashMap::new();
    self.partition_to_file_groups = Arc::new(empty_map);
}
}

#[cfg(test)]
mod tests {
use hudi_tests::TestTable;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use hudi_tests::TestTable;

use crate::config::HudiConfigs;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::Table;

#[tokio::test]
async fn get_partition_paths_for_nonpartitioned_table() {
let base_url = TestTable::V6Nonpartitioned.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
let partition_paths = FileSystemView::load_partition_paths(&storage)
.await
.unwrap();
let partition_paths = FileSystemView::load_partition_paths(
&storage,
&hudi_table.partition_filters,
hudi_table.get_partition_schema().await.unwrap(),
)
.await
.unwrap();
let partition_path_set: HashSet<&str> =
HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
assert_eq!(partition_path_set, HashSet::from([""]))
Expand All @@ -202,10 +255,15 @@ mod tests {
#[tokio::test]
async fn get_partition_paths_for_complexkeygen_table() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
let partition_paths = FileSystemView::load_partition_paths(&storage)
.await
.unwrap();
let partition_paths = FileSystemView::load_partition_paths(
&storage,
&hudi_table.partition_filters,
hudi_table.get_partition_schema().await.unwrap(),
)
.await
.unwrap();
let partition_path_set: HashSet<&str> =
HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
assert_eq!(
Expand All @@ -221,6 +279,7 @@ mod tests {
#[tokio::test]
async fn fs_view_get_latest_file_slices() {
let base_url = TestTable::V6Nonpartitioned.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let fs_view = FileSystemView::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
Expand All @@ -231,13 +290,22 @@ mod tests {

let excludes = HashSet::new();
let file_slices = fs_view
.get_file_slices_as_of("20240418173551906", &excludes)
.get_file_slices_as_of(
"20240418173551906",
&excludes,
&hudi_table.partition_filters,
hudi_table.get_partition_schema().await.unwrap(),
)
.await
.unwrap();
assert_eq!(file_slices.len(), 1);
let fg_ids = file_slices
.iter()
.map(|fsl| fsl.file_group_id())
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"])
assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
for fsl in file_slices.iter() {
assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 4);
}
}
}
Loading

0 comments on commit 085fbba

Please sign in to comment.