diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index d92a2a98a3588..e0b8d58af490d 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -65,13 +65,21 @@ private[spark] class HashShuffleReader[K, C]( readMetrics.incRecordsRead(1) delegate.next() } - }.asInstanceOf[Iterator[Nothing]] + } val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { if (dep.mapSideCombine) { - new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context)) + // We are reading values that are already combined + val combinedKeyValuesIterator = iter.asInstanceOf[Iterator[(K,C)]] + new InterruptibleIterator(context, + dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)) } else { - new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context)) + // We don't know the value type, but also don't care -- the dependency *should* + // have made sure its compatible w/ this aggregator, which will convert the value + // type to the combined type C + val keyValuesIterator = iter.asInstanceOf[Iterator[(K,Nothing)]] + new InterruptibleIterator(context, + dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)) } } else { require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")