Skip to content

Commit

Permalink
More guarantees vs. compatibleWith cleanup; delete BroadcastPartition…
Browse files Browse the repository at this point in the history
…ing.
  • Loading branch information
JoshRosen committed Aug 9, 2015
1 parent 8784bd9 commit 0983f75
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
}

/**
* Describes how an operator's output is split across partitions. The `satisfies`,
* `compatibleWith`, and `guarantees` methods describe relationships between child partitionings,
* Describes how an operator's output is split across partitions. The `compatibleWith`,
* `guarantees`, and `satisfies` methods describe relationships between child partitionings,
* target partitionings, and [[Distribution]]s. These relations are described more precisely in
* their individual method docs, but at a high level:
*
Expand Down Expand Up @@ -118,6 +118,23 @@ sealed trait Partitioning {
*/
def satisfies(required: Distribution): Boolean

/**
* Returns true iff we can say that the partitioning scheme of this [[Partitioning]]
* guarantees the same partitioning scheme described by `other`.
*
* Compatibility of partitionings is only checked for operators that have multiple children
* and that require a specific child output [[Distribution]], such as joins.
*
* Intuitively, partitionings are compatible if they route the same partitioning key to the same
* partition. For instance, two hash partitionings are only compatible if they produce the same
* number of output partitionings and hash records according to the same hash function and
* same partitioning key schema.
*
* Put another way, two partitionings are compatible with each other if they satisfy all of the
* same distribution guarantees.
*/
def compatibleWith(other: Partitioning): Boolean

/**
* Returns true iff we can say that the partitioning scheme of this [[Partitioning]] guarantees
* the same partitioning scheme described by `other`. If a `A.guarantees(B)`, then repartitioning
Expand Down Expand Up @@ -148,20 +165,6 @@ sealed trait Partitioning {
* produced by `A` could have also been produced by `B`.
*/
def guarantees(other: Partitioning): Boolean = this == other

/**
* Returns true iff we can say that the partitioning scheme of this [[Partitioning]]
* guarantees the same partitioning scheme described by `other`.
*
* Compatibility of partitionings is only checked for operators that have multiple children
* and that require a specific child output [[Distribution]], such as joins.
*
* Intuitively, partitionings are compatible if they route the same partitioning key to the same
* partition. For instance, two hash partitionings are only compatible if they produce the same
* number of output partitionings and hash records according to the same hash function and
* same partitioning key schema.
*/
def compatibleWith(other: Partitioning): Boolean
}

object Partitioning {
Expand All @@ -187,28 +190,18 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
}

override def compatibleWith(other: Partitioning): Boolean = false

override def guarantees(other: Partitioning): Boolean = false
}

case object SinglePartition extends Partitioning {
val numPartitions = 1

override def satisfies(required: Distribution): Boolean = true

override def compatibleWith(other: Partitioning): Boolean = other match {
case SinglePartition => true
case _ => false
}
}

case object BroadcastPartitioning extends Partitioning {
val numPartitions = 1

override def satisfies(required: Distribution): Boolean = true
override def compatibleWith(other: Partitioning): Boolean = other.numPartitions == 1

override def compatibleWith(other: Partitioning): Boolean = other match {
case BroadcastPartitioning => true
case _ => false
}
override def guarantees(other: Partitioning): Boolean = other.numPartitions == 1
}

/**
Expand Down Expand Up @@ -237,6 +230,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions
case _ => false
}

override def guarantees(other: Partitioning): Boolean = other match {
case o: HashPartitioning =>
this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions
case _ => false
}
}

/**
Expand Down Expand Up @@ -274,6 +273,11 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
case o: RangePartitioning => this == o
case _ => false
}

override def guarantees(other: Partitioning): Boolean = other match {
case o: RangePartitioning => this == o
case _ => false
}
}

/**
Expand Down Expand Up @@ -314,12 +318,19 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
partitionings.exists(_.satisfies(required))

/**
* Returns true if any `partitioning` of this collection guarantees
* Returns true if any `partitioning` of this collection is compatible with
* the given [[Partitioning]].
*/
override def compatibleWith(other: Partitioning): Boolean =
partitionings.exists(_.compatibleWith(other))

/**
* Returns true if any `partitioning` of this collection guarantees
* the given [[Partitioning]].
*/
override def guarantees(other: Partitioning): Boolean =
partitionings.exists(_.guarantees(other))

override def toString: String = {
partitionings.map(_.toString).mkString("(", " or ", ")")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
val newChildren = operator.children.zip(operator.requiredChildDistribution).map {
case (child, requiredDistribution) =>
val targetPartitioning = canonicalPartitioning(requiredDistribution)
if (child.outputPartitioning.compatibleWith(targetPartitioning)) {
if (child.outputPartitioning.guarantees(targetPartitioning)) {
child
} else {
Exchange(targetPartitioning, child)
Expand Down

0 comments on commit 0983f75

Please sign in to comment.