Skip to content

Commit

Permalink
Minor: include sort expressions in SortPreservingRepartitionExec explain plan (#7796)
Browse files Browse the repository at this point in the history

* Minor: include sort expressions in SortPreservingRepartitionExec plan

* update plan in new test
  • Loading branch information
alamb committed Oct 13, 2023
1 parent 19d22d3 commit e1bef86
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 64 deletions.
41 changes: 22 additions & 19 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1738,8 +1738,11 @@ mod tests {
_t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
write!(f, "SortRequiredExec: [{}]", expr.join(","))
write!(
f,
"SortRequiredExec: [{}]",
PhysicalSortExpr::format_list(&self.expr)
)
}
}

Expand Down Expand Up @@ -3056,16 +3059,16 @@ mod tests {
vec![
top_join_plan.as_str(),
join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
Expand All @@ -3082,21 +3085,21 @@ mod tests {
_ => vec![
top_join_plan.as_str(),
// The 4 operators below are the differences introduced when the join mode is changed
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"CoalescePartitionsExec",
join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
Expand Down Expand Up @@ -3170,38 +3173,38 @@ mod tests {
JoinType::Inner | JoinType::Right => vec![
top_join_plan.as_str(),
join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
],
// Should include 8 RepartitionExecs (4 of them preserve order) and 4 SortExecs
JoinType::Left | JoinType::Full => vec![
top_join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10, sort_exprs=b1@6 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@6 ASC]",
"CoalescePartitionsExec",
join_plan.as_str(),
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, sort_exprs=a@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[a@0 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, sort_exprs=b1@1 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b1@1 ASC]",
"ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=c@2 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[c@2 ASC]",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
Expand Down Expand Up @@ -3292,7 +3295,7 @@ mod tests {

let expected_first_sort_enforcement = &[
"SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]",
"SortPreservingRepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=10, sort_exprs=b3@1 ASC,a3@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b3@1 ASC,a3@0 ASC]",
"CoalescePartitionsExec",
Expand All @@ -3303,7 +3306,7 @@ mod tests {
"AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
"SortPreservingRepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=10",
"SortPreservingRepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=10, sort_exprs=b2@1 ASC,a2@0 ASC",
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
"SortExec: expr=[b2@1 ASC,a2@0 ASC]",
"CoalescePartitionsExec",
Expand Down Expand Up @@ -4382,7 +4385,7 @@ mod tests {
let expected = &[
"SortPreservingMergeExec: [c@2 ASC]",
"FilterExec: c@2 = 0",
"SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
"SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, sort_exprs=c@2 ASC",
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
];

Expand Down
27 changes: 17 additions & 10 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2156,15 +2156,19 @@ mod tests {
let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
let physical_plan = sort_exec(vec![sort_expr("a", &schema)], coalesce_partitions);

let expected_input = ["SortExec: expr=[a@0 ASC]",
let expected_input = [
"SortExec: expr=[a@0 ASC]",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC]",
" SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"];
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);
Ok(())
}
Expand All @@ -2186,11 +2190,14 @@ mod tests {
" CoalescePartitionsExec",
" RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"];
let expected_optimized = ["SortPreservingMergeExec: [a@0 ASC]",
" SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC]",
" SortPreservingRepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, sort_exprs=a@0 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false"];
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], infinite_source=true, output_ordering=[a@0 ASC], has_header=false",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, false);
Ok(())
}
Expand Down Expand Up @@ -2259,7 +2266,7 @@ mod tests {
let expected_input = [
"BoundedWindowAggExec: wdw=[count: Ok(Field { name: \"count\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(NULL), end_bound: CurrentRow }], mode=[Sorted]",
" SortPreservingMergeExec: [a@0 ASC,b@1 ASC]",
" SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
" SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, sort_exprs=a@0 ASC,b@1 ASC",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" SortExec: expr=[a@0 ASC,b@1 ASC]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
Expand Down
Loading

0 comments on commit e1bef86

Please sign in to comment.