Skip to content

Commit

Permalink
add LogicalPlanBuilder::join_on
Browse files Browse the repository at this point in the history
  • Loading branch information
haohuaijin committed Oct 12, 2023
1 parent 92ba6c3 commit 9be3a72
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 6 deletions.
7 changes: 1 addition & 6 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,12 +663,7 @@ impl DataFrame {
) -> Result<DataFrame> {
let expr = on_exprs.into_iter().reduce(Expr::and);
let plan = LogicalPlanBuilder::from(self.plan)
.join(
right.plan,
join_type,
(Vec::<Column>::new(), Vec::<Column>::new()),
expr,
)?
.join_on(right.plan, join_type, expr)?
.build()?;
Ok(DataFrame::new(self.session_state, plan))
}
Expand Down
48 changes: 48 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,54 @@ impl LogicalPlanBuilder {
self.join_detailed(right, join_type, join_keys, filter, false)
}

/// Join this plan with `right` using a single, optional `ON` expression.
///
/// No equijoin key columns are supplied by this method: both key lists handed to
/// `join_detailed` are empty, and `on_exprs` is passed through unchanged in the
/// filter position.
///
/// The `ExtractEquijoinPredicate` optimizer pass has the ability to split join predicates
/// into equijoin predicates and (other) filter predicates. If you would rather not split the
/// join predicates by hand, prefer `join_on` over `join`.
///
/// # Example
///
/// ```
/// # use datafusion_expr::{Expr, col, LogicalPlanBuilder,
/// #    logical_plan::builder::LogicalTableSource, logical_plan::JoinType,};
/// # use std::sync::Arc;
/// # use arrow::datatypes::{Schema, DataType, Field};
/// # use datafusion_common::Result;
/// # fn main() -> Result<()> {
/// let example_schema = Arc::new(Schema::new(vec![
///     Field::new("a", DataType::Int32, false),
///     Field::new("b", DataType::Int32, false),
///     Field::new("c", DataType::Int32, false),
/// ]));
/// let table_source = Arc::new(LogicalTableSource::new(example_schema));
/// let left_table = table_source.clone();
/// let right_table = table_source.clone();
///
/// let right_plan = LogicalPlanBuilder::scan("right", right_table, None)?.build()?;
///
/// let exprs = vec![col("left.a").eq(col("right.a")), col("left.b").not_eq(col("right.b"))]
///     .into_iter()
///     .reduce(Expr::and);
/// let plan = LogicalPlanBuilder::scan("left", left_table, None)?
///     .join_on(right_plan, JoinType::Inner, exprs)?
///     .build()?;
/// # Ok(())
/// # }
/// ```
pub fn join_on(
    self,
    right: LogicalPlan,
    join_type: JoinType,
    on_exprs: Option<Expr>,
) -> Result<Self> {
    // Intentionally empty: this entry point never provides explicit key columns,
    // so the whole join condition travels in `on_exprs`.
    let no_join_keys: (Vec<Column>, Vec<Column>) = (Vec::new(), Vec::new());

    self.join_detailed(right, join_type, no_join_keys, on_exprs, false)
}

pub(crate) fn normalize(
plan: &LogicalPlan,
column: impl Into<Column> + Clone,
Expand Down

0 comments on commit 9be3a72

Please sign in to comment.