From d2f8b977f2d78689512b67c82627f0b22e64daa7 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 25 Mar 2014 14:36:07 -0700 Subject: [PATCH] Removed duplicate unpersistRDD. --- .../src/main/scala/org/apache/spark/ContextCleaner.scala | 9 +-------- core/src/main/scala/org/apache/spark/SparkContext.scala | 3 +-- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 3 files changed, 3 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index d499af20502d0..deabf6f5c8c5f 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -98,13 +98,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { listeners += listener } - /** Unpersists RDD and remove all blocks for it from memory and disk. */ - def unpersistRDD(rddId: Int, blocking: Boolean) { - logDebug("Unpersisted RDD " + rddId) - sc.env.blockManager.master.removeRdd(rddId, blocking) - sc.persistentRdds.remove(rddId) - } - /** Register an object for cleanup. */ private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) { referenceBuffer += new WeakReferenceWithCleanupTask(objectForCleanup, task) @@ -136,7 +129,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { private def doCleanupRDD(rddId: Int) { try { logDebug("Cleaning RDD " + rddId) - unpersistRDD(rddId, false) + sc.unpersistRDD(rddId, false) listeners.foreach(_.rddCleaned(rddId)) logInfo("Cleaned RDD " + rddId) } catch { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 500fb098e6649..5cd2caed10297 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -756,8 +756,7 @@ class SparkContext( /** * Unpersist an RDD from memory and/or disk storage */ - private[spark] def unpersistRDD(rdd: RDD[_], blocking: Boolean = true) { - val rddId = rdd.id + private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) { env.blockManager.master.removeRdd(rddId, blocking) persistentRdds.remove(rddId) listenerBus.post(SparkListenerUnpersistRDD(rddId)) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a5dc7a959fb22..2b7e3d99e68cb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -158,7 +158,7 @@ abstract class RDD[T: ClassTag]( */ def unpersist(blocking: Boolean = true): RDD[T] = { logInfo("Removing RDD " + id + " from persistence list") - sc.unpersistRDD(this, blocking) + sc.unpersistRDD(this.id, blocking) storageLevel = StorageLevel.NONE this }