diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 97cf83d897a8d..d36947e6d538e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -75,6 +75,37 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { def clustering: Set[Expression] = ordering.map(_.child).toSet } +/** + * Describes how an operator's output is split across partitions. The `satisfies`, + * `compatibleWith`, and `guarantees` methods describe relationships between child partitionings, + * target partitionings, and [[Distribution]]s. These relations are described more precisely in + * their individual method docs, but at a high level: + * + * - `satisfies` is a relationship between partitionings and distributions. + * - `compatibleWith` is relationships between an operator's child output partitionings. + * - `guarantees` is a relationship between a child's existing output partitioning and a target + * output partitioning. + * + * Diagrammatically: + * + * +--------------+ + * | Distribution | + * +--------------+ + * ^ + * | + * satisfies + * | + * +--------------+ +--------------+ + * | Child | | Target | + * +----| Partitioning |----guarantees--->| Partitioning | + * | +--------------+ +--------------+ + * | ^ + * | | + * | compatibleWith + * | | + * +------------+ + * + */ sealed trait Partitioning { /** Returns the number of partitions that the data is split across */ val numPartitions: Int @@ -87,12 +118,50 @@ sealed trait Partitioning { */ def satisfies(required: Distribution): Boolean + /** + * Returns true iff we can say that the partitioning scheme of this [[Partitioning]] guarantees + * the same partitioning scheme described by `other`. 
If `A.guarantees(B)`, then repartitioning
+   * the child's output according to `B` will be unnecessary. `guarantees` is used as a performance
+   * optimization to allow the exchange planner to avoid redundant repartitionings. By default,
+   * a partitioning only guarantees partitionings that are equal to itself (i.e. the same number
+   * of partitions, same strategy (range or hash), etc).
+   *
+   * In order to enable more aggressive optimization, this strict equality check can be relaxed.
+   * For example, say that the planner needs to repartition all of an operator's children so that
+   * they satisfy the [[AllTuples]] distribution. One way to do this is to repartition all children
+   * to have the [[SinglePartition]] partitioning. If one of the operator's children already happens
+   * to be hash-partitioned with a single partition then we do not need to re-shuffle this child;
+   * this repartitioning can be avoided if a single-partition [[HashPartitioning]] `guarantees`
+   * [[SinglePartition]].
+   *
+   * The SinglePartition example given above is not particularly interesting; guarantees' real
+   * value occurs for more advanced partitioning strategies. SPARK-7871 will introduce a notion
+   * of null-safe partitionings, under which partitionings can specify whether rows whose
+   * partitioning keys contain null values will be grouped into the same partition or whether they
+   * will have an unknown / random distribution. If a partitioning does not require nulls to be
+   * clustered then a partitioning which _does_ cluster nulls will guarantee the null clustered
+   * partitioning. The converse is not true, however: a partitioning which clusters nulls cannot
+   * be guaranteed by one which does not cluster them. Thus, in general `guarantees` is not a
+   * symmetric relation.
+   *
+   * Another way to think about `guarantees`: if `A.guarantees(B)`, then any partitioning of rows
+   * produced by `A` could have also been produced by `B`.
+ */ + def guarantees(other: Partitioning): Boolean = this == other + /** * Returns true iff we can say that the partitioning scheme of this [[Partitioning]] * guarantees the same partitioning scheme described by `other`. + * + * Compatibility of partitionings is only checked for operators that have multiple children + * and that require a specific child output [[Distribution]], such as joins. + * + * Intuitively, partitionings are compatible if they route the same partitioning key to the same + * partition. For instance, two hash partitionings are only compatible if they produce the same + * number of output partitionings and hash records according to the same hash function and + * same partitioning key schema. */ - // TODO: Add an example once we have the `nullSafe` concept. - def guarantees(other: Partitioning): Boolean + def compatibleWith(other: Partitioning): Boolean } object Partitioning { @@ -102,10 +171,10 @@ object Partitioning { case Seq(a) => true case Seq(a, b) => if (a.numPartitions != b.numPartitions) { - assert(!a.guarantees(b) && !b.guarantees(a)) + assert(!a.compatibleWith(b) && !b.compatibleWith(a)) false } else { - a.guarantees(b) && b.guarantees(a) + a.compatibleWith(b) && b.compatibleWith(a) } }.forall(_ == true) } @@ -117,7 +186,7 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning { case _ => false } - override def guarantees(other: Partitioning): Boolean = false + override def compatibleWith(other: Partitioning): Boolean = false } case object SinglePartition extends Partitioning { @@ -125,7 +194,7 @@ case object SinglePartition extends Partitioning { override def satisfies(required: Distribution): Boolean = true - override def guarantees(other: Partitioning): Boolean = other match { + override def compatibleWith(other: Partitioning): Boolean = other match { case SinglePartition => true case _ => false } @@ -136,7 +205,7 @@ case object BroadcastPartitioning extends Partitioning { override def satisfies(required: 
Distribution): Boolean = true - override def guarantees(other: Partitioning): Boolean = other match { + override def compatibleWith(other: Partitioning): Boolean = other match { case BroadcastPartitioning => true case _ => false } @@ -163,7 +232,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) case _ => false } - override def guarantees(other: Partitioning): Boolean = other match { + override def compatibleWith(other: Partitioning): Boolean = other match { case o: HashPartitioning => this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions case _ => false @@ -201,7 +270,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) case _ => false } - override def guarantees(other: Partitioning): Boolean = other match { + override def compatibleWith(other: Partitioning): Boolean = other match { case o: RangePartitioning => this == o case _ => false } @@ -248,8 +317,8 @@ case class PartitioningCollection(partitionings: Seq[Partitioning]) * Returns true if any `partitioning` of this collection guarantees * the given [[Partitioning]]. 
*/ - override def guarantees(other: Partitioning): Boolean = - partitionings.exists(_.guarantees(other)) + override def compatibleWith(other: Partitioning): Boolean = + partitionings.exists(_.compatibleWith(other)) override def toString: String = { partitionings.map(_.toString).mkString("(", " or ", ")") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 82867b466627f..123417e28e379 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -230,7 +230,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ val newChildren = operator.children.zip(operator.requiredChildDistribution).map { case (child, requiredDistribution) => val targetPartitioning = canonicalPartitioning(requiredDistribution) - if (child.outputPartitioning.guarantees(targetPartitioning)) { + if (child.outputPartitioning.compatibleWith(targetPartitioning)) { child } else { Exchange(targetPartitioning, child) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index f7aac569ad467..5582caa0d366e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -242,8 +242,8 @@ class PlannerSuite extends SparkFunSuite with SQLTestUtils { assert(rightPartitioning.satisfies(distribution)) // However, these partitionings are not compatible with each other, so we still need to // repartition both inputs prior to performing the join: - assert(!leftPartitioning.guarantees(rightPartitioning)) - assert(!rightPartitioning.guarantees(leftPartitioning)) + assert(!leftPartitioning.compatibleWith(rightPartitioning)) + 
assert(!rightPartitioning.compatibleWith(leftPartitioning)) val inputPlan = DummySparkPlan( children = Seq( DummySparkPlan(outputPartitioning = leftPartitioning),