Skip to content

Commit

Permalink
chore: move LIMIT+OFFSET tests to proper sqllogic test case
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Sep 9, 2024
1 parent 30145cf commit 34b94f0
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 127 deletions.
143 changes: 143 additions & 0 deletions datafusion/sqllogictest/test_files/limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -565,3 +565,146 @@ physical_plan

statement ok
drop table data;


####################
# Test issue: limit pushdown with offsets
####################

statement ok
CREATE EXTERNAL TABLE ordered_table (
a0 INT,
a INT,
b INT,
c INT UNSIGNED,
d INT
)
STORED AS CSV
WITH ORDER (c ASC)
LOCATION '../core/tests/data/window_2.csv'
OPTIONS ('format.has_header' 'true');

# all results
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc;
----
3 25
2 25
1 0
0 0

# limit only
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc LIMIT 3;
----
3 25
2 25
1 0

# offset only
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1;
----
2 25
1 0
0 0

# offset + limit
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
----
2 25
1 0

# Applying offset & limit when multiple streams from groupby
query TT
EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
----
logical_plan
01)Limit: skip=1, fetch=2
02)--Sort: ordered_table.b DESC NULLS FIRST, fetch=3
03)----Aggregate: groupBy=[[ordered_table.b]], aggr=[[sum(CAST(ordered_table.a AS Int64))]]
04)------TableScan: ordered_table projection=[a, b]
physical_plan
01)GlobalLimitExec: skip=1, fetch=2
02)--SortPreservingMergeExec: [b@0 DESC], fetch=3
03)----SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true]
04)------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true

# Applying offset & limit when multiple streams from union
query TT
explain select * FROM (
select c FROM ordered_table
UNION ALL
select d FROM ordered_table
) order by 1 desc LIMIT 10 OFFSET 4;
----
logical_plan
01)Limit: skip=4, fetch=10
02)--Sort: ordered_table.c DESC NULLS FIRST, fetch=14
03)----Union
04)------Projection: CAST(ordered_table.c AS Int64) AS c
05)--------TableScan: ordered_table projection=[c]
06)------Projection: CAST(ordered_table.d AS Int64) AS c
07)--------TableScan: ordered_table projection=[d]
physical_plan
01)GlobalLimitExec: skip=4, fetch=10
02)--SortPreservingMergeExec: [c@0 DESC], fetch=14
03)----UnionExec
04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true
08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
10)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true

# ApplyingmLIMIT & OFFSET to subquery.
query III
select t1.b, c, c2 FROM (
select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4
) as t1 INNER JOIN (
select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4
) as t2
ON t1.b = t2.b
ORDER BY t1.b desc, c desc, c2 desc;
----
3 98 96
3 98 89
3 98 82
3 98 79
3 97 96
3 97 89
3 97 82
3 97 79
3 96 96
3 96 89
3 96 82
3 96 79
3 95 96
3 95 89
3 95 82
3 95 79

# Apply OFFSET & LIMIT to both parent and child (subquery).
query III
select t1.b, c, c2 FROM (
select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4
) as t1 INNER JOIN (
select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4
) as t2
ON t1.b = t2.b
ORDER BY t1.b desc, c desc, c2 desc
OFFSET 3 LIMIT 2;
----
3 99 82
3 99 79

statement ok
drop table ordered_table;
127 changes: 0 additions & 127 deletions datafusion/sqllogictest/test_files/order.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1196,130 +1196,3 @@ physical_plan
02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true


####################
# Test issue: TBD
####################

# all results
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc;
----
3 25
2 25
1 0
0 0

# limit only
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc LIMIT 3;
----
3 25
2 25
1 0

# offset only
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1;
----
2 25
1 0
0 0

# offset + limit
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
----
2 25
1 0

# Applying offset & limit when multiple streams from groupby
query TT
EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
----
logical_plan
01)Limit: skip=1, fetch=2
02)--Sort: ordered_table.b DESC NULLS FIRST, fetch=3
03)----Aggregate: groupBy=[[ordered_table.b]], aggr=[[sum(CAST(ordered_table.a AS Int64))]]
04)------TableScan: ordered_table projection=[a, b]
physical_plan
01)GlobalLimitExec: skip=1, fetch=2
02)--SortPreservingMergeExec: [b@0 DESC], fetch=3
03)----SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true]
04)------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([b@0], 2), input_partitions=2
07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true

# Applying offset & limit when multiple streams from union
query TT
explain select * FROM (
select c FROM ordered_table
UNION ALL
select d FROM ordered_table
) order by 1 desc LIMIT 10 OFFSET 4;
----
logical_plan
01)Limit: skip=4, fetch=10
02)--Sort: ordered_table.c DESC NULLS FIRST, fetch=14
03)----Union
04)------Projection: CAST(ordered_table.c AS Int64) AS c
05)--------TableScan: ordered_table projection=[c]
06)------Projection: CAST(ordered_table.d AS Int64) AS c
07)--------TableScan: ordered_table projection=[d]
physical_plan
01)GlobalLimitExec: skip=4, fetch=10
02)--SortPreservingMergeExec: [c@0 DESC], fetch=14
03)----UnionExec
04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c]
06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true
08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
10)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true

# ApplyingmLIMIT & OFFSET to subquery.
query III
select t1.b, c, c2 FROM (
select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4
) as t1 INNER JOIN (
select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4
) as t2
ON t1.b = t2.b
ORDER BY t1.b desc, c desc, c2 desc;
----
3 98 96
3 98 89
3 98 82
3 98 79
3 97 96
3 97 89
3 97 82
3 97 79
3 96 96
3 96 89
3 96 82
3 96 79
3 95 96
3 95 89
3 95 82
3 95 79

# Apply OFFSET & LIMIT to both parent and child (subquery).
query III
select t1.b, c, c2 FROM (
select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4
) as t1 INNER JOIN (
select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4
) as t2
ON t1.b = t2.b
ORDER BY t1.b desc, c desc, c2 desc
OFFSET 3 LIMIT 2;
----
3 99 82
3 99 79

0 comments on commit 34b94f0

Please sign in to comment.