
Commit

sweep
yifeih committed Jan 18, 2019
1 parent 6e86ac0 commit 4a12c93
Showing 2 changed files with 8 additions and 11 deletions.
17 changes: 7 additions & 10 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -23,7 +23,7 @@ import org.roaringbitmap.RoaringBitmap
 import scala.collection.mutable
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.internal.{Logging, config}
+import org.apache.spark.internal.config
 import org.apache.spark.shuffle.api.CommittedPartition
 import org.apache.spark.storage.{BlockManagerId, ShuffleLocation}
 import org.apache.spark.util.Utils
@@ -48,7 +48,7 @@ private[spark] sealed trait MapStatus {
 }
 
 
-private[spark] object MapStatus extends Logging {
+private[spark] object MapStatus {
 
   /**
    * Min partition number to use [[HighlyCompressedMapStatus]]. A bit ugly here because in test
@@ -59,19 +59,16 @@ private[spark] object MapStatus extends Logging {
     .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)
 
   def apply(loc: BlockManagerId, committedPartitions: Array[CommittedPartition]): MapStatus = {
-    val shuffleLocationsArray = committedPartitions.map(partition => {
-      partition match {
+    val shuffleLocationsArray = committedPartitions.collect {
       case partition if partition != null && partition.shuffleLocation().isPresent
         => partition.shuffleLocation().get()
       case _ => null
-      }
-    })
-    val lengthsArray = committedPartitions.map(partition => {
-      partition match {
+    }
+    val lengthsArray = committedPartitions.collect {
       case partition if partition != null => partition.length()
       case _ => 0
-      }
-    })
+    }
+
     if (committedPartitions.length > minPartitionsToUseHighlyCompressMapStatus) {
       HighlyCompressedMapStatus(loc, lengthsArray, shuffleLocationsArray)
     } else {
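A side note on why this refactor is behavior-preserving (mine, not part of the commit): collect applies its partial function to every element on which it is defined, and a catch-all case _ makes the partial function total, so nothing is filtered out and collect transforms the array exactly as the original map + match did, with one less level of nesting. A minimal, self-contained sketch of the equivalence, using a hypothetical Partition stand-in for the real CommittedPartition interface:

import java.util.Optional

// Hypothetical stand-in for org.apache.spark.shuffle.api.CommittedPartition.
trait Partition {
  def shuffleLocation(): Optional[String]
  def length(): Long
}

object CollectVsMap {
  def main(args: Array[String]): Unit = {
    val partitions: Array[Partition] = Array(
      new Partition {
        override def shuffleLocation(): Optional[String] = Optional.of("host-1")
        override def length(): Long = 42L
      },
      null // an uncommitted partition, as guarded against in MapStatus.apply
    )

    // Before: map over every element, pattern-matching inside the closure.
    val viaMap = partitions.map(partition => {
      partition match {
        case p if p != null => p.length()
        case _ => 0L
      }
    })

    // After: collect with a catch-all case. The partial function is defined
    // for every element, so no element is dropped and the result matches
    // the map above exactly.
    val viaCollect = partitions.collect {
      case p if p != null => p.length()
      case _ => 0L
    }

    assert(viaMap.sameElements(viaCollect)) // Array(42L, 0L) both times
  }
}

One caveat: if the catch-all case were ever removed, collect would silently skip unmatched elements rather than throwing a MatchError the way map would.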
1 change: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala

@@ -33,7 +33,7 @@ import org.roaringbitmap.RoaringBitmap
 import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
 import org.apache.spark.scheduler.HighlyCompressedMapStatus
 import org.apache.spark.serializer.KryoTest._
-import org.apache.spark.storage.{BlockManagerId}
+import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
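A brief note on the test-suite cleanup (again mine, not from the commit): Scala only requires braces in an import clause for multiple selectors or renames, so {BlockManagerId} and BlockManagerId import the same symbol and the braces are redundant noise:

import org.apache.spark.util.{ThreadUtils, Utils} // braces required: two selectors
import org.apache.spark.storage.BlockManagerId    // single selector: braces redundant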
