Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Turn off coalescing and fix mutation of join on expressions #17061

Merged
merged 3 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions crates/polars-mem-engine/src/executors/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ impl Executor for JoinExec {
(input_left.execute(state), input_right.execute(state))
};

let mut df_left = df_left?;
let mut df_right = df_right?;
let df_left = df_left?;
let df_right = df_right?;

let profile_name = if state.has_node_timer() {
let by = self
Expand Down Expand Up @@ -97,14 +97,6 @@ impl Executor for JoinExec {
.map(|e| e.evaluate(&df_right, state))
.collect::<PolarsResult<Vec<_>>>()?;

// make sure that we can join on evaluated expressions
for s in &left_on_series {
df_left.with_column(s.clone())?;
}
for s in &right_on_series {
df_right.with_column(s.clone())?;
}

// prepare the tolerance
// we must ensure that we use the right units
#[cfg(feature = "asof_join")]
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-ops/src/frame/join/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl JoinArgs {
}
}

#[derive(Clone, PartialEq, Eq, Debug, Hash, Default)]
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum JoinCoalesce {
#[default]
Expand All @@ -56,7 +56,7 @@ impl JoinCoalesce {
matches!(self, CoalesceColumns)
},
#[cfg(feature = "asof_join")]
AsOf(_) => false,
AsOf(_) => matches!(self, JoinSpecific | CoalesceColumns),
Cross => false,
#[cfg(feature = "semi_anti_join")]
Semi | Anti => false,
Expand Down
25 changes: 17 additions & 8 deletions crates/polars-ops/src/frame/join/asof/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,29 +587,36 @@ pub trait AsofJoinBy: IntoDf {
fn _join_asof_by(
&self,
other: &DataFrame,
left_on: &str,
right_on: &str,
left_on: &Series,
right_on: &Series,
left_by: Vec<SmartString>,
right_by: Vec<SmartString>,
strategy: AsofStrategy,
tolerance: Option<AnyValue<'static>>,
suffix: Option<&str>,
slice: Option<(i64, usize)>,
coalesce: bool,
) -> PolarsResult<DataFrame> {
let (self_sliced_slot, other_sliced_slot); // Keeps temporaries alive.
let (self_df, other_df);
let (self_sliced_slot, other_sliced_slot, left_slice_s, right_slice_s); // Keeps temporaries alive.
let (self_df, other_df, left_key, right_key);
if let Some((offset, len)) = slice {
self_sliced_slot = self.to_df().slice(offset, len);
other_sliced_slot = other.slice(offset, len);
left_slice_s = left_on.slice(offset, len);
right_slice_s = right_on.slice(offset, len);
left_key = &left_slice_s;
right_key = &right_slice_s;
self_df = &self_sliced_slot;
other_df = &other_sliced_slot;
} else {
self_df = self.to_df();
other_df = other;
left_key = left_on;
right_key = right_on;
}

let left_asof = self_df.column(left_on)?.to_physical_repr();
let right_asof = other_df.column(right_on)?.to_physical_repr();
let left_asof = left_key.to_physical_repr();
let right_asof = right_key.to_physical_repr();
let right_asof_name = right_asof.name();
let left_asof_name = left_asof.name();
check_asof_columns(
Expand Down Expand Up @@ -645,7 +652,7 @@ pub trait AsofJoinBy: IntoDf {
)?;

let mut drop_these = right_by.get_column_names();
if left_asof_name == right_asof_name {
if coalesce && left_asof_name == right_asof_name {
drop_these.push(right_asof_name);
}

Expand Down Expand Up @@ -688,8 +695,10 @@ pub trait AsofJoinBy: IntoDf {
let self_df = self.to_df();
let left_by = left_by.into_iter().map(|s| s.as_ref().into()).collect();
let right_by = right_by.into_iter().map(|s| s.as_ref().into()).collect();
let left_key = self_df.column(left_on)?;
let right_key = other.column(right_on)?;
self_df._join_asof_by(
other, left_on, right_on, left_by, right_by, strategy, tolerance, None, None,
other, left_key, right_key, left_by, right_by, strategy, tolerance, None, None, true,
)
}
}
Expand Down
25 changes: 5 additions & 20 deletions crates/polars-ops/src/frame/join/asof/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,16 +208,15 @@ pub trait AsofJoin: IntoDf {
fn _join_asof(
&self,
other: &DataFrame,
left_on: &str,
right_on: &str,
left_key: &Series,
right_key: &Series,
strategy: AsofStrategy,
tolerance: Option<AnyValue<'static>>,
suffix: Option<String>,
slice: Option<(i64, usize)>,
coalesce: bool,
) -> PolarsResult<DataFrame> {
let self_df = self.to_df();
let left_key = self_df.column(left_on)?;
let right_key = other.column(right_on)?;

check_asof_columns(left_key, right_key, tolerance.is_some(), true)?;
let left_key = left_key.to_physical_repr();
Expand Down Expand Up @@ -270,8 +269,8 @@ pub trait AsofJoin: IntoDf {
}?;

// Drop right join column.
let other = if left_on == right_on {
Cow::Owned(other.drop(right_on)?)
let other = if coalesce && left_key.name() == right_key.name() {
Cow::Owned(other.drop(right_key.name())?)
} else {
Cow::Borrowed(other)
};
Expand All @@ -287,20 +286,6 @@ pub trait AsofJoin: IntoDf {

_finish_join(left, right_df, suffix.as_deref())
}

/// This is similar to a left-join except that we match on nearest key rather than equal keys.
/// The keys must be sorted to perform an asof join
fn join_asof(
&self,
other: &DataFrame,
left_on: &str,
right_on: &str,
strategy: AsofStrategy,
tolerance: Option<AnyValue<'static>>,
suffix: Option<String>,
) -> PolarsResult<DataFrame> {
self._join_asof(other, left_on, right_on, strategy, tolerance, suffix, None)
}
}

impl AsofJoin for DataFrame {}
57 changes: 27 additions & 30 deletions crates/polars-ops/src/frame/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub trait DataFrameJoinOps: IntoDf {
clear(&mut selected_right);
}

let should_coalesce = args.coalesce.coalesce(&args.how);
let should_coalesce = args.should_coalesce();
assert_eq!(selected_left.len(), selected_right.len());

#[cfg(feature = "chunked_ids")]
Expand Down Expand Up @@ -229,35 +229,32 @@ pub trait DataFrameJoinOps: IntoDf {
args.join_nulls,
),
#[cfg(feature = "asof_join")]
JoinType::AsOf(options) => {
let left_on = selected_left[0].name();
let right_on = selected_right[0].name();

match (options.left_by, options.right_by) {
(Some(left_by), Some(right_by)) => left_df._join_asof_by(
other,
left_on,
right_on,
left_by,
right_by,
options.strategy,
options.tolerance,
args.suffix.as_deref(),
args.slice,
),
(None, None) => left_df._join_asof(
other,
left_on,
right_on,
options.strategy,
options.tolerance,
args.suffix,
args.slice,
),
_ => {
panic!("expected by arguments on both sides")
},
}
JoinType::AsOf(options) => match (options.left_by, options.right_by) {
(Some(left_by), Some(right_by)) => left_df._join_asof_by(
other,
s_left,
s_right,
left_by,
right_by,
options.strategy,
options.tolerance,
args.suffix.as_deref(),
args.slice,
should_coalesce,
),
(None, None) => left_df._join_asof(
other,
s_left,
s_right,
options.strategy,
options.tolerance,
args.suffix,
args.slice,
should_coalesce,
),
_ => {
panic!("expected by arguments on both sides")
},
},
JoinType::Cross => {
unreachable!()
Expand Down
9 changes: 8 additions & 1 deletion crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,15 +372,22 @@ pub fn to_alp_impl(
input_right,
left_on,
right_on,
options,
mut options,
} => {
let mut turn_off_coalesce = false;
for e in left_on.iter().chain(right_on.iter()) {
if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) {
polars_bail!(
ComputeError:
"'alias' is not allowed in a join key, use 'with_columns' first",
)
}
// Any expression that is not a simple column expression will turn off coalescing.
turn_off_coalesce |= has_expr(e, |e| !matches!(e, Expr::Column(_)));
}
if turn_off_coalesce {
let options = Arc::make_mut(&mut options);
options.args.coalesce = JoinCoalesce::KeepColumns;
}

options.args.validation.is_valid_join(&options.args.how)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,18 @@ pub(super) fn process_join(
);
}

// For left and innner joins we can set `coalesce` to `true` if the rhs key columns are not projected.
// For left and inner joins we can set `coalesce` to `true` if the rhs key columns are not projected.
// This saves a materialization.
if !options.args.should_coalesce()
&& matches!(options.args.how, JoinType::Left | JoinType::Inner)
{
let mut allow_opt = true;
let non_coalesced_key_is_used = right_on.iter().any(|e| {
// Inline expressions other than col should not coalesce.
if !matches!(expr_arena.get(e.node()), AExpr::Column(_)) {
allow_opt = false;
return true;
}
let key_name = e.output_name();

// If the name is in the lhs table, a suffix is added.
Expand All @@ -285,7 +291,7 @@ pub(super) fn process_join(
});

// If the key is not used, coalesce the columns as that is often cheaper.
if !non_coalesced_key_is_used {
if !non_coalesced_key_is_used && allow_opt {
let options = Arc::make_mut(&mut options);
options.args.coalesce = JoinCoalesce::CoalesceColumns;
}
Expand Down
14 changes: 6 additions & 8 deletions crates/polars-plan/src/plans/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,17 +241,18 @@ pub(crate) fn det_join_schema(
right_on: &[Expr],
options: &JoinOptions,
) -> PolarsResult<SchemaRef> {
match options.args.how {
match &options.args.how {
// semi and anti joins are just filtering operations
// the schema will never change.
#[cfg(feature = "semi_anti_join")]
JoinType::Semi | JoinType::Anti => Ok(schema_left.clone()),
_ => {
_how => {
let mut new_schema = Schema::with_capacity(schema_left.len() + schema_right.len());

for (name, dtype) in schema_left.iter() {
new_schema.with_column(name.clone(), dtype.clone());
}
let should_coalesce = options.args.should_coalesce();

// make sure that expressions are assigned to the schema
// an expression can have an alias, and change a dtype.
Expand All @@ -267,13 +268,13 @@ pub(crate) fn det_join_schema(
// so the columns that are joined on, may have different
// values so if the right has a different name, it is added to the schema
#[cfg(feature = "asof_join")]
if !options.args.coalesce.coalesce(&options.args.how) {
if matches!(_how, JoinType::AsOf(_)) {
for (left_on, right_on) in left_on.iter().zip(right_on) {
let field_left =
left_on.to_field_amortized(schema_left, Context::Default, &mut arena)?;
let field_right =
right_on.to_field_amortized(schema_right, Context::Default, &mut arena)?;
if field_left.name != field_right.name {
if should_coalesce && field_left.name != field_right.name {
if schema_left.contains(&field_right.name) {
new_schema.with_column(
_join_suffix_name(&field_right.name, options.args.suffix()).into(),
Expand All @@ -292,12 +293,9 @@ pub(crate) fn det_join_schema(
join_on_right.insert(field.name);
}

let are_coalesced = options.args.coalesce.coalesce(&options.args.how);
let is_asof = options.args.how.is_asof();

// Asof joins are special, if the names are equal they will not be coalesced.
for (name, dtype) in schema_right.iter() {
if !join_on_right.contains(name.as_str()) || (!are_coalesced && !is_asof)
if !join_on_right.contains(name.as_str()) || (!should_coalesce)
// The names that are joined on are merged
{
if schema_left.contains(name.as_str()) {
Expand Down
3 changes: 3 additions & 0 deletions py-polars/polars/dataframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -6584,6 +6584,9 @@ def join(
- True: -> Always coalesce join columns.
- False: -> Never coalesce join columns.

Note that joining on any expression other than `col`
will turn off coalescing.

Returns
-------
DataFrame
Expand Down
3 changes: 3 additions & 0 deletions py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3975,6 +3975,9 @@ def join(
- None: -> join specific.
- True: -> Always coalesce join columns.
- False: -> Never coalesce join columns.

Note that joining on any expression other than `col`
will turn off coalescing.
allow_parallel
Allow the physical plan to optionally evaluate the computation of both
DataFrames up to the join in parallel.
Expand Down
1 change: 1 addition & 0 deletions py-polars/tests/unit/datatypes/test_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ def test_asof_join_tolerance_grouper() -> None:
{
"date": [date(2020, 1, 5), date(2020, 1, 10)],
"by": [1, 1],
"date_right": [date(2020, 1, 5), None],
"values": [100, None],
}
)
Expand Down
Loading