Join intermediate tasks for small partitions #11023
Conversation
Force-pushed from 9f0c8a6 to cd428c0

Force-pushed from cd428c0 to f8e1b1b
if ((splitsWeight > 0 || exchangeSourcesSize > 0)
        && ((splitsWeight + taskSplitWeight) > targetPartitionSplitWeight || (exchangeSourcesSize + taskExchangeSourcesSize + replicatedExchangeSourcesSize) > targetPartitionSourceSizeInBytes)) {
    exchangeSources.putAll(replicatedExchangeSourceHandles); // add replicated exchanges
    joinedTasks.add(new TaskDescriptor(taskPartitionId++, splits.build(), exchangeSources.build(), groupNodeRequirements));
QQ: does task execution automatically take care of processing the splits and partitions corresponding to the same hash?
Execution is dumb. But each of the TaskDescriptors passed to postprocessTasks handles a single partition (splits read data from table buckets that match the exchange partition). In postprocessTasks I merge some TaskDescriptors if they are too small.
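To illustrate the idea, here is a hedged, heavily simplified sketch of that merging step. It is not Trino's actual implementation: partitions are modeled as plain sizes in bytes, and the class and method names (`MergeSmallPartitions`, `merge`, `targetBytes`) are illustrative. It only shows the greedy grouping: accumulate partitions into one task until adding the next would exceed the target, then flush, mirroring the `> target...` checks in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of merging small partitions into fewer tasks.
// Each inner list represents the partitions one merged task would read.
public class MergeSmallPartitions
{
    public static List<List<Long>> merge(List<Long> partitionSizes, long targetBytes)
    {
        List<List<Long>> tasks = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentSize = 0;
        for (long size : partitionSizes) {
            // flush the accumulated group once adding the next partition
            // would push it past the target input size
            if (!current.isEmpty() && currentSize + size > targetBytes) {
                tasks.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(size);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            tasks.add(current);
        }
        return tasks;
    }

    public static void main(String[] args)
    {
        // four small partitions and one large one; target 100 bytes
        System.out.println(merge(List.of(40L, 30L, 50L, 120L, 10L), 100L));
    }
}
```

Note that a partition larger than the target still becomes its own task; this sketch never splits a partition, it only joins them.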
Force-pushed from f8e1b1b to f8e1f5b

rebased
core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java (resolved)

core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java (outdated, resolved)
private long sourceHandleSize(ExchangeSourceHandle handle)
{
    Exchange exchange = exchangeForHandle.get(handle);
    ExchangeSourceStatistics exchangeSourceStatistics = exchange.getExchangeSourceStatistics(handle);
I'm wondering whether it is a design mistake to have the getExchangeSourceStatistics method on the Exchange interface instead of ExchangeManager. Maintaining these mappings could be costly if the number of partitions is high, and it seems to be completely unnecessary. Do you think it is worth fixing? (Could be done as a follow-up PR.)
It could be moved there. But then so should ExchangeSourceSplitter split(ExchangeSourceHandle handle, long targetSizeInBytes), right?
Yeah
Leaving as a follow-up.
Force-pushed from f8e1f5b to 5c1dc6a

Force-pushed from 5c1dc6a to d9c63f1

(rebased)
@@ -22,6 +22,7 @@
 import com.google.common.collect.ImmutableSet;
Could you extend the commit message to describe what this actually means?
Changed. Hope it is better now.
Force-pushed from d9c63f1 to 4456068
If partitions produced by upstream tasks are small, it is sub-optimal to create a separate task for each partition. With this commit, a single task can read data from multiple input partitions; the target input size is configured via fault-tolerant-execution-target-task-input-size. If the task is also reading source data (could be the case, e.g., if there is a join with a bucketed table and the join key matches the bucketing), the task sizing takes input split weights into account (configured via fault-tolerant-execution-target-task-split-count).
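The two property names above come from this commit message; the values below are purely illustrative (the actual defaults and accepted formats may differ), but a config fragment might look like:

```properties
# config.properties -- illustrative values only
fault-tolerant-execution-target-task-input-size=1GB
fault-tolerant-execution-target-task-split-count=16
```

The first property bounds how much exchange input a merged task should read; the second bounds task size by accumulated split weight when the task also reads table data directly.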
Force-pushed from 4456068 to 85ad8ca
CI: #11388
Description
If partitions produced by upstream tasks are small, it is sub-optimal to create a separate task for each partition.
With this commit, a single task can read data from multiple input partitions; the target input size is configured via fault-tolerant-execution-target-task-input-size. If the task is also reading source data (could be the case, e.g., if there is a join with a bucketed table and the join key matches the bucketing), the task sizing takes input split weights into account (configured via fault-tolerant-execution-target-task-split-count).

Based on #10837 (review just the last commit).

General information
improvement
core query engine
N/A
Documentation
(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
(x) No release notes entries required.
( ) Release notes entries required with the following suggested text: