Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory limited nested-loop join #5564

Merged
merged 3 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ use tokio::task::JoinHandle;
/// [`MemoryReservation`] used across query execution streams
pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;

/// [`MemoryReservation`] used at query operator level
/// The `Option` wrapper allows an empty reservation to be created in the operator constructor
/// and then set to the actual reservation at stream level.
pub(crate) type OperatorMemoryReservation = Arc<Mutex<Option<SharedMemoryReservation>>>;

/// Stream of record batches
pub struct SizedRecordBatchStream {
schema: SchemaRef,
Expand Down
28 changes: 7 additions & 21 deletions datafusion/core/src/physical_plan/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;

use crate::execution::context::TaskContext;
use crate::execution::memory_pool::MemoryConsumer;
use crate::physical_plan::common::{OperatorMemoryReservation, SharedMemoryReservation};
use crate::execution::memory_pool::{SharedOptionalMemoryReservation, TryGrow};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec,
Expand All @@ -38,7 +37,6 @@ use crate::physical_plan::{
use crate::{error::Result, scalar::ScalarValue};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use parking_lot::Mutex;

use super::utils::{
adjust_right_output_partitioning, cross_join_equivalence_properties,
Expand All @@ -61,7 +59,7 @@ pub struct CrossJoinExec {
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
/// Memory reservation for build-side data
reservation: OperatorMemoryReservation,
reservation: SharedOptionalMemoryReservation,
/// Execution plan metrics
metrics: ExecutionPlanMetricsSet,
}
Expand Down Expand Up @@ -106,7 +104,7 @@ async fn load_left_input(
left: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
metrics: BuildProbeJoinMetrics,
reservation: SharedMemoryReservation,
reservation: SharedOptionalMemoryReservation,
) -> Result<JoinLeftData> {
// merge all left parts into a single stream
let merge = {
Expand All @@ -125,7 +123,7 @@ async fn load_left_input(
|mut acc, batch| async {
let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
acc.3.lock().try_grow(batch_size)?;
acc.3.try_grow(batch_size)?;
// Update metrics
acc.2.build_mem_used.add(batch_size);
acc.2.build_input_batches.add(1);
Expand Down Expand Up @@ -226,27 +224,15 @@ impl ExecutionPlan for CrossJoinExec {
let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);

// Initialization of operator-level reservation
{
let mut reservation_lock = self.reservation.lock();
if reservation_lock.is_none() {
*reservation_lock = Some(Arc::new(Mutex::new(
MemoryConsumer::new("CrossJoinExec").register(context.memory_pool()),
)));
};
}

let reservation = self.reservation.lock().clone().ok_or_else(|| {
DataFusionError::Internal(
"Operator-level memory reservation is not initialized".to_string(),
)
})?;
self.reservation
.initialize("CrossJoinExec", context.memory_pool());

let left_fut = self.left_fut.once(|| {
load_left_input(
self.left.clone(),
context,
join_metrics.clone(),
reservation,
self.reservation.clone(),
)
});

Expand Down
43 changes: 17 additions & 26 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use hashbrown::raw::RawTable;
use crate::physical_plan::{
coalesce_batches::concat_batches,
coalesce_partitions::CoalescePartitionsExec,
common::{OperatorMemoryReservation, SharedMemoryReservation},
expressions::Column,
expressions::PhysicalSortExpr,
hash_utils::create_hashes,
Expand All @@ -78,7 +77,12 @@ use crate::logical_expr::JoinType;

use crate::arrow::array::BooleanBufferBuilder;
use crate::arrow::datatypes::TimeUnit;
use crate::execution::{context::TaskContext, memory_pool::MemoryConsumer};
use crate::execution::{
context::TaskContext,
memory_pool::{
MemoryConsumer, SharedMemoryReservation, SharedOptionalMemoryReservation, TryGrow,
},
};

use super::{
utils::{OnceAsync, OnceFut},
Expand All @@ -88,7 +92,6 @@ use crate::physical_plan::joins::utils::{
adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices,
get_final_indices_from_bit_map, need_produce_result_in_final, JoinSide,
};
use parking_lot::Mutex;
use std::fmt;
use std::task::Poll;

Expand Down Expand Up @@ -137,7 +140,7 @@ pub struct HashJoinExec {
/// Build-side data
left_fut: OnceAsync<JoinLeftData>,
/// Operator-level memory reservation for left data
reservation: OperatorMemoryReservation,
reservation: SharedOptionalMemoryReservation,
/// Shares the `RandomState` for the hashing algorithm
random_state: RandomState,
/// Partitioning mode to use
Expand Down Expand Up @@ -378,26 +381,14 @@ impl ExecutionPlan for HashJoinExec {
let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);

// Initialization of operator-level reservation
{
let mut operator_reservation_lock = self.reservation.lock();
if operator_reservation_lock.is_none() {
*operator_reservation_lock = Some(Arc::new(Mutex::new(
MemoryConsumer::new("HashJoinExec").register(context.memory_pool()),
)));
};
}

let operator_reservation = self.reservation.lock().clone().ok_or_else(|| {
DataFusionError::Internal(
"Operator-level memory reservation is not initialized".to_string(),
)
})?;
self.reservation
.initialize("HashJoinExec", context.memory_pool());

// Initialization of stream-level reservation
let reservation = Arc::new(Mutex::new(
let reservation = SharedMemoryReservation::from(
MemoryConsumer::new(format!("HashJoinStream[{partition}]"))
.register(context.memory_pool()),
));
);

// Memory reservation for left-side data depends on PartitionMode:
// - operator-level for `CollectLeft` mode
Expand All @@ -415,7 +406,7 @@ impl ExecutionPlan for HashJoinExec {
on_left.clone(),
context.clone(),
join_metrics.clone(),
operator_reservation.clone(),
Arc::new(self.reservation.clone()),
)
}),
PartitionMode::Partitioned => OnceFut::new(collect_left_input(
Expand All @@ -425,7 +416,7 @@ impl ExecutionPlan for HashJoinExec {
on_left.clone(),
context.clone(),
join_metrics.clone(),
reservation.clone(),
Arc::new(reservation.clone()),
)),
PartitionMode::Auto => {
return Err(DataFusionError::Plan(format!(
Expand Down Expand Up @@ -497,7 +488,7 @@ async fn collect_left_input(
on_left: Vec<Column>,
context: Arc<TaskContext>,
metrics: BuildProbeJoinMetrics,
reservation: SharedMemoryReservation,
reservation: Arc<dyn TryGrow>,
) -> Result<JoinLeftData> {
let schema = left.schema();

Expand Down Expand Up @@ -526,7 +517,7 @@ async fn collect_left_input(
.try_fold(initial, |mut acc, batch| async {
let batch_size = batch.get_array_memory_size();
// Reserve memory for incoming batch
acc.3.lock().try_grow(batch_size)?;
acc.3.try_grow(batch_size)?;
// Update metrics
acc.2.build_mem_used.add(batch_size);
acc.2.build_input_batches.add(1);
Expand Down Expand Up @@ -555,7 +546,7 @@ async fn collect_left_input(
// + 16 bytes fixed
let estimated_hastable_size = 32 * estimated_buckets + estimated_buckets + 16;

reservation.lock().try_grow(estimated_hastable_size)?;
reservation.try_grow(estimated_hastable_size)?;
metrics.build_mem_used.add(estimated_hastable_size);

let mut hashmap = JoinHashMap(RawTable::with_capacity(num_rows));
Expand Down Expand Up @@ -1157,7 +1148,7 @@ impl HashJoinStream {
// TODO: Replace `ceil` wrapper with stable `div_ceil` after
// https://github.com/rust-lang/rust/issues/88581
let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 8);
self.reservation.lock().try_grow(visited_bitmap_size)?;
self.reservation.try_grow(visited_bitmap_size)?;
self.join_metrics.build_mem_used.add(visited_bitmap_size);
}

Expand Down
Loading