Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/SPARK-3278' into SPARK-3278
Browse files Browse the repository at this point in the history
  • Loading branch information
zapletal-martin committed Jan 30, 2015
2 parents d8feb82 + ded071c commit e3c0e44
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.io.Serializable
import java.lang.{Double => JDouble}
import java.util.Arrays.binarySearch

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
import org.apache.spark.rdd.RDD

Expand All @@ -31,31 +33,29 @@ import org.apache.spark.rdd.RDD
* Boundaries must be sorted in increasing order.
* @param predictions Array of predictions associated to the boundaries at the same index.
* Results of isotonic regression and therefore monotone.
* @param isotonic indicates whether this is isotonic or antitonic.
*/
class IsotonicRegressionModel (
boundaries: Array[Double],
val boundaries: Array[Double],
val predictions: Array[Double],
isotonic: Boolean)
extends Serializable {
val isotonic: Boolean) extends Serializable {

private val predictionOrd = if (isotonic) Ordering[Double] else Ordering[Double].reverse

require(boundaries.length == predictions.length)
assertOrdered(boundaries)
assertOrdered(predictions)(predictionOrd)

private def isSorted(xs: Array[Double]): Boolean = {
/** Asserts the input array is monotone with the given ordering. */
private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
var i = 1
while (i < xs.length) {
if (xs(i) < xs(i - 1)) false
require(ord.compare(xs(i - 1), xs(i)) <= 0,
s"Elements (${xs(i - 1)}, ${xs(i)}) are not ordered.")
i += 1
}
true
}

if (isotonic) {
assert(isSorted(predictions))
} else {
assert(isSorted(predictions.map(-_)))
}

assert(isSorted(boundaries))
assert(boundaries.length == predictions.length)

/**
* Predict labels for provided features.
* Using a piecewise linear function.
Expand Down Expand Up @@ -175,10 +175,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
input.map(x => (-x._1, x._2, x._3))
}

val isotonicRegression = parallelPoolAdjacentViolators(preprocessedInput)
val pooled = parallelPoolAdjacentViolators(preprocessedInput)

val predictions = if (isotonic) isotonicRegression.map(_._1) else isotonicRegression.map(-_._1)
val boundaries = isotonicRegression.map(_._2)
val predictions = if (isotonic) pooled.map(_._1) else pooled.map(-_._1)
val boundaries = pooled.map(_._2)

new IsotonicRegressionModel(boundaries, predictions, isotonic)
}
Expand Down Expand Up @@ -210,6 +210,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
private def poolAdjacentViolators(
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

if (input.isEmpty) {
return Array.empty
}

// Pools sub array within given bounds assigning weighted average value to all elements.
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
val poolSubArray = input.slice(start, end + 1)
Expand Down Expand Up @@ -248,7 +252,35 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
}
}

input
// For points having the same prediction, we only keep two boundary points.
val compressed = ArrayBuffer.empty[(Double, Double, Double)]

var (curLabel, curFeature, curWeight) = input.head
var rightBound = curFeature
def merge(): Unit = {
compressed += ((curLabel, curFeature, curWeight))
if (rightBound > curFeature) {
compressed += ((curLabel, rightBound, 0.0))
}
}
i = 1
while (i < input.length) {
val (label, feature, weight) = input(i)
if (label == curLabel) {
curWeight += weight
rightBound = feature
} else {
merge()
curLabel = label
curFeature = feature
curWeight = weight
rightBound = curFeature
}
i += 1
}
merge()

compressed.toArray
}

/**
Expand All @@ -261,11 +293,12 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
*/
private def parallelPoolAdjacentViolators(
input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

val parallelStepResult = input
.sortBy(x => (x._2, x._1))
.mapPartitions(it => poolAdjacentViolators(it.toArray).toIterator)

poolAdjacentViolators(parallelStepResult.collect())
.glom()
.flatMap(poolAdjacentViolators)
.collect()
.sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
poolAdjacentViolators(parallelStepResult)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.mllib.regression
import org.scalatest.{Matchers, FunSuite}

import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._

class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers {

Expand All @@ -28,15 +29,13 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
}

private def generateIsotonicInput(labels: Seq[Double]): Seq[(Double, Double, Double)] = {
labels.zip(1 to labels.size).map(point => (point._1, point._2.toDouble, 1d))
Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, 1d))
}

private def generateIsotonicInput(
labels: Seq[Double],
weights: Seq[Double]): Seq[(Double, Double, Double)] = {
labels.zip(1 to labels.size)
.zip(weights)
.map(point => (point._1._1, point._1._2.toDouble, point._2))
Seq.tabulate(labels.size)(i => (labels(i), i.toDouble, weights(i)))
}

private def runIsotonicRegression(
Expand All @@ -54,9 +53,24 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
}

test("increasing isotonic regression") {
val model = runIsotonicRegression(Seq(1, 2, 3, 3, 1, 6, 17, 16, 17, 18), true)
/*
The following result could be re-produced with sklearn.
assert(model.predictions === Array(1, 2, 7d/3, 7d/3, 7d/3, 6, 16.5, 16.5, 17, 18))
> from sklearn.isotonic import IsotonicRegression
> x = range(9)
> y = [1, 2, 3, 1, 6, 17, 16, 17, 18]
> ir = IsotonicRegression(x, y)
> print ir.predict(x)
array([ 1. , 2. , 2. , 2. , 6. , 16.5, 16.5, 17. , 18. ])
*/
val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true)

assert(Array.tabulate(9)(x => model.predict(x)) === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18))

assert(model.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8))
assert(model.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 17.0, 18.0))
assert(model.isotonic)
}

test("isotonic regression with size 0") {
Expand All @@ -80,74 +94,82 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
test("isotonic regression strictly decreasing sequence") {
val model = runIsotonicRegression(Seq(5, 4, 3, 2, 1), true)

assert(model.predictions === Array(3, 3, 3, 3, 3))
assert(model.boundaries === Array(0, 4))
assert(model.predictions === Array(3, 3))
}

test("isotonic regression with last element violating monotonicity") {
val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), true)

assert(model.predictions === Array(1, 2, 3, 3, 3))
assert(model.boundaries === Array(0, 1, 2, 4))
assert(model.predictions === Array(1, 2, 3, 3))
}

test("isotonic regression with first element violating monotonicity") {
val model = runIsotonicRegression(Seq(4, 2, 3, 4, 5), true)

assert(model.predictions === Array(3, 3, 3, 4, 5))
assert(model.boundaries === Array(0, 2, 3, 4))
assert(model.predictions === Array(3, 3, 4, 5))
}

test("isotonic regression with negative labels") {
val model = runIsotonicRegression(Seq(-1, -2, 0, 1, -1), true)

assert(model.predictions === Array(-1.5, -1.5, 0, 0, 0))
assert(model.boundaries === Array(0, 1, 2, 4))
assert(model.predictions === Array(-1.5, -1.5, 0, 0))
}

test("isotonic regression with unordered input") {
val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse).cache()
val model = new IsotonicRegression().run(trainRDD)
val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2).cache()

val model = new IsotonicRegression().run(trainRDD)
assert(model.predictions === Array(1, 2, 3, 4, 5))
}

test("weighted isotonic regression") {
val model = runIsotonicRegression(Seq(1, 2, 3, 4, 2), Seq(1, 1, 1, 1, 2), true)

assert(model.predictions === Array(1, 2, 2.75, 2.75,2.75))
assert(model.boundaries === Array(0, 1, 2, 4))
assert(model.predictions === Array(1, 2, 2.75, 2.75))
}

test("weighted isotonic regression with weights lower than 1") {
val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true)

assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2, 3.3/1.2))
assert(model.boundaries === Array(0, 1, 2, 4))
assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2))
}

test("weighted isotonic regression with negative weights") {
val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(-1, 1, -3, 1, -5), true)

assert(model.predictions === Array(1.0, 10.0/6, 10.0/6, 10.0/6, 10.0/6))
assert(model.boundaries === Array(0.0, 1.0, 4.0))
assert(model.predictions === Array(1.0, 10.0/6, 10.0/6))
}

test("weighted isotonic regression with zero weights") {
val model = runIsotonicRegression(Seq[Double](1, 2, 3, 2, 1), Seq[Double](0, 0, 0, 1, 0), true)

assert(model.predictions === Array(1, 2, 2, 2, 2))
assert(model.boundaries === Array(0.0, 1.0, 4.0))
assert(model.predictions === Array(1, 2, 2))
}

test("isotonic regression prediction") {
val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)

assert(model.predict(-2) === 1)
assert(model.predict(-1) === 1)
assert(model.predict(0) === 1)
assert(model.predict(1.5) === 1.5)
assert(model.predict(1.75) === 1.75)
assert(model.predict(2) === 2)
assert(model.predict(3) === 10d/3)
assert(model.predict(10) === 10d/3)
assert(model.predict(0.5) === 1.5)
assert(model.predict(0.75) === 1.75)
assert(model.predict(1) === 2)
assert(model.predict(2) === 10d/3)
assert(model.predict(9) === 10d/3)
}

test("isotonic regression prediction with duplicate features") {
val trainRDD = sc.parallelize(
Seq[(Double, Double, Double)](
(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1))).cache()
(2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2).cache()
val model = new IsotonicRegression().run(trainRDD)

assert(model.predict(0) === 1)
Expand All @@ -159,7 +181,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M
test("antitonic regression prediction with duplicate features") {
val trainRDD = sc.parallelize(
Seq[(Double, Double, Double)](
(5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1))).cache()
(5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2).cache()
val model = new IsotonicRegression().setIsotonic(false).run(trainRDD)

assert(model.predict(0) === 6)
Expand All @@ -170,20 +192,50 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M

test("isotonic regression RDD prediction") {
val model = runIsotonicRegression(Seq(1, 2, 7, 1, 2), true)
val testRDD = sc.parallelize(List(-1.0, 0.0, 1.5, 1.75, 2.0, 3.0, 10.0)).cache()

assert(model.predict(testRDD).collect() === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3))
val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2).cache()
val predictions = testRDD.map(x => (x, model.predict(x))).collect().sortBy(_._1).map(_._2)
assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3))
}

test("antitonic regression prediction") {
val model = runIsotonicRegression(Seq(7, 5, 3, 5, 1), false)

assert(model.predict(-2) === 7)
assert(model.predict(-1) === 7)
assert(model.predict(0) === 7)
assert(model.predict(1.5) === 6)
assert(model.predict(1.75) === 5.5)
assert(model.predict(2) === 5)
assert(model.predict(3) === 4)
assert(model.predict(10) === 1)
}
}
assert(model.predict(0.5) === 6)
assert(model.predict(0.75) === 5.5)
assert(model.predict(1) === 5)
assert(model.predict(2) === 4)
assert(model.predict(9) === 1)
}

test("model construction") {
val model = new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = true)
assert(model.predict(-0.5) === 1.0)
assert(model.predict(0.0) === 1.0)
assert(model.predict(0.5) ~== 1.5 absTol 1e-14)
assert(model.predict(1.0) === 2.0)
assert(model.predict(1.5) === 2.0)

intercept[IllegalArgumentException] {
// different array sizes.
new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0), isotonic = true)
}

intercept[IllegalArgumentException] {
// unordered boundaries
new IsotonicRegressionModel(Array(1.0, 0.0), Array(1.0, 2.0), isotonic = true)
}

intercept[IllegalArgumentException] {
// unordered predictions (isotonic)
new IsotonicRegressionModel(Array(0.0, 1.0), Array(2.0, 1.0), isotonic = true)
}

intercept[IllegalArgumentException] {
// unordered predictions (antitonic)
new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = false)
}
}
}

0 comments on commit e3c0e44

Please sign in to comment.