Skip to content

Commit

Permalink
Account for constant equivalence properties in union, tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Oct 1, 2024
1 parent 84ac4f9 commit 0d545c9
Show file tree
Hide file tree
Showing 7 changed files with 901 additions and 425 deletions.
44 changes: 32 additions & 12 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,28 +96,48 @@ impl PhysicalSortExpr {
}

/// Set the sort options to ASC
pub fn asc(mut self) -> Self {
self.options.descending = false;
self
pub fn asc(self) -> Self {
self.with_descending(false)
}

/// Set the sort options to DESC
pub fn desc(mut self) -> Self {
self.options.descending = true;
self
pub fn desc(self) -> Self {
self.with_descending(true)
}

/// Set the sort sort options to NULLS FIRST
pub fn nulls_first(mut self) -> Self {
self.options.nulls_first = true;
/// Sets the `descending` flag of the sort options to the given value and
/// returns the updated sort expression
pub fn with_descending(mut self, descending: bool) -> Self {
    let options = &mut self.options;
    options.descending = descending;
    self
}

/// Set the sort sort options to NULLS LAST
pub fn nulls_last(mut self) -> Self {
self.options.nulls_first = false;
/// Set the sort options to NULLS FIRST
pub fn nulls_first(self) -> Self {
self.with_nulls_first(true)
}

/// Set the sort options to NULLS LAST
pub fn nulls_last(self) -> Self {
self.with_nulls_first(false)
}

/// Sets the `nulls_first` flag of the sort options to the given value and
/// returns the updated sort expression
pub fn with_nulls_first(mut self, nulls_first: bool) -> Self {
    let options = &mut self.options;
    options.nulls_first = nulls_first;
    self
}

/// return the underlying PhysicalExpr
pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
&self.expr
}
}

/// Lets a `PhysicalSortExpr` be borrowed as its underlying `PhysicalExpr`
impl AsRef<dyn PhysicalExpr> for PhysicalSortExpr {
    fn as_ref(&self) -> &(dyn PhysicalExpr + 'static) {
        // Dereference the `Arc` to reach the expression it holds.
        &*self.expr
    }
}

impl PartialEq for PhysicalSortExpr {
Expand Down
50 changes: 46 additions & 4 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::JoinType;
use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;

#[derive(Debug, Clone)]
/// A structure representing an expression known to be constant in a physical execution plan.
///
/// The `ConstExpr` struct encapsulates an expression that is constant during the execution
Expand All @@ -41,9 +40,10 @@ use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
///
/// - `expr`: Constant expression for a node in the physical plan.
///
/// - `across_partitions`: A boolean flag indicating whether the constant expression is
/// valid across partitions. If set to `true`, the constant expression has same value for all partitions.
/// If set to `false`, the constant expression may have different values for different partitions.
/// - `across_partitions`: A boolean flag indicating whether the constant
/// expression is the same across partitions. If set to `true`, the constant
/// expression has the same value for all partitions. If set to `false`, the
/// constant expression may have different values for different partitions.
///
/// # Example
///
Expand All @@ -56,11 +56,22 @@ use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;
/// // create a constant expression from a physical expression
/// let const_expr = ConstExpr::from(col);
/// ```
#[derive(Debug, Clone)]
pub struct ConstExpr {
/// The expression that is known to be constant (e.g. a `Column`)
expr: Arc<dyn PhysicalExpr>,
/// Whether the constant has the same value in every partition (`true`),
/// or may have different values in different partitions (`false`). See
/// struct docs for more details
across_partitions: bool,
}

/// Two `ConstExpr`s are equal when their `across_partitions` flags match and
/// their expressions compare equal.
impl PartialEq for ConstExpr {
    fn eq(&self, other: &Self) -> bool {
        // Compare the flag first; the expression is only compared when the
        // flags agree.
        if self.across_partitions != other.across_partitions {
            return false;
        }
        self.expr.eq(other.expr.as_any())
    }
}

impl ConstExpr {
/// Create a new constant expression from a physical expression.
///
Expand All @@ -74,11 +85,17 @@ impl ConstExpr {
}
}

/// Set the `across_partitions` flag
///
/// See struct docs for more details
pub fn with_across_partitions(mut self, across_partitions: bool) -> Self {
self.across_partitions = across_partitions;
self
}

/// Is the expression the same across all partitions?
///
/// See struct docs for more details
pub fn across_partitions(&self) -> bool {
self.across_partitions
}
Expand All @@ -101,6 +118,31 @@ impl ConstExpr {
across_partitions: self.across_partitions,
})
}

/// Returns true if the expression held by this `ConstExpr` is equal to the
/// given expression. Only the expression is compared.
pub fn eq_expr(&self, other: impl AsRef<dyn PhysicalExpr>) -> bool {
    let rhs = other.as_ref();
    self.expr.eq(rhs.as_any())
}

/// Returns a [`Display`]able list of `ConstExpr`.
///
/// The items are written in order, separated by a single `,`.
pub fn format_list(input: &[ConstExpr]) -> impl Display + '_ {
    struct DisplayableList<'a>(&'a [ConstExpr]);
    impl<'a> Display for DisplayableList<'a> {
        fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
            for (idx, const_expr) in self.0.iter().enumerate() {
                // Every item except the first is preceded by a separator.
                if idx > 0 {
                    write!(f, ",")?;
                }
                write!(f, "{}", const_expr)?;
            }
            Ok(())
        }
    }
    DisplayableList(input)
}
}

/// Display implementation for `ConstExpr`
Expand Down
25 changes: 17 additions & 8 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt::Display;
use std::hash::Hash;
use std::sync::Arc;

use crate::equivalence::add_offset_to_expr;
use crate::{LexOrdering, PhysicalExpr, PhysicalSortExpr};
use arrow_schema::SortOptions;
use std::fmt::Display;
use std::hash::Hash;
use std::sync::Arc;
use std::vec::IntoIter;

/// An `OrderingEquivalenceClass` object keeps track of different alternative
/// orderings that can describe a schema. For example, consider the following table:
Expand All @@ -36,15 +36,15 @@ use arrow_schema::SortOptions;
///
/// Here, both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the table
/// ordering. In this case, we say that these orderings are equivalent.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[derive(Debug, Clone, Eq, PartialEq, Hash, Default)]
pub struct OrderingEquivalenceClass {
/// The alternative orderings tracked by this class
pub orderings: Vec<LexOrdering>,
}

impl OrderingEquivalenceClass {
/// Creates new empty ordering equivalence class.
pub fn empty() -> Self {
Self { orderings: vec![] }
Default::default()
}

/// Clears (empties) this ordering equivalence class.
Expand Down Expand Up @@ -197,6 +197,15 @@ impl OrderingEquivalenceClass {
}
}

/// Consuming an `OrderingEquivalenceClass` yields each of its `LexOrdering`s
/// by value.
impl IntoIterator for OrderingEquivalenceClass {
    type Item = LexOrdering;
    type IntoIter = IntoIter<LexOrdering>;

    fn into_iter(self) -> Self::IntoIter {
        let Self { orderings } = self;
        orderings.into_iter()
    }
}

/// This function constructs a duplicate-free `LexOrdering` by filtering out
/// duplicate entries that have same physical expression inside. For example,
/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
Expand Down Expand Up @@ -229,10 +238,10 @@ impl Display for OrderingEquivalenceClass {
write!(f, "[")?;
let mut iter = self.orderings.iter();
if let Some(ordering) = iter.next() {
write!(f, "{}", PhysicalSortExpr::format_list(ordering))?;
write!(f, "[{}]", PhysicalSortExpr::format_list(ordering))?;
}
for ordering in iter {
write!(f, "{}", PhysicalSortExpr::format_list(ordering))?;
write!(f, ", [{}]", PhysicalSortExpr::format_list(ordering))?;
}
write!(f, "]")?;
Ok(())
Expand Down
Loading

0 comments on commit 0d545c9

Please sign in to comment.