Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Projection Pushdown in PhysicalPlan #8073

Merged
merged 2 commits into from
Nov 9, 2023
Merged

Projection Pushdown in PhysicalPlan #8073

merged 2 commits into from
Nov 9, 2023

Conversation

berkaysynnada
Copy link
Contributor

@berkaysynnada berkaysynnada commented Nov 7, 2023

Which issue does this PR close?

Closes #.

Rationale for this change

Pushing down the ProjectionExec is generally beneficial for execution. Hence, whenever feasible and advantageous, we should aim to swap it with its input. This PR introduces a rule for this purpose. While a similar rule exists in the logical planning stage, some cases may emerge for further optimization after some optimizer rules have worked.

What changes are included in this PR?

This PR introduces a PhysicalOptimizerRule ProjectionPushdown implemented at the final optimization step. The rule initially checks if the operation is a ProjectionExec. If it is, the rule attempts to eliminate it if it's redundant. If not, it examines the input of the projection. Each operator has its own conditions for swapping with a projection. If the conditions are satisfied, the plan tree Projection <-- X <-- Y evolves to X <-- Projection <-- Y. Two projections can always be combined into one, and in some scenarios, projections can be removed from the plan if they can be propagated to the source providers.

Are these changes tested?

Yes, unit tests have been added to cover each operator. Additionally, various .slt test changes show successful optimizations. Benchmark results are as follows:

┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ feature_optimize_projections ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 421.93ms │                     421.60ms │     no change │
│ QQuery 2     │  59.86ms │                      59.83ms │     no change │
│ QQuery 3     │ 141.49ms │                     140.42ms │     no change │
│ QQuery 4     │  89.99ms │                      88.31ms │     no change │
│ QQuery 5     │ 173.94ms │                     171.95ms │     no change │
│ QQuery 6     │  84.61ms │                      84.30ms │     no change │
│ QQuery 7     │ 265.81ms │                     251.49ms │ +1.06x faster │
│ QQuery 8     │ 203.04ms │                     200.24ms │     no change │
│ QQuery 9     │ 285.64ms │                     285.41ms │     no change │
│ QQuery 10    │ 224.73ms │                     220.80ms │     no change │
│ QQuery 11    │  57.97ms │                      58.64ms │     no change │
│ QQuery 12    │ 158.46ms │                     158.17ms │     no change │
│ QQuery 13    │ 231.70ms │                     221.08ms │     no change │
│ QQuery 14    │ 119.61ms │                     119.42ms │     no change │
│ QQuery 15    │  82.82ms │                      83.33ms │     no change │
│ QQuery 16    │  51.07ms │                      50.90ms │     no change │
│ QQuery 17    │ 222.45ms │                     204.10ms │ +1.09x faster │
│ QQuery 18    │ 428.66ms │                     436.40ms │     no change │
│ QQuery 19    │ 239.63ms │                     241.73ms │     no change │
│ QQuery 20    │ 137.80ms │                     141.81ms │     no change │
│ QQuery 21    │ 354.14ms │                     362.21ms │     no change │
│ QQuery 22    │  52.01ms │                      53.07ms │     no change │
└──────────────┴──────────┴──────────────────────────────┴───────────────┘

Are there any user-facing changes?

@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Nov 7, 2023
Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have collaborated with @berkaysynnada and reviewed this PR carefully over the last week. Almost all the changes are within a new file that implements the rule (projection_pushdown.rs), so it should be an easy review.

@alamb
Copy link
Contributor

alamb commented Nov 7, 2023

@crepererum implemented something similar to this in IOx -- can you please review this PR as well @crepererum ? Maybe we can contribute some of IOx's tests cases back upstream?

try_swapping_with_nested_loop_join(projection, nl_join)?
} else if let Some(sm_join) = input.downcast_ref::<SortMergeJoinExec>() {
try_swapping_with_sort_merge_join(projection, sm_join)?
} else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we make this registry-based so that custom execs could also profit from this pass?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can make a method on ExecutionPlan and then add the relevant methods to each impl ExecutionPlan, similar to what I did in #7936

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We like this idea and considered how we can do this, but didn't see an obvious design to follow. Any suggestions on how we can do this? Also, do you think we should get the functionality in first and do the refactor as a follow-on PR, or should we incorporate it in this one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could add a function like this to ExecutionPlan:

trait ExecutionPlan {
...
  /// Tries to push a projection of the output of this ExecutionPlan 
  /// *into* itself input by rewriting the internal expressions.
  /// 
  /// For example,  
  /// (TODO EXAMPLE HERE
  ///
  /// If the ExecutionPlan does not support pushdown, ,returns Ok(None) (the default)
  fn push_projection(&self, projection: &[(Arc<dyn PhysicalExpr>, String)]) -> Result<Option<Arc<dyn ExecutionPlan>>>) {
    Ok(None)
  }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I guess this whole problem is:

How is a set of optimizer passes linked to a set of nodes, while both sets are extendible?

I see the following rough solutions:

A: omniscient optimizer

The optimizer knows all node types. This is what this PR does (and what many other passes do).

This doesn't scale well.

B: omniscient nodes

The nodes know all optimizer passes and implement them themselves. This kinda sounds like what @alamb proposes.

This doesn't scale well.

C: registry-based linking

Developers are aware of the which nodes can be optimized in which way and can fill out gaps in the optimizer-node matrix. The issue is mostly how this registry should be implemented. Rust has a bunch of crates for that that are all not great (due to the issue of the initialization order). Luckily we all know which node types are in a plan (because you could traverse the plan) so we could hook registry initialization in there. Something like:

trait ExecutionPlan {
    fn register_hook_for_optimizer_pass(....);
}

D: abstraction

This is what most other optimizer passes do: they read some abstract property of the node (like "schema", "num children", "output ordering", ...) and infer the correct behavior based on that. I think we could use something like this here as well. Namely if you would know what columns of an input schema are used by the node itself and which are just "pass-through", you could apply projection pushdown automatically.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC what @alamb proposes is almost at the boundary of categories B and D. If we can define what the proposed function does purely in terms of projection behavior (which would still have meaning independent from the pushdown rule), we can consider it to be in category D.

We think that category A is not in-line with Datafusion's philosophy, and I think we all agree on this. However, in many cases, category A-type implementations serve as a good stepping stone as we try to gain a good understanding of how a good category C/D design looks like. So, on our end, we typically employ the strategy of getting a good test suite and a baseline category A implementation done first, and then progressively migrate towards a long-term category C/D solution. This PR is one of such first steps 🙂

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think with a follow-up ticket "make this rule generic" we could accept the solution in this PR, WDYT @alamb ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think with a follow-up ticket "make this rule generic" we could accept the solution in this PR, WDYT @alamb ?

I agree -- I filed #8096

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @berkaysynnada and @ozankabak -- this PR is on my review list but I probably won't get to it until tomorrow

physical_plan
ProjectionExec: expr=[a@0 as a]
--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_ordering=[a@0 ASC NULLS LAST], has_header=true
physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST], has_header=true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a nice improvement in plan (it avoids scanning b now)

try_swapping_with_nested_loop_join(projection, nl_join)?
} else if let Some(sm_join) = input.downcast_ref::<SortMergeJoinExec>() {
try_swapping_with_sort_merge_join(projection, sm_join)?
} else if let Some(sym_join) = input.downcast_ref::<SymmetricHashJoinExec>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could add a function like this to ExecutionPlan:

trait ExecutionPlan {
...
  /// Tries to push a projection of the output of this ExecutionPlan 
  /// *into* itself input by rewriting the internal expressions.
  /// 
  /// For example,  
  /// (TODO EXAMPLE HERE
  ///
  /// If the ExecutionPlan does not support pushdown, ,returns Ok(None) (the default)
  fn push_projection(&self, projection: &[(Arc<dyn PhysicalExpr>, String)]) -> Result<Option<Arc<dyn ExecutionPlan>>>) {
    Ok(None)
  }
}

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @berkaysynnada and @ozankabak -- I took a look at the code and I have some ideas of how to simplify it, but we can do so as a follow on PR.

This is a very nice contribution

@@ -107,6 +108,13 @@ impl PhysicalOptimizer {
// into an `order by max(x) limit y`. In this case it will copy the limit value down
// to the aggregation, allowing it to use only y number of accumulators.
Arc::new(TopKAggregation::new()),
// The ProjectionPushdown rule tries to push projections towards
// the sources in the execution plan. As a result of this process,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// the sources in the execution plan. As a result of this process,
// the sources in the execution plan, in addition to the projection pushdown
// that happens in the LogicalPlan optimizer. As a result of this process,

Comment on lines +298 to +301
// If the projection does not narrow the the schema, we should not try to push it down:
if projection.expr().len() >= projection.input().schema().fields().len() {
return Ok(None);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is repeated for almost every operator -- it might be possible to pull it into remove_unnecessary_projections and remove all the duplication here

},
))
}
} else if let Some(binary) = expr_any.downcast_ref::<BinaryExpr>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern is basically recursing through the PhysicalExpr tree manually, and only covers some subset of of the nodes.

I tried rewriting it using TreeNode, which is both less code and covers all PhysicalExpr types, not just a subset, and the tests still pass.

I will make a follow on PR with the proposed simplification

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb alamb merged commit 1c17c47 into apache:main Nov 9, 2023
22 checks passed
@alamb
Copy link
Contributor

alamb commented Nov 9, 2023

I have a follow on PR with some proposed simplifications: #8109

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate physical-expr Physical Expressions sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants