From dcb9b347bafa51f8fb74f81693c7bb0f878132eb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 20 Sep 2024 17:03:18 -0400 Subject: [PATCH] Account for constant equivalence properties in union, tests --- .../physical-expr-common/src/sort_expr.rs | 44 +- .../physical-expr/src/equivalence/class.rs | 50 +- .../src/equivalence/properties.rs | 436 ++++++++++++++---- datafusion/physical-expr/src/partitioning.rs | 6 +- 4 files changed, 440 insertions(+), 96 deletions(-) diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 704cb291335fa..94f5fdeb662b3 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -96,28 +96,48 @@ impl PhysicalSortExpr { } /// Set the sort sort options to ASC - pub fn asc(mut self) -> Self { - self.options.descending = false; - self + pub fn asc(self) -> Self { + self.with_descending(false) } /// Set the sort sort options to DESC - pub fn desc(mut self) -> Self { - self.options.descending = true; - self + pub fn desc(self) -> Self { + self.with_descending(true) } - /// Set the sort sort options to NULLS FIRST - pub fn nulls_first(mut self) -> Self { - self.options.nulls_first = true; + /// set the sort options `descending` flag + pub fn with_descending(mut self, descending: bool) -> Self { + self.options.descending = descending; self } - /// Set the sort sort options to NULLS LAST - pub fn nulls_last(mut self) -> Self { - self.options.nulls_first = false; + /// Set the sort options to NULLS FIRST + pub fn nulls_first(self) -> Self { + self.with_nulls_first(true) + } + + /// Set the sort options to NULLS LAST + pub fn nulls_last(self) -> Self { + self.with_nulls_first(false) + } + + /// set the sort options `nulls_first` flag + pub fn with_nulls_first(mut self, nulls_first: bool) -> Self { + self.options.nulls_first = nulls_first; self } + + /// return the underlying PhysicalExpr + pub fn expr(&self) -> &Arc { + &self.expr + } +} + +/// Access the PhysicalSortExpr as a PhysicalExpr +impl AsRef for PhysicalSortExpr { + fn as_ref(&self) -> &(dyn PhysicalExpr + 'static) { + self.expr.as_ref() + } } impl PartialEq for PhysicalSortExpr { diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 00708b4540aa6..c1851ddb22b53 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -30,7 +30,6 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::JoinType; use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; -#[derive(Debug, Clone)] /// A structure representing a expression known to be constant in a physical execution plan. /// /// The `ConstExpr` struct encapsulates an expression that is constant during the execution @@ -41,9 +40,10 @@ use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; /// /// - `expr`: Constant expression for a node in the physical plan. /// -/// - `across_partitions`: A boolean flag indicating whether the constant expression is -/// valid across partitions. If set to `true`, the constant expression has same value for all partitions. -/// If set to `false`, the constant expression may have different values for different partitions. +/// - `across_partitions`: A boolean flag indicating whether the constant +/// expression is the same across partitions. If set to `true`, the constant +/// expression has same value for all partitions. If set to `false`, the +/// constant expression may have different values for different partitions. /// /// # Example /// @@ -56,11 +56,22 @@ use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; /// // create a constant expression from a physical expression /// let const_expr = ConstExpr::from(col); /// ``` +#[derive(Debug, Clone)] pub struct ConstExpr { + /// The expression that is known to be constant (e.g. a `Column`) expr: Arc, + /// Does the constant have the same value across all partitions? See + /// struct docs for more details across_partitions: bool, } +impl PartialEq for ConstExpr { + fn eq(&self, other: &Self) -> bool { + self.across_partitions == other.across_partitions + && self.expr.eq(other.expr.as_any()) + } +} + impl ConstExpr { /// Create a new constant expression from a physical expression. /// @@ -74,11 +85,17 @@ impl ConstExpr { } } + /// Set the `across_partitions` flag + /// + /// See struct docs for more details pub fn with_across_partitions(mut self, across_partitions: bool) -> Self { self.across_partitions = across_partitions; self } + /// Is the expression the same across all partitions? + /// + /// See struct docs for more details pub fn across_partitions(&self) -> bool { self.across_partitions } @@ -101,6 +118,31 @@ impl ConstExpr { across_partitions: self.across_partitions, }) } + + /// Returns true if this constant expression is equal to the given expression + pub fn eq_expr(&self, other: impl AsRef) -> bool { + self.expr.eq(other.as_ref().as_any()) + } + + /// Returns a [`Display`]able list of `ConstExpr`. + pub fn format_list(input: &[ConstExpr]) -> impl Display + '_ { + struct DisplayableList<'a>(&'a [ConstExpr]); + impl<'a> Display for DisplayableList<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let mut first = true; + for const_expr in self.0 { + if first { + first = false; + } else { + write!(f, ",")?; + } + write!(f, "{}", const_expr)?; + } + Ok(()) + } + } + DisplayableList(input) + } } /// Display implementation for `ConstExpr` diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 8137b4f9da137..12ec094459f8f 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -17,6 +17,8 @@ use std::fmt::Display; use std::hash::{Hash, Hasher}; +use std::iter::Peekable; +use std::slice::Iter; use std::sync::Arc; use super::ordering::collapse_lex_ordering; @@ -279,6 +281,12 @@ impl EquivalenceProperties { self.with_constants(constants) } + /// Remove the specified constant + pub fn remove_constant(mut self, c: &ConstExpr) -> Self { + self.constants.retain(|existing| existing != c); + self + } + /// Track/register physical expressions with constant values. pub fn with_constants( mut self, @@ -1120,15 +1128,7 @@ impl Display for EquivalenceProperties { write!(f, ", eq: {}", self.eq_group)?; } if !self.constants.is_empty() { - write!(f, ", const: [")?; - let mut iter = self.constants.iter(); - if let Some(c) = iter.next() { - write!(f, "{}", c)?; - } - for c in iter { - write!(f, ", {}", c)?; - } - write!(f, "]")?; + write!(f, ", const: [{}]", ConstExpr::format_list(&self.constants))?; } Ok(()) } @@ -1611,58 +1611,62 @@ impl Hash for ExprWrapper { /// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties` /// of `lhs` and `rhs` according to the schema of `lhs`. +/// +/// Rules: The UnionExec does not interleave its inputs: instead it passes each +/// input partition from the children as its own output. +/// +/// Since the output equivalence properties are properties that are true for +/// *all* output partitions, that is the same as being true for all *input* +/// partitions fn calculate_union_binary( - lhs: EquivalenceProperties, + mut lhs: EquivalenceProperties, mut rhs: EquivalenceProperties, ) -> Result { - // TODO: In some cases, we should be able to preserve some equivalence - // classes. Add support for such cases. - // Harmonize the schema of the rhs with the schema of the lhs (which is the accumulator schema): if !rhs.schema.eq(&lhs.schema) { rhs = rhs.with_new_schema(Arc::clone(&lhs.schema))?; } - // First, calculate valid constants for the union. A quantity is constant - // after the union if it is constant in both sides. - let constants = lhs + // First, calculate valid constants for the union. An expression is constant + // at the output of the union if it is constant in both sides. + let constants: Vec<_> = lhs .constants() .iter() .filter(|const_expr| const_exprs_contains(rhs.constants(), const_expr.expr())) .map(|const_expr| { - // TODO: When both sides' constants are valid across partitions, - // the union's constant should also be valid if values are - // the same. However, we do not have the capability to - // check this yet. + // TODO: When both sides have a constant column, and the actual + // constant value is the same, then the output properties could + // reflect the constant is valid across all partitions. However we + // don't track the actual value that the ConstExpr takes on, so we + // can't determine that yet ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false) }) .collect(); + // remove any constants that are shared in both outputs (avoid double counting them) + for c in &constants { + lhs = lhs.remove_constant(c); + rhs = rhs.remove_constant(c); + } + // Next, calculate valid orderings for the union by searching for prefixes // in both sides. - let mut orderings = vec![]; - for mut ordering in lhs.normalized_oeq_class().orderings { - // Progressively shorten the ordering to search for a satisfied prefix: - while !rhs.ordering_satisfy(&ordering) { - ordering.pop(); - } - // There is a non-trivial satisfied prefix, add it as a valid ordering: - if !ordering.is_empty() { - orderings.push(ordering); - } - } - for mut ordering in rhs.normalized_oeq_class().orderings { - // Progressively shorten the ordering to search for a satisfied prefix: - while !lhs.ordering_satisfy(&ordering) { - ordering.pop(); - } - // There is a non-trivial satisfied prefix, add it as a valid ordering: - if !ordering.is_empty() { - orderings.push(ordering); - } - } - let mut eq_properties = EquivalenceProperties::new(lhs.schema); - eq_properties.constants = constants; + let mut orderings = UnionEquivalentOrderingBuilder::new(); + orderings.add_satisfied_orderings( + lhs.normalized_oeq_class().orderings, + lhs.constants(), + &rhs, + ); + orderings.add_satisfied_orderings( + rhs.normalized_oeq_class().orderings, + rhs.constants(), + &lhs, + ); + let orderings = orderings.build(); + + let mut eq_properties = + EquivalenceProperties::new(lhs.schema).with_constants(constants); + eq_properties.add_new_orderings(orderings); Ok(eq_properties) } @@ -1675,8 +1679,7 @@ pub fn calculate_union( eqps: Vec, schema: SchemaRef, ) -> Result { - // TODO: In some cases, we should be able to preserve some equivalence - // classes. Add support for such cases. + // TODO avoid this clone by using `reduce` let mut init = eqps[0].clone(); // Harmonize the schema of the init with the schema of the union: if !init.schema.eq(&schema) { @@ -1687,6 +1690,206 @@ pub fn calculate_union( .try_fold(init, calculate_union_binary) } +#[derive(Debug)] +enum AddedOrdering { + /// The ordering was added to the in progress result + Yes, + /// The ordering was not added + No(LexOrdering), +} + +/// Builds valid output orderings of a `UnionExec` +#[derive(Debug)] +struct UnionEquivalentOrderingBuilder { + orderings: Vec, +} + +impl UnionEquivalentOrderingBuilder { + fn new() -> Self { + Self { orderings: vec![] } + } + + /// Add all orderings from `orderings` that satisfy `properties`, + /// potentially augmented with`constants`. + /// + /// Note: any column that is known to be constant can be inserted into the + /// ordering without changing its meaning + /// + /// For example: + /// * `orderings` contains `[a ASC, c ASC]` and `constants` contains `b` + /// * `properties` has required ordering `[a ASC, b ASC]` + /// + /// Then this will add `[a ASC, b ASC]` to the `orderings` list (as `a` was + /// in the sort order and `b` was a constant). + fn add_satisfied_orderings( + &mut self, + orderings: impl IntoIterator, + constants: &[ConstExpr], + properties: &EquivalenceProperties, + ) { + for mut ordering in orderings.into_iter() { + // Progressively shorten the ordering to search for a satisfied prefix: + loop { + match self.try_add_ordering(ordering, constants, properties) { + AddedOrdering::Yes => break, + AddedOrdering::No(o) => { + ordering = o; + ordering.pop(); + } + } + } + } + } + + /// Adds `ordering`, potentially augmented with constants, if it satisfies + /// the target `properties` properties. + /// + /// Returns + /// + /// * [`AddedOrdering::Yes`] if the ordering was added (either directly or + /// augmented), or was empty. + /// + /// * [`AddedOrdering::No`] if the ordering was not added + fn try_add_ordering( + &mut self, + ordering: LexOrdering, + constants: &[ConstExpr], + properties: &EquivalenceProperties, + ) -> AddedOrdering { + if ordering.is_empty() { + AddedOrdering::Yes + } else if constants.is_empty() && properties.ordering_satisfy(&ordering) { + // If the ordering satisfies the target properties, no need to + // augment it with constants. + self.orderings.push(ordering); + AddedOrdering::Yes + } else { + // Did not satisfy target properties, try and augment with constants + // to match the properties + if self.try_find_augmented_ordering(&ordering, constants, properties) { + AddedOrdering::Yes + } else { + AddedOrdering::No(ordering) + } + } + } + + /// Attempts to add `constants` to `ordering` to satisfy the properties. + /// + /// returns true if any orderings were added, false otherwise + fn try_find_augmented_ordering( + &mut self, + ordering: &LexOrdering, + constants: &[ConstExpr], + properties: &EquivalenceProperties, + ) -> bool { + // can't augment if there is nothing to augment with + if constants.is_empty() { + return false; + } + let start_num_orderings = self.orderings.len(); + + // for each equivalent ordering in properties, try and augment + // `ordering` it with the constants to match + for existing_ordering in &properties.oeq_class.orderings { + if let Some(augmented_ordering) = self.augment_ordering( + ordering, + constants, + existing_ordering, + &properties.constants, + ) { + if !augmented_ordering.is_empty() { + assert!(properties.ordering_satisfy(&augmented_ordering)); + self.orderings.push(augmented_ordering); + } + } + } + + self.orderings.len() > start_num_orderings + } + + /// Attempts to augment the ordering with constants to match the + /// `existing_ordering` + /// + /// Returns Some(ordering) if an augmented ordering was found, None otherwise + fn augment_ordering( + &mut self, + ordering: &LexOrdering, + constants: &[ConstExpr], + existing_ordering: &LexOrdering, + existing_constants: &[ConstExpr], + ) -> Option { + let mut augmented_ordering = vec![]; + let mut sort_expr_iter = ordering.iter().peekable(); + let mut existing_sort_expr_iter = existing_ordering.iter().peekable(); + + // walk in parallel down the two orderings, trying to match them up + while sort_expr_iter.peek().is_some() || existing_sort_expr_iter.peek().is_some() + { + // If the next expressions are equal, add the next match + // otherwise try and match with a constant + if let Some(expr) = + advance_if_match(&mut sort_expr_iter, &mut existing_sort_expr_iter) + { + augmented_ordering.push(expr); + } else if let Some(expr) = + advance_if_matches_constant(&mut sort_expr_iter, existing_constants) + { + augmented_ordering.push(expr); + } else if let Some(expr) = + advance_if_matches_constant(&mut existing_sort_expr_iter, constants) + { + augmented_ordering.push(expr); + } else { + // no match, can't continue the ordering, return what we have + break; + } + } + + Some(augmented_ordering) + } + + fn build(self) -> Vec { + self.orderings + } +} + +/// Advances two iterators in parallel +/// +/// If the next expressions are equal, the iterators are advanced and returns +/// the matched expression . +/// +/// Otherwise, the iterators are left unchanged and return `None` +fn advance_if_match( + iter1: &mut Peekable>, + iter2: &mut Peekable>, +) -> Option { + if matches!((iter1.peek(), iter2.peek()), (Some(expr1), Some(expr2)) if expr1.eq(expr2)) + { + iter1.next().unwrap(); + iter2.next().cloned() + } else { + None + } +} + +/// Advances the iterator with a constant +/// +/// If the next expression matches one of the constants, advances the iterator +/// returning the matched expression +/// +/// Otherwise, the iterator is left unchanged and returns `None` +fn advance_if_matches_constant( + iter: &mut Peekable>, + constants: &[ConstExpr], +) -> Option { + let expr = iter.peek()?; + let const_expr = constants.iter().find(|c| c.eq_expr(expr))?; + let found_expr = PhysicalSortExpr::new(Arc::clone(const_expr.expr()), expr.options); + iter.next(); + Some(found_expr) +} + #[cfg(test)] mod tests { use std::ops::Not; @@ -2786,7 +2989,7 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_1() { + fn test_union_equivalence_properties_constants_common_constants() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) .with_child_sort_and_const_exprs( @@ -2810,10 +3013,9 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_2() { + fn test_union_equivalence_properties_constants_prefix() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) - // Meet ordering between [a ASC], [a ASC, b ASC] should be [a ASC] .with_child_sort_and_const_exprs( // First child: [a ASC], const [] vec![vec!["a"]], @@ -2835,10 +3037,9 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_3() { + fn test_union_equivalence_properties_constants_asc_desc_mismatch() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) - // Meet ordering between [a ASC], [a DESC] should be [] .with_child_sort_and_const_exprs( // First child: [a ASC], const [] vec![vec!["a"]], @@ -2860,7 +3061,7 @@ mod tests { } #[test] - fn test_union_equivalence_properties_constants_4() { + fn test_union_equivalence_properties_constants_different_schemas() { let schema = create_test_schema().unwrap(); let schema2 = append_fields(&schema, "1"); UnionEquivalenceTest::new(&schema) @@ -2877,11 +3078,10 @@ mod tests { &schema2, ) .with_expected_sort_and_const_exprs( - // Union orderings: - // should be [a ASC] + // Union orderings: [a ASC] // - // Where a, and a1 ath the same index for their corresponding - // schemas. + // Note that a, and a1 are at the same index for their + // corresponding schemas. vec![vec!["a"]], vec![], ) @@ -2889,9 +3089,7 @@ mod tests { } #[test] - #[ignore] - // ignored due to https://github.com/apache/datafusion/issues/12446 - fn test_union_equivalence_properties_constants() { + fn test_union_equivalence_properties_constants_fill_gaps() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) .with_child_sort_and_const_exprs( @@ -2918,13 +3116,58 @@ mod tests { } #[test] - #[ignore] - // ignored due to https://github.com/apache/datafusion/issues/12446 - fn test_union_equivalence_properties_constants_desc() { + fn test_union_equivalence_properties_constants_no_fill_gaps() { + let schema = create_test_schema().unwrap(); + UnionEquivalenceTest::new(&schema) + .with_child_sort_and_const_exprs( + // First child orderings: [a ASC, c ASC], const [d] // some other constant + vec![vec!["a", "c"]], + vec!["d"], + &schema, + ) + .with_child_sort_and_const_exprs( + // Second child orderings: [b ASC, c ASC], const [a] + vec![vec!["b", "c"]], + vec!["a"], + &schema, + ) + .with_expected_sort_and_const_exprs( + // Union orderings: [[a]] (only a is constant) + vec![vec!["a"]], + vec![], + ) + .run() + } + + #[test] + fn test_union_equivalence_properties_constants_fill_some_gaps() { + let schema = create_test_schema().unwrap(); + UnionEquivalenceTest::new(&schema) + .with_child_sort_and_const_exprs( + // First child orderings: [c ASC], const [a, b] // some other constant + vec![vec!["c"]], + vec!["a", "b"], + &schema, + ) + .with_child_sort_and_const_exprs( + // Second child orderings: [a DESC, b], const [] + vec![vec!["a DESC", "b"]], + vec![], + &schema, + ) + .with_expected_sort_and_const_exprs( + // Union orderings: [[a, b]] (can fill in the a/b with constants) + vec![vec!["a DESC", "b"]], + vec![], + ) + .run() + } + + #[test] + fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) .with_child_sort_and_const_exprs( - // NB `b DESC` in the second child // First child orderings: [a ASC, c ASC], const [b] vec![vec!["a", "c"]], vec!["b"], @@ -2948,9 +3191,7 @@ mod tests { } #[test] - #[ignore] - // ignored due to https://github.com/apache/datafusion/issues/12446 - fn test_union_equivalence_properties_constants_middle() { + fn test_union_equivalence_properties_constants_gap_fill_symmetric() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) .with_child_sort_and_const_exprs( @@ -2967,8 +3208,8 @@ mod tests { ) .with_expected_sort_and_const_exprs( // Union orderings: - // [a, b, d] (c constant) - // [a, c, d] (b constant) + // [a, b, c, d] + // [a, c, b, d] vec![vec!["a", "c", "b", "d"], vec!["a", "b", "c", "d"]], vec![], ) @@ -2976,8 +3217,31 @@ mod tests { } #[test] - #[ignore] - // ignored due to https://github.com/apache/datafusion/issues/12446 + fn test_union_equivalence_properties_constants_gap_fill_and_common() { + let schema = create_test_schema().unwrap(); + UnionEquivalenceTest::new(&schema) + .with_child_sort_and_const_exprs( + // First child: [a DESC, d ASC], const [b, c] + vec![vec!["a DESC", "d"]], + vec!["b", "c"], + &schema, + ) + .with_child_sort_and_const_exprs( + // Second child: [a DESC, c ASC, d ASC], const [b] + vec![vec!["a DESC", "c", "d"]], + vec!["b"], + &schema, + ) + .with_expected_sort_and_const_exprs( + // Union orderings: + // [a DESC, c, d] [b] + vec![vec!["a DESC", "c", "d"]], + vec!["b"], + ) + .run() + } + + #[test] fn test_union_equivalence_properties_constants_middle_desc() { let schema = create_test_schema().unwrap(); UnionEquivalenceTest::new(&schema) @@ -3088,18 +3352,32 @@ mod tests { child_properties, expected_properties, } = self; + let expected_properties = expected_properties.expect("expected_properties not set"); - let actual_properties = - calculate_union(child_properties, Arc::clone(&output_schema)) - .expect("failed to calculate union equivalence properties"); - assert_eq_properties_same( - &actual_properties, - &expected_properties, - format!( - "expected: {expected_properties:?}\nactual: {actual_properties:?}" - ), - ); + + // try all permutations of the children + // as the code treats lhs and rhs differently + for child_properties in child_properties + .iter() + .cloned() + .permutations(child_properties.len()) + { + println!("--- permutation ---"); + for c in &child_properties { + println!("{c}"); + } + let actual_properties = + calculate_union(child_properties, Arc::clone(&output_schema)) + .expect("failed to calculate union equivalence properties"); + assert_eq_properties_same( + &actual_properties, + &expected_properties, + format!( + "expected: {expected_properties:?}\nactual: {actual_properties:?}" + ), + ); + } } /// Make equivalence properties for the specified columns named in orderings and constants diff --git a/datafusion/physical-expr/src/partitioning.rs b/datafusion/physical-expr/src/partitioning.rs index 01f72a8efd9a5..2e16e20d451ff 100644 --- a/datafusion/physical-expr/src/partitioning.rs +++ b/datafusion/physical-expr/src/partitioning.rs @@ -271,7 +271,11 @@ impl Display for Distribution { Distribution::UnspecifiedDistribution => write!(f, "Unspecified"), Distribution::SinglePartition => write!(f, "SinglePartition"), Distribution::HashPartitioned(exprs) => { - write!(f, "HashPartitioned[{}])", format_physical_expr_list(exprs)) + write!( + f, + "Hash Partitioned by {}", + format_physical_expr_list(exprs) + ) } } }