From 7afeb8b8078018a167e7bce6421024b95517b234 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Wed, 8 Nov 2023 17:04:53 +0800 Subject: [PATCH 1/4] Minor: Improve the document format of JoinHashMap --- .../src/joins/hash_join_utils.rs | 115 ++++++++++-------- 1 file changed, 62 insertions(+), 53 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join_utils.rs b/datafusion/physical-plan/src/joins/hash_join_utils.rs index 3a2a85c72722..3ea0331ab4fe 100644 --- a/datafusion/physical-plan/src/joins/hash_join_utils.rs +++ b/datafusion/physical-plan/src/joins/hash_join_utils.rs @@ -40,59 +40,68 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; use hashbrown::raw::RawTable; use hashbrown::HashSet; -// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. -// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side, -// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value. -// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1 -// As the key is a hash value, we need to check possible hash collisions in the probe stage -// During this stage it might be the case that a row is contained the same hashmap value, -// but the values don't match. Those are checked in the [equal_rows] macro -// The indices (values) are stored in a separate chained list stored in the `Vec`. -// The first value (+1) is stored in the hashmap, whereas the next value is stored in array at the position value. -// The chain can be followed until the value "0" has been reached, meaning the end of the list. -// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487) -// See the example below: -// Insert (1,1) -// map: -// --------- -// | 1 | 2 | -// --------- -// next: -// --------------------- -// | 0 | 0 | 0 | 0 | 0 | -// --------------------- -// Insert (2,2) -// map: -// --------- -// | 1 | 2 | -// | 2 | 3 | -// --------- -// next: -// --------------------- -// | 0 | 0 | 0 | 0 | 0 | -// --------------------- -// Insert (1,3) -// map: -// --------- -// | 1 | 4 | -// | 2 | 3 | -// --------- -// next: -// --------------------- -// | 0 | 0 | 0 | 2 | 0 | <--- hash value 1 maps to 4,2 (which means indices values 3,1) -// --------------------- -// Insert (1,4) -// map: -// --------- -// | 1 | 5 | -// | 2 | 3 | -// --------- -// next: -// --------------------- -// | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1) -// --------------------- -// TODO: speed up collision checks -// https://github.com/apache/arrow-datafusion/issues/50 +/// Maps a `u64` hash value based on the build side ["on" values] to a list of indices with this key's value. +/// +/// By allocating a `HashMap` with capacity for *at least* the number of rows for entries at the build side, +/// we make sure that we don't have to re-hash the hashmap, which needs access to the key (the hash in this case) value. +/// +/// E.g. 1 -> [3, 6, 8] indicates that the column values map to rows 3, 6 and 8 for hash value 1 +/// As the key is a hash value, we need to check possible hash collisions in the probe stage +/// During this stage it might be the case that a row is contained the same hashmap value, +/// but the values don't match. Those are checked in the [equal_rows] macro +/// The indices (values) are stored in a separate chained list stored in the `Vec`. +/// +/// The first value (+1) is stored in the hashmap, whereas the next value is stored in array at the position value. +/// +/// The chain can be followed until the value "0" has been reached, meaning the end of the list. +/// Also see chapter 5.3 of [Balancing vectorized query execution with bandwidth-optimized storage](https://dare.uva.nl/search?identifier=5ccbb60a-38b8-4eeb-858a-e7735dd37487) +/// +/// # Example +/// +/// ``` text +/// See the example below: +/// Insert (1,1) +/// map: +/// --------- +/// | 1 | 2 | +/// --------- +/// next: +/// --------------------- +/// | 0 | 0 | 0 | 0 | 0 | +/// --------------------- +/// Insert (2,2) +/// map: +/// --------- +/// | 1 | 2 | +/// | 2 | 3 | +/// --------- +/// next: +/// --------------------- +/// | 0 | 0 | 0 | 0 | 0 | +/// --------------------- +/// Insert (1,3) +/// map: +/// --------- +/// | 1 | 4 | +/// | 2 | 3 | +/// --------- +/// next: +/// --------------------- +/// | 0 | 0 | 0 | 2 | 0 | <--- hash value 1 maps to 4,2 (which means indices values 3,1) +/// --------------------- +/// Insert (1,4) +/// map: +/// --------- +/// | 1 | 5 | +/// | 2 | 3 | +/// --------- +/// next: +/// --------------------- +/// | 0 | 0 | 0 | 2 | 4 | <--- hash value 1 maps to 5,4,2 (which means indices values 4,3,1) +/// --------------------- +/// ``` +/// +///TODO: [speed up collision checks](https://github.com/apache/arrow-datafusion/issues/50) pub struct JoinHashMap { // Stores hash value to last row index pub map: RawTable<(u64, u64)>, From deefdd07986fc205a22bdb69e664fa724f234a7e Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Thu, 7 Dec 2023 16:56:00 +0800 Subject: [PATCH 2/4] fix `compute_record_batch_statistics` wrong with `projection` --- datafusion/physical-plan/src/common.rs | 40 ++++++++++++++++++-------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 649f3a31aa7e..713d877f5541 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -30,6 +30,7 @@ use crate::{ColumnStatistics, ExecutionPlan, Statistics}; use arrow::datatypes::Schema; use arrow::ipc::writer::{FileWriter, IpcWriteOptions}; use arrow::record_batch::RecordBatch; +use arrow_array::Array; use datafusion_common::stats::Precision; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_execution::memory_pool::MemoryReservation; @@ -139,17 +140,24 @@ pub fn compute_record_batch_statistics( ) -> Statistics { let nb_rows = batches.iter().flatten().map(RecordBatch::num_rows).sum(); - let total_byte_size = batches - .iter() - .flatten() - .map(|b| b.get_array_memory_size()) - .sum(); - let projection = match projection { Some(p) => p, None => (0..schema.fields().len()).collect(), }; + let total_byte_size = batches + .iter() + .flatten() + .map(|b| { + b.columns() + .iter() + .enumerate() + .filter(|(index, _)| projection.contains(index)) + .map(|(_, col)| col.get_array_memory_size()) + .sum::() + }) + .sum(); + let mut column_statistics = vec![ColumnStatistics::new_unknown(); projection.len()]; for partition in batches.iter() { @@ -388,6 +396,7 @@ mod tests { datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; + use arrow_array::UInt64Array; use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{col, Column}; @@ -685,20 +694,30 @@ mod tests { let schema = Arc::new(Schema::new(vec![ Field::new("f32", DataType::Float32, false), Field::new("f64", DataType::Float64, false), + Field::new("u64", DataType::UInt64, false), ])); let batch = RecordBatch::try_new( Arc::clone(&schema), vec![ Arc::new(Float32Array::from(vec![1., 2., 3.])), Arc::new(Float64Array::from(vec![9., 8., 7.])), + Arc::new(UInt64Array::from(vec![4, 5, 6])), ], )?; + + // just select f32,f64 + let select_projection = Some(vec![0, 1]); + let byte_size = batch + .project(&select_projection.clone().unwrap()) + .unwrap() + .get_array_memory_size(); + let actual = - compute_record_batch_statistics(&[vec![batch]], &schema, Some(vec![0, 1])); + compute_record_batch_statistics(&[vec![batch]], &schema, select_projection); - let mut expected = Statistics { + let expected = Statistics { num_rows: Precision::Exact(3), - total_byte_size: Precision::Exact(464), // this might change a bit if the way we compute the size changes + total_byte_size: Precision::Exact(byte_size), column_statistics: vec![ ColumnStatistics { distinct_count: Precision::Absent, @@ -715,9 +734,6 @@ mod tests { ], }; - // Prevent test flakiness due to undefined / changing implementation details - expected.total_byte_size = actual.total_byte_size.clone(); - assert_eq!(actual, expected); Ok(()) } From d19294fb136e8b770baf04c0f939f2781a716b36 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Mon, 11 Dec 2023 16:27:19 +0800 Subject: [PATCH 3/4] fix test --- datafusion/sqllogictest/test_files/joins.slt | 42 ++++++++++---------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 0fea8da5a342..eb1727889163 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -1569,15 +1569,13 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name ----TableScan: join_t1 projection=[t1_id, t1_name] ----TableScan: join_t2 projection=[t2_id] physical_plan -ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@3 as t2_id, t1_name@1 as t1_name] +ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name] --CoalesceBatchesExec: target_batch_size=2 -----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)] -------CoalescePartitionsExec ---------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)] -----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -------------MemoryExec: partitions=1, partition_sizes=[1] -------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ---------MemoryExec: partitions=1, partition_sizes=[1] +----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t2_id@0, join_t1.t1_id + UInt32(11)@2)] +------MemoryExec: partitions=1, partition_sizes=[1] +------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)] +--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +----------MemoryExec: partitions=1, partition_sizes=[1] statement ok set datafusion.optimizer.repartition_joins = true; @@ -1595,18 +1593,18 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name ----TableScan: join_t1 projection=[t1_id, t1_name] ----TableScan: join_t2 projection=[t2_id] physical_plan -ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@3 as t2_id, t1_name@1 as t1_name] +ProjectionExec: expr=[t1_id@1 as t1_id, t2_id@0 as t2_id, t1_name@2 as t1_name] --CoalesceBatchesExec: target_batch_size=2 -----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + UInt32(11)@2, t2_id@0)] +----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t2_id@0, join_t1.t1_id + UInt32(11)@2)] +------CoalesceBatchesExec: target_batch_size=2 +--------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 +----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +------------MemoryExec: partitions=1, partition_sizes=[1] ------CoalesceBatchesExec: target_batch_size=2 --------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(11)@2], 2), input_partitions=2 ----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 11 as join_t1.t1_id + UInt32(11)] ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] -------CoalesceBatchesExec: target_batch_size=2 ---------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 -----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -------------MemoryExec: partitions=1, partition_sizes=[1] # Right side expr key inner join @@ -2821,13 +2819,13 @@ physical_plan SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] --SortExec: expr=[t1_id@0 ASC NULLS LAST] ----CoalesceBatchesExec: target_batch_size=2 -------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)] +------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] --------CoalesceBatchesExec: target_batch_size=2 -----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 +----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] --------CoalesceBatchesExec: target_batch_size=2 -----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 +----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] @@ -2862,13 +2860,13 @@ physical_plan SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] --SortExec: expr=[t1_id@0 ASC NULLS LAST] ----CoalesceBatchesExec: target_batch_size=2 -------HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)] +------HashJoinExec: mode=Partitioned, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] --------CoalesceBatchesExec: target_batch_size=2 -----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 +----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] --------CoalesceBatchesExec: target_batch_size=2 -----------RepartitionExec: partitioning=Hash([t2_id@0], 2), input_partitions=2 +----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------MemoryExec: partitions=1, partition_sizes=[1] @@ -2924,7 +2922,7 @@ physical_plan SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] --SortExec: expr=[t1_id@0 ASC NULLS LAST] ----CoalesceBatchesExec: target_batch_size=2 -------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)] +------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] --------MemoryExec: partitions=1, partition_sizes=[1] --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ----------MemoryExec: partitions=1, partition_sizes=[1] @@ -2960,7 +2958,7 @@ physical_plan SortPreservingMergeExec: [t1_id@0 ASC NULLS LAST] --SortExec: expr=[t1_id@0 ASC NULLS LAST] ----CoalesceBatchesExec: target_batch_size=2 -------HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(t1_id@0, t2_id@0)] +------HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(t2_id@0, t1_id@0)] --------MemoryExec: partitions=1, partition_sizes=[1] --------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 ----------MemoryExec: partitions=1, partition_sizes=[1] From 928cbb19818630288f0355e00be8ab5d10734af7 Mon Sep 17 00:00:00 2001 From: Asura7969 <1402357969@qq.com> Date: Mon, 11 Dec 2023 16:48:53 +0800 Subject: [PATCH 4/4] fix test --- datafusion/physical-plan/src/common.rs | 6 ++---- .../sqllogictest/test_files/groupby.slt | 21 ++++++++++--------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 713d877f5541..e83dc2525b9f 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -149,11 +149,9 @@ pub fn compute_record_batch_statistics( .iter() .flatten() .map(|b| { - b.columns() + projection .iter() - .enumerate() - .filter(|(index, _)| projection.contains(index)) - .map(|(_, col)| col.get_array_memory_size()) + .map(|index| b.column(*index).get_array_memory_size()) .sum::() }) .sum(); diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index b7be4d78b583..2b7df08a1794 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -2021,14 +2021,15 @@ SortPreservingMergeExec: [col0@0 ASC NULLS LAST] ----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4 ------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallySorted([0]) --------------SortExec: expr=[col0@3 ASC NULLS LAST] -----------------CoalesceBatchesExec: target_batch_size=8192 -------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)] ---------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 -------------------------MemoryExec: partitions=1, partition_sizes=[3] ---------------------CoalesceBatchesExec: target_batch_size=8192 -----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 -------------------------MemoryExec: partitions=1, partition_sizes=[3] +----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1] +------------------CoalesceBatchesExec: target_batch_size=8192 +--------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)] +----------------------CoalesceBatchesExec: target_batch_size=8192 +------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 +--------------------------MemoryExec: partitions=1, partition_sizes=[3] +----------------------CoalesceBatchesExec: target_batch_size=8192 +------------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=1 +--------------------------MemoryExec: partitions=1, partition_sizes=[3] # Columns in the table are a,b,c,d. Source is CsvExec which is ordered by # a,b,c column. Column a has cardinality 2, column b has cardinality 4. @@ -2709,9 +2710,9 @@ SortExec: expr=[sn@2 ASC NULLS LAST] --ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate] ----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[LAST_VALUE(e.amount)] ------SortExec: expr=[sn@5 ASC NULLS LAST] ---------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country, sn@2 as sn, ts@3 as ts, currency@4 as currency, sn@5 as sn, amount@8 as amount] +--------ProjectionExec: expr=[zip_code@4 as zip_code, country@5 as country, sn@6 as sn, ts@7 as ts, currency@8 as currency, sn@0 as sn, amount@3 as amount] ----------CoalesceBatchesExec: target_batch_size=8192 -------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@4, currency@2)], filter=ts@0 >= ts@1 +------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1 --------------MemoryExec: partitions=1, partition_sizes=[1] --------------MemoryExec: partitions=1, partition_sizes=[1]