
Fix incorrect OFFSET during LIMIT pushdown. #12399

Merged
datafusion/physical-optimizer/src/limit_pushdown.rs (16 changes: 14 additions & 2 deletions)
@@ -126,10 +126,14 @@ impl From<LimitExec> for Arc<dyn ExecutionPlan> {
/// The helper takes an `ExecutionPlan` and a global (algorithm) state, which is
/// an instance of `GlobalRequirements`, and modifies these parameters while
/// checking whether the limits can be pushed down.
///
/// If a limit is encountered, a [`TreeNodeRecursion::Stop`] is returned. Otherwise,
/// a [`TreeNodeRecursion::Continue`] is returned.
pub fn pushdown_limit_helper(
mut pushdown_plan: Arc<dyn ExecutionPlan>,
mut global_state: GlobalRequirements,
) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, GlobalRequirements)> {
// Extract the limit, if it exists, and return the child inputs.
if let Some(limit_exec) = extract_limit(&pushdown_plan) {
// If we have fetch/skip info in the global state already, we need to
// decide which one to continue with:
@@ -199,10 +203,15 @@ pub fn pushdown_limit_helper(
// This plan is combining input partitions, so we need to add the
// fetch info to plan if possible. If not, we must add a `LimitExec`
// with the information from the global state.
let mut new_plan = plan_with_fetch;
if global_state.skip > 0 {
new_plan =
add_global_limit(new_plan, global_state.skip, global_state.fetch);
}
global_state.fetch = skip_and_fetch;
global_state.skip = 0;
global_state.satisfied = true;
- Ok((Transformed::yes(plan_with_fetch), global_state))
+ Ok((Transformed::yes(new_plan), global_state))
} else if global_state.satisfied {
// If the plan is already satisfied, do not add a limit:
Ok((Transformed::no(pushdown_plan), global_state))
@@ -256,21 +265,24 @@ pub(crate) fn pushdown_limits(
pushdown_plan: Arc<dyn ExecutionPlan>,
global_state: GlobalRequirements,
) -> Result<Arc<dyn ExecutionPlan>> {
// Call pushdown_limit_helper.
// This will either extract the limit node (returning the child), or apply the limit pushdown.
Review comment (Contributor): this might be a good comment to add to the pushdown_limit_helper function as well
let (mut new_node, mut global_state) =
pushdown_limit_helper(pushdown_plan, global_state)?;

// While limits exist, continue combining the global_state.
while new_node.tnr == TreeNodeRecursion::Stop {
(new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
}

// Apply pushdown limits in children
let children = new_node.data.children();
let new_children = children
.into_iter()
.map(|child| {
pushdown_limits(Arc::<dyn ExecutionPlan>::clone(child), global_state.clone())
})
.collect::<Result<_>>()?;

new_node.data.with_new_children(new_children)
}
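
The essence of the fix is in the second hunk: when a partition-combining operator absorbs the fetch, the value pushed down must be skip + fetch, and a global limit carrying the original skip must remain above it. Previously the skip was dropped once the fetch was absorbed. Below is a minimal, self-contained Rust sketch of that bookkeeping; `GlobalRequirements` and `skip_and_fetch` here are illustrative stand-ins named after identifiers in the diff, not DataFusion's actual definitions.

// A minimal sketch of the skip/fetch arithmetic this PR fixes.
#[derive(Clone, Copy, Debug, PartialEq)]
struct GlobalRequirements {
    skip: usize,
    fetch: Option<usize>,
}

// The fetch that is safe to push below a merge: the merged stream must keep
// at least `skip + fetch` rows so that skipping `skip` of them still leaves
// `fetch` rows for the final result.
fn skip_and_fetch(req: &GlobalRequirements) -> Option<usize> {
    req.fetch.map(|fetch| fetch + req.skip)
}

fn main() {
    // OFFSET 1 LIMIT 2, as exercised by the new limit.slt tests below.
    let req = GlobalRequirements { skip: 1, fetch: Some(2) };

    // The operator below the merge is handed fetch = 3 (= 1 + 2) ...
    assert_eq!(skip_and_fetch(&req), Some(3));

    // ... while a global limit with skip = 1, fetch = 2 stays on top.
    // Before this fix the skip was dropped, so the query returned the
    // first 2 rows instead of rows 2..=3.
}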

datafusion/sqllogictest/test_files/limit.slt (143 changes: 143 additions & 0 deletions)
@@ -565,3 +565,146 @@ physical_plan

statement ok
drop table data;


####################
# Test issue: limit pushdown with offsets
####################

statement ok
CREATE EXTERNAL TABLE ordered_table (
a0 INT,
a INT,
b INT,
c INT UNSIGNED,
d INT
)
STORED AS CSV
WITH ORDER (c ASC)
LOCATION '../core/tests/data/window_2.csv'
OPTIONS ('format.has_header' 'true');

# all results
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc;
----
3 25
2 25
1 0
0 0

# limit only
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc LIMIT 3;
----
3 25
2 25
1 0

# offset only
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1;
----
2 25
1 0
0 0

# offset + limit
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
----
2 25
1 0

# Applying offset & limit when there are multiple output streams from a group by
query TT
EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
----
logical_plan
01)Limit: skip=1, fetch=2
02)--Sort: ordered_table.b DESC NULLS FIRST, fetch=3
03)----Aggregate: groupBy=[[ordered_table.b]], aggr=[[sum(CAST(ordered_table.a AS Int64))]]
04)------TableScan: ordered_table projection=[a, b]
physical_plan
01)GlobalLimitExec: skip=1, fetch=2
02)--SortPreservingMergeExec: [b@0 DESC], fetch=3
03)----SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true]
04)------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([b@0], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true
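
The corrected plan shape is visible above: SortPreservingMergeExec keeps fetch=3 (skip 1 + fetch 2) while GlobalLimitExec retains skip=1, fetch=2, and the union test below shows the same pattern with fetch=14 (= 4 + 10). A trivial check of that relationship, with an illustrative helper name:

// The invariant both EXPLAIN outputs demonstrate: the fetch pushed below
// the global limit equals the limit's skip plus its fetch.
fn pushed_fetch(skip: usize, fetch: usize) -> usize {
    skip + fetch
}

fn main() {
    assert_eq!(pushed_fetch(1, 2), 3);   // OFFSET 1 LIMIT 2  => fetch=3
    assert_eq!(pushed_fetch(4, 10), 14); // OFFSET 4 LIMIT 10 => fetch=14
}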

# Applying offset & limit when there are multiple output streams from a union
query TT
explain select * FROM (
select c FROM ordered_table
UNION ALL
select d FROM ordered_table
) order by 1 desc LIMIT 10 OFFSET 4;
----
logical_plan
01)Limit: skip=4, fetch=10
02)--Sort: ordered_table.c DESC NULLS FIRST, fetch=14
03)----Union
04)------Projection: CAST(ordered_table.c AS Int64) AS c
05)--------TableScan: ordered_table projection=[c]
06)------Projection: CAST(ordered_table.d AS Int64) AS c
07)--------TableScan: ordered_table projection=[d]
physical_plan
01)GlobalLimitExec: skip=4, fetch=10
02)--SortPreservingMergeExec: [c@0 DESC], fetch=14
03)----UnionExec
04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true
08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
10)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true

# Applying LIMIT & OFFSET to a subquery.
query III
select t1.b, c, c2 FROM (
select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4
) as t1 INNER JOIN (
select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4
) as t2
ON t1.b = t2.b
ORDER BY t1.b desc, c desc, c2 desc;
----
3 98 96
3 98 89
3 98 82
3 98 79
3 97 96
3 97 89
3 97 82
3 97 79
3 96 96
3 96 89
3 96 82
3 96 79
3 95 96
3 95 89
3 95 82
3 95 79

# Apply OFFSET & LIMIT to both parent and child (subquery).
query III
select t1.b, c, c2 FROM (
select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4
) as t1 INNER JOIN (
select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4
) as t2
ON t1.b = t2.b
ORDER BY t1.b desc, c desc, c2 desc
OFFSET 3 LIMIT 2;
----
3 99 82
3 99 79

statement ok
drop table ordered_table;