Skip to content


Merge branch 'SPARK-3278-weightedLabeledPoint' into SPARK-3278
Browse files Browse the repository at this point in the history
  • Loading branch information
zapletal-martin committed Jan 11, 2015
2 parents 823d803 + 941fd1f commit e9b3323
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,67 +17,47 @@

package org.apache.spark.mllib.regression

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.MonotonicityConstraint.MonotonicityConstraint._
import{JavaRDD, JavaPairRDD}
import org.apache.spark.rdd.RDD

* Monotonicity constrains for monotone regression
* Isotonic (increasing)
* Antitonic (decreasing)
object MonotonicityConstraint {

object MonotonicityConstraint {

sealed trait MonotonicityConstraint {
private[regression] def holds(
current: WeightedLabeledPoint,
next: WeightedLabeledPoint): Boolean

* Isotonic monotonicity constraint. Increasing sequence
case object Isotonic extends MonotonicityConstraint {
override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
current.label <= next.label

* Antitonic monotonicity constrain. Decreasing sequence
case object Antitonic extends MonotonicityConstraint {
override def holds(current: WeightedLabeledPoint, next: WeightedLabeledPoint): Boolean = {
current.label >= next.label

val Isotonic = MonotonicityConstraint.Isotonic
val Antitonic = MonotonicityConstraint.Antitonic

* Regression model for Isotonic regression
* @param predictions Weights computed for every feature.
* @param monotonicityConstraint specifies if the sequence is increasing or decreasing
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
class IsotonicRegressionModel(
val predictions: Seq[WeightedLabeledPoint],
val monotonicityConstraint: MonotonicityConstraint)
extends RegressionModel {
class IsotonicRegressionModel (
val predictions: Seq[(Double, Double, Double)],
val isotonic: Boolean)
extends Serializable {

override def predict(testData: RDD[Vector]): RDD[Double] =
* Predict labels for provided features
* @param testData features to be labeled
* @return predicted labels
def predict(testData: RDD[Double]): RDD[Double] =

override def predict(testData: Vector): Double = {
* Predict labels for provided features
* @param testData features to be labeled
* @return predicted labels
def predict(testData: JavaRDD[java.lang.Double]): RDD[java.lang.Double] = => x.doubleValue()).map(predict)

* Predict a single label
* @param testData feature to be labeled
* @return predicted label
def predict(testData: Double): Double =
// Take the highest of data points smaller than our feature or data point with lowest feature
(predictions.head +:
predictions.filter(y => y.features.toArray.head <= testData.toArray.head)).last.label
(predictions.head +: predictions.filter(y => y._2 <= testData)).last._1

Expand All @@ -91,23 +71,23 @@ trait IsotonicRegressionAlgorithm
* @param predictions labels estimated using isotonic regression algorithm.
* Used for predictions on new data points.
* @param monotonicityConstraint isotonic or antitonic
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return isotonic regression model
protected def createModel(
predictions: Seq[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
predictions: Seq[(Double, Double, Double)],
isotonic: Boolean): IsotonicRegressionModel

* Run algorithm to obtain isotonic regression model
* @param input data
* @param monotonicityConstraint ascending or descenting
* @param input (label, feature, weight)
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return isotonic regression model
def run(
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
input: RDD[(Double, Double, Double)],
isotonic: Boolean): IsotonicRegressionModel

Expand All @@ -117,17 +97,17 @@ class PoolAdjacentViolators private [mllib]
extends IsotonicRegressionAlgorithm {

override def run(
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
input: RDD[(Double, Double, Double)],
isotonic: Boolean): IsotonicRegressionModel = {
parallelPoolAdjacentViolators(input, monotonicityConstraint),
parallelPoolAdjacentViolators(input, isotonic),

override protected def createModel(
predictions: Seq[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
new IsotonicRegressionModel(predictions, monotonicityConstraint)
predictions: Seq[(Double, Double, Double)],
isotonic: Boolean): IsotonicRegressionModel = {
new IsotonicRegressionModel(predictions, isotonic)

Expand All @@ -138,32 +118,40 @@ class PoolAdjacentViolators private [mllib]
* Method in situ mutates input array
* @param in input data
* @param monotonicityConstraint asc or desc
* @param isotonic asc or desc
* @return result
private def poolAdjacentViolators(
in: Array[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): Array[WeightedLabeledPoint] = {
in: Array[(Double, Double, Double)],
isotonic: Boolean): Array[(Double, Double, Double)] = {

// Pools sub array within given bounds assigning weighted average value to all elements
def pool(in: Array[WeightedLabeledPoint], start: Int, end: Int): Unit = {
def pool(in: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
val poolSubArray = in.slice(start, end + 1)

val weightedSum = => lp.label * lp.weight).sum
val weight =
val weightedSum = => lp._1 * lp._3).sum
val weight =

for(i <- start to end) {
in(i) = WeightedLabeledPoint(weightedSum / weight, in(i).features, in(i).weight)
in(i) = (weightedSum / weight, in(i)._2, in(i)._3)

val isotonicConstraint: (Double, Double) => Boolean = (x, y) => x <= y
val antitonicConstraint: (Double, Double) => Boolean = (x, y) => x >= y

def monotonicityConstraint(isotonic: Boolean) =
if(isotonic) isotonicConstraint else antitonicConstraint

val monotonicityConstraintHolds = monotonicityConstraint(isotonic)

var i = 0

while(i < in.length) {
var j = i

// Find monotonicity violating sequence, if any
while(j < in.length - 1 && !monotonicityConstraint.holds(in(j), in(j + 1))) {
while(j < in.length - 1 && !monotonicityConstraintHolds(in(j)._1, in(j + 1)._1)) {
j = j + 1

Expand All @@ -173,7 +161,7 @@ class PoolAdjacentViolators private [mllib]
} else {
// Otherwise pool the violating sequence
// And check if pooling caused monotonicity violation in previously processed points
while (i >= 0 && !monotonicityConstraint.holds(in(i), in(i + 1))) {
while (i >= 0 && !monotonicityConstraintHolds(in(i)._1, in(i + 1)._1)) {
pool(in, i, j)
i = i - 1
Expand All @@ -190,19 +178,19 @@ class PoolAdjacentViolators private [mllib]
* Calls Pool adjacent violators on each partition and then again on the result
* @param testData input
* @param monotonicityConstraint asc or desc
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return result
private def parallelPoolAdjacentViolators(
testData: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint): Seq[WeightedLabeledPoint] = {
testData: RDD[(Double, Double, Double)],
isotonic: Boolean): Seq[(Double, Double, Double)] = {

.mapPartitions(it => poolAdjacentViolators(it.toArray, monotonicityConstraint).toIterator)
.collect(), monotonicityConstraint)
.mapPartitions(it => poolAdjacentViolators(it.toArray, isotonic).toIterator)
.collect(), isotonic)

Expand All @@ -212,20 +200,35 @@ class PoolAdjacentViolators private [mllib]
object IsotonicRegression {

* Train a monotone regression model given an RDD of (label, features, weight).
* Currently only one dimensional algorithm is supported (features.length is one)
* Train a monotone regression model given an RDD of (label, feature, weight).
* Label is the dependent y value
* Weight of the data point is the number of measurements. Default is 1
* @param input RDD of (label, array of features, weight).
* @param input RDD of (label, feature, weight).
* Each point describes a row of the data
* matrix A as well as the corresponding right hand side label y
* and weight as number of measurements
* @param monotonicityConstraint Isotonic (increasing) or Antitonic (decreasing) sequence
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
def train(
input: RDD[(Double, Double, Double)],
isotonic: Boolean = true): IsotonicRegressionModel = {
new PoolAdjacentViolators().run(input, isotonic)

* Train a monotone regression model given an RDD of (label, feature).
* Label is the dependent y value
* Weight defaults to 1
* @param input RDD of (label, feature).
* @param isotonic isotonic (increasing) or antitonic (decreasing) sequence
* @return
def train(
input: RDD[WeightedLabeledPoint],
monotonicityConstraint: MonotonicityConstraint = Isotonic): IsotonicRegressionModel = {
new PoolAdjacentViolators().run(input, monotonicityConstraint)
input: JavaPairRDD[java.lang.Double, java.lang.Double],
isotonic: Boolean): IsotonicRegressionModel = {
new PoolAdjacentViolators()
.run( => (x._1.doubleValue(), x._2.doubleValue(), 1d)), isotonic)

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@

package org.apache.spark.mllib.util

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.WeightedLabeledPointConversions._
import org.apache.spark.mllib.regression.{LabeledPoint, WeightedLabeledPoint}

import scala.collection.JavaConversions._
import java.lang.{Double => JDouble}

object IsotonicDataGenerator {

Expand All @@ -30,19 +27,19 @@ object IsotonicDataGenerator {
* @param labels list of labels for the data points
* @return Java List of input.
def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[WeightedLabeledPoint] = {
def generateIsotonicInputAsList(labels: Array[Double]): java.util.List[(JDouble, JDouble)] = {
seqAsJavaList(generateIsotonicInput(wrapDoubleArray(labels):_*).map(x => (new JDouble(x._1), new JDouble(x._2))))
//.map(d => new Tuple3(new java.lang.Double(d._1), new java.lang.Double(d._2), new java.lang.Double(d._3))))

* Return an ordered sequence of labeled data points with default weights
* @param labels list of labels for the data points
* @return sequence of data points
def generateIsotonicInput(labels: Double*): Seq[WeightedLabeledPoint] = {
def generateIsotonicInput(labels: Double*): Seq[(Double, Double, Double)] = { to labels.size)
.map(point => labeledPointToWeightedLabeledPoint(
LabeledPoint(point._1, Vectors.dense(point._2))))
.map(point => (point._1, point._2.toDouble, 1d))

Expand All @@ -53,8 +50,8 @@ object IsotonicDataGenerator {
def generateWeightedIsotonicInput(
labels: Seq[Double],
weights: Seq[Double]): Seq[WeightedLabeledPoint] = {
weights: Seq[Double]): Seq[(Double, Double, Double)] = { to labels.size).zip(weights)
.map(point => WeightedLabeledPoint(point._1._1, Vectors.dense(point._1._2), point._2))
.map(point => (point._1._1, point._1._2.toDouble, point._2))

0 comments on commit e9b3323

Please sign in to comment.