
Adding experimental for approximate counts
pwendell committed Apr 9, 2014
1 parent 8d0c873 commit bfe7b52
Showing 5 changed files with 46 additions and 11 deletions.
19 changes: 16 additions & 3 deletions core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -23,6 +23,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.Partitioner
 import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions
+import org.apache.spark.annotations.Experimental
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.rdd.RDD
@@ -184,14 +185,26 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja
   def meanApprox(timeout: Long, confidence: JDouble): PartialResult[BoundedDouble] =
     srdd.meanApprox(timeout, confidence)
 
-  /** (Experimental) Approximate operation to return the mean within a timeout. */
+  /**
+   * :: Experimental ::
+   * Approximate operation to return the mean within a timeout.
+   */
+  @Experimental
   def meanApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.meanApprox(timeout)
 
-  /** (Experimental) Approximate operation to return the sum within a timeout. */
+  /**
+   * :: Experimental ::
+   * Approximate operation to return the sum within a timeout.
+   */
+  @Experimental
   def sumApprox(timeout: Long, confidence: JDouble): PartialResult[BoundedDouble] =
     srdd.sumApprox(timeout, confidence)
 
-  /** (Experimental) Approximate operation to return the sum within a timeout. */
+  /**
+   * :: Experimental ::
+   * Approximate operation to return the sum within a timeout.
+   */
+  @Experimental
   def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout)
 
   /**
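For illustration, a minimal driver-side sketch of the approximate mean API added here, assuming a local SparkContext; the object name and data are made up, while getFinalValue and the BoundedDouble accessors (mean, low, high) are the existing PartialResult API:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // doubleRDDToDoubleRDDFunctions for RDD[Double]

object MeanApproxExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("meanApprox"))
    val data = sc.parallelize(1 to 1000000).map(_.toDouble)

    // Ask for the mean but wait at most 500 ms; the answer may be partial.
    val partial = data.meanApprox(timeout = 500, confidence = 0.95)
    // getFinalValue() blocks until the timeout expires or the job completes.
    val bounded = partial.getFinalValue()
    println(s"mean ~= ${bounded.mean}, 95% interval [${bounded.low}, ${bounded.high}]")
    sc.stop()
  }
}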
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -26,11 +26,12 @@ import com.google.common.base.Optional
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
 
 import org.apache.spark.{HashPartitioner, Partitioner}
 import org.apache.spark.Partitioner._
 import org.apache.spark.SparkContext.rddToPairRDDFunctions
+import org.apache.spark.annotations.Experimental
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
@@ -200,16 +201,20 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
   def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey())
 
   /**
-   * (Experimental) Approximate version of countByKey that can return a partial result if it does
+   * :: Experimental ::
+   * Approximate version of countByKey that can return a partial result if it does
    * not finish within a timeout.
    */
+  @Experimental
   def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
     rdd.countByKeyApprox(timeout).map(mapAsJavaMap)
 
   /**
-   * (Experimental) Approximate version of countByKey that can return a partial result if it does
+   * :: Experimental ::
+   * Approximate version of countByKey that can return a partial result if it does
    * not finish within a timeout.
    */
+  @Experimental
   def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
   : PartialResult[java.util.Map[K, BoundedDouble]] =
     rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
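As the diff shows, the Java wrapper leans on PartialResult.map to convert the Scala map inside the partial result. A minimal sketch of that conversion pattern, with a hypothetical helper name (toJavaResult):

import org.apache.spark.partial.{BoundedDouble, PartialResult}
import scala.collection.JavaConversions.mapAsJavaMap

object PartialResultConversions {
  // PartialResult.map transforms the (possibly incomplete) value while
  // preserving partial-result semantics, so the Java API can expose the
  // Scala Map as a java.util.Map without waiting for the job to finish.
  def toJavaResult[K](res: PartialResult[Map[K, BoundedDouble]])
      : PartialResult[java.util.Map[K, BoundedDouble]] =
    res.map(mapAsJavaMap)
}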
9 changes: 7 additions & 2 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -26,6 +26,7 @@ import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.annotations.Experimental
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, _}
@@ -331,16 +332,20 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def count(): Long = rdd.count()
 
   /**
-   * (Experimental) Approximate version of count() that returns a potentially incomplete result
+   * :: Experimental ::
+   * Approximate version of count() that returns a potentially incomplete result
    * within a timeout, even if not all tasks have finished.
    */
+  @Experimental
   def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
     rdd.countApprox(timeout, confidence)
 
   /**
-   * (Experimental) Approximate version of count() that returns a potentially incomplete result
+   * :: Experimental ::
+   * Approximate version of count() that returns a potentially incomplete result
    * within a timeout, even if not all tasks have finished.
    */
+  @Experimental
   def countApprox(timeout: Long): PartialResult[BoundedDouble] =
     rdd.countApprox(timeout)
 
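A hedged usage sketch for countApprox, assuming PartialResult.initialValue holds whatever estimate is available once the timeout expires (RDD contents and sizes are made up):

import org.apache.spark.{SparkConf, SparkContext}

object CountApproxExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("countApprox"))
    val rdd = sc.parallelize(1 to 10000000, numSlices = 100)

    // Returns within roughly 200 ms, even if some tasks are still running.
    val approx = rdd.countApprox(timeout = 200, confidence = 0.95)
    val estimate = approx.initialValue  // the bound available at the timeout
    println(s"count is likely in [${estimate.low}, ${estimate.high}]")
    sc.stop()
  }
}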
13 changes: 11 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.rdd
 
+import org.apache.spark.annotations.Experimental
 import org.apache.spark.{TaskContext, Logging}
 import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.MeanEvaluator
@@ -63,14 +64,22 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
    */
   def sampleVariance(): Double = stats().sampleVariance
 
-  /** (Experimental) Approximate operation to return the mean within a timeout. */
+  /**
+   * :: Experimental ::
+   * Approximate operation to return the mean within a timeout.
+   */
+  @Experimental
   def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
     val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
     val evaluator = new MeanEvaluator(self.partitions.size, confidence)
     self.context.runApproximateJob(self, processPartition, evaluator, timeout)
   }
 
-  /** (Experimental) Approximate operation to return the sum within a timeout. */
+  /**
+   * :: Experimental ::
+   * Approximate operation to return the sum within a timeout.
+   */
+  @Experimental
   def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
     val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
     val evaluator = new SumEvaluator(self.partitions.size, confidence)
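Both methods follow one pattern: a per-partition function folds the partition's values into a StatCounter, and an evaluator merges those per-task results into a BoundedDouble as tasks finish. Below is a simplified sketch of a custom evaluator under that contract, assuming ApproximateEvaluator in org.apache.spark.partial exposes merge and currentResult; the real MeanEvaluator's error-bound math is elided, and these internals may not be accessible outside the spark package:

import org.apache.spark.partial.{ApproximateEvaluator, BoundedDouble}
import org.apache.spark.util.StatCounter

// Naive mean evaluator: merges whatever partitions have finished and
// reports only the point estimate, with degenerate low/high bounds.
class NaiveMeanEvaluator(totalOutputs: Int, confidence: Double)
  extends ApproximateEvaluator[StatCounter, BoundedDouble] {

  private val counter = new StatCounter
  private var outputsMerged = 0

  override def merge(outputId: Int, taskResult: StatCounter) {
    outputsMerged += 1
    counter.merge(taskResult)
  }

  override def currentResult(): BoundedDouble = {
    val mean = counter.mean
    // The real MeanEvaluator widens [low, high] from the sample variance
    // while outputsMerged < totalOutputs; this sketch does not.
    new BoundedDouble(mean, confidence, mean, mean)
  }
}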
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -39,6 +39,7 @@ RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 
 import org.apache.spark._
+import org.apache.spark.annotations.Experimental
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.SparkHadoopWriter
 import org.apache.spark.Partitioner.defaultPartitioner
@@ -201,9 +202,11 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   def countByKey(): Map[K, Long] = self.map(_._1).countByValue()
 
   /**
-   * (Experimental) Approximate version of countByKey that can return a partial result if it does
+   * :: Experimental ::
+   * Approximate version of countByKey that can return a partial result if it does
    * not finish within a timeout.
    */
+  @Experimental
   def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
       : PartialResult[Map[K, BoundedDouble]] = {
     self.map(_._1).countByValueApprox(timeout, confidence)
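Finally, a usage sketch on the Scala side with the default confidence; onComplete fires when the job actually finishes, while initialValue is what was available at the timeout (data and names are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // rddToPairRDDFunctions for RDD[(K, V)]

object CountByKeyApproxExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("countByKeyApprox"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 4)

    // Default confidence is 0.95; returns once the 1 s timeout expires
    // or the job completes, whichever comes first.
    val partial = pairs.countByKeyApprox(timeout = 1000L)
    partial.onComplete { exact =>
      exact.foreach { case (k, bd) => println(s"$k -> ${bd.mean}") }
    }
    println(partial.initialValue.mapValues(_.mean))
    sc.stop()
  }
}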
