[SPARK-41471][SQL] Reduce Spark shuffle when only one side of a join is KeyGroupedPartitioning #42194
…is KeyGroupedPartitioning
Sorry for the delay. I'll take a look at this in the next 1-2 days.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
96bce41 to b2b3a10
Thanks for the update @Hisoka-X , looks much better now!
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
LGTM except one minor comment.
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
cc @cloud-fan for another check too
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
I'm wondering whether this will work if the other SPJ flags, spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled and spark.sql.sources.v2.bucketing.pushPartValues.enabled, are set to true and a partition has multiple splits.
In that case, it seems the BatchScanExec on the side with KeyGroupedPartitioning will group partition splits. Will that put it out of sync with the other side, which uses KeyGroupedPartitioner?
It may also be hard to make this work with #42306, which uses a similar mechanism to group partition splits?
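For readers who want to reproduce the combination being asked about, the settings can be written down roughly as below. This is only a sketch: the object name `SpjFlagCombination` is hypothetical, and the first key (`spark.sql.sources.v2.bucketing.enabled`) is included on the assumption that the other SPJ flags only take effect when it is on. The snippet deliberately avoids a Spark dependency and just lists the key/value pairs.

```scala
// Hypothetical helper object; only the config keys themselves come from Spark.
object SpjFlagCombination {
  val settings: Map[String, String] = Map(
    // Assumed prerequisite: turns on storage-partitioned join for DS V2 sources.
    "spark.sql.sources.v2.bucketing.enabled" -> "true",
    // The two flags named in the review comment.
    "spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true",
    "spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled" -> "true",
    // The flag introduced by this PR.
    "spark.sql.sources.v2.bucketing.shuffle.enabled" -> "true"
  )

  def main(args: Array[String]): Unit =
    // With a live session, each pair would be applied via spark.conf.set(key, value).
    settings.foreach { case (key, value) => println(s"$key=$value") }
}
```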
This is a problem. Let me add a test case for it; maybe we should use the partitionValue obtained after grouping partition splits. Thanks for pointing that out. cc @sunchao
Thanks, I think that would be great if we can somehow!
Yes good point @szehon-ho . If Perhaps we should use
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
Looks good to me, just one nit. Thanks @Hisoka-X !
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
LGTM! (again) Sorry @Hisoka-X I have a few more tiny nits. Otherwise it looks great to me!
sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
Merged to master, thanks @Hisoka-X and @szehon-ho !
Thanks @sunchao and @szehon-ho !
buildConf("spark.sql.sources.v2.bucketing.shuffle.enabled")
  .doc("During a storage-partitioned join, whether to allow to shuffle only one side." +
    "When only one side is KeyGroupedPartitioning, if the conditions are met, spark will " +
    "only shuffle the other side. This optimization will reduce the amount of data that " +
Shall we make the algorithm smarter? If the other side is large, shuffling it with KeyGroupedPartitioning may lead to skew, and it's still better to shuffle both sides with hash partitioning. Let's think of an extreme case: one side reports KeyGroupedPartitioning with only one partition. With this optimization, we end up doing the join using a single thread.
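The extreme case can be illustrated with a small, Spark-free sketch. It assumes a simplified model in which the shuffled side is spread over exactly as many partitions as the key-grouped side reports, and compares that with hash partitioning over a larger, arbitrarily chosen partition count. The names `SkewSketch` and `partitionsUsed` are hypothetical and not part of Spark.

```scala
object SkewSketch {
  // Non-negative modulo, so negative hash codes still map to a valid partition id.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Number of distinct partitions that actually receive rows for the given keys.
  def partitionsUsed(keys: Seq[Int], numPartitions: Int): Int =
    keys.map(k => nonNegativeMod(k.hashCode, numPartitions)).distinct.size

  def main(args: Array[String]): Unit = {
    val keys = 0 until 1000
    // Key-grouped side reports a single partition: every row lands in partition 0.
    println(partitionsUsed(keys, 1)) // prints 1
    // Hash partitioning over 8 partitions spreads the same keys across all of them.
    println(partitionsUsed(keys, 8)) // prints 8
  }
}
```

In this model the join parallelism is capped by the partition count of the key-grouped side, which is the concern raised in the comment.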
I think the ShuffleSpec "framework" in EnsureRequirements already takes this into consideration. This PR mainly makes KeyGroupedShuffleSpec behave similarly to HashShuffleSpec, so that it is able to shuffle the other side (by making canCreatePartitioning return true).
…is KeyGroupedPartitioning

### What changes were proposed in this pull request?
When only one side of an SPJ (Storage-Partitioned Join) is KeyGroupedPartitioning, Spark currently needs to shuffle both sides using HashPartitioning. However, it may be enough to shuffle only the other side according to the partition transforms defined in KeyGroupedPartitioning. This is especially useful when the other side is relatively small.

1. Add a new config, `spark.sql.sources.v2.bucketing.shuffle.enabled`, to control whether this feature is enabled.
2. Add `KeyGroupedPartitioner`, used for partitioning when the transform values of the other side are known (only KeyGroupedPartitioning for now). Spark already knows the partition values and partition ids of the KeyGroupedPartitioning side in `EnsureRequirements`. It saves them in `KeyGroupedPartitioner` and uses them to shuffle the other side, so that rows with the same key are shuffled into the same partition.
3. Only the `identity` transform is supported for now. The same transform can currently report different values in a DS V2 connector implementation and in the corresponding catalog function, so until that problem is solved only `identity` should be supported. For example, in the test package, compare `YearFunction` https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala#L47 and https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala#L143

### Why are the changes needed?
Reduce data shuffle in specific SPJ scenarios.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added new test.

Closes apache#42194 from Hisoka-X/SPARK-41471_one_side_keygroup.

Authored-by: Jia Fan <fanjiaeminem@qq.com>
Signed-off-by: Chao Sun <sunchao@apple.com>
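The idea behind the `KeyGroupedPartitioner` item in the description can be sketched without any Spark dependency. The class below is a hypothetical, simplified stand-in (`SketchPartitioner` is not Spark's class): it assumes `identity` transforms, so a row's join key is directly comparable to a partition value, and it assumes that keys absent from the key-grouped side fall back to a hash-based partition id.

```scala
object KeyGroupedPartitionerSketch {
  // Hypothetical, simplified stand-in; not Spark's actual KeyGroupedPartitioner.
  final class SketchPartitioner(partitionValues: Seq[Seq[Any]]) {
    require(partitionValues.nonEmpty, "need at least one partition value")

    val numPartitions: Int = partitionValues.size

    // Partition value -> partition id, in the order reported by the key-grouped side.
    private val valueToId: Map[Seq[Any], Int] = partitionValues.zipWithIndex.toMap

    // Known values reuse the other side's partition id, so matching keys meet in
    // the same partition; unknown values fall back to a non-negative hash modulo.
    def getPartition(key: Seq[Any]): Int =
      valueToId.getOrElse(key, {
        val raw = key.hashCode % numPartitions
        if (raw < 0) raw + numPartitions else raw
      })
  }

  def main(args: Array[String]): Unit = {
    // Assumed partition values of the key-grouped side, in partition-id order.
    val partitioner = new SketchPartitioner(Seq(Seq(10), Seq(20), Seq(30)))
    println(partitioner.getPartition(Seq(20))) // prints 1
    println(partitioner.getPartition(Seq(30))) // prints 2
  }
}
```

Because the lookup table is built from the key-grouped side's own partition values, the shuffled side needs no knowledge of how the source laid out its data beyond that list.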