[SPARK-9703] [SQL] Refactor EnsureRequirements to avoid certain unnecessary shuffles #7988

Closed
wants to merge 22 commits into from

JoshRosen
Contributor

This pull request refactors the EnsureRequirements planning rule in order to avoid the addition of certain unnecessary shuffles.

As an example of how unnecessary shuffles can occur, consider SortMergeJoin (SMJ), which requires a clustered distribution and a sorted ordering of its children's input rows. Say that both of SMJ's children produce unsorted output but both use SinglePartition. In this case, we need to inject sort operators but should not need to inject Exchanges. Unfortunately, it looks like EnsureRequirements repartitions the children anyway, using a hash partitioning.

This patch solves this problem by refactoring EnsureRequirements to properly implement the compatibleWith checks that were broken in earlier implementations. See the extensive inline comments for a fuller description of how this works. The majority of this PR is new comments and test cases, with few actual changes to the code.
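
To make the SinglePartition example concrete, here is a minimal sketch of the intended decision, not the actual Spark code. Everything in it (`ToyEnsureRequirements`, `Plan`, `satisfiesClustering`, `ensure`) is a hypothetical stand-in invented for illustration, and it deliberately ignores the sibling-compatibility checks that the real patch is about. It only shows that the shuffle decision and the sort decision can be made independently.

```scala
object ToyEnsureRequirements {
  // Toy stand-ins for physical-plan concepts; these are NOT Spark's real classes.
  sealed trait Partitioning
  case object SinglePartition extends Partitioning
  case object UnknownPartitioning extends Partitioning
  final case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning

  // A plan is modeled as a list of operator names (outermost first),
  // plus the partitioning and ordering of its output.
  final case class Plan(ops: List[String], partitioning: Partitioning, ordering: Seq[String])

  // Assumed rule: does this partitioning already co-locate rows that share the join keys?
  def satisfiesClustering(p: Partitioning, keys: Seq[String]): Boolean = p match {
    case SinglePartition         => true // all rows live in one partition
    case HashPartitioning(ks, _) => ks.nonEmpty && ks.forall(k => keys.contains(k))
    case UnknownPartitioning     => false
  }

  // Add an Exchange only if the distribution is unsatisfied,
  // then add a Sort only if the ordering is unsatisfied.
  def ensure(child: Plan, keys: Seq[String], numPartitions: Int): Plan = {
    val shuffled =
      if (satisfiesClustering(child.partitioning, keys)) child
      else Plan("Exchange" :: child.ops, HashPartitioning(keys, numPartitions), Nil)
    if (shuffled.ordering.startsWith(keys)) shuffled
    else shuffled.copy(ops = "Sort" :: shuffled.ops, ordering = keys)
  }
}
```

Under these assumptions, an unsorted SinglePartition child such as `Plan(List("Scan"), SinglePartition, Nil)` comes back with `ops == List("Sort", "Scan")`: a sort is added, but no Exchange.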

@SparkQA
SparkQA commented Aug 6, 2015

Test build #40007 has finished for PR 7988 at commit 0675956.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
SparkQA commented Aug 6, 2015

Test build #40024 has finished for PR 7988 at commit c628daf.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
SparkQA commented Aug 6, 2015

Test build #40026 has finished for PR 7988 at commit 752b8de.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
SparkQA commented Aug 6, 2015

Test build #40045 has finished for PR 7988 at commit 0725a34.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
SparkQA commented Aug 6, 2015

Test build #40044 has finished for PR 7988 at commit 2e0f33a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen JoshRosen changed the title from "[SQL] [WIP] Demonstration of issue in Exchange planning" to "[SPARK-9703] [SQL] [WIP] EnsureRequirements should not add unnecessary shuffles when only ordering requirements are unsatisfied" Aug 6, 2015
@JoshRosen
Contributor Author

There's one more test that I need to add.

@SparkQA
SparkQA commented Aug 6, 2015

Test build #40071 has finished for PR 7988 at commit 4f08278.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    case (OrderedDistribution(ordering), rowOrdering, child) =>
      addOperatorsIfNecessary(RangePartitioning(ordering, numPartitions), rowOrdering, child)
  def addShuffleIfNecessary(child: SparkPlan, requiredDistribution: Distribution): SparkPlan = {
    // A pre-condition of ensureDistributionAndOrdering is that joins' children have compatible
Contributor Author

This piece of reasoning is the trickiest part of this entire patch. Is this a valid argument given the current semantics of guarantees() and satisfies()?

Contributor Author

This will be cleaned up tomorrow when I consolidate the shuffle steps and make the assertions / invariants clearer.

@JoshRosen JoshRosen changed the title from "[SPARK-9703] [SQL] [WIP] EnsureRequirements should not add unnecessary shuffles when only ordering requirements are unsatisfied" to "[SPARK-9703] [SQL] EnsureRequirements should not add unnecessary shuffles when only ordering requirements are unsatisfied" Aug 6, 2015
@SparkQA
SparkQA commented Aug 6, 2015

Test build #40077 has finished for PR 7988 at commit 8dbc845.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

      case Seq(a) => true
      case Seq(a, b) =>
        if (a.numPartitions != b.numPartitions) {
          assert(!a.guarantees(b) && !b.guarantees(a))
Contributor

Since we are using guarantees to check whether sibling operators are compatible, how about we just bring back compatibleWith?

Contributor Author

This sounds good to me. Let's revisit this naming later once we actually break this symmetry.

@SparkQA
SparkQA commented Aug 7, 2015

Test build #40087 has finished for PR 7988 at commit 642b0bb.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
SparkQA commented Aug 7, 2015

Test build #40080 has finished for PR 7988 at commit 06aba0c.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
SparkQA commented Aug 7, 2015

Test build #40098 has finished for PR 7988 at commit fee65c4.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Yin and I discussed offline and I think that I see a way to greatly simplify this patch while making it easier to understand. Will update after I unblock my SMJ patch.

@SparkQA
SparkQA commented Aug 8, 2015

Test build #40224 has finished for PR 7988 at commit 8784bd9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -75,6 +75,37 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
  def clustering: Set[Expression] = ordering.map(_.child).toSet
}

/**
 * Describes how an operator's output is split across partitions. The `satisfies`,
Contributor Author

Editing note: should exploit parallelism between these two lists, since that's better writing. Will copyedit tomorrow.

@JoshRosen JoshRosen changed the title from "[SPARK-9703] [SQL] EnsureRequirements should not add unnecessary shuffles when only ordering requirements are unsatisfied" to "[SPARK-9703] [SQL] Refactor EnsureRequirements to avoid certain unnecessary shuffles" Aug 9, 2015
@JoshRosen
Contributor Author

Alright, I've simplified these EnsureRequirements changes and I think this patch is now ready for review. The final change was quite small, essentially adding back the compatibleWith checks and fixing them. The rest of the changes are tests, documentation, and some small refactoring to improve clarity.

@SparkQA
SparkQA commented Aug 9, 2015

Test build #40262 has finished for PR 7988 at commit 38006e7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

/cc @yhuai for final proofreading and sign-off.

@yhuai
Contributor

yhuai commented Aug 9, 2015

Looks very good! I am merging it to master and branch 1.5.

asfgit pushed a commit that referenced this pull request Aug 9, 2015
…essary shuffles

This pull request refactors the `EnsureRequirements` planning rule in order to avoid the addition of certain unnecessary shuffles.

As an example of how unnecessary shuffles can occur, consider SortMergeJoin, which requires clustered distribution and sorted ordering of its children's input rows. Say that both of SMJ's children produce unsorted output but are both SinglePartition. In this case, we will need to inject sort operators but should not need to inject Exchanges. Unfortunately, it looks like the EnsureRequirements unnecessarily repartitions using a hash partitioning.

This patch solves this problem by refactoring `EnsureRequirements` to properly implement the `compatibleWith` checks that were broken in earlier implementations. See the significant inline comments for a better description of how this works. The majority of this PR is new comments and test cases, with few actual changes to the code.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7988 from JoshRosen/exchange-fixes and squashes the following commits:

38006e7 [Josh Rosen] Rewrite EnsureRequirements _yet again_ to make things even simpler
0983f75 [Josh Rosen] More guarantees vs. compatibleWith cleanup; delete BroadcastPartitioning.
8784bd9 [Josh Rosen] Giant comment explaining compatibleWith vs. guarantees
1307c50 [Josh Rosen] Update conditions for requiring child compatibility.
18cddeb [Josh Rosen] Rename DummyPlan to DummySparkPlan.
2c7e126 [Josh Rosen] Merge remote-tracking branch 'origin/master' into exchange-fixes
fee65c4 [Josh Rosen] Further refinement to comments / reasoning
642b0bb [Josh Rosen] Further expand comment / reasoning
06aba0c [Josh Rosen] Add more comments
8dbc845 [Josh Rosen] Add even more tests.
4f08278 [Josh Rosen] Fix the test by adding the compatibility check to EnsureRequirements
a1c12b9 [Josh Rosen] Add failing test to demonstrate allCompatible bug
0725a34 [Josh Rosen] Small assertion cleanup.
5172ac5 [Josh Rosen] Add test for requiresChildrenToProduceSameNumberOfPartitions.
2e0f33a [Josh Rosen] Write a more generic test for EnsureRequirements.
752b8de [Josh Rosen] style fix
c628daf [Josh Rosen] Revert accidental ExchangeSuite change.
c9fb231 [Josh Rosen] Rewrite exchange to fix better handle this case.
adcc742 [Josh Rosen] Move test to PlannerSuite.
0675956 [Josh Rosen] Preserving ordering and partitioning in row format converters also does not help.
cc5669c [Josh Rosen] Adding outputPartitioning to Repartition does not fix the test.
2dfc648 [Josh Rosen] Add failing test illustrating bad exchange planning.

(cherry picked from commit 23cf5af)
Signed-off-by: Yin Huai <yhuai@databricks.com>
@asfgit asfgit closed this in 23cf5af Aug 9, 2015
object Partitioning {
  def allCompatible(partitionings: Seq[Partitioning]): Boolean = {
    // Note: this assumes transitivity
    partitionings.sliding(2).map {
Contributor

Nit: we can use forall instead of map here and remove the forall at the end.
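
For illustration, the suggestion might look roughly like the sketch below. It is written generically over an arbitrary element type and takes the pairwise check as a parameter, so it makes no claims about Spark's `Partitioning` API; the names `CompatibilitySketch` and `compatible` are hypothetical. As the comment in the snippet above notes, checking only adjacent pairs is sufficient only if the relation is transitive.

```scala
object CompatibilitySketch {
  // Checks adjacent pairs only, so `compatible` is assumed to be transitive.
  def allCompatible[A](items: Seq[A])(compatible: (A, A) => Boolean): Boolean =
    items.sliding(2).forall {
      case Seq(a, b) => compatible(a, b)
      case _         => true // a single-element window has nothing to compare
    }
}
```

An empty sequence produces no windows, so `forall` is vacuously true, and a one-element sequence produces a single one-element window that falls through to the default case.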

CodingCat pushed a commit to CodingCat/spark that referenced this pull request Aug 17, 2015