Skip to content

Commit

Permalink
Fix NestedLoopJoin performance regression (#12531)
Browse files Browse the repository at this point in the history
* Optimize apply_join_filter_to_indices calls

* Optimize join indices calculation

* Cache join indices

* Update datafusion/physical-plan/src/joins/nested_loop_join.rs

* Fix missing flag for adjust_indices_by_join_type

* Fix SQL logic test

---------

Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
  • Loading branch information
alihan-synnada and ozankabak committed Sep 20, 2024
1 parent 0243ebd commit 8397855
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 30 deletions.
111 changes: 82 additions & 29 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ use crate::{

use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{Schema, SchemaRef, UInt64Type};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use arrow_array::PrimitiveArray;
use datafusion_common::{exec_datafusion_err, JoinSide, Result, Statistics};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
Expand Down Expand Up @@ -348,6 +347,11 @@ impl ExecutionPlan for NestedLoopJoinExec {

let outer_table = self.right.execute(partition, context)?;

let indices_cache = (UInt64Array::new_null(0), UInt32Array::new_null(0));

// Whether the right side has an output ordering that this join maintains during operation.
let right_side_ordered =
self.maintains_input_order()[1] && self.right.output_ordering().is_some();
Ok(Box::pin(NestedLoopJoinStream {
schema: Arc::clone(&self.schema),
filter: self.filter.clone(),
Expand All @@ -357,6 +361,8 @@ impl ExecutionPlan for NestedLoopJoinExec {
is_exhausted: false,
column_indices: self.column_indices.clone(),
join_metrics,
indices_cache,
right_side_ordered,
}))
}

Expand Down Expand Up @@ -456,21 +462,74 @@ struct NestedLoopJoinStream {
// null_equals_null: bool
/// Join execution metrics
join_metrics: BuildProbeJoinMetrics,
/// Cache for join indices calculations
indices_cache: (UInt64Array, UInt32Array),
/// Whether the right side is ordered
right_side_ordered: bool,
}

/// Creates a Cartesian product of two input batches, preserving the order of the right batch,
/// and applying a join filter if provided.
///
/// # Example
/// Input:
/// left = [0, 1], right = [0, 1, 2]
///
/// Output:
/// left_indices = [0, 1, 0, 1, 0, 1], right_indices = [0, 0, 1, 1, 2, 2]
///
/// Input:
/// left = [0, 1, 2], right = [0, 1, 2, 3], filter = left.a != right.a
///
/// Output:
/// left_indices = [1, 2, 0, 2, 0, 1, 0, 1, 2], right_indices = [0, 0, 1, 1, 2, 2, 3, 3, 3]
fn build_join_indices(
right_row_index: usize,
left_batch: &RecordBatch,
right_batch: &RecordBatch,
filter: Option<&JoinFilter>,
indices_cache: &mut (UInt64Array, UInt32Array),
) -> Result<(UInt64Array, UInt32Array)> {
// left indices: [0, 1, 2, 3, 4, ..., left_row_count - 1]
// right indices: [right_index, right_index, ..., right_index]

let left_row_count = left_batch.num_rows();
let left_indices = UInt64Array::from_iter_values(0..(left_row_count as u64));
let right_indices = UInt32Array::from(vec![right_row_index as u32; left_row_count]);
// In a nested loop join, the filter can contain both equality and non-equality conditions.
let right_row_count = right_batch.num_rows();
let output_row_count = left_row_count * right_row_count;

// We always use the same indices before applying the filter, so we can cache them
let (left_indices_cache, right_indices_cache) = indices_cache;
let cached_output_row_count = left_indices_cache.len();

let (left_indices, right_indices) =
match output_row_count.cmp(&cached_output_row_count) {
std::cmp::Ordering::Equal => {
// Reuse the cached indices
(left_indices_cache.clone(), right_indices_cache.clone())
}
std::cmp::Ordering::Less => {
// `left_row_count` never changes because the left side is the build side. A change
// in `right_row_count` can therefore be handled trivially by taking the first
// `output_row_count` elements of the cache, because of how the indices are
// generated (see the `Ordering::Greater` match arm).
(
left_indices_cache.slice(0, output_row_count),
right_indices_cache.slice(0, output_row_count),
)
}
std::cmp::Ordering::Greater => {
// Rebuild the indices cache

// Produces 0, 1, 2, 0, 1, 2, 0, 1, 2, ...
*left_indices_cache = UInt64Array::from_iter_values(
(0..output_row_count as u64).map(|i| i % left_row_count as u64),
);

// Produces 0, 0, 0, 1, 1, 1, 2, 2, 2, ...
*right_indices_cache = UInt32Array::from_iter_values(
(0..output_row_count as u32).map(|i| i / left_row_count as u32),
);

(left_indices_cache.clone(), right_indices_cache.clone())
}
};

if let Some(filter) = filter {
apply_join_filter_to_indices(
left_batch,
Expand Down Expand Up @@ -524,6 +583,8 @@ impl NestedLoopJoinStream {
&self.column_indices,
&self.schema,
visited_left_side,
&mut self.indices_cache,
self.right_side_ordered,
);

// Recording time & updating output metrics
Expand Down Expand Up @@ -587,6 +648,7 @@ impl NestedLoopJoinStream {
}
}

#[allow(clippy::too_many_arguments)]
fn join_left_and_right_batch(
left_batch: &RecordBatch,
right_batch: &RecordBatch,
Expand All @@ -595,27 +657,18 @@ fn join_left_and_right_batch(
column_indices: &[ColumnIndex],
schema: &Schema,
visited_left_side: &SharedBitmapBuilder,
indices_cache: &mut (UInt64Array, UInt32Array),
right_side_ordered: bool,
) -> Result<RecordBatch> {
let indices = (0..right_batch.num_rows())
.map(|right_row_index| {
build_join_indices(right_row_index, left_batch, right_batch, filter)
})
.collect::<Result<Vec<(UInt64Array, UInt32Array)>>>()
.map_err(|e| {
exec_datafusion_err!(
"Fail to build join indices in NestedLoopJoinExec, error:{e}"
)
})?;

let mut left_indices_builder: Vec<u64> = vec![];
let mut right_indices_builder: Vec<u32> = vec![];
for (left_side, right_side) in indices {
left_indices_builder.extend(left_side.values());
right_indices_builder.extend(right_side.values());
}
let (left_side, right_side) =
build_join_indices(left_batch, right_batch, filter, indices_cache).map_err(
|e| {
exec_datafusion_err!(
"Fail to build join indices in NestedLoopJoinExec, error: {e}"
)
},
)?;

let left_side: PrimitiveArray<UInt64Type> = left_indices_builder.into();
let right_side = right_indices_builder.into();
// Set the left bitmap.
// It is only needed by join types that produce results in the final step (e.g. full join).
if need_produce_result_in_final(join_type) {
Expand All @@ -630,7 +683,7 @@ fn join_left_and_right_batch(
right_side,
0..right_batch.num_rows(),
join_type,
true,
right_side_ordered,
);

build_batch_from_indices(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2136,10 +2136,10 @@ FROM (select t1_id from join_t1 where join_t1.t1_id > 22) as join_t1
RIGHT JOIN (select t2_id from join_t2 where join_t2.t2_id > 11) as join_t2
ON join_t1.t1_id < join_t2.t2_id
----
NULL 22
33 44
33 55
44 55
NULL 22

#####
# Configuration teardown
Expand Down

0 comments on commit 8397855

Please sign in to comment.