[SPARK-9703] [SQL] Refactor EnsureRequirements to avoid certain unnecessary shuffles #7988
Changes from 16 commits
```diff
@@ -95,6 +95,22 @@ sealed trait Partitioning {
   def guarantees(other: Partitioning): Boolean
 }
 
+object Partitioning {
+  def allCompatible(partitionings: Seq[Partitioning]): Boolean = {
+    // Note: this assumes transitivity
+    partitionings.sliding(2).map {
+      case Seq(a) => true
+      case Seq(a, b) =>
+        if (a.numPartitions != b.numPartitions) {
+          assert(!a.guarantees(b) && !b.guarantees(a))
+          false
+        } else {
+          a.guarantees(b) && b.guarantees(a)
+        }
+    }.forall(_ == true)
+  }
+}
+
 case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
   override def satisfies(required: Distribution): Boolean = required match {
     case UnspecifiedDistribution => true
```

> **Review comment** (on `allCompatible`): Nit: we can use `forall` instead of `map` here and remove the `forall` at the end.

> **Review comment** (on the `assert`; truncated in the original): Since we are using […]
>
> **Reply:** This sounds good to me. Let's revisit this naming later once we actually break this symmetry.
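A sketch of the reviewer's nit applied to the method above (same pair-matching logic; `sliding(2)` yields a single short window for zero- or one-element input, which the `Seq(a)` case accepts):

```scala
def allCompatible(partitionings: Seq[Partitioning]): Boolean = {
  // Note: this assumes transitivity
  partitionings.sliding(2).forall {
    case Seq(a) => true
    case Seq(a, b) =>
      if (a.numPartitions != b.numPartitions) {
        // Partitionings with different partition counts can never guarantee each other.
        assert(!a.guarantees(b) && !b.guarantees(a))
        false
      } else {
        a.guarantees(b) && b.guarantees(a)
      }
  }
}
```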
```diff
@@ -197,66 +197,108 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
  * of input data meets the
  * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
  * each operator by inserting [[Exchange]] Operators where required. Also ensure that the
- * required input partition ordering requirements are met.
+ * input partition ordering requirements are met.
  */
 private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] {
   // TODO: Determine the number of partitions.
-  def numPartitions: Int = sqlContext.conf.numShufflePartitions
+  private def numPartitions: Int = sqlContext.conf.numShufflePartitions
 
-  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case operator: SparkPlan =>
-      // Adds Exchange or Sort operators as required
-      def addOperatorsIfNecessary(
-          partitioning: Partitioning,
-          rowOrdering: Seq[SortOrder],
-          child: SparkPlan): SparkPlan = {
-
-        def addShuffleIfNecessary(child: SparkPlan): SparkPlan = {
-          if (!child.outputPartitioning.guarantees(partitioning)) {
-            Exchange(partitioning, child)
-          } else {
-            child
-          }
-        }
+  /**
+   * Given a required distribution, returns a partitioning that satisfies that distribution.
+   */
+  private def canonicalPartitioning(requiredDistribution: Distribution): Partitioning = {
+    requiredDistribution match {
+      case AllTuples => SinglePartition
+      case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)
+      case OrderedDistribution(ordering) => RangePartitioning(ordering, numPartitions)
+      case dist => sys.error(s"Do not know how to satisfy distribution $dist")
+    }
+  }
```
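To make that mapping concrete, here is a minimal, self-contained sketch (toy types standing in for Catalyst's `Distribution` and `Partitioning` hierarchies, with plain strings for expressions):

```scala
// Each required distribution has one canonical partitioning that satisfies it.
sealed trait Distribution
case object AllTuples extends Distribution
case class ClusteredDistribution(clustering: Seq[String]) extends Distribution
case class OrderedDistribution(ordering: Seq[String]) extends Distribution

sealed trait Partitioning
case object SinglePartition extends Partitioning
case class HashPartitioning(expressions: Seq[String], numPartitions: Int) extends Partitioning
case class RangePartitioning(ordering: Seq[String], numPartitions: Int) extends Partitioning

def canonicalPartitioning(required: Distribution, numPartitions: Int): Partitioning =
  required match {
    case AllTuples                         => SinglePartition
    case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)
    case OrderedDistribution(ordering)     => RangePartitioning(ordering, numPartitions)
  }

// e.g. a join clustered on "key" with 200 shuffle partitions:
// canonicalPartitioning(ClusteredDistribution(Seq("key")), 200)
//   == HashPartitioning(Seq("key"), 200)
```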
```diff
-        def addSortIfNecessary(child: SparkPlan): SparkPlan = {
-          if (rowOrdering.nonEmpty) {
-            // If child.outputOrdering is [a, b] and rowOrdering is [a], we do not need to sort.
-            val minSize = Seq(rowOrdering.size, child.outputOrdering.size).min
-            if (minSize == 0 || rowOrdering.take(minSize) != child.outputOrdering.take(minSize)) {
-              sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
-            } else {
-              child
-            }
-          } else {
-            child
-          }
-        }
-
-        addSortIfNecessary(addShuffleIfNecessary(child))
-      }
+  /**
+   * Return true if all of the operator's children satisfy their output distribution requirements.
+   */
+  private def childPartitioningsSatisfyDistributionRequirements(operator: SparkPlan): Boolean = {
+    operator.children.zip(operator.requiredChildDistribution).forall {
+      case (child, distribution) => child.outputPartitioning.satisfies(distribution)
+    }
+  }
+
+  /**
+   * Given an operator, check whether the operator requires its children to have compatible
+   * output partitionings and add Exchanges to fix any detected incompatibilities.
+   */
+  private def ensureChildPartitioningsAreCompatible(operator: SparkPlan): SparkPlan = {
+    if (operator.requiresChildPartitioningsToBeCompatible) {
+      if (!Partitioning.allCompatible(operator.children.map(_.outputPartitioning))) {
+        val newChildren = operator.children.zip(operator.requiredChildDistribution).map {
+          case (child, requiredDistribution) =>
+            val targetPartitioning = canonicalPartitioning(requiredDistribution)
+            if (child.outputPartitioning.guarantees(targetPartitioning)) {
+              child
+            } else {
+              Exchange(targetPartitioning, child)
+            }
+        }
+        val newOperator = operator.withNewChildren(newChildren)
+        assert(childPartitioningsSatisfyDistributionRequirements(newOperator))
+        newOperator
+      } else {
+        operator
+      }
+    } else {
+      operator
+    }
+  }
```

> **Review discussion** (on the name `requiresChildPartitioningsToBeCompatible`; the individual comment bodies are truncated in the original): "Seems […]" / "Hmm. What about […]" / "The default return value of […]"
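To see what this pass buys, a toy sketch (assumed simplified semantics, not Spark's API): two children hash-partitioned on the same key but with different partition counts are incompatible, so every child that does not already guarantee the canonical partitioning is shuffled, rather than leaving one side alone and shuffling only the other:

```scala
// Toy model of the Exchange-insertion decision for a join-like operator.
case class HashPartitioning(expressions: Seq[String], numPartitions: Int)

// Simplified `guarantees`: identical hash partitionings guarantee each other.
def guarantees(a: HashPartitioning, b: HashPartitioning): Boolean = a == b

val canonical = HashPartitioning(Seq("key"), numPartitions = 200)
val children  = Seq(HashPartitioning(Seq("key"), 100), HashPartitioning(Seq("key"), 50))

// The children disagree on numPartitions, so they are not all compatible;
// each child that fails to guarantee the canonical partitioning is replaced
// by a shuffle to it (standing in for Exchange(canonical, child)).
val newChildren = children.map(c => if (guarantees(c, canonical)) c else canonical)
assert(newChildren.forall(_ == canonical))
```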
```diff
-      val requirements =
-        (operator.requiredChildDistribution, operator.requiredChildOrdering, operator.children)
-
-      val fixedChildren = requirements.zipped.map {
-        case (AllTuples, rowOrdering, child) =>
-          addOperatorsIfNecessary(SinglePartition, rowOrdering, child)
-        case (ClusteredDistribution(clustering), rowOrdering, child) =>
-          addOperatorsIfNecessary(HashPartitioning(clustering, numPartitions), rowOrdering, child)
-        case (OrderedDistribution(ordering), rowOrdering, child) =>
-          addOperatorsIfNecessary(RangePartitioning(ordering, numPartitions), rowOrdering, child)
+  private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
+
+    def addShuffleIfNecessary(child: SparkPlan, requiredDistribution: Distribution): SparkPlan = {
+      // A pre-condition of ensureDistributionAndOrdering is that joins' children have compatible
+      // partitionings. Thus, we only need to check whether the output partitionings satisfy
+      // the required distribution. In the case where the children are all compatible, they
+      // will either all satisfy the required distribution or all fail to satisfy it, since
+      // A.guarantees(B) implies that A and B satisfy the same set of distributions.
+      // Therefore, if all children are compatible then either all or none of them will be
+      // shuffled to ensure that the distribution requirements are met.
+      //
+      // Note that this reasoning implicitly assumes that operators which require compatible
+      // child partitionings have equivalent required distributions for those children.
+      if (child.outputPartitioning.satisfies(requiredDistribution)) {
+        child
+      } else {
+        Exchange(canonicalPartitioning(requiredDistribution), child)
+      }
+    }
```

> **Review comment** (on the pre-condition reasoning above; truncated in the original): This piece of reasoning is the trickiest part of this entire patch. Is this a valid argument given the current semantics of […]?
>
> **Reply:** This will be cleaned up tomorrow when I consolidate the shuffle steps and make the assertions / invariants clearer.
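A toy check of the invariant that comment leans on — mutual `guarantees` implies the two partitionings satisfy exactly the same distributions (assumed simplified semantics: equal hash partitionings guarantee each other, and a hash partitioning satisfies a clustered distribution over the same expressions):

```scala
case class ClusteredDistribution(clustering: Seq[String])
case class HashPartitioning(expressions: Seq[String], numPartitions: Int) {
  def satisfies(d: ClusteredDistribution): Boolean = expressions == d.clustering
  def guarantees(other: HashPartitioning): Boolean = this == other
}

val a = HashPartitioning(Seq("x"), 200)
val b = HashPartitioning(Seq("x"), 200)
val dists = Seq(ClusteredDistribution(Seq("x")), ClusteredDistribution(Seq("y")))

// Since a and b mutually guarantee each other, they agree on every distribution,
// so compatible children either all shuffle or none of them do.
assert(a.guarantees(b) && b.guarantees(a))
assert(dists.forall(d => a.satisfies(d) == b.satisfies(d)))
```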
```diff
-        case (UnspecifiedDistribution, Seq(), child) =>
-          child
-        case (UnspecifiedDistribution, rowOrdering, child) =>
-          sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
-
-        case (dist, ordering, _) =>
-          sys.error(s"Don't know how to ensure $dist with ordering $ordering")
-      }
+    def addSortIfNecessary(child: SparkPlan, requiredOrdering: Seq[SortOrder]): SparkPlan = {
+      if (requiredOrdering.nonEmpty) {
+        // If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
+        val minSize = Seq(requiredOrdering.size, child.outputOrdering.size).min
+        if (minSize == 0 || requiredOrdering.take(minSize) != child.outputOrdering.take(minSize)) {
+          sqlContext.planner.BasicOperators.getSortOperator(requiredOrdering, global = false, child)
+        } else {
+          child
+        }
+      } else {
+        child
+      }
+    }
 
-      operator.withNewChildren(fixedChildren)
+    val children = operator.children
+    val requiredChildDistribution = operator.requiredChildDistribution
+    val requiredChildOrdering = operator.requiredChildOrdering
+    assert(children.length == requiredChildDistribution.length)
+    assert(children.length == requiredChildOrdering.length)
+    val newChildren = (children, requiredChildDistribution, requiredChildOrdering).zipped.map {
+      case (child, requiredDistribution, requiredOrdering) =>
+        addSortIfNecessary(addShuffleIfNecessary(child, requiredDistribution), requiredOrdering)
+    }
+    operator.withNewChildren(newChildren)
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
+    case operator: SparkPlan =>
+      ensureDistributionAndOrdering(ensureChildPartitioningsAreCompatible(operator))
   }
 }
```
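The prefix comparison in `addSortIfNecessary` is easy to misread, so here is the same check over plain sequences (strings standing in for `SortOrder` expressions), a minimal sketch of when no extra sort is inserted:

```scala
// True when the child's existing output ordering already matches the required
// ordering on their common prefix, so no extra sort is needed.
def alreadySorted(requiredOrdering: Seq[String], outputOrdering: Seq[String]): Boolean = {
  val minSize = math.min(requiredOrdering.size, outputOrdering.size)
  minSize > 0 && requiredOrdering.take(minSize) == outputOrdering.take(minSize)
}

assert(alreadySorted(Seq("a"), Seq("a", "b")))   // child sorted by [a, b], required [a]: no sort
assert(!alreadySorted(Seq("a"), Seq("b", "a")))  // prefixes differ: sort
assert(!alreadySorted(Seq("a"), Seq.empty))      // child ordering unknown: sort
```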