diff --git a/crates/polars-mem-engine/src/executors/join.rs b/crates/polars-mem-engine/src/executors/join.rs index df3953a5ecb3..5edab8551ece 100644 --- a/crates/polars-mem-engine/src/executors/join.rs +++ b/crates/polars-mem-engine/src/executors/join.rs @@ -68,8 +68,8 @@ impl Executor for JoinExec { (input_left.execute(state), input_right.execute(state)) }; - let mut df_left = df_left?; - let mut df_right = df_right?; + let df_left = df_left?; + let df_right = df_right?; let profile_name = if state.has_node_timer() { let by = self @@ -97,14 +97,6 @@ impl Executor for JoinExec { .map(|e| e.evaluate(&df_right, state)) .collect::>>()?; - // make sure that we can join on evaluated expressions - for s in &left_on_series { - df_left.with_column(s.clone())?; - } - for s in &right_on_series { - df_right.with_column(s.clone())?; - } - // prepare the tolerance // we must ensure that we use the right units #[cfg(feature = "asof_join")] diff --git a/crates/polars-ops/src/frame/join/args.rs b/crates/polars-ops/src/frame/join/args.rs index ea37475c32c0..a68805556f80 100644 --- a/crates/polars-ops/src/frame/join/args.rs +++ b/crates/polars-ops/src/frame/join/args.rs @@ -35,7 +35,7 @@ impl JoinArgs { } } -#[derive(Clone, PartialEq, Eq, Debug, Hash, Default)] +#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, Default)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum JoinCoalesce { #[default] @@ -56,7 +56,7 @@ impl JoinCoalesce { matches!(self, CoalesceColumns) }, #[cfg(feature = "asof_join")] - AsOf(_) => false, + AsOf(_) => matches!(self, JoinSpecific | CoalesceColumns), Cross => false, #[cfg(feature = "semi_anti_join")] Semi | Anti => false, diff --git a/crates/polars-ops/src/frame/join/asof/groups.rs b/crates/polars-ops/src/frame/join/asof/groups.rs index 22a72be8de9a..918d95596dee 100644 --- a/crates/polars-ops/src/frame/join/asof/groups.rs +++ b/crates/polars-ops/src/frame/join/asof/groups.rs @@ -587,29 +587,36 @@ pub trait AsofJoinBy: IntoDf { fn 
_join_asof_by( &self, other: &DataFrame, - left_on: &str, - right_on: &str, + left_on: &Series, + right_on: &Series, left_by: Vec, right_by: Vec, strategy: AsofStrategy, tolerance: Option>, suffix: Option<&str>, slice: Option<(i64, usize)>, + coalesce: bool, ) -> PolarsResult { - let (self_sliced_slot, other_sliced_slot); // Keeps temporaries alive. - let (self_df, other_df); + let (self_sliced_slot, other_sliced_slot, left_slice_s, right_slice_s); // Keeps temporaries alive. + let (self_df, other_df, left_key, right_key); if let Some((offset, len)) = slice { self_sliced_slot = self.to_df().slice(offset, len); other_sliced_slot = other.slice(offset, len); + left_slice_s = left_on.slice(offset, len); + right_slice_s = right_on.slice(offset, len); + left_key = &left_slice_s; + right_key = &right_slice_s; self_df = &self_sliced_slot; other_df = &other_sliced_slot; } else { self_df = self.to_df(); other_df = other; + left_key = left_on; + right_key = right_on; } - let left_asof = self_df.column(left_on)?.to_physical_repr(); - let right_asof = other_df.column(right_on)?.to_physical_repr(); + let left_asof = left_key.to_physical_repr(); + let right_asof = right_key.to_physical_repr(); let right_asof_name = right_asof.name(); let left_asof_name = left_asof.name(); check_asof_columns( @@ -645,7 +652,7 @@ pub trait AsofJoinBy: IntoDf { )?; let mut drop_these = right_by.get_column_names(); - if left_asof_name == right_asof_name { + if coalesce && left_asof_name == right_asof_name { drop_these.push(right_asof_name); } @@ -688,8 +695,10 @@ pub trait AsofJoinBy: IntoDf { let self_df = self.to_df(); let left_by = left_by.into_iter().map(|s| s.as_ref().into()).collect(); let right_by = right_by.into_iter().map(|s| s.as_ref().into()).collect(); + let left_key = self_df.column(left_on)?; + let right_key = other.column(right_on)?; self_df._join_asof_by( - other, left_on, right_on, left_by, right_by, strategy, tolerance, None, None, + other, left_key, right_key, left_by, right_by, 
strategy, tolerance, None, None, true, ) } } diff --git a/crates/polars-ops/src/frame/join/asof/mod.rs b/crates/polars-ops/src/frame/join/asof/mod.rs index 2332a00c21e0..07fdd69c7399 100644 --- a/crates/polars-ops/src/frame/join/asof/mod.rs +++ b/crates/polars-ops/src/frame/join/asof/mod.rs @@ -208,16 +208,15 @@ pub trait AsofJoin: IntoDf { fn _join_asof( &self, other: &DataFrame, - left_on: &str, - right_on: &str, + left_key: &Series, + right_key: &Series, strategy: AsofStrategy, tolerance: Option>, suffix: Option, slice: Option<(i64, usize)>, + coalesce: bool, ) -> PolarsResult { let self_df = self.to_df(); - let left_key = self_df.column(left_on)?; - let right_key = other.column(right_on)?; check_asof_columns(left_key, right_key, tolerance.is_some(), true)?; let left_key = left_key.to_physical_repr(); @@ -270,8 +269,8 @@ pub trait AsofJoin: IntoDf { }?; // Drop right join column. - let other = if left_on == right_on { - Cow::Owned(other.drop(right_on)?) + let other = if coalesce && left_key.name() == right_key.name() { + Cow::Owned(other.drop(right_key.name())?) } else { Cow::Borrowed(other) }; @@ -287,20 +286,6 @@ pub trait AsofJoin: IntoDf { _finish_join(left, right_df, suffix.as_deref()) } - - /// This is similar to a left-join except that we match on nearest key rather than equal keys. 
- /// The keys must be sorted to perform an asof join - fn join_asof( - &self, - other: &DataFrame, - left_on: &str, - right_on: &str, - strategy: AsofStrategy, - tolerance: Option>, - suffix: Option, - ) -> PolarsResult { - self._join_asof(other, left_on, right_on, strategy, tolerance, suffix, None) - } } impl AsofJoin for DataFrame {} diff --git a/crates/polars-ops/src/frame/join/mod.rs b/crates/polars-ops/src/frame/join/mod.rs index 03c9b24a0772..53c7dba6c0aa 100644 --- a/crates/polars-ops/src/frame/join/mod.rs +++ b/crates/polars-ops/src/frame/join/mod.rs @@ -132,7 +132,7 @@ pub trait DataFrameJoinOps: IntoDf { clear(&mut selected_right); } - let should_coalesce = args.coalesce.coalesce(&args.how); + let should_coalesce = args.should_coalesce(); assert_eq!(selected_left.len(), selected_right.len()); #[cfg(feature = "chunked_ids")] @@ -229,35 +229,32 @@ pub trait DataFrameJoinOps: IntoDf { args.join_nulls, ), #[cfg(feature = "asof_join")] - JoinType::AsOf(options) => { - let left_on = selected_left[0].name(); - let right_on = selected_right[0].name(); - - match (options.left_by, options.right_by) { - (Some(left_by), Some(right_by)) => left_df._join_asof_by( - other, - left_on, - right_on, - left_by, - right_by, - options.strategy, - options.tolerance, - args.suffix.as_deref(), - args.slice, - ), - (None, None) => left_df._join_asof( - other, - left_on, - right_on, - options.strategy, - options.tolerance, - args.suffix, - args.slice, - ), - _ => { - panic!("expected by arguments on both sides") - }, - } + JoinType::AsOf(options) => match (options.left_by, options.right_by) { + (Some(left_by), Some(right_by)) => left_df._join_asof_by( + other, + s_left, + s_right, + left_by, + right_by, + options.strategy, + options.tolerance, + args.suffix.as_deref(), + args.slice, + should_coalesce, + ), + (None, None) => left_df._join_asof( + other, + s_left, + s_right, + options.strategy, + options.tolerance, + args.suffix, + args.slice, + should_coalesce, + ), + _ => { + 
panic!("expected by arguments on both sides") + }, }, JoinType::Cross => { unreachable!() diff --git a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs index eddaf9c31807..b7420fbf5aa6 100644 --- a/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/plans/conversion/dsl_to_ir.rs @@ -372,8 +372,9 @@ pub fn to_alp_impl( input_right, left_on, right_on, - options, + mut options, } => { + let mut turn_off_coalesce = false; for e in left_on.iter().chain(right_on.iter()) { if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) { polars_bail!( @@ -381,6 +382,12 @@ pub fn to_alp_impl( "'alias' is not allowed in a join key, use 'with_columns' first", ) } + // Any expression that is not a simple column expression will turn off coalescing. + turn_off_coalesce |= has_expr(e, |e| !matches!(e, Expr::Column(_))); + } + if turn_off_coalesce { + let options = Arc::make_mut(&mut options); + options.args.coalesce = JoinCoalesce::KeepColumns; } options.args.validation.is_valid_join(&options.args.how)?; diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs index f93f603f580d..e7bf461a76f6 100644 --- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs +++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/joins.rs @@ -266,12 +266,18 @@ pub(super) fn process_join( ); } - // For left and innner joins we can set `coalesce` to `true` if the rhs key columns are not projected. + // For left and inner joins we can set `coalesce` to `true` if the rhs key columns are not projected. // This saves a materialization. if !options.args.should_coalesce() && matches!(options.args.how, JoinType::Left | JoinType::Inner) { + let mut allow_opt = true; let non_coalesced_key_is_used = right_on.iter().any(|e| { + // Inline expressions other than col should not coalesce. 
+ if !matches!(expr_arena.get(e.node()), AExpr::Column(_)) { + allow_opt = false; + return true; + } let key_name = e.output_name(); // If the name is in the lhs table, a suffix is added. @@ -285,7 +291,7 @@ pub(super) fn process_join( }); // If they key is not used, coalesce the columns as that is often cheaper. - if !non_coalesced_key_is_used { + if !non_coalesced_key_is_used && allow_opt { let options = Arc::make_mut(&mut options); options.args.coalesce = JoinCoalesce::CoalesceColumns; } diff --git a/crates/polars-plan/src/plans/schema.rs b/crates/polars-plan/src/plans/schema.rs index 35f4aade0ab6..1f97e045eab4 100644 --- a/crates/polars-plan/src/plans/schema.rs +++ b/crates/polars-plan/src/plans/schema.rs @@ -241,17 +241,18 @@ pub(crate) fn det_join_schema( right_on: &[Expr], options: &JoinOptions, ) -> PolarsResult { - match options.args.how { + match &options.args.how { // semi and anti joins are just filtering operations // the schema will never change. #[cfg(feature = "semi_anti_join")] JoinType::Semi | JoinType::Anti => Ok(schema_left.clone()), - _ => { + _how => { let mut new_schema = Schema::with_capacity(schema_left.len() + schema_right.len()); for (name, dtype) in schema_left.iter() { new_schema.with_column(name.clone(), dtype.clone()); } + let should_coalesce = options.args.should_coalesce(); // make sure that expression are assigned to the schema // an expression can have an alias, and change a dtype. 
@@ -267,13 +268,13 @@ pub(crate) fn det_join_schema( // so the columns that are joined on, may have different // values so if the right has a different name, it is added to the schema #[cfg(feature = "asof_join")] - if !options.args.coalesce.coalesce(&options.args.how) { + if matches!(_how, JoinType::AsOf(_)) { for (left_on, right_on) in left_on.iter().zip(right_on) { let field_left = left_on.to_field_amortized(schema_left, Context::Default, &mut arena)?; let field_right = right_on.to_field_amortized(schema_right, Context::Default, &mut arena)?; - if field_left.name != field_right.name { + if should_coalesce && field_left.name != field_right.name { if schema_left.contains(&field_right.name) { new_schema.with_column( _join_suffix_name(&field_right.name, options.args.suffix()).into(), @@ -292,12 +293,9 @@ pub(crate) fn det_join_schema( join_on_right.insert(field.name); } - let are_coalesced = options.args.coalesce.coalesce(&options.args.how); - let is_asof = options.args.how.is_asof(); - // Asof joins are special, if the names are equal they will not be coalesced. for (name, dtype) in schema_right.iter() { - if !join_on_right.contains(name.as_str()) || (!are_coalesced && !is_asof) + if !join_on_right.contains(name.as_str()) || (!should_coalesce) // The names that are joined on are merged { if schema_left.contains(name.as_str()) { diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index ebe67be9a4f0..10acc4d3a5bf 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -6584,6 +6584,9 @@ def join( - True: -> Always coalesce join columns. - False: -> Never coalesce join columns. + Note that joining on any other expressions than `col` + will turn off coalescing. 
+ Returns ------- DataFrame diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 1b2f815e5b99..5bc86aba553b 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -3975,6 +3975,9 @@ def join( - None: -> join specific. - True: -> Always coalesce join columns. - False: -> Never coalesce join columns. + + Note that joining on any other expressions than `col` + will turn off coalescing. allow_parallel Allow the physical plan to optionally evaluate the computation of both DataFrames up to the join in parallel. diff --git a/py-polars/tests/unit/datatypes/test_temporal.py b/py-polars/tests/unit/datatypes/test_temporal.py index b1ba9859c606..b2759769273d 100644 --- a/py-polars/tests/unit/datatypes/test_temporal.py +++ b/py-polars/tests/unit/datatypes/test_temporal.py @@ -571,6 +571,7 @@ def test_asof_join_tolerance_grouper() -> None: { "date": [date(2020, 1, 5), date(2020, 1, 10)], "by": [1, 1], + "date_right": [date(2020, 1, 5), None], "values": [100, None], } ) diff --git a/py-polars/tests/unit/operations/test_join.py b/py-polars/tests/unit/operations/test_join.py index 71ed3eb377b3..d2e1db858757 100644 --- a/py-polars/tests/unit/operations/test_join.py +++ b/py-polars/tests/unit/operations/test_join.py @@ -152,9 +152,9 @@ def test_join_on_expressions() -> None: df_b = pl.DataFrame({"b": [1, 4, 9, 9, 0]}) - assert df_a.join(df_b, left_on=(pl.col("a") ** 2).cast(int), right_on=pl.col("b"))[ - "a" - ].to_list() == [1, 4, 9, 9] + assert df_a.join( + df_b, left_on=(pl.col("a") ** 2).cast(int), right_on=pl.col("b") + ).to_dict(as_series=False) == {"a": [1, 2, 3, 3], "b": [1, 4, 9, 9]} def test_join() -> None: @@ -249,12 +249,14 @@ def test_join_on_cast() -> None: assert df_a.join(df_b, on=pl.col("a").cast(pl.Int64)).to_dict(as_series=False) == { "index": [1, 2, 3, 5], "a": [-2, 3, 3, 10], + "a_right": [-2, 3, 3, 10], } assert df_a.lazy().join( df_b.lazy(), on=pl.col("a").cast(pl.Int64) 
).collect().to_dict(as_series=False) == { "index": [1, 2, 3, 5], "a": [-2, 3, 3, 10], + "a_right": [-2, 3, 3, 10], } @@ -365,7 +367,7 @@ def test_join_panic_on_binary_expr_5915() -> None: df_b = pl.DataFrame({"b": [1, 4, 9, 9, 0]}).lazy() z = df_a.join(df_b, left_on=[(pl.col("a") + 1).cast(int)], right_on=[pl.col("b")]) - assert z.collect().to_dict(as_series=False) == {"a": [4]} + assert z.collect().to_dict(as_series=False) == {"a": [3], "b": [4]} def test_semi_join_projection_pushdown_6423() -> None: @@ -970,9 +972,9 @@ def test_join_lit_panic_11410() -> None: df = pl.LazyFrame({"date": [1, 2, 3], "symbol": [4, 5, 6]}) dates = df.select("date").unique(maintain_order=True) symbols = df.select("symbol").unique(maintain_order=True) - assert symbols.join(dates, left_on=pl.lit(1), right_on=pl.lit(1)).drop( - "literal" - ).collect().to_dict(as_series=False) == {"symbol": [4], "date": [1]} + assert symbols.join(dates, left_on=pl.lit(1), right_on=pl.lit(1)).collect().to_dict( + as_series=False + ) == {"symbol": [4], "date": [1]} def test_join_empty_literal_17027() -> None: diff --git a/py-polars/tests/unit/operations/test_join_asof.py b/py-polars/tests/unit/operations/test_join_asof.py index 2ab9abf0967b..59e6cefb8227 100644 --- a/py-polars/tests/unit/operations/test_join_asof.py +++ b/py-polars/tests/unit/operations/test_join_asof.py @@ -54,6 +54,12 @@ def test_asof_join_inline_cast_6438() -> None: datetime(2020, 1, 1, 9, 3), datetime(2020, 1, 1, 9, 6), ], + "time_right": [ + datetime(2020, 1, 1, 9, 0), + None, + datetime(2020, 1, 1, 9, 2), + datetime(2020, 1, 1, 9, 3), + ], "stock": ["A", "B", "B", "C"], "trade": [101, 299, 301, 500], "quote": [100, None, 300, 501], @@ -169,6 +175,7 @@ def test_join_asof_floats() -> None: expected = { "a": [1.0, 2.0, 3.0], "b": ["lrow1", "lrow2", "lrow3"], + "a_right": [0.59, 1.49, 2.89], "b_right": ["rrow1", "rrow2", "rrow3"], } assert result.to_dict(as_series=False) == expected @@ -183,8 +190,8 @@ def test_join_asof_floats() -> None: 
"val": [0.0, 2.5, 2.6, 2.7, 3.4, 4.0, 5.0], "c": ["x", "x", "x", "y", "y", "y", "y"], } - ).with_columns(pl.col("val").alias("b")) - assert df1.join_asof(df2, on=pl.col("b").set_sorted(), by="c").to_dict( + ).with_columns(pl.col("val").alias("b").set_sorted()) + assert df1.set_sorted("b").join_asof(df2, on=pl.col("b"), by="c").to_dict( as_series=False ) == { "b": [ @@ -394,6 +401,7 @@ def test_asof_join_by_logical_types() -> None: ], "b": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0], "c": ["1", "2", "3", "1", "2", "3", "1", "2", "3"], + "b_right": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0], } assert result.to_dict(as_series=False) == expected