Skip to content

Commit

Permalink
feat(rust,python,cli): add SQL support for UNION [ALL] BY NAME, add "diagonal_relaxed" strategy for `pl.concat` (#11597)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-beedie authored Oct 9, 2023
1 parent c444c80 commit 5eb499c
Show file tree
Hide file tree
Showing 14 changed files with 170 additions and 93 deletions.
6 changes: 3 additions & 3 deletions crates/polars-core/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ where
/// Concat [`DataFrame`]s horizontally.
#[cfg(feature = "horizontal_concat")]
/// Concat horizontally and extend with null values if lengths don't match
pub fn hor_concat_df(dfs: &[DataFrame]) -> PolarsResult<DataFrame> {
pub fn concat_df_horizontal(dfs: &[DataFrame]) -> PolarsResult<DataFrame> {
let max_len = dfs
.iter()
.map(|df| df.height())
Expand Down Expand Up @@ -98,7 +98,7 @@ pub fn hor_concat_df(dfs: &[DataFrame]) -> PolarsResult<DataFrame> {
/// Concat [`DataFrame`]s diagonally.
#[cfg(feature = "diagonal_concat")]
/// Concat diagonally thereby combining different schemas.
pub fn diag_concat_df(dfs: &[DataFrame]) -> PolarsResult<DataFrame> {
pub fn concat_df_diagonal(dfs: &[DataFrame]) -> PolarsResult<DataFrame> {
// TODO! replace with lazy only?
let upper_bound_width = dfs.iter().map(|df| df.width()).sum();
let mut column_names = AHashSet::with_capacity(upper_bound_width);
Expand Down Expand Up @@ -175,7 +175,7 @@ mod test {
"d" => [1, 2]
]?;

let out = diag_concat_df(&[a, b, c])?;
let out = concat_df_diagonal(&[a, b, c])?;

let expected = df![
"a" => [Some(1), Some(2), None, None, Some(5), Some(7)],
Expand Down
37 changes: 18 additions & 19 deletions crates/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,11 @@ pub(crate) fn concat_impl<L: AsRef<[LazyFrame]>>(
#[cfg(feature = "diagonal_concat")]
/// Concat [LazyFrame]s diagonally.
/// Calls [`concat`][concat()] internally.
pub fn diag_concat_lf<L: AsRef<[LazyFrame]>>(
lfs: L,
rechunk: bool,
parallel: bool,
pub fn concat_lf_diagonal<L: AsRef<[LazyFrame]>>(
inputs: L,
args: UnionArgs,
) -> PolarsResult<LazyFrame> {
let lfs = lfs.as_ref().to_vec();
let lfs = inputs.as_ref();
let schemas = lfs
.iter()
.map(|lf| lf.schema())
Expand All @@ -143,12 +142,12 @@ pub fn diag_concat_lf<L: AsRef<[LazyFrame]>>(
}
});
}

let lfs_with_all_columns = lfs
.into_iter()
.iter()
// Zip Frames with their Schemas
.zip(schemas)
.map(|(mut lf, lf_schema)| {
.map(|(lf, lf_schema)| {
let mut lf = lf.clone();
for (name, dtype) in total_schema.iter() {
// If a name from Total Schema is not present - append
if lf_schema.get_field(name).is_none() {
Expand All @@ -163,19 +162,11 @@ pub fn diag_concat_lf<L: AsRef<[LazyFrame]>>(
.map(|col_name| col(col_name))
.collect::<Vec<Expr>>(),
);

Ok(reordered_lf)
})
.collect::<PolarsResult<Vec<_>>>()?;

concat(
lfs_with_all_columns,
UnionArgs {
rechunk,
parallel,
to_supertypes: false,
},
)
concat(lfs_with_all_columns, args)
}

#[derive(Clone, Copy)]
Expand All @@ -195,7 +186,7 @@ impl Default for UnionArgs {
}
}

/// Concat multiple
/// Concat multiple [`LazyFrame`]s vertically.
pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, args: UnionArgs) -> PolarsResult<LazyFrame> {
concat_impl(
inputs,
Expand Down Expand Up @@ -241,7 +232,15 @@ mod test {
"d" => [1, 2]
]?;

let out = diag_concat_lf(&[a.lazy(), b.lazy(), c.lazy()], false, false)?.collect()?;
let out = concat_lf_diagonal(
&[a.lazy(), b.lazy(), c.lazy()],
UnionArgs {
rechunk: false,
parallel: false,
..Default::default()
},
)?
.collect()?;

let expected = df![
"a" => [Some(1), Some(2), None, None, Some(5), Some(7)],
Expand Down
1 change: 1 addition & 0 deletions crates/polars-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ default = []
ipc = ["polars-lazy/ipc"]
parquet = ["polars-lazy/parquet"]
semi_anti_join = ["polars-lazy/semi_anti_join"]
diagonal_concat = ["polars-lazy/diagonal_concat"]
28 changes: 19 additions & 9 deletions crates/polars-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,31 @@ impl SQLContext {
) -> PolarsResult<LazyFrame> {
let left = self.process_set_expr(left, query)?;
let right = self.process_set_expr(right, query)?;
let concatenated = polars_lazy::dsl::concat(
vec![left, right],
UnionArgs {
parallel: true,
..Default::default()
},
);
let opts = UnionArgs {
parallel: true,
to_supertypes: true,
..Default::default()
};
match quantifier {
// UNION ALL
SetQuantifier::All => concatenated,
SetQuantifier::All => polars_lazy::dsl::concat(vec![left, right], opts),
// UNION [DISTINCT]
SetQuantifier::Distinct | SetQuantifier::None => {
let concatenated = polars_lazy::dsl::concat(vec![left, right], opts);
concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
},
// UNION ALL BY NAME
// TODO: add recognition for SetQuantifier::DistinctByName
// when "https://github.com/sqlparser-rs/sqlparser-rs/pull/997" is available
#[cfg(feature = "diagonal_concat")]
SetQuantifier::AllByName => concat_lf_diagonal(vec![left, right], opts),
// UNION [DISTINCT] BY NAME
#[cfg(feature = "diagonal_concat")]
SetQuantifier::ByName => {
let concatenated = concat_lf_diagonal(vec![left, right], opts);
concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
},
// TODO: support "UNION [ALL] BY NAME"
#[allow(unreachable_patterns)]
_ => polars_bail!(InvalidOperation: "UNION {} is not yet supported", quantifier),
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pct_change = ["polars-core/pct_change", "polars-lazy?/pct_change"]
moment = ["polars-core/moment", "polars-lazy?/moment", "polars-ops/moment"]
range = ["polars-lazy?/range"]
true_div = ["polars-lazy?/true_div"]
diagonal_concat = ["polars-core/diagonal_concat", "polars-lazy?/diagonal_concat"]
diagonal_concat = ["polars-core/diagonal_concat", "polars-lazy?/diagonal_concat", "polars-sql?/diagonal_concat"]
horizontal_concat = ["polars-core/horizontal_concat"]
abs = ["polars-core/abs", "polars-lazy?/abs"]
dynamic_group_by = ["polars-core/dynamic_group_by", "polars-lazy?/dynamic_group_by"]
Expand Down
4 changes: 2 additions & 2 deletions docs/src/rust/user-guide/transformations/concatenation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"r2"=> &[7, 8],
"r3"=> &[9, 10],
)?;
let df_horizontal_concat = polars::functions::hor_concat_df(&[df_h1, df_h2])?;
let df_horizontal_concat = polars::functions::concat_df_horizontal(&[df_h1, df_h2])?;
println!("{}", &df_horizontal_concat);
// --8<-- [end:horizontal]

Expand All @@ -42,7 +42,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let df_d2 = df!(
"a"=> &[2],
"d"=> &[4],)?;
let df_diagonal_concat = polars::functions::diag_concat_df(&[df_d1, df_d2])?;
let df_diagonal_concat = polars::functions::concat_df_diagonal(&[df_d1, df_d2])?;
println!("{}", &df_diagonal_concat);
// --8<-- [end:cross]
Ok(())
Expand Down
59 changes: 39 additions & 20 deletions py-polars/polars/functions/eager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import contextlib
from functools import reduce
from itertools import chain
from typing import TYPE_CHECKING, Iterable, List, Sequence, cast
from typing import TYPE_CHECKING, Iterable, List, Sequence, cast, get_args

import polars._reexport as pl
from polars import functions as F
from polars.type_aliases import FrameType
from polars.type_aliases import ConcatMethod, FrameType
from polars.utils._wrap import wrap_df, wrap_expr, wrap_ldf, wrap_s
from polars.utils.various import ordered_unique

Expand All @@ -16,7 +16,7 @@

if TYPE_CHECKING:
from polars import DataFrame, Expr, LazyFrame, Series
from polars.type_aliases import ConcatMethod, JoinStrategy, PolarsType
from polars.type_aliases import JoinStrategy, PolarsType


def concat(
Expand All @@ -38,10 +38,12 @@ def concat(
LazyFrames do not support the `horizontal` strategy.
* vertical: Applies multiple `vstack` operations.
* vertical_relaxed: Applies multiple `vstack` operations and coerces column
dtypes that are not equal to their supertypes.
* vertical_relaxed: Same as `vertical`, but additionally coerces columns to
their common supertype *if* they are mismatched (eg: Int32 → Int64).
* diagonal: Finds a union between the column schemas and fills missing column
values with ``null``.
* diagonal_relaxed: Same as `diagonal`, but additionally coerces columns to
their common supertype *if* they are mismatched (eg: Int32 → Int64).
* horizontal: Stacks Series from DataFrames horizontally and fills with ``null``
if the lengths don't match.
* align: Combines frames horizontally, auto-determining the common key columns
Expand Down Expand Up @@ -175,41 +177,58 @@ def concat(
to_supertypes=True,
)
).collect(no_optimization=True)

elif how == "diagonal":
out = wrap_df(plr.diag_concat_df(elems))
out = wrap_df(plr.concat_df_diagonal(elems))
elif how == "diagonal_relaxed":
out = wrap_ldf(
plr.concat_lf_diagonal(
[df.lazy() for df in elems],
rechunk=rechunk,
parallel=parallel,
to_supertypes=True,
)
).collect(no_optimization=True)
elif how == "horizontal":
out = wrap_df(plr.hor_concat_df(elems))
out = wrap_df(plr.concat_df_horizontal(elems))
else:
allowed = ", ".join(repr(m) for m in get_args(ConcatMethod))
raise ValueError(
f"`how` must be one of {{'vertical','vertical_relaxed','diagonal','horizontal','align'}},"
f" got {how!r}"
f"DataFrame `how` must be one of {{{allowed}}}, got {how!r}"
)

elif isinstance(first, pl.LazyFrame):
if how == "vertical":
if how in ("vertical", "vertical_relaxed"):
return wrap_ldf(
plr.concat_lf(
elems, rechunk=rechunk, parallel=parallel, to_supertypes=False
elems,
rechunk=rechunk,
parallel=parallel,
to_supertypes=how.endswith("relaxed"),
)
)
if how == "vertical_relaxed":
elif how in ("diagonal", "diagonal_relaxed"):
return wrap_ldf(
plr.concat_lf(
elems, rechunk=rechunk, parallel=parallel, to_supertypes=True
plr.concat_lf_diagonal(
elems,
rechunk=rechunk,
parallel=parallel,
to_supertypes=how.endswith("relaxed"),
)
)
if how == "diagonal":
return wrap_ldf(
plr.diag_concat_lf(elems, rechunk=rechunk, parallel=parallel)
)
else:
allowed = ", ".join(
repr(m) for m in get_args(ConcatMethod) if m != "horizontal"
)
raise ValueError(
"'LazyFrame' only allows {'vertical','vertical_relaxed','diagonal','align'} concat strategies"
f"LazyFrame `how` must be one of {{{allowed}}}, got {how!r}"
)

elif isinstance(first, pl.Series):
if how == "vertical":
out = wrap_s(plr.concat_series(elems))
else:
raise ValueError("Series only allows {'vertical'} concat strategy")
raise ValueError("Series only supports 'vertical' concat strategy")

elif isinstance(first, pl.Expr):
return wrap_expr(plr.concat_expr([e._pyexpr for e in elems], rechunk))
Expand Down
7 changes: 6 additions & 1 deletion py-polars/polars/type_aliases.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,12 @@
# The following have no equivalent on the Rust side
Ambiguous: TypeAlias = Literal["earliest", "latest", "raise"]
ConcatMethod = Literal[
"vertical", "vertical_relaxed", "diagonal", "horizontal", "align"
"vertical",
"vertical_relaxed",
"diagonal",
"diagonal_relaxed",
"horizontal",
"align",
]
EpochTimeUnit = Literal["ns", "us", "ms", "s", "d"]
Orientation: TypeAlias = Literal["col", "row"]
Expand Down
8 changes: 4 additions & 4 deletions py-polars/src/functions/eager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub fn concat_series(series: &PyAny) -> PyResult<PySeries> {
}

#[pyfunction]
pub fn diag_concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
pub fn concat_df_diagonal(dfs: &PyAny) -> PyResult<PyDataFrame> {
let iter = dfs.iter()?;

let dfs = iter
Expand All @@ -73,12 +73,12 @@ pub fn diag_concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
})
.collect::<PyResult<Vec<_>>>()?;

let df = functions::diag_concat_df(&dfs).map_err(PyPolarsErr::from)?;
let df = functions::concat_df_diagonal(&dfs).map_err(PyPolarsErr::from)?;
Ok(df.into())
}

#[pyfunction]
pub fn hor_concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
pub fn concat_df_horizontal(dfs: &PyAny) -> PyResult<PyDataFrame> {
let iter = dfs.iter()?;

let dfs = iter
Expand All @@ -88,6 +88,6 @@ pub fn hor_concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
})
.collect::<PyResult<Vec<_>>>()?;

let df = functions::hor_concat_df(&dfs).map_err(PyPolarsErr::from)?;
let df = functions::concat_df_horizontal(&dfs).map_err(PyPolarsErr::from)?;
Ok(df.into())
}
17 changes: 15 additions & 2 deletions py-polars/src/functions/lazy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,12 @@ pub fn datetime(
}

#[pyfunction]
pub fn diag_concat_lf(lfs: &PyAny, rechunk: bool, parallel: bool) -> PyResult<PyLazyFrame> {
pub fn concat_lf_diagonal(
lfs: &PyAny,
rechunk: bool,
parallel: bool,
to_supertypes: bool,
) -> PyResult<PyLazyFrame> {
let iter = lfs.iter()?;

let lfs = iter
Expand All @@ -269,7 +274,15 @@ pub fn diag_concat_lf(lfs: &PyAny, rechunk: bool, parallel: bool) -> PyResult<Py
})
.collect::<PyResult<Vec<_>>>()?;

let lf = dsl::functions::diag_concat_lf(lfs, rechunk, parallel).map_err(PyPolarsErr::from)?;
let lf = dsl::functions::concat_lf_diagonal(
lfs,
UnionArgs {
rechunk,
parallel,
to_supertypes,
},
)
.map_err(PyPolarsErr::from)?;
Ok(lf.into())
}

Expand Down
8 changes: 4 additions & 4 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ fn polars(py: Python, m: &PyModule) -> PyResult<()> {
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::eager::concat_series))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::eager::diag_concat_df))
m.add_wrapped(wrap_pyfunction!(functions::eager::concat_df_diagonal))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::eager::hor_concat_df))
m.add_wrapped(wrap_pyfunction!(functions::eager::concat_df_horizontal))
.unwrap();

// Functions - range
Expand Down Expand Up @@ -161,10 +161,10 @@ fn polars(py: Python, m: &PyModule) -> PyResult<()> {
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::lazy::datetime))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::lazy::diag_concat_lf))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::lazy::concat_expr))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::lazy::concat_lf_diagonal))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::lazy::dtype_cols))
.unwrap();
m.add_wrapped(wrap_pyfunction!(functions::lazy::duration))
Expand Down
Loading

0 comments on commit 5eb499c

Please sign in to comment.