Skip to content

Commit

Permalink
Merge commit 'f5805b7aae974c0499ec9b979962083d0ac8965c' into chunchun/update-df-apr-week-1
Browse files Browse the repository at this point in the history
  • Loading branch information
appletreeisyellow committed Apr 17, 2024
2 parents 190a9a6 + f5805b7 commit 5bbecb4
Show file tree
Hide file tree
Showing 22 changed files with 294 additions and 164 deletions.
2 changes: 2 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,8 @@ impl TableOptions {
}
}

/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
/// Global Parquet options that propagates to all columns.
Expand Down
32 changes: 19 additions & 13 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ pub struct ParquetExec {
metadata_size_hint: Option<usize>,
/// Optional user defined parquet file reader factory
parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
/// Cached plan properties such as equivalence properties, ordering, partitioning, etc.
cache: PlanProperties,
/// Parquet Options
parquet_options: TableParquetOptions,
/// Options for reading Parquet files
table_parquet_options: TableParquetOptions,
}

impl ParquetExec {
Expand All @@ -100,7 +101,7 @@ impl ParquetExec {
base_config: FileScanConfig,
predicate: Option<Arc<dyn PhysicalExpr>>,
metadata_size_hint: Option<usize>,
parquet_options: TableParquetOptions,
table_parquet_options: TableParquetOptions,
) -> Self {
debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
base_config.file_groups, base_config.projection, predicate, base_config.limit);
Expand Down Expand Up @@ -155,15 +156,20 @@ impl ParquetExec {
metadata_size_hint,
parquet_file_reader_factory: None,
cache,
parquet_options,
table_parquet_options,
}
}

/// Ref to the base configs
/// [`FileScanConfig`] that controls this scan (such as which files to read)
pub fn base_config(&self) -> &FileScanConfig {
&self.base_config
}

/// Options passed to the parquet reader for this scan
pub fn table_parquet_options(&self) -> &TableParquetOptions {
&self.table_parquet_options
}

/// Optional predicate.
pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>> {
self.predicate.as_ref()
Expand Down Expand Up @@ -197,13 +203,13 @@ impl ParquetExec {
///
/// [`Expr`]: datafusion_expr::Expr
pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
self.parquet_options.global.pushdown_filters = pushdown_filters;
self.table_parquet_options.global.pushdown_filters = pushdown_filters;
self
}

/// Return the value described in [`Self::with_pushdown_filters`]
fn pushdown_filters(&self) -> bool {
self.parquet_options.global.pushdown_filters
self.table_parquet_options.global.pushdown_filters
}

/// If true, the `RowFilter` made by `pushdown_filters` may try to
Expand All @@ -213,38 +219,38 @@ impl ParquetExec {
///
/// [`Expr`]: datafusion_expr::Expr
pub fn with_reorder_filters(mut self, reorder_filters: bool) -> Self {
self.parquet_options.global.reorder_filters = reorder_filters;
self.table_parquet_options.global.reorder_filters = reorder_filters;
self
}

/// Return the value described in [`Self::with_reorder_filters`]
fn reorder_filters(&self) -> bool {
self.parquet_options.global.reorder_filters
self.table_parquet_options.global.reorder_filters
}

/// If enabled, the reader will read the page index
/// This is used to optimise filter pushdown
/// via `RowSelector` and `RowFilter` by
/// eliminating unnecessary IO and decoding
pub fn with_enable_page_index(mut self, enable_page_index: bool) -> Self {
self.parquet_options.global.enable_page_index = enable_page_index;
self.table_parquet_options.global.enable_page_index = enable_page_index;
self
}

/// Return the value described in [`Self::with_enable_page_index`]
fn enable_page_index(&self) -> bool {
self.parquet_options.global.enable_page_index
self.table_parquet_options.global.enable_page_index
}

/// If enabled, the reader will read by the bloom filter
pub fn with_enable_bloom_filter(mut self, enable_bloom_filter: bool) -> Self {
self.parquet_options.global.bloom_filter_enabled = enable_bloom_filter;
self.table_parquet_options.global.bloom_filter_enabled = enable_bloom_filter;
self
}

/// Return the value described in [`Self::with_enable_bloom_filter`]
fn enable_bloom_filter(&self) -> bool {
self.parquet_options.global.bloom_filter_enabled
self.table_parquet_options.global.bloom_filter_enabled
}

fn output_partitioning_helper(file_config: &FileScanConfig) -> Partitioning {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ impl PruningStatistics for BloomFilterStatistics {
ScalarValue::Int32(Some(v)) => sbbf.check(v),
ScalarValue::Int16(Some(v)) => sbbf.check(v),
ScalarValue::Int8(Some(v)) => sbbf.check(v),
ScalarValue::UInt64(Some(v)) => sbbf.check(v),
ScalarValue::UInt32(Some(v)) => sbbf.check(v),
ScalarValue::UInt16(Some(v)) => sbbf.check(v),
ScalarValue::UInt8(Some(v)) => sbbf.check(v),
ScalarValue::Decimal128(Some(v), p, s) => match parquet_type {
Type::INT32 => {
//https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42
Expand Down
11 changes: 11 additions & 0 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ enum Scenario {
Int,
Int32Range,
UInt,
UInt32Range,
Float64,
Decimal,
DecimalBloomFilterInt32,
Expand Down Expand Up @@ -455,6 +456,13 @@ fn make_int32_range(start: i32, end: i32) -> RecordBatch {
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
}

/// Return a single-column `RecordBatch` with a nullable `UInt32` column named
/// `"u"` holding exactly two values: `start` and `end` (the endpoints only,
/// not the values in between).
fn make_uint32_range(start: u32, end: u32) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new("u", DataType::UInt32, true)]));
    let array = Arc::new(UInt32Array::from(vec![start, end])) as ArrayRef;
    // `array` is not used after this call, so move it into the batch
    // instead of taking a redundant extra Arc clone.
    RecordBatch::try_new(schema, vec![array]).unwrap()
}

/// Return record batch with f64 vector
///
/// Columns are named
Expand Down Expand Up @@ -659,6 +667,9 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_uint_batches(250, 255),
]
}
Scenario::UInt32Range => {
vec![make_uint32_range(0, 10), make_uint32_range(200000, 300000)]
}
Scenario::Float64 => {
vec![
make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]),
Expand Down
162 changes: 161 additions & 1 deletion datafusion/core/tests/parquet/row_group_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ macro_rules! int_tests {
async fn [<prune_int $bits _scalar_fun_and_eq >]() {
RowGroupPruningTest::new()
.with_scenario(Scenario::Int)
.with_query(&format!("SELECT * FROM t where i{} = 1", $bits))
.with_query(&format!("SELECT * FROM t where abs(i{}) = 1 and i{} = 1", $bits, $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(3))
Expand Down Expand Up @@ -452,6 +452,144 @@ int_tests!(16, correct_bloom_filters: false);
int_tests!(32, correct_bloom_filters: true);
int_tests!(64, correct_bloom_filters: true);

// Generates row-group pruning tests for an unsigned integer column `u{bits}`.
// $bits: number of bits of the unsigned integer to test (8, 16, 32, 64)
// $correct_bloom_filters: if false, replicates the
// https://github.com/apache/arrow-datafusion/issues/9779 bug so that tests pass
// if and only if Bloom filters on UInt8 and UInt16 columns are still buggy.
macro_rules! uint_tests {
($bits:expr, correct_bloom_filters: $correct_bloom_filters:expr) => {
paste::item! {
// Range predicate: statistics alone prune one of the four row groups;
// bloom filters are not consulted for inequalities.
#[tokio::test]
async fn [<prune_uint $bits _lt >]() {
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where u{} < 6", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(3))
.with_pruned_by_stats(Some(1))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(11)
.test_row_group_prune()
.await;
}

// Equality predicate: statistics narrow to one row group, then the bloom
// filter decides; expected counts depend on $correct_bloom_filters.
#[tokio::test]
async fn [<prune_uint $bits _eq >]() {
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where u{} = 6", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(3))
.with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 }))
.with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 }))
.with_expected_rows(if $correct_bloom_filters { 1 } else { 0 })
.test_row_group_prune()
.await;
}
// A scalar function AND an exact equality: the equality half is still
// prunable by statistics and bloom filters.
#[tokio::test]
async fn [<prune_uint $bits _scalar_fun_and_eq >]() {
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where power(u{}, 2) = 36 and u{} = 6", $bits, $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(3))
.with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 }))
.with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 }))
.with_expected_rows(if $correct_bloom_filters { 1 } else { 0 })
.test_row_group_prune()
.await;
}

// A scalar function alone is opaque to pruning: nothing is matched or
// pruned by either statistics or bloom filters.
#[tokio::test]
async fn [<prune_uint $bits _scalar_fun >]() {
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where power(u{}, 2) = 25", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(0))
.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(2)
.test_row_group_prune()
.await;
}

// An arithmetic expression on the column also defeats pruning.
#[tokio::test]
async fn [<prune_uint $bits _complex_expr >]() {
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where u{}+1 = 6", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(0))
.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(2)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn [<prune_uint $bits _eq_in_list >]() {
// result of sql "SELECT * FROM t where u{bits} in (6)":
// behaves like the equality test above — stats narrow to one row
// group, then the bloom filter decides.
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where u{} in (6)", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(3))
.with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 }))
.with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 }))
.with_expected_rows(if $correct_bloom_filters { 1 } else { 0 })
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn [<prune_uint $bits _eq_in_list_2 >]() {
// result of sql "SELECT * FROM t where u{bits} in (100)", prune all
// test whether statistics works
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where u{} in (100)", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(0))
.with_pruned_by_stats(Some(4))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(0)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn [<prune_uint $bits _eq_in_list_negated >]() {
// result of sql "SELECT * FROM t where u{bits} not in (6)" prune nothing
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt)
.with_query(&format!("SELECT * FROM t where u{} not in (6)", $bits))
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(4))
.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(4))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(19)
.test_row_group_prune()
.await;
}
}
};
}

// Instantiate the suite for each unsigned width. Per the macro docs above,
// 8- and 16-bit columns still hit the bloom-filter bug (issue #9779), so
// `correct_bloom_filters` is false for them.
uint_tests!(8, correct_bloom_filters: false);
uint_tests!(16, correct_bloom_filters: false);
uint_tests!(32, correct_bloom_filters: true);
uint_tests!(64, correct_bloom_filters: true);

#[tokio::test]
async fn prune_int32_eq_large_in_list() {
// result of sql "SELECT * FROM t where i in (2050...2582)", prune all
Expand All @@ -474,6 +612,28 @@ async fn prune_int32_eq_large_in_list() {
.await;
}

#[tokio::test]
async fn prune_uint32_eq_large_in_list() {
// result of sql "SELECT * FROM t where u in (200050..200081)", prune all.
// The IN-list values fall inside the second row group's min/max range, so
// statistics match it (1) and cannot prune; the bloom filter then prunes
// it (1) because none of the listed values are actually present.
RowGroupPruningTest::new()
.with_scenario(Scenario::UInt32Range)
.with_query(
format!(
"SELECT * FROM t where u in ({})",
(200050..200082).join(",")
)
.as_str(),
)
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(1))
.with_expected_rows(0)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn prune_f64_lt() {
RowGroupPruningTest::new()
Expand Down
Loading

0 comments on commit 5bbecb4

Please sign in to comment.