From 584bb757ee98c40ec81bf43e982ce64f2e5b3981 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Thu, 17 Feb 2022 13:24:53 +0800 Subject: [PATCH] physical sort crate (#1843) --- Cargo.toml | 1 + datafusion-physical-expr/Cargo.toml | 40 +++++ datafusion-physical-expr/README.md | 24 +++ .../src/aggregate_expr.rs | 59 +++++++ datafusion-physical-expr/src/lib.rs | 26 +++ datafusion-physical-expr/src/physical_expr.rs | 41 +++++ datafusion-physical-expr/src/sort_expr.rs | 65 ++++++++ datafusion-physical-expr/src/window_expr.rs | 113 +++++++++++++ datafusion/Cargo.toml | 1 + .../src/physical_plan/expressions/mod.rs | 53 +------ datafusion/src/physical_plan/mod.rs | 150 +----------------- 11 files changed, 381 insertions(+), 192 deletions(-) create mode 100644 datafusion-physical-expr/Cargo.toml create mode 100644 datafusion-physical-expr/README.md create mode 100644 datafusion-physical-expr/src/aggregate_expr.rs create mode 100644 datafusion-physical-expr/src/lib.rs create mode 100644 datafusion-physical-expr/src/physical_expr.rs create mode 100644 datafusion-physical-expr/src/sort_expr.rs create mode 100644 datafusion-physical-expr/src/window_expr.rs diff --git a/Cargo.toml b/Cargo.toml index f74f53ced323..beaa22d91fa4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ members = [ "datafusion", "datafusion-common", "datafusion-expr", + "datafusion-physical-expr", "datafusion-cli", "datafusion-examples", "benchmarks", diff --git a/datafusion-physical-expr/Cargo.toml b/datafusion-physical-expr/Cargo.toml new file mode 100644 index 000000000000..ba12b586354e --- /dev/null +++ b/datafusion-physical-expr/Cargo.toml @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-physical-expr" +description = "DataFusion is an in-memory query engine that uses Apache Arrow as the memory model" +version = "7.0.0" +homepage = "https://github.com/apache/arrow-datafusion" +repository = "https://github.com/apache/arrow-datafusion" +readme = "../README.md" +authors = ["Apache Arrow "] +license = "Apache-2.0" +keywords = [ "arrow", "query", "sql" ] +edition = "2021" +rust-version = "1.58" + +[lib] +name = "datafusion_physical_expr" +path = "src/lib.rs" + +[features] + +[dependencies] +datafusion-common = { path = "../datafusion-common", version = "7.0.0" } +datafusion-expr = { path = "../datafusion-expr", version = "7.0.0" } +arrow = { version = "9.0.0", features = ["prettyprint"] } diff --git a/datafusion-physical-expr/README.md b/datafusion-physical-expr/README.md new file mode 100644 index 000000000000..9c92023382d4 --- /dev/null +++ b/datafusion-physical-expr/README.md @@ -0,0 +1,24 @@ + + +# DataFusion Physical Expr + +This is an internal module for fundamental physical expression types of [DataFusion][df]. + +[df]: https://crates.io/crates/datafusion diff --git a/datafusion-physical-expr/src/aggregate_expr.rs b/datafusion-physical-expr/src/aggregate_expr.rs new file mode 100644 index 000000000000..fc0f39e6977c --- /dev/null +++ b/datafusion-physical-expr/src/aggregate_expr.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::PhysicalExpr; + +use arrow::datatypes::Field; +use datafusion_common::Result; +use datafusion_expr::Accumulator; +use std::fmt::Debug; + +use std::any::Any; +use std::sync::Arc; + +/// An aggregate expression that: +/// * knows its resulting field +/// * knows how to create its accumulator +/// * knows its accumulator's state's field +/// * knows the expressions from whose its accumulator will receive values +pub trait AggregateExpr: Send + Sync + Debug { + /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// the field of the final result of this aggregation. + fn field(&self) -> Result; + + /// the accumulator used to accumulate values from the expressions. + /// the accumulator expects the same number of arguments as `expressions` and must + /// return states with the same description as `state_fields` + fn create_accumulator(&self) -> Result>; + + /// the fields that encapsulate the Accumulator's state + /// the number of fields here equals the number of states that the accumulator contains + fn state_fields(&self) -> Result>; + + /// expressions that are passed to the Accumulator. + /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. + fn expressions(&self) -> Vec>; + + /// Human readable name such as `"MIN(c2)"`. The default + /// implementation returns placeholder text. + fn name(&self) -> &str { + "AggregateExpr: default name" + } +} diff --git a/datafusion-physical-expr/src/lib.rs b/datafusion-physical-expr/src/lib.rs new file mode 100644 index 000000000000..63edaa5ac94b --- /dev/null +++ b/datafusion-physical-expr/src/lib.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod aggregate_expr; +mod physical_expr; +mod sort_expr; +mod window_expr; + +pub use aggregate_expr::AggregateExpr; +pub use physical_expr::PhysicalExpr; +pub use sort_expr::PhysicalSortExpr; +pub use window_expr::WindowExpr; diff --git a/datafusion-physical-expr/src/physical_expr.rs b/datafusion-physical-expr/src/physical_expr.rs new file mode 100644 index 000000000000..25885b1ab567 --- /dev/null +++ b/datafusion-physical-expr/src/physical_expr.rs @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Schema}; + +use arrow::record_batch::RecordBatch; + +use datafusion_common::Result; + +use datafusion_expr::ColumnarValue; +use std::fmt::{Debug, Display}; + +use std::any::Any; + +/// Expression that can be evaluated against a RecordBatch +/// A Physical expression knows its type, nullability and how to evaluate itself. +pub trait PhysicalExpr: Send + Sync + Display + Debug { + /// Returns the physical expression as [`Any`](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + /// Get the data type of this expression, given the schema of the input + fn data_type(&self, input_schema: &Schema) -> Result; + /// Determine whether this expression is nullable, given the schema of the input + fn nullable(&self, input_schema: &Schema) -> Result; + /// Evaluate an expression against a RecordBatch + fn evaluate(&self, batch: &RecordBatch) -> Result; +} diff --git a/datafusion-physical-expr/src/sort_expr.rs b/datafusion-physical-expr/src/sort_expr.rs new file mode 100644 index 000000000000..e8172dd9979e --- /dev/null +++ b/datafusion-physical-expr/src/sort_expr.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::PhysicalExpr; +use arrow::compute::kernels::sort::{SortColumn, SortOptions}; +use arrow::record_batch::RecordBatch; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::ColumnarValue; +use std::sync::Arc; + +/// Represents Sort operation for a column in a RecordBatch +#[derive(Clone, Debug)] +pub struct PhysicalSortExpr { + /// Physical expression representing the column to sort + pub expr: Arc, + /// Option to specify how the given column should be sorted + pub options: SortOptions, +} + +impl std::fmt::Display for PhysicalSortExpr { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let opts_string = match (self.options.descending, self.options.nulls_first) { + (true, true) => "DESC", + (true, false) => "DESC NULLS LAST", + (false, true) => "ASC", + (false, false) => "ASC NULLS LAST", + }; + + write!(f, "{} {}", self.expr, opts_string) + } +} + +impl PhysicalSortExpr { + /// evaluate the sort expression into SortColumn that can be passed into arrow sort kernel + pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result { + let value_to_sort = self.expr.evaluate(batch)?; + let array_to_sort = match value_to_sort { + ColumnarValue::Array(array) => array, + ColumnarValue::Scalar(scalar) => { + return Err(DataFusionError::Plan(format!( + "Sort operation is not applicable to scalar value {}", + scalar + ))); + } + }; + Ok(SortColumn { + values: array_to_sort, + options: Some(self.options), + }) + } +} diff --git a/datafusion-physical-expr/src/window_expr.rs b/datafusion-physical-expr/src/window_expr.rs new file mode 100644 index 000000000000..67caba51dcab --- /dev/null +++ b/datafusion-physical-expr/src/window_expr.rs @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{PhysicalExpr, PhysicalSortExpr}; +use arrow::compute::kernels::partition::lexicographical_partition_ranges; +use arrow::compute::kernels::sort::{SortColumn, SortOptions}; +use arrow::record_batch::RecordBatch; +use arrow::{array::ArrayRef, datatypes::Field}; +use datafusion_common::{DataFusionError, Result}; +use std::any::Any; +use std::fmt::Debug; +use std::ops::Range; +use std::sync::Arc; + +/// A window expression that: +/// * knows its resulting field +pub trait WindowExpr: Send + Sync + Debug { + /// Returns the window expression as [`Any`](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// the field of the final result of this window function. + fn field(&self) -> Result; + + /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default + /// implementation returns placeholder text. + fn name(&self) -> &str { + "WindowExpr: default name" + } + + /// expressions that are passed to the WindowAccumulator. + /// Functions which take a single input argument, such as `sum`, return a single [`datafusion_expr::expr::Expr`], + /// others (e.g. `cov`) return many. + fn expressions(&self) -> Vec>; + + /// evaluate the window function arguments against the batch and return + /// array ref, normally the resulting vec is a single element one. + fn evaluate_args(&self, batch: &RecordBatch) -> Result> { + self.expressions() + .iter() + .map(|e| e.evaluate(batch)) + .map(|r| r.map(|v| v.into_array(batch.num_rows()))) + .collect() + } + + /// evaluate the window function values against the batch + fn evaluate(&self, batch: &RecordBatch) -> Result; + + /// evaluate the partition points given the sort columns; if the sort columns are + /// empty then the result will be a single element vec of the whole column rows. + fn evaluate_partition_points( + &self, + num_rows: usize, + partition_columns: &[SortColumn], + ) -> Result>> { + if partition_columns.is_empty() { + Ok(vec![Range { + start: 0, + end: num_rows, + }]) + } else { + Ok(lexicographical_partition_ranges(partition_columns) + .map_err(DataFusionError::ArrowError)? + .collect::>()) + } + } + + /// expressions that's from the window function's partition by clause, empty if absent + fn partition_by(&self) -> &[Arc]; + + /// expressions that's from the window function's order by clause, empty if absent + fn order_by(&self) -> &[PhysicalSortExpr]; + + /// get partition columns that can be used for partitioning, empty if absent + fn partition_columns(&self, batch: &RecordBatch) -> Result> { + self.partition_by() + .iter() + .map(|expr| { + PhysicalSortExpr { + expr: expr.clone(), + options: SortOptions::default(), + } + .evaluate_to_sort_column(batch) + }) + .collect() + } + + /// get sort columns that can be used for peer evaluation, empty if absent + fn sort_columns(&self, batch: &RecordBatch) -> Result> { + let mut sort_columns = self.partition_columns(batch)?; + let order_by_columns = self + .order_by() + .iter() + .map(|e| e.evaluate_to_sort_column(batch)) + .collect::>>()?; + sort_columns.extend(order_by_columns); + Ok(sort_columns) + } +} diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 7da50ad10249..cbba899181e2 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -54,6 +54,7 @@ row = [] [dependencies] datafusion-common = { path = "../datafusion-common", version = "7.0.0" } datafusion-expr = { path = "../datafusion-expr", version = "7.0.0" } +datafusion-physical-expr = { path = "../datafusion-physical-expr", version = "7.0.0" } ahash = { version = "0.7", default-features = false } hashbrown = { version = "0.12", features = ["raw"] } arrow = { version = "9.0.0", features = ["prettyprint"] } diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs index 06afe004ff34..567e87c1d36d 100644 --- a/datafusion/src/physical_plan/expressions/mod.rs +++ b/datafusion/src/physical_plan/expressions/mod.rs @@ -17,13 +17,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution -use std::sync::Arc; - -use super::ColumnarValue; -use crate::error::{DataFusionError, Result}; -use crate::physical_plan::PhysicalExpr; -use arrow::compute::kernels::sort::{SortColumn, SortOptions}; -use arrow::record_batch::RecordBatch; +use datafusion_expr::ColumnarValue; mod approx_distinct; mod approx_percentile_cont; @@ -118,52 +112,13 @@ pub fn format_state_name(name: &str, state_name: &str) -> String { format!("{}[{}]", name, state_name) } -/// Represents Sort operation for a column in a RecordBatch -#[derive(Clone, Debug)] -pub struct PhysicalSortExpr { - /// Physical expression representing the column to sort - pub expr: Arc, - /// Option to specify how the given column should be sorted - pub options: SortOptions, -} - -impl std::fmt::Display for PhysicalSortExpr { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let opts_string = match (self.options.descending, self.options.nulls_first) { - (true, true) => "DESC", - (true, false) => "DESC NULLS LAST", - (false, true) => "ASC", - (false, false) => "ASC NULLS LAST", - }; - - write!(f, "{} {}", self.expr, opts_string) - } -} - -impl PhysicalSortExpr { - /// evaluate the sort expression into SortColumn that can be passed into arrow sort kernel - pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result { - let value_to_sort = self.expr.evaluate(batch)?; - let array_to_sort = match value_to_sort { - ColumnarValue::Array(array) => array, - ColumnarValue::Scalar(scalar) => { - return Err(DataFusionError::Plan(format!( - "Sort operation is not applicable to scalar value {}", - scalar - ))); - } - }; - Ok(SortColumn { - values: array_to_sort, - options: Some(self.options), - }) - } -} +pub use datafusion_physical_expr::PhysicalSortExpr; #[cfg(test)] mod tests { - use super::*; use crate::{error::Result, physical_plan::AggregateExpr, scalar::ScalarValue}; + use arrow::record_batch::RecordBatch; + use std::sync::Arc; /// macro to perform an aggregation and verify the result. #[macro_export] diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index cb9ae644080e..9949982d0f27 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -23,25 +23,20 @@ use self::{ coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan, }; use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::{ - error::{DataFusionError, Result}, - execution::runtime_env::RuntimeEnv, - scalar::ScalarValue, -}; -use arrow::compute::kernels::partition::lexicographical_partition_ranges; -use arrow::compute::kernels::sort::{SortColumn, SortOptions}; -use arrow::datatypes::{DataType, Schema, SchemaRef}; +use crate::{error::Result, execution::runtime_env::RuntimeEnv, scalar::ScalarValue}; + +use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; -use arrow::{array::ArrayRef, datatypes::Field}; + use async_trait::async_trait; pub use datafusion_expr::Accumulator; pub use datafusion_expr::ColumnarValue; pub use display::DisplayFormatType; use futures::stream::Stream; use std::fmt; -use std::fmt::{Debug, Display}; -use std::ops::Range; +use std::fmt::Debug; + use std::sync::Arc; use std::task::{Context, Poll}; use std::{any::Any, pin::Pin}; @@ -474,138 +469,7 @@ pub enum Distribution { HashPartitioned(Vec>), } -/// Expression that can be evaluated against a RecordBatch -/// A Physical expression knows its type, nullability and how to evaluate itself. -pub trait PhysicalExpr: Send + Sync + Display + Debug { - /// Returns the physical expression as [`Any`](std::any::Any) so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - /// Get the data type of this expression, given the schema of the input - fn data_type(&self, input_schema: &Schema) -> Result; - /// Determine whether this expression is nullable, given the schema of the input - fn nullable(&self, input_schema: &Schema) -> Result; - /// Evaluate an expression against a RecordBatch - fn evaluate(&self, batch: &RecordBatch) -> Result; -} - -/// An aggregate expression that: -/// * knows its resulting field -/// * knows how to create its accumulator -/// * knows its accumulator's state's field -/// * knows the expressions from whose its accumulator will receive values -pub trait AggregateExpr: Send + Sync + Debug { - /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// the field of the final result of this aggregation. - fn field(&self) -> Result; - - /// the accumulator used to accumulate values from the expressions. - /// the accumulator expects the same number of arguments as `expressions` and must - /// return states with the same description as `state_fields` - fn create_accumulator(&self) -> Result>; - - /// the fields that encapsulate the Accumulator's state - /// the number of fields here equals the number of states that the accumulator contains - fn state_fields(&self) -> Result>; - - /// expressions that are passed to the Accumulator. - /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. - fn expressions(&self) -> Vec>; - - /// Human readable name such as `"MIN(c2)"`. The default - /// implementation returns placeholder text. - fn name(&self) -> &str { - "AggregateExpr: default name" - } -} - -/// A window expression that: -/// * knows its resulting field -pub trait WindowExpr: Send + Sync + Debug { - /// Returns the window expression as [`Any`](std::any::Any) so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any; - - /// the field of the final result of this window function. - fn field(&self) -> Result; - - /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default - /// implementation returns placeholder text. - fn name(&self) -> &str { - "WindowExpr: default name" - } - - /// expressions that are passed to the WindowAccumulator. - /// Functions which take a single input argument, such as `sum`, return a single [`datafusion_expr::expr::Expr`], - /// others (e.g. `cov`) return many. - fn expressions(&self) -> Vec>; - - /// evaluate the window function arguments against the batch and return - /// array ref, normally the resulting vec is a single element one. - fn evaluate_args(&self, batch: &RecordBatch) -> Result> { - self.expressions() - .iter() - .map(|e| e.evaluate(batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) - .collect() - } - - /// evaluate the window function values against the batch - fn evaluate(&self, batch: &RecordBatch) -> Result; - - /// evaluate the partition points given the sort columns; if the sort columns are - /// empty then the result will be a single element vec of the whole column rows. - fn evaluate_partition_points( - &self, - num_rows: usize, - partition_columns: &[SortColumn], - ) -> Result>> { - if partition_columns.is_empty() { - Ok(vec![Range { - start: 0, - end: num_rows, - }]) - } else { - Ok(lexicographical_partition_ranges(partition_columns) - .map_err(DataFusionError::ArrowError)? - .collect::>()) - } - } - - /// expressions that's from the window function's partition by clause, empty if absent - fn partition_by(&self) -> &[Arc]; - - /// expressions that's from the window function's order by clause, empty if absent - fn order_by(&self) -> &[PhysicalSortExpr]; - - /// get partition columns that can be used for partitioning, empty if absent - fn partition_columns(&self, batch: &RecordBatch) -> Result> { - self.partition_by() - .iter() - .map(|expr| { - PhysicalSortExpr { - expr: expr.clone(), - options: SortOptions::default(), - } - .evaluate_to_sort_column(batch) - }) - .collect() - } - - /// get sort columns that can be used for peer evaluation, empty if absent - fn sort_columns(&self, batch: &RecordBatch) -> Result> { - let mut sort_columns = self.partition_columns(batch)?; - let order_by_columns = self - .order_by() - .iter() - .map(|e| e.evaluate_to_sort_column(batch)) - .collect::>>()?; - sort_columns.extend(order_by_columns); - Ok(sort_columns) - } -} +pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, WindowExpr}; /// Applies an optional projection to a [`SchemaRef`], returning the /// projected schema