Skip to content

Commit

Permalink
Small assertion cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Aug 6, 2015
1 parent 5172ac5 commit 0725a34
Showing 1 changed file with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,19 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {

// --- Unit tests of EnsureRequirements ---------------------------------------------------------

private def assertDistributionRequirementsAreSatisfied(outputPlan: SparkPlan): Unit = {
if (outputPlan.requiresChildrenToProduceSameNumberOfPartitions) {
if (outputPlan.children.map(_.outputPartitioning.numPartitions).toSet.size != 1) {
fail(s"Children did not produce the same number of partitions:\n$outputPlan")
}
}
outputPlan.children.zip(outputPlan.requiredChildDistribution).foreach {
case (child, requiredDist) =>
assert(child.outputPartitioning.satisfies(requiredDist),
s"$child output partitioning does not satisfy $requiredDist:\n$outputPlan")
}
}

test("EnsureRequirements ensures that children produce same number of partitions when required") {
val clustering = Literal(1) :: Nil
val distribution = ClusteredDistribution(clustering)
Expand All @@ -222,7 +235,7 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
requiredChildOrdering = Seq(Seq.empty, Seq.empty)
)
val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
assert (outputPlan.children.map(_.outputPartitioning.numPartitions).toSet.size === 1)
assertDistributionRequirementsAreSatisfied(outputPlan)
}

test("EnsureRequirements should not repartition if only ordering requirement is unsatisfied") {
Expand All @@ -238,6 +251,7 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
requiredChildOrdering = Seq(outputOrdering, outputOrdering)
)
val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
if (outputPlan.collect { case Exchange(_, _) => true }.nonEmpty) {
fail(s"No Exchanges should have been added:\n$outputPlan")
}
Expand Down

0 comments on commit 0725a34

Please sign in to comment.