Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

join changes: broadcast left/right_on expressions + omit left_on/right_on expressions in result #14007

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,30 +1124,9 @@ impl DataFrame {

/// Add a new column to this [`DataFrame`] or replace an existing one.
pub fn with_column<S: IntoSeries>(&mut self, column: S) -> PolarsResult<&mut Self> {
fn inner(df: &mut DataFrame, mut series: Series) -> PolarsResult<&mut DataFrame> {
let height = df.height();
if series.len() == 1 && height > 1 {
series = series.new_from_index(0, height);
}

if series.len() == height || df.is_empty() {
df.add_column_by_search(series)?;
Ok(df)
}
// special case for literals
else if height == 0 && series.len() == 1 {
let s = series.clear();
df.add_column_by_search(s)?;
Ok(df)
} else {
polars_bail!(
ShapeMismatch: "unable to add a column of length {} to a DataFrame of height {}",
series.len(), height,
);
}
}
let series = column.into_series();
inner(self, series)
let series = broadcast_series_to_df(column.into_series(), self)?;
self.add_column_by_search(series)?;
Ok(self)
}

/// Adds a column to the [`DataFrame`] without doing any checks
Expand Down Expand Up @@ -3073,6 +3052,28 @@ fn ensure_can_extend(left: &Series, right: &Series) -> PolarsResult<()> {
Ok(())
}

// TODO: find a better name and a better home for this helper.
/// Make `series` length-compatible with `df` so it can be used alongside its columns.
///
/// * A single-element series is expanded with `new_from_index(0, height)` when the
///   frame has more than one row.
/// * The series is returned as-is when its length equals the frame height, or when
///   `df.is_empty()` holds.
/// * Special case for literals: a single-element series paired with a zero-height
///   frame is returned after `clear()`.
///
/// # Errors
///
/// Returns a `ShapeMismatch` error when none of the cases above apply.
pub fn broadcast_series_to_df(mut series: Series, df: &DataFrame) -> PolarsResult<Series> {
    let height = df.height();

    // Expand a unit-length series to the frame height.
    if height > 1 && series.len() == 1 {
        series = series.new_from_index(0, height);
    }

    let len = series.len();

    if len == height || df.is_empty() {
        return Ok(series);
    }

    // special case for literals
    if height == 0 && len == 1 {
        return Ok(series.clear());
    }

    polars_bail!(
        ShapeMismatch: "unable to add a column of length {} to a DataFrame of height {}",
        len, height,
    );
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
26 changes: 15 additions & 11 deletions crates/polars-lazy/src/physical_plan/executors/join.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use polars_core::frame::broadcast_series_to_df;
use polars_ops::frame::DataFrameJoinOps;

use super::*;
Expand Down Expand Up @@ -68,8 +69,8 @@ impl Executor for JoinExec {
(input_left.execute(state), input_right.execute(state))
};

let mut df_left = df_left?;
let mut df_right = df_right?;
let df_left = df_left?;
let df_right = df_right?;

let profile_name = if state.has_node_timer() {
let by = self
Expand All @@ -88,22 +89,25 @@ impl Executor for JoinExec {
let left_on_series = self
.left_on
.iter()
.map(|e| e.evaluate(&df_left, state))
.map(|e| e.evaluate(&df_left, state).and_then(|s| broadcast_series_to_df(s, &df_left)))
.collect::<PolarsResult<Vec<_>>>()?;

let right_on_series = self
.right_on
.iter()
.map(|e| e.evaluate(&df_right, state))
.map(|e| e.evaluate(&df_right, state).and_then(|s| broadcast_series_to_df(s, &df_right)))
.collect::<PolarsResult<Vec<_>>>()?;

// make sure that we can join on evaluated expressions
for s in &left_on_series {
df_left.with_column(s.clone())?;
}
for s in &right_on_series {
df_right.with_column(s.clone())?;
}
// TODO: remove this verbose debug output before merging
if state.verbose() {
eprintln!("left on series: {:?}", left_on_series);
eprintln!("right on series: {:?}", right_on_series);

eprintln!("left df: {:?}", df_left);
eprintln!("right df: {:?}", df_right);
};

// TODO asof

// prepare the tolerance
// we must ensure that we use the right units
Expand Down
36 changes: 35 additions & 1 deletion crates/polars-ops/src/frame/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,41 @@ pub trait DataFrameJoinOps: IntoDf {
}
let ids =
_left_join_multiple_keys(&mut left, &mut right, None, None, args.join_nulls);
left_df._finish_left_join(ids, &remove_selected(other, &selected_right), args)

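// Quick hack for demonstration:
// Drop a join-key column from the right df only when the left key is an existing column
// of the left df AND the right key is an existing column of the right df
// (matched by name and by data pointer).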
let ones_to_drop = selected_left
.iter()
.zip(selected_right.iter())
.filter_map(|(s1, s2)| {
// check if s1 is a column in the left df
let s1_exists = left_df
.column(s1.name())
.map(|s| s1.get_data_ptr() == s.get_data_ptr())
.unwrap_or(false);

// check if s2 is in the right df
let s2_exists = other
.column(s2.name())
.map(|s| s2.get_data_ptr() == s.get_data_ptr())
.unwrap_or(false);

// only drop if both exist
if s1_exists && s2_exists {
Some(s2)
} else {
None
}
})
.cloned()
.collect::<Vec<_>>();

if _verbose {
let dropped_names = ones_to_drop.iter().map(|x| x.name()).collect::<Vec<_>>();
eprintln!("Dropping names in right df: {:?}", dropped_names);
}

left_df._finish_left_join(ids, &remove_selected(other, &ones_to_drop), args)
},
JoinType::Outer { .. } => {
let df_left = DataFrame::new_no_checks(selected_left_physical);
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ impl LogicalPlanBuilder {
right_on: Vec<Expr>,
options: Arc<JoinOptions>,
) -> Self {
// TODO: investigate whether rejecting aliased expressions in join keys is still necessary
for e in left_on.iter().chain(right_on.iter()) {
if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) {
return LogicalPlan::Error {
Expand Down
Loading