From 8397855f24ce548fe53ed0d7a8bb4d9cbbeb866b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alihan=20=C3=87elikcan?= Date: Fri, 20 Sep 2024 16:29:20 +0300 Subject: [PATCH] Fix NestedLoopJoin performance regression (#12531) * Optimize apply_join_filter_to_indices calls * Optimize join indices calculation * Cache join indices * Update datafusion/physical-plan/src/joins/nested_loop_join.rs * Fix missing flag for adjust_indices_by_join_type * Fix SQL logic test --------- Co-authored-by: Mehmet Ozan Kabak --- .../src/joins/nested_loop_join.rs | 111 +++++++++++++----- datafusion/sqllogictest/test_files/joins.slt | 2 +- 2 files changed, 83 insertions(+), 30 deletions(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index b30e5184f0f7..029003374acc 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -42,10 +42,9 @@ use crate::{ use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array}; use arrow::compute::concat_batches; -use arrow::datatypes::{Schema, SchemaRef, UInt64Type}; +use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; -use arrow_array::PrimitiveArray; use datafusion_common::{exec_datafusion_err, JoinSide, Result, Statistics}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; @@ -348,6 +347,11 @@ impl ExecutionPlan for NestedLoopJoinExec { let outer_table = self.right.execute(partition, context)?; + let indices_cache = (UInt64Array::new_null(0), UInt32Array::new_null(0)); + + // Right side has an order and it is maintained during operation. + let right_side_ordered = + self.maintains_input_order()[1] && self.right.output_ordering().is_some(); Ok(Box::pin(NestedLoopJoinStream { schema: Arc::clone(&self.schema), filter: self.filter.clone(), @@ -357,6 +361,8 @@ impl ExecutionPlan for NestedLoopJoinExec { is_exhausted: false, column_indices: self.column_indices.clone(), join_metrics, + indices_cache, + right_side_ordered, })) } @@ -456,21 +462,74 @@ struct NestedLoopJoinStream { // null_equals_null: bool /// Join execution metrics join_metrics: BuildProbeJoinMetrics, + /// Cache for join indices calculations + indices_cache: (UInt64Array, UInt32Array), + /// Whether the right side is ordered + right_side_ordered: bool, } +/// Creates a Cartesian product of two input batches, preserving the order of the right batch, +/// and applying a join filter if provided. +/// +/// # Example +/// Input: +/// left = [0, 1], right = [0, 1, 2] +/// +/// Output: +/// left_indices = [0, 1, 0, 1, 0, 1], right_indices = [0, 0, 1, 1, 2, 2] +/// +/// Input: +/// left = [0, 1, 2], right = [0, 1, 2, 3], filter = left.a != right.a +/// +/// Output: +/// left_indices = [1, 2, 0, 2, 0, 1, 0, 1, 2], right_indices = [0, 0, 1, 1, 2, 2, 3, 3, 3] fn build_join_indices( - right_row_index: usize, left_batch: &RecordBatch, right_batch: &RecordBatch, filter: Option<&JoinFilter>, + indices_cache: &mut (UInt64Array, UInt32Array), ) -> Result<(UInt64Array, UInt32Array)> { - // left indices: [0, 1, 2, 3, 4, ..., left_row_count] - // right indices: [right_index, right_index, ..., right_index] - let left_row_count = left_batch.num_rows(); - let left_indices = UInt64Array::from_iter_values(0..(left_row_count as u64)); - let right_indices = UInt32Array::from(vec![right_row_index as u32; left_row_count]); - // in the nested loop join, the filter can contain non-equal and equal condition. + let right_row_count = right_batch.num_rows(); + let output_row_count = left_row_count * right_row_count; + + // We always use the same indices before applying the filter, so we can cache them + let (left_indices_cache, right_indices_cache) = indices_cache; + let cached_output_row_count = left_indices_cache.len(); + + let (left_indices, right_indices) = + match output_row_count.cmp(&cached_output_row_count) { + std::cmp::Ordering::Equal => { + // Reuse the cached indices + (left_indices_cache.clone(), right_indices_cache.clone()) + } + std::cmp::Ordering::Less => { + // Left_row_count never changes because it's the build side. The changes to the + // right_row_count can be handled trivially by taking the first output_row_count + // elements of the cache because of how the indices are generated. + // (See the Ordering::Greater match arm) + ( + left_indices_cache.slice(0, output_row_count), + right_indices_cache.slice(0, output_row_count), + ) + } + std::cmp::Ordering::Greater => { + // Rebuild the indices cache + + // Produces 0, 1, 2, 0, 1, 2, 0, 1, 2, ... + *left_indices_cache = UInt64Array::from_iter_values( + (0..output_row_count as u64).map(|i| i % left_row_count as u64), + ); + + // Produces 0, 0, 0, 1, 1, 1, 2, 2, 2, ... + *right_indices_cache = UInt32Array::from_iter_values( + (0..output_row_count as u32).map(|i| i / left_row_count as u32), + ); + + (left_indices_cache.clone(), right_indices_cache.clone()) + } + }; + if let Some(filter) = filter { apply_join_filter_to_indices( left_batch, @@ -524,6 +583,8 @@ impl NestedLoopJoinStream { &self.column_indices, &self.schema, visited_left_side, + &mut self.indices_cache, + self.right_side_ordered, ); // Recording time & updating output metrics @@ -587,6 +648,7 @@ impl NestedLoopJoinStream { } } +#[allow(clippy::too_many_arguments)] fn join_left_and_right_batch( left_batch: &RecordBatch, right_batch: &RecordBatch, @@ -595,27 +657,18 @@ fn join_left_and_right_batch( column_indices: &[ColumnIndex], schema: &Schema, visited_left_side: &SharedBitmapBuilder, + indices_cache: &mut (UInt64Array, UInt32Array), + right_side_ordered: bool, ) -> Result { - let indices = (0..right_batch.num_rows()) - .map(|right_row_index| { - build_join_indices(right_row_index, left_batch, right_batch, filter) - }) - .collect::>>() - .map_err(|e| { - exec_datafusion_err!( - "Fail to build join indices in NestedLoopJoinExec, error:{e}" - ) - })?; - - let mut left_indices_builder: Vec = vec![]; - let mut right_indices_builder: Vec = vec![]; - for (left_side, right_side) in indices { - left_indices_builder.extend(left_side.values()); - right_indices_builder.extend(right_side.values()); - } + let (left_side, right_side) = + build_join_indices(left_batch, right_batch, filter, indices_cache).map_err( + |e| { + exec_datafusion_err!( + "Fail to build join indices in NestedLoopJoinExec, error: {e}" + ) + }, + )?; - let left_side: PrimitiveArray = left_indices_builder.into(); - let right_side = right_indices_builder.into(); // set the left bitmap // and only full join need the left bitmap if need_produce_result_in_final(join_type) { @@ -630,7 +683,7 @@ fn join_left_and_right_batch( right_side, 0..right_batch.num_rows(), join_type, - true, + right_side_ordered, ); build_batch_from_indices( diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 679c2eee10a4..7d0262952b31 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -2136,10 +2136,10 @@ FROM (select t1_id from join_t1 where join_t1.t1_id > 22) as join_t1 RIGHT JOIN (select t2_id from join_t2 where join_t2.t2_id > 11) as join_t2 ON join_t1.t1_id < join_t2.t2_id ---- -NULL 22 33 44 33 55 44 55 +NULL 22 ##### # Configuration teardown