From 3de50c833b0e4bbb688b02240d11a1d5092c0d36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Fri, 23 Aug 2024 10:24:59 +0200 Subject: [PATCH] [minor] Use Vec instead of primitive builders (#12121) * Use vec instead of builder * Compile * Use vec instead of builder * Revert --- .../src/aggregate/groups_accumulator.rs | 8 +++--- .../physical-plan/src/joins/hash_join.rs | 25 +++++++----------- .../src/joins/nested_loop_join.rs | 25 ++++++++---------- .../src/joins/symmetric_hash_join.rs | 10 +++---- datafusion/physical-plan/src/joins/utils.rs | 26 ++++++++----------- .../physical-plan/src/repartition/mod.rs | 14 +++++----- 6 files changed, 46 insertions(+), 62 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs index 3984b02c5fbb..1c97d22ec79c 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs @@ -24,7 +24,7 @@ pub mod nulls; pub mod prim_op; use arrow::{ - array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray, UInt32Builder}, + array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray}, compute, datatypes::UInt32Type, }; @@ -170,7 +170,7 @@ impl GroupsAccumulatorAdapter { let mut groups_with_rows = vec![]; // batch_indices holds indices into values, each group is contiguous - let mut batch_indices = UInt32Builder::with_capacity(0); + let mut batch_indices = vec![]; // offsets[i] is index into batch_indices where the rows for // group_index i starts @@ -184,11 +184,11 @@ impl GroupsAccumulatorAdapter { } groups_with_rows.push(group_index); - batch_indices.append_slice(indices); + batch_indices.extend_from_slice(indices); offset_so_far += indices.len(); offsets.push(offset_so_far); } - let batch_indices = batch_indices.finish(); + let batch_indices = batch_indices.into(); // reorder the values and opt_filter by 
batch_indices so that // all values for each group are contiguous, then invoke the diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index e40a07cf6220..7fac23ad5557 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -49,8 +49,7 @@ use crate::{ }; use arrow::array::{ - Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array, - UInt64Array, + Array, ArrayRef, BooleanArray, BooleanBufferBuilder, UInt32Array, UInt64Array, }; use arrow::compute::kernels::cmp::{eq, not_distinct}; use arrow::compute::{and, concat_batches, take, FilterBuilder}; @@ -1204,13 +1203,11 @@ fn lookup_join_hashmap( }) .collect::>>()?; - let (mut probe_builder, mut build_builder, next_offset) = build_hashmap + let (probe_indices, build_indices, next_offset) = build_hashmap .get_matched_indices_with_limit_offset(hashes_buffer, None, limit, offset); - let build_indices: UInt64Array = - PrimitiveArray::new(build_builder.finish().into(), None); - let probe_indices: UInt32Array = - PrimitiveArray::new(probe_builder.finish().into(), None); + let build_indices: UInt64Array = build_indices.into(); + let probe_indices: UInt32Array = probe_indices.into(); let (build_indices, probe_indices) = equal_rows_arr( &build_indices, @@ -1566,7 +1563,7 @@ mod tests { test::build_table_i32, test::exec::MockExec, }; - use arrow::array::{Date32Array, Int32Array, UInt32Builder, UInt64Builder}; + use arrow::array::{Date32Array, Int32Array}; use arrow::datatypes::{DataType, Field}; use arrow_array::StructArray; use arrow_buffer::NullBuffer; @@ -3169,17 +3166,13 @@ mod tests { (0, None), )?; - let mut left_ids = UInt64Builder::with_capacity(0); - left_ids.append_value(0); - left_ids.append_value(1); + let left_ids: UInt64Array = vec![0, 1].into(); - let mut right_ids = UInt32Builder::with_capacity(0); - right_ids.append_value(0); - right_ids.append_value(1); + let 
right_ids: UInt32Array = vec![0, 1].into(); - assert_eq!(left_ids.finish(), l); + assert_eq!(left_ids, l); - assert_eq!(right_ids.finish(), r); + assert_eq!(right_ids, r); Ok(()) } diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 04a025c93288..18de2de03192 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -40,13 +40,12 @@ use crate::{ RecordBatchStream, SendableRecordBatchStream, }; -use arrow::array::{ - BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, UInt64Builder, -}; +use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array}; use arrow::compute::concat_batches; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{Schema, SchemaRef, UInt64Type}; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; +use arrow_array::PrimitiveArray; use datafusion_common::{exec_datafusion_err, JoinSide, Result, Statistics}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; @@ -573,23 +572,21 @@ fn join_left_and_right_batch( ) })?; - let mut left_indices_builder = UInt64Builder::new(); - let mut right_indices_builder = UInt32Builder::new(); + let mut left_indices_builder: Vec<u64> = vec![]; + let mut right_indices_builder: Vec<u32> = vec![]; for (left_side, right_side) in indices { - left_indices_builder - .append_values(left_side.values(), &vec![true; left_side.len()]); - right_indices_builder - .append_values(right_side.values(), &vec![true; right_side.len()]); + left_indices_builder.extend(left_side.values()); + right_indices_builder.extend(right_side.values()); } - let left_side = left_indices_builder.finish(); - let right_side = right_indices_builder.finish(); + let left_side: PrimitiveArray<UInt64Type> = left_indices_builder.into(); + let right_side = right_indices_builder.into(); // set the left bitmap // and only full 
join need the left bitmap if need_produce_result_in_final(join_type) { let mut bitmap = visited_left_side.lock(); - left_side.iter().flatten().for_each(|x| { - bitmap.set_bit(x as usize, true); + left_side.values().iter().for_each(|x| { + bitmap.set_bit(*x as usize, true); }); } // adjust the two side indices base on the join type diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 1bf2ef2fd5f7..7dab664502e9 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -929,13 +929,11 @@ fn lookup_join_hashmap( let (mut matched_probe, mut matched_build) = build_hashmap .get_matched_indices(hash_values.iter().enumerate().rev(), deleted_offset); - matched_probe.as_slice_mut().reverse(); - matched_build.as_slice_mut().reverse(); + matched_probe.reverse(); + matched_build.reverse(); - let build_indices: UInt64Array = - PrimitiveArray::new(matched_build.finish().into(), None); - let probe_indices: UInt32Array = - PrimitiveArray::new(matched_probe.finish().into(), None); + let build_indices: UInt64Array = matched_build.into(); + let probe_indices: UInt32Array = matched_probe.into(); let (build_indices, probe_indices) = equal_rows_arr( &build_indices, diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 80d8815bdebc..8fdbf7041e2f 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -31,7 +31,7 @@ use crate::{ use arrow::array::{ downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array, - UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder, + UInt32Builder, UInt64Array, }; use arrow::compute; use arrow::datatypes::{Field, Schema, SchemaBuilder, UInt32Type, UInt64Type}; @@ -163,8 +163,8 @@ macro_rules! 
chain_traverse { } else { i }; - $match_indices.append(match_row_idx); - $input_indices.append($input_idx as u32); + $match_indices.push(match_row_idx); + $input_indices.push($input_idx as u32); $remaining_output -= 1; // Follow the chain to get the next index value let next = $next_chain[match_row_idx as usize]; @@ -238,9 +238,9 @@ pub trait JoinHashMapType { &self, iter: impl Iterator, deleted_offset: Option, - ) -> (UInt32BufferBuilder, UInt64BufferBuilder) { - let mut input_indices = UInt32BufferBuilder::new(0); - let mut match_indices = UInt64BufferBuilder::new(0); + ) -> (Vec<u32>, Vec<u64>) { + let mut input_indices = vec![]; + let mut match_indices = vec![]; let hash_map = self.get_map(); let next_chain = self.get_list(); @@ -261,8 +261,8 @@ pub trait JoinHashMapType { } else { i }; - match_indices.append(match_row_idx); - input_indices.append(row_idx as u32); + match_indices.push(match_row_idx); + input_indices.push(row_idx as u32); // Follow the chain to get the next index value let next = next_chain[match_row_idx as usize]; if next == 0 { @@ -289,13 +289,9 @@ pub trait JoinHashMapType { deleted_offset: Option, limit: usize, offset: JoinHashMapOffset, - ) -> ( - UInt32BufferBuilder, - UInt64BufferBuilder, - Option<JoinHashMapOffset>, - ) { - let mut input_indices = UInt32BufferBuilder::new(0); - let mut match_indices = UInt64BufferBuilder::new(0); + ) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) { + let mut input_indices = vec![]; + let mut match_indices = vec![]; let mut remaining_output = limit; diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 656d82215bbe..5a3fcb5029e1 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -38,9 +38,10 @@ use crate::sorts::streaming_merge; use crate::stream::RecordBatchStreamAdapter; use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; -use arrow::array::{ArrayRef, UInt64Builder}; -use 
arrow::datatypes::SchemaRef; +use arrow::array::ArrayRef; +use arrow::datatypes::{SchemaRef, UInt64Type}; use arrow::record_batch::RecordBatch; +use arrow_array::PrimitiveArray; use datafusion_common::utils::transpose; use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result}; use datafusion_common_runtime::SpawnedTask; @@ -275,12 +276,11 @@ impl BatchPartitioner { create_hashes(&arrays, random_state, hash_buffer)?; let mut indices: Vec<_> = (0..*partitions) - .map(|_| UInt64Builder::with_capacity(batch.num_rows())) + .map(|_| Vec::with_capacity(batch.num_rows())) .collect(); for (index, hash) in hash_buffer.iter().enumerate() { - indices[(*hash % *partitions as u64) as usize] - .append_value(index as u64); + indices[(*hash % *partitions as u64) as usize].push(index as u64); } // Finished building index-arrays for output partitions @@ -291,8 +291,8 @@ impl BatchPartitioner { let it = indices .into_iter() .enumerate() - .filter_map(|(partition, mut indices)| { - let indices = indices.finish(); + .filter_map(|(partition, indices)| { + let indices: PrimitiveArray<UInt64Type> = indices.into(); (!indices.is_empty()).then_some((partition, indices)) }) .map(move |(partition, indices)| {