[SPARK-35703][SQL][FOLLOWUP] Only eliminate shuffles if partition keys contain all the join keys #35138
Conversation
@@ -1,90 +1,97 @@
TakeOrderedAndProject [i_item_id,i_item_desc,s_state,store_sales_quantitycount,store_sales_quantityave,store_sales_quantitystdev,store_sales_quantitycov,as_store_returns_quantitycount,as_store_returns_quantityave,as_store_returns_quantitystdev,store_returns_quantitycov,catalog_sales_quantitycount,catalog_sales_quantityave,catalog_sales_quantitystdev,catalog_sales_quantitycov]
I checked it locally. Now the plan golden files are exactly the same as the ones before #32875.
// will add shuffles with the default partitioning of `ClusteredDistribution`, which uses all
// the join keys.
if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_JOIN_KEYS_AS_PARTITION_KEYS)) {
  distribution.clustering.forall(x => partitioning.expressions.exists(_.semanticEquals(x)))
Do we need to require partitioning.expressions to be exactly the same as distribution.clustering as well? E.g., for the following cases (see the sketch below):

partitioning.expressions: [a, b]
distribution.clustering: [b, a]

partitioning.expressions: [a, b, a]
distribution.clustering: [a, b]
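For illustration, a minimal sketch of why these two cases pass the subset check above but would fail an exact-match check (hypothetical code, using plain strings in place of Spark expressions and == in place of semanticEquals):

object SubsetVsExactCheck extends App {
  // Stand-ins for distribution.clustering and partitioning.expressions.
  def subsetCheck(clustering: Seq[String], partitioning: Seq[String]): Boolean =
    clustering.forall(x => partitioning.exists(_ == x))

  def exactCheck(clustering: Seq[String], partitioning: Seq[String]): Boolean =
    partitioning == clustering

  // Case 1: same keys, different order.
  println(subsetCheck(Seq("b", "a"), Seq("a", "b"))) // true:  accepted
  println(exactCheck(Seq("b", "a"), Seq("a", "b")))  // false: rejected

  // Case 2: partitioning repeats a key.
  println(subsetCheck(Seq("a", "b"), Seq("a", "b", "a"))) // true:  accepted
  println(exactCheck(Seq("a", "b"), Seq("a", "b", "a")))  // false: rejected
}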
Good point. To fully restore the previous behavior, we should require an exact match, though I think the current change should already cover the data skew issues. I'll make the change to be conservative.
Yes, the case where the partitioning expressions are a superset of the distribution clustering should already be rejected by HashPartitioning#satisfies, but it may be better to make it more explicit here.
@@ -396,6 +396,16 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val REQUIRE_ALL_JOIN_KEYS_AS_PARTITION_KEYS =
    buildConf("spark.sql.join.requireAllJoinKeysAsPartitionKeys")
Would this config take effect for all physical operators that have two children and require ClusteredDistribution? For example, CoGroupExec.
Yes, and they used HashClusteredDistribution before.
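For reference, a sketch of the pre-#32875 pattern being referred to (reconstructed from memory of the old code, not taken from this PR's diff; member names may differ):

// Binary operators like CoGroupExec used to request co-partitioning via
// HashClusteredDistribution, which requires an exact match on all keys:
override def requiredChildDistribution: Seq[Distribution] =
  HashClusteredDistribution(leftGroup) :: HashClusteredDistribution(rightGroup) :: Nil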
Then should we make the config name more general?
How about spark.sql.requireAllClusterKeysAsPartitionKeysToCoPartition?
SGTM!
I'm slightly inclined toward a shorter name like spark.sql.join.requireAllJoinKeysAsPartitionKeys, but that's just personal preference. Alternatively I'd suggest something like spark.sql.enableStrictShuffleKeysCheck, but it's up to you.
Looks good to me. @cloud-fan, did you actually observe the perf regression in real Spark jobs? Just curious whether it'll be very common when this config is disabled.
@sunchao I haven't tried it on real workloads yet, but it's pretty obvious that we can construct a query with certain input data to expose this regression.
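Something like the following hypothetical repro (table names, sizes, and the hot-key ratio are made up) would do: both tables are bucketed on a single column a, while the join uses both a and b, and a is heavily skewed.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partial-key-skew").getOrCreate()

// ~99% of the rows share the hot key a = 0.
val df = spark.range(0L, 1000000L)
  .selectExpr("IF(id % 100 = 0, id, 0L) AS a", "id AS b")

df.write.bucketBy(8, "a").saveAsTable("t1")
df.write.bucketBy(8, "a").saveAsTable("t2")

// With the partial-key improvement enabled, bucketing on [a] satisfies the
// join's ClusteredDistribution([a, b]) and no shuffle is inserted, so every
// a = 0 row from both sides lands in a single task. Requiring all join keys
// as partition keys forces a shuffle on (a, b), spreading the hot key.
spark.table("t1").join(spark.table("t2"), Seq("a", "b")).count()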
LGTM
@cloud-fan could you fix the test failure? It looks like org.apache.spark.sql.catalyst.ShuffleSpecSuite is failing. LGTM after the test failures are addressed. I also compared the golden files with this PR and #32875 combined, and there are no changes, which is expected.
I support disabling this feature by default to be safe. But just two cents from our production experience: we enabled the same feature (avoid shuffle if bucket keys are a subset of join keys) by default in our production for several years, and didn't see many data skew issues. Our workload is not representative of the industry, but this is just to provide an observation from one large-scale environment.
Thanks @c21! This is a good data point. We're also planning to evaluate this feature in production jobs.
I think we can still roll out this optimization later, with some heuristics to avoid bad cases. We just need more time to evaluate and do experiments.
Hmm
Oops, we missed another one: "SPARK-27485: EnsureRequirements.reorder should handle duplicate expressions" in
Thanks for the review, merging to master!
What changes were proposed in this pull request?
This is a followup of #32875. Basically #32875 did two improvements:

1. Allow bucket join even if the bucket hash function is different from Spark's shuffle hash function.
2. Allow bucket join even if the hash partition keys are a subset of the join keys.

The first improvement is the major target for implementing the SPIP "storage partition join". The second improvement is more of a consequence of the framework refactor, which was not planned.

This PR disables the second improvement by default, since it may introduce a perf regression when there is data skew and no shuffle. We need more design work to enable this improvement, such as checking the NDV.
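A minimal usage sketch, assuming the config name as it appears in this PR's diff (the review discussion above considers renaming it):

// Safe default in this PR: force a shuffle when only a subset of the
// join keys is covered by the children's partitioning.
spark.conf.set("spark.sql.join.requireAllJoinKeysAsPartitionKeys", "true")

// Opt back in to the partial-key shuffle elimination from #32875.
spark.conf.set("spark.sql.join.requireAllJoinKeysAsPartitionKeys", "false")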
Why are the changes needed?
Avoid perf regression
Does this PR introduce any user-facing change?
no
How was this patch tested?