Skip to content

Commit

Permalink
feat: Turn of coalescing and fix mutation of join on expressions (#17061
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ritchie46 authored Jun 19, 2024
1 parent df14d67 commit 8976c2c
Show file tree
Hide file tree
Showing 13 changed files with 101 additions and 90 deletions.
12 changes: 2 additions & 10 deletions crates/polars-mem-engine/src/executors/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ impl Executor for JoinExec {
(input_left.execute(state), input_right.execute(state))
};

let mut df_left = df_left?;
let mut df_right = df_right?;
let df_left = df_left?;
let df_right = df_right?;

let profile_name = if state.has_node_timer() {
let by = self
Expand Down Expand Up @@ -97,14 +97,6 @@ impl Executor for JoinExec {
.map(|e| e.evaluate(&df_right, state))
.collect::<PolarsResult<Vec<_>>>()?;

// make sure that we can join on evaluated expressions
for s in &left_on_series {
df_left.with_column(s.clone())?;
}
for s in &right_on_series {
df_right.with_column(s.clone())?;
}

// prepare the tolerance
// we must ensure that we use the right units
#[cfg(feature = "asof_join")]
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-ops/src/frame/join/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl JoinArgs {
}
}

#[derive(Clone, PartialEq, Eq, Debug, Hash, Default)]
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum JoinCoalesce {
#[default]
Expand All @@ -56,7 +56,7 @@ impl JoinCoalesce {
matches!(self, CoalesceColumns)
},
#[cfg(feature = "asof_join")]
AsOf(_) => false,
AsOf(_) => matches!(self, JoinSpecific | CoalesceColumns),
Cross => false,
#[cfg(feature = "semi_anti_join")]
Semi | Anti => false,
Expand Down
25 changes: 17 additions & 8 deletions crates/polars-ops/src/frame/join/asof/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,29 +587,36 @@ pub trait AsofJoinBy: IntoDf {
fn _join_asof_by(
&self,
other: &DataFrame,
left_on: &str,
right_on: &str,
left_on: &Series,
right_on: &Series,
left_by: Vec<SmartString>,
right_by: Vec<SmartString>,
strategy: AsofStrategy,
tolerance: Option<AnyValue<'static>>,
suffix: Option<&str>,
slice: Option<(i64, usize)>,
coalesce: bool,
) -> PolarsResult<DataFrame> {
let (self_sliced_slot, other_sliced_slot); // Keeps temporaries alive.
let (self_df, other_df);
let (self_sliced_slot, other_sliced_slot, left_slice_s, right_slice_s); // Keeps temporaries alive.
let (self_df, other_df, left_key, right_key);
if let Some((offset, len)) = slice {
self_sliced_slot = self.to_df().slice(offset, len);
other_sliced_slot = other.slice(offset, len);
left_slice_s = left_on.slice(offset, len);
right_slice_s = right_on.slice(offset, len);
left_key = &left_slice_s;
right_key = &right_slice_s;
self_df = &self_sliced_slot;
other_df = &other_sliced_slot;
} else {
self_df = self.to_df();
other_df = other;
left_key = left_on;
right_key = right_on;
}

let left_asof = self_df.column(left_on)?.to_physical_repr();
let right_asof = other_df.column(right_on)?.to_physical_repr();
let left_asof = left_key.to_physical_repr();
let right_asof = right_key.to_physical_repr();
let right_asof_name = right_asof.name();
let left_asof_name = left_asof.name();
check_asof_columns(
Expand Down Expand Up @@ -645,7 +652,7 @@ pub trait AsofJoinBy: IntoDf {
)?;

let mut drop_these = right_by.get_column_names();
if left_asof_name == right_asof_name {
if coalesce && left_asof_name == right_asof_name {
drop_these.push(right_asof_name);
}

Expand Down Expand Up @@ -688,8 +695,10 @@ pub trait AsofJoinBy: IntoDf {
let self_df = self.to_df();
let left_by = left_by.into_iter().map(|s| s.as_ref().into()).collect();
let right_by = right_by.into_iter().map(|s| s.as_ref().into()).collect();
let left_key = self_df.column(left_on)?;
let right_key = other.column(right_on)?;
self_df._join_asof_by(
other, left_on, right_on, left_by, right_by, strategy, tolerance, None, None,
other, left_key, right_key, left_by, right_by, strategy, tolerance, None, None, true,
)
}
}
Expand Down
25 changes: 5 additions & 20 deletions crates/polars-ops/src/frame/join/asof/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,15 @@ pub trait AsofJoin: IntoDf {
fn _join_asof(
&self,
other: &DataFrame,
left_on: &str,
right_on: &str,
left_key: &Series,
right_key: &Series,
strategy: AsofStrategy,
tolerance: Option<AnyValue<'static>>,
suffix: Option<String>,
slice: Option<(i64, usize)>,
coalesce: bool,
) -> PolarsResult<DataFrame> {
let self_df = self.to_df();
let left_key = self_df.column(left_on)?;
let right_key = other.column(right_on)?;

check_asof_columns(left_key, right_key, tolerance.is_some(), true)?;
let left_key = left_key.to_physical_repr();
Expand Down Expand Up @@ -270,8 +269,8 @@ pub trait AsofJoin: IntoDf {
}?;

// Drop right join column.
let other = if left_on == right_on {
Cow::Owned(other.drop(right_on)?)
let other = if coalesce && left_key.name() == right_key.name() {
Cow::Owned(other.drop(right_key.name())?)
} else {
Cow::Borrowed(other)
};
Expand All @@ -287,20 +286,6 @@ pub trait AsofJoin: IntoDf {

_finish_join(left, right_df, suffix.as_deref())
}

/// This is similar to a left-join except that we match on nearest key rather than equal keys.
/// The keys must be sorted to perform an asof join
fn join_asof(
&self,
other: &DataFrame,
left_on: &str,
right_on: &str,
strategy: AsofStrategy,
tolerance: Option<AnyValue<'static>>,
suffix: Option<String>,
) -> PolarsResult<DataFrame> {
self._join_asof(other, left_on, right_on, strategy, tolerance, suffix, None)
}
}

impl AsofJoin for DataFrame {}
57 changes: 27 additions & 30 deletions crates/polars-ops/src/frame/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub trait DataFrameJoinOps: IntoDf {
clear(&mut selected_right);
}

let should_coalesce = args.coalesce.coalesce(&args.how);
let should_coalesce = args.should_coalesce();
assert_eq!(selected_left.len(), selected_right.len());

#[cfg(feature = "chunked_ids")]
Expand Down Expand Up @@ -229,35 +229,32 @@ pub trait DataFrameJoinOps: IntoDf {
args.join_nulls,
),
#[cfg(feature = "asof_join")]
JoinType::AsOf(options) => {
let left_on = selected_left[0].name();
let right_on = selected_right[0].name();

match (options.left_by, options.right_by) {
(Some(left_by), Some(right_by)) => left_df._join_asof_by(
other,
left_on,
right_on,
left_by,
right_by,
options.strategy,
options.tolerance,
args.suffix.as_deref(),
args.slice,
),
(None, None) => left_df._join_asof(
other,
left_on,
right_on,
options.strategy,
options.tolerance,
args.suffix,
args.slice,
),
_ => {
panic!("expected by arguments on both sides")
},
}
JoinType::AsOf(options) => match (options.left_by, options.right_by) {
(Some(left_by), Some(right_by)) => left_df._join_asof_by(
other,
s_left,
s_right,
left_by,
right_by,
options.strategy,
options.tolerance,
args.suffix.as_deref(),
args.slice,
should_coalesce,
),
(None, None) => left_df._join_asof(
other,
s_left,
s_right,
options.strategy,
options.tolerance,
args.suffix,
args.slice,
should_coalesce,
),
_ => {
panic!("expected by arguments on both sides")
},
},
JoinType::Cross => {
unreachable!()
Expand Down
9 changes: 8 additions & 1 deletion crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,22 @@ pub fn to_alp_impl(
input_right,
left_on,
right_on,
options,
mut options,
} => {
let mut turn_off_coalesce = false;
for e in left_on.iter().chain(right_on.iter()) {
if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) {
polars_bail!(
ComputeError:
"'alias' is not allowed in a join key, use 'with_columns' first",
)
}
// Any expression that is not a simple column expression will turn of coalescing.
turn_off_coalesce |= has_expr(e, |e| !matches!(e, Expr::Column(_)));
}
if turn_off_coalesce {
let options = Arc::make_mut(&mut options);
options.args.coalesce = JoinCoalesce::KeepColumns;
}

options.args.validation.is_valid_join(&options.args.how)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,18 @@ pub(super) fn process_join(
);
}

// For left and innner joins we can set `coalesce` to `true` if the rhs key columns are not projected.
// For left and inner joins we can set `coalesce` to `true` if the rhs key columns are not projected.
// This saves a materialization.
if !options.args.should_coalesce()
&& matches!(options.args.how, JoinType::Left | JoinType::Inner)
{
let mut allow_opt = true;
let non_coalesced_key_is_used = right_on.iter().any(|e| {
// Inline expressions other than col should not coalesce.
if !matches!(expr_arena.get(e.node()), AExpr::Column(_)) {
allow_opt = false;
return true;
}
let key_name = e.output_name();

// If the name is in the lhs table, a suffix is added.
Expand All @@ -285,7 +291,7 @@ pub(super) fn process_join(
});

// If they key is not used, coalesce the columns as that is often cheaper.
if !non_coalesced_key_is_used {
if !non_coalesced_key_is_used && allow_opt {
let options = Arc::make_mut(&mut options);
options.args.coalesce = JoinCoalesce::CoalesceColumns;
}
Expand Down
14 changes: 6 additions & 8 deletions crates/polars-plan/src/plans/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,17 +241,18 @@ pub(crate) fn det_join_schema(
right_on: &[Expr],
options: &JoinOptions,
) -> PolarsResult<SchemaRef> {
match options.args.how {
match &options.args.how {
// semi and anti joins are just filtering operations
// the schema will never change.
#[cfg(feature = "semi_anti_join")]
JoinType::Semi | JoinType::Anti => Ok(schema_left.clone()),
_ => {
_how => {
let mut new_schema = Schema::with_capacity(schema_left.len() + schema_right.len());

for (name, dtype) in schema_left.iter() {
new_schema.with_column(name.clone(), dtype.clone());
}
let should_coalesce = options.args.should_coalesce();

// make sure that expression are assigned to the schema
// an expression can have an alias, and change a dtype.
Expand All @@ -267,13 +268,13 @@ pub(crate) fn det_join_schema(
// so the columns that are joined on, may have different
// values so if the right has a different name, it is added to the schema
#[cfg(feature = "asof_join")]
if !options.args.coalesce.coalesce(&options.args.how) {
if matches!(_how, JoinType::AsOf(_)) {
for (left_on, right_on) in left_on.iter().zip(right_on) {
let field_left =
left_on.to_field_amortized(schema_left, Context::Default, &mut arena)?;
let field_right =
right_on.to_field_amortized(schema_right, Context::Default, &mut arena)?;
if field_left.name != field_right.name {
if should_coalesce && field_left.name != field_right.name {
if schema_left.contains(&field_right.name) {
new_schema.with_column(
_join_suffix_name(&field_right.name, options.args.suffix()).into(),
Expand All @@ -292,12 +293,9 @@ pub(crate) fn det_join_schema(
join_on_right.insert(field.name);
}

let are_coalesced = options.args.coalesce.coalesce(&options.args.how);
let is_asof = options.args.how.is_asof();

// Asof joins are special, if the names are equal they will not be coalesced.
for (name, dtype) in schema_right.iter() {
if !join_on_right.contains(name.as_str()) || (!are_coalesced && !is_asof)
if !join_on_right.contains(name.as_str()) || (!should_coalesce)
// The names that are joined on are merged
{
if schema_left.contains(name.as_str()) {
Expand Down
3 changes: 3 additions & 0 deletions py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -6584,6 +6584,9 @@ def join(
- True: -> Always coalesce join columns.
- False: -> Never coalesce join columns.
Note that joining on any other expressions than `col`
will turn off coalescing.
Returns
-------
DataFrame
Expand Down
3 changes: 3 additions & 0 deletions py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3975,6 +3975,9 @@ def join(
- None: -> join specific.
- True: -> Always coalesce join columns.
- False: -> Never coalesce join columns.
Note that joining on any other expressions than `col`
will turn off coalescing.
allow_parallel
Allow the physical plan to optionally evaluate the computation of both
DataFrames up to the join in parallel.
Expand Down
1 change: 1 addition & 0 deletions py-polars/tests/unit/datatypes/test_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ def test_asof_join_tolerance_grouper() -> None:
{
"date": [date(2020, 1, 5), date(2020, 1, 10)],
"by": [1, 1],
"date_right": [date(2020, 1, 5), None],
"values": [100, None],
}
)
Expand Down
Loading

0 comments on commit 8976c2c

Please sign in to comment.