Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update explain plan to show TopK operator #7826

Merged
merged 3 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ async fn test_physical_plan_display_indent() {
let expected = vec![
"GlobalLimitExec: skip=0, fetch=10",
" SortPreservingMergeExec: [the_min@2 DESC], fetch=10",
- " SortExec: fetch=10, expr=[the_min@2 DESC]",
+ " SortExec: TopK(fetch=10), expr=[the_min@2 DESC]",
" ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]",
" AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]",
" CoalesceBatchesExec: target_batch_size=4096",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn group_by_limit() -> Result<()> {
let physical_plan = dataframe.create_physical_plan().await?;
let mut expected_physical_plan = r#"
GlobalLimitExec: skip=0, fetch=4
- SortExec: fetch=4, expr=[MAX(traces.ts)@1 DESC]
+ SortExec: TopK(fetch=4), expr=[MAX(traces.ts)@1 DESC]
AggregateExec: mode=Single, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.ts)], lim=[4]
"#.trim().to_string();
let actual_phys_plan =
Expand Down
6 changes: 1 addition & 5 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,11 +766,7 @@ impl DisplayAs for SortExec {
let expr = PhysicalSortExpr::format_list(&self.expr);
match self.fetch {
Some(fetch) => {
- write!(
- f,
- // TODO should this say topk?
- "SortExec: fetch={fetch}, expr=[{expr}]",
- )
+ write!(f, "SortExec: TopK(fetch={fetch}), expr=[{expr}]",)
}
None => write!(f, "SortExec: expr=[{expr}]"),
}
Expand Down
10 changes: 5 additions & 5 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2338,7 +2338,7 @@ Limit: skip=0, fetch=4
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4
- ----SortExec: fetch=4, expr=[MAX(traces.timestamp)@1 DESC]
+ ----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
Expand Down Expand Up @@ -2393,7 +2393,7 @@ Limit: skip=0, fetch=4
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4
- ----SortExec: fetch=4, expr=[MAX(traces.timestamp)@1 DESC]
+ ----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
Expand All @@ -2412,7 +2412,7 @@ Limit: skip=0, fetch=4
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MIN(traces.timestamp)@1 DESC], fetch=4
- ----SortExec: fetch=4, expr=[MIN(traces.timestamp)@1 DESC]
+ ----SortExec: TopK(fetch=4), expr=[MIN(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
Expand All @@ -2431,7 +2431,7 @@ Limit: skip=0, fetch=4
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 ASC NULLS LAST], fetch=4
- ----SortExec: fetch=4, expr=[MAX(traces.timestamp)@1 ASC NULLS LAST]
+ ----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
Expand All @@ -2450,7 +2450,7 @@ Limit: skip=0, fetch=4
physical_plan
GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [trace_id@0 ASC NULLS LAST], fetch=4
- ----SortExec: fetch=4, expr=[trace_id@0 ASC NULLS LAST]
+ ----SortExec: TopK(fetch=4), expr=[trace_id@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/topk.slt
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Limit: skip=0, fetch=5
----TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
physical_plan
GlobalLimitExec: skip=0, fetch=5
- --SortExec: fetch=5, expr=[c13@12 DESC]
+ --SortExec: TopK(fetch=5), expr=[c13@12 DESC]
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true


Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q10.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [revenue@2 DESC], fetch=10
- ----SortExec: fetch=10, expr=[revenue@2 DESC]
+ ----SortExec: TopK(fetch=10), expr=[revenue@2 DESC]
------ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@7 as revenue, c_acctbal@2 as c_acctbal, n_name@4 as n_name, c_address@5 as c_address, c_phone@3 as c_phone, c_comment@6 as c_comment]
--------AggregateExec: mode=FinalPartitioned, gby=[c_custkey@0 as c_custkey, c_name@1 as c_name, c_acctbal@2 as c_acctbal, c_phone@3 as c_phone, n_name@4 as n_name, c_address@5 as c_address, c_comment@6 as c_comment], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
----------CoalesceBatchesExec: target_batch_size=8192
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q11.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [value@1 DESC], fetch=10
- ----SortExec: fetch=10, expr=[value@1 DESC]
+ ----SortExec: TopK(fetch=10), expr=[value@1 DESC]
------ProjectionExec: expr=[ps_partkey@0 as ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@1 as value]
--------NestedLoopJoinExec: join_type=Inner, filter=CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS Decimal128(38, 15)) > SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)@1
----------AggregateExec: mode=FinalPartitioned, gby=[ps_partkey@0 as ps_partkey], aggr=[SUM(partsupp.ps_supplycost * partsupp.ps_availqty)]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q13.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [custdist@1 DESC,c_count@0 DESC], fetch=10
- ----SortExec: fetch=10, expr=[custdist@1 DESC,c_count@0 DESC]
+ ----SortExec: TopK(fetch=10), expr=[custdist@1 DESC,c_count@0 DESC]
------ProjectionExec: expr=[c_count@0 as c_count, COUNT(*)@1 as custdist]
--------AggregateExec: mode=FinalPartitioned, gby=[c_count@0 as c_count], aggr=[COUNT(*)]
----------CoalesceBatchesExec: target_batch_size=8192
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q16.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST], fetch=10
- ----SortExec: fetch=10, expr=[supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
+ ----SortExec: TopK(fetch=10), expr=[supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
------ProjectionExec: expr=[group_alias_0@0 as part.p_brand, group_alias_1@1 as part.p_type, group_alias_2@2 as part.p_size, COUNT(alias1)@3 as supplier_cnt]
--------AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, group_alias_1@1 as group_alias_1, group_alias_2@2 as group_alias_2], aggr=[COUNT(alias1)]
----------CoalesceBatchesExec: target_batch_size=8192
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q2.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [s_acctbal@0 DESC,n_name@2 ASC NULLS LAST,s_name@1 ASC NULLS LAST,p_partkey@3 ASC NULLS LAST], fetch=10
- ----SortExec: fetch=10, expr=[s_acctbal@0 DESC,n_name@2 ASC NULLS LAST,s_name@1 ASC NULLS LAST,p_partkey@3 ASC NULLS LAST]
+ ----SortExec: TopK(fetch=10), expr=[s_acctbal@0 DESC,n_name@2 ASC NULLS LAST,s_name@1 ASC NULLS LAST,p_partkey@3 ASC NULLS LAST]
------ProjectionExec: expr=[s_acctbal@5 as s_acctbal, s_name@2 as s_name, n_name@8 as n_name, p_partkey@0 as p_partkey, p_mfgr@1 as p_mfgr, s_address@3 as s_address, s_phone@4 as s_phone, s_comment@6 as s_comment]
--------CoalesceBatchesExec: target_batch_size=8192
----------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@0, ps_partkey@1), (ps_supplycost@7, MIN(partsupp.ps_supplycost)@0)]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q3.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [revenue@1 DESC,o_orderdate@2 ASC NULLS LAST], fetch=10
- ----SortExec: fetch=10, expr=[revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]
+ ----SortExec: TopK(fetch=10), expr=[revenue@1 DESC,o_orderdate@2 ASC NULLS LAST]
------ProjectionExec: expr=[l_orderkey@0 as l_orderkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@3 as revenue, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority]
--------AggregateExec: mode=FinalPartitioned, gby=[l_orderkey@0 as l_orderkey, o_orderdate@1 as o_orderdate, o_shippriority@2 as o_shippriority], aggr=[SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]
----------CoalesceBatchesExec: target_batch_size=8192
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q9.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Limit: skip=0, fetch=10
physical_plan
GlobalLimitExec: skip=0, fetch=10
--SortPreservingMergeExec: [nation@0 ASC NULLS LAST,o_year@1 DESC], fetch=10
- ----SortExec: fetch=10, expr=[nation@0 ASC NULLS LAST,o_year@1 DESC]
+ ----SortExec: TopK(fetch=10), expr=[nation@0 ASC NULLS LAST,o_year@1 DESC]
------ProjectionExec: expr=[nation@0 as nation, o_year@1 as o_year, SUM(profit.amount)@2 as sum_profit]
--------AggregateExec: mode=FinalPartitioned, gby=[nation@0 as nation, o_year@1 as o_year], aggr=[SUM(profit.amount)]
----------CoalesceBatchesExec: target_batch_size=8192
Expand Down
Loading