Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move repartition_file_scans out of enable_round_robin check in EnforceDistribution rule #8731

Merged
merged 12 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 18 additions & 24 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1198,32 +1198,33 @@ fn ensure_distribution(
)
.map(
|(mut child, requirement, required_input_ordering, would_benefit, maintains)| {
// Don't need to apply when the returned row count is not greater than 1:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on what value is compared with (batch_size), I think this should be batch_size instead of 1.

// Don't need to apply when the returned row count is not greater than batch size
let num_rows = child.plan.statistics()?.num_rows;
let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) {
num_rows
.get_value()
.map(|value| value > &batch_size)
.unwrap_or(true)
.unwrap() // safe to unwrap since is_exact() is true
} else {
true
};

// When `repartition_file_scans` is set, attempt to increase
// parallelism at the source.
if repartition_file_scans && repartition_beneficial_stats {
if let Some(new_child) =
child.plan.repartitioned(target_partitions, config)?
{
child.plan = new_child;
}
}

if enable_round_robin
// Operator benefits from partitioning (e.g. filter):
&& (would_benefit && repartition_beneficial_stats)
// Unless partitioning doesn't increase the partition count, it is not beneficial:
&& child.plan.output_partitioning().partition_count() < target_partitions
{
// When `repartition_file_scans` is set, attempt to increase
// parallelism at the source.
if repartition_file_scans {
if let Some(new_child) =
child.plan.repartitioned(target_partitions, config)?
{
child.plan = new_child;
}
Comment on lines -1218 to -1225
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

repartition_file_scans doesn't look like it should be guarded by enable_round_robin config.

}
// Increase parallelism by adding round-robin repartitioning
// on top of the operator. Note that we only do this if the
// partition count is not already equal to the desired partition
Expand Down Expand Up @@ -1362,17 +1363,10 @@ impl DistributionContext {

fn update_children(mut self) -> Result<Self> {
for child_context in self.children_nodes.iter_mut() {
child_context.distribution_connection = match child_context.plan.as_any() {
plan_any if plan_any.is::<RepartitionExec>() => matches!(
plan_any
.downcast_ref::<RepartitionExec>()
.unwrap()
.partitioning(),
Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _)
),
Comment on lines -1365 to -1372
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary matching. For all supported partitioning in RepartitionExec, distribution_connection should be true.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And changed to use utility functions like is_repartition.

plan_any
if plan_any.is::<SortPreservingMergeExec>()
|| plan_any.is::<CoalescePartitionsExec>() =>
child_context.distribution_connection = match &child_context.plan {
plan if is_repartition(plan)
|| is_coalesce_partitions(plan)
|| is_sort_preserving_merge(plan) =>
{
true
}
Expand Down Expand Up @@ -3871,14 +3865,14 @@ pub(crate) mod tests {
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Plan already has two partitions
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e]",
"ParquetExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e]",
];
let expected_csv = [
"AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]",
"RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2",
"AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]",
// Plan already has two partitions
"CsvExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], has_header=false",
"CsvExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], has_header=false",
];

assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/arrow_typeof.slt
Original file line number Diff line number Diff line change
Expand Up @@ -375,4 +375,4 @@ select arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)');
query T
select arrow_typeof(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)'));
----
LargeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })
LargeList(Field { name: "item", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} })
24 changes: 22 additions & 2 deletions datafusion/sqllogictest/test_files/repartition_scan.slt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1

# disable round robin repartitioning
statement ok
set datafusion.optimizer.enable_round_robin_repartition = false;

## Expect to see the scan read the file as "4" groups with even sizes (offsets) again
query TT
EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42;
----
Comment on lines +66 to +73
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I verified that this test case fails on current main branch.

logical_plan
Filter: parquet_table.column1 != Int32(42)
--TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)]
physical_plan
CoalesceBatchesExec: target_batch_size=8192
--FilterExec: column1@0 != 42
----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..101], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:101..202], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:202..303], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:303..403]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_min@0 != 42 OR 42 != column1_max@1

# enable round robin repartitioning again
statement ok
set datafusion.optimizer.enable_round_robin_repartition = true;

# create a second parquet file
statement ok
COPY (VALUES (100), (200)) TO 'test_files/scratch/repartition_scan/parquet_table/1.parquet'
Expand Down Expand Up @@ -147,7 +167,7 @@ WITH HEADER ROW
LOCATION 'test_files/scratch/repartition_scan/csv_table/';

query I
select * from csv_table;
select * from csv_table ORDER BY column1;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes the test result stable, otherwise every time I rerun it locally, it gets different results...

----
1
2
Expand Down Expand Up @@ -190,7 +210,7 @@ STORED AS json
LOCATION 'test_files/scratch/repartition_scan/json_table/';

query I
select * from "json_table";
select * from "json_table" ORDER BY column1;
----
1
2
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/timestamps.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1862,7 +1862,7 @@ SELECT to_timestamp(null) is null as c1,
----
true true true true true true true true true true true true true

# verify timestamp output types
# verify timestamp output types
query TTT
SELECT arrow_typeof(to_timestamp(1)), arrow_typeof(to_timestamp(null)), arrow_typeof(to_timestamp('2023-01-10 12:34:56.000'))
----
Expand All @@ -1880,7 +1880,7 @@ SELECT arrow_typeof(to_timestamp(1)) = arrow_typeof(1::timestamp) as c1,
true true true true true true

# known issues. currently overflows (expects default precision to be microsecond instead of nanoseconds. Work pending)
#verify extreme values
#verify extreme values
#query PPPPPPPP
#SELECT to_timestamp(-62125747200), to_timestamp(1926632005177), -62125747200::timestamp, 1926632005177::timestamp, cast(-62125747200 as timestamp), cast(1926632005177 as timestamp)
#----
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/tpch/q2.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ order by
p_partkey
limit 10;
----
9828.21 Supplier#000000647 UNITED KINGDOM 13120 Manufacturer#5 x5U7MBZmwfG9 33-258-202-4782 s the slyly even ideas poach fluffily
9828.21 Supplier#000000647 UNITED KINGDOM 13120 Manufacturer#5 x5U7MBZmwfG9 33-258-202-4782 s the slyly even ideas poach fluffily
9508.37 Supplier#000000070 FRANCE 3563 Manufacturer#1 INWNH2w,OOWgNDq0BRCcBwOMQc6PdFDc4 16-821-608-1166 ests sleep quickly express ideas. ironic ideas haggle about the final T
9508.37 Supplier#000000070 FRANCE 17268 Manufacturer#4 INWNH2w,OOWgNDq0BRCcBwOMQc6PdFDc4 16-821-608-1166 ests sleep quickly express ideas. ironic ideas haggle about the final T
9453.01 Supplier#000000802 ROMANIA 10021 Manufacturer#5 ,6HYXb4uaHITmtMBj4Ak57Pd 29-342-882-6463 gular frets. permanently special multipliers believe blithely alongs
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/window.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3794,7 +3794,7 @@ select a,
1 1
2 1

# support scalar value in ORDER BY
# support scalar value in ORDER BY
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simply ran all sqllogictests locally. Seems some space chars left there.

query I
select rank() over (order by 1) rnk from (select 1 a union all select 2 a) x
----
Expand Down