Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup TreeNode implementations #8672

Merged
merged 9 commits into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ use crate::Result;
/// [`LogicalPlan`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/logical_plan/enum.LogicalPlan.html
/// [`Expr`]: https://docs.rs/datafusion-expr/latest/datafusion_expr/expr/enum.Expr.html
pub trait TreeNode: Sized {
/// Returns all children of the TreeNode
fn children_nodes(&self) -> Vec<Self>;
Copy link
Member Author

@viirya viirya Dec 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First tried with Vec<&Self>: https://github.com/apache/arrow-datafusion/compare/main...viirya:arrow-datafusion:refactor_treenode?expand=1.

Most cases it works well. But one trouble is the TreeNode implementation for Arc<T>:

impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
    fn children_nodes(&self) -> Vec<&Arc<T>> {
        // DynTreeNode.arc_children returns Vec<Arc<Self>> and this function cannot return reference of temporary object. The implementations of `DynTreeNode.arc_children` have same issue. It cannot be changed to return Vec<&Arc<Self>> too
        unimplemented!("Call arc_children instead")
    }

So changed to Vec<Self>.


/// Use preorder to iterate the node on the tree so that we can
/// stop fast for some cases.
///
Expand Down Expand Up @@ -211,7 +214,17 @@ pub trait TreeNode: Sized {
/// Apply the closure `F` to the node's children
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>;
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children_nodes() {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
}

/// Apply transform `F` to the node's children, the transform `F` might have a direction(Preorder or Postorder)
fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -342,18 +355,21 @@ pub trait DynTreeNode {
/// Blanket implementation for Arc for any tye that implements
/// [`DynTreeNode`] (such as [`Arc<dyn PhysicalExpr>`])
impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
fn children_nodes(&self) -> Vec<Arc<T>> {
self.arc_children()
}

fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.arc_children() {
match op(&child)? {
for child in &self.arc_children() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this 2nd apply_children() implementation here? The base implementation in TreeNode might be enough.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, missed this. This can be removed too.

match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
}

Expand All @@ -368,7 +384,7 @@ impl<T: DynTreeNode + ?Sized> TreeNode for Arc<T> {
let arc_self = Arc::clone(&self);
self.with_new_arc_children(arc_self, new_children?)
} else {
Ok(self)
Ok(self.clone())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one minor note, that this clone seems to be unnecessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, probably I added it during trying different ideas locally. Removed.

}
}
}
31 changes: 5 additions & 26 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::physical_plan::{
};

use arrow::compute::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::map_columns_before_projection;
Expand Down Expand Up @@ -1409,18 +1409,8 @@ impl DistributionContext {
}

impl TreeNode for DistributionContext {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -1483,19 +1473,8 @@ impl PlanWithKeyRequirements {
}

impl TreeNode for PlanWithKeyRequirements {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Self> {
self.children.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
32 changes: 5 additions & 27 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use crate::physical_plan::{
with_new_children_if_necessary, Distribution, ExecutionPlan, InputOrderMode,
};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use datafusion_physical_plan::repartition::RepartitionExec;
Expand Down Expand Up @@ -145,19 +145,8 @@ impl PlanWithCorrespondingSort {
}

impl TreeNode for PlanWithCorrespondingSort {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down Expand Up @@ -237,19 +226,8 @@ impl PlanWithCorrespondingCoalescePartitions {
}

impl TreeNode for PlanWithCorrespondingCoalescePartitions {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
17 changes: 3 additions & 14 deletions datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::config::OptimizerOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
Expand Down Expand Up @@ -91,19 +91,8 @@ impl PipelineStatePropagator {
}

impl TreeNode for PipelineStatePropagator {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Self> {
self.children.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_physical_plan::unbounded_output;

/// For a given `plan`, this object carries the information one needs from its
Expand Down Expand Up @@ -104,18 +104,8 @@ impl OrderPreservationContext {
}

impl TreeNode for OrderPreservationContext {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Expand Down
18 changes: 4 additions & 14 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{plan_err, DataFusionError, JoinSide, Result};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
Expand Down Expand Up @@ -71,20 +71,10 @@ impl SortPushDown {
}

impl TreeNode for SortPushDown {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in &self.children_nodes {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
fn children_nodes(&self) -> Vec<Self> {
self.children_nodes.iter().map(|c| c.clone()).collect()
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
Expand Down
47 changes: 18 additions & 29 deletions datafusion/expr/src/tree_node/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@ use crate::expr::{
};
use crate::{Expr, GetFieldAccess};

use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::tree_node::TreeNode;
use datafusion_common::{internal_err, DataFusionError, Result};

impl TreeNode for Expr {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
let children = match self {
Expr::Alias(Alias{expr,..})
fn children_nodes(&self) -> Vec<Self> {
Copy link
Contributor

@peter-toth peter-toth Dec 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is a nice refactor but I'm not sure it is needed at all (1.) and I'm also not sure cloning a node's children into a Vec is good direction (2.). Let me share my thoughts on these 2.

  1. I think in DataFusion there are only 4 real tree types: LogicalPlan, Expr, ExecutionPlan and PhysicalExpr and there 7 special tree types based on the above 4: SortPushDown, PlanWithRequitements, ExprOrdering, OrderPreservationContext, PipelineStatePropagator, PlanWithCorrespondingCoalescePartitions and PlanWithCorrespondingSort.
    The only reason why DataFusion currently has these 7 special trees is to be able to propagate down/up some additional information during tree transformations. But I think these special trees are simply not needed and can be removed. If we add some transformation helper functions like TreeNode.transform_down_with_payload() and TreeNode.transform_up_with_payload() we can easily remove these 7 special types completely.
    Get rid of special TreeNodes #8663 is about this issue and I removed SortPushDown, PlanWithRequitements and ExprOrdering as exaples in this PR: Transform with payload #8664.

  2. If we just focus on the 4 base tree types we can distingsuish 2 different types: Expr uses Boxes but the other 3 (LogicalPlan, ExecutionPlan and PhysicalExpr) use Arcs for the links between the nodes. I think this is very important in their behaviour. E.g. a big issue for Exprs when the nodes are cloned as it basicaly means cloning the whole subtree under the node. But cloning is not an issue for the other 3 as cloning Arcs is cheap. So I think the problem with the current Expr.apply_children() and the Expr.children_nodes() in this PR that it doesn't move the code into the a direction where cloning is not necessary on trees made of Boxes.
    What I would suggest long term is in Refactor TreeNode recursions #7942. Instead of the old TreeNode.apply_children() (or collecting the children into a Vec) we could use TreeNode.visit_children() (https://github.com/apache/arrow-datafusion/pull/7942/files#diff-6515fda3c67b9d487ab491fd21c27e32c411a8cbee24725d737290d19c10c199R31). IMO we need to implement visit_children() for the 4 base tree types only (see 1.).
    (Unfortunately I didn't noted the visit related changes in the Refactor TreeNode recursions #7942 PR description, but it is a big PR with many different ideas and focusing on transform seemed more important.)

Copy link
Member Author

@viirya viirya Dec 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in above comment, the first try I did is to return Vec<&Self> for children_nodes. I think it makes more sense to return reference of children nodes instead of owned types (although for other implementations it is smart pointers, but I don't want to assume that the TreeNode impls must link with children nodes with Arc etc.).

But there is one trouble when implement TreeNode for Arc<T>: #8672 (comment). I've tried some ideas locally to get rid of it, but in the end they don't work so I changed children_nodes to return Vec<Self>. (still thinking if I can remove it but it may be more change. 🤔 )

This can be seen as an incremental cleanup patch as it doesn't change current API and keep current behavior but clean up the code as its purpose. In the long term I'd like to see more fundamental change to TreeNode and this can be seen as a short term effort to make the code easier to look/work on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to return Vec<Cow<Self>> that would contain borrowed elements for Expr but owned for the others?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes there may be cases where one needs to do multiple passes over the same tree (e.g. doing one top down and one bottom up pass consecutively). Or we may want to run different logic on the same tree.

So it may be a little premature to conclude we can do away with "derived" trees and just keep the base four.

I suggest we take small steps and progressively discover the requirements/use cases. I think refactoring to get rid of duplicate apply_children and map_children is a good first step.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, that's what I thought so this restrains to be limited change (mostly apply_children).

Copy link
Contributor

@peter-toth peter-toth Dec 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes there may be cases where one needs to do multiple passes over the same tree (e.g. doing one top down and one bottom up pass consecutively). Or we may want to run different logic on the same tree.

This is also supported with transform_with_payload() (https://github.com/apache/arrow-datafusion/pull/8664/files#diff-1148652cb6868dfd5d45d595a1013232c2407f3c89d3850a4ac8e48b8a0884e1R127-R151) that does a top-down and a bottom-up transforms too and capable to propagate different payloads in the 2 directions.

So it may be a little premature to conclude we can do away with "derived" trees and just keep the base four.

I'm happy to try refactoring the remaining 4 (OrderPreservationContext, PipelineStatePropagator, PlanWithCorrespondingCoalescePartitions and PlanWithCorrespondingSort) in scope of #8664 next week and check the above statement.
(I thought a smaller PR that shows 1-2 examples for both up and down directions would be easier to review, but probably we can do it in 1 big PR if that's prefered.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I investigated the Vec<Cow<Self>> idea (#8672 (comment)) a bit further and it seems possible to avoid cloning at many places. This could be especially useful for Exprs where cloning is very expensive. I've opened a PR to this PR to here: viirya#132

match self {
Expr::Alias(Alias { expr, .. })
| Expr::Not(expr)
| Expr::IsNotNull(expr)
| Expr::IsTrue(expr)
Expand All @@ -47,26 +44,24 @@ impl TreeNode for Expr {
| Expr::Cast(Cast { expr, .. })
| Expr::TryCast(TryCast { expr, .. })
| Expr::Sort(Sort { expr, .. })
| Expr::InSubquery(InSubquery{ expr, .. }) => vec![expr.as_ref().clone()],
| Expr::InSubquery(InSubquery { expr, .. }) => vec![expr.as_ref().clone()],
Expr::GetIndexedField(GetIndexedField { expr, field }) => {
let expr = expr.as_ref().clone();
match field {
GetFieldAccess::ListIndex {key} => {
GetFieldAccess::ListIndex { key } => {
vec![key.as_ref().clone(), expr]
},
GetFieldAccess::ListRange {start, stop} => {
}
GetFieldAccess::ListRange { start, stop } => {
vec![start.as_ref().clone(), stop.as_ref().clone(), expr]
}
GetFieldAccess::NamedStructField {name: _name} => {
GetFieldAccess::NamedStructField { name: _name } => {
vec![expr]
}
}
}
Expr::GroupingSet(GroupingSet::Rollup(exprs))
| Expr::GroupingSet(GroupingSet::Cube(exprs)) => exprs.clone(),
Expr::ScalarFunction (ScalarFunction{ args, .. } ) => {
args.clone()
}
Expr::ScalarFunction(ScalarFunction { args, .. }) => args.clone(),
Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => {
lists_of_exprs.clone().into_iter().flatten().collect()
}
Expand All @@ -77,8 +72,8 @@ impl TreeNode for Expr {
| Expr::Literal(_)
| Expr::Exists { .. }
| Expr::ScalarSubquery(_)
| Expr::Wildcard {..}
| Expr::Placeholder (_) => vec![],
| Expr::Wildcard { .. }
| Expr::Placeholder(_) => vec![],
Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
vec![left.as_ref().clone(), right.as_ref().clone()]
}
Expand Down Expand Up @@ -107,8 +102,12 @@ impl TreeNode for Expr {
}
expr_vec
}
Expr::AggregateFunction(AggregateFunction { args, filter, order_by, .. })
=> {
Expr::AggregateFunction(AggregateFunction {
args,
filter,
order_by,
..
}) => {
let mut expr_vec = args.clone();

if let Some(f) = filter {
Expand Down Expand Up @@ -137,17 +136,7 @@ impl TreeNode for Expr {
expr_vec.extend(list.clone());
expr_vec
}
};

for child in children.iter() {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
}

fn map_children<F>(self, transform: F) -> Result<Self>
Expand Down
19 changes: 4 additions & 15 deletions datafusion/expr/src/tree_node/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ use datafusion_common::tree_node::{TreeNodeVisitor, VisitRecursion};
use datafusion_common::{tree_node::TreeNode, Result};

impl TreeNode for LogicalPlan {
fn children_nodes(&self) -> Vec<Self> {
self.inputs().into_iter().map(|p| p.clone()).collect()
}

fn apply<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
Expand Down Expand Up @@ -91,21 +95,6 @@ impl TreeNode for LogicalPlan {
visitor.post_visit(self)
}

fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.inputs() {
match op(child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}

Ok(VisitRecursion::Continue)
}

fn map_children<F>(self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
Expand Down
Loading
Loading