Skip to content

Commit

Permalink
Consolidate physical join code into `datafusion/core/src/physical_plan/joins` (#3942)
Browse files Browse the repository at this point in the history

* Consolidate physical join code into `datafusion/core/src/physical_plan/joins`

* Update

* Update `use` paths

* Add RAT
  • Loading branch information
alamb authored Oct 24, 2022
1 parent 9595b8d commit ff8618a
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 54 deletions.
5 changes: 2 additions & 3 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ use crate::{
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec,
hash_join::HashJoinExec, repartition::RepartitionExec,
with_new_children_if_necessary,
coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
repartition::RepartitionExec, with_new_children_if_necessary,
},
};
use std::sync::Arc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ use arrow::datatypes::Schema;

use crate::execution::context::SessionConfig;
use crate::logical_expr::JoinType;
use crate::physical_plan::cross_join::CrossJoinExec;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::hash_join::HashJoinExec;
use crate::physical_plan::join_utils::{ColumnIndex, JoinFilter, JoinSide};
use crate::physical_plan::joins::{
utils::{ColumnIndex, JoinFilter, JoinSide},
CrossJoinExec, HashJoinExec,
};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::{ExecutionPlan, PhysicalExpr};

Expand Down Expand Up @@ -197,7 +198,7 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {
mod tests {
use crate::{
physical_plan::{
displayable, hash_join::PartitionMode, ColumnStatistics, Statistics,
displayable, joins::PartitionMode, ColumnStatistics, Statistics,
},
test::exec::StatisticsExec,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,19 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;

use super::expressions::PhysicalSortExpr;
use super::{
coalesce_partitions::CoalescePartitionsExec, join_utils::check_join_is_valid,
ColumnStatistics, Statistics,
use crate::execution::context::TaskContext;
use crate::physical_plan::{
coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec,
ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use crate::{error::Result, scalar::ScalarValue};
use async_trait::async_trait;
use datafusion_physical_expr::PhysicalSortExpr;
use log::debug;
use std::time::Instant;

use super::{
coalesce_batches::concat_batches, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream,
};
use crate::execution::context::TaskContext;
use crate::physical_plan::join_utils::{OnceAsync, OnceFut};
use log::debug;
use super::utils::{check_join_is_valid, OnceAsync, OnceFut};

/// Data of the left side
type JoinLeftData = RecordBatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,33 +55,32 @@ use arrow::array::{

use hashbrown::raw::RawTable;

use super::{
use crate::physical_plan::{
coalesce_batches::concat_batches,
coalesce_partitions::CoalescePartitionsExec,
expressions::Column,
expressions::PhysicalSortExpr,
join_utils::{
hash_utils::create_hashes,
joins::utils::{
build_join_schema, check_join_is_valid, estimate_join_statistics, ColumnIndex,
JoinFilter, JoinOn, JoinSide,
},
};
use super::{
expressions::Column,
metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use super::{hash_utils::create_hashes, Statistics};

use crate::error::{DataFusionError, Result};
use crate::logical_expr::JoinType;

use super::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
};
use crate::arrow::array::BooleanBufferBuilder;
use crate::arrow::datatypes::TimeUnit;
use crate::execution::context::TaskContext;
use crate::physical_plan::coalesce_batches::concat_batches;
use crate::physical_plan::PhysicalExpr;

use crate::physical_plan::join_utils::{OnceAsync, OnceFut};
use super::{
utils::{OnceAsync, OnceFut},
PartitionMode,
};
use log::debug;
use std::cmp;
use std::fmt;
Expand Down Expand Up @@ -182,15 +181,6 @@ impl HashJoinMetrics {
}
}

/// Partitioning mode used by a hash join.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionMode {
    /// The left and right children are each partitioned using the
    /// left and right join keys, respectively.
    Partitioned,
    /// The left side is collected into one partition.
    CollectLeft,
}

impl HashJoinExec {
/// Tries to create a new [HashJoinExec].
/// # Error
Expand Down
38 changes: 38 additions & 0 deletions datafusion/core/src/physical_plan/joins/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! DataFusion Join implementations

mod cross_join;
mod hash_join;
mod sort_merge_join;
pub mod utils;

/// Partitioning mode used by a hash join.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionMode {
    /// The left and right children are each partitioned using the
    /// left and right join keys, respectively.
    Partitioned,
    /// The left side is collected into one partition.
    CollectLeft,
}

pub use cross_join::CrossJoinExec;
pub use hash_join::HashJoinExec;

// Note: SortMergeJoin is not used in plans yet
pub use sort_merge_join::SortMergeJoinExec;
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ use crate::logical_expr::JoinType;
use crate::physical_plan::common::combine_batches;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::join_utils::{build_join_schema, check_join_is_valid, JoinOn};
use crate::physical_plan::joins::utils::{
build_join_schema, check_join_is_valid, JoinOn,
};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use crate::physical_plan::{
metrics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
Expand Down Expand Up @@ -1198,9 +1200,9 @@ mod tests {
use crate::error::Result;
use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::join_utils::JoinOn;
use crate::physical_plan::joins::utils::JoinOn;
use crate::physical_plan::joins::SortMergeJoinExec;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::sort_merge_join::SortMergeJoinExec;
use crate::physical_plan::{common, ExecutionPlan};
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{build_table_i32, columns};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::{ColumnStatistics, ExecutionPlan, Statistics};
use crate::physical_plan::{ColumnStatistics, ExecutionPlan, Statistics};

/// The on clause of the join, as vector of (left, right) columns.
pub type JoinOn = Vec<(Column, Column)>;
Expand Down
5 changes: 1 addition & 4 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -520,22 +520,19 @@ pub mod analyze;
pub mod coalesce_batches;
pub mod coalesce_partitions;
pub mod common;
pub mod cross_join;
pub mod display;
pub mod empty;
pub mod explain;
pub mod file_format;
pub mod filter;
pub mod hash_join;
pub mod hash_utils;
pub mod join_utils;
pub mod joins;
pub mod limit;
pub mod memory;
pub mod metrics;
pub mod planner;
pub mod projection;
pub mod repartition;
pub mod sort_merge_join;
pub mod sorts;
pub mod stream;
pub mod udaf;
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use super::analyze::AnalyzeExec;
use super::sorts::sort_preserving_merge::SortPreservingMergeExec;
use super::{
aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec,
aggregates, empty::EmptyExec, joins::PartitionMode, udaf, union::UnionExec,
values::ValuesExec, windows,
};
use crate::config::{OPT_EXPLAIN_LOGICAL_PLAN_ONLY, OPT_EXPLAIN_PHYSICAL_PLAN_ONLY};
Expand All @@ -39,17 +39,17 @@ use crate::logical_expr::{Limit, Values};
use crate::physical_expr::create_physical_expr;
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::cross_join::CrossJoinExec;
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::hash_join::HashJoinExec;
use crate::physical_plan::joins::CrossJoinExec;
use crate::physical_plan::joins::HashJoinExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{join_utils, Partitioning};
use crate::physical_plan::{joins::utils as join_utils, Partitioning};
use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, WindowExpr};
use crate::{
error::{DataFusionError, Result},
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/tests/join_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ use rand::{Rng, SeedableRng};

use datafusion::physical_plan::collect;
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sort_merge_join::SortMergeJoinExec;
use datafusion_expr::JoinType;

use datafusion::prelude::{SessionConfig, SessionContext};
Expand Down

0 comments on commit ff8618a

Please sign in to comment.