
ARROW-12170: [Rust][DataFusion] Introduce repartition optimization #9865

Closed
wants to merge 37 commits into master from Dandandan/reparition-opt

Conversation

@Dandandan (Contributor) commented Mar 31, 2021

This introduces an optimization pass that adds a repartition whenever the number of partitions of the plan drops below the configured concurrency, in order to increase the achievable concurrency.

This PR separates the optimizations into a PhysicalOptimizer, so this can be extended and built upon later.

The performance benefit is clear when loading data into memory with a single partition. This tests the use case where a single file or in-memory data has high enough throughput, but the single partition causes too little parallelism. For those queries, this has a performance benefit similar to pre-partitioning the data and loading it into memory.
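
The core decision of the pass, as a minimal sketch against the DataFusion APIs of this time (not the exact code in this PR, which also recurses bottom-up so added repartitions sit as deep as possible; the helper name is illustrative):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

/// Sketch: wrap a node in a round-robin RepartitionExec when it exposes fewer
/// output partitions than the configured concurrency.
fn maybe_repartition(
    plan: Arc<dyn ExecutionPlan>,
    concurrency: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
    if plan.output_partitioning().partition_count() < concurrency {
        // Too few partitions: fan batches out round-robin so downstream operators
        // can run with `concurrency`-way parallelism.
        Ok(Arc::new(RepartitionExec::try_new(
            plan,
            Partitioning::RoundRobinBatch(concurrency),
        )?))
    } else {
        Ok(plan)
    }
}
```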

cargo run --release --bin tpch --features "snmalloc" -- benchmark --iterations 30 --path [path --format parquet --query 1 --batch-size 8192 --concurrency 16 -m -n 1

Master

Query 1 avg time: 411.57 ms
Query 3 avg time: 147.32 ms
Query 5 avg time: 237.62 ms
Query 6 avg time: 46.00 ms
Query 12 avg time: 124.02 ms

PR

Query 1 avg time: 76.37 ms
Query 3 avg time: 67.51 ms
Query 5 avg time: 134.14 ms
Query 6 avg time: 9.58 ms
Query 12 avg time: 20.60 ms

All in all, this is looking good: we observe speed-ups of up to 6x for this test!

@andygrove (Member)

I like where this is heading @Dandandan 🚀


@alamb (Contributor) left a comment

This is looking pretty cool @Dandandan

Comment on lines +46 to +52
// wrap operators in CoalesceBatches to avoid lots of tiny batches when we have
// highly selective filters
let children = plan
.children()
.iter()
.map(|child| self.optimize(child.clone(), config))
.collect::<Result<Vec<_>>>()?;

I realize you are just moving code around, so this comment is outside the context of this PR...

However, I wonder if it would be more performant to do the coalescing directly in the filter kernel code -- the way coalesce is written today requires copying the (filtered) output into a different (coalesced) array.

I think @ritchie46 had some code that allowed incrementally building up output in several chunks as part of polars which may be relevant

I think this code is good, but I wanted to plant a seed 🌱 for future optimizations
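
For context on the copy being discussed, here is a minimal sketch of what coalescing small batches amounts to with the existing kernels (assuming at least one batch and a shared schema; this is not the CoalesceBatchesExec code itself):

```rust
use arrow::compute::kernels::concat::concat;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Sketch: combine many small RecordBatches into one by concatenating each column,
/// which is the extra copy of the (filtered) output mentioned above.
fn coalesce(batches: &[RecordBatch]) -> Result<RecordBatch> {
    let schema = batches[0].schema();
    let columns = (0..schema.fields().len())
        .map(|i| {
            // Gather column `i` from every batch and concatenate into one array.
            let arrays: Vec<_> = batches.iter().map(|b| b.column(i).as_ref()).collect();
            concat(&arrays)
        })
        .collect::<Result<Vec<_>>>()?;
    RecordBatch::try_new(schema, columns)
}
```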

@Dandandan (Contributor Author) commented Apr 1, 2021

I think that might be a useful direction indeed!

I think indeed it can be more efficient in some cases for nodes to write to mutable buffers than to produce smaller batches and concatenate them afterwards, although currently it does not seem to me like it would be an enormous performance improvement based on what I saw in the profiling info.

Probably not something in the scope of this PR indeed as it's already getting pretty big.

Some other notes:

  • In this PR I think I had to create the physical optimizer abstraction, as otherwise I felt the planner would become unmaintainable. The planning and optimization are now separated and no longer done in the same pass like before (I was actually a bit confused about how it worked before!). A rough sketch of how such passes compose is shown after these notes.
  • Currently I added the AddMergeExec as an optimization pass, as it was like that in the code before; however, it feels a bit off as an optimization pass? But I will probably keep it like that for now.
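
For reference, a minimal sketch of the shape of that abstraction (the trait and driver function below are assumptions based on this PR's description, not the exact code):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::ExecutionConfig;
use datafusion::physical_plan::ExecutionPlan;

/// Sketch of a physical optimizer pass (e.g. Repartition, CoalesceBatches, AddMergeExec).
trait PhysicalOptimizerRule {
    /// Rewrite `plan`, returning a possibly new plan.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ExecutionConfig,
    ) -> Result<Arc<dyn ExecutionPlan>>;
}

/// Apply each pass in sequence to the plan produced by the previous one.
fn optimize_physical_plan(
    mut plan: Arc<dyn ExecutionPlan>,
    rules: &[Arc<dyn PhysicalOptimizerRule + Send + Sync>],
    config: &ExecutionConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
    for rule in rules {
        plan = rule.optimize(plan, config)?;
    }
    Ok(plan)
}
```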

rust/datafusion/src/physical_optimizer/merge_exec.rs (outdated review thread, resolved)
// Recurse into children bottom-up (added nodes should be as deep as possible)

let new_plan = if plan.children().is_empty() {
// leaf node - don't replace children

it seems like new_with_children could handle the case of zero children as well, FWIW

let optimizers = &ctx_state.config.physical_optimizers;

👍

@seddonm1 (Contributor) commented Apr 2, 2021

@Dandandan great that someone is working through these optimisations!

@Dandandan marked this pull request as ready for review April 2, 2021 06:50
@Dandandan (Contributor Author)

No worries @alamb

@codecov-io commented Apr 3, 2021

Codecov Report

Merging #9865 (4bb4dc5) into master (81f6521) will increase coverage by 0.07%.
The diff coverage is 87.85%.


@@            Coverage Diff             @@
##           master    #9865      +/-   ##
==========================================
+ Coverage   82.70%   82.77%   +0.07%     
==========================================
  Files         257      260       +3     
  Lines       60486    60625     +139     
==========================================
+ Hits        50027    50185     +158     
+ Misses      10459    10440      -19     
| Impacted Files | Coverage Δ |
|---|---|
| rust/benchmarks/src/bin/tpch.rs | 38.33% <0.00%> (ø) |
| rust/datafusion/src/physical_plan/mod.rs | 88.00% <ø> (ø) |
| rust/datafusion/src/physical_plan/parquet.rs | 87.57% <ø> (+0.29%) ⬆️ |
| rust/datafusion/src/physical_plan/hash_join.rs | 85.20% <77.08%> (+0.60%) ⬆️ |
| ...st/datafusion/src/physical_optimizer/merge_exec.rs | 77.77% <77.77%> (ø) |
| rust/datafusion/src/execution/context.rs | 92.58% <87.50%> (-0.45%) ⬇️ |
| ...afusion/src/physical_optimizer/coalesce_batches.rs | 88.23% <88.23%> (ø) |
| rust/datafusion/src/physical_plan/planner.rs | 80.22% <91.89%> (+0.29%) ⬆️ |
| ...t/datafusion/src/physical_optimizer/repartition.rs | 96.72% <96.72%> (ø) |
| rust/datafusion/src/datasource/parquet.rs | 94.40% <100.00%> (-0.04%) ⬇️ |
| ... and 25 more | |

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 5489bbf...4bb4dc5.

@alamb (Contributor) commented Apr 4, 2021

I spent some time looking at this.

It seems as if at least one problem is that the TopKExec user-defined plan should declare that it takes only a single input partition from its child:

#[async_trait]
impl ExecutionPlan for TopKExec {
...
    fn required_child_distribution(&self) -> Distribution {
        Distribution::SinglePartition
    }
...

However, that is still not enough to get the tests to pass. My debugging of cargo test -p datafusion --test user_defined_plan suggests that the actual TopK stream never runs; I'll keep putzing with it.

@Dandandan (Contributor Author) commented Apr 4, 2021

Thanks @alamb, that gives some more pointers to continue with. I am also interested in what the eventual (optimized) physical plan looks like. At least not having the single partition requirement will avoid adding the MergeExec and/or allow the repartitioning to be added as a direct child.

@Dandandan (Contributor Author) commented Apr 5, 2021

Somehow this is what the plan looks like - already before optimization(?) - it seems it doesn't have children:

TopKExec

@Dandandan (Contributor Author) commented Apr 5, 2021

It seems to be something in the Repartition optimizer; when disabling it, the test succeeds now. An added repartition should always lead to a MergeExec whenever the partition count is too high, so it must be something between Repartition and AddMergeExec?

@alamb (Contributor) commented Apr 5, 2021

> Somehow this is what the plan looks like - already before optimization(?) - it seems it doesn't have children:

I think this is due to the Debug implementation not printing out children recursively:

write!(f, "TopKExec")

@Dandandan (Contributor Author)

> I think this is due to the Debug implementation not printing out children recursively:
>
> write!(f, "TopKExec")

Yeah, I realized that later. I don't know how it is handled in others, though; it seems formatter implementations only need to include themselves, not their children?

@Dandandan (Contributor Author)


Ah, it is only defined like that on the logical plan...

@Dandandan (Contributor Author)

This is the optimized plan, which looks OK: there is a RepartitionExec wrapping the CsvExec (which is fine) and a MergeExec just before the TopKExec:

TopKExec { input: MergeExec { input: ProjectionExec { expr: [(Column { name: "customer_id" }, "customer_id"), (Column { name: "revenue" }, "revenue")], schema: Schema { fields: [Field { name: "customer_id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "revenue", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }], metadata: {} }, input: CoalesceBatchesExec { input: RepartitionExec { input: CsvExec { path: "tests/customer.csv", filenames: ["tests/customer.csv"], schema: Schema { fields: [Field { name: "customer_id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "revenue", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }], metadata: {} }, has_header: false, delimiter: Some(44), file_extension: ".csv", projection: Some([0, 1]), projected_schema: Schema { fields: [Field { name: "customer_id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }, Field { name: "revenue", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: None }], metadata: {} }, batch_size: 8192, limit: None }, partitioning: RoundRobinBatch(16), channels: Mutex { data: {} } }, target_batch_size: 4096 } } }, k: 3 }
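
For readability, the operator nesting in that debug output (details elided) is:

```
TopKExec (k: 3)
└─ MergeExec
   └─ ProjectionExec (customer_id, revenue)
      └─ CoalesceBatchesExec (target_batch_size: 4096)
         └─ RepartitionExec (RoundRobinBatch(16))
            └─ CsvExec (tests/customer.csv, batch_size: 8192)
```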

@Dandandan (Contributor Author)

Something is happening with the polling here and with marking the stream as done. When I remove that part, the TopKExec gets results at a later moment. So the fold/aggregate is somehow not getting all batches?

@Dandandan (Contributor Author) commented Apr 8, 2021

@alamb I managed to fix the TopKExec test by moving the self.done = true inside the poll_next_unpin. I am not 100% sure what the exact difference in semantics is, but it seems more similar to what's done in e.g. limit (#9926).

@alamb (Contributor) left a comment

I think this looks great @Dandandan - thanks!

@@ -471,12 +471,10 @@ impl Stream for TopKReader {
return Poll::Ready(None);
}
// this aggregates and thus returns a single RecordBatch.
self.done = true;

this is probably a real bug -- the topK isn't actually done until it produces output -- and the way this code is written, top_values.poll_next_unpin() may not return Poll::NotReady? Maybe?

Anyhow, looks great

@Dandandan (Contributor Author)

That was my intuition too, yeah. This seems to be more correct and fixes the issue.
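
The pattern being discussed, as a minimal sketch (a generic stream wrapper, not the actual TopKReader code): only mark the stream done once the inner poll actually yields, so a Pending poll leaves the stream alive and it gets polled again.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::{Stream, StreamExt};

/// Sketch: forward items from an inner stream, flipping `done` only after the
/// inner stream has actually produced its (single) result.
struct OneShot<S: Stream + Unpin> {
    inner: S,
    done: bool,
}

impl<S: Stream + Unpin> Stream for OneShot<S> {
    type Item = S::Item;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.done {
            return Poll::Ready(None);
        }
        match self.inner.poll_next_unpin(cx) {
            // Set `done` only after the inner poll is Ready; setting it before
            // polling would end the stream while the inner work is still Pending.
            Poll::Ready(item) => {
                self.done = true;
                Poll::Ready(item)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
```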

@alamb (Contributor) commented Apr 8, 2021

I'll plan to merge this once CI is done (it looks like there is a bit of a backup now)

@alamb closed this in 6bace6e on Apr 9, 2021
GeorgeAp pushed a commit to sirensolutions/arrow that referenced this pull request Jun 7, 2021
Closes apache#9865 from Dandandan/reparition-opt

Lead-authored-by: Heres, Daniel <danielheres@gmail.com>
Co-authored-by: Daniël Heres <danielheres@gmail.com>
Signed-off-by: Andrew Lamb <andrew@nerdnetworks.org>
michalursa pushed a commit to michalursa/arrow that referenced this pull request Jun 10, 2021
michalursa pushed a commit to michalursa/arrow that referenced this pull request Jun 13, 2021