-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-44647][SQL] Support SPJ where join keys are less than cluster keys #42306
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -355,7 +355,14 @@ case class KeyGroupedPartitioning( | |
} else { | ||
// We'll need to find leaf attributes from the partition expressions first. | ||
val attributes = expressions.flatMap(_.collectLeaves()) | ||
attributes.forall(x => requiredClustering.exists(_.semanticEquals(x))) | ||
|
||
if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) { | ||
// check that all join keys (required clustering keys) are contained in the partitioning | ||
requiredClustering.forall(x => attributes.exists(_.semanticEquals(x))) && | ||
expressions.forall(_.collectLeaves().size == 1) | ||
} else { | ||
attributes.forall(x => requiredClustering.exists(_.semanticEquals(x))) | ||
} | ||
} | ||
|
||
case _ => | ||
|
@@ -364,8 +371,20 @@ case class KeyGroupedPartitioning( | |
} | ||
} | ||
|
||
override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = | ||
KeyGroupedShuffleSpec(this, distribution) | ||
override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec = { | ||
val result = KeyGroupedShuffleSpec(this, distribution) | ||
if (SQLConf.get.v2BucketingAllowJoinKeysSubsetOfPartitionKeys) { | ||
// If allowing join keys to be subset of clustering keys, we should create a new | ||
// `KeyGroupedPartitioning` here that is grouped on the join keys instead, and use that as | ||
// the returned shuffle spec. | ||
val joinKeyPositions = result.keyPositions.map(_.nonEmpty).zipWithIndex.filter(_._1).map(_._2) | ||
val projectedPartitioning = KeyGroupedPartitioning(expressions, joinKeyPositions, | ||
partitionValues, originalPartitionValues) | ||
result.copy(partitioning = projectedPartitioning, joinKeyPositions = Some(joinKeyPositions)) | ||
} else { | ||
result | ||
} | ||
} | ||
|
||
lazy val uniquePartitionValues: Seq[InternalRow] = { | ||
partitionValues | ||
|
@@ -378,8 +397,25 @@ case class KeyGroupedPartitioning( | |
object KeyGroupedPartitioning { | ||
def apply( | ||
expressions: Seq[Expression], | ||
partitionValues: Seq[InternalRow]): KeyGroupedPartitioning = { | ||
KeyGroupedPartitioning(expressions, partitionValues.size, partitionValues, partitionValues) | ||
projectionPositions: Seq[Int], | ||
partitionValues: Seq[InternalRow], | ||
originalPartitionValues: Seq[InternalRow]): KeyGroupedPartitioning = { | ||
val projectedExpressions = projectionPositions.map(expressions(_)) | ||
val projectedPartitionValues = partitionValues.map(project(expressions, projectionPositions, _)) | ||
val projectedOriginalPartitionValues = | ||
originalPartitionValues.map(project(expressions, projectionPositions, _)) | ||
|
||
KeyGroupedPartitioning(projectedExpressions, projectedPartitionValues.length, | ||
projectedPartitionValues, projectedOriginalPartitionValues) | ||
} | ||
|
||
def project( | ||
expressions: Seq[Expression], | ||
positions: Seq[Int], | ||
input: InternalRow): InternalRow = { | ||
val projectedValues: Array[Any] = positions.map(i => input.get(i, expressions(i).dataType)) | ||
.toArray | ||
new GenericInternalRow(projectedValues) | ||
} | ||
|
||
def supportsExpressions(expressions: Seq[Expression]): Boolean = { | ||
|
@@ -672,9 +708,18 @@ case class HashShuffleSpec( | |
override def numPartitions: Int = partitioning.numPartitions | ||
} | ||
|
||
/** | ||
* [[ShuffleSpec]] created by [[KeyGroupedPartitioning]]. | ||
* | ||
* @param partitioning key grouped partitioning | ||
szehon-ho marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* @param distribution distribution | ||
* @param joinKeyPositions positions of join keys among cluster keys. | ||
* This is set if joining on a subset of cluster keys is allowed. | ||
*/ | ||
case class KeyGroupedShuffleSpec( | ||
partitioning: KeyGroupedPartitioning, | ||
distribution: ClusteredDistribution) extends ShuffleSpec { | ||
distribution: ClusteredDistribution, | ||
joinKeyPositions: Option[Seq[Int]] = None) extends ShuffleSpec { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can add some comments for `joinKeyPositions`. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added comments; please check and suggest if they can be improved. |
||
|
||
/** | ||
* A sequence where each element is a set of positions of the partition expression to the cluster | ||
|
@@ -709,7 +754,7 @@ case class KeyGroupedShuffleSpec( | |
// 3.3 each pair of partition expressions at the same index must share compatible | ||
// transform functions. | ||
// 4. the partition values from both sides are following the same order. | ||
case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, otherDistribution) => | ||
case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, otherDistribution, _) => | ||
distribution.clustering.length == otherDistribution.clustering.length && | ||
numPartitions == other.numPartitions && areKeysCompatible(otherSpec) && | ||
partitioning.partitionValues.zip(otherPartitioning.partitionValues).forall { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be guaranteed currently. It might be better to have this invariant check somewhere else, such as when constructing a `KeyGroupedPartitioning`, but it is OK to leave it here for now.