From 00a45c70e1f2b6524c8ba56ab76d85851f3b3875 Mon Sep 17 00:00:00 2001 From: Ahmed Mahran Date: Thu, 8 Dec 2022 08:28:48 -0600 Subject: [PATCH] [SPARK-41008][MLLIB] Dedup isotonic regression duplicate features ### What changes were proposed in this pull request? Adding a pre-processing step to isotonic regression in mllib to handle duplicate features. This is to match `sklearn` implementation. Input points of duplicate feature values are aggregated into a single point using as label the weighted average of the labels of the points with duplicate feature values. All points for a unique feature values are aggregated as: - Aggregated label is the weighted average of all labels - Aggregated feature is the weighted average of all equal features. It is possible that feature values to be equal up to a resolution due to representation errors, since we cannot know which feature value to use in that case, we compute the weighted average of the features. Ideally, all feature values will be equal and the weighted average is just the value at any point. - Aggregated weight is the sum of all weights ### Why are the changes needed? As per discussion on ticket [[SPARK-41008]](https://issues.apache.org/jira/browse/SPARK-41008), it is a bug and results should match `sklearn`. ### Does this PR introduce _any_ user-facing change? There are no changes to the API, documentation or error messages. However, the user should expect results to change. ### How was this patch tested? Existing test cases for duplicate features failed. These tests were adjusted accordingly. Also, new tests are added. Here is a python snippet that can be used to verify the results: ```python from sklearn.isotonic import IsotonicRegression def test(x, y, x_test, isotonic=True): ir = IsotonicRegression(out_of_bounds='clip', increasing=isotonic).fit(x, y) y_test = ir.predict(x_test) def print_array(label, a): print(f"{label}: [{', '.join([str(i) for i in a])}]") print_array("boundaries", ir.X_thresholds_) print_array("predictions", ir.y_thresholds_) print_array("y_test", y_test) test( x = [0.6, 0.6, 0.333, 0.333, 0.333, 0.20, 0.20, 0.20, 0.20], y = [1, 0, 0, 1, 0, 1, 0, 0, 0], x_test = [0.6, 0.6, 0.333, 0.333, 0.333, 0.20, 0.20, 0.20, 0.20] ) ``` srowen zapletal-martin Closes #38966 from ahmed-mahran/ml-isotonic-reg-dups. Authored-by: Ahmed Mahran Signed-off-by: Sean Owen --- .../mllib/regression/IsotonicRegression.scala | 141 +++++++++++--- .../regression/IsotonicRegressionSuite.scala | 180 ++++++++++++++---- 2 files changed, 262 insertions(+), 59 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 649f9816e6a5a..0b2bf14750168 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.spark.mllib.regression import java.io.Serializable @@ -24,6 +23,7 @@ import java.util.Arrays.binarySearch import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import org.apache.commons.math3.util.Precision import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -307,6 +307,65 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali run(input.rdd.retag.asInstanceOf[RDD[(Double, Double, Double)]]) } + /** + * Aggregates points of duplicate feature values into a single point using as label the weighted + * average of the labels of the points with duplicate feature values. All points for a unique + * feature values are aggregated as: + * + * - Aggregated label is the weighted average of all labels + * - Aggregated feature is the weighted average of all equal features[1] + * - Aggregated weight is the sum of all weights + * + * [1] Note: It is possible that feature values to be equal up to a resolution due to + * representation errors, since we cannot know which feature value to use in that case, we + * compute the weighted average of the features. Ideally, all feature values will be equal and + * the weighted average is just the value at any point. + * + * @param input + * Input data of tuples (label, feature, weight). Weights must be non-negative. + * @return + * Points with unique feature values. + */ + private[regression] def makeUnique( + input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = { + + val cleanInput = input.filter { case (y, x, weight) => + require( + weight >= 0.0, + s"Negative weight at point ($y, $x, $weight). Weights must be non-negative") + weight > 0 + } + + if (cleanInput.length <= 1) { + cleanInput + } else { + // whether or not two double features are equal up to a precision + @inline def areEqual(a: Double, b: Double): Boolean = Precision.equals(a, b) + + val pointsAccumulator = new IsotonicRegression.PointsAccumulator + var (_, prevFeature, _) = cleanInput.head + + // Go through input points, merging all points with approximately equal feature values into + // a single point. Equality of features is defined by areEqual method. The label of the + // accumulated points is the weighted average of the labels of all points of equal feature + // value. It is possible that feature values to be equal up to a resolution due to + // representation errors, since we cannot know which feature value to use in that case, + // we compute the weighted average of the features. + cleanInput.foreach { case point @ (_, feature, _) => + if (areEqual(feature, prevFeature)) { + pointsAccumulator += point + } else { + pointsAccumulator.appendToOutput() + pointsAccumulator := point + } + prevFeature = feature + } + // Append the last accumulated point + pointsAccumulator.appendToOutput() + pointsAccumulator.getOutput + } + } + /** * Performs a pool adjacent violators algorithm (PAV). Implements the algorithm originally * described in [1], using the formulation from [2, 3]. Uses an array to keep track of start @@ -322,35 +381,27 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali * functions subject to simple chain constraints." SIAM Journal on Optimization 10.3 (2000): * 658-672. * - * @param input Input data of tuples (label, feature, weight). Weights must - be non-negative. + * @param cleanUniqueInput Input data of tuples(label, feature, weight).Features must be unique + * and weights must be non-negative. * @return Result tuples (label, feature, weight) where labels were updated * to form a monotone sequence as per isotonic regression definition. */ private def poolAdjacentViolators( - input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = { + cleanUniqueInput: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = { - val cleanInput = input.filter{ case (y, x, weight) => - require( - weight >= 0.0, - s"Negative weight at point ($y, $x, $weight). Weights must be non-negative" - ) - weight > 0 - } - - if (cleanInput.isEmpty) { + if (cleanUniqueInput.isEmpty) { return Array.empty } // Keeps track of the start and end indices of the blocks. if [i, j] is a valid block from // cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j and blockBounds(j) = i // Initially, each data point is its own block. - val blockBounds = Array.range(0, cleanInput.length) + val blockBounds = Array.range(0, cleanUniqueInput.length) // Keep track of the sum of weights and sum of weight * y for each block. weights(start) // gives the values for the block. Entries that are not at the start of a block // are meaningless. - val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, weight) => + val weights: Array[(Double, Double)] = cleanUniqueInput.map { case (y, _, weight) => (weight, weight * y) } @@ -392,10 +443,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali // Merge on >= instead of > because it eliminates adjacent blocks with the same average, and we // want to compress our output as much as possible. Both give correct results. var i = 0 - while (nextBlock(i) < cleanInput.length) { + while (nextBlock(i) < cleanUniqueInput.length) { if (average(i) >= average(nextBlock(i))) { merge(i, nextBlock(i)) - while((i > 0) && (average(prevBlock(i)) >= average(i))) { + while ((i > 0) && (average(prevBlock(i)) >= average(i))) { i = merge(prevBlock(i), i) } } else { @@ -406,15 +457,15 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali // construct the output by walking through the blocks in order val output = ArrayBuffer.empty[(Double, Double, Double)] i = 0 - while (i < cleanInput.length) { + while (i < cleanUniqueInput.length) { // If block size is > 1, a point at the start and end of the block, // each receiving half the weight. Otherwise, a single point with // all the weight. - if (cleanInput(blockEnd(i))._2 > cleanInput(i)._2) { - output += ((average(i), cleanInput(i)._2, weights(i)._1 / 2)) - output += ((average(i), cleanInput(blockEnd(i))._2, weights(i)._1 / 2)) + if (cleanUniqueInput(blockEnd(i))._2 > cleanUniqueInput(i)._2) { + output += ((average(i), cleanUniqueInput(i)._2, weights(i)._1 / 2)) + output += ((average(i), cleanUniqueInput(blockEnd(i))._2, weights(i)._1 / 2)) } else { - output += ((average(i), cleanInput(i)._2, weights(i)._1)) + output += ((average(i), cleanUniqueInput(i)._2, weights(i)._1)) } i = nextBlock(i) } @@ -434,12 +485,56 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = { val keyedInput = input.keyBy(_._2) val parallelStepResult = keyedInput + // Points with same or adjacent features must collocate within the same partition. .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput)) .values + // Lexicographically sort points by features then labels. .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1)))) + // Aggregate points with equal features into a single point. + .map(makeUnique) .flatMap(poolAdjacentViolators) .collect() - .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering. + // Sort again because collect() doesn't promise ordering. + .sortBy(x => (x._2, x._1)) poolAdjacentViolators(parallelStepResult) } } + +object IsotonicRegression { + /** + * Utility class, holds a buffer of all points with unique features so far, and performs + * weighted sum accumulation of points. Hides these details for better readability of the + * main algorithm. + */ + class PointsAccumulator { + private val output = ArrayBuffer[(Double, Double, Double)]() + private var (currentLabel: Double, currentFeature: Double, currentWeight: Double) = + (0d, 0d, 0d) + + /** Resets the current value of the point accumulator using the provided point. */ + def :=(point: (Double, Double, Double)): Unit = { + val (label, feature, weight) = point + currentLabel = label * weight + currentFeature = feature * weight + currentWeight = weight + } + + /** Accumulates the provided point into the current value of the point accumulator. */ + def +=(point: (Double, Double, Double)): Unit = { + val (label, feature, weight) = point + currentLabel += label * weight + currentFeature += feature * weight + currentWeight += weight + } + + /** Appends the current value of the point accumulator to the output. */ + def appendToOutput(): Unit = + output += (( + currentLabel / currentWeight, + currentFeature / currentWeight, + currentWeight)) + + /** Returns all accumulated points so far. */ + def getOutput: Array[(Double, Double, Double)] = output.toArray + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 8066900dfa011..b59d16be6cd0a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.mllib.regression +import org.apache.commons.math3.util.Precision import org.scalatest.matchers.must.Matchers import org.apache.spark.{SparkException, SparkFunSuite} @@ -24,6 +25,24 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.util.Utils +/** + * Tests can be verified through the following python snippet: + * + * {{{ + * from sklearn.isotonic import IsotonicRegression + * + * def test(x, y, x_test, isotonic=True): + * ir = IsotonicRegression(out_of_bounds='clip', increasing=isotonic).fit(x, y) + * y_test = ir.predict(x_test) + * + * def print_array(label, a): + * print(f"{label}: [{', '.join([str(i) for i in a])}]") + * + * print_array("boundaries", ir.X_thresholds_) + * print_array("predictions", ir.y_thresholds_) + * print_array("y_test", y_test) + * }}} + */ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext with Matchers { private def round(d: Double) = { @@ -44,8 +63,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w labels: Seq[Double], weights: Seq[Double], isotonic: Boolean): IsotonicRegressionModel = { - val trainRDD = sc.parallelize(generateIsotonicInput(labels, weights)).cache() - new IsotonicRegression().setIsotonic(isotonic).run(trainRDD) + runIsotonicRegressionOnInput(generateIsotonicInput(labels, weights), isotonic) } private def runIsotonicRegression( @@ -54,17 +72,37 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w runIsotonicRegression(labels, Array.fill(labels.size)(1d), isotonic) } + private def runIsotonicRegression( + labels: Seq[Double], + features: Seq[Double], + weights: Seq[Double], + isotonic: Boolean): IsotonicRegressionModel = { + runIsotonicRegressionOnInput( + labels.indices.map(i => (labels(i), features(i), weights(i))), + isotonic) + } + + private def runIsotonicRegressionOnInput( + input: Seq[(Double, Double, Double)], + isotonic: Boolean, + slices: Int = sc.defaultParallelism): IsotonicRegressionModel = { + val trainRDD = sc.parallelize(input, slices).cache() + new IsotonicRegression().setIsotonic(isotonic).run(trainRDD) + } + test("increasing isotonic regression") { /* The following result could be re-produced with sklearn. - > from sklearn.isotonic import IsotonicRegression - > x = range(9) - > y = [1, 2, 3, 1, 6, 17, 16, 17, 18] - > ir = IsotonicRegression(x, y) - > print ir.predict(x) + > test( + > x = range(9), + > y = [1, 2, 3, 1, 6, 17, 16, 17, 18], + > x_test = range(9) + > ) - array([ 1. , 2. , 2. , 2. , 6. , 16.5, 16.5, 17. , 18. ]) + boundaries: [0.0, 1.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0] + predictions: [1.0, 2.0, 2.0, 6.0, 16.5, 16.5, 17.0, 18.0] + y_test: [1.0, 2.0, 2.0, 2.0, 6.0, 16.5, 16.5, 17.0, 18.0] */ val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true) @@ -142,9 +180,9 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w } test("isotonic regression with unordered input") { - val trainRDD = sc.parallelize(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, 2).cache() + val model = + runIsotonicRegressionOnInput(generateIsotonicInput(Seq(1, 2, 3, 4, 5)).reverse, true, 2) - val model = new IsotonicRegression().run(trainRDD) assert(model.predictions === Array(1, 2, 3, 4, 5)) } @@ -159,7 +197,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w val model = runIsotonicRegression(Seq(1, 2, 3, 2, 1), Seq(1, 1, 1, 0.1, 0.1), true) assert(model.boundaries === Array(0, 1, 2, 4)) - assert(model.predictions.map(round) === Array(1, 2, 3.3/1.2, 3.3/1.2)) + assert(model.predictions.map(round) === Array(1, 2, 3.3 / 1.2, 3.3 / 1.2)) } test("weighted isotonic regression with negative weights") { @@ -176,11 +214,20 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w } test("SPARK-16426 isotonic regression with duplicate features that produce NaNs") { - val trainRDD = sc.parallelize(Seq[(Double, Double, Double)]((2, 1, 1), (1, 1, 1), (0, 2, 1), - (1, 2, 1), (0.5, 3, 1), (0, 3, 1)), - 2) + val model = runIsotonicRegressionOnInput( + Seq((2, 1, 1), (1, 1, 1), (0, 2, 1), (1, 2, 1), (0.5, 3, 1), (0, 3, 1)), + true, + 2) + + assert(model.boundaries === Array(1.0, 3.0)) + assert(model.predictions === Array(0.75, 0.75)) + } - val model = new IsotonicRegression().run(trainRDD) + test("SPARK-41008 isotonic regression with duplicate features differs from sklearn") { + val model = runIsotonicRegressionOnInput( + Seq((2, 1, 1), (1, 1, 1), (0, 2, 1), (1, 2, 1), (0.5, 3, 1), (0, 3, 1)), + true, + 2) assert(model.boundaries === Array(1.0, 3.0)) assert(model.predictions === Array(0.75, 0.75)) @@ -194,32 +241,38 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w assert(model.predict(0.5) === 1.5) assert(model.predict(0.75) === 1.75) assert(model.predict(1) === 2) - assert(model.predict(2) === 10d/3) - assert(model.predict(9) === 10d/3) + assert(model.predict(2) === 10.0 / 3) + assert(model.predict(9) === 10.0 / 3) } test("isotonic regression prediction with duplicate features") { - val trainRDD = sc.parallelize( - Seq[(Double, Double, Double)]( - (2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), 2).cache() - val model = new IsotonicRegression().run(trainRDD) - - assert(model.predict(0) === 1) - assert(model.predict(1.5) === 2) - assert(model.predict(2.5) === 4.5) - assert(model.predict(4) === 6) + val model = runIsotonicRegressionOnInput( + Seq((2, 1, 1), (1, 1, 1), (4, 2, 1), (2, 2, 1), (6, 3, 1), (5, 3, 1)), + true, + 2) + + assert(model.boundaries === Array(1.0, 2.0, 3.0)) + assert(model.predictions === Array(1.5, 3.0, 5.5)) + + assert(model.predict(0) === 1.5) + assert(model.predict(1.5) === 2.25) + assert(model.predict(2.5) === 4.25) + assert(model.predict(4) === 5.5) } test("antitonic regression prediction with duplicate features") { - val trainRDD = sc.parallelize( - Seq[(Double, Double, Double)]( - (5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), 2).cache() - val model = new IsotonicRegression().setIsotonic(false).run(trainRDD) - - assert(model.predict(0) === 6) - assert(model.predict(1.5) === 4.5) - assert(model.predict(2.5) === 2) - assert(model.predict(4) === 1) + val model = runIsotonicRegressionOnInput( + Seq((5, 1, 1), (6, 1, 1), (2, 2, 1), (4, 2, 1), (1, 3, 1), (2, 3, 1)), + false, + 2) + + assert(model.boundaries === Array(1.0, 2.0, 3.0)) + assert(model.predictions === Array(5.5, 3.0, 1.5)) + + assert(model.predict(0) === 5.5) + assert(model.predict(1.5) === 4.25) + assert(model.predict(2.5) === 2.25) + assert(model.predict(4) === 1.5) } test("isotonic regression RDD prediction") { @@ -227,7 +280,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w val testRDD = sc.parallelize(List(-2.0, -1.0, 0.5, 0.75, 1.0, 2.0, 9.0), 2).cache() val predictions = testRDD.map(x => (x, model.predict(x))).collect().sortBy(_._1).map(_._2) - assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0/3, 10.0/3)) + assert(predictions === Array(1, 1, 1.5, 1.75, 2, 10.0 / 3, 10.0 / 3)) } test("antitonic regression prediction") { @@ -270,4 +323,59 @@ class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext w new IsotonicRegressionModel(Array(0.0, 1.0), Array(1.0, 2.0), isotonic = false) } } + + test("makeUnique: handle duplicate features") { + val regressor = new IsotonicRegression() + import regressor.makeUnique + import Precision.EPSILON + + // Note: input must be lexicographically sorted by (feature, label) + + // empty + assert(makeUnique(Array.empty) === Array.empty) + + // single + assert(makeUnique(Array((1.0, 1.0, 1.0))) === Array((1.0, 1.0, 1.0))) + + // two and duplicate + assert(makeUnique(Array((1.0, 1.0, 1.0), (1.0, 1.0, 1.0))) === Array((1.0, 1.0, 2.0))) + + // two and unique + assert( + makeUnique(Array((1.0, 1.0, 1.0), (1.0, 2.0, 1.0))) === + Array((1.0, 1.0, 1.0), (1.0, 2.0, 1.0))) + + // generic with duplicates + assert( + makeUnique( + Array( + (10.0, 1.0, 1.0), (20.0, 1.0, 1.0), + (10.0, 2.0, 1.0), (20.0, 2.0, 1.0), (30.0, 2.0, 1.0), + (10.0, 3.0, 1.0) + )) === Array((15.0, 1.0, 2.0), (20.0, 2.0, 3.0), (10.0, 3.0, 1.0))) + + // generic unique + assert( + makeUnique(Array((10.0, 1.0, 1.0), (10.0, 2.0, 1.0), (10.0, 3.0, 1.0))) === Array( + (10.0, 1.0, 1.0), + (10.0, 2.0, 1.0), + (10.0, 3.0, 1.0))) + + // generic with duplicates and non-uniform weights + assert( + makeUnique( + Array( + (10.0, 1.0, 0.3), (20.0, 1.0, 0.7), + (10.0, 2.0, 0.3), (20.0, 2.0, 0.3), (30.0, 2.0, 0.4), + (10.0, 3.0, 1.0) + )) === Array( + (10.0 * 0.3 + 20.0 * 0.7, 1.0, 1.0), + (10.0 * 0.3 + 20.0 * 0.3 + 30.0 * 0.4, 2.0, 1.0), + (10.0, 3.0, 1.0))) + + // duplicate up to resolution error + assert( + makeUnique(Array((1.0, 1.0, 1.0), (1.0, 1.0 - EPSILON, 1.0), (1.0, 1.0 + EPSILON, 1.0))) === + Array((1.0, 1.0, 3.0))) + } }