Skip to content

Commit

Permalink
Do not use partial rewrites
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Nov 9, 2023
1 parent d524da9 commit b29b097
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
13 changes: 13 additions & 0 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,19 @@ pub trait TreeNode: Sized {
Ok(new_node)
}

/// Convenience utils for writing optimizers rule: recursively apply the given 'op' first to all of its
/// children and then itself(Postorder Traversal) using a mutable function, `F`.
/// When the `op` does not apply to a given node, it is left unchanged.
fn transform_up_mut<F>(self, op: &mut F) -> Result<Self>
where
F: FnMut(Self) -> Result<Transformed<Self>>,
{
let after_op_children = self.map_children(|node| node.transform_up_mut(op))?;

let new_node = op(after_op_children)?.into();
Ok(new_node)
}

/// Transform the tree node using the given [TreeNodeRewriter]
/// It performs a depth first walk of an node and its children.
///
Expand Down
33 changes: 28 additions & 5 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,16 +788,31 @@ fn update_expr(
projected_exprs: &[(Arc<dyn PhysicalExpr>, String)],
sync_with_child: bool,
) -> Result<Option<Arc<dyn PhysicalExpr>>> {
let mut rewritten = false;
#[derive(Debug, PartialEq)]
enum RewriteState {
/// The expression is unchanged.
Unchanged,
/// Some part of the expression has been rewritten
RewrittenValid,
/// Some part of the expression has been rewritten, but some column
/// references could not be.
RewrittenInvalid,
}

let mut state = RewriteState::Unchanged;

let new_expr = expr
.clone()
.transform_down_mut(&mut |expr: Arc<dyn PhysicalExpr>| {
.transform_up_mut(&mut |expr: Arc<dyn PhysicalExpr>| {
if state == RewriteState::RewrittenInvalid {
return Ok(Transformed::No(expr));
}

let Some(column) = expr.as_any().downcast_ref::<Column>() else {
return Ok(Transformed::No(expr));
};
if sync_with_child {
rewritten = true;
state = RewriteState::RewrittenValid;
// Update the index of `column`:
Ok(Transformed::Yes(projected_exprs[column.index()].0.clone()))
} else {
Expand All @@ -815,15 +830,23 @@ fn update_expr(
},
);
if let Some(new_col) = new_col {
rewritten = true;
state = RewriteState::RewrittenValid;
Ok(Transformed::Yes(new_col))
} else {
// didn't find a rewrite, stop trying
state = RewriteState::RewrittenInvalid;
Ok(Transformed::No(expr))
}
}
});

new_expr.map(|new_expr| if rewritten { Some(new_expr) } else { None })
new_expr.map(|new_expr| {
if state == RewriteState::RewrittenValid {
Some(new_expr)
} else {
None
}
})
}

/// Creates a new [`ProjectionExec`] instance with the given child plan and
Expand Down

0 comments on commit b29b097

Please sign in to comment.