-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-35703][SQL][FOLLOWUP] ValidateRequirements should check the co-partitioning requirement #35225
Conversation
@@ -441,7 +441,7 @@ case class HashShuffleSpec( | |||
distribution.clustering.zipWithIndex.foreach { case (distKey, distKeyPos) => | |||
distKeyToPos.getOrElseUpdate(distKey.canonicalized, mutable.BitSet.empty).add(distKeyPos) | |||
} | |||
partitioning.expressions.map(k => distKeyToPos(k.canonicalized)) | |||
partitioning.expressions.map(k => distKeyToPos.getOrElse(k.canonicalized, mutable.BitSet.empty)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is needed to pass the new test: https://github.com/apache/spark/pull/35225/files#diff-dd5267ed90ff4f7191facb46157728eee14f225ae64dc867d095aa924444f8f1R78
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @cloud-fan !
val specs = children.map(_.outputPartitioning).zip(requiredChildDistributions).map { | ||
case (p, d) => p.createShuffleSpec(d.asInstanceOf[ClusteredDistribution]) | ||
} | ||
specs.tail.forall(_.isCompatibleWith(specs.head)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe we can add a logDebug
here too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for maintaining existing behavior.
testValidate(Seq(1, 2), Seq(1, 0), Seq(1, 0), 5, 5, false) | ||
} | ||
|
||
test("SMJ requirements not satisfied with unequal partition number") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to test for HashPartitioning(1)
and AllTuples
. I remember we had quite some bugs with AllTuples
before. But this comment is non-blocking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @cloud-fan!
thanks for the review, merging to master! |
What changes were proposed in this pull request?
This is a followup of #32875 . This PR updates
ValidateRequirements
to match the new change in the partitioning-distribution framework, and check the co-partitioning requirement for join nodes.Why are the changes needed?
Fix bugs in
ValidateRequirements
Does this PR introduce any user-facing change?
no
How was this patch tested?
a new test suite