
[SPARK-41471][SQL] Reduce Spark shuffle when only one side of a join is KeyGroupedPartitioning #42194

Closed · wants to merge 18 commits

Conversation

@Hisoka-X (Member) commented Jul 28, 2023

What changes were proposed in this pull request?

When only one side of a SPJ (Storage-Partitioned Join) is KeyGroupedPartitioning, Spark currently needs to shuffle both sides using HashPartitioning. However, we may just need to shuffle the other side according to the partition transforms defined in KeyGroupedPartitioning. This is especially useful when the other side is relatively small.

  1. Add a new config `spark.sql.sources.v2.bucketing.shuffle.enabled` to control whether this feature is enabled.
  2. Add `KeyGroupedPartitioner`, used to partition data when we know the transform values of the other side (currently KeyGroupedPartitioning). In `EnsureRequirements`, Spark already knows which partition value maps to which partition id on the KeyGroupedPartitioning side; we save that mapping in `KeyGroupedPartitioner` and use it to shuffle the other side, so that rows with the same key end up in the same partition.
  3. Only the `identity` transform works for now. There is a separate problem: the same transform can report different values between a DS V2 connector implementation and a catalog function, so until that is solved we only support `identity`. E.g., in the test package, `YearFunction`: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala#L47 and https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala#L143
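The core idea in item 2 can be sketched as follows. This is a conceptual sketch in Python, not Spark's actual implementation (which is Scala); the names and partition values are illustrative. One side already reports a KeyGroupedPartitioning, i.e. a fixed mapping from partition (key) value to partition id, so instead of hash-partitioning both sides we build a partitioner for the other side from that mapping:

```python
# Illustrative sketch: route rows of the non-grouped side into the partition
# layout already reported by the KeyGroupedPartitioning side, so equal keys
# land in the same partition without shuffling the grouped side at all.

def key_grouped_partitioner(value_to_part_id):
    """Return a function mapping a join key to the grouped side's partition id."""
    def partition(key):
        # Keys never seen on the grouped side produce no join matches, so
        # where they go does not matter; send them to partition 0 here.
        return value_to_part_id.get(key, 0)
    return partition

# Hypothetical mapping reported by the KeyGroupedPartitioning side.
value_map = {"2021": 0, "2022": 1, "2023": 2}
part = key_grouped_partitioner(value_map)

rows = [("2023", "a"), ("2021", "b"), ("2023", "c")]
buckets = {}
for key, payload in rows:
    buckets.setdefault(part(key), []).append((key, payload))
# Both "2023" rows end up together in partition 2, matching the grouped side.
```

This is only meant to show why no shuffle is needed on the grouped side: its partition-id-per-value layout is taken as given, and the other side conforms to it.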

Why are the changes needed?

Reduce data shuffle in specific SPJ scenarios

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added new tests.

@Hisoka-X (Member, Author)

cc @sunchao @cloud-fan

@sunchao (Member) commented Aug 3, 2023

Sorry for the delay. I'll take a look at this in the next 1-2 days.

@Hisoka-X force-pushed the SPARK-41471_one_side_keygroup branch from 96bce41 to b2b3a10 on August 9, 2023 06:55
@sunchao (Member) left a comment

Thanks for the update @Hisoka-X , looks much better now!

@sunchao (Member) left a comment

LGTM except one minor comment.

@sunchao (Member) commented Aug 12, 2023

cc @cloud-fan for another check too

@szehon-ho (Contributor) left a comment

I'm wondering: will this work if the other SPJ flags, spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled and spark.sql.sources.v2.bucketing.pushPartValues.enabled, are set to true and we have the case of multiple splits per partition?

In that case, it seems the BatchScanExec of the side with KeyGroupedPartitioning will group partition splits; will that make it out-of-sync with the other side using KeyGroupedPartitioner?

It may also be hard to make this work with #42306, which uses a similar mechanism to group partition splits.

@Hisoka-X (Member, Author)

> I'm wondering, will this work if other SPJ flags: spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled and spark.sql.sources.v2.bucketing.pushPartValues.enabled are set to true, and we have case of multiple split for a partition?
>
> In that case, it seems the BatchScanExec of the side with KeyGroupedPartitioning will group partition splits, will it make it out-of-sync with the other side using KeyGroupedPartitioner?
>
> It may also be hard to work with #42306 which uses a similar mechanism to group partition splits?

This is a problem; let me add a test case for it. Maybe we should use the partition values obtained after grouping the partition splits. Thanks for pointing that out, cc @sunchao

@szehon-ho (Contributor)

> maybe we should use partitionValue which after group partition splits.

Thanks, I think that would be great if we can do that somehow!

@sunchao (Member) commented Aug 15, 2023

Yes, good point @szehon-ho. If spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled and spark.sql.sources.v2.bucketing.pushPartValues.enabled are both turned on, this may not work, since the data on the hash partitioning side is shuffled according to the partition values before the grouping, which contain duplicates.

Perhaps we should use KeyGroupedPartitioning.uniquePartitionValues when computing valueMap.
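The suggestion can be sketched as follows. This is an illustrative Python sketch, not Spark's Scala code; the function name and values are made up. With pushPartValues and partiallyClusteredDistribution both enabled, the grouped side may report one entry per split, so the same partition value can appear several times; building the value-to-partition-id map from the de-duplicated sequence (cf. `KeyGroupedPartitioning.uniquePartitionValues`) keeps the shuffled side in sync with the grouped scan:

```python
# Illustrative sketch: de-duplicate per-split partition values before
# assigning partition ids, so that partition id = index among *unique*
# values, matching the grouped scan after it merges splits.

def build_value_map(partition_values):
    unique = []
    seen = set()
    for v in partition_values:  # preserve first-seen order
        if v not in seen:
            seen.add(v)
            unique.append(v)
    return {v: i for i, v in enumerate(unique)}

# One entry per split: value "2022" has two splits on the grouped side.
per_split_values = ["2021", "2022", "2022", "2023"]
value_map = build_value_map(per_split_values)
# "2023" maps to partition 2, not 3, because the duplicate was collapsed.
```

Had the map been built from the raw per-split list, "2023" would get id 3 while the grouped scan only produces 3 partitions, leaving the two sides out of sync.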

@sunchao (Member) left a comment

Looks good to me, just one nit. Thanks @Hisoka-X !

@sunchao (Member) left a comment

LGTM! (again) Sorry @Hisoka-X I have a few more tiny nits. Otherwise it looks great to me!

@sunchao sunchao closed this in ce12f6d Aug 24, 2023
@sunchao (Member) commented Aug 24, 2023

Merged to master, thanks @Hisoka-X and @szehon-ho !

@Hisoka-X (Member, Author)

Thanks @sunchao and @szehon-ho !

```scala
buildConf("spark.sql.sources.v2.bucketing.shuffle.enabled")
  .doc("During a storage-partitioned join, whether to allow to shuffle only one side." +
    "When only one side is KeyGroupedPartitioning, if the conditions are met, spark will " +
    "only shuffle the other side. This optimization will reduce the amount of data that " +
```
A contributor left a comment:

Shall we make the algorithm smarter? If the other side is large, using KeyGroupedPartitioning may lead to skew, and it would still be better to shuffle both sides with hash partitioning.

Consider an extreme case: one side reports KeyGroupedPartitioning with only one partition. With this optimization, we would end up doing the join in a single task.

A member left a comment:

I think the ShuffleSpec "framework" in EnsureRequirements already takes this into consideration. This PR mainly makes KeyGroupedShuffleSpec behave similarly to HashShuffleSpec so that it can shuffle the other side (by making canCreatePartitioning return true).
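The decision being described can be modeled roughly as follows. This is a heavily simplified Python model of the EnsureRequirements behavior, not Spark's actual Scala code; the class and function names are illustrative. A spec that can create a partitioning for the other side lets Spark shuffle only that other side:

```python
# Simplified model: each side's shuffle spec reports whether it can create a
# partitioning for the other side. If the sides are already compatible, no
# shuffle is needed; otherwise shuffle only the side that can be conformed
# to the other, falling back to shuffling both.

class ShuffleSpec:
    def __init__(self, name, can_create_partitioning):
        self.name = name
        self.can_create_partitioning = can_create_partitioning

def plan_shuffles(left, right, compatible):
    if compatible:
        return []                           # neither side needs a shuffle
    if left.can_create_partitioning:
        return [right.name]                 # shuffle only the right side
    if right.can_create_partitioning:
        return [left.name]                  # shuffle only the left side
    return [left.name, right.name]          # fall back: shuffle both sides

key_grouped = ShuffleSpec("key_grouped_side", can_create_partitioning=True)
other = ShuffleSpec("other_side", can_create_partitioning=False)
```

In this model, the PR amounts to flipping `can_create_partitioning` to true for the key-grouped spec: before, both sides would be shuffled; after, `plan_shuffles(key_grouped, other, compatible=False)` shuffles only `other_side`, while the existing cost/skew reasoning in EnsureRequirements stays in charge of whether to do so at all.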

@Hisoka-X Hisoka-X deleted the SPARK-41471_one_side_keygroup branch September 1, 2023 12:34
szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Feb 7, 2024
…is KeyGroupedPartitioning


Closes apache#42194 from Hisoka-X/SPARK-41471_one_side_keygroup.

Authored-by: Jia Fan <fanjiaeminem@qq.com>
Signed-off-by: Chao Sun <sunchao@apple.com>