
Move repartition_file_scans out of enable_round_robin check in EnforceDistribution rule #8731

Merged
12 commits merged into apache:main on Jan 5, 2024

Conversation


@viirya viirya commented Jan 3, 2024

Which issue does this PR close?

Closes #.

Rationale for this change

This patch cleans up some code in the EnforceDistribution rule. In particular, it makes one major change: moving repartition_file_scans out of the check guarded by enable_round_robin.

What changes are included in this PR?

Are these changes tested?

Existing tests

Are there any user-facing changes?

No

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Jan 3, 2024
Comment on lines -1218 to -1225
// When `repartition_file_scans` is set, attempt to increase
// parallelism at the source.
if repartition_file_scans {
if let Some(new_child) =
child.plan.repartitioned(target_partitions, config)?
{
child.plan = new_child;
}
viirya (Member Author):

repartition_file_scans doesn't look like it should be guarded by the enable_round_robin config.
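The point can be illustrated with a minimal, self-contained sketch (the `Config` struct and `applied_transforms` function are illustrative stand-ins, not DataFusion's actual API): file-scan repartitioning should depend only on its own flag, while round-robin insertion stays behind enable_round_robin.

```rust
// Hypothetical, simplified model of the two independent decisions.
#[derive(Clone, Copy)]
struct Config {
    enable_round_robin: bool,
    repartition_file_scans: bool,
}

fn applied_transforms(cfg: Config) -> Vec<&'static str> {
    let mut out = Vec::new();
    // File-scan repartitioning stands on its own: it should apply
    // regardless of whether round-robin insertion is enabled.
    if cfg.repartition_file_scans {
        out.push("repartition_file_scans");
    }
    // Round-robin insertion remains guarded by its own flag.
    if cfg.enable_round_robin {
        out.push("add_roundrobin_on_top");
    }
    out
}

fn main() {
    // With round-robin disabled, file scans are still repartitioned.
    let cfg = Config { enable_round_robin: false, repartition_file_scans: true };
    assert_eq!(applied_transforms(cfg), vec!["repartition_file_scans"]);
    println!("ok");
}
```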

Comment on lines 1227 to 1231
// Increase parallelism by adding round-robin repartitioning
// on top of the operator. Note that we only do this if the
// partition count is not already equal to the desired partition
// count.
child = add_roundrobin_on_top(child, target_partitions)?;
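A minimal sketch of the guard the comment above describes (the `Plan` enum and this `add_roundrobin_on_top` are illustrative, not DataFusion's actual signature): round-robin is only added when the child's partition count differs from the target.

```rust
// Illustrative stand-in for `add_roundrobin_on_top`: wrap the child in a
// round-robin repartition only if it doesn't already have the desired
// partition count.
#[derive(Debug, PartialEq)]
enum Plan {
    Scan { partitions: usize },
    RoundRobin { target: usize, child: Box<Plan> },
}

fn add_roundrobin_on_top(child: Plan, target_partitions: usize) -> Plan {
    let current = match &child {
        Plan::Scan { partitions } => *partitions,
        Plan::RoundRobin { target, .. } => *target,
    };
    if current == target_partitions {
        // Already at the desired partition count: no-op.
        child
    } else {
        Plan::RoundRobin { target: target_partitions, child: Box::new(child) }
    }
}

fn main() {
    // A single-partition scan gets a round-robin on top...
    let plan = add_roundrobin_on_top(Plan::Scan { partitions: 1 }, 4);
    assert!(matches!(plan, Plan::RoundRobin { target: 4, .. }));
    // ...but a scan already at the target is left untouched.
    let plan = add_roundrobin_on_top(Plan::Scan { partitions: 4 }, 4);
    assert_eq!(plan, Plan::Scan { partitions: 4 });
    println!("ok");
}
```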
viirya (Member Author) commented Jan 3, 2024:

This doesn't look correct to me, and the produced query plan also looks confusing.

If there are already distribution requirements like single partitioning or hash partitioning (i.e., a repartition will be inserted anyway), why do we add round-robin repartitioning below it, resulting in two repartitions?

From the comment, it looks like the goal is to increase parallelism. But parallelism is bounded by the number of CPUs, whereas the partition count here is target_partitions. That is to say, it could round-robin into a much larger number of partitions (9000, for example, as I saw in one existing test case), but we will not get parallelism equal to target_partitions. Not to mention that the additional round-robin repartitioning has its own cost.

Regarding parallelism, it seems more correct to produce partitions at the scans (i.e., repartition_file_scans) and let the required parallelism flow to the following operators naturally, instead of inserting arbitrary round-robin repartitioning around distribution requirements.

I benchmarked this change. Overall I don't see a significant regression.

For example:

Benchmark clickbench_1.json                            
--------------------                                   
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃    cleanup ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │     3.19ms │     3.13ms │     no change │
│ QQuery 1     │    22.84ms │    24.64ms │  1.08x slower │
│ QQuery 2     │    71.05ms │    79.33ms │  1.12x slower │
│ QQuery 3     │    60.94ms │    67.83ms │  1.11x slower │
│ QQuery 4     │   453.96ms │   470.30ms │     no change │
│ QQuery 5     │  1089.41ms │   877.61ms │ +1.24x faster │
│ QQuery 6     │    24.80ms │    27.15ms │  1.09x slower │
│ QQuery 7     │    24.79ms │    26.62ms │  1.07x slower │
│ QQuery 8     │  1001.42ms │  1000.47ms │     no change │
│ QQuery 9     │  1625.50ms │  1653.77ms │     no change │
│ QQuery 10    │   201.92ms │   237.07ms │  1.17x slower │
│ QQuery 11    │   226.95ms │   251.00ms │  1.11x slower │
│ QQuery 12    │  1023.53ms │   791.79ms │ +1.29x faster │
│ QQuery 13    │  1672.11ms │  1435.32ms │ +1.16x faster │
│ QQuery 14    │  1117.68ms │   881.34ms │ +1.27x faster │
│ QQuery 15    │   554.72ms │   574.87ms │     no change │
│ QQuery 16    │  1900.79ms │  1840.62ms │     no change │
│ QQuery 17    │  1884.34ms │  1791.93ms │     no change │
│ QQuery 18    │  4313.52ms │  3465.75ms │ +1.24x faster │
│ QQuery 19    │    43.34ms │    53.79ms │  1.24x slower │
│ QQuery 20    │  1976.39ms │  1850.32ms │ +1.07x faster │
│ QQuery 21    │  2127.49ms │  2010.90ms │ +1.06x faster │
│ QQuery 22    │  4457.89ms │  4859.12ms │  1.09x slower │
│ QQuery 23    │  8203.33ms │  8276.48ms │     no change │
│ QQuery 24    │   511.62ms │   426.74ms │ +1.20x faster │
│ QQuery 25    │   486.66ms │   366.92ms │ +1.33x faster │
│ QQuery 26    │   588.12ms │   473.45ms │ +1.24x faster │                                                                     
│ QQuery 27    │  1692.65ms │  1595.15ms │ +1.06x faster │
│ QQuery 28    │ 12640.52ms │ 12382.83ms │     no change │                                                                     
│ QQuery 29    │   407.09ms │   447.76ms │  1.10x slower │     
│ QQuery 30    │   968.69ms │   770.84ms │ +1.26x faster │
│ QQuery 31    │  1066.54ms │   876.63ms │ +1.22x faster │                                                                                                                                                                                                     
│ QQuery 32    │ 10732.72ms │  8554.12ms │ +1.25x faster │
│ QQuery 33    │  7478.91ms │  6086.03ms │ +1.23x faster │                                                                                                                                                                                                     
│ QQuery 34    │  6516.26ms │  6733.28ms │     no change │                                                                                                                                                                                                     
│ QQuery 35    │  1236.72ms │  1207.24ms │     no change │                                                                     
│ QQuery 36    │   198.37ms │   195.67ms │     no change │     
│ QQuery 37    │    99.59ms │    99.33ms │     no change │
│ QQuery 38    │   111.45ms │   111.15ms │     no change │
│ QQuery 39    │   432.10ms │   432.58ms │     no change │
│ QQuery 40    │    45.72ms │    43.84ms │     no change │
│ QQuery 41    │    41.19ms │    41.65ms │     no change │
│ QQuery 42    │    48.21ms │    48.99ms │     no change │
└──────────────┴────────────┴────────────┴───────────────┘

Dandandan (Contributor) commented Jan 3, 2024:

The idea, as you said, is to increase the parallelism of the hash-repartition operator. target_partitions is, in most cases / by design, currently bound to the number of CPUs.

Dandandan (Contributor) commented Jan 3, 2024:

Are you sure the ClickBench benchmark actually produces different query plans? Since we push repartitioning down to e.g. Parquet/CSV scans, we shouldn't be introducing round-robin repartitioning in those plans anymore, as the target_partitions requirement might already be met.

viirya (Member Author):

I'm not sure, as these benchmarks don't check query plans. I may need to check manually; I will post back what I see later. However, my first thought is: doesn't that mean we mostly don't need to add these round-robins? I'm not sure whether the round-robin was added because it showed better performance numbers.

By the way, I knew this change might be a bit controversial before making it. I changed it because the query plan looks weird always having two levels of repartitioning (round-robin + hash), and some of it looks unreasonable (round-robin into 9000 partitions?), so I wanted to raise the discussion. If I cannot get consensus on the removal, I can certainly restore it and keep only the other changes in this PR.

viirya (Member Author):

The idea, as you said, is to increase the parallelism of the hash-repartition operator. target_partitions is, in most cases / by design, currently bound to the number of CPUs.

Yeah, however, I think in most cases hash repartitioning should already take input that is not single-partitioned (unless its input was intentionally partitioned into a single partition). If the scans are well partitioned, the later operators follow that partitioning.

Also, I don't see an obvious effect on performance so far (based on my local benchmark results). Actually, I think in most cases the additional round-robin doesn't help but adds a small cost (round-robin partitioning is not a zero-cost operation).

Contributor:

If I remember correctly, before partitioning was pushed down to scans, adding round-robin repartitioning after e.g. single files was better than doing it only in the hash repartition. I think if you disable the file repartitioning, or test with formats that don't support it, you should be able to measure the effect.
The additional cost of round-robin is extremely small (invisible in profiles), especially compared to hash repartitioning.

viirya (Member Author):

Let me restore this change first and move this PR (the other parts) forward.

@@ -1198,37 +1191,25 @@ fn ensure_distribution(
)
.map(
|(mut child, requirement, required_input_ordering, would_benefit, maintains)| {
// Don't need to apply when the returned row count is not greater than 1:
viirya (Member Author):

Based on the value it is compared with (batch_size), I think this should be batch_size instead of 1.
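A hedged sketch of the threshold being discussed (`repartition_beneficial` and its parameters are illustrative stand-ins, not DataFusion's actual code): repartitioning only pays off when the input is expected to produce more than one batch of rows, so comparing against batch_size rather than 1 matches the value actually used.

```rust
// Illustrative only: decide whether repartitioning is likely beneficial
// based on the estimated row count vs. the configured batch size.
fn repartition_beneficial(stats_num_rows: Option<usize>, batch_size: usize) -> bool {
    match stats_num_rows {
        // With an estimate, repartition only when more than one batch of
        // rows is expected; a handful of rows gains nothing from fan-out.
        Some(n) => n > batch_size,
        // Without statistics, assume repartitioning may help.
        None => true,
    }
}

fn main() {
    assert!(!repartition_beneficial(Some(100), 8192)); // fits in one batch
    assert!(repartition_beneficial(Some(1_000_000), 8192));
    assert!(repartition_beneficial(None, 8192));
    println!("ok");
}
```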

Comment on lines -1365 to -1372
child_context.distribution_connection = match child_context.plan.as_any() {
plan_any if plan_any.is::<RepartitionExec>() => matches!(
plan_any
.downcast_ref::<RepartitionExec>()
.unwrap()
.partitioning(),
Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _)
),
viirya (Member Author):

Unnecessary matching: for all partitioning schemes supported by RepartitionExec, distribution_connection should be true.

viirya (Member Author):

I also changed it to use utility functions like is_repartition.
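The simplification can be sketched in miniature (stub plan types, not DataFusion's real API): a helper in the spirit of `is_repartition` only needs a type check, with no match on the partitioning variant.

```rust
// Simplified stand-ins for DataFusion's plan nodes; not the real API.
use std::any::Any;

trait ExecutionPlan: Any {
    fn as_any(&self) -> &dyn Any;
}

struct RepartitionExec;
struct FilterExec;

impl ExecutionPlan for RepartitionExec {
    fn as_any(&self) -> &dyn Any { self }
}
impl ExecutionPlan for FilterExec {
    fn as_any(&self) -> &dyn Any { self }
}

// Every partitioning a RepartitionExec supports implies a distribution
// connection, so a plain type check replaces the downcast-and-match.
fn is_repartition(plan: &dyn ExecutionPlan) -> bool {
    plan.as_any().is::<RepartitionExec>()
}

fn main() {
    assert!(is_repartition(&RepartitionExec));
    assert!(!is_repartition(&FilterExec));
    println!("ok");
}
```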

------------------------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=4
--------------------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
----------------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/supplier.tbl]]}, projection=[s_suppkey, s_nationkey], has_header=false
------------------------------------RepartitionExec: partitioning=Hash([s_suppkey@0], 4), input_partitions=1
Contributor:

I think the plans in these tests are less optimal parallelism-wise, as the (compute-intensive) hash repartition now runs on a single partition.

viirya (Member Author):

My reason for making this change: #8731 (comment)

if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin repartition
// before it:
input = add_roundrobin_on_top(input, n_target)?;
viirya (Member Author):

This looks redundant, as we already have it.

viirya (Member Author):

Reverted this first. I will clean these up in another PR.

@@ -1237,12 +1231,7 @@ fn ensure_distribution(
child = add_spm_on_top(child);
viirya (Member Author):

@Dandandan Do we need the additional round-robin for a single partition? It looks unnecessary, as single partitioning is not as costly as hash partitioning.

viirya (Member Author):

Let me focus on the repartition_file_scans change in this PR first. I will open another PR to clean up the round-robin parts.

@@ -147,7 +147,7 @@ WITH HEADER ROW
LOCATION 'test_files/scratch/repartition_scan/csv_table/';

query I
select * from csv_table;
select * from csv_table ORDER BY column1;
viirya (Member Author):

This makes the test result stable; otherwise I get different results every time I rerun it locally...

@@ -3794,7 +3794,7 @@ select a,
1 1
2 1

# support scalar value in ORDER BY
# support scalar value in ORDER BY
viirya (Member Author):

I simply ran all sqllogictests locally. It seems some trailing whitespace characters were left there.

} else {
true
};

// When `repartition_file_scans` is set, attempt to increase
// parallelism at the source.
if repartition_file_scans {
Contributor:

Should we maybe also check repartition_beneficial_stats?

viirya (Member Author):

Okay, it looks like the file scan statistics are filled in.

alamb (Contributor) left a comment:

This looks like a good code change to me, but I am surprised that there are no (real) test changes needed other than parallelization_two_partitions.

The changes to parallelization_two_partitions are pretty subtle to me, in that if they changed I might not catch the regression (the plan would simply look like it had an implicit rather than an explicit range).

Is there some way we can add an explain plan to prevent this code from being broken in the future?


viirya commented Jan 3, 2024

Is there some way we can add an explain plan to prevent this code from being broken in the future?

Let me see how to add one.

Comment on lines +66 to +73
# disable round robin repartitioning
statement ok
set datafusion.optimizer.enable_round_robin_repartition = false;

## Expect to see the scan read the file as "4" groups with even sizes (offsets) again
query TT
EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42;
----
viirya (Member Author):

I verified that this test case fails on the current main branch.

alamb (Contributor) left a comment:

This looks good to me -- thank you @viirya

cc @mustafasrepo / @ozankabak as you have expertise in this area of the code I think

mustafasrepo (Contributor) left a comment:

Thanks @viirya, LGTM!


viirya commented Jan 5, 2024

Thank you @alamb @Dandandan @mustafasrepo @ozankabak

@alamb alamb merged commit 29f23eb into apache:main Jan 5, 2024
22 checks passed