diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 1cc4271f8cf33..3df44ae1fad64 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -30,7 +30,7 @@ private[spark] trait CleanerListener { } /** - * Cleans RDDs and shuffle data. This should be instantiated only on the driver. + * Cleans RDDs and shuffle data. */ private[spark] class ContextCleaner(env: SparkEnv) extends Logging { @@ -62,12 +62,13 @@ private[spark] class ContextCleaner(env: SparkEnv) extends Logging { cleaningThread.interrupt() } - /** Clean all data and metadata related to a RDD, including shuffle files and metadata */ + /** Clean (unpersist) RDD data. */ def cleanRDD(rdd: RDD[_]) { enqueue(CleanRDD(rdd.sparkContext, rdd.id)) logDebug("Enqueued RDD " + rdd + " for cleaning up") } + /** Clean shuffle data. */ def cleanShuffle(shuffleId: Int) { enqueue(CleanShuffle(shuffleId)) logDebug("Enqueued shuffle " + shuffleId + " for cleaning up") @@ -102,16 +103,16 @@ private[spark] class ContextCleaner(env: SparkEnv) extends Logging { /** Perform RDD cleaning */ private def doCleanRDD(sc: SparkContext, rddId: Int) { - logDebug("Cleaning rdd "+ rddId) + logDebug("Cleaning rdd " + rddId) sc.env.blockManager.master.removeRdd(rddId, false) sc.persistentRdds.remove(rddId) listeners.foreach(_.rddCleaned(rddId)) - logInfo("Cleaned rdd "+ rddId) + logInfo("Cleaned rdd " + rddId) } /** Perform shuffle cleaning */ private def doCleanShuffle(shuffleId: Int) { - logDebug("Cleaning shuffle "+ shuffleId) + logDebug("Cleaning shuffle " + shuffleId) mapOutputTrackerMaster.unregisterShuffle(shuffleId) blockManager.master.removeShuffle(shuffleId) listeners.foreach(_.shuffleCleaned(shuffleId)) @@ -123,4 +124,4 @@ private[spark] class ContextCleaner(env: SparkEnv) extends Logging { private def blockManager = env.blockManager private def isStopped = synchronized { stopped } -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index ed498696f3e2a..4d0f3dd6cdb71 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -54,6 +54,11 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster } } +/** + * Class that keeps track of the location of the location of the mapt output of + * a stage. This is abstract because different versions of MapOutputTracker + * (driver and worker) use different HashMap to store its metadata. + */ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging { private val timeout = AkkaUtils.askTimeout(conf) @@ -181,6 +186,10 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging } } +/** + * MapOutputTracker for the workers. This uses BoundedHashMap to keep track of + * a limited number of most recently used map output information. + */ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) { /** @@ -192,7 +201,10 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr protected val mapStatuses = new BoundedHashMap[Int, Array[MapStatus]](MAX_MAP_STATUSES, true) } - +/** + * MapOutputTracker for the driver. This uses TimeStampedHashMap to keep track of map + * output information, which allows old output information based on a TTL. + */ private[spark] class MapOutputTrackerMaster(conf: SparkConf) extends MapOutputTracker(conf) { diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala index 4a4b7d6837bca..60901c5e36130 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala @@ -50,7 +50,8 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa override def get(key: A): Option[B] = { val timeStampedValue = internalMap.get(key) if (updateTimeStampOnGet && timeStampedValue != null) { - internalJavaMap.replace(key, timeStampedValue, TimeStampedValue(currentTime, timeStampedValue.value)) + internalJavaMap.replace(key, timeStampedValue, + TimeStampedValue(currentTime, timeStampedValue.value)) } Option(timeStampedValue).map(_.value) } diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala index f2ef96f2fbfa9..ea0fde87c56d0 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala @@ -33,7 +33,7 @@ private[util] case class TimeStampedWeakValue[T](timestamp: Long, weakValue: Wea * A map that stores the timestamp of when a key was inserted along with the value, * while ensuring that the values are weakly referenced. If the value is garbage collected and * the weak reference is null, get() operation returns the key be non-existent. However, - * the key is actually not remmoved in the current implementation. Key-value pairs whose + * the key is actually not removed in the current implementation. Key-value pairs whose * timestamps are older than a particular threshold time can then be removed using the * clearOldValues method. It exposes a scala.collection.mutable.Map interface to allow it to be a * drop-in replacement for Scala HashMaps.