[SPARK-27777][ML] Eliminate unnecessary sliding job in AreaUnderCurve
## What changes were proposed in this pull request?
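Compute AUC in one pass: each partition computes its local area inside `mapPartitions`, and the trapezoids between adjacent partitions are added on the driver, so the separate `RDD.sliding` job is no longer needed.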

## How was this patch tested?
Existing tests.

Performance test:
```
import org.apache.spark.mllib.evaluation._
val scoreAndLabels = sc.parallelize(Array.range(0, 100000).map{ i => (i.toDouble / 100000, (i % 2).toDouble) }, 4)
scoreAndLabels.persist()
scoreAndLabels.count()
val tic = System.currentTimeMillis
(0 until 100).foreach{i => val metrics = new BinaryClassificationMetrics(scoreAndLabels, 0); val auc = metrics.areaUnderROC; metrics.unpersist}
val toc = System.currentTimeMillis
toc - tic
```

|New (ms)|Existing (ms)|
|------|----------|
|87532|103644|

One-pass AUC saves about 16% of the computation time over the 100 runs: (103644 - 87532) / 103644 ≈ 15.5%.

Closes #24648 from zhengruifeng/auc_opt.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
zhengruifeng authored and srowen committed May 27, 2019
1 parent 8949bc7 commit 32461d4
Showing 1 changed file with 29 additions and 5 deletions.
@@ -17,7 +17,6 @@

 package org.apache.spark.mllib.evaluation

-import org.apache.spark.mllib.rdd.RDDFunctions._
 import org.apache.spark.rdd.RDD

 /**
@@ -42,10 +41,35 @@ private[evaluation] object AreaUnderCurve {
    * @param curve an RDD of ordered 2D points stored in pairs representing a curve
    */
   def of(curve: RDD[(Double, Double)]): Double = {
-    curve.sliding(2).aggregate(0.0)(
-      seqOp = (auc: Double, points: Array[(Double, Double)]) => auc + trapezoid(points),
-      combOp = _ + _
-    )
+    val localAreas = curve.mapPartitions { iter =>
+      if (iter.nonEmpty) {
+        var localArea = 0.0
+        var head = true
+        var firstPoint = (Double.NaN, Double.NaN)
+        var lastPoint = (Double.NaN, Double.NaN)
+
+        iter.sliding(2).foreach { points =>
+          if (head) {
+            firstPoint = points.head
+            head = false
+          }
+          lastPoint = points.last
+
+          if (points.length == 2) {
+            localArea += trapezoid(points)
+          }
+        }
+        Iterator.single((localArea, (firstPoint, lastPoint)))
+      } else {
+        Iterator.empty
+      }
+    }.collect()
+
+    localAreas.map(_._1).sum + localAreas.iterator.map(_._2)
+      .sliding(2).withPartial(false)
+      .map { case Seq((_, last1), (first2, _)) =>
+        trapezoid(Seq(last1, first2))
+      }.sum
   }

   /**
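
The idea behind the change can be tried out without a Spark cluster. The following is a minimal sketch on plain Scala collections, not the code from this commit: `OnePassAucSketch`, `sequentialArea` and `chunkedArea` are hypothetical names, each inner `Seq` stands in for one RDD partition, and `trapezoid` is assumed to be the ordinary trapezoid rule (the real helper is outside the hunk shown here).

```scala
object OnePassAucSketch {
  type Point = (Double, Double)

  // Assumed trapezoid rule: area under the segment joining p and q.
  def trapezoid(p: Point, q: Point): Double =
    (q._1 - p._1) * (p._2 + q._2) / 2.0

  // Reference result: walk the whole ordered curve pairwise.
  // Windows shorter than two points (single-point input) are skipped.
  def sequentialArea(curve: Seq[Point]): Double =
    curve.sliding(2).collect { case Seq(p, q) => trapezoid(p, q) }.sum

  // One-pass variant: every non-empty chunk reports its local area together
  // with its first and last point; the trapezoids that span two adjacent
  // chunks are then added from those boundary points alone.
  def chunkedArea(chunks: Seq[Seq[Point]]): Double = {
    val summaries = chunks.filter(_.nonEmpty).map { chunk =>
      (sequentialArea(chunk), chunk.head, chunk.last)
    }
    val localSum = summaries.map(_._1).sum
    val boundarySum = summaries.sliding(2).collect {
      case Seq((_, _, last1), (_, first2, _)) => trapezoid(last1, first2)
    }.sum
    localSum + boundarySum
  }
}
```

For the ordered curve `(0,0), (1,1), (2,1), (3,3), (4,3)`, both functions return 6.5 regardless of how the points are split into chunks, including empty chunks and chunks holding a single point. Only one small summary per chunk has to reach the driver, which is why the patched `of` can `collect()` the per-partition results.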
