Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add maintain_order parameter to joins #20026

Merged
merged 17 commits into from
Dec 6, 2024
2 changes: 1 addition & 1 deletion crates/polars-core/src/chunked_array/ops/sort/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl SortMultipleOptions {
self
}

/// Implement order for all columns. Default `false`.
/// Sort order for all columns. Defaults to `false`, which means ascending.
pub fn with_order_descending(mut self, descending: bool) -> Self {
self.descending = vec![descending];
self
Expand Down
1 change: 1 addition & 0 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2186,6 +2186,7 @@ impl DataFrame {

/// Return a sorted clone of this [`DataFrame`].
///
/// In many cases the output chunks will be contiguous in memory, but this is not guaranteed.
///
/// # Example
///
/// Sort by a single column with default options:
Expand Down
16 changes: 13 additions & 3 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use polars_core::prelude::*;
use polars_expr::{create_physical_expr, ExpressionConversionState};
use polars_io::RowIndex;
use polars_mem_engine::{create_physical_plan, Executor};
use polars_ops::frame::JoinCoalesce;
use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
#[cfg(feature = "is_between")]
use polars_ops::prelude::ClosedInterval;
pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
Expand Down Expand Up @@ -2031,8 +2031,9 @@ pub struct JoinBuilder {
force_parallel: bool,
suffix: Option<PlSmallStr>,
validation: JoinValidation,
coalesce: JoinCoalesce,
join_nulls: bool,
coalesce: JoinCoalesce,
maintain_order: MaintainOrderJoin,
}
impl JoinBuilder {
/// Create the `JoinBuilder` with the provided `LazyFrame` as the left table.
Expand All @@ -2045,10 +2046,11 @@ impl JoinBuilder {
right_on: vec![],
allow_parallel: true,
force_parallel: false,
join_nulls: false,
suffix: None,
validation: Default::default(),
join_nulls: false,
coalesce: Default::default(),
maintain_order: Default::default(),
}
}

Expand Down Expand Up @@ -2129,6 +2131,12 @@ impl JoinBuilder {
self
}

/// Set which input's row order the join should preserve.
///
/// The value is stored on the builder and later copied into the
/// `maintain_order` field of the join arguments when the builder is finished.
/// Returns the builder so calls can be chained.
pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self {
self.maintain_order = maintain_order;
self
}

/// Finish builder
pub fn finish(self) -> LazyFrame {
let mut opt_state = self.lf.opt_state;
Expand All @@ -2146,6 +2154,7 @@ impl JoinBuilder {
slice: None,
join_nulls: self.join_nulls,
coalesce: self.coalesce,
maintain_order: self.maintain_order,
};

let lp = self
Expand Down Expand Up @@ -2242,6 +2251,7 @@ impl JoinBuilder {
slice: None,
join_nulls: self.join_nulls,
coalesce: self.coalesce,
maintain_order: self.maintain_order,
};
let options = JoinOptions {
allow_parallel: self.allow_parallel,
Expand Down
15 changes: 15 additions & 0 deletions crates/polars-ops/src/frame/join/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct JoinArgs {
pub slice: Option<(i64, usize)>,
pub join_nulls: bool,
pub coalesce: JoinCoalesce,
pub maintain_order: MaintainOrderJoin,
}

impl JoinArgs {
Expand Down Expand Up @@ -68,6 +69,18 @@ impl JoinCoalesce {
}
}

/// Selects which input's row order a join should preserve in its output.
///
/// The default is [`MaintainOrderJoin::None`]. Through the `strum` attributes below,
/// each variant converts to a snake_case static string.
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, Default, IntoStaticStr)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[strum(serialize_all = "snake_case")]
pub enum MaintainOrderJoin {
/// No particular output order is requested (the default).
#[default]
None,
/// Preserve the row order of the left input.
Left,
/// Preserve the row order of the right input.
Right,
/// Order by the left input first, then by the right input.
LeftRight,
/// Order by the right input first, then by the left input.
RightLeft,
}

impl Default for JoinArgs {
fn default() -> Self {
Self {
Expand All @@ -77,6 +90,7 @@ impl Default for JoinArgs {
slice: None,
join_nulls: false,
coalesce: Default::default(),
maintain_order: Default::default(),
}
}
}
Expand All @@ -90,6 +104,7 @@ impl JoinArgs {
slice: None,
join_nulls: false,
coalesce: Default::default(),
maintain_order: Default::default(),
}
}

Expand Down
222 changes: 165 additions & 57 deletions crates/polars-ops/src/frame/join/dispatch_left_right.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,82 +67,190 @@ pub fn materialize_left_join_from_series(
s_right = s_right.rechunk();
}

let ids = sort_or_hash_left(&s_left, &s_right, verbose, args.validation, args.join_nulls)?;
// The current sort_or_hash_left implementation preserves the Left DataFrame order so skip left for now.
let requires_ordering = matches!(
args.maintain_order,
MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft
);
if requires_ordering {
// When ordering is required, rechunk both series so that we don't get ChunkIds as output.
stijnherfst marked this conversation as resolved.
Show resolved Hide resolved
s_left = s_left.rechunk();
s_right = s_right.rechunk();
}

let (left_idx, right_idx) =
sort_or_hash_left(&s_left, &s_right, verbose, args.validation, args.join_nulls)?;

let right = if let Some(drop_names) = drop_names {
right.drop_many(drop_names)
} else {
right.drop(s_right.name()).unwrap()
};
Ok(materialize_left_join(&left, &right, ids, args))

#[cfg(feature = "chunked_ids")]
match (left_idx, right_idx) {
(ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Left(right_idx)) => {
if requires_ordering {
Ok(maintain_order_idx(
&left,
&right,
left_idx.as_slice(),
right_idx.as_slice(),
args,
))
} else {
Ok(POOL.join(
|| materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
|| materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
))
}
},
(ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join(
|| materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
|| materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
)),
(ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join(
|| materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
|| materialize_left_join_chunked_right(&right, right_idx.as_slice(), args),
)),
(ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Left(right_idx)) => Ok(POOL.join(
|| materialize_left_join_chunked_left(&left, left_idx.as_slice(), args),
|| materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
)),
}

#[cfg(not(feature = "chunked_ids"))]
if requires_ordering {
Ok(maintain_order_idx(
&left,
&right,
left_idx.as_slice(),
right_idx.as_slice(),
args,
))
} else {
Ok(POOL.join(
|| materialize_left_join_idx_left(&left, left_idx.as_slice(), args),
|| materialize_left_join_idx_right(&right, right_idx.as_slice(), args),
))
}
}

#[cfg(feature = "chunked_ids")]
fn materialize_left_join(
fn maintain_order_idx(
left: &DataFrame,
other: &DataFrame,
ids: LeftJoinIds,
left_idx: &[IdxSize],
right_idx: &[NullableIdxSize],
args: &JoinArgs,
) -> (DataFrame, DataFrame) {
let (left_idx, right_idx) = ids;
let materialize_left = || match left_idx {
ChunkJoinIds::Left(left_idx) => unsafe {
let mut left_idx = &*left_idx;
if let Some((offset, len)) = args.slice {
left_idx = slice_slice(left_idx, offset, len);
}
left._create_left_df_from_slice(left_idx, true, args.slice.is_some(), true)
},
ChunkJoinIds::Right(left_idx) => unsafe {
let mut left_idx = &*left_idx;
if let Some((offset, len)) = args.slice {
left_idx = slice_slice(left_idx, offset, len);
}
left.create_left_df_chunked(left_idx, true, args.slice.is_some())
},
let mut df = {
// SAFETY: left_idx and right_idx are contiguous memory that outlives the memory-mapped slices.
let left = unsafe { IdxCa::mmap_slice("a".into(), left_idx) };
stijnherfst marked this conversation as resolved.
Show resolved Hide resolved
let right = unsafe { IdxCa::mmap_slice("b".into(), bytemuck::cast_slice(right_idx)) };
DataFrame::new(vec![left.into_series().into(), right.into_series().into()]).unwrap()
};

let materialize_right = || match right_idx {
ChunkJoinOptIds::Left(right_idx) => unsafe {
let mut right_idx = &*right_idx;
if let Some((offset, len)) = args.slice {
right_idx = slice_slice(right_idx, offset, len);
}
IdxCa::with_nullable_idx(right_idx, |idx| other.take_unchecked(idx))
},
ChunkJoinOptIds::Right(right_idx) => unsafe {
let mut right_idx = &*right_idx;
if let Some((offset, len)) = args.slice {
right_idx = slice_slice(right_idx, offset, len);
}
other._take_opt_chunked_unchecked_hor_par(right_idx)
},
let options = SortMultipleOptions::new()
.with_order_descending(false)
.with_maintain_order(true);

let columns = match args.maintain_order {
// If the left order is preserved, then there are no unsorted right rows,
// so Left and LeftRight are equivalent.
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"],
MaintainOrderJoin::Right => vec!["b"],
MaintainOrderJoin::RightLeft => vec!["b", "a"],
_ => unreachable!(),
};
POOL.join(materialize_left, materialize_right)

df.sort_in_place(columns, options).unwrap();
df.rechunk_mut();

let join_tuples_left = df
.column("a")
.unwrap()
.as_series()
.unwrap()
.idx()
.unwrap()
.cont_slice()
.unwrap();

let join_tuples_right = df
.column("b")
.unwrap()
.as_series()
.unwrap()
.idx()
.unwrap()
.cont_slice()
.unwrap();

stijnherfst marked this conversation as resolved.
Show resolved Hide resolved
POOL.join(
|| materialize_left_join_idx_left(left, join_tuples_left, args),
|| materialize_left_join_idx_right(other, bytemuck::cast_slice(join_tuples_right), args),
)
}

#[cfg(not(feature = "chunked_ids"))]
fn materialize_left_join(
fn materialize_left_join_idx_left(
left: &DataFrame,
other: &DataFrame,
ids: LeftJoinIds,
left_idx: &[IdxSize],
args: &JoinArgs,
) -> (DataFrame, DataFrame) {
let (left_idx, right_idx) = ids;

let mut left_idx = &*left_idx;
if let Some((offset, len)) = args.slice {
left_idx = slice_slice(left_idx, offset, len);
) -> DataFrame {
let left_idx = if let Some((offset, len)) = args.slice {
slice_slice(left_idx, offset, len)
} else {
left_idx
};
unsafe {
left._create_left_df_from_slice(
left_idx,
true,
args.slice.is_some(),
matches!(
args.maintain_order,
MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight
),
)
}
let materialize_left =
|| unsafe { left._create_left_df_from_slice(&left_idx, true, args.slice.is_some(), true) };
}

let mut right_idx = &*right_idx;
if let Some((offset, len)) = args.slice {
right_idx = slice_slice(right_idx, offset, len);
}
let materialize_right = || {
let right_idx = &*right_idx;
unsafe { IdxCa::with_nullable_idx(right_idx, |idx| other.take_unchecked(idx)) }
/// Builds the right-hand output of a left join from nullable row indices.
///
/// If `args.slice` is set, `right_idx` is first narrowed with `slice_slice`;
/// the resulting indices are then used to gather rows from `right`.
fn materialize_left_join_idx_right(
    right: &DataFrame,
    right_idx: &[NullableIdxSize],
    args: &JoinArgs,
) -> DataFrame {
    let selected = match args.slice {
        Some((offset, len)) => slice_slice(right_idx, offset, len),
        None => right_idx,
    };
    // NOTE(review): `take_unchecked` presumably skips bounds checks, so every non-null
    // index is assumed to be a valid row of `right` — confirm against the join code.
    unsafe { IdxCa::with_nullable_idx(selected, |take_idx| right.take_unchecked(take_idx)) }
}
/// Builds the left-hand output of a left join from chunked ids.
///
/// If `args.slice` is set, `left_idx` is first narrowed with `slice_slice`.
/// Whether a slice was applied is also passed on to `create_left_df_chunked`.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_left(
    left: &DataFrame,
    left_idx: &[ChunkId],
    args: &JoinArgs,
) -> DataFrame {
    let was_sliced = args.slice.is_some();
    let chunk_ids = match args.slice {
        Some((offset, len)) => slice_slice(left_idx, offset, len),
        None => left_idx,
    };
    // NOTE(review): the callee is unsafe; the chunk ids are presumably required to
    // refer to existing chunks/rows of `left` — confirm against its safety contract.
    unsafe { left.create_left_df_chunked(chunk_ids, true, was_sliced) }
}

/// Builds the right-hand output of a left join from chunked ids.
///
/// If `args.slice` is set, `right_idx` is first narrowed with `slice_slice`;
/// the resulting ids are then passed to `_take_opt_chunked_unchecked_hor_par` on `right`.
#[cfg(feature = "chunked_ids")]
fn materialize_left_join_chunked_right(
right: &DataFrame,
right_idx: &[ChunkId],
args: &JoinArgs,
) -> DataFrame {
// Apply the optional (offset, len) slice before gathering rows.
let right_idx = if let Some((offset, len)) = args.slice {
slice_slice(right_idx, offset, len)
} else {
right_idx
};
// NOTE(review): `materialize_left` / `materialize_right` are not defined in this
// function; the next line looks like a removed line left over from the diff view — confirm.
POOL.join(materialize_left, materialize_right)
unsafe { right._take_opt_chunked_unchecked_hor_par(right_idx) }
}
3 changes: 2 additions & 1 deletion crates/polars-ops/src/frame/join/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ fn get_suffix(suffix: Option<PlSmallStr>) -> PlSmallStr {
suffix.unwrap_or_else(|| PlSmallStr::from_static("_right"))
}

/// Utility method to finish a join.
/// Renames the columns on the right so they do not clash with the left, using the specified
/// suffix or otherwise the default one, and then merges the right DataFrame into the left.
#[doc(hidden)]
pub fn _finish_join(
mut df_left: DataFrame,
Expand Down
Loading
Loading