[SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution #32875

Closed
wants to merge 29 commits
@@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.plans.physical

import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{DataType, IntegerType}

@@ -87,31 +89,6 @@ case class ClusteredDistribution(
}
}

/**
* Represents data where tuples have been clustered according to the hash of the given
* `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only
@HeartSaVioR (Contributor) commented on Feb 4, 2022:

I realized we even documented the characteristic I mentioned.

That said, I'd be slightly in favor of making it very clear that HashClusteredDistribution & HashPartitioning are specialized for Spark internals, e.g. via a prefix in the naming. The names seem too general, and no one would know about this characteristic unless they read the classdoc carefully. It would also be very confusing if someone found a need for a "general" HashClusteredDistribution & HashPartitioning and somehow came across these classes.

* [[HashPartitioning]] can satisfy this distribution.
*
* This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
* number of partitions, this distribution strictly requires which partition the tuple should be in.
*/
case class HashClusteredDistribution(
expressions: Seq[Expression],
requiredNumPartitions: Option[Int] = None) extends Distribution {
require(
expressions != Nil,
"The expressions for hash of a HashClusteredDistribution should not be Nil. " +
"An AllTuples should be used to represent a distribution that only has " +
"a single partition.")

override def createPartitioning(numPartitions: Int): Partitioning = {
assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
s"This HashClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
s"the actual number of partitions is $numPartitions.")
HashPartitioning(expressions, numPartitions)
}
}

/**
* Represents data where tuples have been ordered according to the `ordering`
* [[Expression Expressions]]. Its requirement is defined as the following:
@@ -171,6 +148,17 @@ trait Partitioning {
required.requiredNumPartitions.forall(_ == numPartitions) && satisfies0(required)
}

/**
* Creates a shuffle spec for this partitioning and its required distribution. The spec is
* used in the scenario where an operator has multiple children (e.g., a join), to decide
* whether this child is co-partitioned with the others, and therefore whether an extra
* shuffle should be introduced.
*
* @param distribution the required clustered distribution for this partitioning
*/
def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
throw new IllegalStateException(s"Unexpected partitioning: ${getClass.getSimpleName}")

/**
* The actual method that defines whether this [[Partitioning]] can satisfy the given
* [[Distribution]], after the `numPartitions` check.
@@ -202,6 +190,9 @@ case object SinglePartition extends Partitioning {
case _: BroadcastDistribution => false
case _ => true
}

override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
SinglePartitionShuffleSpec
}

/**
@@ -219,17 +210,16 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
override def satisfies0(required: Distribution): Boolean = {
super.satisfies0(required) || {
required match {
case h: HashClusteredDistribution =>
expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
case (l, r) => l.semanticEquals(r)
}
case ClusteredDistribution(requiredClustering, _) =>
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
case _ => false
}
}
}

override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
HashShuffleSpec(this, distribution)

/**
* Returns an expression that will produce a valid partition ID (i.e., non-negative and less
* than numPartitions) based on hashing expressions.
@@ -288,6 +278,9 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
}
}

override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
RangeShuffleSpec(this.numPartitions, distribution)

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): RangePartitioning =
copy(ordering = newChildren.asInstanceOf[Seq[SortOrder]])
@@ -330,6 +323,11 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
override def satisfies0(required: Distribution): Boolean =
partitionings.exists(_.satisfies(required))

override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = {
val filtered = partitionings.filter(_.satisfies(distribution))
ShuffleSpecCollection(filtered.map(_.createShuffleSpec(distribution)))
}

override def toString: String = {
partitionings.map(_.toString).mkString("(", " or ", ")")
}
@@ -352,3 +350,151 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
case _ => false
}
}

/**
* This is used in the scenario where an operator has multiple children (e.g., a join) and one
* or more of them have their own requirements on whether their data can be considered
* co-partitioned with the others. It offers APIs for:
*
* - Comparing with specs from other children of the operator and checking whether they are
* compatible. When two specs are compatible, we can say their data are co-partitioned, and
* Spark will potentially be able to eliminate an unnecessary shuffle.
* - Creating a partitioning that can be used to re-partition another child, so that it ends up
* with a partitioning compatible with this node's.
*/
trait ShuffleSpec {
/**
* Returns the number of partitions of this shuffle spec.
*/
def numPartitions: Int

/**
* Returns true iff this spec is compatible with the provided shuffle spec.
*
* A true return value means that the data partitioning from this spec can be seen as
* co-partitioned with the `other`, and therefore no shuffle is required when joining the two
* sides.
*/
def isCompatibleWith(other: ShuffleSpec): Boolean

/**
* Whether this shuffle spec can be used to create partitionings for the other children.
*/
def canCreatePartitioning: Boolean = false

/**
* Creates a partitioning that can be used to re-partition the other side with the given
* clustering expressions.
*
* This will only be called when:
* - [[canCreatePartitioning]] returns true.
* - [[isCompatibleWith]] returns false on the side where the `clustering` is from.
*/
def createPartitioning(clustering: Seq[Expression]): Partitioning =
throw new UnsupportedOperationException("Operation unsupported for " +
s"${getClass.getCanonicalName}")
}
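
To make the intended flow concrete, here is a minimal sketch, not part of this patch, of how a planner could use these APIs for a two-child operator. `needsShuffle` is a hypothetical helper, and it assumes both partitionings already satisfy their children's distributions (the default `createShuffleSpec` throws otherwise):

import org.apache.spark.sql.catalyst.plans.physical._

// Hypothetical sketch: decide whether a binary operator (e.g. a join) needs
// an extra shuffle, given each child's partitioning and required distribution.
def needsShuffle(
    leftPartitioning: Partitioning,
    leftDist: ClusteredDistribution,
    rightPartitioning: Partitioning,
    rightDist: ClusteredDistribution): Boolean = {
  val leftSpec = leftPartitioning.createShuffleSpec(leftDist)
  val rightSpec = rightPartitioning.createShuffleSpec(rightDist)
  // Compatible specs mean the children are co-partitioned, so no extra
  // shuffle is needed between them.
  !leftSpec.isCompatibleWith(rightSpec)
}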

case object SinglePartitionShuffleSpec extends ShuffleSpec {
override def isCompatibleWith(other: ShuffleSpec): Boolean = {
other.numPartitions == 1
}

override def canCreatePartitioning: Boolean = true

override def createPartitioning(clustering: Seq[Expression]): Partitioning =
SinglePartition

override def numPartitions: Int = 1
}

case class RangeShuffleSpec(
numPartitions: Int,
distribution: ClusteredDistribution) extends ShuffleSpec {

override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
case SinglePartitionShuffleSpec => numPartitions == 1
case ShuffleSpecCollection(specs) => specs.exists(isCompatibleWith)
// `RangePartitioning` is not compatible with any other partitioning since it can't guarantee
// data are co-partitioned for all the children, as range boundaries are randomly sampled.
case _ => false
Member: Hmm, if the other spec and this spec are both only one partition?

Contributor: It's handled above: `case SinglePartitionShuffleSpec => numPartitions == 1`

Contributor: Ah, you mean any two specs are compatible if they both have only one partition. Makes sense.

Member: Ya, as a range shuffle spec can have only one partition too.

}
}
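
A small illustration of the boundary cases above, as a hedged sketch rather than code from this patch (the attribute construction is assumed):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val dist = ClusteredDistribution(Seq(a))
// Compatible only because both sides have exactly one partition.
assert(RangeShuffleSpec(1, dist).isCompatibleWith(SinglePartitionShuffleSpec))
// Two range-partitioned children are never treated as co-partitioned, even
// with identical ordering, since range boundaries are sampled independently.
assert(!RangeShuffleSpec(10, dist).isCompatibleWith(RangeShuffleSpec(10, dist)))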

case class HashShuffleSpec(
partitioning: HashPartitioning,
distribution: ClusteredDistribution) extends ShuffleSpec {
lazy val hashKeyPositions: Seq[mutable.BitSet] =
createHashKeyPositions(distribution.clustering, partitioning.expressions)

override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
case SinglePartitionShuffleSpec =>
partitioning.numPartitions == 1
case otherHashSpec @ HashShuffleSpec(otherPartitioning, otherDistribution) =>
// we need to check:
// 1. both distributions have the same number of clustering expressions
// 2. both partitionings have the same number of partitions
// 3. both partitionings have the same number of expressions
// 4. each pair of partitioning expressions from the two sides has
//    overlapping positions in their corresponding distributions.
distribution.clustering.length == otherDistribution.clustering.length &&
partitioning.numPartitions == otherPartitioning.numPartitions &&
partitioning.expressions.length == otherPartitioning.expressions.length && {
val otherHashKeyPositions = otherHashSpec.hashKeyPositions
hashKeyPositions.zip(otherHashKeyPositions).forall { case (left, right) =>
left.intersect(right).nonEmpty
}
}
case ShuffleSpecCollection(specs) =>
specs.exists(isCompatibleWith)
case _ =>
false
}

override def canCreatePartitioning: Boolean = true

override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
val exprs = hashKeyPositions.map(v => clustering(v.head))
HashPartitioning(exprs, partitioning.numPartitions)
}

override def numPartitions: Int = partitioning.numPartitions

/**
* Returns a sequence where each element is the set of positions that a key in `hashKeys`
* occupies in `requiredClusterKeys`. For instance, if `requiredClusterKeys` is [a, b, b] and
Contributor (quoting "if `requiredClusterKeys` is [a, b, b]"): Hmm, how can this happen?

Member Author: I think we allow duplicate keys in join conditions. You can check EnsureRequirementsSuite for some related tests (search for the 'duplicate' keyword). Also SPARK-27485 is related.

* `hashKeys` is [a, b], the result will be [(0), (1, 2)].
*/
private def createHashKeyPositions(
requiredClusterKeys: Seq[Expression],
hashKeys: Seq[Expression]): Seq[mutable.BitSet] = {
val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet]
requiredClusterKeys.zipWithIndex.foreach { case (distKey, distKeyPos) =>
distKeyToPos.getOrElseUpdate(distKey.canonicalized, mutable.BitSet.empty).add(distKeyPos)
}

hashKeys.map(k => distKeyToPos(k.canonicalized))
}
}
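
A worked example tying `hashKeyPositions` and `isCompatibleWith` together, as a sketch under assumed attribute definitions rather than code from this patch:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
val c = AttributeReference("c", IntegerType)()
val d = AttributeReference("d", IntegerType)()

// Duplicate cluster keys, as can arise from duplicate join keys (SPARK-27485):
// `a` maps to position 0, `b` to positions 1 and 2.
val leftSpec = HashShuffleSpec(
  HashPartitioning(Seq(a, b), 10),
  ClusteredDistribution(Seq(a, b, b)))
assert(leftSpec.hashKeyPositions.map(_.toSet) == Seq(Set(0), Set(1, 2)))

// The right side's position sets are also {0} and {1, 2}; each pair of sets
// overlaps and the partition counts match, so the two sides are co-partitioned.
val rightSpec = HashShuffleSpec(
  HashPartitioning(Seq(c, d), 10),
  ClusteredDistribution(Seq(c, d, d)))
assert(leftSpec.isCompatibleWith(rightSpec))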

case class ShuffleSpecCollection(specs: Seq[ShuffleSpec]) extends ShuffleSpec {
override def isCompatibleWith(other: ShuffleSpec): Boolean = {
specs.exists(_.isCompatibleWith(other))
}

override def canCreatePartitioning: Boolean =
specs.forall(_.canCreatePartitioning)

override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
// as we only consider # of partitions as the cost now, it doesn't matter which one we choose
// since they should all have the same # of partitions.
require(specs.map(_.numPartitions).toSet.size == 1, "expected all specs in the collection " +
"to have the same number of partitions")
specs.head.createPartitioning(clustering)
}

override def numPartitions: Int = {
require(specs.nonEmpty, "expected specs to be non-empty")
specs.head.numPartitions
}
}
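
To show the re-partitioning direction as well, a self-contained sketch (again hypothetical, not part of the patch) of `createPartitioning` picking one expression per hash key position set:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val b = AttributeReference("b", IntegerType)()
val c = AttributeReference("c", IntegerType)()
val d = AttributeReference("d", IntegerType)()

val spec = HashShuffleSpec(
  HashPartitioning(Seq(a, b), 10),
  ClusteredDistribution(Seq(a, b, b)))
// Re-partition the other side's (positionally aligned) clustering keys:
// position set {0} picks c, position set {1, 2} picks d (the head of each set).
assert(spec.createPartitioning(Seq(c, d, d)) == HashPartitioning(Seq(c, d), 10))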
@@ -133,11 +133,6 @@ class DistributionSuite extends SparkFunSuite {
ClusteredDistribution(Seq($"a", $"b", $"c")),
true)

checkSatisfied(
SinglePartition,
HashClusteredDistribution(Seq($"a", $"b", $"c")),
true)

checkSatisfied(
SinglePartition,
OrderedDistribution(Seq($"a".asc, $"b".asc, $"c".asc)),
@@ -172,23 +167,6 @@
ClusteredDistribution(Seq($"d", $"e")),
false)

// HashPartitioning can satisfy HashClusteredDistribution iff its hash expressions are exactly
// the same as the required hash clustering expressions.
checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
HashClusteredDistribution(Seq($"a", $"b", $"c")),
true)

checkSatisfied(
HashPartitioning(Seq($"c", $"b", $"a"), 10),
HashClusteredDistribution(Seq($"a", $"b", $"c")),
false)

checkSatisfied(
HashPartitioning(Seq($"a", $"b"), 10),
HashClusteredDistribution(Seq($"a", $"b", $"c")),
false)

// HashPartitioning cannot satisfy OrderedDistribution
checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
@@ -269,12 +247,6 @@
RangePartitioning(Seq($"a".asc, $"b".asc, $"c".asc), 10),
ClusteredDistribution(Seq($"c", $"d")),
false)

// RangePartitioning cannot satisfy HashClusteredDistribution
checkSatisfied(
RangePartitioning(Seq($"a".asc, $"b".asc, $"c".asc), 10),
HashClusteredDistribution(Seq($"a", $"b", $"c")),
false)
}
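
For context on the removals above: the checks that HashClusteredDistribution used to provide now go through ShuffleSpec compatibility, which compares clustering key positions rather than exact expression lists. A hypothetical snippet in this suite's style (not a test from this patch) shows the relaxation this PR is after, where hashing on a subset of the join keys on both sides can still avoid a shuffle:

// Both sides hash on the key at the same clustering position, so they are
// considered co-partitioned even though neither covers all the join keys.
val leftSpec = HashPartitioning(Seq($"b"), 10)
  .createShuffleSpec(ClusteredDistribution(Seq($"a", $"b")))
val rightSpec = HashPartitioning(Seq($"y"), 10)
  .createShuffleSpec(ClusteredDistribution(Seq($"x", $"y")))
assert(leftSpec.isCompatibleWith(rightSpec))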

test("Partitioning.numPartitions must match Distribution.requiredNumPartitions to satisfy it") {
@@ -283,21 +255,11 @@
ClusteredDistribution(Seq($"a", $"b", $"c"), Some(10)),
false)

checkSatisfied(
SinglePartition,
HashClusteredDistribution(Seq($"a", $"b", $"c"), Some(10)),
false)

checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), Some(5)),
false)

checkSatisfied(
HashPartitioning(Seq($"a", $"b", $"c"), 10),
HashClusteredDistribution(Seq($"a", $"b", $"c"), Some(5)),
false)

checkSatisfied(
RangePartitioning(Seq($"a".asc, $"b".asc, $"c".asc), 10),
ClusteredDistribution(Seq($"a", $"b", $"c"), Some(5)),