diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
index f69960ee37143..4bed11a80c7c5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/AbsoluteError.scala
@@ -47,30 +47,15 @@ object AbsoluteError extends Loss {
     if ((point.label - model.predict(point.features)) < 0) 1.0 else -1.0
   }
 
-  /**
-   * Method to calculate loss of the base learner for the gradient boosting calculation.
-   * Note: This method is not used by the gradient boosting algorithm but is useful for debugging
-   * purposes.
-   * @param model Ensemble model
-   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
-   * @return Mean absolute error of model on data
-   */
-  override def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
-    data.map { y =>
-      val err = model.predict(y.features) - y.label
-      math.abs(err)
-    }.mean()
-  }
-
   /**
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
-   * @param datum: LabeledPoint
-   * @param prediction: Predicted label.
-   * @return Absolute error of model on the given datapoint.
+   * @param prediction Predicted label.
+   * @param datum LabeledPoint.
+   * @return Absolute error of model on the given datapoint.
    */
-  override def computeError(datum: LabeledPoint, prediction: Double): Double = {
+  override def computeError(prediction: Double, datum: LabeledPoint): Double = {
     val err = datum.label - prediction
     math.abs(err)
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
index 06ec62ea82526..76af56bd954fa 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/LogLoss.scala
@@ -50,32 +50,15 @@ object LogLoss extends Loss {
     - 4.0 * point.label / (1.0 + math.exp(2.0 * point.label * prediction))
   }
 
-  /**
-   * Method to calculate loss of the base learner for the gradient boosting calculation.
-   * Note: This method is not used by the gradient boosting algorithm but is useful for debugging
-   * purposes.
-   * @param model Ensemble model
-   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
-   * @return Mean log loss of model on data
-   */
-  override def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
-    data.map { case point =>
-      val prediction = model.predict(point.features)
-      val margin = 2.0 * point.label * prediction
-      // The following is equivalent to 2.0 * log(1 + exp(-margin)) but more numerically stable.
-      2.0 * MLUtils.log1pExp(-margin)
-    }.mean()
-  }
-
   /**
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
-   * @param datum: LabeledPoint
-   * @param prediction: Predicted label.
+   * @param prediction Predicted label.
+   * @param datum LabeledPoint
    * @return log loss of model on the datapoint.
    */
-  override def computeError(datum: LabeledPoint, prediction: Double): Double = {
+  override def computeError(prediction: Double, datum: LabeledPoint): Double = {
     val margin = 2.0 * datum.label * prediction
     // The following is equivalent to 2.0 * log(1 + exp(-margin)) but more numerically stable.
     2.0 * MLUtils.log1pExp(-margin)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
index 62d9c86bb14a0..7bb0a51e2c782 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/Loss.scala
@@ -47,16 +47,18 @@ trait Loss extends Serializable {
    * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
    * @return Measure of model error on data
    */
-  def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double
+  def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
+    data.map(point => computeError(model.predict(point.features), point)).mean()
+  }
 
   /**
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
-   * @param datum: LabeledPoint
-   * @param prediction: Predicted label.
+   * @param prediction Predicted label.
+   * @param datum LabeledPoint
    * @return Measure of model error on datapoint.
    */
-  def computeError(datum: LabeledPoint, prediction: Double) : Double
+  def computeError(prediction: Double, datum: LabeledPoint): Double
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
index 0ff8718f0d54c..cfe548b1d0021 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/loss/SquaredError.scala
@@ -47,30 +47,15 @@ object SquaredError extends Loss {
     2.0 * (model.predict(point.features) - point.label)
   }
 
-  /**
-   * Method to calculate loss of the base learner for the gradient boosting calculation.
-   * Note: This method is not used by the gradient boosting algorithm but is useful for debugging
-   * purposes.
-   * @param model Ensemble model
-   * @param data Training dataset: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
-   * @return Mean squared error of model on data
-   */
-  override def computeError(model: TreeEnsembleModel, data: RDD[LabeledPoint]): Double = {
-    data.map { y =>
-      val err = model.predict(y.features) - y.label
-      err * err
-    }.mean()
-  }
-
   /**
    * Method to calculate loss when the predictions are already known.
    * Note: This method is used in the method evaluateEachIteration to avoid recomputing the
    * predicted values from previously fit trees.
-   * @param datum: LabeledPoint
-   * @param prediction: Predicted label.
-   * @return Mean squared error of model on datapoint.
+   * @param prediction Predicted label.
+   * @param datum LabeledPoint
+   * @return Mean squared error of model on datapoint.
    */
-  override def computeError(datum: LabeledPoint, prediction: Double): Double = {
+  override def computeError(prediction: Double, datum: LabeledPoint): Double = {
     val err = prediction - datum.label
     err * err
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index b4dc0e7398292..1eee8e96f7975 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -113,47 +113,52 @@ class GradientBoostedTreesModel(
 
   /**
    * Method to compute error or loss for every iteration of gradient boosting.
-   * @param data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
-   * @param loss: evaluation metric.
+   * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
+   * @param loss evaluation metric.
    * @return an array with index i having the losses or errors for the ensemble
    *         containing trees 1 to i + 1
    */
   def evaluateEachIteration(
       data: RDD[LabeledPoint],
-      loss: Loss) : Array[Double] = {
+      loss: Loss): Array[Double] = {
 
     val sc = data.sparkContext
     val remappedData = algo match {
       case Classification => data.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
       case _ => data
     }
-    val initialTree = trees(0)
+
     val numIterations = trees.length
     val evaluationArray = Array.fill(numIterations)(0.0)
 
-    // Initial weight is 1.0
-    var predictionErrorModel = remappedData.map {i =>
-      val pred = initialTree.predict(i.features)
-      val error = loss.computeError(i, pred)
+    var predictionAndError: RDD[(Double, Double)] = remappedData.map { i =>
+      val pred = treeWeights(0) * trees(0).predict(i.features)
+      val error = loss.computeError(pred, i)
       (pred, error)
     }
-    evaluationArray(0) = predictionErrorModel.values.mean()
+    evaluationArray(0) = predictionAndError.values.mean()
 
     // Avoid the model being copied across numIterations.
     val broadcastTrees = sc.broadcast(trees)
    val broadcastWeights = sc.broadcast(treeWeights)
 
-    (1 until numIterations).map {nTree =>
-      predictionErrorModel = (remappedData zip predictionErrorModel) map {
-        case (point, (pred, error)) => {
-          val newPred = pred + (
-            broadcastTrees.value(nTree).predict(point.features) * broadcastWeights.value(nTree))
-          val newError = loss.computeError(point, newPred)
-          (newPred, newError)
+    (1 until numIterations).map { nTree =>
+      val currentTree = broadcastTrees.value(nTree)
+      val currentTreeWeight = broadcastWeights.value(nTree)
+      predictionAndError = remappedData.zip(predictionAndError).mapPartitions { iter =>
+        iter map {
+          case (point, (pred, error)) => {
+            val newPred = pred + currentTree.predict(point.features) * currentTreeWeight
+            val newError = loss.computeError(newPred, point)
+            (newPred, newError)
+          }
         }
       }
-      evaluationArray(nTree) = predictionErrorModel.values.mean()
+      evaluationArray(nTree) = predictionAndError.values.mean()
     }
+
+    broadcastTrees.unpersist()
+    broadcastWeights.unpersist()
     evaluationArray
   }
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
index 2d90764eeb799..55b0bac7d49fe 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
@@ -197,7 +197,7 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
     assert(evaluationArray(numTrees) > evaluationArray(numTrees - 1))
     var i = 1
     while (i < numTrees) {
-      assert(evaluationArray(i) < evaluationArray(i - 1))
+      assert(evaluationArray(i) <= evaluationArray(i - 1))
       i += 1
     }
   }
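With the RDD-based `computeError(model, data)` given a default implementation in the `Loss` trait, a concrete loss now only has to supply the per-datapoint `computeError(prediction, datum)` overload (plus `gradient`, which this patch leaves untouched). A minimal sketch of what a third-party loss could look like against the refactored trait — `HuberLikeLoss` and its `delta` threshold are hypothetical and not part of this patch, and the `gradient(model, point)` signature is assumed from the context lines above:

```scala
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.loss.Loss
import org.apache.spark.mllib.tree.model.TreeEnsembleModel

// Hypothetical Huber-style loss: quadratic near zero, linear in the tails.
object HuberLikeLoss extends Loss {

  // Illustrative threshold between the quadratic and linear regimes.
  private val delta = 1.0

  // Gradient of the loss with respect to the current prediction F(x),
  // following the gradient(model, point) signature seen in the context lines.
  override def gradient(model: TreeEnsembleModel, point: LabeledPoint): Double = {
    val err = model.predict(point.features) - point.label
    if (math.abs(err) <= delta) err else delta * math.signum(err)
  }

  // Per-datapoint loss in the new (prediction, datum) argument order.
  // The RDD-based computeError(model, data) overload is inherited from Loss.
  override def computeError(prediction: Double, datum: LabeledPoint): Double = {
    val err = datum.label - prediction
    if (math.abs(err) <= delta) 0.5 * err * err
    else delta * (math.abs(err) - 0.5 * delta)
  }
}
```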
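The reworked `evaluateEachIteration` carries a `(prediction, error)` pair per point from one iteration to the next, so evaluating the ensemble with tree i + 1 costs a single pass over the data instead of re-predicting with all previous trees; broadcasting `trees` and `treeWeights` once and unpersisting them after the loop keeps the model from being serialized into every closure. The suite's relaxed assertion (`<` to `<=`) reflects that the per-iteration error is expected to be non-increasing but need not strictly decrease. A usage sketch, assuming `training` and `validation` are `RDD[LabeledPoint]` placeholders prepared elsewhere and arbitrary parameter values:

```scala
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.loss.SquaredError

// training and validation are placeholder RDD[LabeledPoint] datasets.
val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.numIterations = 50

val model = GradientBoostedTrees.train(training, boostingStrategy)

// One loss value per boosting iteration, evaluated on held-out data.
val errors: Array[Double] = model.evaluateEachIteration(validation, SquaredError)

// Pick the iteration count with the lowest validation loss.
val bestNumIterations = errors.indexOf(errors.min) + 1
```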