From e9ddd376de896a350adc166d925792f06197c782 Mon Sep 17 00:00:00 2001 From: Stijn Date: Fri, 6 Dec 2024 10:00:53 +0100 Subject: [PATCH] feat: Add `maintain_order` parameter to joins (#20026) --- .../src/chunked_array/ops/sort/options.rs | 2 +- crates/polars-core/src/frame/mod.rs | 1 + crates/polars-lazy/src/frame/mod.rs | 16 +- crates/polars-ops/src/frame/join/args.rs | 15 ++ .../src/frame/join/dispatch_left_right.rs | 222 +++++++++++++----- crates/polars-ops/src/frame/join/general.rs | 3 +- .../src/frame/join/hash_join/mod.rs | 46 +++- .../src/frame/join/hash_join/sort_merge.rs | 6 +- crates/polars-ops/src/frame/join/mod.rs | 72 ++++-- crates/polars-python/src/conversion/mod.rs | 18 ++ crates/polars-python/src/lazyframe/general.rs | 8 +- .../src/lazyframe/visitor/nodes.rs | 1 + crates/polars-utils/src/index.rs | 1 + py-polars/polars/_typing.py | 3 + py-polars/polars/dataframe/frame.py | 21 ++ py-polars/polars/lazyframe/frame.py | 26 +- py-polars/tests/unit/operations/test_join.py | 123 ++++++++++ 17 files changed, 489 insertions(+), 95 deletions(-) diff --git a/crates/polars-core/src/chunked_array/ops/sort/options.rs b/crates/polars-core/src/chunked_array/ops/sort/options.rs index 95bff0b1b47a..276c8f08774a 100644 --- a/crates/polars-core/src/chunked_array/ops/sort/options.rs +++ b/crates/polars-core/src/chunked_array/ops/sort/options.rs @@ -149,7 +149,7 @@ impl SortMultipleOptions { self } - /// Implement order for all columns. Default `false`. + /// Sort order for all columns. Default `false` which is ascending. pub fn with_order_descending(mut self, descending: bool) -> Self { self.descending = vec![descending]; self diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 3eaa575a2da1..d1d80b7e23fc 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -2184,6 +2184,7 @@ impl DataFrame { /// Return a sorted clone of this [`DataFrame`]. /// + /// In many cases the output chunks will be continuous in memory but this is not guaranteed /// # Example /// /// Sort by a single column with default options: diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index cd32b11e1dd6..154726e6a27e 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -34,7 +34,7 @@ use polars_core::prelude::*; use polars_expr::{create_physical_expr, ExpressionConversionState}; use polars_io::RowIndex; use polars_mem_engine::{create_physical_plan, Executor}; -use polars_ops::frame::JoinCoalesce; +use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin}; #[cfg(feature = "is_between")] use polars_ops::prelude::ClosedInterval; pub use polars_plan::frame::{AllowedOptimizations, OptFlags}; @@ -1998,8 +1998,9 @@ pub struct JoinBuilder { force_parallel: bool, suffix: Option, validation: JoinValidation, - coalesce: JoinCoalesce, join_nulls: bool, + coalesce: JoinCoalesce, + maintain_order: MaintainOrderJoin, } impl JoinBuilder { /// Create the `JoinBuilder` with the provided `LazyFrame` as the left table. @@ -2012,10 +2013,11 @@ impl JoinBuilder { right_on: vec![], allow_parallel: true, force_parallel: false, - join_nulls: false, suffix: None, validation: Default::default(), + join_nulls: false, coalesce: Default::default(), + maintain_order: Default::default(), } } @@ -2096,6 +2098,12 @@ impl JoinBuilder { self } + /// Whether to preserve the row order. + pub fn maintain_order(mut self, maintain_order: MaintainOrderJoin) -> Self { + self.maintain_order = maintain_order; + self + } + /// Finish builder pub fn finish(self) -> LazyFrame { let mut opt_state = self.lf.opt_state; @@ -2113,6 +2121,7 @@ impl JoinBuilder { slice: None, join_nulls: self.join_nulls, coalesce: self.coalesce, + maintain_order: self.maintain_order, }; let lp = self @@ -2209,6 +2218,7 @@ impl JoinBuilder { slice: None, join_nulls: self.join_nulls, coalesce: self.coalesce, + maintain_order: self.maintain_order, }; let options = JoinOptions { allow_parallel: self.allow_parallel, diff --git a/crates/polars-ops/src/frame/join/args.rs b/crates/polars-ops/src/frame/join/args.rs index def36b76a677..c872b1993eee 100644 --- a/crates/polars-ops/src/frame/join/args.rs +++ b/crates/polars-ops/src/frame/join/args.rs @@ -29,6 +29,7 @@ pub struct JoinArgs { pub slice: Option<(i64, usize)>, pub join_nulls: bool, pub coalesce: JoinCoalesce, + pub maintain_order: MaintainOrderJoin, } impl JoinArgs { @@ -68,6 +69,18 @@ impl JoinCoalesce { } } +#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, Default, IntoStaticStr)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +#[strum(serialize_all = "snake_case")] +pub enum MaintainOrderJoin { + #[default] + None, + Left, + Right, + LeftRight, + RightLeft, +} + impl Default for JoinArgs { fn default() -> Self { Self { @@ -77,6 +90,7 @@ impl Default for JoinArgs { slice: None, join_nulls: false, coalesce: Default::default(), + maintain_order: Default::default(), } } } @@ -90,6 +104,7 @@ impl JoinArgs { slice: None, join_nulls: false, coalesce: Default::default(), + maintain_order: Default::default(), } } diff --git a/crates/polars-ops/src/frame/join/dispatch_left_right.rs b/crates/polars-ops/src/frame/join/dispatch_left_right.rs index 3c82773c7d9e..301cfb68e512 100644 --- a/crates/polars-ops/src/frame/join/dispatch_left_right.rs +++ b/crates/polars-ops/src/frame/join/dispatch_left_right.rs @@ -67,82 +67,190 @@ pub fn materialize_left_join_from_series( s_right = s_right.rechunk(); } - let ids = sort_or_hash_left(&s_left, &s_right, verbose, args.validation, args.join_nulls)?; + // The current sort_or_hash_left implementation preserves the Left DataFrame order so skip left for now. + let requires_ordering = matches!( + args.maintain_order, + MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft + ); + if requires_ordering { + // When ordering we rechunk the series so we don't get ChunkIds as output + s_left = s_left.rechunk(); + s_right = s_right.rechunk(); + } + + let (left_idx, right_idx) = + sort_or_hash_left(&s_left, &s_right, verbose, args.validation, args.join_nulls)?; + let right = if let Some(drop_names) = drop_names { right.drop_many(drop_names) } else { right.drop(s_right.name()).unwrap() }; - Ok(materialize_left_join(&left, &right, ids, args)) + + #[cfg(feature = "chunked_ids")] + match (left_idx, right_idx) { + (ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Left(right_idx)) => { + if requires_ordering { + Ok(maintain_order_idx( + &left, + &right, + left_idx.as_slice(), + right_idx.as_slice(), + args, + )) + } else { + Ok(POOL.join( + || materialize_left_join_idx_left(&left, left_idx.as_slice(), args), + || materialize_left_join_idx_right(&right, right_idx.as_slice(), args), + )) + } + }, + (ChunkJoinIds::Left(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join( + || materialize_left_join_idx_left(&left, left_idx.as_slice(), args), + || materialize_left_join_chunked_right(&right, right_idx.as_slice(), args), + )), + (ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Right(right_idx)) => Ok(POOL.join( + || materialize_left_join_chunked_left(&left, left_idx.as_slice(), args), + || materialize_left_join_chunked_right(&right, right_idx.as_slice(), args), + )), + (ChunkJoinIds::Right(left_idx), ChunkJoinOptIds::Left(right_idx)) => Ok(POOL.join( + || materialize_left_join_chunked_left(&left, left_idx.as_slice(), args), + || materialize_left_join_idx_right(&right, right_idx.as_slice(), args), + )), + } + + #[cfg(not(feature = "chunked_ids"))] + if requires_ordering { + Ok(maintain_order_idx( + &left, + &right, + left_idx.as_slice(), + right_idx.as_slice(), + args, + )) + } else { + Ok(POOL.join( + || materialize_left_join_idx_left(&left, left_idx.as_slice(), args), + || materialize_left_join_idx_right(&right, right_idx.as_slice(), args), + )) + } } -#[cfg(feature = "chunked_ids")] -fn materialize_left_join( +fn maintain_order_idx( left: &DataFrame, other: &DataFrame, - ids: LeftJoinIds, + left_idx: &[IdxSize], + right_idx: &[NullableIdxSize], args: &JoinArgs, ) -> (DataFrame, DataFrame) { - let (left_idx, right_idx) = ids; - let materialize_left = || match left_idx { - ChunkJoinIds::Left(left_idx) => unsafe { - let mut left_idx = &*left_idx; - if let Some((offset, len)) = args.slice { - left_idx = slice_slice(left_idx, offset, len); - } - left._create_left_df_from_slice(left_idx, true, args.slice.is_some(), true) - }, - ChunkJoinIds::Right(left_idx) => unsafe { - let mut left_idx = &*left_idx; - if let Some((offset, len)) = args.slice { - left_idx = slice_slice(left_idx, offset, len); - } - left.create_left_df_chunked(left_idx, true, args.slice.is_some()) - }, + let mut df = { + // SAFETY: left_idx and right_idx are continuous memory that outlive the memory mapped slices + let left = unsafe { IdxCa::mmap_slice("a".into(), left_idx) }; + let right = unsafe { IdxCa::mmap_slice("b".into(), bytemuck::cast_slice(right_idx)) }; + DataFrame::new(vec![left.into_series().into(), right.into_series().into()]).unwrap() }; - let materialize_right = || match right_idx { - ChunkJoinOptIds::Left(right_idx) => unsafe { - let mut right_idx = &*right_idx; - if let Some((offset, len)) = args.slice { - right_idx = slice_slice(right_idx, offset, len); - } - IdxCa::with_nullable_idx(right_idx, |idx| other.take_unchecked(idx)) - }, - ChunkJoinOptIds::Right(right_idx) => unsafe { - let mut right_idx = &*right_idx; - if let Some((offset, len)) = args.slice { - right_idx = slice_slice(right_idx, offset, len); - } - other._take_opt_chunked_unchecked_hor_par(right_idx) - }, + let options = SortMultipleOptions::new() + .with_order_descending(false) + .with_maintain_order(true); + + let columns = match args.maintain_order { + // If the left order is preserved then there are no unsorted right rows + // So Left and LeftRight are equal + MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"], + MaintainOrderJoin::Right => vec!["b"], + MaintainOrderJoin::RightLeft => vec!["b", "a"], + _ => unreachable!(), }; - POOL.join(materialize_left, materialize_right) + + df.sort_in_place(columns, options).unwrap(); + df.rechunk_mut(); + + let join_tuples_left = df + .column("a") + .unwrap() + .as_series() + .unwrap() + .idx() + .unwrap() + .cont_slice() + .unwrap(); + + let join_tuples_right = df + .column("b") + .unwrap() + .as_series() + .unwrap() + .idx() + .unwrap() + .cont_slice() + .unwrap(); + + POOL.join( + || materialize_left_join_idx_left(left, join_tuples_left, args), + || materialize_left_join_idx_right(other, bytemuck::cast_slice(join_tuples_right), args), + ) } -#[cfg(not(feature = "chunked_ids"))] -fn materialize_left_join( +fn materialize_left_join_idx_left( left: &DataFrame, - other: &DataFrame, - ids: LeftJoinIds, + left_idx: &[IdxSize], args: &JoinArgs, -) -> (DataFrame, DataFrame) { - let (left_idx, right_idx) = ids; - - let mut left_idx = &*left_idx; - if let Some((offset, len)) = args.slice { - left_idx = slice_slice(left_idx, offset, len); +) -> DataFrame { + let left_idx = if let Some((offset, len)) = args.slice { + slice_slice(left_idx, offset, len) + } else { + left_idx + }; + unsafe { + left._create_left_df_from_slice( + left_idx, + true, + args.slice.is_some(), + matches!( + args.maintain_order, + MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight + ), + ) } - let materialize_left = - || unsafe { left._create_left_df_from_slice(&left_idx, true, args.slice.is_some(), true) }; +} - let mut right_idx = &*right_idx; - if let Some((offset, len)) = args.slice { - right_idx = slice_slice(right_idx, offset, len); - } - let materialize_right = || { - let right_idx = &*right_idx; - unsafe { IdxCa::with_nullable_idx(right_idx, |idx| other.take_unchecked(idx)) } +fn materialize_left_join_idx_right( + right: &DataFrame, + right_idx: &[NullableIdxSize], + args: &JoinArgs, +) -> DataFrame { + let right_idx = if let Some((offset, len)) = args.slice { + slice_slice(right_idx, offset, len) + } else { + right_idx + }; + unsafe { IdxCa::with_nullable_idx(right_idx, |idx| right.take_unchecked(idx)) } +} +#[cfg(feature = "chunked_ids")] +fn materialize_left_join_chunked_left( + left: &DataFrame, + left_idx: &[ChunkId], + args: &JoinArgs, +) -> DataFrame { + let left_idx = if let Some((offset, len)) = args.slice { + slice_slice(left_idx, offset, len) + } else { + left_idx + }; + unsafe { left.create_left_df_chunked(left_idx, true, args.slice.is_some()) } +} + +#[cfg(feature = "chunked_ids")] +fn materialize_left_join_chunked_right( + right: &DataFrame, + right_idx: &[ChunkId], + args: &JoinArgs, +) -> DataFrame { + let right_idx = if let Some((offset, len)) = args.slice { + slice_slice(right_idx, offset, len) + } else { + right_idx }; - POOL.join(materialize_left, materialize_right) + unsafe { right._take_opt_chunked_unchecked_hor_par(right_idx) } } diff --git a/crates/polars-ops/src/frame/join/general.rs b/crates/polars-ops/src/frame/join/general.rs index 0bf0a86cd972..5ea6ef68638d 100644 --- a/crates/polars-ops/src/frame/join/general.rs +++ b/crates/polars-ops/src/frame/join/general.rs @@ -11,7 +11,8 @@ fn get_suffix(suffix: Option) -> PlSmallStr { suffix.unwrap_or_else(|| PlSmallStr::from_static("_right")) } -/// Utility method to finish a join. +/// Renames the columns on the right to not clash with the left using a specified or otherwise default suffix +/// and then merges the right dataframe into the left #[doc(hidden)] pub fn _finish_join( mut df_left: DataFrame, diff --git a/crates/polars-ops/src/frame/join/hash_join/mod.rs b/crates/polars-ops/src/frame/join/hash_join/mod.rs index 1bf51d8c2e3b..5c664b91182d 100644 --- a/crates/polars-ops/src/frame/join/hash_join/mod.rs +++ b/crates/polars-ops/src/frame/join/hash_join/mod.rs @@ -91,7 +91,7 @@ pub trait JoinDispatch: IntoDf { let df_self = self.to_df(); let left_join_no_duplicate_matches = - left_join && !was_sliced && join_tuples.len() == df_self.height(); + sorted_tuple_idx && left_join && !was_sliced && join_tuples.len() == df_self.height(); if left_join_no_duplicate_matches { df_self.clone() @@ -161,14 +161,42 @@ pub trait JoinDispatch: IntoDf { join_idx_l.slice(offset, len); join_idx_r.slice(offset, len); } - let idx_ca_l = IdxCa::with_chunk(PlSmallStr::EMPTY, join_idx_l); - let idx_ca_r = IdxCa::with_chunk(PlSmallStr::EMPTY, join_idx_r); - - // Take the left and right dataframes by join tuples - let (df_left, df_right) = POOL.join( - || unsafe { df_self.take_unchecked(&idx_ca_l) }, - || unsafe { other.take_unchecked(&idx_ca_r) }, - ); + let idx_ca_l = IdxCa::with_chunk("a".into(), join_idx_l); + let idx_ca_r = IdxCa::with_chunk("b".into(), join_idx_r); + + let (df_left, df_right) = if args.maintain_order != MaintainOrderJoin::None { + let mut df = DataFrame::new(vec![ + idx_ca_l.into_series().into(), + idx_ca_r.into_series().into(), + ])?; + + let options = SortMultipleOptions::new() + .with_order_descending(false) + .with_maintain_order(true) + .with_nulls_last(true); + + let columns = match args.maintain_order { + MaintainOrderJoin::Left => vec!["a"], + MaintainOrderJoin::LeftRight => vec!["a", "b"], + MaintainOrderJoin::Right => vec!["b"], + MaintainOrderJoin::RightLeft => vec!["b", "a"], + _ => unreachable!(), + }; + + df.sort_in_place(columns, options)?; + + let join_tuples_left = df.column("a").unwrap().idx().unwrap(); + let join_tuples_right = df.column("b").unwrap().idx().unwrap(); + POOL.join( + || unsafe { df_self.take_unchecked(join_tuples_left) }, + || unsafe { other.take_unchecked(join_tuples_right) }, + ) + } else { + POOL.join( + || unsafe { df_self.take_unchecked(&idx_ca_l) }, + || unsafe { other.take_unchecked(&idx_ca_r) }, + ) + }; let coalesce = args.coalesce.coalesce(&JoinType::Full); let out = _finish_join(df_left, df_right, args.suffix.clone()); diff --git a/crates/polars-ops/src/frame/join/hash_join/sort_merge.rs b/crates/polars-ops/src/frame/join/hash_join/sort_merge.rs index 95cde8387733..b9df6bcb3839 100644 --- a/crates/polars-ops/src/frame/join/hash_join/sort_merge.rs +++ b/crates/polars-ops/src/frame/join/hash_join/sort_merge.rs @@ -152,8 +152,10 @@ pub(super) fn par_sorted_merge_inner_no_nulls( } } -#[cfg(feature = "performant")] -fn to_left_join_ids(left_idx: Vec, right_idx: Vec) -> LeftJoinIds { +pub(crate) fn to_left_join_ids( + left_idx: Vec, + right_idx: Vec, +) -> LeftJoinIds { #[cfg(feature = "chunked_ids")] { (Either::Left(left_idx), Either::Left(right_idx)) diff --git a/crates/polars-ops/src/frame/join/mod.rs b/crates/polars-ops/src/frame/join/mod.rs index fd59ef7f4a1c..021b9c5bf35c 100644 --- a/crates/polars-ops/src/frame/join/mod.rs +++ b/crates/polars-ops/src/frame/join/mod.rs @@ -504,25 +504,61 @@ trait DataFrameJoinOpsPrivate: IntoDf { join_tuples_right = slice_slice(join_tuples_right, offset, len); } - let (df_left, df_right) = POOL.join( - // SAFETY: join indices are known to be in bounds - || unsafe { - left_df._create_left_df_from_slice( - join_tuples_left, - false, - args.slice.is_some(), - sorted, - ) - }, - || unsafe { - if let Some(drop_names) = drop_names { - other.drop_many(drop_names) - } else { - other.drop(s_right.name()).unwrap() + let other = if let Some(drop_names) = drop_names { + other.drop_many(drop_names) + } else { + other.drop(s_right.name()).unwrap() + }; + + let mut left = unsafe { IdxCa::mmap_slice("a".into(), join_tuples_left) }; + if sorted { + left.set_sorted_flag(IsSorted::Ascending); + } + let right = unsafe { IdxCa::mmap_slice("b".into(), join_tuples_right) }; + + let already_left_sorted = sorted + && matches!( + args.maintain_order, + MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight + ); + let (df_left, df_right) = + if args.maintain_order != MaintainOrderJoin::None && !already_left_sorted { + let mut df = + DataFrame::new(vec![left.into_series().into(), right.into_series().into()])?; + + let columns = match args.maintain_order { + MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight => vec!["a"], + MaintainOrderJoin::Right | MaintainOrderJoin::RightLeft => vec!["b"], + _ => unreachable!(), + }; + + let options = SortMultipleOptions::new() + .with_order_descending(false) + .with_maintain_order(true); + + df.sort_in_place(columns, options)?; + + let [mut a, b]: [Column; 2] = df.take_columns().try_into().unwrap(); + if matches!( + args.maintain_order, + MaintainOrderJoin::Left | MaintainOrderJoin::LeftRight + ) { + a.set_sorted_flag(IsSorted::Ascending); } - ._take_unchecked_slice(join_tuples_right, true) - }, - ); + + POOL.join( + // SAFETY: join indices are known to be in bounds + || unsafe { left_df.take_unchecked(a.idx().unwrap()) }, + || unsafe { other.take_unchecked(b.idx().unwrap()) }, + ) + } else { + POOL.join( + // SAFETY: join indices are known to be in bounds + || unsafe { left_df.take_unchecked(left.into_series().idx().unwrap()) }, + || unsafe { other.take_unchecked(right.into_series().idx().unwrap()) }, + ) + }; + _finish_join(df_left, df_right, args.suffix.clone()) } } diff --git a/crates/polars-python/src/conversion/mod.rs b/crates/polars-python/src/conversion/mod.rs index ed28df05746d..6f81c274eb18 100644 --- a/crates/polars-python/src/conversion/mod.rs +++ b/crates/polars-python/src/conversion/mod.rs @@ -1156,6 +1156,24 @@ impl<'py> FromPyObject<'py> for Wrap { } } +impl<'py> FromPyObject<'py> for Wrap { + fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { + let parsed = match &*ob.extract::()? { + "none" => MaintainOrderJoin::None, + "left" => MaintainOrderJoin::Left, + "right" => MaintainOrderJoin::Right, + "left_right" => MaintainOrderJoin::LeftRight, + "right_left" => MaintainOrderJoin::RightLeft, + v => { + return Err(PyValueError::new_err(format!( + "`maintain_order` must be one of {{'none', 'left', 'right', 'left_right', 'right_left'}}, got {v}", + ))) + }, + }; + Ok(Wrap(parsed)) + } +} + #[cfg(feature = "csv")] impl<'py> FromPyObject<'py> for Wrap { fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs index 935b6bd082b5..3c835d063f77 100644 --- a/crates/polars-python/src/lazyframe/general.rs +++ b/crates/polars-python/src/lazyframe/general.rs @@ -1020,7 +1020,7 @@ impl PyLazyFrame { .into()) } - #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, join_nulls, how, suffix, validate, coalesce=None))] + #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, join_nulls, how, suffix, validate, maintain_order, coalesce=None))] fn join( &self, other: Self, @@ -1032,6 +1032,7 @@ impl PyLazyFrame { how: Wrap, suffix: String, validate: Wrap, + maintain_order: Wrap, coalesce: Option, ) -> PyResult { let coalesce = match coalesce { @@ -1059,9 +1060,10 @@ impl PyLazyFrame { .force_parallel(force_parallel) .join_nulls(join_nulls) .how(how.0) - .coalesce(coalesce) - .validate(validate.0) .suffix(suffix) + .validate(validate.0) + .coalesce(coalesce) + .maintain_order(maintain_order.0) .finish() .into()) } diff --git a/crates/polars-python/src/lazyframe/visitor/nodes.rs b/crates/polars-python/src/lazyframe/visitor/nodes.rs index 05a56d920719..ccb666e6dcba 100644 --- a/crates/polars-python/src/lazyframe/visitor/nodes.rs +++ b/crates/polars-python/src/lazyframe/visitor/nodes.rs @@ -494,6 +494,7 @@ pub(crate) fn into_py(py: Python<'_>, plan: &IR) -> PyResult { options.args.slice, options.args.suffix().as_str(), options.args.coalesce.coalesce(how), + Into::<&str>::into(options.args.maintain_order), ) .to_object(py) }, diff --git a/crates/polars-utils/src/index.rs b/crates/polars-utils/src/index.rs index fb43a1958cd6..b6101e440075 100644 --- a/crates/polars-utils/src/index.rs +++ b/crates/polars-utils/src/index.rs @@ -46,6 +46,7 @@ impl Debug for NullableIdxSize { impl NullableIdxSize { #[inline(always)] pub fn is_null_idx(&self) -> bool { + // The left/right join maintain_order algorithms depend on the special value for sorting self.inner == IdxSize::MAX } diff --git a/py-polars/polars/_typing.py b/py-polars/polars/_typing.py index d614c1276e8f..92b4109f620b 100644 --- a/py-polars/polars/_typing.py +++ b/py-polars/polars/_typing.py @@ -108,6 +108,9 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: IpcCompression: TypeAlias = Literal["uncompressed", "lz4", "zstd"] JoinValidation: TypeAlias = Literal["m:m", "m:1", "1:m", "1:1"] Label: TypeAlias = Literal["left", "right", "datapoint"] +MaintainOrderJoin: TypeAlias = Literal[ + "none", "left", "right", "left_right", "right_left" +] NonExistent: TypeAlias = Literal["raise", "null"] NullBehavior: TypeAlias = Literal["ignore", "drop"] ParallelStrategy: TypeAlias = Literal[ diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 101d965f8e80..c8185ba625a1 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -154,6 +154,7 @@ JoinStrategy, JoinValidation, Label, + MaintainOrderJoin, MultiColSelector, MultiIndexSelector, OneOrMoreDataTypes, @@ -7238,6 +7239,7 @@ def join( validate: JoinValidation = "m:m", join_nulls: bool = False, coalesce: bool | None = None, + maintain_order: MaintainOrderJoin | None = None, ) -> DataFrame: """ Join in SQL-like fashion. @@ -7301,6 +7303,24 @@ def join( .. note:: Joining on any other expressions than `col` will turn off coalescing. + maintain_order : {'none', 'left', 'right', 'left_right', 'right_left'} + Which DataFrame row order to preserve, if any. + Do not rely on any observed ordering without explicitly + setting this parameter, as your code may break in a future release. + Not specifying any ordering can improve performance + Supported for inner, left, right and full joins + + * *none* + No specific ordering is desired. The ordering might differ across + Polars versions or even between different runs. + * *left* + Preserves the order of the left DataFrame. + * *right* + Preserves the order of the right DataFrame. + * *left_right* + First preserves the order of the left DataFrame, then the right. + * *right_left* + First preserves the order of the right DataFrame, then the left. See Also -------- @@ -7398,6 +7418,7 @@ def join( validate=validate, join_nulls=join_nulls, coalesce=coalesce, + maintain_order=maintain_order, ) .collect(_eager=True) ) diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index 77f50efd9d85..c177d9184f78 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -109,6 +109,7 @@ JoinStrategy, JoinValidation, Label, + MaintainOrderJoin, Orientation, PolarsDataType, PythonDataType, @@ -4565,6 +4566,7 @@ def join( validate: JoinValidation = "m:m", join_nulls: bool = False, coalesce: bool | None = None, + maintain_order: MaintainOrderJoin | None = None, allow_parallel: bool = True, force_parallel: bool = False, ) -> LazyFrame: @@ -4597,7 +4599,6 @@ def join( Returns rows from the left table that have a match in the right table. * *anti* Returns rows from the left table that have no match in the right table. - left_on Join column of the left DataFrame. right_on @@ -4631,6 +4632,24 @@ def join( .. note:: Joining on any other expressions than `col` will turn off coalescing. + maintain_order : {'none', 'left', 'right', 'left_right', 'right_left'} + Which DataFrame row order to preserve, if any. + Do not rely on any observed ordering without explicitly + setting this parameter, as your code may break in a future release. + Not specifying any ordering can improve performance + Supported for inner, left, right and full joins + + * *none* + No specific ordering is desired. The ordering might differ across + Polars versions or even between different runs. + * *left* + Preserves the order of the left DataFrame. + * *right* + Preserves the order of the right DataFrame. + * *left_right* + First preserves the order of the left DataFrame, then the right. + * *right_left* + First preserves the order of the right DataFrame, then the left. allow_parallel Allow the physical plan to optionally evaluate the computation of both DataFrames up to the join in parallel. @@ -4714,6 +4733,9 @@ def join( msg = f"expected `other` join table to be a LazyFrame, not a {type(other).__name__!r}" raise TypeError(msg) + if maintain_order is None: + maintain_order = "none" + uses_on = on is not None uses_left_on = left_on is not None uses_right_on = right_on is not None @@ -4753,6 +4775,7 @@ def join( how, suffix, validate, + maintain_order, ) ) @@ -4778,6 +4801,7 @@ def join( how, suffix, validate, + maintain_order, coalesce, ) ) diff --git a/py-polars/tests/unit/operations/test_join.py b/py-polars/tests/unit/operations/test_join.py index 27cba18e18d5..73d6836d66ed 100644 --- a/py-polars/tests/unit/operations/test_join.py +++ b/py-polars/tests/unit/operations/test_join.py @@ -1151,3 +1151,126 @@ def test_join_full_19814() -> None: assert a.join(b, on="a", how="full", coalesce=True).collect().to_dict( as_series=False ) == {"a": [1, 3, 4], "c": [None, None, None]} + + +def test_join_preserve_order_inner() -> None: + left = pl.LazyFrame({"a": [None, 2, 1, 1, 5]}) + right = pl.LazyFrame({"a": [1, 1, None, 2], "b": [6, 7, 8, 9]}) + + # Inner joins + + inner_left = left.join(right, on="a", how="inner", maintain_order="left").collect() + assert inner_left.get_column("a").cast(pl.UInt32).to_list() == [2, 1, 1, 1, 1] + inner_left_right = left.join( + right, on="a", how="inner", maintain_order="left" + ).collect() + assert inner_left.get_column("a").equals(inner_left_right.get_column("a")) + + inner_right = left.join( + right, on="a", how="inner", maintain_order="right" + ).collect() + assert inner_right.get_column("a").cast(pl.UInt32).to_list() == [1, 1, 1, 1, 2] + inner_right_left = left.join( + right, on="a", how="inner", maintain_order="right" + ).collect() + assert inner_right.get_column("a").equals(inner_right_left.get_column("a")) + + +def test_join_preserve_order_left() -> None: + left = pl.LazyFrame({"a": [None, 2, 1, 1, 5]}) + right = pl.LazyFrame({"a": [1, None, 2, 6], "b": [6, 7, 8, 9]}) + + # Right now the left join algorithm is ordered without explicitly setting any order + # This behaviour is deprecated but can only be removed in 2.0 + left_none = left.join(right, on="a", how="left", maintain_order="none").collect() + assert left_none.get_column("a").cast(pl.UInt32).to_list() == [ + None, + 2, + 1, + 1, + 5, + ] + + left_left = left.join(right, on="a", how="left", maintain_order="left").collect() + assert left_left.get_column("a").cast(pl.UInt32).to_list() == [ + None, + 2, + 1, + 1, + 5, + ] + + left_left_right = left.join( + right, on="a", how="left", maintain_order="left_right" + ).collect() + # If the left order is preserved then there are no unsorted right rows + assert left_left.get_column("a").equals(left_left_right.get_column("a")) + + left_right = left.join(right, on="a", how="left", maintain_order="right").collect() + assert left_right.get_column("a").cast(pl.UInt32).to_list()[:5] == [ + 1, + 1, + 2, + None, + 5, + ] + + left_right_left = left.join( + right, on="a", how="left", maintain_order="right_left" + ).collect() + assert left_right_left.get_column("a").cast(pl.UInt32).to_list() == [ + 1, + 1, + 2, + None, + 5, + ] + + +def test_join_preserve_order_full() -> None: + left = pl.LazyFrame({"a": [None, 2, 1, 1, 5]}) + right = pl.LazyFrame({"a": [1, None, 2, 6], "b": [6, 7, 8, 9]}) + + full_left = left.join(right, on="a", how="full", maintain_order="left").collect() + print(full_left) + assert full_left.get_column("a").cast(pl.UInt32).to_list()[:5] == [ + None, + 2, + 1, + 1, + 5, + ] + full_right = left.join(right, on="a", how="full", maintain_order="right").collect() + assert full_right.get_column("a").cast(pl.UInt32).to_list()[:5] == [ + 1, + 1, + None, + 2, + None, + ] + + full_left_right = left.join( + right, on="a", how="full", maintain_order="left_right" + ).collect() + assert full_left_right.get_column("a_right").cast(pl.UInt32).to_list() == [ + None, + 2, + 1, + 1, + None, + None, + 6, + ] + + full_right_left = left.join( + right, on="a", how="full", maintain_order="right_left" + ).collect() + assert full_right_left.get_column("a").cast(pl.UInt32).to_list() == [ + 1, + 1, + None, + 2, + None, + None, + 5, + ]