SPARK-1246: Add min/max to StatCounter #144
Changes from 7 commits
JavaRDDLike.scala
@@ -477,6 +477,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    new java.util.ArrayList(arr)
  }

  def max(comp: Comparator[T]): T = {
    import scala.collection.JavaConversions._
    rdd.max()(Ordering.comparatorToOrdering(comp))
  }

Review comment (on the JavaConversions import): I don't think you need to import this if you're going to call Ordering.comparatorToOrdering directly -- was it necessary? It was in some other methods because they used other conversions.

  def min(comp: Comparator[T]): T = {
    import scala.collection.JavaConversions._
    rdd.min()(Ordering.comparatorToOrdering(comp))
  }

  /**
   * Returns the first K elements from this RDD using the
   * natural ordering for T while maintaining the order.
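On the reviewer's question about the import: a minimal self-contained sketch (plain Scala, outside Spark; object and value names are illustrative) of how Ordering.comparatorToOrdering wraps a java.util.Comparator directly, with no scala.collection.JavaConversions involved.

import java.util.Comparator

object ComparatorToOrderingSketch {
  def main(args: Array[String]): Unit = {
    // A plain Java-style comparator ordering strings by length.
    val byLength: Comparator[String] = new Comparator[String] {
      override def compare(a: String, b: String): Int = Integer.compare(a.length, b.length)
    }

    // Ordering.comparatorToOrdering wraps the Comparator in a scala.math.Ordering;
    // no JavaConversions import is needed for this call.
    val ord: Ordering[String] = Ordering.comparatorToOrdering(byLength)

    val words = Seq("spark", "rdd", "statcounter")
    println(words.max(ord)) // statcounter (longest)
    println(words.min(ord)) // rdd (shortest)
  }
}

If this holds for the wrapper methods above, the import could presumably be dropped from both.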
RDD.scala
@@ -958,6 +958,10 @@ abstract class RDD[T: ClassTag](
   */
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)

  def max()(implicit ord: Ordering[T]): T = this.reduce { (x, y) => ord.max(x, y) }

Review comment (truncated): I believe you can do …

  def min()(implicit ord: Ordering[T]): T = this.reduce { (x, y) => ord.min(x, y) }

  /**
   * Save this RDD as a text file, using string representations of elements.
   */
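The review comment above is cut off in this capture. Independent of what it went on to suggest, here is a minimal sketch (plain Scala collections standing in for an RDD; names are illustrative) of the reduce-plus-implicit-Ordering pattern used here, including a shorter form that passes ord.max / ord.min directly, since those method values already have the (T, T) => T shape that reduce expects.

object ReduceWithOrderingSketch {
  // Mirrors the RDD signature: empty parameter list plus an implicit Ordering[T].
  def maxOf[T](xs: Seq[T])(implicit ord: Ordering[T]): T =
    xs.reduce { (x, y) => ord.max(x, y) }

  // Equivalent shorter form: the method value ord.min is already (T, T) => T.
  def minOf[T](xs: Seq[T])(implicit ord: Ordering[T]): T =
    xs.reduce(ord.min)

  def main(args: Array[String]): Unit = {
    val xs = Seq(3, 1, 4, 1, 5)
    println(maxOf(xs)) // 5, via the default Ordering[Int]
    println(minOf(xs)) // 1
    // A custom ordering can be supplied explicitly, as the Java wrapper does:
    println(maxOf(xs)(Ordering[Int].reverse)) // 1 under the reversed ordering
  }
}

Like reduce on an RDD, both helpers fail on empty input, which would be worth stating in the doc comments requested later in the review.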
PartitioningSuite.scala
@@ -171,6 +171,8 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
    assert(abs(6.0/2 - rdd.mean) < 0.01)
    assert(abs(1.0 - rdd.variance) < 0.01)
    assert(abs(1.0 - rdd.stdev) < 0.01)
    assert(abs(4.0 - stats.max) === 0)
    assert(abs(-1.0 - stats.min) === 0)

Review comment: Probably easier to write these as stats.max === 4.0 and stats.min === -1.0.

    // Add other tests here for classes that should be able to handle empty partitions correctly
  }
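The StatCounter change named in the PR title is not among the hunks captured here. As a rough illustration only (not Spark's StatCounter), the sketch below shows a counter that folds a running max and min into its merge step, checked with assertions in the style the reviewer suggests; the class and method names are invented for the example.

// Minimal stand-in for a streaming stats counter that also tracks max and min.
class MiniStatCounter {
  private var n: Long = 0L
  private var mu: Double = 0.0
  private var maxValue: Double = Double.NegativeInfinity
  private var minValue: Double = Double.PositiveInfinity

  def merge(value: Double): MiniStatCounter = {
    n += 1
    mu += (value - mu) / n          // incremental mean update
    maxValue = math.max(maxValue, value)
    minValue = math.min(minValue, value)
    this
  }

  def count: Long = n
  def mean: Double = mu
  def max: Double = maxValue
  def min: Double = minValue
}

object MiniStatCounterSketch {
  def main(args: Array[String]): Unit = {
    val stats = Seq(2.0, 4.0, 0.0, -1.0).foldLeft(new MiniStatCounter)(_ merge _)
    // Assertions in the style the reviewer suggests:
    assert(stats.max == 4.0)
    assert(stats.min == -1.0)
    println(s"count=${stats.count} mean=${stats.mean} max=${stats.max} min=${stats.min}")
  }
}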
rdd.py (PySpark)
@@ -534,7 +534,26 @@ def func(iterator):
        return reduce(op, vals, zeroValue)

    # TODO: aggregate

    def max(self):
        """
        Find the maximum item in this RDD.

        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
        43.0
        """
        return self.stats().max()

Review comment (truncated): Should use a more direct …

    def min(self):
        """
        Find the minimum item in this RDD.

        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
        1.0
        """
        return self.stats().min()

    def sum(self):
        """
        Add up the elements in this RDD.
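The review comment on max above is also truncated. For comparison, a reduce-based approach of the kind it appears to gesture at (a plain-Python sketch with invented helper names, not the PR's code) avoids computing a full stats() pass just to read one field.

from functools import reduce

def rdd_max(values):
    """Return the largest element by pairwise comparison, the way a
    distributed reduce would combine per-partition results."""
    return reduce(max, values)

def rdd_min(values):
    """Return the smallest element by pairwise comparison."""
    return reduce(min, values)

if __name__ == "__main__":
    data = [1.0, 5.0, 43.0, 10.0]
    print(rdd_max(data))  # 43.0
    print(rdd_min(data))  # 1.0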
Review comment: Add doc comments to these and the Scala versions.