From 56d6f5c33bd3ce68e43846aa10ce0f6002f316a9 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Wed, 6 Feb 2013 21:34:09 -0800
Subject: [PATCH 01/12] sketch of how to do mapPartitionsWithSetupAndCleanup

---
 core/src/main/scala/spark/RDD.scala          | 23 ++++++++++++++---
 .../scala/spark/util/CleanupIterator.scala   | 25 +++++++++++++++++++
 2 files changed, 44 insertions(+), 4 deletions(-)
 create mode 100644 core/src/main/scala/spark/util/CleanupIterator.scala

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 9d6ea782bd..7170790e2a 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -1,13 +1,10 @@
 package spark
 
-import java.net.URL
-import java.util.{Date, Random}
-import java.util.{HashMap => JHashMap}
+import java.util.Random
 
 import scala.collection.Map
 import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
 
 import org.apache.hadoop.io.BytesWritable
 import org.apache.hadoop.io.NullWritable
@@ -34,6 +31,8 @@ import spark.rdd.ZippedRDD
 import spark.storage.StorageLevel
 
 import SparkContext._
+import spark.RDD.PartitionMapper
+import util.CleanupIterator
 
 /**
  * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
@@ -339,6 +338,14 @@ abstract class RDD[T: ClassManifest](
     preservesPartitioning: Boolean = false): RDD[U] =
     new MapPartitionsWithSplitRDD(this, sc.clean(f), preservesPartitioning)
 
+  def mapWithSetupAndCleanup[U: ClassManifest](m: PartitionMapper[T,U]) : RDD[U] =
+    mapPartitions{
+      itr =>
+        m.setup
+        val subItr = itr.map{m.map}
+        CleanupIterator[U,Iterator[U]](subItr, m.cleanup _)
+    }
+
   /**
   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
   * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
@@ -680,3 +687,11 @@ abstract class RDD[T: ClassManifest](
       origin)
 }
+
+object RDD {
+  trait PartitionMapper[T,U] {
+    def setup
+    def map(t: T) : U
+    def cleanup
+  }
+}
diff --git a/core/src/main/scala/spark/util/CleanupIterator.scala b/core/src/main/scala/spark/util/CleanupIterator.scala
new file mode 100644
index 0000000000..7e804508b8
--- /dev/null
+++ b/core/src/main/scala/spark/util/CleanupIterator.scala
@@ -0,0 +1,25 @@
+package spark.util
+
+/**
+ * Wrapper around an iterator which calls a cleanup method when it's finished iterating through its elements
+ */
+abstract class CleanupIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
+  def next = sub.next
+  def hasNext = {
+    val r = sub.hasNext
+    if (!r) {
+      cleanup
+    }
+    r
+  }
+
+  def cleanup
+}
+
+object CleanupIterator {
+  def apply[A, I <: Iterator[A]](sub: I, cleanupFunction: () => Unit) : CleanupIterator[A,I] = {
+    new CleanupIterator[A,I](sub) {
+      def cleanup = cleanupFunction()
+    }
+  }
+}
\ No newline at end of file

From b6485586c47b61132895485088be6d5266de9165 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Fri, 8 Feb 2013 13:50:32 -0800
Subject: [PATCH 02/12] add tests & docs

---
 core/src/main/scala/spark/RDD.scala      | 33 ++++++++++++++---
 core/src/test/scala/spark/RDDSuite.scala | 46 ++++++++++++++++++++++++
 2 files changed, 74 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 7170790e2a..a837420988 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -338,10 +338,17 @@ abstract class RDD[T: ClassManifest](
     preservesPartitioning: Boolean = false): RDD[U] =
     new MapPartitionsWithSplitRDD(this, sc.clean(f), preservesPartitioning)
 
+  /**
+   * Return a new RDD by applying a function to every element in this RDD, with extra setup & cleanup
+   * at the beginning & end of processing every partition.
+   *
+   * This might be useful if you need to set up some resources per task & clean them up at the
+   * end, e.g. a db connection.
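+   *
+   * A rough sketch of what such a mapper might look like (the connection handling below is
+   * purely illustrative -- the JDBC url and the per-element lookup are not part of this api):
+   * {{{
+   * rdd.mapWithSetupAndCleanup(new PartitionMapper[String, Int] {
+   *   var conn: java.sql.Connection = _
+   *   def setup(partition: Int) { conn = java.sql.DriverManager.getConnection("jdbc:...") }
+   *   def map(line: String): Int = { line.length }  // would typically use conn to look up each element
+   *   def cleanup { conn.close() }
+   * })
+   * }}}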
+   */
   def mapWithSetupAndCleanup[U: ClassManifest](m: PartitionMapper[T,U]) : RDD[U] =
-    mapPartitions{
-      itr =>
-        m.setup
+    mapPartitionsWithSplit{
+      case(partition, itr) =>
+        m.setup(partition)
         val subItr = itr.map{m.map}
         CleanupIterator[U,Iterator[U]](subItr, m.cleanup _)
     }
@@ -689,9 +696,25 @@ abstract class RDD[T: ClassManifest](
 }
 
 object RDD {
-  trait PartitionMapper[T,U] {
-    def setup
+
+  /**
+   * Defines a map function over elements of an RDD, but with extra setup and cleanup
+   * that happens at the start and end of processing each partition
+   */
+  trait PartitionMapper[T,U] extends Serializable {
+    /**
+     * called at the start of processing of each partition
+     */
+    def setup(partition:Int)
+
+    /**
+     * transform one element of the partition
+     */
     def map(t: T) : U
+
+    /**
+     * called at the end of each partition
+     */
     def cleanup
   }
 }
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index fe7deb10d6..405f209002 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -4,6 +4,8 @@ import scala.collection.mutable.HashMap
 import org.scalatest.FunSuite
 import spark.SparkContext._
 import spark.rdd.{CoalescedRDD, PartitionPruningRDD}
+import spark.RDD.PartitionMapper
+import collection._
 
 class RDDSuite extends FunSuite with LocalSparkContext {
 
@@ -173,4 +175,48 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(prunedData.size === 1)
     assert(prunedData(0) === 10)
   }
+
+  test("mapPartitionWithSetupAndCleanup") {
+    sc = new SparkContext("local[4]", "test")
+    val data = sc.parallelize(1 to 100, 4)
+    val acc = sc.accumulableCollection(new HashMap[Int,Set[Int]]())
+    val mapped = data.mapWithSetupAndCleanup(new PartitionMapper[Int,Int](){
+      var partition = -1
+      val values = mutable.Set[Int]()
+      def setup(partition:Int) {this.partition = partition}
+      def map(i:Int) = {values += i; i * 2}
+      def cleanup = {
+        //the purpose of this strange code is just to make sure this method is called
+        // after the data has been iterated through completely.
+        acc.localValue += (partition -> Set[Int](values.toSeq: _*))
+      }
+    }).collect
+
+    assert(mapped.toSet === (1 to 100).map{_ * 2}.toSet)
+    assert(acc.value.keySet == (0 to 3).toSet)
+    acc.value.foreach { case(partition, values) =>
+      assert(values.size === 25)
+    }
+
+
+    //the naive alternative doesn't work
+    val acc2 = sc.accumulableCollection(new HashMap[Int,Set[Int]]())
+    val m2 = data.mapPartitionsWithSplit{
+      case (partition, itr) =>
+        val values = mutable.Set[Int]()
+        val mItr = itr.map{i => values += i; i * 2}
+        //you haven't actually put anything into values yet, b/c itr.map defines another
+        // iterator, which is lazily computed.
so the Set is empty + acc2.localValue += (partition -> Set[Int](values.toSeq: _*)) + mItr + }.collect + + assert(m2.toSet === (1 to 100).map{_ * 2}.toSet) + assert(acc2.value.keySet == (0 to 3).toSet) + //this condition will fail +// acc2.value.foreach { case(partition, values) => +// assert(values.size === 25) +// } + + } } From aebda1d23fd5394bb0bcc3972aba2202f4bf2ad8 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Fri, 8 Feb 2013 14:49:02 -0800 Subject: [PATCH 03/12] use immutable map in test for clarity; fixup style issues --- core/src/main/scala/spark/RDD.scala | 4 ++-- core/src/test/scala/spark/RDDSuite.scala | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index a837420988..3da0e8ac2a 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -30,9 +30,9 @@ import spark.rdd.UnionRDD import spark.rdd.ZippedRDD import spark.storage.StorageLevel -import SparkContext._ +import spark.SparkContext._ import spark.RDD.PartitionMapper -import util.CleanupIterator +import spark.util.CleanupIterator /** * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 405f209002..222cfae7c7 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -1,11 +1,11 @@ package spark import scala.collection.mutable.HashMap +import scala.collection.Set import org.scalatest.FunSuite import spark.SparkContext._ import spark.rdd.{CoalescedRDD, PartitionPruningRDD} import spark.RDD.PartitionMapper -import collection._ class RDDSuite extends FunSuite with LocalSparkContext { @@ -182,13 +182,13 @@ class RDDSuite extends FunSuite with LocalSparkContext { val acc = sc.accumulableCollection(new HashMap[Int,Set[Int]]()) val mapped = data.mapWithSetupAndCleanup(new PartitionMapper[Int,Int](){ var partition = -1 - val values = mutable.Set[Int]() + var values = Set[Int]() def setup(partition:Int) {this.partition = partition} def map(i:Int) = {values += i; i * 2} def cleanup = { //the purpose of this strange code is just to make sure this method is called // after the data has been iterated through completely. - acc.localValue += (partition -> Set[Int](values.toSeq: _*)) + acc.localValue += (partition -> values) } }).collect @@ -203,16 +203,16 @@ class RDDSuite extends FunSuite with LocalSparkContext { val acc2 = sc.accumulableCollection(new HashMap[Int,Set[Int]]()) val m2 = data.mapPartitionsWithSplit{ case (partition, itr) => - val values = mutable.Set[Int]() + var values = Set[Int]() val mItr = itr.map{i => values += i; i * 2} //you haven't actually put anything into values yet, b/c itr.map defines another // iterator, which is lazily computed. 
so the Set is empty - acc2.localValue += (partition -> Set[Int](values.toSeq: _*)) + acc2.localValue += (partition -> values) mItr }.collect assert(m2.toSet === (1 to 100).map{_ * 2}.toSet) - assert(acc2.value.keySet == (0 to 3).toSet) + assert(acc2.value.keySet === (0 to 3).toSet) //this condition will fail // acc2.value.foreach { case(partition, values) => // assert(values.size === 25) From e347e49b1e91a29324423a2f4c7c5ca88afdc107 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Sun, 10 Feb 2013 14:39:28 -0800 Subject: [PATCH 04/12] switch to using onCompleteCallback, to make sure cleanup is called even if exception is thrown --- core/src/main/scala/spark/RDD.scala | 13 ++++------ .../MapPartitionsWithSetupAndCleanup.scala | 26 +++++++++++++++++++ 2 files changed, 31 insertions(+), 8 deletions(-) create mode 100644 core/src/main/scala/spark/rdd/MapPartitionsWithSetupAndCleanup.scala diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 3da0e8ac2a..31c7ec81ad 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -345,13 +345,10 @@ abstract class RDD[T: ClassManifest]( * This might be useful if you need to setup some resources per task & cleanup them up at the end, eg. * a db connection */ - def mapWithSetupAndCleanup[U: ClassManifest](m: PartitionMapper[T,U]) : RDD[U] = - mapPartitionsWithSplit{ - case(partition, itr) => - m.setup(partition) - val subItr = itr.map{m.map} - CleanupIterator[U,Iterator[U]](subItr, m.cleanup _) - } + def mapWithSetupAndCleanup[U: ClassManifest]( + m: PartitionMapper[T,U], + preservesPartitioning: Boolean = false): RDD[U] = + new MapPartitionsWithSetupAndCleanup(this, m, preservesPartitioning) /** * Zips this RDD with another one, returning key-value pairs with the first element in each RDD, @@ -713,7 +710,7 @@ object RDD { def map(t: T) : U /** - * called at the end of each partition + * called at the end of each partition. This will get called even if the map failed (eg., an exception was thrown) */ def cleanup } diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSetupAndCleanup.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSetupAndCleanup.scala new file mode 100644 index 0000000000..42a1cbc923 --- /dev/null +++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSetupAndCleanup.scala @@ -0,0 +1,26 @@ +package spark.rdd + +import spark.{TaskContext, Split, RDD} +import spark.RDD.PartitionMapper + +/** + * + */ + +class MapPartitionsWithSetupAndCleanup[U: ClassManifest, T: ClassManifest]( + prev: RDD[T], + m: PartitionMapper[T,U], + preservesPartitioning: Boolean +) extends RDD[U](prev){ + + override def getSplits = firstParent[T].splits + + override val partitioner = if (preservesPartitioning) prev.partitioner else None + + override def compute(split: Split, context: TaskContext) = { + context.addOnCompleteCallback(m.cleanup _) + m.setup(split.index) + firstParent[T].iterator(split, context).map(m.map _) + } + +} From a93d195be4743f7f5559c82db80835c8634ae579 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Sun, 10 Feb 2013 14:40:58 -0800 Subject: [PATCH 05/12] fixup! 
switch --- core/src/main/scala/spark/RDD.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 31c7ec81ad..2757c10b40 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -23,6 +23,7 @@ import spark.rdd.FlatMappedRDD import spark.rdd.GlommedRDD import spark.rdd.MappedRDD import spark.rdd.MapPartitionsRDD +import spark.rdd.MapPartitionsWithSetupAndCleanup import spark.rdd.MapPartitionsWithSplitRDD import spark.rdd.PipedRDD import spark.rdd.SampledRDD From 0807ee30ddb0343e3a1deca60cb8e10d41bb89a8 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Sun, 10 Feb 2013 14:41:36 -0800 Subject: [PATCH 06/12] remove CleanupIterator as its no longer used --- core/src/main/scala/spark/RDD.scala | 1 - .../scala/spark/util/CleanupIterator.scala | 25 ------------------- 2 files changed, 26 deletions(-) delete mode 100644 core/src/main/scala/spark/util/CleanupIterator.scala diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 2757c10b40..51acbd44c3 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -33,7 +33,6 @@ import spark.storage.StorageLevel import spark.SparkContext._ import spark.RDD.PartitionMapper -import spark.util.CleanupIterator /** * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, diff --git a/core/src/main/scala/spark/util/CleanupIterator.scala b/core/src/main/scala/spark/util/CleanupIterator.scala deleted file mode 100644 index 7e804508b8..0000000000 --- a/core/src/main/scala/spark/util/CleanupIterator.scala +++ /dev/null @@ -1,25 +0,0 @@ -package spark.util - -/** - * Wrapper around an iterator which calls a cleanup method when its finished iterating through its elements - */ -abstract class CleanupIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{ - def next = sub.next - def hasNext = { - val r = sub.hasNext - if (!r) { - cleanup - } - r - } - - def cleanup -} - -object CleanupIterator { - def apply[A, I <: Iterator[A]](sub: I, cleanupFunction: () => Unit) : CleanupIterator[A,I] = { - new CleanupIterator[A,I](sub) { - def cleanup = cleanupFunction() - } - } -} \ No newline at end of file From 74c606a2202c57026697f7bc94c1984c375c7406 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Sun, 10 Feb 2013 18:16:05 -0800 Subject: [PATCH 07/12] add java api for partition mapper --- .../spark/api/java/JavaPartitionMapper.java | 19 +++++++++++++++++++ .../scala/spark/api/java/JavaRDDLike.scala | 10 ++++++++++ 2 files changed, 29 insertions(+) create mode 100644 core/src/main/scala/spark/api/java/JavaPartitionMapper.java diff --git a/core/src/main/scala/spark/api/java/JavaPartitionMapper.java b/core/src/main/scala/spark/api/java/JavaPartitionMapper.java new file mode 100644 index 0000000000..d88c964245 --- /dev/null +++ b/core/src/main/scala/spark/api/java/JavaPartitionMapper.java @@ -0,0 +1,19 @@ +package spark.api.java; + +import scala.reflect.ClassManifest; +import scala.reflect.ClassManifest$; + +import java.io.Serializable; + +public abstract class JavaPartitionMapper implements Serializable { + + public abstract void setup(int partition); + + public abstract R map(T t) throws Exception; + + public abstract void cleanup(); + + public ClassManifest returnType() { + return (ClassManifest) ClassManifest$.MODULE$.fromClass(Object.class); + } +} diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala 
b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 60025b459c..7629507fc4 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -10,6 +10,7 @@ import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, import spark.partial.{PartialResult, BoundedDouble} import spark.storage.StorageLevel import com.google.common.base.Optional +import spark.RDD.PartitionMapper trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround[T] { @@ -116,6 +117,15 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType()) } + def mapPartitionsWithSetupAndCleanup[U](m: JavaPartitionMapper[T,U]): JavaRDD[U] = { + val scalaMapper = new PartitionMapper[T,U] { + def setup(partition:Int) = m.setup(partition) + def map(t:T) = m.map(t) + def cleanup() = m.cleanup() + } + JavaRDD.fromRDD(rdd.mapWithSetupAndCleanup(scalaMapper)(m.returnType()))(m.returnType()) + } + /** * Return an RDD created by coalescing all elements within each partition into an array. */ From b69c6190f3e558ddb2f283913b15180ce207199a Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Sun, 10 Feb 2013 21:28:37 -0800 Subject: [PATCH 08/12] more java api for partition mapper --- .../api/java/JavaDoublePartitionMapper.java | 12 +++++++ .../api/java/JavaPairPartitionMapper.java | 14 ++++++++ .../spark/api/java/JavaPartitionMapper.java | 19 ---------- .../scala/spark/api/java/JavaRDDLike.scala | 35 ++++++++++++++++--- .../scala/spark/api/java/ManifestHelper.java | 11 ++++++ 5 files changed, 68 insertions(+), 23 deletions(-) create mode 100644 core/src/main/scala/spark/api/java/JavaDoublePartitionMapper.java create mode 100644 core/src/main/scala/spark/api/java/JavaPairPartitionMapper.java delete mode 100644 core/src/main/scala/spark/api/java/JavaPartitionMapper.java create mode 100644 core/src/main/scala/spark/api/java/ManifestHelper.java diff --git a/core/src/main/scala/spark/api/java/JavaDoublePartitionMapper.java b/core/src/main/scala/spark/api/java/JavaDoublePartitionMapper.java new file mode 100644 index 0000000000..aadf403087 --- /dev/null +++ b/core/src/main/scala/spark/api/java/JavaDoublePartitionMapper.java @@ -0,0 +1,12 @@ +package spark.api.java; + +import java.io.Serializable; + +public abstract class JavaDoublePartitionMapper implements Serializable { + + public abstract void setup(int partition); + + public abstract Double map(T t) throws Exception; + + public abstract void cleanup(); +} diff --git a/core/src/main/scala/spark/api/java/JavaPairPartitionMapper.java b/core/src/main/scala/spark/api/java/JavaPairPartitionMapper.java new file mode 100644 index 0000000000..0ec4bd34cb --- /dev/null +++ b/core/src/main/scala/spark/api/java/JavaPairPartitionMapper.java @@ -0,0 +1,14 @@ +package spark.api.java; + +import scala.Tuple2; + +import java.io.Serializable; + +public abstract class JavaPairPartitionMapper implements Serializable { + + public abstract void setup(int partition); + + public abstract Tuple2 map(T t) throws Exception; + + public abstract void cleanup(); +} diff --git a/core/src/main/scala/spark/api/java/JavaPartitionMapper.java b/core/src/main/scala/spark/api/java/JavaPartitionMapper.java deleted file mode 100644 index d88c964245..0000000000 --- a/core/src/main/scala/spark/api/java/JavaPartitionMapper.java +++ /dev/null @@ -1,19 +0,0 @@ -package spark.api.java; - -import scala.reflect.ClassManifest; -import 
scala.reflect.ClassManifest$; - -import java.io.Serializable; - -public abstract class JavaPartitionMapper implements Serializable { - - public abstract void setup(int partition); - - public abstract R map(T t) throws Exception; - - public abstract void cleanup(); - - public ClassManifest returnType() { - return (ClassManifest) ClassManifest$.MODULE$.fromClass(Object.class); - } -} diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 7629507fc4..32a60b7550 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -11,6 +11,7 @@ import spark.partial.{PartialResult, BoundedDouble} import spark.storage.StorageLevel import com.google.common.base.Optional import spark.RDD.PartitionMapper +import spark.api.java.ManifestHelper.fakeManifest trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround[T] { @@ -117,13 +118,39 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType()) } - def mapPartitionsWithSetupAndCleanup[U](m: JavaPartitionMapper[T,U]): JavaRDD[U] = { - val scalaMapper = new PartitionMapper[T,U] { + /** + * Return a new RDD by applying a function to each element of the RDD, with an additional + * setup & cleanup that happens before & after computing each partition + */ + def mapWithSetupAndCleanup[U](m: PartitionMapper[T,U]): JavaRDD[U] = { + JavaRDD.fromRDD(rdd.mapWithSetupAndCleanup(m)(fakeManifest[U]))(fakeManifest[U]) + } + + /** + * Return a new RDD by applying a function to each element of the RDD, with an additional + * setup & cleanup that happens before & after computing each partition + */ + def mapWithSetupAndCleanup[K,V](m: JavaPairPartitionMapper[T,K,V]): JavaPairRDD[K,V] = { + val scalaMapper = new PartitionMapper[T,(K,V)] { + def setup(partition:Int) = m.setup(partition) + def map(t:T) = m.map(t) + def cleanup = m.cleanup() + } + JavaPairRDD.fromRDD(rdd.mapWithSetupAndCleanup(scalaMapper)(fakeManifest[(K,V)]))( + fakeManifest[K], fakeManifest[V]) + } + + /** + * Return a new RDD by applying a function to each element of the RDD, with an additional + * setup & cleanup that happens before & after computing each partition + */ + def mapWithSetupAndCleanup(m: JavaDoublePartitionMapper[T]): JavaDoubleRDD = { + val scalaMapper = new PartitionMapper[T,Double] { def setup(partition:Int) = m.setup(partition) def map(t:T) = m.map(t) - def cleanup() = m.cleanup() + def cleanup = m.cleanup() } - JavaRDD.fromRDD(rdd.mapWithSetupAndCleanup(scalaMapper)(m.returnType()))(m.returnType()) + JavaDoubleRDD.fromRDD(rdd.mapWithSetupAndCleanup(scalaMapper)(manifest[Double])) } /** diff --git a/core/src/main/scala/spark/api/java/ManifestHelper.java b/core/src/main/scala/spark/api/java/ManifestHelper.java new file mode 100644 index 0000000000..f2ca6da022 --- /dev/null +++ b/core/src/main/scala/spark/api/java/ManifestHelper.java @@ -0,0 +1,11 @@ +package spark.api.java; + +import scala.reflect.ClassManifest; +import scala.reflect.ClassManifest$; + +class ManifestHelper { + + public static ClassManifest fakeManifest() { + return (ClassManifest) ClassManifest$.MODULE$.fromClass(Object.class); + } +} From 2a337659b217851bd018d8ff868955611092c4fc Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 12 Feb 2013 07:57:24 -0800 Subject: [PATCH 09/12] JavaPairPartitionMapper directly implements PartitionMapper, so we can remove a helper 
object --- .../main/scala/spark/api/java/JavaPairPartitionMapper.java | 5 +++-- core/src/main/scala/spark/api/java/JavaRDDLike.scala | 7 +------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/spark/api/java/JavaPairPartitionMapper.java b/core/src/main/scala/spark/api/java/JavaPairPartitionMapper.java index 0ec4bd34cb..ec4ed36d97 100644 --- a/core/src/main/scala/spark/api/java/JavaPairPartitionMapper.java +++ b/core/src/main/scala/spark/api/java/JavaPairPartitionMapper.java @@ -1,14 +1,15 @@ package spark.api.java; import scala.Tuple2; +import spark.RDD; import java.io.Serializable; -public abstract class JavaPairPartitionMapper implements Serializable { +public abstract class JavaPairPartitionMapper implements spark.RDD.PartitionMapper>, Serializable { public abstract void setup(int partition); - public abstract Tuple2 map(T t) throws Exception; + public abstract Tuple2 map(T t); public abstract void cleanup(); } diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 32a60b7550..a163ff574d 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -131,12 +131,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround * setup & cleanup that happens before & after computing each partition */ def mapWithSetupAndCleanup[K,V](m: JavaPairPartitionMapper[T,K,V]): JavaPairRDD[K,V] = { - val scalaMapper = new PartitionMapper[T,(K,V)] { - def setup(partition:Int) = m.setup(partition) - def map(t:T) = m.map(t) - def cleanup = m.cleanup() - } - JavaPairRDD.fromRDD(rdd.mapWithSetupAndCleanup(scalaMapper)(fakeManifest[(K,V)]))( + JavaPairRDD.fromRDD(rdd.mapWithSetupAndCleanup(m)(fakeManifest[(K,V)]))( fakeManifest[K], fakeManifest[V]) } From dc3dbbcca8249239637af193d321abf87bccd5c2 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 12 Feb 2013 22:06:33 -0800 Subject: [PATCH 10/12] cleanup java api; add some tests --- core/src/main/scala/spark/RDD.scala | 1 + .../api/java/JavaDoublePartitionMapper.java | 10 +--- .../api/java/JavaPairPartitionMapper.java | 11 +--- .../scala/spark/api/java/JavaRDDLike.scala | 7 +-- core/src/test/scala/spark/JavaAPISuite.java | 53 +++++++++++++++++-- 5 files changed, 53 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 51acbd44c3..a7010d7633 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -707,6 +707,7 @@ object RDD { /** * transform one element of the partition */ + @throws(classOf[Exception]) //for the java api def map(t: T) : U /** diff --git a/core/src/main/scala/spark/api/java/JavaDoublePartitionMapper.java b/core/src/main/scala/spark/api/java/JavaDoublePartitionMapper.java index aadf403087..5d07c10572 100644 --- a/core/src/main/scala/spark/api/java/JavaDoublePartitionMapper.java +++ b/core/src/main/scala/spark/api/java/JavaDoublePartitionMapper.java @@ -1,12 +1,4 @@ package spark.api.java; -import java.io.Serializable; - -public abstract class JavaDoublePartitionMapper implements Serializable { - - public abstract void setup(int partition); - - public abstract Double map(T t) throws Exception; - - public abstract void cleanup(); +public abstract class JavaDoublePartitionMapper implements spark.RDD.PartitionMapper { } diff --git a/core/src/main/scala/spark/api/java/JavaPairPartitionMapper.java 
b/core/src/main/scala/spark/api/java/JavaPairPartitionMapper.java index ec4ed36d97..a09bf66cf1 100644 --- a/core/src/main/scala/spark/api/java/JavaPairPartitionMapper.java +++ b/core/src/main/scala/spark/api/java/JavaPairPartitionMapper.java @@ -1,15 +1,6 @@ package spark.api.java; import scala.Tuple2; -import spark.RDD; -import java.io.Serializable; - -public abstract class JavaPairPartitionMapper implements spark.RDD.PartitionMapper>, Serializable { - - public abstract void setup(int partition); - - public abstract Tuple2 map(T t); - - public abstract void cleanup(); +public abstract class JavaPairPartitionMapper implements spark.RDD.PartitionMapper> { } diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index a163ff574d..f3bd918a5f 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -140,12 +140,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround * setup & cleanup that happens before & after computing each partition */ def mapWithSetupAndCleanup(m: JavaDoublePartitionMapper[T]): JavaDoubleRDD = { - val scalaMapper = new PartitionMapper[T,Double] { - def setup(partition:Int) = m.setup(partition) - def map(t:T) = m.map(t) - def cleanup = m.cleanup() - } - JavaDoubleRDD.fromRDD(rdd.mapWithSetupAndCleanup(scalaMapper)(manifest[Double])) + JavaDoubleRDD.fromRDD(rdd.mapWithSetupAndCleanup(m)(manifest[java.lang.Double]).asInstanceOf[RDD[Double]]) } /** diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 934e4c2f67..1c6039c0f8 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -19,10 +19,7 @@ import org.junit.Before; import org.junit.Test; -import spark.api.java.JavaDoubleRDD; -import spark.api.java.JavaPairRDD; -import spark.api.java.JavaRDD; -import spark.api.java.JavaSparkContext; +import spark.api.java.*; import spark.api.java.function.*; import spark.partial.BoundedDouble; import spark.partial.PartialResult; @@ -400,6 +397,54 @@ public Iterable call(Iterator iter) { Assert.assertEquals("[3, 7]", partitionSums.collect().toString()); } + @Test + public void mapPartitionsWithSetupAndCleanup() { + //the real test of the behavior is in the scala test, just make sure the java api wrappers are OK + JavaRDD rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10), 4); + + JavaPairRDD pairRdd = rdd.mapWithSetupAndCleanup(new JavaPairPartitionMapper() { + @Override + public void setup(int partition) { + System.out.println("setup " + partition); + } + + @Override + public Tuple2 map(Integer integer) { + return new Tuple2(integer, integer + "_"); + } + + @Override + public void cleanup() { + System.out.println("cleanup"); + } + }); + Assert.assertEquals( + "[(1,1_), (2,2_), (3,3_), (4,4_), (5,5_), (6,6_), (7,7_), (8,8_), (9,9_), (10,10_)]", + pairRdd.collect().toString()); + + + JavaDoubleRDD doubleRdd = rdd.mapWithSetupAndCleanup(new JavaDoublePartitionMapper() { + @Override + public void setup(int partition) { + System.out.println("setup" + partition); + } + + @Override + public Double map(Integer integer) throws Exception { + return integer.doubleValue(); + } + + @Override + public void cleanup() { + System.out.println("cleanup"); + } + }); + Assert.assertEquals( + "[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]", + doubleRdd.collect().toString()); + + } + @Test public void persist() { JavaDoubleRDD 
doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0)); From 442e941f4e8ac048efaffda7d343624cbe49b49e Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Sat, 16 Mar 2013 14:23:47 -0700 Subject: [PATCH 11/12] rename to mapWithSetup; fix for split -> partition rename; fix spacing --- core/src/main/scala/spark/RDD.scala | 2 +- .../scala/spark/api/java/JavaRDDLike.scala | 12 +-- .../MapPartitionsWithSetupAndCleanup.scala | 6 +- core/src/test/scala/spark/JavaAPISuite.java | 88 +++++++++---------- core/src/test/scala/spark/RDDSuite.scala | 2 +- 5 files changed, 55 insertions(+), 55 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 7ad4a7dfeb..c66759f59f 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -370,7 +370,7 @@ abstract class RDD[T: ClassManifest]( * This might be useful if you need to setup some resources per task & cleanup them up at the end, eg. * a db connection */ - def mapWithSetupAndCleanup[U: ClassManifest]( + def mapWithSetup[U: ClassManifest]( m: PartitionMapper[T,U], preservesPartitioning: Boolean = false): RDD[U] = new MapPartitionsWithSetupAndCleanup(this, m, preservesPartitioning) diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index f6ec98d295..4dcf705e85 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -123,16 +123,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to each element of the RDD, with an additional * setup & cleanup that happens before & after computing each partition */ - def mapWithSetupAndCleanup[U](m: PartitionMapper[T,U]): JavaRDD[U] = { - JavaRDD.fromRDD(rdd.mapWithSetupAndCleanup(m)(fakeManifest[U]))(fakeManifest[U]) + def mapWithSetup[U](m: PartitionMapper[T,U]): JavaRDD[U] = { + JavaRDD.fromRDD(rdd.mapWithSetup(m)(fakeManifest[U]))(fakeManifest[U]) } /** * Return a new RDD by applying a function to each element of the RDD, with an additional * setup & cleanup that happens before & after computing each partition */ - def mapWithSetupAndCleanup[K,V](m: JavaPairPartitionMapper[T,K,V]): JavaPairRDD[K,V] = { - JavaPairRDD.fromRDD(rdd.mapWithSetupAndCleanup(m)(fakeManifest[(K,V)]))( + def mapWithSetup[K,V](m: JavaPairPartitionMapper[T,K,V]): JavaPairRDD[K,V] = { + JavaPairRDD.fromRDD(rdd.mapWithSetup(m)(fakeManifest[(K,V)]))( fakeManifest[K], fakeManifest[V]) } @@ -140,8 +140,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to each element of the RDD, with an additional * setup & cleanup that happens before & after computing each partition */ - def mapWithSetupAndCleanup(m: JavaDoublePartitionMapper[T]): JavaDoubleRDD = { - JavaDoubleRDD.fromRDD(rdd.mapWithSetupAndCleanup(m)(manifest[java.lang.Double]).asInstanceOf[RDD[Double]]) + def mapWithSetup(m: JavaDoublePartitionMapper[T]): JavaDoubleRDD = { + JavaDoubleRDD.fromRDD(rdd.mapWithSetup(m)(manifest[java.lang.Double]).asInstanceOf[RDD[Double]]) } /** diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSetupAndCleanup.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSetupAndCleanup.scala index 42a1cbc923..e818b5e9b3 100644 --- a/core/src/main/scala/spark/rdd/MapPartitionsWithSetupAndCleanup.scala +++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSetupAndCleanup.scala @@ -1,6 +1,6 
@@ package spark.rdd -import spark.{TaskContext, Split, RDD} +import spark.{TaskContext, Partition, RDD} import spark.RDD.PartitionMapper /** @@ -13,11 +13,11 @@ class MapPartitionsWithSetupAndCleanup[U: ClassManifest, T: ClassManifest]( preservesPartitioning: Boolean ) extends RDD[U](prev){ - override def getSplits = firstParent[T].splits + override def getPartitions = firstParent[T].partitions override val partitioner = if (preservesPartitioning) prev.partitioner else None - override def compute(split: Split, context: TaskContext) = { + override def compute(split: Partition, context: TaskContext) = { context.addOnCompleteCallback(m.cleanup _) m.setup(split.index) firstParent[T].iterator(split, context).map(m.map _) diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 167fbcaa84..0850dbc99a 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -399,51 +399,51 @@ public Iterable call(Iterator iter) { @Test public void mapPartitionsWithSetupAndCleanup() { - //the real test of the behavior is in the scala test, just make sure the java api wrappers are OK - JavaRDD rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10), 4); - - JavaPairRDD pairRdd = rdd.mapWithSetupAndCleanup(new JavaPairPartitionMapper() { - @Override - public void setup(int partition) { - System.out.println("setup " + partition); - } - - @Override - public Tuple2 map(Integer integer) { - return new Tuple2(integer, integer + "_"); - } - - @Override - public void cleanup() { - System.out.println("cleanup"); - } - }); - Assert.assertEquals( - "[(1,1_), (2,2_), (3,3_), (4,4_), (5,5_), (6,6_), (7,7_), (8,8_), (9,9_), (10,10_)]", - pairRdd.collect().toString()); - - - JavaDoubleRDD doubleRdd = rdd.mapWithSetupAndCleanup(new JavaDoublePartitionMapper() { - @Override - public void setup(int partition) { - System.out.println("setup" + partition); - } - - @Override - public Double map(Integer integer) throws Exception { - return integer.doubleValue(); - } - - @Override - public void cleanup() { - System.out.println("cleanup"); - } - }); - Assert.assertEquals( - "[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]", - doubleRdd.collect().toString()); + //the real test of the behavior is in the scala test, just make sure the java api wrappers are OK + JavaRDD rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10), 4); - } + JavaPairRDD pairRdd = rdd.mapWithSetupAndCleanup(new JavaPairPartitionMapper() { + @Override + public void setup(int partition) { + System.out.println("setup " + partition); + } + + @Override + public Tuple2 map(Integer integer) { + return new Tuple2(integer, integer + "_"); + } + + @Override + public void cleanup() { + System.out.println("cleanup"); + } + }); + Assert.assertEquals( + "[(1,1_), (2,2_), (3,3_), (4,4_), (5,5_), (6,6_), (7,7_), (8,8_), (9,9_), (10,10_)]", + pairRdd.collect().toString()); + + + JavaDoubleRDD doubleRdd = rdd.mapWithSetupAndCleanup(new JavaDoublePartitionMapper() { + @Override + public void setup(int partition) { + System.out.println("setup" + partition); + } + + @Override + public Double map(Integer integer) throws Exception { + return integer.doubleValue(); + } + + @Override + public void cleanup() { + System.out.println("cleanup"); + } + }); + Assert.assertEquals( + "[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]", + doubleRdd.collect().toString()); + + } @Test public void persist() { diff --git a/core/src/test/scala/spark/RDDSuite.scala 
b/core/src/test/scala/spark/RDDSuite.scala index 68fc59e069..db30fbbdbd 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -185,7 +185,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local[4]", "test") val data = sc.parallelize(1 to 100, 4) val acc = sc.accumulableCollection(new HashMap[Int,Set[Int]]()) - val mapped = data.mapWithSetupAndCleanup(new PartitionMapper[Int,Int](){ + val mapped = data.mapWithSetup(new PartitionMapper[Int,Int](){ var partition = -1 var values = Set[Int]() def setup(partition:Int) {this.partition = partition} From 90c7fabe79d6c956e7f6cc208e9fd2fba3f149bb Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 19 Mar 2013 23:35:13 -0700 Subject: [PATCH 12/12] doh, fix missing refactor of mapWithSetup --- core/src/test/scala/spark/JavaAPISuite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 0850dbc99a..8520ae911c 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -402,7 +402,7 @@ public void mapPartitionsWithSetupAndCleanup() { //the real test of the behavior is in the scala test, just make sure the java api wrappers are OK JavaRDD rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10), 4); - JavaPairRDD pairRdd = rdd.mapWithSetupAndCleanup(new JavaPairPartitionMapper() { + JavaPairRDD pairRdd = rdd.mapWithSetup(new JavaPairPartitionMapper() { @Override public void setup(int partition) { System.out.println("setup " + partition); @@ -423,7 +423,7 @@ public void cleanup() { pairRdd.collect().toString()); - JavaDoubleRDD doubleRdd = rdd.mapWithSetupAndCleanup(new JavaDoublePartitionMapper() { + JavaDoubleRDD doubleRdd = rdd.mapWithSetup(new JavaDoublePartitionMapper() { @Override public void setup(int partition) { System.out.println("setup" + partition);