-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
[SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution #32875
Changes from all commits
5d68f37
5b6e371
a6fba50
45c9a05
b184af9
3efc11d
5e4fca5
1a8301e
449db40
96f7e9c
c48c378
d0a5059
ea8032a
33a91bd
9a0d6c2
81b1a50
708ee6f
e743419
24473ff
17e7b3f
bc41e1c
fc843c4
a0358bb
2511d5d
bbf2475
9ac2656
24fe488
3ad0d8c
112b110
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,8 @@ | |
|
||
package org.apache.spark.sql.catalyst.plans.physical | ||
|
||
import scala.collection.mutable | ||
|
||
import org.apache.spark.sql.catalyst.expressions._ | ||
import org.apache.spark.sql.types.{DataType, IntegerType} | ||
|
||
|
@@ -87,31 +89,6 @@ case class ClusteredDistribution( | |
} | ||
} | ||
|
||
/** | ||
* Represents data where tuples have been clustered according to the hash of the given | ||
* `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only | ||
* [[HashPartitioning]] can satisfy this distribution. | ||
* | ||
* This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the | ||
* number of partitions, this distribution strictly determines which partition the tuple should be in. | ||
*/ | ||
case class HashClusteredDistribution( | ||
expressions: Seq[Expression], | ||
requiredNumPartitions: Option[Int] = None) extends Distribution { | ||
require( | ||
expressions != Nil, | ||
"The expressions for hash of a HashClusteredDistribution should not be Nil. " + | ||
"An AllTuples should be used to represent a distribution that only has " + | ||
"a single partition.") | ||
|
||
override def createPartitioning(numPartitions: Int): Partitioning = { | ||
assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions, | ||
s"This HashClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " + | ||
s"the actual number of partitions is $numPartitions.") | ||
HashPartitioning(expressions, numPartitions) | ||
} | ||
} | ||
|
||
/** | ||
* Represents data where tuples have been ordered according to the `ordering` | ||
* [[Expression Expressions]]. Its requirement is defined as the following: | ||
|
@@ -171,6 +148,17 @@ trait Partitioning { | |
required.requiredNumPartitions.forall(_ == numPartitions) && satisfies0(required) | ||
} | ||
|
||
/** | ||
* Creates a shuffle spec for this partitioning and its required distribution. The | ||
* spec is used in the scenario where an operator has multiple children (e.g., join), and is | ||
* used to decide whether this child is co-partitioned with the others, and therefore whether an | ||
* extra shuffle needs to be introduced. | ||
* | ||
* @param distribution the required clustered distribution for this partitioning | ||
*/ | ||
def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = | ||
throw new IllegalStateException(s"Unexpected partitioning: ${getClass.getSimpleName}") | ||
|
||
/** | ||
* The actual method that defines whether this [[Partitioning]] can satisfy the given | ||
* [[Distribution]], after the `numPartitions` check. | ||
|
@@ -202,6 +190,9 @@ case object SinglePartition extends Partitioning { | |
case _: BroadcastDistribution => false | ||
case _ => true | ||
} | ||
|
||
override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = | ||
SinglePartitionShuffleSpec | ||
} | ||
|
||
/** | ||
|
@@ -219,17 +210,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) | |
override def satisfies0(required: Distribution): Boolean = { | ||
super.satisfies0(required) || { | ||
required match { | ||
case h: HashClusteredDistribution => | ||
expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { | ||
case (l, r) => l.semanticEquals(r) | ||
} | ||
case ClusteredDistribution(requiredClustering, _) => | ||
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) | ||
case _ => false | ||
} | ||
} | ||
} | ||
|
||
override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = | ||
HashShuffleSpec(this, distribution) | ||
|
||
/** | ||
* Returns an expression that will produce a valid partition ID (i.e. non-negative and less | ||
* than numPartitions) based on hashing expressions. | ||
|
@@ -288,6 +278,9 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) | |
} | ||
} | ||
|
||
override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = | ||
RangeShuffleSpec(this.numPartitions, distribution) | ||
|
||
override protected def withNewChildrenInternal( | ||
newChildren: IndexedSeq[Expression]): RangePartitioning = | ||
copy(ordering = newChildren.asInstanceOf[Seq[SortOrder]]) | ||
|
@@ -330,6 +323,11 @@ case class PartitioningCollection(partitionings: Seq[Partitioning]) | |
override def satisfies0(required: Distribution): Boolean = | ||
partitionings.exists(_.satisfies(required)) | ||
|
||
override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = { | ||
val filtered = partitionings.filter(_.satisfies(distribution)) | ||
ShuffleSpecCollection(filtered.map(_.createShuffleSpec(distribution))) | ||
} | ||
|
||
override def toString: String = { | ||
partitionings.map(_.toString).mkString("(", " or ", ")") | ||
} | ||
|
@@ -352,3 +350,151 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning { | |
case _ => false | ||
} | ||
} | ||
|
||
/** | ||
* This is used in the scenario where an operator has multiple children (e.g., join), one or more | ||
* of which have their own requirement regarding whether their data can be considered | ||
* co-partitioned with the others. This offers APIs for: | ||
* | ||
* - Comparing with specs from other children of the operator and check if they are compatible. | ||
* When two specs are compatible, we can say their data are co-partitioned, and Spark will | ||
* potentially be able to eliminate shuffle if necessary. | ||
* - Creating a partitioning that can be used to re-partition another child, so that it will | ||
* have a partitioning compatible with this node's. | ||
*/ | ||
trait ShuffleSpec { | ||
/** | ||
* Returns the number of partitions of this shuffle spec | ||
*/ | ||
def numPartitions: Int | ||
|
||
/** | ||
* Returns true iff this spec is compatible with the provided shuffle spec. | ||
* | ||
* A true return value means that the data partitioning from this spec can be seen as | ||
* co-partitioned with the `other`, and therefore no shuffle is required when joining the two | ||
* sides. | ||
*/ | ||
def isCompatibleWith(other: ShuffleSpec): Boolean | ||
|
||
/** | ||
* Whether this shuffle spec can be used to create partitionings for the other children. | ||
*/ | ||
def canCreatePartitioning: Boolean = false | ||
|
||
/** | ||
* Creates a partitioning that can be used to re-partition the other side with the given | ||
* clustering expressions. | ||
* | ||
* This will only be called when: | ||
* - [[canCreatePartitioning]] returns true. | ||
* - [[isCompatibleWith]] returns false on the side where the `clustering` is from. | ||
*/ | ||
def createPartitioning(clustering: Seq[Expression]): Partitioning = | ||
throw new UnsupportedOperationException("Operation unsupported for " + | ||
s"${getClass.getCanonicalName}") | ||
} | ||
|
||
case object SinglePartitionShuffleSpec extends ShuffleSpec { | ||
override def isCompatibleWith(other: ShuffleSpec): Boolean = { | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
other.numPartitions == 1 | ||
} | ||
|
||
override def canCreatePartitioning: Boolean = true | ||
|
||
override def createPartitioning(clustering: Seq[Expression]): Partitioning = | ||
SinglePartition | ||
|
||
override def numPartitions: Int = 1 | ||
} | ||
|
||
case class RangeShuffleSpec( | ||
numPartitions: Int, | ||
distribution: ClusteredDistribution) extends ShuffleSpec { | ||
|
||
override def isCompatibleWith(other: ShuffleSpec): Boolean = other match { | ||
case SinglePartitionShuffleSpec => numPartitions == 1 | ||
case ShuffleSpecCollection(specs) => specs.exists(isCompatibleWith) | ||
// `RangePartitioning` is not compatible with any other partitioning since it can't guarantee | ||
// data are co-partitioned for all the children, as range boundaries are randomly sampled. | ||
case _ => false | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, if the other spec and this spec are both only one partition? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's handled above: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah, you mean any two specs are compatible if they both have only one partition. Makes sense. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ya, as range shuffle spec can have only one partition too. |
||
} | ||
} | ||
|
||
case class HashShuffleSpec( | ||
partitioning: HashPartitioning, | ||
distribution: ClusteredDistribution) extends ShuffleSpec { | ||
lazy val hashKeyPositions: Seq[mutable.BitSet] = | ||
createHashKeyPositions(distribution.clustering, partitioning.expressions) | ||
|
||
override def isCompatibleWith(other: ShuffleSpec): Boolean = other match { | ||
case SinglePartitionShuffleSpec => | ||
partitioning.numPartitions == 1 | ||
case otherHashSpec @ HashShuffleSpec(otherPartitioning, otherDistribution) => | ||
// we need to check: | ||
// 1. both distributions have the same number of clustering expressions | ||
// 2. both partitioning have the same number of partitions | ||
// 3. both partitioning have the same number of expressions | ||
// 4. each pair of expression from both has overlapping positions in their | ||
// corresponding distributions. | ||
distribution.clustering.length == otherDistribution.clustering.length && | ||
partitioning.numPartitions == otherPartitioning.numPartitions && | ||
partitioning.expressions.length == otherPartitioning.expressions.length && { | ||
val otherHashKeyPositions = otherHashSpec.hashKeyPositions | ||
hashKeyPositions.zip(otherHashKeyPositions).forall { case (left, right) => | ||
left.intersect(right).nonEmpty | ||
} | ||
} | ||
case ShuffleSpecCollection(specs) => | ||
specs.exists(isCompatibleWith) | ||
case _ => | ||
false | ||
} | ||
|
||
override def canCreatePartitioning: Boolean = true | ||
|
||
override def createPartitioning(clustering: Seq[Expression]): Partitioning = { | ||
val exprs = hashKeyPositions.map(v => clustering(v.head)) | ||
HashPartitioning(exprs, partitioning.numPartitions) | ||
} | ||
|
||
override def numPartitions: Int = partitioning.numPartitions | ||
|
||
/** | ||
* Returns a sequence where each element is the set of positions in `requiredClusterKeys` of the | ||
* corresponding key in `hashKeys`. For instance, if `requiredClusterKeys` is [a, b, b] and | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
hmm, how can this happen? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we allow duplicate keys in join conditions. You can check |
||
* `hashKeys` is [a, b], the result will be [(0), (1, 2)]. | ||
*/ | ||
private def createHashKeyPositions( | ||
requiredClusterKeys: Seq[Expression], | ||
hashKeys: Seq[Expression]): Seq[mutable.BitSet] = { | ||
val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet] | ||
requiredClusterKeys.zipWithIndex.foreach { case (distKey, distKeyPos) => | ||
distKeyToPos.getOrElseUpdate(distKey.canonicalized, mutable.BitSet.empty).add(distKeyPos) | ||
} | ||
|
||
hashKeys.map(k => distKeyToPos(k.canonicalized)) | ||
} | ||
} | ||
|
||
case class ShuffleSpecCollection(specs: Seq[ShuffleSpec]) extends ShuffleSpec { | ||
override def isCompatibleWith(other: ShuffleSpec): Boolean = { | ||
specs.exists(_.isCompatibleWith(other)) | ||
} | ||
|
||
override def canCreatePartitioning: Boolean = | ||
specs.forall(_.canCreatePartitioning) | ||
|
||
override def createPartitioning(clustering: Seq[Expression]): Partitioning = { | ||
// as we only consider # of partitions as the cost now, it doesn't matter which one we choose | ||
// since they should all have the same # of partitions. | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
require(specs.map(_.numPartitions).toSet.size == 1, "expected all specs in the collection " + | ||
"to have the same number of partitions") | ||
specs.head.createPartitioning(clustering) | ||
} | ||
|
||
override def numPartitions: Int = { | ||
require(specs.nonEmpty, "expected specs to be non-empty") | ||
specs.head.numPartitions | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized we even documented the characteristic I mentioned.
That said, I'd be slightly in favor of being very clear about the specialization of HashClusteredDistribution & HashPartitioning within Spark internals, via a prefix in the naming. The names seem too general, and no one would know about this characteristic unless they read through the classdoc carefully. It would also be very confusing if someone found a need for a "general" HashClusteredDistribution & HashPartitioning and somehow came across these classes.