From 5e51e513ddabebaf1064849c943ffa3b66cbff73 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Tue, 1 Oct 2024 00:16:28 +0800 Subject: [PATCH 1/6] Update trait `UserDefinedLogicalNodeCore` Signed-off-by: Austin Liu --- datafusion/expr/src/logical_plan/extension.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/datafusion/expr/src/logical_plan/extension.rs b/datafusion/expr/src/logical_plan/extension.rs index d49c85fb6fd6..d0b05cfd590a 100644 --- a/datafusion/expr/src/logical_plan/extension.rs +++ b/datafusion/expr/src/logical_plan/extension.rs @@ -195,6 +195,10 @@ pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { /// directly because it must remain object safe. fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool; fn dyn_ord(&self, other: &dyn UserDefinedLogicalNode) -> Option; + + /// Indicates to the optimizer if its safe to push a limit down past + /// this extension node + fn allows_limit_to_inputs(&self) -> bool; } impl Hash for dyn UserDefinedLogicalNode { @@ -295,6 +299,10 @@ pub trait UserDefinedLogicalNodeCore: ) -> Option>> { None } + + /// Indicates to the optimizer if its safe to push a limit down past + /// this extension node + fn allows_limit_to_inputs(&self) -> bool; } /// Automatically derive UserDefinedLogicalNode to `UserDefinedLogicalNode` @@ -361,6 +369,10 @@ impl UserDefinedLogicalNode for T { .downcast_ref::() .and_then(|other| self.partial_cmp(other)) } + + fn allows_limit_to_inputs(&self) -> bool { + self.allows_limit_to_inputs() + } } fn get_all_columns_from_schema(schema: &DFSchema) -> HashSet { From e9d8574b990b006a62e2c214d838cbf819b4963e Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Tue, 1 Oct 2024 00:17:33 +0800 Subject: [PATCH 2/6] Update corresponding interface Signed-off-by: Austin Liu Add rewrite rule for `push-down-limit` for `Extension` Signed-off-by: Austin Liu --- datafusion/core/src/physical_planner.rs | 4 ++++ datafusion/optimizer/src/analyzer/subquery.rs | 4 ++++ datafusion/optimizer/src/optimize_projections/mod.rs | 8 ++++++++ datafusion/optimizer/src/push_down_filter.rs | 3 +++ datafusion/optimizer/src/test/user_defined.rs | 4 ++++ 5 files changed, 23 insertions(+) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 520392c9f075..1650d187805c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2557,6 +2557,10 @@ mod tests { ) -> Result { unimplemented!("NoOp"); } + + fn allows_limit_to_inputs(&self) -> bool { + false // Disallow limit push-down by default + } } #[derive(Debug)] diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index c771f31a58b2..68af00e01f25 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -385,6 +385,10 @@ mod test { empty_schema: Arc::clone(&self.empty_schema), }) } + + fn allows_limit_to_inputs(&self) -> bool { + false // Disallow limit push-down by default + } } #[test] diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 5ab427a31699..36d205eb219d 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -895,6 +895,10 @@ mod tests { // Since schema is same. Output columns requires their corresponding version in the input columns. Some(vec![output_columns.to_vec()]) } + + fn allows_limit_to_inputs(&self) -> bool { + false // Disallow limit push-down by default + } } #[derive(Debug, Hash, PartialEq, Eq)] @@ -991,6 +995,10 @@ mod tests { } Some(vec![left_reqs, right_reqs]) } + + fn allows_limit_to_inputs(&self) -> bool { + false // Disallow limit push-down by default + } } #[test] diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 4e36cc62588e..5acd786444e3 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1499,6 +1499,9 @@ mod tests { schema: Arc::clone(&self.schema), }) } + fn allows_limit_to_inputs(&self) -> bool { + false // Disallow limit push-down by default + } } #[test] diff --git a/datafusion/optimizer/src/test/user_defined.rs b/datafusion/optimizer/src/test/user_defined.rs index 814cd0c0cd0a..fb51ae042a82 100644 --- a/datafusion/optimizer/src/test/user_defined.rs +++ b/datafusion/optimizer/src/test/user_defined.rs @@ -76,4 +76,8 @@ impl UserDefinedLogicalNodeCore for TestUserDefinedPlanNode { input: inputs.swap_remove(0), }) } + + fn allows_limit_to_inputs(&self) -> bool { + false // Disallow limit push-down by default + } } From af8babf036e4f528680550aca9ea04dd24c99e9c Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Tue, 1 Oct 2024 00:19:54 +0800 Subject: [PATCH 3/6] Add rewrite rule for `push-down-limit` for `Extension` and tests Signed-off-by: Austin Liu --- datafusion/optimizer/src/push_down_limit.rs | 256 +++++++++++++++++++- 1 file changed, 255 insertions(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 158c7592df51..7937d6fcf567 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -153,6 +153,36 @@ impl OptimizerRule for PushDownLimit { subquery_alias.input = Arc::new(new_limit); Ok(Transformed::yes(LogicalPlan::SubqueryAlias(subquery_alias))) } + LogicalPlan::Extension(extension_plan) => { + if !extension_plan.node.allows_limit_to_inputs() { + // If push down is not allowed, keep the original limit + return original_limit( + skip, + fetch, + LogicalPlan::Extension(extension_plan), + ); + } + + let new_children = extension_plan + .node + .inputs() + .into_iter() + .map(|child| { + LogicalPlan::Limit(Limit { + skip: 0, + fetch: Some(fetch + skip), + input: Arc::new(child.clone()), + }) + }) + .collect::>(); + + // Create a new extension node with updated inputs + let child_plan = LogicalPlan::Extension(extension_plan); + let new_extension = + child_plan.with_new_exprs(child_plan.expressions(), new_children)?; + + transformed_limit(skip, fetch, new_extension) + } input => original_limit(skip, fetch, input), } } @@ -258,17 +288,241 @@ fn push_down_join(mut join: Join, limit: usize) -> Transformed { #[cfg(test)] mod test { + use std::cmp::Ordering; + use std::fmt::{Debug, Formatter}; use std::vec; use super::*; use crate::test::*; - use datafusion_expr::{col, exists, logical_plan::builder::LogicalPlanBuilder}; + + use datafusion_common::DFSchemaRef; + use datafusion_expr::{ + col, exists, logical_plan::builder::LogicalPlanBuilder, Expr, Extension, + UserDefinedLogicalNodeCore, + }; use datafusion_functions_aggregate::expr_fn::max; fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq(Arc::new(PushDownLimit::new()), plan, expected) } + #[derive(Debug, PartialEq, Eq, Hash)] + pub struct NoopPlan { + input: Vec, + schema: DFSchemaRef, + } + + // Manual implementation needed because of `schema` field. Comparison excludes this field. + impl PartialOrd for NoopPlan { + fn partial_cmp(&self, other: &Self) -> Option { + self.input.partial_cmp(&other.input) + } + } + + impl UserDefinedLogicalNodeCore for NoopPlan { + fn name(&self) -> &str { + "NoopPlan" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + self.input.iter().collect() + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + self.input + .iter() + .flat_map(|child| child.expressions()) + .collect() + } + + fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "NoopPlan") + } + + fn with_exprs_and_inputs( + &self, + _exprs: Vec, + inputs: Vec, + ) -> Result { + Ok(Self { + input: inputs, + schema: Arc::clone(&self.schema), + }) + } + + fn allows_limit_to_inputs(&self) -> bool { + true // Allow limit push-down + } + } + + #[derive(Debug, PartialEq, Eq, Hash)] + struct NoLimitNoopPlan { + input: Vec, + schema: DFSchemaRef, + } + + // Manual implementation needed because of `schema` field. Comparison excludes this field. + impl PartialOrd for NoLimitNoopPlan { + fn partial_cmp(&self, other: &Self) -> Option { + self.input.partial_cmp(&other.input) + } + } + + impl UserDefinedLogicalNodeCore for NoLimitNoopPlan { + fn name(&self) -> &str { + "NoLimitNoopPlan" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + self.input.iter().collect() + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + self.input + .iter() + .flat_map(|child| child.expressions()) + .collect() + } + + fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "NoLimitNoopPlan") + } + + fn with_exprs_and_inputs( + &self, + _exprs: Vec, + inputs: Vec, + ) -> Result { + Ok(Self { + input: inputs, + schema: Arc::clone(&self.schema), + }) + } + + fn allows_limit_to_inputs(&self) -> bool { + false // Disallow limit push-down by default + } + } + #[test] + fn limit_pushdown_basic() -> Result<()> { + let table_scan = test_table_scan()?; + let noop_plan = LogicalPlan::Extension(Extension { + node: Arc::new(NoopPlan { + input: vec![table_scan.clone()], + schema: Arc::clone(table_scan.schema()), + }), + }); + + let plan = LogicalPlanBuilder::from(noop_plan) + .limit(0, Some(1000))? + .build()?; + + let expected = "Limit: skip=0, fetch=1000\ + \n NoopPlan\ + \n Limit: skip=0, fetch=1000\ + \n TableScan: test, fetch=1000"; + + assert_optimized_plan_equal(plan, expected) + } + + #[test] + fn limit_pushdown_with_skip() -> Result<()> { + let table_scan = test_table_scan()?; + let noop_plan = LogicalPlan::Extension(Extension { + node: Arc::new(NoopPlan { + input: vec![table_scan.clone()], + schema: Arc::clone(table_scan.schema()), + }), + }); + + let plan = LogicalPlanBuilder::from(noop_plan) + .limit(10, Some(1000))? + .build()?; + + let expected = "Limit: skip=10, fetch=1000\ + \n NoopPlan\ + \n Limit: skip=0, fetch=1010\ + \n TableScan: test, fetch=1010"; + + assert_optimized_plan_equal(plan, expected) + } + + #[test] + fn limit_pushdown_multiple_limits() -> Result<()> { + let table_scan = test_table_scan()?; + let noop_plan = LogicalPlan::Extension(Extension { + node: Arc::new(NoopPlan { + input: vec![table_scan.clone()], + schema: Arc::clone(table_scan.schema()), + }), + }); + + let plan = LogicalPlanBuilder::from(noop_plan) + .limit(10, Some(1000))? + .limit(20, Some(500))? + .build()?; + + let expected = "Limit: skip=30, fetch=500\ + \n NoopPlan\ + \n Limit: skip=0, fetch=530\ + \n TableScan: test, fetch=530"; + + assert_optimized_plan_equal(plan, expected) + } + + #[test] + fn limit_pushdown_multiple_inputs() -> Result<()> { + let table_scan = test_table_scan()?; + let noop_plan = LogicalPlan::Extension(Extension { + node: Arc::new(NoopPlan { + input: vec![table_scan.clone(), table_scan.clone()], + schema: Arc::clone(table_scan.schema()), + }), + }); + + let plan = LogicalPlanBuilder::from(noop_plan) + .limit(0, Some(1000))? + .build()?; + + let expected = "Limit: skip=0, fetch=1000\ + \n NoopPlan\ + \n Limit: skip=0, fetch=1000\ + \n TableScan: test, fetch=1000\ + \n Limit: skip=0, fetch=1000\ + \n TableScan: test, fetch=1000"; + + assert_optimized_plan_equal(plan, expected) + } + + #[test] + fn limit_pushdown_disallowed_noop_plan() -> Result<()> { + let table_scan = test_table_scan()?; + let no_limit_noop_plan = LogicalPlan::Extension(Extension { + node: Arc::new(NoLimitNoopPlan { + input: vec![table_scan.clone()], + schema: Arc::clone(table_scan.schema()), + }), + }); + + let plan = LogicalPlanBuilder::from(no_limit_noop_plan) + .limit(0, Some(1000))? + .build()?; + + let expected = "Limit: skip=0, fetch=1000\ + \n NoLimitNoopPlan\ + \n TableScan: test"; + + assert_optimized_plan_equal(plan, expected) + } + #[test] fn limit_pushdown_projection_table_provider() -> Result<()> { let table_scan = test_table_scan()?; From 0872fa7519edc5986c9a4ef555ef5dfea235be95 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Tue, 1 Oct 2024 00:39:57 +0800 Subject: [PATCH 4/6] Update corresponding interface Signed-off-by: Austin Liu --- datafusion/core/tests/user_defined/user_defined_plan.rs | 4 ++++ datafusion/optimizer/src/push_down_filter.rs | 1 + datafusion/proto/tests/cases/roundtrip_logical_plan.rs | 4 ++++ datafusion/substrait/tests/cases/roundtrip_logical_plan.rs | 4 ++++ 4 files changed, 13 insertions(+) diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index e51adbc4ddc1..baf14d4c06f2 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -443,6 +443,10 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode { expr: replace_sort_expression(self.expr.clone(), exprs.swap_remove(0)), }) } + + fn allows_limit_to_inputs(&self) -> bool { + false // Disallow limit push-down by default + } } /// Physical planner for TopK nodes diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 5acd786444e3..0ecf48028519 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1499,6 +1499,7 @@ mod tests { schema: Arc::clone(&self.schema), }) } + fn allows_limit_to_inputs(&self) -> bool { false // Disallow limit push-down by default } diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 8a94f905812c..e53775c16336 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -1060,6 +1060,10 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode { expr: exprs.swap_remove(0), }) } + + fn allows_limit_to_inputs(&self) -> bool { + false // Disallow limit push-down by default + } } #[derive(Debug)] diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index f7686bec5435..93569bbb123e 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -149,6 +149,10 @@ impl UserDefinedLogicalNode for MockUserDefinedLogicalPlan { fn dyn_ord(&self, _: &dyn UserDefinedLogicalNode) -> Option { unimplemented!() } + + fn allows_limit_to_inputs(&self) -> bool { + false // Disallow limit push-down by default + } } impl MockUserDefinedLogicalPlan { From 3337d66a8070beef8858ebf4ea3879d43390abc9 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Tue, 1 Oct 2024 17:02:49 +0800 Subject: [PATCH 5/6] Reorganize to match guard Signed-off-by: Austin Liu --- datafusion/optimizer/src/push_down_limit.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 7937d6fcf567..cc322d8d1c0d 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -153,16 +153,15 @@ impl OptimizerRule for PushDownLimit { subquery_alias.input = Arc::new(new_limit); Ok(Transformed::yes(LogicalPlan::SubqueryAlias(subquery_alias))) } - LogicalPlan::Extension(extension_plan) => { - if !extension_plan.node.allows_limit_to_inputs() { - // If push down is not allowed, keep the original limit - return original_limit( - skip, - fetch, - LogicalPlan::Extension(extension_plan), - ); - } - + LogicalPlan::Extension(extension_plan) + if !extension_plan.node.allows_limit_to_inputs() => + { + // If push down is not allowed, keep the original limit + original_limit(skip, fetch, LogicalPlan::Extension(extension_plan)) + } + LogicalPlan::Extension(extension_plan) + if extension_plan.node.allows_limit_to_inputs() => + { let new_children = extension_plan .node .inputs() From 2ccaa9696a7a32925ac819c73b5f4fcbc262a575 Mon Sep 17 00:00:00 2001 From: Austin Liu Date: Wed, 2 Oct 2024 21:14:09 +0800 Subject: [PATCH 6/6] Clena up Signed-off-by: Austin Liu Clean up Signed-off-by: Austin Liu --- datafusion/core/src/physical_planner.rs | 2 +- .../tests/user_defined/user_defined_plan.rs | 2 +- datafusion/expr/src/logical_plan/extension.rs | 28 +++++++++++++------ datafusion/optimizer/src/analyzer/subquery.rs | 2 +- .../optimizer/src/optimize_projections/mod.rs | 4 +-- datafusion/optimizer/src/push_down_filter.rs | 2 +- datafusion/optimizer/src/push_down_limit.rs | 12 ++------ datafusion/optimizer/src/test/user_defined.rs | 2 +- .../tests/cases/roundtrip_logical_plan.rs | 2 +- .../tests/cases/roundtrip_logical_plan.rs | 2 +- 10 files changed, 32 insertions(+), 26 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 1650d187805c..78c70606bf68 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -2558,7 +2558,7 @@ mod tests { unimplemented!("NoOp"); } - fn allows_limit_to_inputs(&self) -> bool { + fn supports_limit_pushdown(&self) -> bool { false // Disallow limit push-down by default } } diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index baf14d4c06f2..2b45d0ed600b 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -444,7 +444,7 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode { }) } - fn allows_limit_to_inputs(&self) -> bool { + fn supports_limit_pushdown(&self) -> bool { false // Disallow limit push-down by default } } diff --git a/datafusion/expr/src/logical_plan/extension.rs b/datafusion/expr/src/logical_plan/extension.rs index d0b05cfd590a..19d4cb3db9ce 100644 --- a/datafusion/expr/src/logical_plan/extension.rs +++ b/datafusion/expr/src/logical_plan/extension.rs @@ -196,9 +196,15 @@ pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync { fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool; fn dyn_ord(&self, other: &dyn UserDefinedLogicalNode) -> Option; - /// Indicates to the optimizer if its safe to push a limit down past - /// this extension node - fn allows_limit_to_inputs(&self) -> bool; + /// Returns `true` if a limit can be safely pushed down through this + /// `UserDefinedLogicalNode` node. + /// + /// If this method returns `true`, and the query plan contains a limit at + /// the output of this node, DataFusion will push the limit to the input + /// of this node. + fn supports_limit_pushdown(&self) -> bool { + false + } } impl Hash for dyn UserDefinedLogicalNode { @@ -300,9 +306,15 @@ pub trait UserDefinedLogicalNodeCore: None } - /// Indicates to the optimizer if its safe to push a limit down past - /// this extension node - fn allows_limit_to_inputs(&self) -> bool; + /// Returns `true` if a limit can be safely pushed down through this + /// `UserDefinedLogicalNode` node. + /// + /// If this method returns `true`, and the query plan contains a limit at + /// the output of this node, DataFusion will push the limit to the input + /// of this node. + fn supports_limit_pushdown(&self) -> bool { + false // Disallow limit push-down by default + } } /// Automatically derive UserDefinedLogicalNode to `UserDefinedLogicalNode` @@ -370,8 +382,8 @@ impl UserDefinedLogicalNode for T { .and_then(|other| self.partial_cmp(other)) } - fn allows_limit_to_inputs(&self) -> bool { - self.allows_limit_to_inputs() + fn supports_limit_pushdown(&self) -> bool { + self.supports_limit_pushdown() } } diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index 68af00e01f25..aabc549de583 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -386,7 +386,7 @@ mod test { }) } - fn allows_limit_to_inputs(&self) -> bool { + fn supports_limit_pushdown(&self) -> bool { false // Disallow limit push-down by default } } diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 36d205eb219d..b5d581f3919f 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -896,7 +896,7 @@ mod tests { Some(vec![output_columns.to_vec()]) } - fn allows_limit_to_inputs(&self) -> bool { + fn supports_limit_pushdown(&self) -> bool { false // Disallow limit push-down by default } } @@ -996,7 +996,7 @@ mod tests { Some(vec![left_reqs, right_reqs]) } - fn allows_limit_to_inputs(&self) -> bool { + fn supports_limit_pushdown(&self) -> bool { false // Disallow limit push-down by default } } diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 0ecf48028519..6e2cc0cbdbcb 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -1500,7 +1500,7 @@ mod tests { }) } - fn allows_limit_to_inputs(&self) -> bool { + fn supports_limit_pushdown(&self) -> bool { false // Disallow limit push-down by default } } diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index cc322d8d1c0d..8b5e483001b3 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -154,13 +154,7 @@ impl OptimizerRule for PushDownLimit { Ok(Transformed::yes(LogicalPlan::SubqueryAlias(subquery_alias))) } LogicalPlan::Extension(extension_plan) - if !extension_plan.node.allows_limit_to_inputs() => - { - // If push down is not allowed, keep the original limit - original_limit(skip, fetch, LogicalPlan::Extension(extension_plan)) - } - LogicalPlan::Extension(extension_plan) - if extension_plan.node.allows_limit_to_inputs() => + if extension_plan.node.supports_limit_pushdown() => { let new_children = extension_plan .node @@ -353,7 +347,7 @@ mod test { }) } - fn allows_limit_to_inputs(&self) -> bool { + fn supports_limit_pushdown(&self) -> bool { true // Allow limit push-down } } @@ -406,7 +400,7 @@ mod test { }) } - fn allows_limit_to_inputs(&self) -> bool { + fn supports_limit_pushdown(&self) -> bool { false // Disallow limit push-down by default } } diff --git a/datafusion/optimizer/src/test/user_defined.rs b/datafusion/optimizer/src/test/user_defined.rs index fb51ae042a82..a39f90b5da5d 100644 --- a/datafusion/optimizer/src/test/user_defined.rs +++ b/datafusion/optimizer/src/test/user_defined.rs @@ -77,7 +77,7 @@ impl UserDefinedLogicalNodeCore for TestUserDefinedPlanNode { }) } - fn allows_limit_to_inputs(&self) -> bool { + fn supports_limit_pushdown(&self) -> bool { false // Disallow limit push-down by default } } diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index e53775c16336..cd789e06dc3b 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -1061,7 +1061,7 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode { }) } - fn allows_limit_to_inputs(&self) -> bool { + fn supports_limit_pushdown(&self) -> bool { false // Disallow limit push-down by default } } diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 93569bbb123e..3b7d0fd29610 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -150,7 +150,7 @@ impl UserDefinedLogicalNode for MockUserDefinedLogicalPlan { unimplemented!() } - fn allows_limit_to_inputs(&self) -> bool { + fn supports_limit_pushdown(&self) -> bool { false // Disallow limit push-down by default } }