Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 1246 add min max to stat counter #144

Closed
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
new java.util.ArrayList(arr)
}

def max(comp: Comparator[T]): T = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add doc comments to these and the Scala versions

import scala.collection.JavaConversions._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to import this if you're going to call Ordering.comparatorToOrdering directly -- was it necessary? It was in some other methods because they used other conversions

rdd.max()(Ordering.comparatorToOrdering(comp))
}

/**
 * Returns the minimum element of this RDD as determined by the given comparator.
 */
def min(comp: Comparator[T]): T = {
  // No JavaConversions import needed: comparatorToOrdering is invoked
  // explicitly on scala.math.Ordering rather than via an implicit conversion.
  rdd.min()(Ordering.comparatorToOrdering(comp))
}

/**
* Returns the first K elements from this RDD using the
* natural ordering for T while maintaining the order.
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,10 @@ abstract class RDD[T: ClassTag](
*/
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)

def max()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.max(x,y)}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you can do this.reduce(ord.max), and same for the min below


def min()(implicit ord: Ordering[T]):T = this.reduce{(x,y) => ord.min(x,y)}

/**
* Save this RDD as a text file, using string representations of elements.
*/
Expand Down
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/util/StatCounter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
private var n: Long = 0 // Running count of our values
private var mu: Double = 0 // Running mean of our values
private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)
// Initialize to -/+ infinity (not 0) so that max is correct for all-negative
// inputs and min for all-positive inputs; matches the Python StatCounter.
private var max_v: Double = Double.NegativeInfinity // Running max of our values
private var min_v: Double = Double.PositiveInfinity // Running min of our values

merge(values)

Expand All @@ -41,6 +43,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
n += 1
mu += delta / n
m2 += delta * (value - mu)
max_v = math.max(max_v, value)
min_v = math.min(min_v, value)
this
}

Expand All @@ -58,7 +62,9 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
if (n == 0) {
mu = other.mu
m2 = other.m2
n = other.n
n = other.n
max_v = other.max_v
min_v = other.min_v
} else if (other.n != 0) {
val delta = other.mu - mu
if (other.n * 10 < n) {
Expand All @@ -70,6 +76,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
}
m2 += other.m2 + (delta * delta * n * other.n) / (n + other.n)
n += other.n
max_v = math.max(max_v, other.max_v)
min_v = math.min(min_v, other.min_v)
}
this
}
Expand All @@ -81,6 +89,8 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
other.n = n
other.mu = mu
other.m2 = m2
other.max_v = max_v
other.min_v = min_v
other
}

Expand All @@ -90,6 +100,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {

def sum: Double = n * mu

/** Return the maximum of the values merged into this counter so far. */
def max: Double = max_v

/** Return the minimum of the values merged into this counter so far. */
def min: Double = min_v

/** Return the variance of the values. */
def variance: Double = {
if (n == 0) {
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
assert(abs(6.0/2 - rdd.mean) < 0.01)
assert(abs(1.0 - rdd.variance) < 0.01)
assert(abs(1.0 - rdd.stdev) < 0.01)
// Use direct equality checks; the second assertion must test min, not max again.
assert(stats.max === 4.0)
assert(stats.min === -1.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably easier to write these as stats.max === 4.0 and stats.min === -1.0


// Add other tests here for classes that should be able to handle empty partitions correctly
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
assert(nums.max() === 4)
assert(nums.min() === 1)
val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
assert(partitionSums.collect().toList === List(3, 7))

Expand Down
19 changes: 19 additions & 0 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,26 @@ def func(iterator):
return reduce(op, vals, zeroValue)

# TODO: aggregate


def max(self):
    """
    Find the maximum item in this RDD.

    >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
    43.0
    """
    # Reduce directly rather than computing full statistics via stats():
    # a single pass keeping the larger element, and it works for any
    # comparable type, not just numbers.
    return self.reduce(lambda a, b: a if a > b else b)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use a more direct reduce here too


def min(self):
    """
    Find the minimum item in this RDD.

    >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
    1.0
    """
    # Docstring previously said "maximum" — fixed. Reduce directly rather
    # than computing full statistics via stats(): a single pass keeping the
    # smaller element, and it works for any comparable type.
    return self.reduce(lambda a, b: a if a < b else b)

def sum(self):
"""
Add up the elements in this RDD.
Expand Down
23 changes: 21 additions & 2 deletions python/pyspark/statcounter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ def __init__(self, values=[]):
self.n = 0L # Running count of our values
self.mu = 0.0 # Running mean of our values
self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2)

self.max_v = float("-inf")
self.min_v = float("inf")

for v in values:
self.merge(v)

Expand All @@ -36,6 +38,11 @@ def merge(self, value):
self.n += 1
self.mu += delta / self.n
self.m2 += delta * (value - self.mu)
if self.max_v < value:
self.max_v = value
if self.min_v > value:
self.min_v = value

return self

# Merge another StatCounter into this one, adding up the internal statistics.
Expand All @@ -49,7 +56,10 @@ def mergeStats(self, other):
if self.n == 0:
self.mu = other.mu
self.m2 = other.m2
self.n = other.n
self.n = other.n
self.max_v = other.max_v
self.min_v = other.min_v

elif other.n != 0:
delta = other.mu - self.mu
if other.n * 10 < self.n:
Expand All @@ -58,6 +68,9 @@ def mergeStats(self, other):
self.mu = other.mu - (delta * self.n) / (self.n + other.n)
else:
self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n)

self.max_v = max(self.max_v, other.max_v)
self.min_v = min(self.min_v, other.min_v)

self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)
self.n += other.n
Expand All @@ -76,6 +89,12 @@ def mean(self):
def sum(self):
return self.n * self.mu

def min(self):
    """Return the minimum of the merged values (float("inf") if none were merged)."""
    return self.min_v

def max(self):
    """Return the maximum of the merged values (float("-inf") if none were merged)."""
    return self.max_v

# Return the variance of the values.
def variance(self):
if self.n == 0:
Expand Down