diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 7f45292f9e27..15d210e1b10b 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -126,10 +126,14 @@ impl From for Arc { /// The helper takes an `ExecutionPlan` and a global (algorithm) state which is /// an instance of `GlobalRequirements` and modifies these parameters while /// checking if the limits can be pushed down or not. +/// +/// If a limit is encountered, a [`TreeNodeRecursion::Stop`] is returned. Otherwise, +/// return a [`TreeNodeRecursion::Continue`]. pub fn pushdown_limit_helper( mut pushdown_plan: Arc, mut global_state: GlobalRequirements, ) -> Result<(Transformed>, GlobalRequirements)> { + // Extract limit, if exist, and return child inputs. if let Some(limit_exec) = extract_limit(&pushdown_plan) { // If we have fetch/skip info in the global state already, we need to // decide which one to continue with: @@ -199,10 +203,17 @@ pub fn pushdown_limit_helper( // This plan is combining input partitions, so we need to add the // fetch info to plan if possible. If not, we must add a `LimitExec` // with the information from the global state. + let mut new_plan = plan_with_fetch; + // Execution plans can't (yet) handle skip, so if we have one, + // we still need to add a global limit + if global_state.skip > 0 { + new_plan = + add_global_limit(new_plan, global_state.skip, global_state.fetch); + } global_state.fetch = skip_and_fetch; global_state.skip = 0; global_state.satisfied = true; - Ok((Transformed::yes(plan_with_fetch), global_state)) + Ok((Transformed::yes(new_plan), global_state)) } else if global_state.satisfied { // If the plan is already satisfied, do not add a limit: Ok((Transformed::no(pushdown_plan), global_state)) @@ -256,13 +267,17 @@ pub(crate) fn pushdown_limits( pushdown_plan: Arc, global_state: GlobalRequirements, ) -> Result> { + // Call pushdown_limit_helper. + // This will either extract the limit node (returning the child), or apply the limit pushdown. let (mut new_node, mut global_state) = pushdown_limit_helper(pushdown_plan, global_state)?; + // While limits exist, continue combining the global_state. while new_node.tnr == TreeNodeRecursion::Stop { (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?; } + // Apply pushdown limits in children let children = new_node.data.children(); let new_children = children .into_iter() @@ -270,7 +285,6 @@ pub(crate) fn pushdown_limits( pushdown_limits(Arc::::clone(child), global_state.clone()) }) .collect::>()?; - new_node.data.with_new_children(new_children) } diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 17bd398bd229..5b98392f1aa0 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -565,3 +565,149 @@ physical_plan statement ok drop table data; + + +#################### +# Test issue: limit pushdown with offsets +# Ensure the offset is not lost: https://github.com/apache/datafusion/issues/12423 +#################### + +statement ok +CREATE EXTERNAL TABLE ordered_table ( + a0 INT, + a INT, + b INT, + c INT UNSIGNED, + d INT +) +STORED AS CSV +WITH ORDER (c ASC) +LOCATION '../core/tests/data/window_2.csv' +OPTIONS ('format.has_header' 'true'); + +# all results +query II +SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc; +---- +3 25 +2 25 +1 0 +0 0 + +# limit only +query II +SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc LIMIT 3; +---- +3 25 +2 25 +1 0 + +# offset only +query II +SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1; +---- +2 25 +1 0 +0 0 + +# offset + limit +query II +SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2; +---- +2 25 +1 0 + +# Applying offset & limit when multiple streams from groupby +# the plan must still have a global limit to apply the offset +query TT +EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2; +---- +logical_plan +01)Limit: skip=1, fetch=2 +02)--Sort: ordered_table.b DESC NULLS FIRST, fetch=3 +03)----Aggregate: groupBy=[[ordered_table.b]], aggr=[[sum(CAST(ordered_table.a AS Int64))]] +04)------TableScan: ordered_table projection=[a, b] +physical_plan +01)GlobalLimitExec: skip=1, fetch=2 +02)--SortPreservingMergeExec: [b@0 DESC], fetch=3 +03)----SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true] +04)------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4 +07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)] +08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true + +# Applying offset & limit when multiple streams from union +# the plan must still have a global limit to apply the offset +query TT +explain select * FROM ( + select c FROM ordered_table + UNION ALL + select d FROM ordered_table +) order by 1 desc LIMIT 10 OFFSET 4; +---- +logical_plan +01)Limit: skip=4, fetch=10 +02)--Sort: ordered_table.c DESC NULLS FIRST, fetch=14 +03)----Union +04)------Projection: CAST(ordered_table.c AS Int64) AS c +05)--------TableScan: ordered_table projection=[c] +06)------Projection: CAST(ordered_table.d AS Int64) AS c +07)--------TableScan: ordered_table projection=[d] +physical_plan +01)GlobalLimitExec: skip=4, fetch=10 +02)--SortPreservingMergeExec: [c@0 DESC], fetch=14 +03)----UnionExec +04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] +05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c] +06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true +08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] +09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c] +10)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true + +# Applying LIMIT & OFFSET to subquery. +query III +select t1.b, c, c2 FROM ( + select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4 +) as t1 INNER JOIN ( + select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4 +) as t2 +ON t1.b = t2.b +ORDER BY t1.b desc, c desc, c2 desc; +---- +3 98 96 +3 98 89 +3 98 82 +3 98 79 +3 97 96 +3 97 89 +3 97 82 +3 97 79 +3 96 96 +3 96 89 +3 96 82 +3 96 79 +3 95 96 +3 95 89 +3 95 82 +3 95 79 + +# Apply OFFSET & LIMIT to both parent and child (subquery). +query III +select t1.b, c, c2 FROM ( + select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4 +) as t1 INNER JOIN ( + select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4 +) as t2 +ON t1.b = t2.b +ORDER BY t1.b desc, c desc, c2 desc +OFFSET 3 LIMIT 2; +---- +3 99 82 +3 99 79 + +statement ok +drop table ordered_table;