Skip to content

Commit

Permalink
Explicitly cast iterator in branches for type clarity
Browse files Browse the repository at this point in the history
  • Loading branch information
massie committed Jun 9, 2015
1 parent 7c8f73e commit 01e8721
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,21 @@ private[spark] class HashShuffleReader[K, C](
readMetrics.incRecordsRead(1)
delegate.next()
}
}.asInstanceOf[Iterator[Nothing]]
}

val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
// We are reading values that are already combined
val combinedKeyValuesIterator = iter.asInstanceOf[Iterator[(K,C)]]
new InterruptibleIterator(context,
dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context))
} else {
new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
// We don't know the value type, but also don't care -- the dependency *should*
// have made sure its compatible w/ this aggregator, which will convert the value
// type to the combined type C
val keyValuesIterator = iter.asInstanceOf[Iterator[(K,Nothing)]]
new InterruptibleIterator(context,
dep.aggregator.get.combineValuesByKey(keyValuesIterator, context))
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
Expand Down

0 comments on commit 01e8721

Please sign in to comment.