From 053795cd4ec5cbac022fbea767b79f8da1424060 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Thu, 8 Aug 2024 09:48:09 +0800 Subject: [PATCH] Improve nested loop join code (#11863) * Improve nested loop join code * fmt --- .../src/joins/nested_loop_join.rs | 90 +++++++++---------- 1 file changed, 41 insertions(+), 49 deletions(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 9f1465c2d7c1..d69d818331be 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -47,7 +47,7 @@ use arrow::compute::concat_batches; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; -use datafusion_common::{exec_err, JoinSide, Result, Statistics}; +use datafusion_common::{exec_datafusion_err, JoinSide, Result, Statistics}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_expr::JoinType; @@ -562,62 +562,54 @@ fn join_left_and_right_batch( schema: &Schema, visited_left_side: &SharedBitmapBuilder, ) -> Result { - let indices_result = (0..left_batch.num_rows()) + let indices = (0..left_batch.num_rows()) .map(|left_row_index| { build_join_indices(left_row_index, right_batch, left_batch, filter) }) - .collect::>>(); + .collect::>>() + .map_err(|e| { + exec_datafusion_err!( + "Fail to build join indices in NestedLoopJoinExec, error:{e}" + ) + })?; let mut left_indices_builder = UInt64Builder::new(); let mut right_indices_builder = UInt32Builder::new(); - let left_right_indices = match indices_result { - Err(err) => { - exec_err!("Fail to build join indices in NestedLoopJoinExec, error:{err}") - } - Ok(indices) => { - for (left_side, right_side) in indices { - left_indices_builder - .append_values(left_side.values(), &vec![true; left_side.len()]); - right_indices_builder - .append_values(right_side.values(), &vec![true; right_side.len()]); - } - Ok(( - left_indices_builder.finish(), - right_indices_builder.finish(), - )) - } - }; - match left_right_indices { - Ok((left_side, right_side)) => { - // set the left bitmap - // and only full join need the left bitmap - if need_produce_result_in_final(join_type) { - let mut bitmap = visited_left_side.lock(); - left_side.iter().flatten().for_each(|x| { - bitmap.set_bit(x as usize, true); - }); - } - // adjust the two side indices base on the join type - let (left_side, right_side) = adjust_indices_by_join_type( - left_side, - right_side, - 0..right_batch.num_rows(), - join_type, - false, - ); + for (left_side, right_side) in indices { + left_indices_builder + .append_values(left_side.values(), &vec![true; left_side.len()]); + right_indices_builder + .append_values(right_side.values(), &vec![true; right_side.len()]); + } - build_batch_from_indices( - schema, - left_batch, - right_batch, - &left_side, - &right_side, - column_indices, - JoinSide::Left, - ) - } - Err(e) => Err(e), + let left_side = left_indices_builder.finish(); + let right_side = right_indices_builder.finish(); + // set the left bitmap + // and only full join need the left bitmap + if need_produce_result_in_final(join_type) { + let mut bitmap = visited_left_side.lock(); + left_side.iter().flatten().for_each(|x| { + bitmap.set_bit(x as usize, true); + }); } + // adjust the two side indices base on the join type + let (left_side, right_side) = adjust_indices_by_join_type( + left_side, + right_side, + 0..right_batch.num_rows(), + join_type, + false, + ); + + build_batch_from_indices( + schema, + left_batch, + right_batch, + &left_side, + &right_side, + column_indices, + JoinSide::Left, + ) } fn get_final_indices_from_shared_bitmap(