From df2100a252c77f7ba41f4cae6ec09015ba51d5da Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Wed, 6 Nov 2024 22:14:02 +0100 Subject: [PATCH] refactor: Get `Column` into `polars-expr` (#19660) --- .../src/chunked_array/from_iterator.rs | 9 + .../src/frame/column/arithmetic.rs | 69 +---- crates/polars-core/src/frame/column/mod.rs | 260 +++++++++++++++++- .../src/frame/column/partitioned.rs | 15 + .../frame/group_by/aggregations/dispatch.rs | 2 +- .../src/expressions/aggregation.rs | 175 ++++++------ crates/polars-expr/src/expressions/alias.rs | 20 +- crates/polars-expr/src/expressions/apply.rs | 24 +- crates/polars-expr/src/expressions/binary.rs | 80 ++++-- crates/polars-expr/src/expressions/cast.rs | 31 ++- crates/polars-expr/src/expressions/column.rs | 18 +- crates/polars-expr/src/expressions/count.rs | 15 +- crates/polars-expr/src/expressions/filter.rs | 2 +- crates/polars-expr/src/expressions/gather.rs | 8 +- crates/polars-expr/src/expressions/literal.rs | 66 +++-- crates/polars-expr/src/expressions/mod.rs | 12 +- crates/polars-expr/src/expressions/rolling.rs | 4 +- crates/polars-expr/src/expressions/slice.rs | 8 +- crates/polars-expr/src/expressions/sort.rs | 2 +- crates/polars-expr/src/expressions/sortby.rs | 14 +- crates/polars-expr/src/expressions/ternary.rs | 8 +- crates/polars-expr/src/expressions/window.rs | 15 +- crates/polars-lazy/src/dsl/list.rs | 4 +- .../streaming/construct_pipeline.rs | 4 +- .../polars-mem-engine/src/executors/filter.rs | 4 +- .../src/executors/group_by_partitioned.rs | 6 +- .../polars-mem-engine/src/executors/join.rs | 4 +- .../src/executors/projection_utils.rs | 14 +- .../polars-mem-engine/src/executors/stack.rs | 14 +- crates/polars-stream/src/expression.rs | 4 +- crates/polars-stream/src/nodes/group_by.rs | 5 +- crates/polars-stream/src/nodes/reduce.rs | 2 +- 32 files changed, 580 insertions(+), 338 deletions(-) diff --git a/crates/polars-core/src/chunked_array/from_iterator.rs b/crates/polars-core/src/chunked_array/from_iterator.rs index ba9e8d1e6ccc..de5c3f89ee44 100644 --- a/crates/polars-core/src/chunked_array/from_iterator.rs +++ b/crates/polars-core/src/chunked_array/from_iterator.rs @@ -152,6 +152,15 @@ where } } +impl FromIterator> for ListChunked { + fn from_iter>>(iter: T) -> Self { + ListChunked::from_iter( + iter.into_iter() + .map(|c| c.map(|c| c.take_materialized_series())), + ) + } +} + impl FromIterator> for ListChunked { #[inline] fn from_iter>>(iter: I) -> Self { diff --git a/crates/polars-core/src/frame/column/arithmetic.rs b/crates/polars-core/src/frame/column/arithmetic.rs index 97907f3457b9..8018ee4527e6 100644 --- a/crates/polars-core/src/frame/column/arithmetic.rs +++ b/crates/polars-core/src/frame/column/arithmetic.rs @@ -1,70 +1,7 @@ use num_traits::{Num, NumCast}; -use polars_error::{polars_bail, PolarsResult}; +use polars_error::PolarsResult; use super::{Column, ScalarColumn, Series}; -use crate::utils::Container; - -fn output_length(a: &Column, b: &Column) -> PolarsResult { - match (a.len(), b.len()) { - // broadcasting - (1, o) | (o, 1) => Ok(o), - // equal - (a, b) if a == b => Ok(a), - // unequal - (a, b) => { - polars_bail!(InvalidOperation: "cannot do arithmetic operation on series of different lengths: got {} and {}", a, b) - }, - } -} - -fn unit_series_op PolarsResult>( - l: &Series, - r: &Series, - op: F, - length: usize, -) -> PolarsResult { - debug_assert!(l.len() <= 1); - debug_assert!(r.len() <= 1); - - op(l, r) - .map(|s| ScalarColumn::from_single_value_series(s, length)) - .map(Column::from) -} - -fn 
op_with_broadcast PolarsResult>( - l: &Column, - r: &Column, - op: F, -) -> PolarsResult { - // Here we rely on the underlying broadcast operations. - - let length = output_length(l, r)?; - match (l, r) { - (Column::Series(l), Column::Scalar(r)) => { - let r = r.as_single_value_series(); - if l.len() == 1 { - unit_series_op(l, &r, op, length) - } else { - op(l, &r).map(Column::from) - } - }, - (Column::Scalar(l), Column::Series(r)) => { - let l = l.as_single_value_series(); - if r.len() == 1 { - unit_series_op(&l, r, op, length) - } else { - op(&l, r).map(Column::from) - } - }, - (Column::Scalar(l), Column::Scalar(r)) => unit_series_op( - &l.as_single_value_series(), - &r.as_single_value_series(), - op, - length, - ), - (l, r) => op(l.as_materialized_series(), r.as_materialized_series()).map(Column::from), - } -} fn num_op_with_broadcast Series>( c: &'_ Column, @@ -90,7 +27,7 @@ macro_rules! broadcastable_ops { #[inline] fn $op(self, rhs: Self) -> Self::Output { - op_with_broadcast(&self, &rhs, |l, r| l.$op(r)) + self.try_apply_broadcasting_binary_elementwise(&rhs, |l, r| l.$op(r)) } } @@ -99,7 +36,7 @@ macro_rules! broadcastable_ops { #[inline] fn $op(self, rhs: Self) -> Self::Output { - op_with_broadcast(self, rhs, |l, r| l.$op(r)) + self.try_apply_broadcasting_binary_elementwise(rhs, |l, r| l.$op(r)) } } )+ diff --git a/crates/polars-core/src/frame/column/mod.rs b/crates/polars-core/src/frame/column/mod.rs index d21ec3ac8536..16eb940022b3 100644 --- a/crates/polars-core/src/frame/column/mod.rs +++ b/crates/polars-core/src/frame/column/mod.rs @@ -670,6 +670,16 @@ impl Column { unsafe { self.as_materialized_series().agg_list(groups) }.into() } + /// # Safety + /// + /// Does no bounds checks, groups must be correct. + #[cfg(feature = "algorithm_group_by")] + pub fn agg_valid_count(&self, groups: &GroupsProxy) -> Self { + // @partition-opt + // @scalar-opt + unsafe { self.as_materialized_series().agg_valid_count(groups) }.into() + } + pub fn full_null(name: PlSmallStr, size: usize, dtype: &DataType) -> Self { Series::full_null(name, size, dtype).into() // @TODO: This causes failures @@ -1018,15 +1028,64 @@ impl Column { } pub fn bitand(&self, rhs: &Self) -> PolarsResult { + // @partition-opt + // @scalar-opt self.as_materialized_series() .bitand(rhs.as_materialized_series()) .map(Column::from) } + pub fn bitor(&self, rhs: &Self) -> PolarsResult { + // @partition-opt + // @scalar-opt + self.as_materialized_series() + .bitor(rhs.as_materialized_series()) + .map(Column::from) + } + pub fn bitxor(&self, rhs: &Self) -> PolarsResult { + // @partition-opt + // @scalar-opt + self.as_materialized_series() + .bitxor(rhs.as_materialized_series()) + .map(Column::from) + } + + pub fn try_add_owned(self, other: Self) -> PolarsResult { + // @partition-opt + // @scalar-opt + self.take_materialized_series() + .try_add_owned(other.take_materialized_series()) + .map(Column::from) + } + pub fn try_sub_owned(self, other: Self) -> PolarsResult { + // @partition-opt + // @scalar-opt + self.take_materialized_series() + .try_sub_owned(other.take_materialized_series()) + .map(Column::from) + } + pub fn try_mul_owned(self, other: Self) -> PolarsResult { + // @partition-opt + // @scalar-opt + self.take_materialized_series() + .try_mul_owned(other.take_materialized_series()) + .map(Column::from) + } pub(crate) fn str_value(&self, index: usize) -> PolarsResult> { Ok(self.get(index)?.str_value()) } + pub fn min_reduce(&self) -> PolarsResult { + match self { + Column::Series(s) => s.min_reduce(), + Column::Partitioned(s) 
=> s.min_reduce(), + Column::Scalar(s) => { + // We don't really want to deal with handling the full semantics here so we just + // cast to a single value series. This is a tiny bit wasteful, but probably fine. + s.as_single_value_series().min_reduce() + }, + } + } pub fn max_reduce(&self) -> PolarsResult { match self { Column::Series(s) => s.max_reduce(), @@ -1038,25 +1097,114 @@ impl Column { }, } } - - pub fn min_reduce(&self) -> PolarsResult { + pub fn median_reduce(&self) -> PolarsResult { match self { - Column::Series(s) => s.min_reduce(), - Column::Partitioned(s) => s.min_reduce(), + Column::Series(s) => s.median_reduce(), + Column::Partitioned(s) => s.as_materialized_series().median_reduce(), Column::Scalar(s) => { // We don't really want to deal with handling the full semantics here so we just // cast to a single value series. This is a tiny bit wasteful, but probably fine. - s.as_single_value_series().min_reduce() + s.as_single_value_series().median_reduce() + }, + } + } + pub fn mean_reduce(&self) -> Scalar { + match self { + Column::Series(s) => s.mean_reduce(), + Column::Partitioned(s) => s.as_materialized_series().mean_reduce(), + Column::Scalar(s) => { + // We don't really want to deal with handling the full semantics here so we just + // cast to a single value series. This is a tiny bit wasteful, but probably fine. + s.as_single_value_series().mean_reduce() }, } } + pub fn std_reduce(&self, ddof: u8) -> PolarsResult { + match self { + Column::Series(s) => s.std_reduce(ddof), + Column::Partitioned(s) => s.as_materialized_series().std_reduce(ddof), + Column::Scalar(s) => { + // We don't really want to deal with handling the full semantics here so we just + // cast to a single value series. This is a tiny bit wasteful, but probably fine. + s.as_single_value_series().std_reduce(ddof) + }, + } + } + pub fn var_reduce(&self, ddof: u8) -> PolarsResult { + match self { + Column::Series(s) => s.var_reduce(ddof), + Column::Partitioned(s) => s.as_materialized_series().var_reduce(ddof), + Column::Scalar(s) => { + // We don't really want to deal with handling the full semantics here so we just + // cast to a single value series. This is a tiny bit wasteful, but probably fine. + s.as_single_value_series().var_reduce(ddof) + }, + } + } + pub fn sum_reduce(&self) -> PolarsResult { + // @partition-opt + // @scalar-opt + self.as_materialized_series().sum_reduce() + } + pub fn and_reduce(&self) -> PolarsResult { + match self { + Column::Series(s) => s.and_reduce(), + Column::Partitioned(s) => s.and_reduce(), + Column::Scalar(s) => { + // We don't really want to deal with handling the full semantics here so we just + // cast to a single value series. This is a tiny bit wasteful, but probably fine. + s.as_single_value_series().and_reduce() + }, + } + } + pub fn or_reduce(&self) -> PolarsResult { + match self { + Column::Series(s) => s.or_reduce(), + Column::Partitioned(s) => s.or_reduce(), + Column::Scalar(s) => { + // We don't really want to deal with handling the full semantics here so we just + // cast to a single value series. This is a tiny bit wasteful, but probably fine. + s.as_single_value_series().or_reduce() + }, + } + } + pub fn xor_reduce(&self) -> PolarsResult { + match self { + Column::Series(s) => s.xor_reduce(), + // @partition-opt + Column::Partitioned(s) => s.as_materialized_series().xor_reduce(), + Column::Scalar(s) => { + // We don't really want to deal with handling the full semantics here so we just + // cast to a single value series. 
This is a tiny bit wasteful, but probably fine. + s.as_single_value_series().xor_reduce() + }, + } + } + pub fn n_unique(&self) -> PolarsResult { + match self { + Column::Series(s) => s.n_unique(), + Column::Partitioned(s) => s.partitions().n_unique(), + // @scalar-opt + Column::Scalar(s) => s.as_single_value_series().n_unique(), + } + } + pub fn quantile_reduce(&self, quantile: f64, method: QuantileMethod) -> PolarsResult { + self.as_materialized_series() + .quantile_reduce(quantile, method) + } + + pub fn implode(&self) -> PolarsResult { + // @partition-opt + // @scalar-opt + self.as_materialized_series().implode() + } pub(crate) fn estimated_size(&self) -> usize { // @scalar-opt self.as_materialized_series().estimated_size() } - pub(crate) fn sort_with(&self, options: SortOptions) -> PolarsResult { + pub fn sort_with(&self, options: SortOptions) -> PolarsResult { match self { Column::Series(s) => s.sort_with(options).map(Self::from), // @partition-opt @@ -1083,7 +1231,6 @@ impl Column { }, } } - pub fn try_apply_unary_elementwise( &self, f: impl Fn(&Series) -> PolarsResult, @@ -1099,6 +1246,105 @@ impl Column { } } + pub fn apply_broadcasting_binary_elementwise( + &self, + other: &Self, + op: impl Fn(&Series, &Series) -> Series, + ) -> PolarsResult { + self.try_apply_broadcasting_binary_elementwise(other, |lhs, rhs| Ok(op(lhs, rhs))) + } + pub fn try_apply_broadcasting_binary_elementwise( + &self, + other: &Self, + op: impl Fn(&Series, &Series) -> PolarsResult, + ) -> PolarsResult { + fn output_length(a: &Column, b: &Column) -> PolarsResult { + match (a.len(), b.len()) { + // broadcasting + (1, o) | (o, 1) => Ok(o), + // equal + (a, b) if a == b => Ok(a), + // unequal + (a, b) => { + polars_bail!(InvalidOperation: "cannot do a binary operation on columns of different lengths: got {} and {}", a, b) + }, + } + } + + // Here we rely on the underlying broadcast operations. 
+ let length = output_length(self, other)?; + match (self, other) { + (Column::Series(lhs), Column::Series(rhs)) => op(lhs, rhs).map(Column::from), + (Column::Series(lhs), Column::Scalar(rhs)) => { + op(lhs, &rhs.as_single_value_series()).map(Column::from) + }, + (Column::Scalar(lhs), Column::Series(rhs)) => { + op(&lhs.as_single_value_series(), rhs).map(Column::from) + }, + (Column::Scalar(lhs), Column::Scalar(rhs)) => { + let lhs = lhs.as_single_value_series(); + let rhs = rhs.as_single_value_series(); + + let result = op(&lhs, &rhs)?; + if result.is_empty() { + Ok(result.into_column()) + } else { + Ok(ScalarColumn::from_single_value_series(result, length).into_column()) + } + }, + // @partition-opt + (lhs, rhs) => { + op(lhs.as_materialized_series(), rhs.as_materialized_series()).map(Column::from) + }, + } + } + + pub fn apply_binary_elementwise( + &self, + other: &Self, + f: impl Fn(&Series, &Series) -> Series, + f_lb: impl Fn(&Scalar, &Series) -> Series, + f_rb: impl Fn(&Series, &Scalar) -> Series, + ) -> Column { + self.try_apply_binary_elementwise( + other, + |lhs, rhs| Ok(f(lhs, rhs)), + |lhs, rhs| Ok(f_lb(lhs, rhs)), + |lhs, rhs| Ok(f_rb(lhs, rhs)), + ) + .unwrap() + } + pub fn try_apply_binary_elementwise( + &self, + other: &Self, + f: impl Fn(&Series, &Series) -> PolarsResult, + f_lb: impl Fn(&Scalar, &Series) -> PolarsResult, + f_rb: impl Fn(&Series, &Scalar) -> PolarsResult, + ) -> PolarsResult { + debug_assert_eq!(self.len(), other.len()); + + match (self, other) { + (Column::Series(lhs), Column::Series(rhs)) => f(lhs, rhs).map(Column::from), + (Column::Series(lhs), Column::Scalar(rhs)) => f_rb(lhs, rhs.scalar()).map(Column::from), + (Column::Scalar(lhs), Column::Series(rhs)) => f_lb(lhs.scalar(), rhs).map(Column::from), + (Column::Scalar(lhs), Column::Scalar(rhs)) => { + let lhs = lhs.as_single_value_series(); + let rhs = rhs.as_single_value_series(); + + let result = f(&lhs, &rhs)?; + if result.is_empty() { + Ok(result.into_column()) + } else { + Ok(ScalarColumn::from_single_value_series(result, self.len()).into_column()) + } + }, + // @partition-opt + (lhs, rhs) => { + f(lhs.as_materialized_series(), rhs.as_materialized_series()).map(Column::from) + }, + } + } + #[cfg(feature = "approx_unique")] pub fn approx_n_unique(&self) -> PolarsResult { match self { diff --git a/crates/polars-core/src/frame/column/partitioned.rs b/crates/polars-core/src/frame/column/partitioned.rs index a22e697290ec..16d4e9538634 100644 --- a/crates/polars-core/src/frame/column/partitioned.rs +++ b/crates/polars-core/src/frame/column/partitioned.rs @@ -274,4 +274,19 @@ impl PartitionedColumn { pub fn clear(&self) -> Self { Self::new_empty(self.name.clone(), self.values.dtype().clone()) } + + pub fn partitions(&self) -> &Series { + &self.values + } + pub fn partition_ends(&self) -> &[IdxSize] { + &self.ends + } + + pub fn or_reduce(&self) -> PolarsResult { + self.values.or_reduce() + } + + pub fn and_reduce(&self) -> PolarsResult { + self.values.and_reduce() + } } diff --git a/crates/polars-core/src/frame/group_by/aggregations/dispatch.rs b/crates/polars-core/src/frame/group_by/aggregations/dispatch.rs index aaf24a470969..8f01ce3f291a 100644 --- a/crates/polars-core/src/frame/group_by/aggregations/dispatch.rs +++ b/crates/polars-core/src/frame/group_by/aggregations/dispatch.rs @@ -15,7 +15,7 @@ impl Series { } #[doc(hidden)] - pub fn agg_valid_count(&self, groups: &GroupsProxy) -> Series { + pub unsafe fn agg_valid_count(&self, groups: &GroupsProxy) -> Series { // Prevent a rechunk for every 
individual group. let s = if groups.len() > 1 && self.null_count() > 0 { self.rechunk() diff --git a/crates/polars-expr/src/expressions/aggregation.rs b/crates/polars-expr/src/expressions/aggregation.rs index f1cfa5251899..fb691d746715 100644 --- a/crates/polars-expr/src/expressions/aggregation.rs +++ b/crates/polars-expr/src/expressions/aggregation.rs @@ -50,7 +50,7 @@ impl PhysicalExpr for AggregationExpr { None } - fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { let s = self.input.evaluate(df, state)?; let AggregationType { @@ -69,28 +69,29 @@ impl PhysicalExpr for AggregationExpr { GroupByMethod::Min => { if MetadataEnv::experimental_enabled() { if let Some(sc) = s.get_metadata().and_then(|v| v.min_value()) { - return Ok(sc.into_series(s.name().clone())); + return Ok(sc.into_column(s.name().clone())); } } match s.is_sorted_flag() { IsSorted::Ascending | IsSorted::Descending => { - s.min_reduce().map(|sc| sc.into_series(s.name().clone())) + s.min_reduce().map(|sc| sc.into_column(s.name().clone())) }, - IsSorted::Not => parallel_op_series( - |s| s.min_reduce().map(|sc| sc.into_series(s.name().clone())), + IsSorted::Not => parallel_op_columns( + |s| s.min_reduce().map(|sc| sc.into_column(s.name().clone())), s, allow_threading, ), } }, #[cfg(feature = "propagate_nans")] - GroupByMethod::NanMin => parallel_op_series( + GroupByMethod::NanMin => parallel_op_columns( |s| { Ok(polars_ops::prelude::nan_propagating_aggregate::nan_min_s( - &s, + s.as_materialized_series(), s.name().clone(), - )) + ) + .into_column()) }, s, allow_threading, @@ -102,28 +103,29 @@ impl PhysicalExpr for AggregationExpr { GroupByMethod::Max => { if MetadataEnv::experimental_enabled() { if let Some(sc) = s.get_metadata().and_then(|v| v.max_value()) { - return Ok(sc.into_series(s.name().clone())); + return Ok(sc.into_column(s.name().clone())); } } match s.is_sorted_flag() { IsSorted::Ascending | IsSorted::Descending => { - s.max_reduce().map(|sc| sc.into_series(s.name().clone())) + s.max_reduce().map(|sc| sc.into_column(s.name().clone())) }, - IsSorted::Not => parallel_op_series( - |s| s.max_reduce().map(|sc| sc.into_series(s.name().clone())), + IsSorted::Not => parallel_op_columns( + |s| s.max_reduce().map(|sc| sc.into_column(s.name().clone())), s, allow_threading, ), } }, #[cfg(feature = "propagate_nans")] - GroupByMethod::NanMax => parallel_op_series( + GroupByMethod::NanMax => parallel_op_columns( |s| { Ok(polars_ops::prelude::nan_propagating_aggregate::nan_max_s( - &s, + s.as_materialized_series(), s.name().clone(), - )) + ) + .into_column()) }, s, allow_threading, @@ -132,20 +134,20 @@ impl PhysicalExpr for AggregationExpr { GroupByMethod::NanMax => { panic!("activate 'propagate_nans' feature") }, - GroupByMethod::Median => s.median_reduce().map(|sc| sc.into_series(s.name().clone())), - GroupByMethod::Mean => Ok(s.mean_reduce().into_series(s.name().clone())), + GroupByMethod::Median => s.median_reduce().map(|sc| sc.into_column(s.name().clone())), + GroupByMethod::Mean => Ok(s.mean_reduce().into_column(s.name().clone())), GroupByMethod::First => Ok(if s.is_empty() { - Series::full_null(s.name().clone(), 1, s.dtype()) + Column::full_null(s.name().clone(), 1, s.dtype()) } else { s.head(Some(1)) }), GroupByMethod::Last => Ok(if s.is_empty() { - Series::full_null(s.name().clone(), 1, s.dtype()) + Column::full_null(s.name().clone(), 1, s.dtype()) } else { s.tail(Some(1)) }), - GroupByMethod::Sum => 
parallel_op_series( - |s| s.sum_reduce().map(|sc| sc.into_series(s.name().clone())), + GroupByMethod::Sum => parallel_op_columns( + |s| s.sum_reduce().map(|sc| sc.into_column(s.name().clone())), s, allow_threading, ), @@ -154,41 +156,41 @@ impl PhysicalExpr for AggregationExpr { if MetadataEnv::experimental_enabled() { if let Some(count) = s.get_metadata().and_then(|v| v.distinct_count()) { let count = count + IdxSize::from(s.null_count() > 0); - return Ok(IdxCa::from_slice(s.name().clone(), &[count]).into_series()); + return Ok(IdxCa::from_slice(s.name().clone(), &[count]).into_column()); } } s.n_unique().map(|count| { - IdxCa::from_slice(s.name().clone(), &[count as IdxSize]).into_series() + IdxCa::from_slice(s.name().clone(), &[count as IdxSize]).into_column() }) }, GroupByMethod::Count { include_nulls } => { let count = s.len() - s.null_count() * !include_nulls as usize; - Ok(IdxCa::from_slice(s.name().clone(), &[count as IdxSize]).into_series()) + Ok(IdxCa::from_slice(s.name().clone(), &[count as IdxSize]).into_column()) }, - GroupByMethod::Implode => s.implode().map(|ca| ca.into_series()), + GroupByMethod::Implode => s.implode().map(|ca| ca.into_column()), GroupByMethod::Std(ddof) => s .std_reduce(ddof) - .map(|sc| sc.into_series(s.name().clone())), + .map(|sc| sc.into_column(s.name().clone())), GroupByMethod::Var(ddof) => s .var_reduce(ddof) - .map(|sc| sc.into_series(s.name().clone())), + .map(|sc| sc.into_column(s.name().clone())), GroupByMethod::Quantile(_, _) => unimplemented!(), #[cfg(feature = "bitwise")] GroupByMethod::Bitwise(f) => match f { - GroupByBitwiseMethod::And => parallel_op_series( - |s| s.and_reduce().map(|sc| sc.into_series(s.name().clone())), + GroupByBitwiseMethod::And => parallel_op_columns( + |s| s.and_reduce().map(|sc| sc.into_column(s.name().clone())), s, allow_threading, ), - GroupByBitwiseMethod::Or => parallel_op_series( - |s| s.or_reduce().map(|sc| sc.into_series(s.name().clone())), + GroupByBitwiseMethod::Or => parallel_op_columns( + |s| s.or_reduce().map(|sc| sc.into_column(s.name().clone())), s, allow_threading, ), - GroupByBitwiseMethod::Xor => parallel_op_series( - |s| s.xor_reduce().map(|sc| sc.into_series(s.name().clone())), + GroupByBitwiseMethod::Xor => parallel_op_columns( + |s| s.xor_reduce().map(|sc| sc.into_column(s.name().clone())), s, allow_threading, ), @@ -223,27 +225,27 @@ impl PhysicalExpr for AggregationExpr { GroupByMethod::Min => { let (s, groups) = ac.get_final_aggregation(); let agg_s = s.agg_min(&groups); - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) }, GroupByMethod::Max => { let (s, groups) = ac.get_final_aggregation(); let agg_s = s.agg_max(&groups); - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) }, GroupByMethod::Median => { let (s, groups) = ac.get_final_aggregation(); let agg_s = s.agg_median(&groups); - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) }, GroupByMethod::Mean => { let (s, groups) = ac.get_final_aggregation(); let agg_s = s.agg_mean(&groups); - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) }, GroupByMethod::Sum => { let (s, groups) = ac.get_final_aggregation(); let agg_s = s.agg_sum(&groups); - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) }, GroupByMethod::Count { include_nulls } => { if include_nulls || ac.series().null_count() == 0 { @@ -321,7 +323,7 @@ 
impl PhysicalExpr for AggregationExpr { .map(|s| s.len() as IdxSize - s.null_count() as IdxSize) }) .collect(); - AggregatedScalar(rename_series(out.into_series(), keep_name)) + AggregatedScalar(out.into_series().with_name(keep_name)) }, AggState::NotAggregated(s) => { let s = s.clone(); @@ -371,17 +373,17 @@ impl PhysicalExpr for AggregationExpr { GroupByMethod::First => { let (s, groups) = ac.get_final_aggregation(); let agg_s = s.agg_first(&groups); - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) }, GroupByMethod::Last => { let (s, groups) = ac.get_final_aggregation(); let agg_s = s.agg_last(&groups); - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) }, GroupByMethod::NUnique => { let (s, groups) = ac.get_final_aggregation(); let agg_s = s.agg_n_unique(&groups); - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) }, GroupByMethod::Implode => { // if the aggregation is already @@ -404,7 +406,7 @@ impl PhysicalExpr for AggregationExpr { agg.as_list().into_series() }, }; - AggregatedList(rename_series(s, keep_name)) + AggregatedList(s.with_name(keep_name)) }, GroupByMethod::Groups => { let mut column: ListChunked = ac.groups().as_list_chunked(); @@ -414,12 +416,12 @@ impl PhysicalExpr for AggregationExpr { GroupByMethod::Std(ddof) => { let (s, groups) = ac.get_final_aggregation(); let agg_s = s.agg_std(&groups, ddof); - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) }, GroupByMethod::Var(ddof) => { let (s, groups) = ac.get_final_aggregation(); let agg_s = s.agg_var(&groups, ddof); - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) }, GroupByMethod::Quantile(_, _) => { // implemented explicitly in AggQuantile struct @@ -433,7 +435,7 @@ impl PhysicalExpr for AggregationExpr { GroupByBitwiseMethod::Or => s.agg_or(&groups), GroupByBitwiseMethod::Xor => s.agg_xor(&groups), }; - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) }, GroupByMethod::NanMin => { #[cfg(feature = "propagate_nans")] @@ -444,7 +446,7 @@ impl PhysicalExpr for AggregationExpr { } else { s.agg_min(&groups) }; - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) } #[cfg(not(feature = "propagate_nans"))] { @@ -460,7 +462,7 @@ impl PhysicalExpr for AggregationExpr { } else { s.agg_max(&groups) }; - AggregatedScalar(rename_series(agg_s, keep_name)) + AggregatedScalar(agg_s.with_name(keep_name)) } #[cfg(not(feature = "propagate_nans"))] { @@ -493,20 +495,15 @@ impl PhysicalExpr for AggregationExpr { } } -fn rename_series(mut s: Series, name: PlSmallStr) -> Series { - s.rename(name); - s -} - impl PartitionedAggregation for AggregationExpr { fn evaluate_partitioned( &self, df: &DataFrame, groups: &GroupsProxy, state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { let expr = self.input.as_partitioned_aggregator().unwrap(); - let series = expr.evaluate_partitioned(df, groups, state)?; + let column = expr.evaluate_partitioned(df, groups, state)?; // SAFETY: // groups are in bounds @@ -514,15 +511,15 @@ impl PartitionedAggregation for AggregationExpr { match self.agg_type.groupby { #[cfg(feature = "dtype-struct")] GroupByMethod::Mean => { - let new_name = series.name().clone(); + let new_name = column.name().clone(); // ensure we don't overflow // the all 8 and 16 bits integers are 
already upcasted to int16 on `agg_sum` - let mut agg_s = if matches!(series.dtype(), DataType::Int32 | DataType::UInt32) + let mut agg_s = if matches!(column.dtype(), DataType::Int32 | DataType::UInt32) { - series.cast(&DataType::Int64).unwrap().agg_sum(groups) + column.cast(&DataType::Int64).unwrap().agg_sum(groups) } else { - series.agg_sum(groups) + column.agg_sum(groups) }; agg_s.rename(new_name.clone()); @@ -533,54 +530,52 @@ impl PartitionedAggregation for AggregationExpr { DataType::Float32 => agg_s, _ => agg_s.cast(&DataType::Float64).unwrap(), }; - let mut count_s = series.agg_valid_count(groups); + let mut count_s = column.agg_valid_count(groups); count_s.rename(PlSmallStr::from_static("__POLARS_COUNT")); - Ok(StructChunked::from_series( - new_name, - agg_s.len(), - [agg_s, count_s].iter(), + Ok( + StructChunked::from_columns(new_name, agg_s.len(), &[agg_s, count_s]) + .unwrap() + .into_column(), ) - .unwrap() - .into_series()) } }, GroupByMethod::Implode => { - let new_name = series.name().clone(); - let mut agg = series.agg_list(groups); + let new_name = column.name().clone(); + let mut agg = column.agg_list(groups); agg.rename(new_name); Ok(agg) }, GroupByMethod::First => { - let mut agg = series.agg_first(groups); - agg.rename(series.name().clone()); + let mut agg = column.agg_first(groups); + agg.rename(column.name().clone()); Ok(agg) }, GroupByMethod::Last => { - let mut agg = series.agg_last(groups); - agg.rename(series.name().clone()); + let mut agg = column.agg_last(groups); + agg.rename(column.name().clone()); Ok(agg) }, GroupByMethod::Max => { - let mut agg = series.agg_max(groups); - agg.rename(series.name().clone()); + let mut agg = column.agg_max(groups); + agg.rename(column.name().clone()); Ok(agg) }, GroupByMethod::Min => { - let mut agg = series.agg_min(groups); - agg.rename(series.name().clone()); + let mut agg = column.agg_min(groups); + agg.rename(column.name().clone()); Ok(agg) }, GroupByMethod::Sum => { - let mut agg = series.agg_sum(groups); - agg.rename(series.name().clone()); + let mut agg = column.agg_sum(groups); + agg.rename(column.name().clone()); Ok(agg) }, GroupByMethod::Count { include_nulls: true, } => { let mut ca = groups.group_count(); - ca.rename(series.name().clone()); - Ok(ca.into_series()) + ca.rename(column.name().clone()); + Ok(ca.into_column()) }, _ => { unimplemented!() @@ -591,10 +586,10 @@ impl PartitionedAggregation for AggregationExpr { fn finalize( &self, - partitioned: Series, + partitioned: Column, groups: &GroupsProxy, _state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { match self.agg_type.groupby { GroupByMethod::Count { include_nulls: true, @@ -616,9 +611,9 @@ impl PartitionedAggregation for AggregationExpr { let (agg_count, agg_s) = unsafe { POOL.join(|| count.agg_sum(groups), || sum.agg_sum(groups)) }; let agg_s = &agg_s / &agg_count; - Ok(rename_series(agg_s?, new_name)) + Ok(agg_s?.with_name(new_name).into_column()) }, - _ => Ok(Series::full_null( + _ => Ok(Column::full_null( new_name, groups.len(), partitioned.dtype(), @@ -685,7 +680,7 @@ impl PartitionedAggregation for AggregationExpr { if can_fast_explode { ca.set_fast_explode() } - Ok(ca.into_series().as_list().into_series()) + Ok(ca.into_series().as_list().into_column()) }, GroupByMethod::First => { let mut agg = unsafe { partitioned.agg_first(groups) }; @@ -746,12 +741,12 @@ impl PhysicalExpr for AggQuantileExpr { None } - fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { + fn evaluate(&self, df: &DataFrame, state: 
&ExecutionState) -> PolarsResult { let input = self.input.evaluate(df, state)?; let quantile = self.get_quantile(df, state)?; input .quantile_reduce(quantile, self.method) - .map(|sc| sc.into_series(input.name().clone())) + .map(|sc| sc.into_column(input.name().clone())) } #[allow(clippy::ptr_arg)] fn evaluate_on_groups<'a>( @@ -791,9 +786,9 @@ impl PhysicalExpr for AggQuantileExpr { /// Simple wrapper to parallelize functions that can be divided over threads aggregated and /// finally aggregated in the main thread. This can be done for sum, min, max, etc. -fn parallel_op_series(f: F, s: Series, allow_threading: bool) -> PolarsResult +fn parallel_op_columns(f: F, s: Column, allow_threading: bool) -> PolarsResult where - F: Fn(Series) -> PolarsResult + Send + Sync, + F: Fn(Column) -> PolarsResult + Send + Sync, { // set during debug low so // we mimic production size data behavior @@ -826,7 +821,7 @@ where let mut iter = chunks.into_iter(); let first = iter.next().unwrap(); let dtype = first.dtype(); - let out = iter.fold(first.to_physical_repr().into_owned(), |mut acc, s| { + let out = iter.fold(first.to_physical_repr(), |mut acc, s| { acc.append(&s.to_physical_repr()).unwrap(); acc }); diff --git a/crates/polars-expr/src/expressions/alias.rs b/crates/polars-expr/src/expressions/alias.rs index 8d321263a3f5..6144a1418de2 100644 --- a/crates/polars-expr/src/expressions/alias.rs +++ b/crates/polars-expr/src/expressions/alias.rs @@ -18,7 +18,7 @@ impl AliasExpr { } } - fn finish(&self, input: Series) -> Series { + fn finish(&self, input: Column) -> Column { input.with_name(self.name.clone()) } } @@ -28,7 +28,7 @@ impl PhysicalExpr for AliasExpr { Some(&self.expr) } - fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { let series = self.physical_expr.evaluate(df, state)?; Ok(self.finish(series)) } @@ -42,12 +42,16 @@ impl PhysicalExpr for AliasExpr { ) -> PolarsResult> { let mut ac = self.physical_expr.evaluate_on_groups(df, groups, state)?; let s = ac.take(); - let s = self.finish(s); + let s = self.finish(s.into()); if ac.is_literal() { - ac.with_literal(s); + ac.with_literal(s.take_materialized_series()); } else { - ac.with_series(s, ac.is_aggregated(), Some(&self.expr))?; + ac.with_series( + s.take_materialized_series(), + ac.is_aggregated(), + Some(&self.expr), + )?; } Ok(ac) } @@ -78,7 +82,7 @@ impl PartitionedAggregation for AliasExpr { df: &DataFrame, groups: &GroupsProxy, state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { let agg = self.physical_expr.as_partitioned_aggregator().unwrap(); let s = agg.evaluate_partitioned(df, groups, state)?; Ok(s.with_name(self.name.clone())) @@ -86,10 +90,10 @@ impl PartitionedAggregation for AliasExpr { fn finalize( &self, - partitioned: Series, + partitioned: Column, groups: &GroupsProxy, state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { let agg = self.physical_expr.as_partitioned_aggregator().unwrap(); let s = agg.finalize(partitioned, groups, state)?; Ok(s.with_name(self.name.clone())) diff --git a/crates/polars-expr/src/expressions/apply.rs b/crates/polars-expr/src/expressions/apply.rs index d6c37a5a004f..ddb4c37fac5d 100644 --- a/crates/polars-expr/src/expressions/apply.rs +++ b/crates/polars-expr/src/expressions/apply.rs @@ -321,7 +321,7 @@ impl PhysicalExpr for ApplyExpr { Some(&self.expr) } - fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { + fn evaluate(&self, df: &DataFrame, 
state: &ExecutionState) -> PolarsResult { let f = |e: &Arc| e.evaluate(df, state); let mut inputs = if self.allow_threading && self.inputs.len() > 1 { POOL.install(|| { @@ -341,14 +341,9 @@ impl PhysicalExpr for ApplyExpr { if self.allow_rename { self.eval_and_flatten(&mut inputs) - .map(|c| c.as_materialized_series().clone()) } else { let in_name = inputs[0].name().clone(); - Ok(self - .eval_and_flatten(&mut inputs)? - .as_materialized_series() - .clone() - .with_name(in_name)) + Ok(self.eval_and_flatten(&mut inputs)?.with_name(in_name)) } } @@ -681,29 +676,24 @@ impl PartitionedAggregation for ApplyExpr { df: &DataFrame, groups: &GroupsProxy, state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { let a = self.inputs[0].as_partitioned_aggregator().unwrap(); - let s = a.evaluate_partitioned(df, groups, state)?.into(); + let s = a.evaluate_partitioned(df, groups, state)?; if self.allow_rename { self.eval_and_flatten(&mut [s]) - .map(|c| c.as_materialized_series().clone()) } else { let in_name = s.name().clone(); - Ok(self - .eval_and_flatten(&mut [s])? - .as_materialized_series() - .clone() - .with_name(in_name)) + Ok(self.eval_and_flatten(&mut [s])?.with_name(in_name)) } } fn finalize( &self, - partitioned: Series, + partitioned: Column, _groups: &GroupsProxy, _state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { Ok(partitioned) } } diff --git a/crates/polars-expr/src/expressions/binary.rs b/crates/polars-expr/src/expressions/binary.rs index 23f50af45273..7754c2b6633e 100644 --- a/crates/polars-expr/src/expressions/binary.rs +++ b/crates/polars-expr/src/expressions/binary.rs @@ -41,7 +41,7 @@ impl BinaryExpr { } /// Can partially do operations in place. -fn apply_operator_owned(left: Series, right: Series, op: Operator) -> PolarsResult { +fn apply_operator_owned(left: Column, right: Column, op: Operator) -> PolarsResult { match op { Operator::Plus => left.try_add_owned(right), Operator::Minus => left.try_sub_owned(right), @@ -52,15 +52,15 @@ fn apply_operator_owned(left: Series, right: Series, op: Operator) -> PolarsResu } } -pub fn apply_operator(left: &Series, right: &Series, op: Operator) -> PolarsResult { +pub fn apply_operator(left: &Column, right: &Column, op: Operator) -> PolarsResult { use DataType::*; match op { - Operator::Gt => ChunkCompareIneq::gt(left, right).map(|ca| ca.into_series()), - Operator::GtEq => ChunkCompareIneq::gt_eq(left, right).map(|ca| ca.into_series()), - Operator::Lt => ChunkCompareIneq::lt(left, right).map(|ca| ca.into_series()), - Operator::LtEq => ChunkCompareIneq::lt_eq(left, right).map(|ca| ca.into_series()), - Operator::Eq => ChunkCompareEq::equal(left, right).map(|ca| ca.into_series()), - Operator::NotEq => ChunkCompareEq::not_equal(left, right).map(|ca| ca.into_series()), + Operator::Gt => ChunkCompareIneq::gt(left, right).map(|ca| ca.into_column()), + Operator::GtEq => ChunkCompareIneq::gt_eq(left, right).map(|ca| ca.into_column()), + Operator::Lt => ChunkCompareIneq::lt(left, right).map(|ca| ca.into_column()), + Operator::LtEq => ChunkCompareIneq::lt_eq(left, right).map(|ca| ca.into_column()), + Operator::Eq => ChunkCompareEq::equal(left, right).map(|ca| ca.into_column()), + Operator::NotEq => ChunkCompareEq::not_equal(left, right).map(|ca| ca.into_column()), Operator::Plus => left + right, Operator::Minus => left - right, Operator::Multiply => left * right, @@ -87,7 +87,11 @@ pub fn apply_operator(left: &Series, right: &Series, op: Operator) -> PolarsResu Operator::FloorDivide => { #[cfg(feature = "round_series")] { - 
floor_div_series(left, right) + floor_div_series( + left.as_materialized_series(), + right.as_materialized_series(), + ) + .map(Column::from) } #[cfg(not(feature = "round_series"))] { @@ -104,8 +108,8 @@ pub fn apply_operator(left: &Series, right: &Series, op: Operator) -> PolarsResu .bitand(&right.cast(&DataType::Boolean)?), Operator::Xor => left.bitxor(right), Operator::Modulus => left % right, - Operator::EqValidity => left.equal_missing(right).map(|ca| ca.into_series()), - Operator::NotEqValidity => left.not_equal_missing(right).map(|ca| ca.into_series()), + Operator::EqValidity => left.equal_missing(right).map(|ca| ca.into_column()), + Operator::NotEqValidity => left.not_equal_missing(right).map(|ca| ca.into_column()), } } @@ -123,8 +127,8 @@ impl BinaryExpr { // Drop lhs so that we might operate in place. drop(ac_l.take()); - let out = apply_operator_owned(lhs, rhs, self.op)?; - ac_l.with_series(out, aggregated, Some(&self.expr))?; + let out = apply_operator_owned(lhs.into_column(), rhs.into_column(), self.op)?; + ac_l.with_series(out.take_materialized_series(), aggregated, Some(&self.expr))?; Ok(ac_l) } @@ -137,16 +141,16 @@ impl BinaryExpr { ac_l.groups(); ac_r.groups(); polars_ensure!(ac_l.groups.len() == ac_r.groups.len(), ComputeError: "lhs and rhs should have same group length"); - let left_s = ac_l.series().rechunk(); - let right_s = ac_r.series().rechunk(); + let left_s = ac_l.series().rechunk().into_column(); + let right_s = ac_r.series().rechunk().into_column(); let res_s = apply_operator(&left_s, &right_s, self.op)?; ac_l.with_update_groups(UpdateGroups::WithSeriesLen); let res_s = if res_s.len() == 1 { res_s.new_from_index(0, ac_l.groups.len()) } else { - ListChunked::full(name, &res_s, ac_l.groups.len()).into_series() + ListChunked::full(name, res_s.as_materialized_series(), ac_l.groups.len()).into_column() }; - ac_l.with_series(res_s, true, Some(&self.expr))?; + ac_l.with_series(res_s.take_materialized_series(), true, Some(&self.expr))?; Ok(ac_l) } @@ -159,7 +163,13 @@ impl BinaryExpr { let ca = ac_l .iter_groups(false) .zip(ac_r.iter_groups(false)) - .map(|(l, r)| Some(apply_operator(l?.as_ref(), r?.as_ref(), self.op))) + .map(|(l, r)| { + Some(apply_operator( + &l?.as_ref().clone().into_column(), + &r?.as_ref().clone().into_column(), + self.op, + )) + }) .map(|opt_res| opt_res.transpose()) .collect::>()? .with_name(name); @@ -175,7 +185,7 @@ impl PhysicalExpr for BinaryExpr { Some(&self.expr) } - fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { // Window functions may set a global state that determine their output // state, so we don't let them run in parallel as they race // they also saturate the thread pool by themselves, so that's fine. 
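// Illustrative sketch (not from this diff): how a binary operation on two
// `Column`s is expected to route through the broadcasting helper added in
// polars-core above. The helper name and its closure shape,
// `Fn(&Series, &Series) -> PolarsResult<Series>`, come from the hunks above;
// the free function itself is hypothetical.
//
// fn checked_add(lhs: &Column, rhs: &Column) -> PolarsResult<Column> {
//     // A length-1 side is broadcast against the other, equal lengths pass
//     // through unchanged, and any other mismatch is an InvalidOperation error.
//     lhs.try_apply_broadcasting_binary_elementwise(rhs, |l, r| l + r)
// }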
@@ -246,8 +256,10 @@ impl PhysicalExpr for BinaryExpr { (AggState::AggregatedList(lhs), AggState::AggregatedList(rhs)) => { let lhs = lhs.list().unwrap(); let rhs = rhs.list().unwrap(); - let out = - lhs.apply_to_inner(&|lhs| apply_operator(&lhs, &rhs.get_inner(), self.op))?; + let out = lhs.apply_to_inner(&|lhs| { + apply_operator(&lhs.into_column(), &rhs.get_inner().into_column(), self.op) + .map(|c| c.take_materialized_series()) + })?; ac_l.with_series(out.into_series(), true, Some(&self.expr))?; Ok(ac_l) }, @@ -279,7 +291,7 @@ mod stats { use super::*; - fn apply_operator_stats_eq(min_max: &Series, literal: &Series) -> bool { + fn apply_operator_stats_eq(min_max: &Column, literal: &Column) -> bool { use ChunkCompareIneq as C; // Literal is greater than max, don't need to read. if C::gt(literal, min_max).map(|s| s.all()).unwrap_or(false) { @@ -294,7 +306,7 @@ mod stats { true } - fn apply_operator_stats_neq(min_max: &Series, literal: &Series) -> bool { + fn apply_operator_stats_neq(min_max: &Column, literal: &Column) -> bool { if min_max.len() < 2 || min_max.null_count() > 0 { return true; } @@ -311,7 +323,7 @@ mod stats { true } - fn apply_operator_stats_rhs_lit(min_max: &Series, literal: &Series, op: Operator) -> bool { + fn apply_operator_stats_rhs_lit(min_max: &Column, literal: &Column, op: Operator) -> bool { use ChunkCompareIneq as C; match op { Operator::Eq => apply_operator_stats_eq(min_max, literal), @@ -347,7 +359,7 @@ mod stats { } } - fn apply_operator_stats_lhs_lit(literal: &Series, min_max: &Series, op: Operator) -> bool { + fn apply_operator_stats_lhs_lit(literal: &Column, min_max: &Column, op: Operator) -> bool { use ChunkCompareIneq as C; match op { Operator::Eq => apply_operator_stats_eq(min_max, literal), @@ -423,7 +435,11 @@ mod stats { // will be incorrect if not debug_assert_eq!(min_max_s.null_count(), 0); let lit_s = self.right.evaluate(&dummy, &state).unwrap(); - Ok(apply_operator_stats_rhs_lit(&min_max_s, &lit_s, self.op)) + Ok(apply_operator_stats_rhs_lit( + &min_max_s.into_column(), + &lit_s, + self.op, + )) }, } }, @@ -435,7 +451,11 @@ mod stats { // will be incorrect if not debug_assert_eq!(min_max_s.null_count(), 0); let lit_s = self.left.evaluate(&dummy, &state).unwrap(); - Ok(apply_operator_stats_lhs_lit(&lit_s, &min_max_s, self.op)) + Ok(apply_operator_stats_lhs_lit( + &lit_s, + &min_max_s.into_column(), + self.op, + )) }, } }, @@ -476,7 +496,7 @@ impl PartitionedAggregation for BinaryExpr { df: &DataFrame, groups: &GroupsProxy, state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { let left = self.left.as_partitioned_aggregator().unwrap(); let right = self.right.as_partitioned_aggregator().unwrap(); let left = left.evaluate_partitioned(df, groups, state)?; @@ -486,10 +506,10 @@ impl PartitionedAggregation for BinaryExpr { fn finalize( &self, - partitioned: Series, + partitioned: Column, _groups: &GroupsProxy, _state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { Ok(partitioned) } } diff --git a/crates/polars-expr/src/expressions/cast.rs b/crates/polars-expr/src/expressions/cast.rs index ebfd50311918..dcbd67d36a7e 100644 --- a/crates/polars-expr/src/expressions/cast.rs +++ b/crates/polars-expr/src/expressions/cast.rs @@ -12,7 +12,7 @@ pub struct CastExpr { } impl CastExpr { - fn finish(&self, input: &Series) -> PolarsResult { + fn finish(&self, input: &Column) -> PolarsResult { input.cast_with_options(&self.dtype, self.options) } } @@ -22,9 +22,9 @@ impl PhysicalExpr for CastExpr { Some(&self.expr) } - fn evaluate(&self, df: 
&DataFrame, state: &ExecutionState) -> PolarsResult { - let series = self.input.evaluate(df, state)?; - self.finish(&series) + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { + let column = self.input.evaluate(df, state)?; + self.finish(&column) } #[allow(clippy::ptr_arg)] @@ -40,15 +40,18 @@ impl PhysicalExpr for CastExpr { // this will not explode and potentially increase memory due to overlapping groups AggState::AggregatedList(s) => { let ca = s.list().unwrap(); - let casted = ca.apply_to_inner(&|s| self.finish(&s))?; + let casted = ca.apply_to_inner(&|s| { + self.finish(&s.into_column()) + .map(|c| c.take_materialized_series()) + })?; ac.with_series(casted.into_series(), true, None)?; }, AggState::AggregatedScalar(s) => { - let s = self.finish(s)?; + let s = self.finish(&s.clone().into_column())?; if ac.is_literal() { - ac.with_literal(s); + ac.with_literal(s.take_materialized_series()); } else { - ac.with_series(s, true, None)?; + ac.with_series(s.take_materialized_series(), true, None)?; } }, _ => { @@ -56,12 +59,12 @@ impl PhysicalExpr for CastExpr { ac.groups(); let s = ac.flat_naive(); - let s = self.finish(s.as_ref())?; + let s = self.finish(&s.as_ref().clone().into_column())?; if ac.is_literal() { - ac.with_literal(s); + ac.with_literal(s.take_materialized_series()); } else { - ac.with_series(s, false, None)?; + ac.with_series(s.take_materialized_series(), false, None)?; } }, } @@ -91,17 +94,17 @@ impl PartitionedAggregation for CastExpr { df: &DataFrame, groups: &GroupsProxy, state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { let e = self.input.as_partitioned_aggregator().unwrap(); self.finish(&e.evaluate_partitioned(df, groups, state)?) } fn finalize( &self, - partitioned: Series, + partitioned: Column, groups: &GroupsProxy, state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { let agg = self.input.as_partitioned_aggregator().unwrap(); agg.finalize(partitioned, groups, state) } diff --git a/crates/polars-expr/src/expressions/column.rs b/crates/polars-expr/src/expressions/column.rs index 8a59d6c25ddb..2142d22df6d9 100644 --- a/crates/polars-expr/src/expressions/column.rs +++ b/crates/polars-expr/src/expressions/column.rs @@ -140,7 +140,7 @@ impl PhysicalExpr for ColumnExpr { fn as_expression(&self) -> Option<&Expr> { Some(&self.expr) } - fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { let out = match self.schema.get_full(&self.name) { Some((idx, _, _)) => { // check if the schema was correct @@ -168,12 +168,12 @@ impl PhysicalExpr for ColumnExpr { // in debug builds we panic so that it can be fixed when occurring None => { if self.name.starts_with(CSE_REPLACED) { - return self.process_cse(df, &self.schema); + return self.process_cse(df, &self.schema).map(Column::from); } self.process_by_linear_search(df, state, true) }, }; - self.check_external_context(out, state) + self.check_external_context(out, state).map(Column::from) } #[allow(clippy::ptr_arg)] @@ -184,7 +184,11 @@ impl PhysicalExpr for ColumnExpr { state: &ExecutionState, ) -> PolarsResult> { let s = self.evaluate(df, state)?; - Ok(AggregationContext::new(s, Cow::Borrowed(groups), false)) + Ok(AggregationContext::new( + s.take_materialized_series(), + Cow::Borrowed(groups), + false, + )) } fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> { @@ -209,16 +213,16 @@ impl PartitionedAggregation for ColumnExpr { df: &DataFrame, _groups: 
&GroupsProxy, state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { self.evaluate(df, state) } fn finalize( &self, - partitioned: Series, + partitioned: Column, _groups: &GroupsProxy, _state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { Ok(partitioned) } } diff --git a/crates/polars-expr/src/expressions/count.rs b/crates/polars-expr/src/expressions/count.rs index 5e8b4c75e376..6102caf5a354 100644 --- a/crates/polars-expr/src/expressions/count.rs +++ b/crates/polars-expr/src/expressions/count.rs @@ -21,11 +21,8 @@ impl PhysicalExpr for CountExpr { Some(&self.expr) } - fn evaluate(&self, df: &DataFrame, _state: &ExecutionState) -> PolarsResult { - Ok(Series::new( - PlSmallStr::from_static("len"), - [df.height() as IdxSize], - )) + fn evaluate(&self, df: &DataFrame, _state: &ExecutionState) -> PolarsResult { + Ok(Series::new(PlSmallStr::from_static("len"), [df.height() as IdxSize]).into_column()) } fn evaluate_on_groups<'a>( @@ -59,19 +56,19 @@ impl PartitionedAggregation for CountExpr { df: &DataFrame, groups: &GroupsProxy, state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { self.evaluate_on_groups(df, groups, state) - .map(|mut ac| ac.aggregated()) + .map(|mut ac| ac.aggregated().into_column()) } /// Called to merge all the partitioned results in a final aggregate. #[allow(clippy::ptr_arg)] fn finalize( &self, - partitioned: Series, + partitioned: Column, groups: &GroupsProxy, _state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { // SAFETY: groups are in bounds. let agg = unsafe { partitioned.agg_sum(groups) }; Ok(agg.with_name(PlSmallStr::from_static(LEN))) diff --git a/crates/polars-expr/src/expressions/filter.rs b/crates/polars-expr/src/expressions/filter.rs index b11d0dda6129..6f847a7fa8ed 100644 --- a/crates/polars-expr/src/expressions/filter.rs +++ b/crates/polars-expr/src/expressions/filter.rs @@ -24,7 +24,7 @@ impl PhysicalExpr for FilterExpr { fn as_expression(&self) -> Option<&Expr> { Some(&self.expr) } - fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { let s_f = || self.input.evaluate(df, state); let predicate_f = || self.by.evaluate(df, state); diff --git a/crates/polars-expr/src/expressions/gather.rs b/crates/polars-expr/src/expressions/gather.rs index a6450bcb531b..19a0e35ff315 100644 --- a/crates/polars-expr/src/expressions/gather.rs +++ b/crates/polars-expr/src/expressions/gather.rs @@ -18,7 +18,7 @@ impl PhysicalExpr for GatherExpr { fn as_expression(&self) -> Option<&Expr> { Some(&self.expr) } - fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { + fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { let series = self.phys_expr.evaluate(df, state)?; self.finish(df, state, series) } @@ -102,10 +102,10 @@ impl GatherExpr { &self, df: &DataFrame, state: &ExecutionState, - series: Series, - ) -> PolarsResult { + series: Column, + ) -> PolarsResult { let idx = self.idx.evaluate(df, state)?; - let idx = convert_to_unsigned_index(&idx, series.len())?; + let idx = convert_to_unsigned_index(idx.as_materialized_series(), series.len())?; series.take(&idx) } diff --git a/crates/polars-expr/src/expressions/literal.rs b/crates/polars-expr/src/expressions/literal.rs index 2089e4cf5bb4..0c6900d4356b 100644 --- a/crates/polars-expr/src/expressions/literal.rs +++ b/crates/polars-expr/src/expressions/literal.rs @@ -21,29 +21,31 @@ impl PhysicalExpr for LiteralExpr { fn 
as_expression(&self) -> Option<&Expr> { Some(&self.1) } - fn evaluate(&self, _df: &DataFrame, _state: &ExecutionState) -> PolarsResult { + fn evaluate(&self, _df: &DataFrame, _state: &ExecutionState) -> PolarsResult { use LiteralValue::*; let s = match &self.0 { #[cfg(feature = "dtype-i8")] - Int8(v) => Int8Chunked::full(get_literal_name().clone(), *v, 1).into_series(), + Int8(v) => Int8Chunked::full(get_literal_name().clone(), *v, 1).into_column(), #[cfg(feature = "dtype-i16")] - Int16(v) => Int16Chunked::full(get_literal_name().clone(), *v, 1).into_series(), - Int32(v) => Int32Chunked::full(get_literal_name().clone(), *v, 1).into_series(), - Int64(v) => Int64Chunked::full(get_literal_name().clone(), *v, 1).into_series(), + Int16(v) => Int16Chunked::full(get_literal_name().clone(), *v, 1).into_column(), + Int32(v) => Int32Chunked::full(get_literal_name().clone(), *v, 1).into_column(), + Int64(v) => Int64Chunked::full(get_literal_name().clone(), *v, 1).into_column(), #[cfg(feature = "dtype-u8")] - UInt8(v) => UInt8Chunked::full(get_literal_name().clone(), *v, 1).into_series(), + UInt8(v) => UInt8Chunked::full(get_literal_name().clone(), *v, 1).into_column(), #[cfg(feature = "dtype-u16")] - UInt16(v) => UInt16Chunked::full(get_literal_name().clone(), *v, 1).into_series(), - UInt32(v) => UInt32Chunked::full(get_literal_name().clone(), *v, 1).into_series(), - UInt64(v) => UInt64Chunked::full(get_literal_name().clone(), *v, 1).into_series(), - Float32(v) => Float32Chunked::full(get_literal_name().clone(), *v, 1).into_series(), - Float64(v) => Float64Chunked::full(get_literal_name().clone(), *v, 1).into_series(), + UInt16(v) => UInt16Chunked::full(get_literal_name().clone(), *v, 1).into_column(), + UInt32(v) => UInt32Chunked::full(get_literal_name().clone(), *v, 1).into_column(), + UInt64(v) => UInt64Chunked::full(get_literal_name().clone(), *v, 1).into_column(), + Float32(v) => Float32Chunked::full(get_literal_name().clone(), *v, 1).into_column(), + Float64(v) => Float64Chunked::full(get_literal_name().clone(), *v, 1).into_column(), #[cfg(feature = "dtype-decimal")] Decimal(v, scale) => Int128Chunked::full(get_literal_name().clone(), *v, 1) .into_decimal_unchecked(None, *scale) - .into_series(), - Boolean(v) => BooleanChunked::full(get_literal_name().clone(), *v, 1).into_series(), - Null => polars_core::prelude::Series::new_null(get_literal_name().clone(), 1), + .into_column(), + Boolean(v) => BooleanChunked::full(get_literal_name().clone(), *v, 1).into_column(), + Null => { + polars_core::prelude::Series::new_null(get_literal_name().clone(), 1).into_column() + }, Range { low, high, dtype } => match dtype { DataType::Int32 => { polars_ensure!( @@ -53,13 +55,13 @@ impl PhysicalExpr for LiteralExpr { let low = *low as i32; let high = *high as i32; let ca: NoNull = (low..high).collect(); - ca.into_inner().into_series() + ca.into_inner().into_column() }, DataType::Int64 => { let low = *low; let high = *high; let ca: NoNull = (low..high).collect(); - ca.into_inner().into_series() + ca.into_inner().into_column() }, DataType::UInt32 => { polars_ensure!( @@ -69,28 +71,28 @@ impl PhysicalExpr for LiteralExpr { let low = *low as u32; let high = *high as u32; let ca: NoNull = (low..high).collect(); - ca.into_inner().into_series() + ca.into_inner().into_column() }, dt => polars_bail!( InvalidOperation: "datatype `{}` is not supported as range", dt ), }, - String(v) => StringChunked::full(get_literal_name().clone(), v, 1).into_series(), - Binary(v) => BinaryChunked::full(get_literal_name().clone(), v, 
1).into_series(), + String(v) => StringChunked::full(get_literal_name().clone(), v, 1).into_column(), + Binary(v) => BinaryChunked::full(get_literal_name().clone(), v, 1).into_column(), #[cfg(feature = "dtype-datetime")] DateTime(timestamp, tu, tz) => { Int64Chunked::full(get_literal_name().clone(), *timestamp, 1) .into_datetime(*tu, tz.clone()) - .into_series() + .into_column() }, #[cfg(feature = "dtype-duration")] Duration(v, tu) => Int64Chunked::full(get_literal_name().clone(), *v, 1) .into_duration(*tu) - .into_series(), + .into_column(), #[cfg(feature = "dtype-date")] Date(v) => Int32Chunked::full(get_literal_name().clone(), *v, 1) .into_date() - .into_series(), + .into_column(), #[cfg(feature = "dtype-time")] Time(v) => { if !(0..NANOSECONDS_IN_DAY).contains(v) { @@ -102,16 +104,17 @@ impl PhysicalExpr for LiteralExpr { Int64Chunked::full(get_literal_name().clone(), *v, 1) .into_time() - .into_series() + .into_column() }, - Series(series) => series.deref().clone(), - OtherScalar(s) => s.clone().into_series(get_literal_name().clone()), + Series(series) => series.deref().clone().into_column(), + OtherScalar(s) => s.clone().into_column(get_literal_name().clone()), lv @ (Int(_) | Float(_) | StrCat(_)) => polars_core::prelude::Series::from_any_values( get_literal_name().clone(), &[lv.to_any_value().unwrap()], false, ) - .unwrap(), + .unwrap() + .into_column(), }; Ok(s) } @@ -124,7 +127,10 @@ impl PhysicalExpr for LiteralExpr { state: &ExecutionState, ) -> PolarsResult> { let s = self.evaluate(df, state)?; - Ok(AggregationContext::from_literal(s, Cow::Borrowed(groups))) + Ok(AggregationContext::from_literal( + s.take_materialized_series(), + Cow::Borrowed(groups), + )) } fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> { @@ -150,16 +156,16 @@ impl PartitionedAggregation for LiteralExpr { df: &DataFrame, _groups: &GroupsProxy, state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { self.evaluate(df, state) } fn finalize( &self, - partitioned: Series, + partitioned: Column, _groups: &GroupsProxy, _state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { Ok(partitioned) } } diff --git a/crates/polars-expr/src/expressions/mod.rs b/crates/polars-expr/src/expressions/mod.rs index 15550c517fe7..8ccc5349b733 100644 --- a/crates/polars-expr/src/expressions/mod.rs +++ b/crates/polars-expr/src/expressions/mod.rs @@ -536,7 +536,7 @@ pub trait PhysicalExpr: Send + Sync { } /// Take a DataFrame and evaluate the expression. - fn evaluate(&self, df: &DataFrame, _state: &ExecutionState) -> PolarsResult; + fn evaluate(&self, df: &DataFrame, _state: &ExecutionState) -> PolarsResult; /// Some expression that are not aggregations can be done per group /// Think of sort, slice, filter, shift, etc. @@ -611,7 +611,9 @@ impl PhysicalIoExpr for PhysicalIoHelper { if self.has_window_function { state.insert_has_window_function_flag(); } - self.expr.evaluate(df, &state) + self.expr + .evaluate(df, &state) + .map(|c| c.take_materialized_series()) } fn live_variables(&self) -> Option> { @@ -651,14 +653,14 @@ pub trait PartitionedAggregation: Send + Sync + PhysicalExpr { df: &DataFrame, groups: &GroupsProxy, state: &ExecutionState, - ) -> PolarsResult; + ) -> PolarsResult; /// Called to merge all the partitioned results in a final aggregate. 
#[allow(clippy::ptr_arg)]
fn finalize(
&self,
- partitioned: Series,
+ partitioned: Column,
groups: &GroupsProxy,
state: &ExecutionState,
- ) -> PolarsResult<Series>;
+ ) -> PolarsResult<Column>;
}
diff --git a/crates/polars-expr/src/expressions/rolling.rs b/crates/polars-expr/src/expressions/rolling.rs
index 806e3d5b0398..7e9897d7328c 100644
--- a/crates/polars-expr/src/expressions/rolling.rs
+++ b/crates/polars-expr/src/expressions/rolling.rs
@@ -19,7 +19,7 @@ pub(crate) struct RollingExpr {
}
impl PhysicalExpr for RollingExpr {
- fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Series> {
+ fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let groups_key = format!("{:?}", &self.options);
let groups_map = state.group_tuples.read().unwrap();
@@ -47,7 +47,7 @@ impl PhysicalExpr for RollingExpr {
if let Some(name) = &self.out_name {
out.rename(name.clone());
}
- Ok(out)
+ Ok(out.into_column())
}
fn evaluate_on_groups<'a>(
diff --git a/crates/polars-expr/src/expressions/slice.rs b/crates/polars-expr/src/expressions/slice.rs
index d0e187120939..2b805edd1bb0 100644
--- a/crates/polars-expr/src/expressions/slice.rs
+++ b/crates/polars-expr/src/expressions/slice.rs
@@ -82,7 +82,7 @@ impl PhysicalExpr for SliceExpr {
Some(&self.expr)
}
- fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Series> {
+ fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let results = POOL.install(|| {
[&self.offset, &self.length, &self.input]
.par_iter()
@@ -92,7 +92,11 @@ impl PhysicalExpr for SliceExpr {
let offset = &results[0];
let length = &results[1];
let series = &results[2];
- let (offset, length) = extract_args(offset, length, &self.expr)?;
+ let (offset, length) = extract_args(
+ offset.as_materialized_series(),
+ length.as_materialized_series(),
+ &self.expr,
+ )?;
Ok(series.slice(offset, length))
}
diff --git a/crates/polars-expr/src/expressions/sort.rs b/crates/polars-expr/src/expressions/sort.rs
index 751b09b07475..be9fe57e29ce 100644
--- a/crates/polars-expr/src/expressions/sort.rs
+++ b/crates/polars-expr/src/expressions/sort.rs
@@ -46,7 +46,7 @@ impl PhysicalExpr for SortExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}
- fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Series> {
+ fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let series = self.physical_expr.evaluate(df, state)?;
series.sort_with(self.options)
}
diff --git a/crates/polars-expr/src/expressions/sortby.rs b/crates/polars-expr/src/expressions/sortby.rs
index f966e4cbb544..1624d7c9bcd6 100644
--- a/crates/polars-expr/src/expressions/sortby.rs
+++ b/crates/polars-expr/src/expressions/sortby.rs
@@ -199,7 +199,7 @@ impl PhysicalExpr for SortByExpr {
fn as_expression(&self) -> Option<&Expr> {
Some(&self.expr)
}
- fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Series> {
+ fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let series_f = || self.input.evaluate(df, state);
if self.by.is_empty() {
// Sorting by 0 columns returns input unchanged.
@@ -220,13 +220,11 @@ impl PhysicalExpr for SortByExpr {
.by
.iter()
.map(|e| {
- e.evaluate(df, state)
- .map(|s| match s.dtype() {
- #[cfg(feature = "dtype-categorical")]
- DataType::Categorical(_, _) | DataType::Enum(_, _) => s,
- _ => s.to_physical_repr().into_owned(),
- })
- .map(Column::from)
+ e.evaluate(df, state).map(|s| match s.dtype() {
+ #[cfg(feature = "dtype-categorical")]
+ DataType::Categorical(_, _) | DataType::Enum(_, _) => s,
+ _ => s.to_physical_repr(),
+ })
})
.collect::<PolarsResult<Vec<_>>>()?;
diff --git a/crates/polars-expr/src/expressions/ternary.rs b/crates/polars-expr/src/expressions/ternary.rs
index 37600c71f06a..2d1035c22eb7 100644
--- a/crates/polars-expr/src/expressions/ternary.rs
+++ b/crates/polars-expr/src/expressions/ternary.rs
@@ -79,7 +79,7 @@ impl PhysicalExpr for TernaryExpr {
Some(&self.expr)
}
- fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Series> {
+ fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
let mut state = state.split();
// Don't cache window functions as they run in parallel.
state.remove_cache_window_flag();
@@ -337,7 +337,7 @@ impl PartitionedAggregation for TernaryExpr {
df: &DataFrame,
groups: &GroupsProxy,
state: &ExecutionState,
- ) -> PolarsResult<Series> {
+ ) -> PolarsResult<Column> {
let truthy = self.truthy.as_partitioned_aggregator().unwrap();
let falsy = self.falsy.as_partitioned_aggregator().unwrap();
let mask = self.predicate.as_partitioned_aggregator().unwrap();
@@ -352,10 +352,10 @@ impl PartitionedAggregation for TernaryExpr {
fn finalize(
&self,
- partitioned: Series,
+ partitioned: Column,
_groups: &GroupsProxy,
_state: &ExecutionState,
- ) -> PolarsResult<Series> {
+ ) -> PolarsResult<Column> {
Ok(partitioned)
}
}
diff --git a/crates/polars-expr/src/expressions/window.rs b/crates/polars-expr/src/expressions/window.rs
index f843c0e83d95..e15a301f68b4 100644
--- a/crates/polars-expr/src/expressions/window.rs
+++ b/crates/polars-expr/src/expressions/window.rs
@@ -371,7 +371,7 @@ impl PhysicalExpr for WindowExpr {
// This first cached the group_by and the join tuples, but rayon under a mutex leads to deadlocks:
// https://github.com/rayon-rs/rayon/issues/592
- fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Series> {
+ fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
// This method does the following:
// 1. determine group_by tuples based on the group_column
// 2. apply an aggregation function
@@ -400,7 +400,7 @@ impl PhysicalExpr for WindowExpr {
if df.is_empty() {
let field = self.phys_function.to_field(&df.schema())?;
- return Ok(Series::full_null(field.name().clone(), 0, field.dtype()));
+ return Ok(Column::full_null(field.name().clone(), 0, field.dtype()));
}
let group_by_columns = self
@@ -443,7 +443,7 @@ impl PhysicalExpr for WindowExpr {
if let Some((order_by, options)) = &self.order_by {
let order_by = order_by.evaluate(df, state)?;
polars_ensure!(order_by.len() == df.height(), ShapeMismatch: "the order by expression evaluated to a length: {} that doesn't match the input DataFrame: {}", order_by.len(), df.height());
- groups = update_groups_sort_by(&groups, &order_by, options)?
+ groups = update_groups_sort_by(&groups, order_by.as_materialized_series(), options)?
}
let out: PolarsResult<GroupsProxy> = Ok(groups);
@@ -521,7 +521,7 @@ impl PhysicalExpr for WindowExpr {
if let Some(name) = &self.out_name {
out.rename(name.clone());
}
- Ok(out)
+ Ok(out.into_column())
},
Explode => {
let mut out = ac.aggregated().explode()?;
@@ -529,7 +529,7 @@ impl PhysicalExpr for WindowExpr {
if let Some(name) = &self.out_name {
out.rename(name.clone());
}
- Ok(out)
+ Ok(out.into_column())
},
Map => {
// TODO!
@@ -551,6 +551,7 @@ impl PhysicalExpr for WindowExpr {
state,
&cache_key,
)
+ .map(Column::from)
},
Join => {
let out_column = ac.aggregated();
@@ -566,7 +567,7 @@ impl PhysicalExpr for WindowExpr {
// we take the group locations to directly map them to the right place
(UpdateGroups::No, Some(out)) => {
cache_gb(gb, state, &cache_key);
- Ok(out)
+ Ok(out.into_column())
},
(_, _) => {
let keys = gb.keys();
@@ -625,7 +626,7 @@ impl PhysicalExpr for WindowExpr {
jt_map.insert(cache_key, join_opt_ids);
}
- Ok(out)
+ Ok(out.into_column())
},
}
},
diff --git a/crates/polars-lazy/src/dsl/list.rs b/crates/polars-lazy/src/dsl/list.rs
index d73e4be5d13e..c706ee9b6957 100644
--- a/crates/polars-lazy/src/dsl/list.rs
+++ b/crates/polars-lazy/src/dsl/list.rs
@@ -69,7 +69,7 @@ fn run_per_sublist(
let df = s.into_frame();
let out = phys_expr.evaluate(&df, &state);
match out {
- Ok(s) => Some(s),
+ Ok(s) => Some(s.take_materialized_series()),
Err(e) => {
*m_err.lock().unwrap() = Some(e);
None
@@ -90,7 +90,7 @@ fn run_per_sublist(
let out = phys_expr.evaluate(&df_container, &state);
df_container.clear_columns();
match out {
- Ok(s) => Some(s),
+ Ok(s) => Some(s.take_materialized_series()),
Err(e) => {
err = Some(e);
None
diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs
index ad4d8cd1fb48..0700f5f767e7 100644
--- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs
+++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs
@@ -36,7 +36,9 @@ impl PhysicalIoExpr for Wrap {
}
impl PhysicalPipedExpr for Wrap {
fn evaluate(&self, chunk: &DataChunk, state: &ExecutionState) -> PolarsResult<Series> {
- self.0.evaluate(&chunk.data, state)
+ self.0
+ .evaluate(&chunk.data, state)
+ .map(|c| c.take_materialized_series())
}
fn field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.0.to_field(input_schema)
diff --git a/crates/polars-mem-engine/src/executors/filter.rs b/crates/polars-mem-engine/src/executors/filter.rs
index 689674345760..417a7ecf766e 100644
--- a/crates/polars-mem-engine/src/executors/filter.rs
+++ b/crates/polars-mem-engine/src/executors/filter.rs
@@ -45,7 +45,7 @@ impl FilterExec {
if self.has_window {
state.clear_window_expr_cache()
}
- df.filter(series_to_mask(&s)?)
+ df.filter(series_to_mask(s.as_materialized_series())?)
}
fn execute_chunks(
@@ -55,7 +55,7 @@ impl FilterExec {
) -> PolarsResult<DataFrame> {
let iter = chunks.into_par_iter().map(|df| {
let s = self.predicate.evaluate(&df, state)?;
- df.filter(series_to_mask(&s)?)
+ df.filter(series_to_mask(s.as_materialized_series())?)
});
let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;
Ok(accumulate_dataframes_vertical_unchecked(df))
diff --git a/crates/polars-mem-engine/src/executors/group_by_partitioned.rs b/crates/polars-mem-engine/src/executors/group_by_partitioned.rs
index ad41378b3086..61cb9b10bc52 100644
--- a/crates/polars-mem-engine/src/executors/group_by_partitioned.rs
+++ b/crates/polars-mem-engine/src/executors/group_by_partitioned.rs
@@ -332,11 +332,7 @@ impl PartitionGroupByExec {
.map(|(expr, partitioned_s)| {
let agg_expr = expr.as_partitioned_aggregator().unwrap();
agg_expr
- .finalize(
- partitioned_s.as_materialized_series().clone(),
- groups,
- state,
- )
+ .finalize(partitioned_s.clone(), groups, state)
.map(Column::from)
})
.collect();
diff --git a/crates/polars-mem-engine/src/executors/join.rs b/crates/polars-mem-engine/src/executors/join.rs
index 5edab8551ece..4fed3cb7a3ff 100644
--- a/crates/polars-mem-engine/src/executors/join.rs
+++ b/crates/polars-mem-engine/src/executors/join.rs
@@ -139,8 +139,8 @@ impl Executor for JoinExec {
let df = df_left._join_impl(
&df_right,
- left_on_series,
- right_on_series,
+ left_on_series.into_iter().map(|c| c.take_materialized_series()).collect(),
+ right_on_series.into_iter().map(|c| c.take_materialized_series()).collect(),
self.args.clone(),
true,
state.verbose(),
diff --git a/crates/polars-mem-engine/src/executors/projection_utils.rs b/crates/polars-mem-engine/src/executors/projection_utils.rs
index 47464849582e..01dc5f362fd9 100644
--- a/crates/polars-mem-engine/src/executors/projection_utils.rs
+++ b/crates/polars-mem-engine/src/executors/projection_utils.rs
@@ -20,7 +20,7 @@ fn rolling_evaluate(
df: &DataFrame,
state: &ExecutionState,
rolling: PlHashMap<&RollingGroupOptions, Vec>,
-) -> PolarsResult>> {
+) -> PolarsResult>> {
POOL.install(|| {
rolling
.par_iter()
@@ -51,7 +51,7 @@ fn window_evaluate(
df: &DataFrame,
state: &ExecutionState,
window: PlHashMap>,
-) -> PolarsResult>> {
+) -> PolarsResult>> {
POOL.install(|| {
window
.par_iter()
@@ -99,7 +99,7 @@ fn execute_projection_cached_window_fns(
df: &DataFrame,
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
-) -> PolarsResult<Vec<Series>> {
+) -> PolarsResult<Vec<Column>> {
// We partition by normal expression and window expression
// - the normal expressions can run in parallel
// - the window expression take more memory and often use the same group_by keys and join tuples
@@ -202,7 +202,7 @@ fn run_exprs_par(
df: &DataFrame,
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
-) -> PolarsResult<Vec<Series>> {
+) -> PolarsResult<Vec<Column>> {
POOL.install(|| {
exprs
.par_iter()
@@ -215,7 +215,7 @@ fn run_exprs_seq(
df: &DataFrame,
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
-) -> PolarsResult<Vec<Series>> {
+) -> PolarsResult<Vec<Column>> {
exprs.iter().map(|expr| expr.evaluate(df, state)).collect()
}
@@ -225,7 +225,7 @@ pub(super) fn evaluate_physical_expressions(
df: &DataFrame,
state: &ExecutionState,
has_windows: bool,
run_parallel: bool,
-) -> PolarsResult<Vec<Series>> {
+) -> PolarsResult<Vec<Column>> {
let expr_runner = if has_windows {
execute_projection_cached_window_fns
} else if run_parallel && exprs.len() > 1 {
@@ -246,7 +246,7 @@ pub(super) fn evaluate_physical_expressions(
pub(super) fn check_expand_literals(
df: &DataFrame,
phys_expr: &[Arc<dyn PhysicalExpr>],
- mut selected_columns: Vec<Series>,
+ mut selected_columns: Vec<Column>,
zero_length: bool,
options: ProjectionOptions,
) -> PolarsResult<DataFrame> {
diff --git a/crates/polars-mem-engine/src/executors/stack.rs b/crates/polars-mem-engine/src/executors/stack.rs
index ddeed0e8996b..ba6fa8111402 100644
--- a/crates/polars-mem-engine/src/executors/stack.rs
+++ b/crates/polars-mem-engine/src/executors/stack.rs
@@ -37,7 +37,12 @@ impl StackExec {
self.options.run_parallel,
)?;
// We don't have to do a broadcast check as cse is not allowed to hit this.
- df._add_series(res, schema)?;
+ df._add_series(
+ res.into_iter()
+ .map(|c| c.take_materialized_series())
+ .collect(),
+ schema,
+ )?;
Ok(df)
});
@@ -94,7 +99,12 @@ impl StackExec {
}
}
}
- df._add_series(res, schema)?;
+ df._add_series(
+ res.into_iter()
+ .map(|v| v.take_materialized_series())
+ .collect(),
+ schema,
+ )?;
}
df
};
diff --git a/crates/polars-stream/src/expression.rs b/crates/polars-stream/src/expression.rs
index 3c1b9445997c..197a28e265cc 100644
--- a/crates/polars-stream/src/expression.rs
+++ b/crates/polars-stream/src/expression.rs
@@ -1,7 +1,7 @@
use std::sync::Arc;
use polars_core::frame::DataFrame;
-use polars_core::prelude::Series;
+use polars_core::prelude::Column;
use polars_error::PolarsResult;
use polars_expr::prelude::{ExecutionState, PhysicalExpr};
@@ -21,7 +21,7 @@ impl StreamExpr {
}
}
- pub async fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Series> {
+ pub async fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
if self.reentrant {
let state = state.clone();
let phys_expr = self.inner.clone();
diff --git a/crates/polars-stream/src/nodes/group_by.rs b/crates/polars-stream/src/nodes/group_by.rs
index a2a2ae0d4d96..fb91a2965ac5 100644
--- a/crates/polars-stream/src/nodes/group_by.rs
+++ b/crates/polars-stream/src/nodes/group_by.rs
@@ -85,7 +85,10 @@ impl GroupBySinkState {
// SAFETY: we resize the reduction to the number of groups beforehand.
reduction.resize(local.grouper.num_groups());
reduction.update_groups(
- &selector.evaluate(&df, state).await?,
+ selector
+ .evaluate(&df, state)
+ .await?
+ .as_materialized_series(),
&group_idxs,
)?;
}
diff --git a/crates/polars-stream/src/nodes/reduce.rs b/crates/polars-stream/src/nodes/reduce.rs
index 565854e97b81..8a863050be9b 100644
--- a/crates/polars-stream/src/nodes/reduce.rs
+++ b/crates/polars-stream/src/nodes/reduce.rs
@@ -64,7 +64,7 @@ impl ReduceNode {
while let Ok(morsel) = recv.recv().await {
for (reducer, selector) in local_reducers.iter_mut().zip(selectors) {
let input = selector.evaluate(morsel.df(), state).await?;
- reducer.update_group(&input, 0)?;
+ reducer.update_group(input.as_materialized_series(), 0)?;
}
}
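
The recurring pattern in the hunks above: expression evaluation now yields a Column instead of a Series. Producers wrap results via into_column() or Column::from, while consumers that still require a Series convert explicitly with as_materialized_series() or take_materialized_series(). Below is a minimal, hypothetical sketch of that wrapper idea using simplified stand-in types (Series as a plain vector, Column as a two-variant enum); it is not the real polars API, which is considerably richer.

#![allow(dead_code)]

// Simplified stand-ins for illustration only; the real polars Series and
// Column types live in polars-core and carry much more machinery.
#[derive(Clone, Debug)]
struct Series(Vec<i64>);

#[derive(Clone, Debug)]
enum Column {
    // A fully materialized column backed by a series.
    Series(Series),
    // A broadcastable scalar: (value, logical length), kept unmaterialized.
    Scalar(i64, usize),
}

impl Column {
    // Toy analogue of Column::as_materialized_series; the real method hands
    // out a Series view, here we simply build one.
    fn as_materialized_series(&self) -> Series {
        match self {
            Column::Series(s) => s.clone(),
            Column::Scalar(v, len) => Series(vec![*v; *len]),
        }
    }

    // Toy analogue of Column::take_materialized_series.
    fn take_materialized_series(self) -> Series {
        match self {
            Column::Series(s) => s,
            Column::Scalar(v, len) => Series(vec![v; len]),
        }
    }
}

impl From<Series> for Column {
    fn from(s: Series) -> Self {
        Column::Series(s)
    }
}

// Mirrors the .into_column() calls used throughout the patch.
trait IntoColumn {
    fn into_column(self) -> Column;
}

impl IntoColumn for Series {
    fn into_column(self) -> Column {
        Column::Series(self)
    }
}

// Before the refactor the evaluation hook produced a Series; afterwards it
// produces a Column, so scalar results can stay unmaterialized.
trait PhysicalExpr {
    fn evaluate(&self) -> Column;
}

struct LitOne;

impl PhysicalExpr for LitOne {
    fn evaluate(&self) -> Column {
        // A literal is naturally a length-1 scalar column.
        Column::Scalar(1, 1)
    }
}

fn main() {
    let col = LitOne.evaluate();
    // Callers that still need a Series (e.g. the streaming reducers above)
    // materialize explicitly instead of receiving one implicitly.
    let series = col.take_materialized_series();
    println!("{series:?}");
}

The point of the indirection is that scalar or literal results can be carried around cheaply and only expanded to a full Series at the call sites that genuinely need one.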