From bfe7b52addf92adbfd269afe0f0c33986780bbbe Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 8 Apr 2014 20:09:06 -0700 Subject: [PATCH] Adding experimental for approximate counts --- .../apache/spark/api/java/JavaDoubleRDD.scala | 19 ++++++++++++++++--- .../apache/spark/api/java/JavaPairRDD.scala | 11 ++++++++--- .../apache/spark/api/java/JavaRDDLike.scala | 9 +++++++-- .../apache/spark/rdd/DoubleRDDFunctions.scala | 13 +++++++++++-- .../apache/spark/rdd/PairRDDFunctions.scala | 5 ++++- 5 files changed, 46 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index f816bb43a5b44..ffb9425872c41 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -23,6 +23,7 @@ import scala.reflect.ClassTag import org.apache.spark.Partitioner import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions +import org.apache.spark.annotations.Experimental import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.rdd.RDD @@ -184,14 +185,26 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja def meanApprox(timeout: Long, confidence: JDouble): PartialResult[BoundedDouble] = srdd.meanApprox(timeout, confidence) - /** (Experimental) Approximate operation to return the mean within a timeout. */ + /** + * :: Experimental :: + * Approximate operation to return the mean within a timeout. + */ + @Experimental def meanApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.meanApprox(timeout) - /** (Experimental) Approximate operation to return the sum within a timeout. */ + /** + * :: Experimental :: + * Approximate operation to return the sum within a timeout. + */ + @Experimental def sumApprox(timeout: Long, confidence: JDouble): PartialResult[BoundedDouble] = srdd.sumApprox(timeout, confidence) - /** (Experimental) Approximate operation to return the sum within a timeout. */ + /** + * :: Experimental :: + * Approximate operation to return the sum within a timeout. + */ + @Experimental def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout) /** diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 9596dbaf75488..eb394acbba447 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -26,11 +26,12 @@ import com.google.common.base.Optional import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{JobConf, OutputFormat} -import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.Partitioner._ import org.apache.spark.SparkContext.rddToPairRDDFunctions +import org.apache.spark.annotations.Experimental import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction} import org.apache.spark.partial.{BoundedDouble, PartialResult} @@ -200,16 +201,20 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey()) /** - * (Experimental) Approximate version of countByKey that can return a partial result if it does + * :: Experimental :: + * Approximate version of countByKey that can return a partial result if it does * not finish within a timeout. */ + @Experimental def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] = rdd.countByKeyApprox(timeout).map(mapAsJavaMap) /** - * (Experimental) Approximate version of countByKey that can return a partial result if it does + * :: Experimental :: + * Approximate version of countByKey that can return a partial result if it does * not finish within a timeout. */ + @Experimental def countByKeyApprox(timeout: Long, confidence: Double = 0.95) : PartialResult[java.util.Map[K, BoundedDouble]] = rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index e03b8e78d5f52..e41356c2fc19d 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -26,6 +26,7 @@ import com.google.common.base.Optional import org.apache.hadoop.io.compress.CompressionCodec import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.annotations.Experimental import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, _} @@ -331,16 +332,20 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def count(): Long = rdd.count() /** - * (Experimental) Approximate version of count() that returns a potentially incomplete result + * :: Experimental :: + * Approximate version of count() that returns a potentially incomplete result * within a timeout, even if not all tasks have finished. */ + @Experimental def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] = rdd.countApprox(timeout, confidence) /** - * (Experimental) Approximate version of count() that returns a potentially incomplete result + * :: Experimental :: + * Approximate version of count() that returns a potentially incomplete result * within a timeout, even if not all tasks have finished. */ + @Experimental def countApprox(timeout: Long): PartialResult[BoundedDouble] = rdd.countApprox(timeout) diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala index a7b6b3b5146ce..bb45473ace0ee 100644 --- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala @@ -17,6 +17,7 @@ package org.apache.spark.rdd +import org.apache.spark.annotations.Experimental import org.apache.spark.{TaskContext, Logging} import org.apache.spark.partial.BoundedDouble import org.apache.spark.partial.MeanEvaluator @@ -63,14 +64,22 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { */ def sampleVariance(): Double = stats().sampleVariance - /** (Experimental) Approximate operation to return the mean within a timeout. */ + /** + * :: Experimental :: + * Approximate operation to return the mean within a timeout. + */ + @Experimental def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns) val evaluator = new MeanEvaluator(self.partitions.size, confidence) self.context.runApproximateJob(self, processPartition, evaluator, timeout) } - /** (Experimental) Approximate operation to return the sum within a timeout. */ + /** + * :: Experimental :: + * Approximate operation to return the sum within a timeout. + */ + @Experimental def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns) val evaluator = new SumEvaluator(self.partitions.size, confidence) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 14386ff5b9127..e816b213c802a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -39,6 +39,7 @@ RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil} import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} import org.apache.spark._ +import org.apache.spark.annotations.Experimental import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.SparkHadoopWriter import org.apache.spark.Partitioner.defaultPartitioner @@ -201,9 +202,11 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) def countByKey(): Map[K, Long] = self.map(_._1).countByValue() /** - * (Experimental) Approximate version of countByKey that can return a partial result if it does + * :: Experimental :: + * Approximate version of countByKey that can return a partial result if it does * not finish within a timeout. */ + @Experimental def countByKeyApprox(timeout: Long, confidence: Double = 0.95) : PartialResult[Map[K, BoundedDouble]] = { self.map(_._1).countByValueApprox(timeout, confidence)