-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-9703] [SQL] Refactor EnsureRequirements to avoid certain unnecessary shuffles #7988
Conversation
Test build #40007 has finished for PR 7988 at commit
|
Test build #40024 has finished for PR 7988 at commit
|
Test build #40026 has finished for PR 7988 at commit
|
Test build #40045 has finished for PR 7988 at commit
|
Test build #40044 has finished for PR 7988 at commit
|
There's one more test that I need to add. |
Test build #40071 has finished for PR 7988 at commit
|
case (OrderedDistribution(ordering), rowOrdering, child) => | ||
addOperatorsIfNecessary(RangePartitioning(ordering, numPartitions), rowOrdering, child) | ||
def addShuffleIfNecessary(child: SparkPlan, requiredDistribution: Distribution): SparkPlan = { | ||
// A pre-condition of ensureDistributionAndOrdering is that joins' children have compatible |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This piece of reasoning is the trickiest part of this entire patch. Is this a valid argument given the current semantics of guarantees()
and satisfies()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be cleaned up tomorrow when I consolidate the shuffle steps and make the assertions / invariants clearer.
Test build #40077 has finished for PR 7988 at commit
|
case Seq(a) => true | ||
case Seq(a, b) => | ||
if (a.numPartitions != b.numPartitions) { | ||
assert(!a.guarantees(b) && !b.guarantees(a)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are using guarantees
to check if sibling operators are compatible or not. How about we just get back compatible with?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sounds good to me. Let's revisit this naming later once we actually break this symmetry.
Test build #40087 has finished for PR 7988 at commit
|
Test build #40080 has finished for PR 7988 at commit
|
Test build #40098 has finished for PR 7988 at commit
|
Yin and I discussed offline and I think that I see a way to greatly simplify this patch while making it easier to understand. Will update after I unblock my SMJ patch. |
Test build #40224 has finished for PR 7988 at commit
|
@@ -75,6 +75,37 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { | |||
def clustering: Set[Expression] = ordering.map(_.child).toSet | |||
} | |||
|
|||
/** | |||
* Describes how an operator's output is split across partitions. The `satisfies`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Editing note: should exploit parallelism between these two lists, since that's better writing. Will copyedit tomorrow.
Alright, I've simplified these |
Test build #40262 has finished for PR 7988 at commit
|
/cc @yhuai for final proofreading and sign-off. |
Looks very good! I am merging it to master and branch 1.5. |
…essary shuffles This pull request refactors the `EnsureRequirements` planning rule in order to avoid the addition of certain unnecessary shuffles. As an example of how unnecessary shuffles can occur, consider SortMergeJoin, which requires clustered distribution and sorted ordering of its children's input rows. Say that both of SMJ's children produce unsorted output but are both SinglePartition. In this case, we will need to inject sort operators but should not need to inject Exchanges. Unfortunately, it looks like the EnsureRequirements unnecessarily repartitions using a hash partitioning. This patch solves this problem by refactoring `EnsureRequirements` to properly implement the `compatibleWith` checks that were broken in earlier implementations. See the significant inline comments for a better description of how this works. The majority of this PR is new comments and test cases, with few actual changes to the code. Author: Josh Rosen <joshrosen@databricks.com> Closes #7988 from JoshRosen/exchange-fixes and squashes the following commits: 38006e7 [Josh Rosen] Rewrite EnsureRequirements _yet again_ to make things even simpler 0983f75 [Josh Rosen] More guarantees vs. compatibleWith cleanup; delete BroadcastPartitioning. 8784bd9 [Josh Rosen] Giant comment explaining compatibleWith vs. guarantees 1307c50 [Josh Rosen] Update conditions for requiring child compatibility. 18cddeb [Josh Rosen] Rename DummyPlan to DummySparkPlan. 2c7e126 [Josh Rosen] Merge remote-tracking branch 'origin/master' into exchange-fixes fee65c4 [Josh Rosen] Further refinement to comments / reasoning 642b0bb [Josh Rosen] Further expand comment / reasoning 06aba0c [Josh Rosen] Add more comments 8dbc845 [Josh Rosen] Add even more tests. 4f08278 [Josh Rosen] Fix the test by adding the compatibility check to EnsureRequirements a1c12b9 [Josh Rosen] Add failing test to demonstrate allCompatible bug 0725a34 [Josh Rosen] Small assertion cleanup. 5172ac5 [Josh Rosen] Add test for requiresChildrenToProduceSameNumberOfPartitions. 2e0f33a [Josh Rosen] Write a more generic test for EnsureRequirements. 752b8de [Josh Rosen] style fix c628daf [Josh Rosen] Revert accidental ExchangeSuite change. c9fb231 [Josh Rosen] Rewrite exchange to fix better handle this case. adcc742 [Josh Rosen] Move test to PlannerSuite. 0675956 [Josh Rosen] Preserving ordering and partitioning in row format converters also does not help. cc5669c [Josh Rosen] Adding outputPartitioning to Repartition does not fix the test. 2dfc648 [Josh Rosen] Add failing test illustrating bad exchange planning. (cherry picked from commit 23cf5af) Signed-off-by: Yin Huai <yhuai@databricks.com>
object Partitioning { | ||
def allCompatible(partitionings: Seq[Partitioning]): Boolean = { | ||
// Note: this assumes transitivity | ||
partitionings.sliding(2).map { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: we can use forall
instead of map
here and remove the forall
at the end.
…essary shuffles This pull request refactors the `EnsureRequirements` planning rule in order to avoid the addition of certain unnecessary shuffles. As an example of how unnecessary shuffles can occur, consider SortMergeJoin, which requires clustered distribution and sorted ordering of its children's input rows. Say that both of SMJ's children produce unsorted output but are both SinglePartition. In this case, we will need to inject sort operators but should not need to inject Exchanges. Unfortunately, it looks like the EnsureRequirements unnecessarily repartitions using a hash partitioning. This patch solves this problem by refactoring `EnsureRequirements` to properly implement the `compatibleWith` checks that were broken in earlier implementations. See the significant inline comments for a better description of how this works. The majority of this PR is new comments and test cases, with few actual changes to the code. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#7988 from JoshRosen/exchange-fixes and squashes the following commits: 38006e7 [Josh Rosen] Rewrite EnsureRequirements _yet again_ to make things even simpler 0983f75 [Josh Rosen] More guarantees vs. compatibleWith cleanup; delete BroadcastPartitioning. 8784bd9 [Josh Rosen] Giant comment explaining compatibleWith vs. guarantees 1307c50 [Josh Rosen] Update conditions for requiring child compatibility. 18cddeb [Josh Rosen] Rename DummyPlan to DummySparkPlan. 2c7e126 [Josh Rosen] Merge remote-tracking branch 'origin/master' into exchange-fixes fee65c4 [Josh Rosen] Further refinement to comments / reasoning 642b0bb [Josh Rosen] Further expand comment / reasoning 06aba0c [Josh Rosen] Add more comments 8dbc845 [Josh Rosen] Add even more tests. 4f08278 [Josh Rosen] Fix the test by adding the compatibility check to EnsureRequirements a1c12b9 [Josh Rosen] Add failing test to demonstrate allCompatible bug 0725a34 [Josh Rosen] Small assertion cleanup. 5172ac5 [Josh Rosen] Add test for requiresChildrenToProduceSameNumberOfPartitions. 2e0f33a [Josh Rosen] Write a more generic test for EnsureRequirements. 752b8de [Josh Rosen] style fix c628daf [Josh Rosen] Revert accidental ExchangeSuite change. c9fb231 [Josh Rosen] Rewrite exchange to fix better handle this case. adcc742 [Josh Rosen] Move test to PlannerSuite. 0675956 [Josh Rosen] Preserving ordering and partitioning in row format converters also does not help. cc5669c [Josh Rosen] Adding outputPartitioning to Repartition does not fix the test. 2dfc648 [Josh Rosen] Add failing test illustrating bad exchange planning.
This pull request refactors the
EnsureRequirements
planning rule in order to avoid the addition of certain unnecessary shuffles.As an example of how unnecessary shuffles can occur, consider SortMergeJoin, which requires clustered distribution and sorted ordering of its children's input rows. Say that both of SMJ's children produce unsorted output but are both SinglePartition. In this case, we will need to inject sort operators but should not need to inject Exchanges. Unfortunately, it looks like the EnsureRequirements unnecessarily repartitions using a hash partitioning.
This patch solves this problem by refactoring
EnsureRequirements
to properly implement thecompatibleWith
checks that were broken in earlier implementations. See the significant inline comments for a better description of how this works. The majority of this PR is new comments and test cases, with few actual changes to the code.