From 2ff811455a20d7651a817dec98db06ff3778cab5 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Wed, 20 Aug 2014 18:06:23 -0700 Subject: [PATCH] Fix bug --- .../scala/org/apache/spark/sql/execution/Exchange.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 09c34b7059fc3..4802e40595807 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -46,12 +46,15 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una case HashPartitioning(expressions, numPartitions) => // TODO: Eliminate redundant expressions in grouping key and value. val rdd = child.execute().mapPartitions { iter => - @transient val hashExpressions = - newMutableProjection(expressions, child.output)() - if (sortBasedShuffleOn) { + @transient val hashExpressions = + newProjection(expressions, child.output) + iter.map(r => (hashExpressions(r), r.copy())) } else { + @transient val hashExpressions = + newMutableProjection(expressions, child.output)() + val mutablePair = new MutablePair[Row, Row]() iter.map(r => mutablePair.update(hashExpressions(r), r)) }