diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 520392c9f075..78c70606bf68 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -2557,6 +2557,10 @@ mod tests {
         ) -> Result<Self> {
             unimplemented!("NoOp");
         }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            false // Disallow limit push-down by default
+        }
     }
 
     #[derive(Debug)]
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs
index e51adbc4ddc1..2b45d0ed600b 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -443,6 +443,10 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
             expr: replace_sort_expression(self.expr.clone(), exprs.swap_remove(0)),
         })
     }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        false // Disallow limit push-down by default
+    }
 }
 
 /// Physical planner for TopK nodes
diff --git a/datafusion/expr/src/logical_plan/extension.rs b/datafusion/expr/src/logical_plan/extension.rs
index d49c85fb6fd6..19d4cb3db9ce 100644
--- a/datafusion/expr/src/logical_plan/extension.rs
+++ b/datafusion/expr/src/logical_plan/extension.rs
@@ -195,6 +195,16 @@ pub trait UserDefinedLogicalNode: fmt::Debug + Send + Sync {
     /// directly because it must remain object safe.
     fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool;
     fn dyn_ord(&self, other: &dyn UserDefinedLogicalNode) -> Option<Ordering>;
+
+    /// Returns `true` if a limit can be safely pushed down through this
+    /// `UserDefinedLogicalNode` node.
+    ///
+    /// If this method returns `true`, and the query plan contains a limit at
+    /// the output of this node, DataFusion will push the limit to the input
+    /// of this node.
+    fn supports_limit_pushdown(&self) -> bool {
+        false
+    }
 }
 
 impl Hash for dyn UserDefinedLogicalNode {
@@ -295,6 +305,16 @@ pub trait UserDefinedLogicalNodeCore:
     ) -> Option<Vec<Vec<usize>>> {
         None
     }
+
+    /// Returns `true` if a limit can be safely pushed down through this
+    /// `UserDefinedLogicalNode` node.
+    ///
+    /// If this method returns `true`, and the query plan contains a limit at
+    /// the output of this node, DataFusion will push the limit to the input
+    /// of this node.
+    fn supports_limit_pushdown(&self) -> bool {
+        false // Disallow limit push-down by default
+    }
 }
 
 /// Automatically derive UserDefinedLogicalNode to `UserDefinedLogicalNode`
@@ -361,6 +381,10 @@ impl<T: UserDefinedLogicalNodeCore> UserDefinedLogicalNode for T {
             .downcast_ref::<Self>()
             .and_then(|other| self.partial_cmp(other))
     }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        self.supports_limit_pushdown()
+    }
 }
 
 fn get_all_columns_from_schema(schema: &DFSchema) -> HashSet<String> {
diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs
index c771f31a58b2..aabc549de583 100644
--- a/datafusion/optimizer/src/analyzer/subquery.rs
+++ b/datafusion/optimizer/src/analyzer/subquery.rs
@@ -385,6 +385,10 @@ mod test {
                 empty_schema: Arc::clone(&self.empty_schema),
             })
         }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            false // Disallow limit push-down by default
+        }
     }
 
     #[test]
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs
index 5ab427a31699..b5d581f3919f 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -895,6 +895,10 @@ mod tests {
             // Since schema is same. Output columns requires their corresponding version in the input columns.
             Some(vec![output_columns.to_vec()])
         }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            false // Disallow limit push-down by default
+        }
     }
 
     #[derive(Debug, Hash, PartialEq, Eq)]
@@ -991,6 +995,10 @@ mod tests {
             }
             Some(vec![left_reqs, right_reqs])
         }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            false // Disallow limit push-down by default
+        }
     }
 
     #[test]
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 4e36cc62588e..6e2cc0cbdbcb 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -1499,6 +1499,10 @@ mod tests {
                 schema: Arc::clone(&self.schema),
             })
         }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            false // Disallow limit push-down by default
+        }
     }
 
     #[test]
diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs
index 158c7592df51..8b5e483001b3 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -153,6 +153,29 @@ impl OptimizerRule for PushDownLimit {
                 subquery_alias.input = Arc::new(new_limit);
                 Ok(Transformed::yes(LogicalPlan::SubqueryAlias(subquery_alias)))
             }
+            LogicalPlan::Extension(extension_plan)
+                if extension_plan.node.supports_limit_pushdown() =>
+            {
+                let new_children = extension_plan
+                    .node
+                    .inputs()
+                    .into_iter()
+                    .map(|child| {
+                        LogicalPlan::Limit(Limit {
+                            skip: 0,
+                            fetch: Some(fetch + skip),
+                            input: Arc::new(child.clone()),
+                        })
+                    })
+                    .collect::<Vec<_>>();
+
+                // Create a new extension node with updated inputs
+                let child_plan = LogicalPlan::Extension(extension_plan);
+                let new_extension =
+                    child_plan.with_new_exprs(child_plan.expressions(), new_children)?;
+
+                transformed_limit(skip, fetch, new_extension)
+            }
             input => original_limit(skip, fetch, input),
         }
     }
@@ -258,17 +281,241 @@ fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
 
 #[cfg(test)]
 mod test {
+    use std::cmp::Ordering;
+    use std::fmt::{Debug, Formatter};
     use std::vec;
 
     use super::*;
     use crate::test::*;
-    use datafusion_expr::{col, exists, logical_plan::builder::LogicalPlanBuilder};
+
+    use datafusion_common::DFSchemaRef;
+    use datafusion_expr::{
+        col, exists, logical_plan::builder::LogicalPlanBuilder, Expr, Extension,
+        UserDefinedLogicalNodeCore,
+    };
     use datafusion_functions_aggregate::expr_fn::max;
 
     fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
         assert_optimized_plan_eq(Arc::new(PushDownLimit::new()), plan, expected)
     }
 
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    pub struct NoopPlan {
+        input: Vec<LogicalPlan>,
+        schema: DFSchemaRef,
+    }
+
+    // Manual implementation needed because of `schema` field. Comparison excludes this field.
+    impl PartialOrd for NoopPlan {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            self.input.partial_cmp(&other.input)
+        }
+    }
+
+    impl UserDefinedLogicalNodeCore for NoopPlan {
+        fn name(&self) -> &str {
+            "NoopPlan"
+        }
+
+        fn inputs(&self) -> Vec<&LogicalPlan> {
+            self.input.iter().collect()
+        }
+
+        fn schema(&self) -> &DFSchemaRef {
+            &self.schema
+        }
+
+        fn expressions(&self) -> Vec<Expr> {
+            self.input
+                .iter()
+                .flat_map(|child| child.expressions())
+                .collect()
+        }
+
+        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
+            write!(f, "NoopPlan")
+        }
+
+        fn with_exprs_and_inputs(
+            &self,
+            _exprs: Vec<Expr>,
+            inputs: Vec<LogicalPlan>,
+        ) -> Result<Self> {
+            Ok(Self {
+                input: inputs,
+                schema: Arc::clone(&self.schema),
+            })
+        }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            true // Allow limit push-down
+        }
+    }
+
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    struct NoLimitNoopPlan {
+        input: Vec<LogicalPlan>,
+        schema: DFSchemaRef,
+    }
+
+    // Manual implementation needed because of `schema` field. Comparison excludes this field.
+    impl PartialOrd for NoLimitNoopPlan {
+        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+            self.input.partial_cmp(&other.input)
+        }
+    }
+
+    impl UserDefinedLogicalNodeCore for NoLimitNoopPlan {
+        fn name(&self) -> &str {
+            "NoLimitNoopPlan"
+        }
+
+        fn inputs(&self) -> Vec<&LogicalPlan> {
+            self.input.iter().collect()
+        }
+
+        fn schema(&self) -> &DFSchemaRef {
+            &self.schema
+        }
+
+        fn expressions(&self) -> Vec<Expr> {
+            self.input
+                .iter()
+                .flat_map(|child| child.expressions())
+                .collect()
+        }
+
+        fn fmt_for_explain(&self, f: &mut Formatter) -> std::fmt::Result {
+            write!(f, "NoLimitNoopPlan")
+        }
+
+        fn with_exprs_and_inputs(
+            &self,
+            _exprs: Vec<Expr>,
+            inputs: Vec<LogicalPlan>,
+        ) -> Result<Self> {
+            Ok(Self {
+                input: inputs,
+                schema: Arc::clone(&self.schema),
+            })
+        }
+
+        fn supports_limit_pushdown(&self) -> bool {
+            false // Disallow limit push-down by default
+        }
+    }
+    #[test]
+    fn limit_pushdown_basic() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let noop_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(NoopPlan {
+                input: vec![table_scan.clone()],
+                schema: Arc::clone(table_scan.schema()),
+            }),
+        });
+
+        let plan = LogicalPlanBuilder::from(noop_plan)
+            .limit(0, Some(1000))?
+            .build()?;
+
+        let expected = "Limit: skip=0, fetch=1000\
+        \n  NoopPlan\
+        \n    Limit: skip=0, fetch=1000\
+        \n      TableScan: test, fetch=1000";
+
+        assert_optimized_plan_equal(plan, expected)
+    }
+
+    #[test]
+    fn limit_pushdown_with_skip() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let noop_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(NoopPlan {
+                input: vec![table_scan.clone()],
+                schema: Arc::clone(table_scan.schema()),
+            }),
+        });
+
+        let plan = LogicalPlanBuilder::from(noop_plan)
+            .limit(10, Some(1000))?
+            .build()?;
+
+        let expected = "Limit: skip=10, fetch=1000\
+        \n  NoopPlan\
+        \n    Limit: skip=0, fetch=1010\
+        \n      TableScan: test, fetch=1010";
+
+        assert_optimized_plan_equal(plan, expected)
+    }
+
+    #[test]
+    fn limit_pushdown_multiple_limits() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let noop_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(NoopPlan {
+                input: vec![table_scan.clone()],
+                schema: Arc::clone(table_scan.schema()),
+            }),
+        });
+
+        let plan = LogicalPlanBuilder::from(noop_plan)
+            .limit(10, Some(1000))?
+            .limit(20, Some(500))?
+            .build()?;
+
+        let expected = "Limit: skip=30, fetch=500\
+        \n  NoopPlan\
+        \n    Limit: skip=0, fetch=530\
+        \n      TableScan: test, fetch=530";
+
+        assert_optimized_plan_equal(plan, expected)
+    }
+
+    #[test]
+    fn limit_pushdown_multiple_inputs() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let noop_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(NoopPlan {
+                input: vec![table_scan.clone(), table_scan.clone()],
+                schema: Arc::clone(table_scan.schema()),
+            }),
+        });
+
+        let plan = LogicalPlanBuilder::from(noop_plan)
+            .limit(0, Some(1000))?
+            .build()?;
+
+        let expected = "Limit: skip=0, fetch=1000\
+        \n  NoopPlan\
+        \n    Limit: skip=0, fetch=1000\
+        \n      TableScan: test, fetch=1000\
+        \n    Limit: skip=0, fetch=1000\
+        \n      TableScan: test, fetch=1000";
+
+        assert_optimized_plan_equal(plan, expected)
+    }
+
+    #[test]
+    fn limit_pushdown_disallowed_noop_plan() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let no_limit_noop_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(NoLimitNoopPlan {
+                input: vec![table_scan.clone()],
+                schema: Arc::clone(table_scan.schema()),
+            }),
+        });
+
+        let plan = LogicalPlanBuilder::from(no_limit_noop_plan)
+            .limit(0, Some(1000))?
+            .build()?;
+
+        let expected = "Limit: skip=0, fetch=1000\
+        \n  NoLimitNoopPlan\
+        \n    TableScan: test";
+
+        assert_optimized_plan_equal(plan, expected)
+    }
+
     #[test]
     fn limit_pushdown_projection_table_provider() -> Result<()> {
         let table_scan = test_table_scan()?;
diff --git a/datafusion/optimizer/src/test/user_defined.rs b/datafusion/optimizer/src/test/user_defined.rs
index 814cd0c0cd0a..a39f90b5da5d 100644
--- a/datafusion/optimizer/src/test/user_defined.rs
+++ b/datafusion/optimizer/src/test/user_defined.rs
@@ -76,4 +76,8 @@ impl UserDefinedLogicalNodeCore for TestUserDefinedPlanNode {
             input: inputs.swap_remove(0),
         })
     }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        false // Disallow limit push-down by default
+    }
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 8a94f905812c..cd789e06dc3b 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -1060,6 +1060,10 @@ impl UserDefinedLogicalNodeCore for TopKPlanNode {
             expr: exprs.swap_remove(0),
         })
     }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        false // Disallow limit push-down by default
+    }
 }
 
 #[derive(Debug)]
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index f7686bec5435..3b7d0fd29610 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -149,6 +149,10 @@ impl UserDefinedLogicalNode for MockUserDefinedLogicalPlan {
     fn dyn_ord(&self, _: &dyn UserDefinedLogicalNode) -> Option<Ordering> {
         unimplemented!()
     }
+
+    fn supports_limit_pushdown(&self) -> bool {
+        false // Disallow limit push-down by default
+    }
 }
 
 impl MockUserDefinedLogicalPlan {