From 270ee6afd952159a34b868186c04a772a0992adf Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 2 Aug 2021 13:46:26 -0400
Subject: [PATCH 1/5] Implement faster GroupByHash design

---
 .../src/physical_plan/hash_aggregate.rs       | 391 +++++-------------
 1 file changed, 97 insertions(+), 294 deletions(-)

diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 5c3c57695d0f..28bf51a191fd 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -29,42 +29,26 @@ use futures::{
 };
 
 use crate::error::{DataFusionError, Result};
+use crate::physical_plan::hash_utils::create_hashes;
 use crate::physical_plan::{
     Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
     Partitioning, PhysicalExpr, SQLMetric,
 };
 use crate::scalar::ScalarValue;
+use arrow::{array::ArrayRef, compute, compute::cast};
 use arrow::{
     array::{Array, UInt32Builder},
     error::{ArrowError, Result as ArrowResult},
 };
 use arrow::{
-    array::{
-        ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
-        Int8Array, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
-    },
-    compute,
-};
-use arrow::{
-    array::{BooleanArray, Date32Array, DictionaryArray},
-    compute::cast,
-    datatypes::{
-        ArrowDictionaryKeyType, ArrowNativeType, Int16Type, Int32Type, Int64Type,
-        Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
-    },
-};
-use arrow::{
-    datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
 use hashbrown::HashMap;
 use pin_project_lite::pin_project;
+use smallvec::{smallvec, SmallVec};
 
-use arrow::array::{
-    LargeStringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
-    TimestampNanosecondArray,
-};
 use async_trait::async_trait;
 
 use super::{expressions::Column, RecordBatchStream, SendableRecordBatchStream};
 
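Before the hunks continue, it may help to see the shape of the new design in isolation. The old code serialized each row's group values into a byte key and used that as the hash-map key; this patch instead keys the map by a u64 hash and keeps all real per-group state in a dense vector. A minimal, self-contained sketch of that two-level layout (the `MiniGroups` type and its fields are invented for illustration, not part of the patch):

```rust
use std::collections::HashMap;

/// Toy model of the new layout: the map holds only hashes and indices;
/// all real per-group state lives in a dense Vec alongside it.
struct MiniGroups {
    map: HashMap<u64, Vec<usize>>, // hash -> candidate indices into group_states
    group_states: Vec<Vec<u32>>,   // per group: rows of the current batch
}

impl MiniGroups {
    fn observe(&mut self, hash: u64, row: u32) {
        let candidates = self.map.entry(hash).or_default();
        if let Some(&idx) = candidates.first() {
            // existing group (collision handling arrives in patch 2)
            self.group_states[idx].push(row);
        } else {
            // new group: append its state, record the index under this hash
            self.group_states.push(vec![row]);
            candidates.push(self.group_states.len() - 1);
        }
    }
}
```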
@@ -339,6 +323,7 @@ pin_project! {
 fn group_aggregate_batch(
     mode: &AggregateMode,
+    random_state: &RandomState,
     group_expr: &[Arc<dyn PhysicalExpr>],
     aggr_expr: &[Arc<dyn AggregateExpr>],
     batch: RecordBatch,
@@ -363,8 +348,6 @@ fn group_aggregate_batch(
 
     let mut group_by_values = group_by_values.into_boxed_slice();
 
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row's index to `indices`
@@ -372,37 +355,53 @@ fn group_aggregate_batch(
     // Make sure we can create the accumulators or otherwise return an error
     create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
+
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
 
-        accumulators
+        map
             .raw_entry_mut()
-            .from_key(&key)
+            .from_key(&hash)
             // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+            .and_modify(|_, group_state_indexes| {
+                let mut iter = group_state_indexes.iter();
+                let group_idx = *(iter.next().unwrap()); // created with 1 element
+                /* ******* TODO THIS IS WHERE COLLISIONS NEED TO BE HANDLED ******** */
+                assert!(iter.next().is_none(), "TODO hash collisions");
+
+                // 1.3
+                let group_state = &mut group_states[group_idx];
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(group_idx);
                 };
-                v.push(row as u32)
+                group_state.indices.push(row as u32); // remember this row
             })
             // 1.2
             .or_insert_with(|| {
                 // We can safely unwrap here as we checked we can create an accumulator before
                 let accumulator_set = create_accumulators(aggr_expr).unwrap();
-                batch_keys.push(key.clone());
+
                 // Note it would be nice to make this a real error (rather than panic)
                 // but it is better than silently ignoring the issue and getting wrong results
                 create_group_by_values(&group_values, row, &mut group_by_values)
                     .expect("can not create group by value");
-                (
-                    key.clone(),
-                    (group_by_values.clone(), accumulator_set, vec![row as u32]),
-                )
+
+                let group_state = GroupState {
+                    group_by_values: group_by_values.clone(),
+                    accumulator_set,
+                    indices: vec![row as u32], // 1.3
+                };
+
+                let group_idx = group_states.len();
+                group_states.push(group_state);
+                groups_with_rows.push(group_idx);
+                (hash, smallvec![group_idx])
             });
     }
@@ -410,8 +409,8 @@ fn group_aggregate_batch(
     let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
     let mut offsets = vec![0];
     let mut offset_so_far = 0;
-    for key in batch_keys.iter() {
-        let (_, _, indices) = accumulators.get_mut(key).unwrap();
+    for group_idx in groups_with_rows.iter() {
+        let indices = &accumulators.group_states[*group_idx].indices;
         batch_indices.append_slice(indices)?;
         offset_so_far += indices.len();
         offsets.push(offset_so_far);
@@ -442,13 +441,14 @@ fn group_aggregate_batch(
     // 2.3 `slice` from each of its arrays the keys' values
     // 2.4 update / merge the accumulator with the values
     // 2.5 clear indices
-    batch_keys
-        .iter_mut()
+    groups_with_rows
+        .iter()
         .zip(offsets.windows(2))
-        .try_for_each(|(key, offsets)| {
-            let (_, accumulator_set, indices) = accumulators.get_mut(key).unwrap();
+        .try_for_each(|(group_idx, offsets)| {
+            let group_state = &mut accumulators.group_states[*group_idx];
             // 2.2
-            accumulator_set
+            group_state
+                .accumulator_set
                 .iter_mut()
                 .zip(values.iter())
                 .map(|(accumulator, aggr_array)| {
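The `batch_indices`/`offsets` bookkeeping above exists so that accumulator updates stay vectorized: all rows of the batch are reordered once, after which each group sees its input as one contiguous slice. A sketch of that gather-then-slice pattern, assuming the arrow 5.x `take` kernel and an invented helper name:

```rust
use arrow::array::{Array, ArrayRef, UInt32Array};
use arrow::compute;
use arrow::error::Result as ArrowResult;

/// Invented helper, not in the patch: reorder one aggregate input column so
/// every group's rows are adjacent, then cut it along the per-group offsets.
fn gather_then_slice(
    values: &ArrayRef,
    batch_indices: &UInt32Array,
    offsets: &[usize],
) -> ArrowResult<Vec<ArrayRef>> {
    // one vectorized take() replaces per-row accumulator dispatch
    let gathered = compute::take(values.as_ref(), batch_indices, None)?;
    Ok(offsets
        .windows(2)
        .map(|w| gathered.slice(w[0], w[1] - w[0]))
        .collect())
}
```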
@@ -472,238 +472,12 @@ fn group_aggregate_batch(
             })
             // 2.5
             .and({
-                indices.clear();
+                group_state.indices.clear();
                 Ok(())
             })
         })?;
 
-    Ok(accumulators)
-}
-
-/// Appends a sequence of [u8] bytes for the value in `col[row]` to
-/// `vec` to be used as a key into the hash map for a dictionary type
-///
-/// Note that ideally, for dictionary encoded columns, we would be
-/// able to simply use the dictionary indices themselves (no need to
-/// look up values) or possibly simply build the hash table entirely
-/// on the dictionary indexes.
-///
-/// This approach would likely work (very) well for the common case,
-/// but it also has to handle the case where the dictionary itself
-/// is not the same across all record batches (and thus indexes in one
-/// record batch may not correspond to the same index in another)
-fn dictionary_create_key_for_col<K: ArrowDictionaryKeyType>(
-    col: &ArrayRef,
-    row: usize,
-    vec: &mut Vec<u8>,
-) -> Result<()> {
-    let dict_col = col.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
-
-    // look up the index in the values dictionary
-    let keys_col = dict_col.keys();
-    let values_index = keys_col.value(row).to_usize().ok_or_else(|| {
-        DataFusionError::Internal(format!(
-            "Can not convert index to usize in dictionary of type creating group by value {:?}",
-            keys_col.data_type()
-        ))
-    })?;
-
-    create_key_for_col(dict_col.values(), values_index, vec)
-}
-
-/// Appends a sequence of [u8] bytes for the value in `col[row]` to
-/// `vec` to be used as a key into the hash map.
-///
-/// NOTE: This function does not check col.is_valid(). Caller must do so
-fn create_key_for_col(col: &ArrayRef, row: usize, vec: &mut Vec<u8>) -> Result<()> {
-    match col.data_type() {
-        DataType::Boolean => {
-            let array = col.as_any().downcast_ref::<BooleanArray>().unwrap();
-            vec.extend_from_slice(&[array.value(row) as u8]);
-        }
-        DataType::Float32 => {
-            let array = col.as_any().downcast_ref::<Float32Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Float64 => {
-            let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::UInt8 => {
-            let array = col.as_any().downcast_ref::<UInt8Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::UInt16 => {
-            let array = col.as_any().downcast_ref::<UInt16Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::UInt32 => {
-            let array = col.as_any().downcast_ref::<UInt32Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::UInt64 => {
-            let array = col.as_any().downcast_ref::<UInt64Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Int8 => {
-            let array = col.as_any().downcast_ref::<Int8Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Int16 => {
-            let array = col.as_any().downcast_ref::<Int16Array>().unwrap();
-            vec.extend(array.value(row).to_le_bytes().iter());
-        }
-        DataType::Int32 => {
-            let array = col.as_any().downcast_ref::<Int32Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Int64 => {
-            let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Timestamp(TimeUnit::Millisecond, None) => {
-            let array = col
-                .as_any()
-                .downcast_ref::<TimestampMillisecondArray>()
-                .unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Timestamp(TimeUnit::Microsecond, None) => {
-            let array = col
-                .as_any()
-                .downcast_ref::<TimestampMicrosecondArray>()
-                .unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
-            let array = col
-                .as_any()
-                .downcast_ref::<TimestampNanosecondArray>()
-                .unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Utf8 => {
-            let array = col.as_any().downcast_ref::<StringArray>().unwrap();
-            let value = array.value(row);
-            // store the size
-            vec.extend_from_slice(&value.len().to_le_bytes());
-            // store the string value
-            vec.extend_from_slice(value.as_bytes());
-        }
-        DataType::LargeUtf8 => {
-            let array = col.as_any().downcast_ref::<LargeStringArray>().unwrap();
-            let value = array.value(row);
-            // store the size
-            vec.extend_from_slice(&value.len().to_le_bytes());
-            // store the string value
-            vec.extend_from_slice(value.as_bytes());
-        }
-        DataType::Date32 => {
-            let array = col.as_any().downcast_ref::<Date32Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Dictionary(index_type, _) => match **index_type {
-            DataType::Int8 => {
-                dictionary_create_key_for_col::<Int8Type>(col, row, vec)?;
-            }
-            DataType::Int16 => {
-                dictionary_create_key_for_col::<Int16Type>(col, row, vec)?;
-            }
-            DataType::Int32 => {
-                dictionary_create_key_for_col::<Int32Type>(col, row, vec)?;
-            }
-            DataType::Int64 => {
-                dictionary_create_key_for_col::<Int64Type>(col, row, vec)?;
-            }
-            DataType::UInt8 => {
-                dictionary_create_key_for_col::<UInt8Type>(col, row, vec)?;
-            }
-            DataType::UInt16 => {
-                dictionary_create_key_for_col::<UInt16Type>(col, row, vec)?;
-            }
-            DataType::UInt32 => {
-                dictionary_create_key_for_col::<UInt32Type>(col, row, vec)?;
-            }
-            DataType::UInt64 => {
-                dictionary_create_key_for_col::<UInt64Type>(col, row, vec)?;
-            }
-            _ => {
-                return Err(DataFusionError::Internal(format!(
-                    "Unsupported GROUP BY type (dictionary index type not supported creating key) {}",
-                    col.data_type(),
-                )))
-            }
-        },
-        _ => {
-            // This is internal because we should have caught this before.
-            return Err(DataFusionError::Internal(format!(
-                "Unsupported GROUP BY type creating key {}",
-                col.data_type(),
-            )));
-        }
-    }
-    Ok(())
-}
-
-/// Create a key `Vec<u8>` that is used as key for the hashmap
-///
-/// This looks like
-/// [null_byte][col_value_bytes][null_byte][col_value_bytes]
-///
-/// Note that relatively uncommon patterns (e.g. not 0x00) are chosen
-/// for the null_byte to make debugging easier. The actual values are
-/// arbitrary.
-///
-/// For a NULL value in a column, the key looks like
-/// [0xFE]
-///
-/// For a Non-NULL value in a column, this looks like:
-/// [0xFF][byte representation of column value]
-///
-/// Example of a key with no NULL values:
-/// ```text
-///      0xFF byte at the start of each column
-///         signifies the value is non-null
-///                    │
-///     ┌ ─ ─ ─ ─ ─ ─ ─┴─ ─ ─ ─ ─ ─ ─ ┐
-///
-///  {             │  string len                  0x1234
-///                ▼ (as usize le)      "foo"     ▼ (as u16 le)
-///  k1: "foo"     ╔ ═┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──╦ ═┌──┬──┐
-///  k2: 0x1234u16 FF║03│00│00│00│00│00│00│00│"f│"o│"o│FF║34│12│
-///  }             ╚ ═└──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──╩ ═└──┴──┘
-///                 0  1  2  3  4  5  6  7  8  9  10 11 12 13 14
-/// ```
-///
-/// Example of a key with NULL values:
-///
-///```text
-///      0xFE byte at the start of k1 column
-///   ┌ ─    signifies the value is NULL
-///   └ ┐
-///                    0x1234
-///  {             ▼  (as u16 le)
-///  k1: NULL      ╔ ═╔ ═┌──┬──┐
-///  k2: 0x1234u16 FE║FF║12│34│
-///  }             ╚ ═╚ ═└──┴──┘
-///                 0  1  2  3
-///```
-pub(crate) fn create_key(
-    group_by_keys: &[ArrayRef],
-    row: usize,
-    vec: &mut Vec<u8>,
-) -> Result<()> {
-    vec.clear();
-    for col in group_by_keys {
-        if !col.is_valid(row) {
-            vec.push(0xFE);
-        } else {
-            vec.push(0xFF);
-            create_key_for_col(col, row, vec)?
-        }
-    }
-    Ok(())
+    Ok(accumulators)
 }
 
 async fn compute_grouped_hash_aggregate(
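The roughly 240 deleted lines above were the per-row, per-type key serializer. Its replacement, `create_hashes` from `hash_utils`, matches on the column type once and then hashes the whole column in a tight loop. A simplified single-column version of the idea (the multiply-and-add combine step is a placeholder, not DataFusion's actual scheme):

```rust
use std::hash::{BuildHasher, Hash, Hasher};

/// Column-at-a-time hashing for one i64 column: the type dispatch happens
/// once per column, and the inner loop stays branch-light and cache friendly.
fn hash_i64_column<S: BuildHasher>(col: &[i64], state: &S, hashes: &mut [u64]) {
    for (hash, value) in hashes.iter_mut().zip(col) {
        let mut hasher = state.build_hasher();
        value.hash(&mut hasher);
        // fold this column into the running per-row hash (placeholder combine)
        *hash = hash.wrapping_mul(37).wrapping_add(hasher.finish());
    }
}
```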
@@ -720,11 +494,7 @@ async fn compute_grouped_hash_aggregate(
         aggregate_expressions(&aggr_expr, &mode, group_expr.len())
             .map_err(DataFusionError::into_arrow_external_error)?;
 
-    // mapping key -> (set of accumulators, indices of the key in the batch)
-    // * the indexes are updated at each row
-    // * the accumulators are updated at the end of each batch
-    // * the indexes are `clear`ed at the end of each batch
-    //let mut accumulators: Accumulators = FnvHashMap::default();
+    let random_state = RandomState::new();
 
     // iterate over all input batches and update the accumulators
     let mut accumulators = Accumulators::default();
@@ -732,6 +502,7 @@ async fn compute_grouped_hash_aggregate(
         let batch = batch?;
         accumulators = group_aggregate_batch(
             &mode,
+            &random_state,
             &group_expr,
             &aggr_expr,
             batch,
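Note that one `RandomState` is created per execution and threaded into every `group_aggregate_batch` call. `ahash::RandomState` is randomly keyed per instance, so hashes computed with different states are incomparable; sharing a single state is what makes hashes from different batches land in the same table slots. A small demonstration, assuming ahash 0.7:

```rust
use ahash::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};

fn hash_one<T: Hash>(state: &RandomState, value: &T) -> u64 {
    let mut hasher = state.build_hasher();
    value.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let (a, b) = (RandomState::new(), RandomState::new());
    assert_eq!(hash_one(&a, &"x"), hash_one(&a, &"x")); // same state: stable
    assert_ne!(hash_one(&a, &"x"), hash_one(&b, &"x")); // different keys: (almost surely) differs
}
```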
@@ -779,8 +550,35 @@ impl GroupedHashAggregateStream {
 }
 
 type AccumulatorItem = Box<dyn Accumulator>;
-type Accumulators =
-    HashMap<Vec<u8>, (Box<[ScalarValue]>, Vec<AccumulatorItem>, Vec<u32>), RandomState>;
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+struct GroupState {
+    /// The actual group by values, one for each group column
+    group_by_values: Box<[ScalarValue]>,
+
+    /// Accumulator state, one for each aggregate
+    accumulator_set: Vec<AccumulatorItem>,
+
+    /// scratch space used to collect indices for input rows in a
+    /// batch that have values to aggregate. Reset on each batch
+    indices: Vec<u32>,
+}
+
+/// The state of all the groups
+#[derive(Debug, Default)]
+struct Accumulators {
+    /// Maps hash values to one or more indices in `group_states`
+    ///
+    /// keys: u64 hashes of the GroupValue
+    /// values: indices into `group_states`
+    ///
+    /// TODO: try and avoid double hashing
+    map: HashMap<u64, SmallVec<[usize; 2]>, RandomState>,
+
+    /// State for each group
+    group_states: Vec<GroupState>,
+}
+
 impl Stream for GroupedHashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
 
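To make the two structs above concrete: after one batch of `GROUP BY color` over the rows ["red", "blue", "red"], the state would look roughly like this (hash values and accumulator contents invented for illustration):

```text
map (u64 hash -> indices into group_states):
    hash("red")  = 0x9c3e...  ->  [0]
    hash("blue") = 0x17aa...  ->  [1]

group_states:
    [0] GroupState { group_by_values: ["red"],  accumulator_set: [..], indices: [0, 2] }
    [1] GroupState { group_by_values: ["blue"], accumulator_set: [..], indices: [1] }
```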
@@ -903,6 +701,7 @@ pin_project! {
     }
 }
 
+/// Special case aggregate with no groups
 async fn compute_hash_aggregate(
     mode: AggregateMode,
     schema: SchemaRef,
@@ -1031,10 +830,10 @@ fn create_batch_from_map(
     num_group_expr: usize,
     output_schema: &Schema,
 ) -> ArrowResult<RecordBatch> {
-    if accumulators.is_empty() {
+    if accumulators.group_states.is_empty() {
         return Ok(RecordBatch::new_empty(Arc::new(output_schema.to_owned())));
     }
-    let (_, (_, accs, _)) = accumulators.iter().next().unwrap();
+    let accs = &accumulators.group_states[0].accumulator_set;
     let mut acc_data_types: Vec<usize> = vec![];
 
     // Calculate number/shape of state arrays
@@ -1056,8 +855,9 @@ fn create_batch_from_map(
         .map(|i| {
             ScalarValue::iter_to_array(
                 accumulators
-                    .into_iter()
-                    .map(|(_, (group_by_values, _, _))| group_by_values[i].clone()),
+                    .group_states
+                    .iter()
+                    .map(|group_state| group_state.group_by_values[i].clone()),
             )
         })
         .collect::<Result<Vec<_>>>()
@@ -1068,20 +868,22 @@ fn create_batch_from_map(
     for y in 0..state_len {
         match mode {
             AggregateMode::Partial => {
-                let res = ScalarValue::iter_to_array(accumulators.into_iter().map(
-                    |(_, (_, accumulator, _))| {
-                        let x = accumulator[x].state().unwrap();
+                let res = ScalarValue::iter_to_array(
+                    accumulators.group_states.iter().map(|group_state| {
+                        let x = group_state.accumulator_set[x].state().unwrap();
                         x[y].clone()
-                    },
-                ))
+                    }),
+                )
                 .map_err(DataFusionError::into_arrow_external_error)?;
                 columns.push(res);
             }
             AggregateMode::Final | AggregateMode::FinalPartitioned => {
-                let res = ScalarValue::iter_to_array(accumulators.into_iter().map(
-                    |(_, (_, accumulator, _))| accumulator[x].evaluate().unwrap(),
-                ))
+                let res = ScalarValue::iter_to_array(
+                    accumulators.group_states.iter().map(|group_state| {
+                        group_state.accumulator_set[x].evaluate().unwrap()
+                    }),
+                )
                 .map_err(DataFusionError::into_arrow_external_error)?;
                 columns.push(res);
             }
@@ -1161,7 +963,8 @@ pub(crate) fn create_group_by_values(
 
 #[cfg(test)]
 mod tests {
-    use arrow::array::Float64Array;
+    use arrow::array::{Float64Array, UInt32Array};
+    use arrow::datatypes::DataType;
 
     use super::*;
     use crate::physical_plan::expressions::{col, Avg};
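This first patch is deliberately incomplete: the `assert!(iter.next().is_none(), "TODO hash collisions")` in `group_aggregate_batch` aborts when two distinct group keys share a u64 hash. The next patch resolves a hash hit by comparing the stored group values against the probing row before reusing a group, roughly like this (names and key types invented):

```rust
/// A hash match is only a candidate: the group is reused only if its stored
/// key equals the probing key. Strings stand in for real group values here.
fn find_group(candidates: &[usize], group_keys: &[String], probe: &str) -> Option<usize> {
    candidates.iter().copied().find(|&idx| group_keys[idx] == probe)
}
```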
From 6cd506f2355b5f92d06f732d4671a960761f377e Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 5 Aug 2021 14:25:44 -0400
Subject: [PATCH 2/5] Rewrite to use RawMap per Dandandan suggestion

---
 datafusion/Cargo.toml                         |  2 +-
 .../src/physical_plan/hash_aggregate.rs       | 75 ++++++++++++-------
 datafusion/src/scalar.rs                      |  9 +++
 3 files changed, 56 insertions(+), 30 deletions(-)

diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 2716cc751500..286be8a7a511 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -48,7 +48,7 @@ force_hash_collisions = []
 
 [dependencies]
 ahash = "0.7"
-hashbrown = "0.11"
+hashbrown = { version = "0.11", features = ["raw"] }
 arrow = { version = "5.1", features = ["prettyprint"] }
 parquet = { version = "5.1", features = ["arrow"] }
 sqlparser = "0.9.0"
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 28bf51a191fd..2e52ca6d1d19 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -27,6 +27,7 @@ use futures::{
     stream::{Stream, StreamExt},
     Future,
 };
+use hashbrown::HashMap;
 
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::hash_utils::create_hashes;
@@ -45,9 +46,8 @@ use arrow::{
     datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
-use hashbrown::HashMap;
+use hashbrown::raw::RawTable;
 use pin_project_lite::pin_project;
-use smallvec::{smallvec, SmallVec};
 
 use async_trait::async_trait;
 
@@ -365,44 +365,49 @@ fn group_aggregate_batch(
     for (row, hash) in batch_hashes.into_iter().enumerate() {
         let Accumulators { map, group_states } = &mut accumulators;
 
-        map
-            .raw_entry_mut()
-            .from_key(&hash)
-            // 1.3
-            .and_modify(|_, group_state_indexes| {
-                let mut iter = group_state_indexes.iter();
-                let group_idx = *(iter.next().unwrap()); // created with 1 element
-                /* ******* TODO THIS IS WHERE COLLISIONS NEED TO BE HANDLED ******** */
-                assert!(iter.next().is_none(), "TODO hash collisions");
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that the group we are inserting with this hash is
+            // actually the same key value as the group in
+            // existing_idx (aka group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_values
+                .iter()
+                .zip(group_state.group_by_values.iter())
+                .all(|(array, scalar)| scalar.eq_array(array, row))
+        });
 
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
                 // 1.3
-                let group_state = &mut group_states[group_idx];
                 if group_state.indices.is_empty() {
-                    groups_with_rows.push(group_idx);
+                    groups_with_rows.push(*group_idx);
                 };
                 group_state.indices.push(row as u32); // remember this row
-            })
-            // 1.2
-            .or_insert_with(|| {
+            }
+            // 1.2 Need to create new entry
+            None => {
                 // We can safely unwrap here as we checked we can create an accumulator before
                 let accumulator_set = create_accumulators(aggr_expr).unwrap();
 
-                // Note it would be nice to make this a real error (rather than panic)
-                // but it is better than silently ignoring the issue and getting wrong results
-                create_group_by_values(&group_values, row, &mut group_by_values)
-                    .expect("can not create group by value");
+                // Copy group values from arrays into ScalarValues
+                create_group_by_values(&group_values, row, &mut group_by_values)?;
 
+                // Add new entry to group_states and save newly created index
                 let group_state = GroupState {
                     group_by_values: group_by_values.clone(),
                     accumulator_set,
                     indices: vec![row as u32], // 1.3
                 };
 
                 let group_idx = group_states.len();
                 group_states.push(group_state);
                 groups_with_rows.push(group_idx);
-                (hash, smallvec![group_idx])
-            });
+
+                // for the hasher function, use the precomputed hash value
+                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
+            }
+        };
     }
 
     // Collect all indices + offsets based on keys in this vec
@@ -566,20 +571,32 @@ struct GroupState {
 }
 
 /// The state of all the groups
-#[derive(Debug, Default)]
+#[derive(Default)]
 struct Accumulators {
-    /// Maps hash values to one or more indices in `group_states`
+    /// Logically maps group values to an index in `group_states`
     ///
-    /// keys: u64 hashes of the GroupValue
-    /// values: indices into `group_states`
+    /// Uses the raw API of hashbrown to avoid actually storing the
+    /// keys in the table
     ///
-    /// TODO: try and avoid double hashing
-    map: HashMap<u64, SmallVec<[usize; 2]>, RandomState>,
+    /// keys: u64 hashes of the GroupValue
+    /// values: (hash, index into `group_states`)
+    map: RawTable<(u64, usize)>,
 
     /// State for each group
     group_states: Vec<GroupState>,
 }
 
+impl std::fmt::Debug for Accumulators {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // hashes are not stored inline, so we can only print a placeholder
+        let map_string = "RawTable";
+        f.debug_struct("Accumulators")
+            .field("map", &map_string)
+            .field("group_states", &self.group_states)
+            .finish()
+    }
+}
+
 impl Stream for GroupedHashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
 
diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index 86d17654c060..4582e0fb3edb 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -1120,6 +1120,15 @@ impl ScalarValue {
             None => self.is_null(),
         }
     }
+
+    /// Compares array @ row for equality with self.
+    ///
+    /// TODO: optimize: avoid constructing an intermediate ScalarValue
+    #[inline]
+    pub fn eq_array(&self, array: &ArrayRef, index: usize) -> bool {
+        let arr_scalar = Self::try_from_array(array, index).unwrap();
+        arr_scalar.eq(self)
+    }
 }
 
 impl From<f64> for ScalarValue {
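A usage sketch of the helper added above, against this patch's definition (patch 3 below removes this in-series copy again, presumably because an equivalent landed on the base branch):

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, StringArray};
use datafusion::scalar::ScalarValue;

fn main() {
    let array: ArrayRef = Arc::new(StringArray::from(vec!["red", "blue"]));
    let stored = ScalarValue::Utf8(Some("blue".to_string()));
    // compares the stored group key with array[row] of the incoming batch
    assert!(stored.eq_array(&array, 1));
    assert!(!stored.eq_array(&array, 0));
}
```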
From e2b6c8e7029c0b433d8b5909b33b2fbf8b285ecc Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 9 Aug 2021 11:57:04 -0400
Subject: [PATCH 3/5] remove stub

---
 datafusion/src/scalar.rs | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index 4582e0fb3edb..86d17654c060 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -1120,15 +1120,6 @@ impl ScalarValue {
             None => self.is_null(),
         }
     }
-
-    /// Compares array @ row for equality with self.
-    ///
-    /// TODO: optimize: avoid constructing an intermediate ScalarValue
-    #[inline]
-    pub fn eq_array(&self, array: &ArrayRef, index: usize) -> bool {
-        let arr_scalar = Self::try_from_array(array, index).unwrap();
-        arr_scalar.eq(self)
-    }
 }
 
 impl From<f64> for ScalarValue {

From c3cf0d5318c257c9ee7757cb09aff28b7d8dd691 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 9 Aug 2021 12:20:24 -0400
Subject: [PATCH 4/5] Return error with create_accumulators

---
 datafusion/src/physical_plan/hash_aggregate.rs | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 2e52ca6d1d19..14f271b656b2 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -352,9 +352,6 @@ fn group_aggregate_batch(
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row's index to `indices`
 
-    // Make sure we can create the accumulators or otherwise return an error
-    create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
-
     // track which entries in `accumulators` have rows in this batch to aggregate
     let mut groups_with_rows = vec![];
 
@@ -388,8 +385,8 @@ fn group_aggregate_batch(
             }
             // 1.2 Need to create new entry
             None => {
-                // We can safely unwrap here as we checked we can create an accumulator before
-                let accumulator_set = create_accumulators(aggr_expr).unwrap();
+                let accumulator_set = create_accumulators(aggr_expr)
+                    .map_err(DataFusionError::into_arrow_external_error)?;
 
                 // Copy group values from arrays into ScalarValues
                 create_group_by_values(&group_values, row, &mut group_by_values)?;

From c5bc0c116bc0a74719ed714b173dd39adb1da3ff Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 9 Aug 2021 13:06:56 -0400
Subject: [PATCH 5/5] Do not memoize group key creation

---
 .../src/physical_plan/hash_aggregate.rs       | 29 ++++---------------
 1 file changed, 6 insertions(+), 23 deletions(-)

diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 14f271b656b2..1c07f61f10cd 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -346,8 +344,6 @@ fn group_aggregate_batch(
         group_by_values.push(ScalarValue::UInt32(Some(0)));
     }
 
-    let mut group_by_values = group_by_values.into_boxed_slice();
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row's index to `indices`
@@ -388,12 +386,15 @@ fn group_aggregate_batch(
                 let accumulator_set = create_accumulators(aggr_expr)
                     .map_err(DataFusionError::into_arrow_external_error)?;
 
-                // Copy group values from arrays into ScalarValues
-                create_group_by_values(&group_values, row, &mut group_by_values)?;
+                // Copy group values out of arrays into `ScalarValue`s
+                let group_by_values = group_values
+                    .iter()
+                    .map(|col| ScalarValue::try_from_array(col, row))
+                    .collect::<Result<Vec<_>>>()?;
 
                 // Add new entry to group_states and save newly created index
                 let group_state = GroupState {
-                    group_by_values: group_by_values.clone(),
+                    group_by_values: group_by_values.into_boxed_slice(),
                     accumulator_set,
                     indices: vec![row as u32], // 1.3
                 };
@@ -956,24 +957,6 @@ fn finalize_aggregation(
     }
 }
 
-/// Extract the value in `col[row]` as a GroupByScalar
-fn create_group_by_value(col: &ArrayRef, row: usize) -> Result<ScalarValue> {
-    ScalarValue::try_from_array(col, row)
-}
-
-/// Extract the values in `group_by_keys` arrow arrays into the target vector
-/// as GroupByScalar values
-pub(crate) fn create_group_by_values(
-    group_by_keys: &[ArrayRef],
-    row: usize,
-    vec: &mut Box<[ScalarValue]>,
-) -> Result<()> {
-    for (i, col) in group_by_keys.iter().enumerate() {
-        vec[i] = create_group_by_value(col, row)?
-    }
-    Ok(())
-}
-
 #[cfg(test)]
 mod tests {
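With memoization gone, the owned key is built only when a row starts a new group, which becomes rare once the table is warm. The end state of group-key creation, sketched as a free function (assuming `ScalarValue::try_from_array` as used in the hunk above):

```rust
use arrow::array::ArrayRef;
use datafusion::error::Result;
use datafusion::scalar::ScalarValue;

/// Sketch of the end state: build the owned group key directly from the
/// arrays for the row that opens a new group; no shared scratch buffer.
fn group_key(group_values: &[ArrayRef], row: usize) -> Result<Box<[ScalarValue]>> {
    let values = group_values
        .iter()
        .map(|col| ScalarValue::try_from_array(col, row))
        .collect::<Result<Vec<_>>>()?;
    Ok(values.into_boxed_slice())
}
```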