From b6fd751b9d5b2168740c622041d00c6b54eadbfb Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Sep 2024 10:35:26 -0700 Subject: [PATCH 1/8] test: demonstrate offset not applied correctly with limit pushdown on multiple input streams --- datafusion/sqllogictest/test_files/order.slt | 133 +++++++++++++++++++ 1 file changed, 133 insertions(+) diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 7bb872e5a48f..62faf5c55c53 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1196,3 +1196,136 @@ physical_plan 02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] 04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true + + +#################### +# Test issue: TBD +#################### + +# all results +query II +SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc; +---- +3 25 +2 25 +1 0 +0 0 + +# limit only +query II +SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc LIMIT 3; +---- +3 25 +2 25 +1 0 + +# offset only +query II +SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1; +---- +2 25 +1 0 +0 0 + +# TODO: fix this to properly apply offset +# offset + limit +query II +SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2; +---- +3 25 +2 25 +1 0 + +# TODO: fix this to not remove the skip=1 during the limit pushdown +# Applying offset & limit when multiple streams from groupby +query TT +EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2; +---- +logical_plan +01)Limit: skip=1, fetch=2 +02)--Sort: ordered_table.b DESC NULLS FIRST, fetch=3 +03)----Aggregate: groupBy=[[ordered_table.b]], aggr=[[sum(CAST(ordered_table.a AS Int64))]] +04)------TableScan: ordered_table projection=[a, b] +physical_plan +01)SortPreservingMergeExec: [b@0 DESC], fetch=3 +02)--SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true] +03)----AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)] +04)------CoalesceBatchesExec: target_batch_size=8192 +05)--------RepartitionExec: partitioning=Hash([b@0], 2), input_partitions=2 +06)----------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)] +07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true + +# TODO: fix this to not remove the skip=4 during the limit pushdown +# Applying offset & limit when multiple streams from union +query TT +explain select * FROM ( + select c FROM ordered_table + UNION ALL + select d FROM ordered_table +) order by 1 desc LIMIT 10 OFFSET 4; +---- +logical_plan +01)Limit: skip=4, fetch=10 +02)--Sort: ordered_table.c DESC NULLS FIRST, fetch=14 +03)----Union +04)------Projection: CAST(ordered_table.c AS Int64) AS c +05)--------TableScan: ordered_table projection=[c] +06)------Projection: CAST(ordered_table.d AS Int64) AS c +07)--------TableScan: ordered_table projection=[d] +physical_plan +01)SortPreservingMergeExec: [c@0 DESC], fetch=14 +02)--UnionExec +03)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] +04)------ProjectionExec: expr=[CAST(c@0 AS Int64) as c] +05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true +07)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] +08)------ProjectionExec: expr=[CAST(d@0 AS Int64) as c] +09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +10)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true + +# ApplyingmLIMIT & OFFSET to subquery. +query III +select t1.b, c, c2 FROM ( + select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4 +) as t1 INNER JOIN ( + select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4 +) as t2 +ON t1.b = t2.b +ORDER BY t1.b desc, c desc, c2 desc; +---- +3 98 96 +3 98 89 +3 98 82 +3 98 79 +3 97 96 +3 97 89 +3 97 82 +3 97 79 +3 96 96 +3 96 89 +3 96 82 +3 96 79 +3 95 96 +3 95 89 +3 95 82 +3 95 79 + +# TODO: fix this does not correctly work. +# Apply OFFSET & LIMIT to both parent and child (subquery). +query III +select t1.b, c, c2 FROM ( + select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4 +) as t1 INNER JOIN ( + select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4 +) as t2 +ON t1.b = t2.b +ORDER BY t1.b desc, c desc, c2 desc +OFFSET 3 LIMIT 2; +---- +3 99 96 +3 99 89 +3 99 87 +3 99 82 +3 99 79 From 2fcad95240f71b826acef7748e308b2f1bbf1b72 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Sep 2024 14:08:42 -0700 Subject: [PATCH 2/8] fix: do not pushdown when skip is applied --- datafusion/physical-optimizer/src/limit_pushdown.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 7f45292f9e27..776c145cbc2c 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -126,10 +126,14 @@ impl From for Arc { /// The helper takes an `ExecutionPlan` and a global (algorithm) state which is /// an instance of `GlobalRequirements` and modifies these parameters while /// checking if the limits can be pushed down or not. +/// +/// If a limit is encountered, a [`TreeNodeRecursion::Stop`] is returned. Otherwise, +/// return a [`TreeNodeRecursion::Continue`]. pub fn pushdown_limit_helper( mut pushdown_plan: Arc, mut global_state: GlobalRequirements, ) -> Result<(Transformed>, GlobalRequirements)> { + // Extract limit, if exist, and return child inputs. if let Some(limit_exec) = extract_limit(&pushdown_plan) { // If we have fetch/skip info in the global state already, we need to // decide which one to continue with: @@ -190,7 +194,8 @@ pub fn pushdown_limit_helper( let skip_and_fetch = Some(global_fetch + global_state.skip); - if pushdown_plan.supports_limit_pushdown() { + let global_skip = global_state.skip; + if global_skip == 0 && pushdown_plan.supports_limit_pushdown() { if !combines_input_partitions(&pushdown_plan) { // We have information in the global state and the plan pushes down, // continue: @@ -223,7 +228,6 @@ pub fn pushdown_limit_helper( // to add the fetch info and return the plan. // There's no push down, change fetch & skip to default values: - let global_skip = global_state.skip; global_state.fetch = None; global_state.skip = 0; @@ -256,13 +260,16 @@ pub(crate) fn pushdown_limits( pushdown_plan: Arc, global_state: GlobalRequirements, ) -> Result> { + // Apply limit push down let (mut new_node, mut global_state) = pushdown_limit_helper(pushdown_plan, global_state)?; + // While limits exist, continue combining the global_state. while new_node.tnr == TreeNodeRecursion::Stop { (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?; } + // Pushdown limits in children let children = new_node.data.children(); let new_children = children .into_iter() @@ -270,7 +277,6 @@ pub(crate) fn pushdown_limits( pushdown_limits(Arc::::clone(child), global_state.clone()) }) .collect::>()?; - new_node.data.with_new_children(new_children) } From 30b21cabce4dbafcc7037d1e6a72fc48cd7ccde4 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Sep 2024 14:10:04 -0700 Subject: [PATCH 3/8] test: update tests after fix --- datafusion/sqllogictest/test_files/order.slt | 48 +++++++++----------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 62faf5c55c53..1b8c79494dd8 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1227,16 +1227,13 @@ SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1; 1 0 0 0 -# TODO: fix this to properly apply offset # offset + limit query II SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2; ---- -3 25 2 25 1 0 -# TODO: fix this to not remove the skip=1 during the limit pushdown # Applying offset & limit when multiple streams from groupby query TT EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2; @@ -1247,16 +1244,16 @@ logical_plan 03)----Aggregate: groupBy=[[ordered_table.b]], aggr=[[sum(CAST(ordered_table.a AS Int64))]] 04)------TableScan: ordered_table projection=[a, b] physical_plan -01)SortPreservingMergeExec: [b@0 DESC], fetch=3 -02)--SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true] -03)----AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)] -04)------CoalesceBatchesExec: target_batch_size=8192 -05)--------RepartitionExec: partitioning=Hash([b@0], 2), input_partitions=2 -06)----------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)] -07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true - -# TODO: fix this to not remove the skip=4 during the limit pushdown +01)GlobalLimitExec: skip=1, fetch=2 +02)--SortPreservingMergeExec: [b@0 DESC], fetch=3 +03)----SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true] +04)------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------RepartitionExec: partitioning=Hash([b@0], 2), input_partitions=2 +07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)] +08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true + # Applying offset & limit when multiple streams from union query TT explain select * FROM ( @@ -1274,16 +1271,17 @@ logical_plan 06)------Projection: CAST(ordered_table.d AS Int64) AS c 07)--------TableScan: ordered_table projection=[d] physical_plan -01)SortPreservingMergeExec: [c@0 DESC], fetch=14 -02)--UnionExec -03)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] -04)------ProjectionExec: expr=[CAST(c@0 AS Int64) as c] -05)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true -07)----SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] -08)------ProjectionExec: expr=[CAST(d@0 AS Int64) as c] -09)--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -10)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true +01)GlobalLimitExec: skip=4, fetch=10 +02)--SortPreservingMergeExec: [c@0 DESC], fetch=14 +03)----UnionExec +04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] +05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c] +06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true +08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] +09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c] +10)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 +11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true # ApplyingmLIMIT & OFFSET to subquery. query III @@ -1312,7 +1310,6 @@ ORDER BY t1.b desc, c desc, c2 desc; 3 95 82 3 95 79 -# TODO: fix this does not correctly work. # Apply OFFSET & LIMIT to both parent and child (subquery). query III select t1.b, c, c2 FROM ( @@ -1324,8 +1321,5 @@ ON t1.b = t2.b ORDER BY t1.b desc, c desc, c2 desc OFFSET 3 LIMIT 2; ---- -3 99 96 -3 99 89 -3 99 87 3 99 82 3 99 79 From 30145cffbaa97e65f7c33cc675c6b8fcc5bf5337 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Sep 2024 14:19:34 -0700 Subject: [PATCH 4/8] chore: more doc cleanup --- datafusion/physical-optimizer/src/limit_pushdown.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index 776c145cbc2c..d0e035eca075 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -260,7 +260,8 @@ pub(crate) fn pushdown_limits( pushdown_plan: Arc, global_state: GlobalRequirements, ) -> Result> { - // Apply limit push down + // Call pushdown_limit_helper. + // This will either extract the limit node (returning the child), or apply the limit pushdown. let (mut new_node, mut global_state) = pushdown_limit_helper(pushdown_plan, global_state)?; @@ -269,7 +270,7 @@ pub(crate) fn pushdown_limits( (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?; } - // Pushdown limits in children + // Apply pushdown limits in children let children = new_node.data.children(); let new_children = children .into_iter() From 34b94f09dd06a7b2414fd0ce283107ee40e15edc Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Sep 2024 14:58:38 -0700 Subject: [PATCH 5/8] chore: move LIMIT+OFFSET tests to proper sqllogic test case --- datafusion/sqllogictest/test_files/limit.slt | 143 +++++++++++++++++++ datafusion/sqllogictest/test_files/order.slt | 127 ---------------- 2 files changed, 143 insertions(+), 127 deletions(-) diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 17bd398bd229..bbed3ce4ad26 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -565,3 +565,146 @@ physical_plan statement ok drop table data; + + +#################### +# Test issue: limit pushdown with offsets +#################### + +statement ok +CREATE EXTERNAL TABLE ordered_table ( + a0 INT, + a INT, + b INT, + c INT UNSIGNED, + d INT +) +STORED AS CSV +WITH ORDER (c ASC) +LOCATION '../core/tests/data/window_2.csv' +OPTIONS ('format.has_header' 'true'); + +# all results +query II +SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc; +---- +3 25 +2 25 +1 0 +0 0 + +# limit only +query II +SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc LIMIT 3; +---- +3 25 +2 25 +1 0 + +# offset only +query II +SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1; +---- +2 25 +1 0 +0 0 + +# offset + limit +query II +SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2; +---- +2 25 +1 0 + +# Applying offset & limit when multiple streams from groupby +query TT +EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2; +---- +logical_plan +01)Limit: skip=1, fetch=2 +02)--Sort: ordered_table.b DESC NULLS FIRST, fetch=3 +03)----Aggregate: groupBy=[[ordered_table.b]], aggr=[[sum(CAST(ordered_table.a AS Int64))]] +04)------TableScan: ordered_table projection=[a, b] +physical_plan +01)GlobalLimitExec: skip=1, fetch=2 +02)--SortPreservingMergeExec: [b@0 DESC], fetch=3 +03)----SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true] +04)------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4 +07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)] +08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true + +# Applying offset & limit when multiple streams from union +query TT +explain select * FROM ( + select c FROM ordered_table + UNION ALL + select d FROM ordered_table +) order by 1 desc LIMIT 10 OFFSET 4; +---- +logical_plan +01)Limit: skip=4, fetch=10 +02)--Sort: ordered_table.c DESC NULLS FIRST, fetch=14 +03)----Union +04)------Projection: CAST(ordered_table.c AS Int64) AS c +05)--------TableScan: ordered_table projection=[c] +06)------Projection: CAST(ordered_table.d AS Int64) AS c +07)--------TableScan: ordered_table projection=[d] +physical_plan +01)GlobalLimitExec: skip=4, fetch=10 +02)--SortPreservingMergeExec: [c@0 DESC], fetch=14 +03)----UnionExec +04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] +05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c] +06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true +08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] +09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c] +10)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true + +# ApplyingmLIMIT & OFFSET to subquery. +query III +select t1.b, c, c2 FROM ( + select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4 +) as t1 INNER JOIN ( + select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4 +) as t2 +ON t1.b = t2.b +ORDER BY t1.b desc, c desc, c2 desc; +---- +3 98 96 +3 98 89 +3 98 82 +3 98 79 +3 97 96 +3 97 89 +3 97 82 +3 97 79 +3 96 96 +3 96 89 +3 96 82 +3 96 79 +3 95 96 +3 95 89 +3 95 82 +3 95 79 + +# Apply OFFSET & LIMIT to both parent and child (subquery). +query III +select t1.b, c, c2 FROM ( + select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4 +) as t1 INNER JOIN ( + select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4 +) as t2 +ON t1.b = t2.b +ORDER BY t1.b desc, c desc, c2 desc +OFFSET 3 LIMIT 2; +---- +3 99 82 +3 99 79 + +statement ok +drop table ordered_table; diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 1b8c79494dd8..7bb872e5a48f 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1196,130 +1196,3 @@ physical_plan 02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] 04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true - - -#################### -# Test issue: TBD -#################### - -# all results -query II -SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc; ----- -3 25 -2 25 -1 0 -0 0 - -# limit only -query II -SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc LIMIT 3; ----- -3 25 -2 25 -1 0 - -# offset only -query II -SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1; ----- -2 25 -1 0 -0 0 - -# offset + limit -query II -SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2; ----- -2 25 -1 0 - -# Applying offset & limit when multiple streams from groupby -query TT -EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2; ----- -logical_plan -01)Limit: skip=1, fetch=2 -02)--Sort: ordered_table.b DESC NULLS FIRST, fetch=3 -03)----Aggregate: groupBy=[[ordered_table.b]], aggr=[[sum(CAST(ordered_table.a AS Int64))]] -04)------TableScan: ordered_table projection=[a, b] -physical_plan -01)GlobalLimitExec: skip=1, fetch=2 -02)--SortPreservingMergeExec: [b@0 DESC], fetch=3 -03)----SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true] -04)------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)] -05)--------CoalesceBatchesExec: target_batch_size=8192 -06)----------RepartitionExec: partitioning=Hash([b@0], 2), input_partitions=2 -07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)] -08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true - -# Applying offset & limit when multiple streams from union -query TT -explain select * FROM ( - select c FROM ordered_table - UNION ALL - select d FROM ordered_table -) order by 1 desc LIMIT 10 OFFSET 4; ----- -logical_plan -01)Limit: skip=4, fetch=10 -02)--Sort: ordered_table.c DESC NULLS FIRST, fetch=14 -03)----Union -04)------Projection: CAST(ordered_table.c AS Int64) AS c -05)--------TableScan: ordered_table projection=[c] -06)------Projection: CAST(ordered_table.d AS Int64) AS c -07)--------TableScan: ordered_table projection=[d] -physical_plan -01)GlobalLimitExec: skip=4, fetch=10 -02)--SortPreservingMergeExec: [c@0 DESC], fetch=14 -03)----UnionExec -04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] -05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c] -06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true -08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true] -09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c] -10)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true - -# ApplyingmLIMIT & OFFSET to subquery. -query III -select t1.b, c, c2 FROM ( - select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4 -) as t1 INNER JOIN ( - select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4 -) as t2 -ON t1.b = t2.b -ORDER BY t1.b desc, c desc, c2 desc; ----- -3 98 96 -3 98 89 -3 98 82 -3 98 79 -3 97 96 -3 97 89 -3 97 82 -3 97 79 -3 96 96 -3 96 89 -3 96 82 -3 96 79 -3 95 96 -3 95 89 -3 95 82 -3 95 79 - -# Apply OFFSET & LIMIT to both parent and child (subquery). -query III -select t1.b, c, c2 FROM ( - select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4 -) as t1 INNER JOIN ( - select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4 -) as t2 -ON t1.b = t2.b -ORDER BY t1.b desc, c desc, c2 desc -OFFSET 3 LIMIT 2; ----- -3 99 82 -3 99 79 From 570590c5efa5267feafdd95a34af8dbf81cc0a77 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 10 Sep 2024 08:26:17 -0700 Subject: [PATCH 6/8] refactor: add global limit back (if there is a skip) during limit pushdown --- datafusion/physical-optimizer/src/limit_pushdown.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index d0e035eca075..d97b8a0fd5e2 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -194,8 +194,7 @@ pub fn pushdown_limit_helper( let skip_and_fetch = Some(global_fetch + global_state.skip); - let global_skip = global_state.skip; - if global_skip == 0 && pushdown_plan.supports_limit_pushdown() { + if pushdown_plan.supports_limit_pushdown() { if !combines_input_partitions(&pushdown_plan) { // We have information in the global state and the plan pushes down, // continue: @@ -204,10 +203,15 @@ pub fn pushdown_limit_helper( // This plan is combining input partitions, so we need to add the // fetch info to plan if possible. If not, we must add a `LimitExec` // with the information from the global state. + let mut new_plan = plan_with_fetch; + if global_state.skip > 0 { + new_plan = + add_global_limit(new_plan, global_state.skip, global_state.fetch); + } global_state.fetch = skip_and_fetch; global_state.skip = 0; global_state.satisfied = true; - Ok((Transformed::yes(plan_with_fetch), global_state)) + Ok((Transformed::yes(new_plan), global_state)) } else if global_state.satisfied { // If the plan is already satisfied, do not add a limit: Ok((Transformed::no(pushdown_plan), global_state)) @@ -228,6 +232,7 @@ pub fn pushdown_limit_helper( // to add the fetch info and return the plan. // There's no push down, change fetch & skip to default values: + let global_skip = global_state.skip; global_state.fetch = None; global_state.skip = 0; From 0b3053b0f84cc0935d27d4a59932ce32a2176ff6 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 11 Sep 2024 10:02:30 -0400 Subject: [PATCH 7/8] Apply suggestions from code review --- datafusion/sqllogictest/test_files/limit.slt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index bbed3ce4ad26..5b98392f1aa0 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -569,6 +569,7 @@ drop table data; #################### # Test issue: limit pushdown with offsets +# Ensure the offset is not lost: https://github.com/apache/datafusion/issues/12423 #################### statement ok @@ -617,6 +618,7 @@ SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2; 1 0 # Applying offset & limit when multiple streams from groupby +# the plan must still have a global limit to apply the offset query TT EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2; ---- @@ -637,6 +639,7 @@ physical_plan 09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true # Applying offset & limit when multiple streams from union +# the plan must still have a global limit to apply the offset query TT explain select * FROM ( select c FROM ordered_table @@ -665,7 +668,7 @@ physical_plan 10)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true -# ApplyingmLIMIT & OFFSET to subquery. +# Applying LIMIT & OFFSET to subquery. query III select t1.b, c, c2 FROM ( select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4 From cb7ded27033d155a9d972bc2ac040e67f8ab05cf Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 11 Sep 2024 10:10:03 -0400 Subject: [PATCH 8/8] Add comment explaining why --- datafusion/physical-optimizer/src/limit_pushdown.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs b/datafusion/physical-optimizer/src/limit_pushdown.rs index d97b8a0fd5e2..15d210e1b10b 100644 --- a/datafusion/physical-optimizer/src/limit_pushdown.rs +++ b/datafusion/physical-optimizer/src/limit_pushdown.rs @@ -204,6 +204,8 @@ pub fn pushdown_limit_helper( // fetch info to plan if possible. If not, we must add a `LimitExec` // with the information from the global state. let mut new_plan = plan_with_fetch; + // Execution plans can't (yet) handle skip, so if we have one, + // we still need to add a global limit if global_state.skip > 0 { new_plan = add_global_limit(new_plan, global_state.skip, global_state.fetch);