-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mitigate Writer skewness when writing partitioned data with preferred partitioning enabled #14718
Mitigate Writer skewness when writing partitioned data with preferred partitioning enabled #14718
Conversation
core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/plan/TableWriterNode.java
Outdated
Show resolved
Hide resolved
@@ -103,7 +103,7 @@ public void testScaledWritersUsedAndTargetSupportsIt() | |||
PlanNode root = planBuilder.output( | |||
outputBuilder -> outputBuilder | |||
.source(planBuilder.tableWithExchangeCreate( | |||
planBuilder.createTarget(catalogSupportingScaledWriters, schemaTableName, true), | |||
planBuilder.createTarget(catalogSupportingScaledWriters, schemaTableName, true, true), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: consider introducing builder
plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchangeSourceOperator.java
Outdated
Show resolved
Hide resolved
import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; | ||
import static java.util.Objects.requireNonNull; | ||
import static java.util.function.Function.identity; | ||
|
||
@ThreadSafe | ||
public class LocalExchange | ||
{ | ||
private static final int MIN_PARTITION_COUNT_PER_WRITER = 128; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name is bit generic. Could you also add javadoc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels like the granularity like this is a little bit of an overkill. With the task concurrency of 32 the implementation would have to re-balance 4096 partitions and keep the state for each partition in memory. Generally there's only a few partitions that are skewed. I wonder if we should start with something lower, maybe 8? (32 * 8 = 256)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With 8 or some other low value, it could be possible that a few small partitions are scaled along with skewed ones because they belong to the same bucket. But, it might be okay since 256 is per node. WDYT? @sopel39
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/SystemPartitioningHandle.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningManager.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/BeginTableWrite.java
Outdated
Show resolved
Hide resolved
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorTableLayout.java
Outdated
Show resolved
Hide resolved
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorTableLayout.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/ScaleWriterPartitioningHandle.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/ScaleWriterPartitioningHandle.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/sanity/ValidateScaledWritersUsage.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/ScaleWriterPartitioningHandle.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/ScaleWriterPartitioningHandle.java
Outdated
Show resolved
Hide resolved
Up to "Introduce ScaleWriterPartitioningHandle " lgtm % comments |
I think there are two reasons:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm % Add scaleWriters flag in PartitioningHandle
core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/PartitioningHandle.java
Outdated
Show resolved
Hide resolved
lgtm until Add scaleWriters flag in PartitioningHandle
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
// Update the partitions row count state which will help with scaling partitions across writers | ||
partitionRebalancer.addPartitionsRowCount(writerPartitionIdToRowCount.buildOrThrow()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MAX_PARTITIONS_TO_REBALANCE_PER_WRITER * 32 = 4096
, which means in worst case writerPartitionIdToRowCount
will be as big as page.
Consider reversing the relationship, e.g: partitionRebalancer
fetching writerPartitionIdToRowCount
from ScaleWriterPartitioningExchanger
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about this but it just makes the code more complex. I wonder if it's worth the effort.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reversed the relationship. PTAL
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/execution/resourcegroups/IndexedPriorityQueue.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
In trino, the CPU time is almost similar for all related queries. So to better understand, I created this doc containing CPU utilization from the AWS console. https://docs.google.com/document/d/1Bg5z-EzavtkXnBfhNLdJE3ngXRgrYrmmTVRJwiMhpTE/edit?usp=sharing |
@arhimondr @sopel39 PTAL again |
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM % a couple of nits
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/LocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
@sopel39 PTAL again |
core/trino-main/src/main/java/io/trino/operator/exchange/ScaleWriterPartitioningExchanger.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/PartitioningScheme.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/SystemPartitioningHandle.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm % comments
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
% comments
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Show resolved
Hide resolved
{ | ||
private final List<Integer> writerAssignments; | ||
// Partition estimated physical written bytes at the end of last rebalance cycle | ||
private final AtomicLong physicalWrittenBytesAtLastRebalance = new AtomicLong(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it might be clearer if you had a field like lastScaleUpPhysicalWrittenBytes
and didn't set physicalWrittenBytesAtLastRebalance
to 0
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/exchange/TestLocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/exchange/TestUniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/exchange/TestUniformPartitionRebalancer.java
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/exchange/TestUniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/exchange/TestUniformPartitionRebalancer.java
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/operator/exchange/UniformPartitionRebalancer.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/test/java/io/trino/operator/exchange/TestLocalExchange.java
Outdated
Show resolved
Hide resolved
core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddLocalExchanges.java
Outdated
Show resolved
Hide resolved
First figure out if there's a skewness across writers in a node, then find the biggest partitions in the skewed writers and scale them across writers which are on the lower end i.e. written smallest amount of data. Scaling will only happen if the skewness is above 70% and the partition to be scaled has written atleast writerMinSize since last scale up.
Description
Issue: #13379
Problem
Improve the performance of partitioned writes specifically in case writers/partitions are skewed.
Right now, prefer-partitioning only works if you have statistics and the number of partitions is greater than preferred-write-partitioning-min-number-of-partitions (default to 50). However, we know that stats are not always guaranteed to be present in which case partitioned writes will go through from an inefficient route. But with scaling, we could enable prefer-partitioning for any number of partitions thus we don't have to rely on statistics.
Benchmarks
Cluster with 6 worker nodes
1.) Single partition (260M rows)
1:31 mins
18:34 mins
2:55 mins
2.) 8 partitions (600M rows)
2:11 mins
6:23 mins
1:48 mins
3.) 2000+ partitions with almost no skewness (600M rows)
13:23 mins
11:32 mins
10:55 mins
4.) 2000+ partitions with 6 skewed partitions (2.74B rows)
22:18 mins
(400GB peak memory)55:26 mins
(76.1GB peak memory)20:33 mins
(79.5GB peak memory)In experiments 3 and 4, the finishing time is also substantial and included in the measurements (almost +9 mins)
Non-technical explanation
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: