From f91a2aecf795b2a2b2b834bf69b21875ef6f0b6f Mon Sep 17 00:00:00 2001 From: Daoyuan Wang Date: Tue, 14 Apr 2015 23:09:00 -0700 Subject: [PATCH] yin's comment: use external sort if option is enabled, add comments --- .../apache/spark/sql/execution/Exchange.scala | 17 ++++++++++++++--- .../spark/sql/execution/basicOperators.scala | 2 -- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index dad8c3de4e1c2..2599765f24aa5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -29,7 +29,10 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.util.MutablePair object Exchange { - /** Returns true when the ordering expressions are a subset of the key. */ + /**
+ * Returns true when the ordering expressions are a subset of the key.
+ * If true, ShuffledRDD can use `setKeyOrdering(orderingKey)` to sort within [[Exchange]].
+ */ def canSortWithShuffle(partitioning: Partitioning, desiredOrdering: Seq[SortOrder]): Boolean = { desiredOrdering.map(_.child).toSet.subsetOf(partitioning.keyExpressions.toSet) } @@ -224,7 +227,11 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ } val withSort = if (needSort) { - Sort(rowOrdering, global = false, withShuffle) + if (sqlContext.conf.externalSortEnabled) { + ExternalSort(rowOrdering, global = false, withShuffle) + } else { + Sort(rowOrdering, global = false, withShuffle) + } } else { withShuffle } @@ -253,7 +260,11 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[ case (UnspecifiedDistribution, Seq(), child) => child case (UnspecifiedDistribution, rowOrdering, child) => - Sort(rowOrdering, global = false, child) + if (sqlContext.conf.externalSortEnabled) { + ExternalSort(rowOrdering, global = false, child) + } else { + Sort(rowOrdering, global = false, child) + } case (dist, ordering, _) => sys.error(s"Don't know how to ensure $dist with ordering $ordering") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index a13c7f7c8611b..8cc1b527bd682 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -106,8 +106,6 @@ case class Limit(limit: Int, child: SparkPlan) override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = SinglePartition - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def executeCollect(): Array[Row] = child.executeTake(limit) override def execute(): RDD[Row] = {