Join intermediate tasks for small partitions #11023

Merged
merged 1 commit into from
Mar 9, 2022

Conversation

losipiuk
Member

@losipiuk losipiuk commented Feb 11, 2022

Description

If partitions produced by upstream tasks are small, it is sub-optimal to create a separate task for each partition.
With this commit, a single task can read data from multiple input partitions; the target input size is configured via fault-tolerant-execution-target-task-input-size.
If the task also reads source data (which can happen, e.g., when a join is against a bucketed table and the join key matches the bucketing), task sizing also takes input split
weights into account (configured via fault-tolerant-execution-target-task-split-count).
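
To make the input-size part concrete, here is a minimal sketch of greedy grouping by bytes only. It is not the code in this PR: `PartitionGrouping`, `group`, and the numbers used with it are hypothetical, and the target stands in for the value of fault-tolerant-execution-target-task-input-size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only; not part of the engine.
final class PartitionGrouping
{
    private PartitionGrouping() {}

    // Packs consecutive partitions into groups. A group is closed as soon as
    // adding the next partition would push it past targetSizeInBytes, so a
    // single oversized partition still gets a group of its own.
    static List<List<Integer>> group(long[] partitionSizesInBytes, long targetSizeInBytes)
    {
        List<List<Integer>> groups = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentSize = 0;
        for (int partition = 0; partition < partitionSizesInBytes.length; partition++) {
            long size = partitionSizesInBytes[partition];
            if (!current.isEmpty() && currentSize + size > targetSizeInBytes) {
                groups.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(partition);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }
}
```

With partition sizes 10, 20, 30, 100, 5, 5 and a target of 64, this yields three tasks instead of six: partitions 0-2 together, partition 3 alone, and partitions 4-5 together.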

Based on #10837 (review just the last commit)

General information

Is this change a fix, improvement, new feature, refactoring, or other?

improvement

Is this a change to the core query engine, a connector, client library, or the SPI interfaces? (be specific)

core query engine

How would you describe this change to a non-technical end user or system administrator?

N/A

Documentation

(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

(x) No release notes entries required.
( ) Release notes entries required with the following suggested text:

@losipiuk losipiuk force-pushed the lo/adaptive-task-sizeing branch from 9f0c8a6 to cd428c0 Compare February 14, 2022 19:56
@cla-bot cla-bot bot added the cla-signed label Feb 14, 2022
@losipiuk losipiuk marked this pull request as ready for review February 14, 2022 20:03
@losipiuk losipiuk force-pushed the lo/adaptive-task-sizeing branch from cd428c0 to f8e1b1b Compare February 14, 2022 20:04
if ((splitsWeight > 0 || exchangeSourcesSize > 0)
        && ((splitsWeight + taskSplitWeight) > targetPartitionSplitWeight || (exchangeSourcesSize + taskExchangeSourcesSize + replicatedExchangeSourcesSize) > targetPartitionSourceSizeInBytes)) {
    exchangeSources.putAll(replicatedExchangeSourceHandles); // add replicated exchanges
    joinedTasks.add(new TaskDescriptor(taskPartitionId++, splits.build(), exchangeSources.build(), groupNodeRequirements));
Member

QQ: does task execution automatically take care of processing the splits and partitions corresponding to the same hash?

Member Author

Execution is dumb. But each of the TaskDescriptors passed to postprocessTasks handles a single partition (its splits read data from the table buckets that match the exchange partition).
In postprocessTasks I merge TaskDescriptors that are too small.
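
A rough sketch of that merge step, mirroring the flush condition from the diff above. It is a simplification under stated assumptions, not the actual postprocessTasks: `TaskJoiner`, `SmallTask`, and `join` are hypothetical names, each task is reduced to a split weight and an exchange input size, and replicated exchange sources are modeled as one fixed byte count that every joined task has to read.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification, for illustration only.
final class TaskJoiner
{
    // Stand-in for a per-partition task descriptor.
    static final class SmallTask
    {
        final int partitionId;
        final long splitWeight;
        final long exchangeBytes;

        SmallTask(int partitionId, long splitWeight, long exchangeBytes)
        {
            this.partitionId = partitionId;
            this.splitWeight = splitWeight;
            this.exchangeBytes = exchangeBytes;
        }
    }

    private TaskJoiner() {}

    // Returns the partition ids handled by each joined task.
    static List<List<Integer>> join(List<SmallTask> tasks, long targetSplitWeight, long targetBytes, long replicatedBytes)
    {
        List<List<Integer>> joined = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long splitsWeight = 0;
        long exchangeSourcesSize = 0;
        for (SmallTask task : tasks) {
            // Same shape as the condition in the diff: flush only if something
            // has been accumulated and adding this task would exceed a target.
            boolean hasData = splitsWeight > 0 || exchangeSourcesSize > 0;
            boolean overWeight = splitsWeight + task.splitWeight > targetSplitWeight;
            boolean overSize = exchangeSourcesSize + task.exchangeBytes + replicatedBytes > targetBytes;
            if (hasData && (overWeight || overSize)) {
                joined.add(current);
                current = new ArrayList<>();
                splitsWeight = 0;
                exchangeSourcesSize = 0;
            }
            current.add(task.partitionId);
            splitsWeight += task.splitWeight;
            exchangeSourcesSize += task.exchangeBytes;
        }
        if (!current.isEmpty()) {
            joined.add(current);
        }
        return joined;
    }
}
```

Because every input task already covers exactly one partition (matching table buckets plus the exchange partition), joining whole tasks keeps the matching splits and exchange data together without execution having to know about it.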

@arhimondr arhimondr mentioned this pull request Feb 15, 2022
31 tasks
@losipiuk losipiuk force-pushed the lo/adaptive-task-sizeing branch from f8e1b1b to f8e1f5b Compare February 17, 2022 10:25
@losipiuk
Member Author

rebased

private long sourceHandleSize(ExchangeSourceHandle handle)
{
    Exchange exchange = exchangeForHandle.get(handle);
    ExchangeSourceStatistics exchangeSourceStatistics = exchange.getExchangeSourceStatistics(handle);
Contributor

I'm wondering whether it is a design mistake to have the getExchangeSourceStatistics method on the Exchange interface instead of the ExchangeManager. Maintaining these handle-to-exchange mappings could be costly if the number of partitions is high, and it seems to be completely unnecessary. Do you think it is worth fixing? (It could be done as a follow-up PR.)

Member Author

It could be moved there.
But then so should ExchangeSourceSplitter split(ExchangeSourceHandle handle, long targetSizeInBytes);, right?

Contributor

Yeah

Member Author

Leaving this as a follow-up.

@losipiuk losipiuk requested a review from findepi February 18, 2022 12:04
@losipiuk losipiuk force-pushed the lo/adaptive-task-sizeing branch from f8e1f5b to 5c1dc6a Compare February 18, 2022 12:32
@losipiuk losipiuk requested a review from sopel39 February 23, 2022 13:42
@losipiuk losipiuk force-pushed the lo/adaptive-task-sizeing branch from 5c1dc6a to d9c63f1 Compare March 2, 2022 19:02
@losipiuk
Member Author

losipiuk commented Mar 2, 2022

(rebased)

@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableSet;
Member

Could you extend the commit message to describe what this actually means?

Member Author

Changed. Hope it is better now.

@losipiuk losipiuk force-pushed the lo/adaptive-task-sizeing branch from d9c63f1 to 4456068 Compare March 8, 2022 11:01
If partitions produced by upstream tasks are small, it is sub-optimal to
create a separate task for each partition.
With this commit, a single task can read data from multiple input
partitions; the target input size is configured via
fault-tolerant-execution-target-task-input-size.
If the task also reads source data (which can happen, e.g., when a join
is against a bucketed table and the join key matches the bucketing),
task sizing also takes input split weights into account (configured via
fault-tolerant-execution-target-task-split-count).
@losipiuk losipiuk force-pushed the lo/adaptive-task-sizeing branch from 4456068 to 85ad8ca Compare March 8, 2022 16:37
@losipiuk
Member Author

losipiuk commented Mar 9, 2022

CI: #11388

@losipiuk losipiuk merged commit c9f6e77 into trinodb:master Mar 9, 2022
@github-actions github-actions bot added this to the 373 milestone Mar 9, 2022

4 participants