[SPARK-41008][MLLIB] Dedup isotonic regression duplicate features
### What changes were proposed in this pull request?

This PR adds a pre-processing step to isotonic regression in mllib to handle duplicate features, matching the `sklearn` implementation. Input points with duplicate feature values are aggregated into a single point, using as label the weighted average of the labels of those points. All points with the same feature value are aggregated as follows:
 - The aggregated label is the weighted average of all labels.
 - The aggregated feature is the weighted average of all equal features. Feature values may be equal only up to a resolution due to representation errors; since we cannot know which feature value to use in that case, we compute the weighted average of the features. Ideally, all feature values are exactly equal and the weighted average is just the value at any point.
 - The aggregated weight is the sum of all weights.
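
The aggregation rule above can be sketched in standalone Python (a hypothetical illustration, not the Spark implementation; `math.isclose` stands in for the commons-math `Precision.equals` check used in the actual code):

```python
import math

def make_unique(points):
    """Aggregate consecutive points with (approximately) equal features.

    `points` is a list of (label, feature, weight) tuples, sorted by feature,
    with non-negative weights. Returns points with unique feature values.
    """
    points = [(y, x, w) for (y, x, w) in points if w > 0]
    if len(points) <= 1:
        return points

    output = []
    # Weighted sums accumulated for the current group of equal features.
    label_sum = feature_sum = weight_sum = 0.0
    prev_feature = points[0][1]

    def flush():
        # Emit the current group as a single weighted-average point.
        output.append((label_sum / weight_sum,
                       feature_sum / weight_sum,
                       weight_sum))

    for label, feature, weight in points:
        if not math.isclose(feature, prev_feature):
            flush()
            label_sum = feature_sum = weight_sum = 0.0
        label_sum += label * weight
        feature_sum += feature * weight
        weight_sum += weight
        prev_feature = feature
    flush()  # emit the last group
    return output
```

For example, four unit-weight points at feature `0.2` with labels `[1, 0, 0, 0]` collapse into a single point with label `0.25` and weight `4.0`.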

### Why are the changes needed?

As per the discussion on ticket [[SPARK-41008]](https://issues.apache.org/jira/browse/SPARK-41008), this is a bug fix: results should match `sklearn`.

### Does this PR introduce _any_ user-facing change?

There are no changes to the API, documentation, or error messages. However, users should expect results to change.

### How was this patch tested?

Existing test cases for duplicate features failed and were adjusted accordingly; new tests were also added.

Here is a Python snippet that can be used to verify the results against `sklearn`:

```python
from sklearn.isotonic import IsotonicRegression

def test(x, y, x_test, isotonic=True):
    ir = IsotonicRegression(out_of_bounds='clip', increasing=isotonic).fit(x, y)
    y_test = ir.predict(x_test)

    def print_array(label, a):
        print(f"{label}: [{', '.join([str(i) for i in a])}]")

    print_array("boundaries", ir.X_thresholds_)
    print_array("predictions", ir.y_thresholds_)
    print_array("y_test", y_test)

test(
    x = [0.6, 0.6, 0.333, 0.333, 0.333, 0.20, 0.20, 0.20, 0.20],
    y = [1, 0, 0, 1, 0, 1, 0, 0, 0],
    x_test = [0.6, 0.6, 0.333, 0.333, 0.333, 0.20, 0.20, 0.20, 0.20]
)
```
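
For context, the pool adjacent violators (PAV) step that consumes the deduplicated points can be sketched as follows (a simplified, hypothetical standalone version; the Spark implementation instead works in place over block-bound arrays and emits compressed block boundaries):

```python
def pav(points):
    """Pool adjacent violators for increasing isotonic regression.

    `points` is a list of (label, feature, weight) tuples with unique,
    sorted features and positive weights; returns one fitted label per point.
    """
    # Each block tracks [weighted label sum, weight sum, number of points].
    blocks = []
    for label, _, weight in points:
        blocks.append([label * weight, weight, 1])
        # Merge while the previous block's mean is >= the last block's mean
        # (merging on equality compresses the output, as in the Spark code).
        while len(blocks) > 1 and \
                blocks[-2][0] / blocks[-2][1] >= blocks[-1][0] / blocks[-1][1]:
            s, w, n = blocks.pop()
            blocks[-1][0] += s
            blocks[-1][1] += w
            blocks[-1][2] += n
    # Expand each block's mean back to one fitted value per input point.
    fitted = []
    for s, w, n in blocks:
        fitted.extend([s / w] * n)
    return fitted
```

For instance, `pav([(3, 1, 1.0), (1, 2, 1.0), (2, 3, 1.0)])` pools all three points into one block with mean `2.0`, since the labels violate monotonicity.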

srowen zapletal-martin

Closes apache#38966 from ahmed-mahran/ml-isotonic-reg-dups.

Authored-by: Ahmed Mahran <ahmed.mahran@mashin.io>
Signed-off-by: Sean Owen <srowen@gmail.com>
ahmed-mahran authored and beliefer committed Dec 18, 2022
1 parent 5751e8c commit 00a45c7
Showing 2 changed files with 262 additions and 59 deletions.
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.regression

import java.io.Serializable
@@ -24,6 +23,7 @@ import java.util.Arrays.binarySearch
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.commons.math3.util.Precision
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
@@ -307,6 +307,65 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
run(input.rdd.retag.asInstanceOf[RDD[(Double, Double, Double)]])
}

/**
* Aggregates points with duplicate feature values into a single point, using as label the
* weighted average of the labels of those points. All points with the same feature value are
* aggregated as:
*
* - Aggregated label is the weighted average of all labels
* - Aggregated feature is the weighted average of all equal features[1]
* - Aggregated weight is the sum of all weights
*
* [1] Note: Feature values may be equal only up to a resolution due to representation
* errors. Since we cannot know which feature value to use in that case, we compute the
* weighted average of the features. Ideally, all feature values are exactly equal and the
* weighted average is just the value at any point.
*
* @param input
* Input data of tuples (label, feature, weight). Weights must be non-negative.
* @return
* Points with unique feature values.
*/
private[regression] def makeUnique(
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

val cleanInput = input.filter { case (y, x, weight) =>
require(
weight >= 0.0,
s"Negative weight at point ($y, $x, $weight). Weights must be non-negative")
weight > 0
}

if (cleanInput.length <= 1) {
cleanInput
} else {
// whether or not two double features are equal up to a precision
@inline def areEqual(a: Double, b: Double): Boolean = Precision.equals(a, b)

val pointsAccumulator = new IsotonicRegression.PointsAccumulator
var (_, prevFeature, _) = cleanInput.head

// Go through the input points, merging all points with approximately equal feature values
// into a single point. Equality of features is defined by the areEqual method. The label of
// the accumulated point is the weighted average of the labels of all points with equal
// feature values. Feature values may be equal only up to a resolution due to representation
// errors; since we cannot know which feature value to use in that case, we compute the
// weighted average of the features.
cleanInput.foreach { case point @ (_, feature, _) =>
if (areEqual(feature, prevFeature)) {
pointsAccumulator += point
} else {
pointsAccumulator.appendToOutput()
pointsAccumulator := point
}
prevFeature = feature
}
// Append the last accumulated point
pointsAccumulator.appendToOutput()
pointsAccumulator.getOutput
}
}

/**
* Performs a pool adjacent violators algorithm (PAV). Implements the algorithm originally
* described in [1], using the formulation from [2, 3]. Uses an array to keep track of start
@@ -322,35 +381,27 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
* functions subject to simple chain constraints." SIAM Journal on Optimization 10.3 (2000):
* 658-672.
*
* @param input Input data of tuples (label, feature, weight). Weights must
be non-negative.
* @param cleanUniqueInput Input data of tuples (label, feature, weight). Features must be unique
* and weights must be non-negative.
* @return Result tuples (label, feature, weight) where labels were updated
* to form a monotone sequence as per isotonic regression definition.
*/
private def poolAdjacentViolators(
input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
cleanUniqueInput: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {

val cleanInput = input.filter{ case (y, x, weight) =>
require(
weight >= 0.0,
s"Negative weight at point ($y, $x, $weight). Weights must be non-negative"
)
weight > 0
}

if (cleanInput.isEmpty) {
if (cleanUniqueInput.isEmpty) {
return Array.empty
}

// Keeps track of the start and end indices of the blocks. if [i, j] is a valid block from
// cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j and blockBounds(j) = i
// Initially, each data point is its own block.
val blockBounds = Array.range(0, cleanInput.length)
val blockBounds = Array.range(0, cleanUniqueInput.length)

// Keep track of the sum of weights and sum of weight * y for each block. weights(start)
// gives the values for the block. Entries that are not at the start of a block
// are meaningless.
val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, weight) =>
val weights: Array[(Double, Double)] = cleanUniqueInput.map { case (y, _, weight) =>
(weight, weight * y)
}

@@ -392,10 +443,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
// Merge on >= instead of > because it eliminates adjacent blocks with the same average, and we
// want to compress our output as much as possible. Both give correct results.
var i = 0
while (nextBlock(i) < cleanInput.length) {
while (nextBlock(i) < cleanUniqueInput.length) {
if (average(i) >= average(nextBlock(i))) {
merge(i, nextBlock(i))
while((i > 0) && (average(prevBlock(i)) >= average(i))) {
while ((i > 0) && (average(prevBlock(i)) >= average(i))) {
i = merge(prevBlock(i), i)
}
} else {
@@ -406,15 +457,15 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
// construct the output by walking through the blocks in order
val output = ArrayBuffer.empty[(Double, Double, Double)]
i = 0
while (i < cleanInput.length) {
while (i < cleanUniqueInput.length) {
// If the block size is > 1, emit a point at the start and at the end of the block,
// each receiving half the weight. Otherwise, emit a single point with
// all the weight.
if (cleanInput(blockEnd(i))._2 > cleanInput(i)._2) {
output += ((average(i), cleanInput(i)._2, weights(i)._1 / 2))
output += ((average(i), cleanInput(blockEnd(i))._2, weights(i)._1 / 2))
if (cleanUniqueInput(blockEnd(i))._2 > cleanUniqueInput(i)._2) {
output += ((average(i), cleanUniqueInput(i)._2, weights(i)._1 / 2))
output += ((average(i), cleanUniqueInput(blockEnd(i))._2, weights(i)._1 / 2))
} else {
output += ((average(i), cleanInput(i)._2, weights(i)._1))
output += ((average(i), cleanUniqueInput(i)._2, weights(i)._1))
}
i = nextBlock(i)
}
@@ -434,12 +485,56 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
val keyedInput = input.keyBy(_._2)
val parallelStepResult = keyedInput
// Points with same or adjacent features must collocate within the same partition.
.partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
.values
// Lexicographically sort points by features then labels.
.mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1))))
// Aggregate points with equal features into a single point.
.map(makeUnique)
.flatMap(poolAdjacentViolators)
.collect()
.sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
// Sort again because collect() doesn't promise ordering.
.sortBy(x => (x._2, x._1))
poolAdjacentViolators(parallelStepResult)
}
}

object IsotonicRegression {
/**
* Utility class that holds a buffer of all points with unique features so far, and performs
* weighted-sum accumulation of points. It hides these details for better readability of the
* main algorithm.
*/
class PointsAccumulator {
private val output = ArrayBuffer[(Double, Double, Double)]()
private var (currentLabel: Double, currentFeature: Double, currentWeight: Double) =
(0d, 0d, 0d)

/** Resets the current value of the point accumulator using the provided point. */
def :=(point: (Double, Double, Double)): Unit = {
val (label, feature, weight) = point
currentLabel = label * weight
currentFeature = feature * weight
currentWeight = weight
}

/** Accumulates the provided point into the current value of the point accumulator. */
def +=(point: (Double, Double, Double)): Unit = {
val (label, feature, weight) = point
currentLabel += label * weight
currentFeature += feature * weight
currentWeight += weight
}

/** Appends the current value of the point accumulator to the output. */
def appendToOutput(): Unit =
output += ((
currentLabel / currentWeight,
currentFeature / currentWeight,
currentWeight))

/** Returns all accumulated points so far. */
def getOutput: Array[(Double, Double, Double)] = output.toArray
}
}
