Skip to content

Commit

Permalink
Giant comment explaining compatibleWith vs. guarantees
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed Aug 8, 2015
1 parent 1307c50 commit 8784bd9
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,37 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
def clustering: Set[Expression] = ordering.map(_.child).toSet
}

/**
* Describes how an operator's output is split across partitions. The `satisfies`,
* `compatibleWith`, and `guarantees` methods describe relationships between child partitionings,
* target partitionings, and [[Distribution]]s. These relations are described more precisely in
* their individual method docs, but at a high level:
*
* - `satisfies` is a relationship between partitionings and distributions.
* - `compatibleWith` is a relationship between an operator's child output partitionings.
* - `guarantees` is a relationship between a child's existing output partitioning and a target
* output partitioning.
*
* Diagrammatically:
*
*            +--------------+
*            | Distribution |
*            +--------------+
*                    ^
*                    |
*                satisfies
*                    |
*            +--------------+                  +--------------+
*            |    Child     |                  |    Target    |
*       +----| Partitioning |----guarantees--->| Partitioning |
*       |    +--------------+                  +--------------+
*       |            ^
*       |            |
*       |     compatibleWith
*       |            |
*       +------------+
*
*/
sealed trait Partitioning {
/** Returns the number of partitions that the data is split across */
val numPartitions: Int
Expand All @@ -87,12 +118,50 @@ sealed trait Partitioning {
*/
def satisfies(required: Distribution): Boolean

/**
* Returns true iff we can say that the partitioning scheme of this [[Partitioning]] guarantees
* the same partitioning scheme described by `other`. If `A.guarantees(B)`, then repartitioning
* the child's output according to `B` will be unnecessary. `guarantees` is used as a performance
* optimization to allow the exchange planner to avoid redundant repartitionings. By default,
* a partitioning only guarantees partitionings that are equal to itself (i.e. the same number
* of partitions, same strategy (range or hash), etc).
*
* In order to enable more aggressive optimization, this strict equality check can be relaxed.
* For example, say that the planner needs to repartition all of an operator's children so that
* they satisfy the [[AllTuples]] distribution. One way to do this is to repartition all children
* to have the [[SinglePartition]] partitioning. If one of the operator's children already happens
* to be hash-partitioned with a single partition then we do not need to re-shuffle this child;
* this repartitioning can be avoided if a single-partition [[HashPartitioning]] `guarantees`
* [[SinglePartition]].
*
* The SinglePartition example given above is not particularly interesting; guarantees' real
* value occurs for more advanced partitioning strategies. SPARK-7871 will introduce a notion
* of null-safe partitionings, under which partitionings can specify whether rows whose
* partitioning keys contain null values will be grouped into the same partition or whether they
* will have an unknown / random distribution. If a partitioning does not require nulls to be
* clustered then a partitioning which _does_ cluster nulls will guarantee the partitioning that
* does not require null clustering. The converse is not true, however: a partitioning which
* clusters nulls cannot be guaranteed by one which does not cluster them. Thus, in general
* `guarantees` is not a symmetric relation.
*
* Another way to think about `guarantees`: if `A.guarantees(B)`, then any partitioning of rows
* produced by `A` could have also been produced by `B`.
*/
def guarantees(other: Partitioning): Boolean = this == other

/**
* Returns true iff we can say that the partitioning scheme of this [[Partitioning]]
* is compatible with the partitioning scheme described by `other`.
*
* Compatibility of partitionings is only checked for operators that have multiple children
* and that require a specific child output [[Distribution]], such as joins.
*
* Intuitively, partitionings are compatible if they route the same partitioning key to the same
* partition. For instance, two hash partitionings are only compatible if they produce the same
* number of output partitions and hash records according to the same hash function and
* same partitioning key schema.
*/
// TODO: Add an example once we have the `nullSafe` concept.
def guarantees(other: Partitioning): Boolean
def compatibleWith(other: Partitioning): Boolean
}

object Partitioning {
Expand All @@ -102,10 +171,10 @@ object Partitioning {
case Seq(a) => true
case Seq(a, b) =>
if (a.numPartitions != b.numPartitions) {
assert(!a.guarantees(b) && !b.guarantees(a))
assert(!a.compatibleWith(b) && !b.compatibleWith(a))
false
} else {
a.guarantees(b) && b.guarantees(a)
a.compatibleWith(b) && b.compatibleWith(a)
}
}.forall(_ == true)
}
Expand All @@ -117,15 +186,15 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
case _ => false
}

override def guarantees(other: Partitioning): Boolean = false
override def compatibleWith(other: Partitioning): Boolean = false
}

case object SinglePartition extends Partitioning {
val numPartitions = 1

override def satisfies(required: Distribution): Boolean = true

override def guarantees(other: Partitioning): Boolean = other match {
override def compatibleWith(other: Partitioning): Boolean = other match {
case SinglePartition => true
case _ => false
}
Expand All @@ -136,7 +205,7 @@ case object BroadcastPartitioning extends Partitioning {

override def satisfies(required: Distribution): Boolean = true

override def guarantees(other: Partitioning): Boolean = other match {
override def compatibleWith(other: Partitioning): Boolean = other match {
case BroadcastPartitioning => true
case _ => false
}
Expand All @@ -163,7 +232,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
case _ => false
}

override def guarantees(other: Partitioning): Boolean = other match {
override def compatibleWith(other: Partitioning): Boolean = other match {
case o: HashPartitioning =>
this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions
case _ => false
Expand Down Expand Up @@ -201,7 +270,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
case _ => false
}

override def guarantees(other: Partitioning): Boolean = other match {
override def compatibleWith(other: Partitioning): Boolean = other match {
case o: RangePartitioning => this == o
case _ => false
}
Expand Down Expand Up @@ -248,8 +317,8 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
* Returns true if any `partitioning` of this collection is compatible with
* the given [[Partitioning]].
*/
override def guarantees(other: Partitioning): Boolean =
partitionings.exists(_.guarantees(other))
override def compatibleWith(other: Partitioning): Boolean =
partitionings.exists(_.compatibleWith(other))

override def toString: String = {
partitionings.map(_.toString).mkString("(", " or ", ")")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
val newChildren = operator.children.zip(operator.requiredChildDistribution).map {
case (child, requiredDistribution) =>
val targetPartitioning = canonicalPartitioning(requiredDistribution)
if (child.outputPartitioning.guarantees(targetPartitioning)) {
if (child.outputPartitioning.compatibleWith(targetPartitioning)) {
child
} else {
Exchange(targetPartitioning, child)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils {
assert(rightPartitioning.satisfies(distribution))
// However, these partitionings are not compatible with each other, so we still need to
// repartition both inputs prior to performing the join:
assert(!leftPartitioning.guarantees(rightPartitioning))
assert(!rightPartitioning.guarantees(leftPartitioning))
assert(!leftPartitioning.compatibleWith(rightPartitioning))
assert(!rightPartitioning.compatibleWith(leftPartitioning))
val inputPlan = DummySparkPlan(
children = Seq(
DummySparkPlan(outputPartitioning = leftPartitioning),
Expand Down

0 comments on commit 8784bd9

Please sign in to comment.