ARROW-10837: [Rust][DataFusion] Use Vec<u8> for hash keys
This PR is a follow-up to #8765. Here, the values that make up a hashmap key are serialized into a `Vec<u8>`, and that byte vector is used as the key instead.

This is a bit faster, as both hashing and cloning a contiguous byte vector are cheaper than the equivalent operations on dynamic `GroupByScalar` values. It also uses less additional memory than the earlier `GroupByScalar`-based keys (for the hash join).
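For illustration, here is a minimal sketch of the idea (not the PR's code; `row_key` and its two columns are invented for the example): each row's key values are appended to a single reusable byte buffer, with variable-width values length-prefixed so composite keys stay unambiguous.

```rust
/// Sketch: pack one row's key values (a hypothetical u32 column and a
/// utf8 column) into a reusable byte buffer that serves as the map key.
fn row_key(id: u32, name: &str, key: &mut Vec<u8>) {
    key.clear();
    // fixed-width values: raw little-endian bytes
    key.extend_from_slice(&id.to_le_bytes());
    // variable-width values: length prefix, then the bytes, so that e.g.
    // ("ab", "c") and ("a", "bc") cannot produce the same key
    key.extend_from_slice(&name.len().to_le_bytes());
    key.extend_from_slice(name.as_bytes());
}
```

Hashing and cloning this flat buffer touches one contiguous allocation instead of walking a boxed slice of enum values.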

[This PR]
```
Query 12 iteration 0 took 1315 ms
Query 12 iteration 1 took 1324 ms
Query 12 iteration 2 took 1329 ms
Query 12 iteration 3 took 1334 ms
Query 12 iteration 4 took 1335 ms
Query 12 iteration 5 took 1338 ms
Query 12 iteration 6 took 1337 ms
Query 12 iteration 7 took 1349 ms
Query 12 iteration 8 took 1348 ms
Query 12 iteration 9 took 1358 ms
```

[Master]
```
Query 12 iteration 0 took 1379 ms
Query 12 iteration 1 took 1383 ms
Query 12 iteration 2 took 1401 ms
Query 12 iteration 3 took 1406 ms
Query 12 iteration 4 took 1420 ms
Query 12 iteration 5 took 1435 ms
Query 12 iteration 6 took 1401 ms
Query 12 iteration 7 took 1404 ms
Query 12 iteration 8 took 1418 ms
Query 12 iteration 9 took 1416 ms
```

[This PR]
```
Query 1 iteration 0 took 871 ms
Query 1 iteration 1 took 866 ms
Query 1 iteration 2 took 869 ms
Query 1 iteration 3 took 869 ms
Query 1 iteration 4 took 867 ms
Query 1 iteration 5 took 874 ms
Query 1 iteration 6 took 870 ms
Query 1 iteration 7 took 875 ms
Query 1 iteration 8 took 871 ms
Query 1 iteration 9 took 869 ms
```

[Master]
```
Query 1 iteration 0 took 1189 ms
Query 1 iteration 1 took 1192 ms
Query 1 iteration 2 took 1189 ms
Query 1 iteration 3 took 1185 ms
Query 1 iteration 4 took 1193 ms
Query 1 iteration 5 took 1202 ms
Query 1 iteration 6 took 1547 ms
Query 1 iteration 7 took 1242 ms
Query 1 iteration 8 took 1202 ms
Query 1 iteration 9 took 1197 ms
```

FWIW, microbenchmark results for the aggregate queries:

```
aggregate_query_no_group_by 15 12
                        time:   [538.54 us 541.48 us 544.74 us]
                        change: [+5.4384% +6.6260% +8.2034%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 8 outliers among 100 measurements (8.00%)
  7 (7.00%) high mild
  1 (1.00%) high severe

aggregate_query_no_group_by_count_distinct_wide 15 12
                        time:   [4.8418 ms 4.8744 ms 4.9076 ms]
                        change: [-13.890% -12.580% -11.260%] (p = 0.00 < 0.05)
                        Performance has improved.

aggregate_query_no_group_by_count_distinct_narrow 15 12
                        time:   [2.1910 ms 2.2100 ms 2.2291 ms]
                        change: [-30.490% -28.886% -27.271%] (p = 0.00 < 0.05)
                        Performance has improved.

Benchmarking aggregate_query_group_by 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.1s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by 15 12
                        time:   [1.5905 ms 1.5977 ms 1.6054 ms]
                        change: [-18.271% -16.780% -15.396%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) high mild
  5 (5.00%) high severe

aggregate_query_group_by_with_filter 15 12
                        time:   [788.26 us 792.05 us 795.74 us]
                        change: [-9.8088% -8.5606% -7.4141%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  5 (5.00%) high mild
  1 (1.00%) high severe

Benchmarking aggregate_query_group_by_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 9.3s, enable flat sampling, or reduce sample count to 50.
aggregate_query_group_by_u64 15 12
                        time:   [1.8502 ms 1.8565 ms 1.8630 ms]
                        change: [+8.6203% +9.8872% +10.973%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 8 outliers among 100 measurements (8.00%)
  3 (3.00%) low mild
  2 (2.00%) high mild
  3 (3.00%) high severe

aggregate_query_group_by_with_filter_u64 15 12
                        time:   [777.83 us 782.75 us 788.15 us]
                        change: [-7.5157% -6.6393% -5.6558%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  3 (3.00%) high mild
  2 (2.00%) high severe
```

FYI @jorgecarleitao

Closes #8863 from Dandandan/key_byte_vec

Lead-authored-by: Heres, Daniel <danielheres@gmail.com>
Co-authored-by: Daniël Heres <danielheres@gmail.com>
Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
Dandandan authored and jorgecarleitao committed Dec 11, 2020
1 parent b9e94d3 commit 259fd71
Showing 2 changed files with 93 additions and 27 deletions.
rust/datafusion/src/physical_plan/hash_aggregate.rs (20 additions & 13 deletions)

```diff
@@ -43,8 +43,8 @@ use arrow::{
 use pin_project_lite::pin_project;
 
 use super::{
-    common, expressions::Column, group_scalar::GroupByScalar, RecordBatchStream,
-    SendableRecordBatchStream,
+    common, expressions::Column, group_scalar::GroupByScalar, hash_join::create_key,
+    RecordBatchStream, SendableRecordBatchStream,
 };
 use ahash::RandomState;
 use hashbrown::HashMap;
@@ -245,12 +245,14 @@ fn group_aggregate_batch(
     // create vector large enough to hold the grouping key
     // this is an optimization to avoid allocating `key` on every row.
     // it will be overwritten on every iteration of the loop below
-    let mut key = Vec::with_capacity(group_values.len());
+    let mut group_by_values = Vec::with_capacity(group_values.len());
     for _ in 0..group_values.len() {
-        key.push(GroupByScalar::UInt32(0));
+        group_by_values.push(GroupByScalar::UInt32(0));
     }
 
-    let mut key = key.into_boxed_slice();
+    let mut group_by_values = group_by_values.into_boxed_slice();
+
+    let mut key = Vec::with_capacity(group_values.len());
 
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
@@ -263,16 +265,21 @@
         // 1.1
         create_key(&group_values, row, &mut key)
             .map_err(DataFusionError::into_arrow_external_error)?;
+
         accumulators
             .raw_entry_mut()
             .from_key(&key)
             // 1.3
-            .and_modify(|_, (_, v)| v.push(row as u32))
+            .and_modify(|_, (_, _, v)| v.push(row as u32))
             // 1.2
             .or_insert_with(|| {
                 // We can safely unwrap here as we checked we can create an accumulator before
                 let accumulator_set = create_accumulators(aggr_expr).unwrap();
-                (key.clone(), (accumulator_set, vec![row as u32]))
+                let _ = create_group_by_values(&group_values, row, &mut group_by_values);
+                (
+                    key.clone(),
+                    (group_by_values.clone(), accumulator_set, vec![row as u32]),
+                )
             });
     }
 
@@ -284,7 +291,7 @@ fn group_aggregate_batch(
     accumulators
         .iter_mut()
         // 2.1
-        .map(|(_, (accumulator_set, indices))| {
+        .map(|(_, (_, accumulator_set, indices))| {
             // 2.2
             accumulator_set
                 .into_iter()
@@ -391,7 +398,7 @@ impl GroupedHashAggregateStream {
 
 type AccumulatorSet = Vec<Box<dyn Accumulator>>;
 type Accumulators =
-    HashMap<Box<[GroupByScalar]>, (AccumulatorSet, Vec<u32>), RandomState>;
+    HashMap<Vec<u8>, (Box<[GroupByScalar]>, AccumulatorSet, Vec<u32>), RandomState>;
 
 impl Stream for GroupedHashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
@@ -646,10 +653,10 @@ fn create_batch_from_map(
     // 5. concatenate the arrays over the second index [j] into a single vec<ArrayRef>.
     let arrays = accumulators
         .iter()
-        .map(|(k, (accumulator_set, _))| {
+        .map(|(_, (group_by_values, accumulator_set, _))| {
            // 2.
            let mut groups = (0..num_group_expr)
-                .map(|i| match &k[i] {
+                .map(|i| match &group_by_values[i] {
                     GroupByScalar::Int8(n) => {
                         Arc::new(Int8Array::from(vec![*n])) as ArrayRef
                     }
@@ -726,8 +733,8 @@ fn finalize_aggregation(
     }
 }
 
-/// Create a Vec<GroupByScalar> that can be used as a map key
-pub(crate) fn create_key(
+/// Create a Box<[GroupByScalar]> for the group by values
+pub(crate) fn create_group_by_values(
    group_by_keys: &[ArrayRef],
    row: usize,
    vec: &mut Box<[GroupByScalar]>,
```
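The aggregation loop above leans on hashbrown's raw-entry API so that the reusable `key` buffer is cloned only when a row starts a new group. A minimal sketch of that pattern, with a plain row count standing in for the `(group values, accumulator set, indices)` triple and assuming a hashbrown version that exposes `raw_entry_mut`:

```rust
use hashbrown::HashMap;

/// Sketch of the raw-entry upsert used in `group_aggregate_batch`, with a
/// simplified value type (a count instead of the accumulator set).
fn upsert(map: &mut HashMap<Vec<u8>, u64>, key: &Vec<u8>) {
    map.raw_entry_mut()
        .from_key(key)
        // existing group: mutate the value in place, no key allocation
        .and_modify(|_k, count| *count += 1)
        // new group: the key buffer is cloned only on this path
        .or_insert_with(|| (key.clone(), 1));
}
```

A plain `entry()` lookup would require owning the key up front, which means one `Vec<u8>` clone per input row; the raw-entry form defers that clone to first insertion of each distinct group.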
rust/datafusion/src/physical_plan/hash_join.rs (73 additions & 14 deletions)

```diff
@@ -18,6 +18,7 @@
 //! Defines the join plan for executing partitions in parallel and then joining the results
 //! into a set of partitions.
 
+use arrow::array::ArrayRef;
 use std::sync::Arc;
 use std::{any::Any, collections::HashSet};
 
@@ -26,21 +27,24 @@ use futures::{Stream, StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 
 use arrow::array::{make_array, Array, MutableArrayData};
+use arrow::datatypes::DataType;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
-use super::{expressions::col, hash_aggregate::create_key};
+use arrow::array::{
+    Int16Array, Int32Array, Int64Array, Int8Array, StringArray, UInt16Array, UInt32Array,
+    UInt64Array, UInt8Array,
+};
 
+use super::expressions::col;
 use super::{
     hash_utils::{build_join_schema, check_join_is_valid, JoinOn, JoinType},
     merge::MergeExec,
 };
 use crate::error::{DataFusionError, Result};
 
-use super::{
-    group_scalar::GroupByScalar, ExecutionPlan, Partitioning, RecordBatchStream,
-    SendableRecordBatchStream,
-};
+use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream};
 use ahash::RandomState;
 
 // An index of (batch, row) uniquely identifying a row in a part.
@@ -52,7 +56,7 @@ type JoinIndex = Option<(usize, usize)>;
 // Maps ["on" value] -> [list of indices with this key's value]
 // E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
 // for rows 3 and 8 from batch 0 and row 6 from batch 1.
-type JoinHashMap = HashMap<Box<[GroupByScalar]>, Vec<Index>, RandomState>;
+type JoinHashMap = HashMap<Vec<u8>, Vec<Index>, RandomState>;
 type JoinLeftData = (JoinHashMap, Vec<RecordBatch>);
 
 /// join execution plan executes partitions in parallel and combines them into a set of
@@ -205,11 +209,6 @@ fn update_hash(
         .collect::<Result<Vec<_>>>()?;
 
     let mut key = Vec::with_capacity(keys_values.len());
-    for _ in 0..keys_values.len() {
-        key.push(GroupByScalar::UInt32(0));
-    }
-
-    let mut key = key.into_boxed_slice();
 
     // update the hash map
     for row in 0..batch.num_rows() {
@@ -318,6 +317,67 @@ fn build_batch_from_indices(
     Ok(RecordBatch::try_new(Arc::new(schema.clone()), columns)?)
 }
 
+/// Create a key `Vec<u8>` that is used as key for the hashmap
+pub(crate) fn create_key(
+    group_by_keys: &[ArrayRef],
+    row: usize,
+    vec: &mut Vec<u8>,
+) -> Result<()> {
+    vec.clear();
+    for i in 0..group_by_keys.len() {
+        let col = &group_by_keys[i];
+        match col.data_type() {
+            DataType::UInt8 => {
+                let array = col.as_any().downcast_ref::<UInt8Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::UInt16 => {
+                let array = col.as_any().downcast_ref::<UInt16Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::UInt32 => {
+                let array = col.as_any().downcast_ref::<UInt32Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::UInt64 => {
+                let array = col.as_any().downcast_ref::<UInt64Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::Int8 => {
+                let array = col.as_any().downcast_ref::<Int8Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::Int16 => {
+                let array = col.as_any().downcast_ref::<Int16Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::Int32 => {
+                let array = col.as_any().downcast_ref::<Int32Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::Int64 => {
+                let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
+                vec.extend(array.value(row).to_le_bytes().iter());
+            }
+            DataType::Utf8 => {
+                let array = col.as_any().downcast_ref::<StringArray>().unwrap();
+                let value = array.value(row);
+                // store the size
+                vec.extend(value.len().to_le_bytes().iter());
+                // store the string value
+                vec.extend(array.value(row).as_bytes().iter());
+            }
+            _ => {
+                // This is internal because we should have caught this before.
+                return Err(DataFusionError::Internal(
+                    "Unsupported GROUP BY data type".to_string(),
+                ));
+            }
+        }
+    }
+    Ok(())
+}
+
 fn build_batch(
     batch: &RecordBatch,
     left_data: &JoinLeftData,
@@ -370,9 +430,8 @@ fn build_join_indexes(
         JoinType::Inner => {
             // inner => key intersection
             // unfortunately rust does not support intersection of map keys :(
-            let left_set: HashSet<Box<[GroupByScalar]>> = left.keys().cloned().collect();
-            let left_right: HashSet<Box<[GroupByScalar]>> =
-                right.keys().cloned().collect();
+            let left_set: HashSet<Vec<u8>> = left.keys().cloned().collect();
+            let left_right: HashSet<Vec<u8>> = right.keys().cloned().collect();
             let inner = left_set.intersection(&left_right);
 
             let mut indexes = Vec::new(); // unknown a prior size
```
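For reference, a hypothetical call site for the new `create_key` (the function is `pub(crate)`, so this would live inside the `physical_plan` module; the column values here are invented for the example):

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, StringArray};

// Hypothetical call site: build the byte key for row 1 of a two-column
// join key (an Int32 column and a Utf8 column).
fn example() -> crate::error::Result<()> {
    let on: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![1, 2, 3])),
        Arc::new(StringArray::from(vec!["a", "bb", "ccc"])),
    ];
    let mut key = Vec::new();
    create_key(&on, 1, &mut key)?;
    // key now holds 2_i32 as little-endian bytes, followed by the
    // length-prefixed bytes of "bb"
    Ok(())
}
```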
