ARROW-12170: [Rust][DataFusion] Introduce repartition optimization #9865
Conversation
I like where this is heading @Dandandan 🚀
This is looking pretty cool @Dandandan
```rust
// wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
// highly selective filters
let children = plan
    .children()
    .iter()
    .map(|child| self.optimize(child.clone(), config))
    .collect::<Result<Vec<_>>>()?;
```
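For context, a hedged sketch of how the node might then be rebuilt and wrapped (the `wrap_in_coalesce` helper and its `target_batch_size` parameter are illustrative, based on DataFusion's `CoalesceBatchesExec` constructor):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::ExecutionPlan;

// Rebuild the node with its optimized children, then wrap it so that many
// small filtered batches are combined into batches of ~target_batch_size rows.
fn wrap_in_coalesce(
    plan: Arc<dyn ExecutionPlan>,
    children: Vec<Arc<dyn ExecutionPlan>>,
    target_batch_size: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    Ok(Arc::new(CoalesceBatchesExec::new(
        plan.with_new_children(children)?,
        target_batch_size,
    )))
}
```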
I realize you are just moving code around, so this comment is outside the context of this PR....
However, I wonder if it would be more performant to do the coalescing directly in the filter kernel code -- the way coalesce is written today requires copying the (filtered) output into a different (coalesced) array.
I think @ritchie46 had some code that allowed incrementally building up output in several chunks as part of polars, which may be relevant.
I think this code is good, but I wanted to plant a seed 🌱 for future optimizations
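To make that copy concrete, here is a rough sketch of concatenation-based coalescing, not DataFusion's actual implementation: every column of every small filtered batch is copied again into a freshly allocated array via arrow's `concat` kernel. The incremental approach mentioned above would instead append into mutable buffers and build each array once.

```rust
use arrow::array::{Array, ArrayRef};
use arrow::compute::concat;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Coalesce small (already filtered) batches into one larger batch.
/// Assumes `batches` is non-empty and all batches share a schema.
fn coalesce(batches: &[RecordBatch]) -> Result<RecordBatch> {
    let schema = batches[0].schema();
    let columns = (0..schema.fields().len())
        .map(|i| {
            let arrays: Vec<&dyn Array> =
                batches.iter().map(|b| b.column(i).as_ref()).collect();
            concat(&arrays) // allocates a new array and copies every input
        })
        .collect::<Result<Vec<ArrayRef>>>()?;
    RecordBatch::try_new(schema, columns)
}
```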
I think that might be a useful direction indeed!
I think it can indeed be more efficient in some cases for nodes to write to mutable buffers rather than produce smaller batches and concatenate them afterwards, although based on what I saw in the profiling info it currently does not seem like it would be an enormous performance improvement.
Probably not something in the scope of this PR indeed, as it's already getting pretty big.
Some other notes:
- In this PR I think I had to create the physical optimizer abstraction (see the sketch below), as otherwise I felt the planner would become unmaintainable. The planning and optimization are now separated and not in the same pass like before (I was actually a bit confused about how it worked before!)
- Currently I added the AddMergeExec as an optimization pass, as that was how the code was structured before, but it feels a bit off as an optimization pass. I will probably keep it like that for now.
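A minimal sketch of that separation follows; the trait shape is illustrative (assuming DataFusion's `ExecutionPlan`, `ExecutionConfig`, and `Result` types) rather than the exact API this PR adds:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::ExecutionConfig;
use datafusion::physical_plan::ExecutionPlan;

/// One self-contained rewrite of the physical plan; passes such as
/// Repartition, CoalesceBatches, or AddMergeExec would each implement this.
trait PhysicalOptimizerRule {
    /// Rewrite `plan`, e.g. inserting RepartitionExec nodes where the
    /// partition count drops below the configured concurrency.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ExecutionConfig,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    /// A human-readable name for debugging output.
    fn name(&self) -> &str;
}
```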
```rust
// Recurse into children bottom-up (added nodes should be as deep as possible)
let new_plan = if plan.children().is_empty() {
    // leaf node - don't replace children
```
it seems like `new_with_children` could handle the case of zero children as well, FWIW
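In other words, assuming `with_new_children` also accepts an empty vector for leaf nodes, the recursion could collapse into one path (a sketch with a hypothetical `optimize_bottom_up` helper):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

// Children are optimized first (an empty vector for leaf nodes), then the
// node is rebuilt unconditionally, relying on `with_new_children` to handle
// the zero-children case instead of a separate leaf-node branch.
fn optimize_bottom_up(
    plan: Arc<dyn ExecutionPlan>,
    optimize_child: &dyn Fn(Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let children = plan
        .children()
        .iter()
        .map(|child| optimize_child(child.clone()))
        .collect::<Result<Vec<_>>>()?;
    plan.with_new_children(children)
}
```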
```rust
        .collect(),
    ),
}
let optimizers = &ctx_state.config.physical_optimizers;
```
👍
@Dandandan great that someone is working through these optimisations!
No worries @alamb
Codecov Report
```
@@            Coverage Diff             @@
##           master    #9865      +/-   ##
==========================================
+ Coverage   82.70%   82.77%   +0.07%
==========================================
  Files         257      260       +3
  Lines       60486    60625     +139
==========================================
+ Hits        50027    50185     +158
+ Misses      10459    10440      -19
```
Continue to review the full report at Codecov.
I spent some time looking at this. It seems as if at least one problem is that the […]
However, that is still not enough to get the tests to pass. My debugging of […]
Thanks @alamb, that gives some more pointers to continue with. I am also interested in what the eventual (optimized) physical plan looks like. At least not having the single partition will avoid having the MergeExec and/or allow the repartitioning to be added as a direct child.
Somehow this is what the plan looks like - already before optimization(?) - it seems it doesn't have children: […]
It seems something in the […]
I think this is due to the […]
Yeah, I realized that later. I don't know how it is handled in others though; it seems formatter implementations only need to include themselves, not their children?
Ah, it is only defined like that on the logical plan...
This is the optimized plan, which looks ok, there is a […]
Something is happening with polling here and marking the stream as done. When I remove that part, the TopKExec will get results at a later moment. So the fold/aggregate is not getting all batches somehow?
I think this looks great @Dandandan - thanks!
```
@@ -471,12 +471,10 @@ impl Stream for TopKReader {
        return Poll::Ready(None);
    }
    // this aggregates and thus returns a single RecordBatch.
    self.done = true;
```
this is probably a real bug -- the topK isn't actually done until it produces output -- and the way this code is written, `top_values.poll_next_unpin()` may not return `Poll::Pending`? Maybe?
Anyhow, looks great
That was my intuition too, yeah. This seems to be more correct and fixes the issue.
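For illustration, a minimal self-contained sketch of the corrected pattern; the `SingleBatchStream` type is hypothetical, not the PR's actual `TopKReader`, but it shows why `done` must only be set once the inner stream actually yields:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::{Stream, StreamExt};

/// Hypothetical single-result stream: `done` is set when the inner stream
/// yields, not before it is polled.
struct SingleBatchStream<S: Stream + Unpin> {
    inner: S,
    done: bool,
}

impl<S: Stream + Unpin> Stream for SingleBatchStream<S> {
    type Item = S::Item;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<S::Item>> {
        if self.done {
            return Poll::Ready(None);
        }
        // Setting `self.done = true` *before* this poll was the bug: if the
        // inner stream returned `Poll::Pending`, the next call would hit the
        // early return above and the final batch would never be emitted.
        match self.inner.poll_next_unpin(cx) {
            Poll::Ready(item) => {
                self.done = true;
                Poll::Ready(item)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
```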
I'll plan to merge this once CI is done (it looks like there is a bit of a backup now)
This introduces an optimization pass that adds a repartition whenever the number of partitions in the plan drops below the configured concurrency, increasing the achievable parallelism. This PR separates the optimizations into a `PhysicalOptimizer`, so this can be extended and built upon later.

The performance benefit is clear when loading data into memory as a single partition, which models the case where a single file or in-memory data has high enough throughput but the single partition allows too little parallelism. This has a similar performance benefit to pre-partitioning the data and loading it in memory for those queries.

```
cargo run --release --bin tpch --features "snmalloc" -- benchmark --iterations 30 --path [path --format parquet --query 1 --batch-size 8192 --concurrency 16 -m -n 1
```

Master

```
Query 1 avg time: 411.57 ms
Query 3 avg time: 147.32 ms
Query 5 avg time: 237.62 ms
Query 6 avg time: 46.00 ms
Query 12 avg time: 124.02 ms
```

PR

```
Query 1 avg time: 76.37 ms
Query 3 avg time: 67.51 ms
Query 5 avg time: 134.14 ms
Query 6 avg time: 9.58 ms
Query 12 avg time: 20.60 ms
```

All in all, looking good: we observe speedups of up to 6x in this test!

Closes apache#9865 from Dandandan/reparition-opt

Lead-authored-by: Heres, Daniel <danielheres@gmail.com>
Co-authored-by: Daniël Heres <danielheres@gmail.com>
Signed-off-by: Andrew Lamb <andrew@nerdnetworks.org>
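The core decision such a pass makes could be sketched as follows (illustrative rather than the exact PR code, though `RepartitionExec` and `Partitioning::RoundRobinBatch` are existing DataFusion types):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

/// If a node exposes fewer partitions than the configured concurrency,
/// wrap it in a round-robin RepartitionExec so downstream operators can
/// run one task per target partition.
fn maybe_repartition(
    plan: Arc<dyn ExecutionPlan>,
    concurrency: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    if plan.output_partitioning().partition_count() < concurrency {
        Ok(Arc::new(RepartitionExec::try_new(
            plan,
            Partitioning::RoundRobinBatch(concurrency),
        )?))
    } else {
        Ok(plan)
    }
}
```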