diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java index f4b4f8d8c7b2f..65b393353b28b 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java @@ -117,9 +117,9 @@ public static void main(String[] args) { // Make predictions on test documents. cvModel uses the best model found (lrModel). cvModel.transform(test).registerAsTable("prediction"); - JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction"); + JavaSchemaRDD predictions = jsql.sql("SELECT id, text, probability, prediction FROM prediction"); for (Row r: predictions.collect()) { - System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2) + System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2) + ", prediction=" + r.get(3)); } } diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java index e25b271777ed4..16ced6b911ac8 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java @@ -81,7 +81,7 @@ public static void main(String[] args) { // One can also combine ParamMaps. ParamMap paramMap2 = new ParamMap(); - paramMap2.put(lr.scoreCol().w("probability")); // Change output column name + paramMap2.put(lr.probabilityCol().w("myProbability")); // Change output column name ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2); // Now learn a new model using the paramMapCombined parameters. @@ -98,8 +98,8 @@ public static void main(String[] args) { // Make predictions on test documents using the Transformer.transform() method. // LogisticRegression.transform will only use the 'features' column. - // Note that model2.transform() outputs a 'probability' column instead of the usual 'score' - // column since we renamed the lr.scoreCol parameter previously. + // Note that model2.transform() outputs a 'myProbability' column instead of the usual + // 'probability' column since we renamed the lr.probabilityCol parameter previously. model2.transform(test).registerAsTable("results"); JavaSchemaRDD results = jsql.sql("SELECT features, label, probability, prediction FROM results"); diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java index 54f18014e4b2f..c2496d9c57b15 100644 --- a/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java @@ -84,9 +84,9 @@ public static void main(String[] args) { // Make predictions on test documents. model.transform(test).registerAsTable("prediction"); - JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction"); + JavaSchemaRDD predictions = jsql.sql("SELECT id, text, probability, prediction FROM prediction"); for (Row r: predictions.collect()) { - System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2) + System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2) + ", prediction=" + r.get(3)); } } diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala index ce6bc066bd70d..0db32835b8fdf 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala @@ -24,6 +24,7 @@ import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.feature.{HashingTF, Tokenizer} import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator} +import org.apache.spark.mllib.linalg.Vector import org.apache.spark.sql.{Row, SQLContext} /** @@ -101,10 +102,10 @@ object CrossValidatorExample { // Make predictions on test documents. cvModel uses the best model found (lrModel). cvModel.transform(test) - .select('id, 'text, 'score, 'prediction) + .select('id, 'text, 'probability, 'prediction) .collect() - .foreach { case Row(id: Long, text: String, score: Double, prediction: Double) => - println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction) + .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => + println("(" + id + ", " + text + ") --> prob=" + prob + ", prediction=" + prediction) } } } diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala new file mode 100644 index 0000000000000..2f1de5c58ed1e --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.ml + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkContext._ +import org.apache.spark.ml.classification.{Classifier, ClassifierParams, ClassificationModel} +import org.apache.spark.ml.param.{Params, IntParam, ParamMap} +import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors, VectorUDT} +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.sql.{DataType, SchemaRDD, Row, SQLContext} + +/** + * A simple example demonstrating how to write your own learning algorithm using Estimator, + * Transformer, and other abstractions. + * This mimics [[org.apache.spark.ml.classification.LogisticRegression]]. + * Run with + * {{{ + * bin/run-example ml.DeveloperApiExample + * }}} + */ +object DeveloperApiExample { + + def main(args: Array[String]) { + val conf = new SparkConf().setAppName("DeveloperApiExample") + val sc = new SparkContext(conf) + val sqlContext = new SQLContext(sc) + import sqlContext._ + + // Prepare training data. + // We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of Java Beans + // into SchemaRDDs, where it uses the bean metadata to infer the schema. + val training = sparkContext.parallelize(Seq( + LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)), + LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)), + LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)), + LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)))) + + // Create a LogisticRegression instance. This instance is an Estimator. + val lr = new MyLogisticRegression() + // Print out the parameters, documentation, and any default values. + println("MyLogisticRegression parameters:\n" + lr.explainParams() + "\n") + + // We may set parameters using setter methods. + lr.setMaxIter(10) + + // Learn a LogisticRegression model. This uses the parameters stored in lr. + val model = lr.fit(training) + + // Prepare test data. + val test = sparkContext.parallelize(Seq( + LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)), + LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)), + LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)))) + + // Make predictions on test data. + val sumPredictions: Double = model.transform(test) + .select('features, 'label, 'prediction) + .collect() + .map { case Row(features: Vector, label: Double, prediction: Double) => + prediction + }.sum + assert(sumPredictions == 0.0, + "MyLogisticRegression predicted something other than 0, even though all weights are 0!") + } +} + +/** + * Example of defining a parameter trait for a user-defined type of [[Classifier]]. + * + * NOTE: This is private since it is an example. In practice, you may not want it to be private. + */ +private trait MyLogisticRegressionParams extends ClassifierParams { + + /** param for max number of iterations */ + val maxIter: IntParam = new IntParam(this, "maxIter", "max number of iterations") + def getMaxIter: Int = get(maxIter) +} + +/** + * Example of defining a type of [[Classifier]]. + * + * NOTE: This is private since it is an example. In practice, you may not want it to be private. + */ +private class MyLogisticRegression + extends Classifier[Vector, MyLogisticRegression, MyLogisticRegressionModel] + with MyLogisticRegressionParams { + + setMaxIter(100) // Initialize + + def setMaxIter(value: Int): this.type = set(maxIter, value) + + override def fit(dataset: SchemaRDD, paramMap: ParamMap): MyLogisticRegressionModel = { + // Check schema (types). This allows early failure before running the algorithm. + transformSchema(dataset.schema, paramMap, logging = true) + + // Extract columns from data using helper method. + val oldDataset = extractLabeledPoints(dataset, paramMap) + + // Combine given parameters with the embedded parameters, where the given paramMap overrides + // any embedded settings. + val map = this.paramMap ++ paramMap + + // Do learning to estimate the weight vector. + val numFeatures = oldDataset.take(1)(0).features.size + val weights = Vectors.zeros(numFeatures) // Learning would happen here. + + // Create a model to return. + val lrm = new MyLogisticRegressionModel(this, map, weights) + + // Copy model params. + // An Estimator stores the parameters for the Model it produces, and this copies any relevant + // parameters to the model. + Params.inheritValues(map, this, lrm) + + // Return the learned model. + lrm + } + + /** + * Returns the SQL DataType corresponding to the FeaturesType type parameter. + * This is used by [[ClassifierParams.validateAndTransformSchema()]] to check the input data. + */ + override protected def featuresDataType: DataType = new VectorUDT +} + +/** + * Example of defining a type of [[ClassificationModel]]. + * + * NOTE: This is private since it is an example. In practice, you may not want it to be private. + */ +private class MyLogisticRegressionModel( + override val parent: MyLogisticRegression, + override val fittingParamMap: ParamMap, + val weights: Vector) + extends ClassificationModel[Vector, MyLogisticRegressionModel] + with MyLogisticRegressionParams { + + // This uses the default implementation of transform(), which reads column "features" and outputs + // columns "prediction" and "rawPrediction." + + // This uses the default implementation of predict(), which chooses the label corresponding to + // the maximum value returned by [[predictRaw()]]. + + /** + * Raw prediction for each possible label. + * The meaning of a "raw" prediction may vary between algorithms, but it intuitively gives + * a measure of confidence in each possible label (where larger = more confident). + * This internal method is used to implement [[transform()]] and output [[rawPredictionCol]]. + * + * @return vector where element i is the raw prediction for label i. + * This raw prediction may be any real number, where a larger value indicates greater + * confidence for that label. + */ + override protected def predictRaw(features: Vector): Vector = { + val margin = BLAS.dot(features, weights) + // There are 2 classes (binary classification), so we return a length-2 vector, + // where index i corresponds to class i (i = 0, 1). + Vectors.dense(-margin, margin) + } + + /** Number of classes the label can take. 2 indicates binary classification. */ + override val numClasses: Int = 2 + + /** + * Create a copy of the model. + * The copy is shallow, except for the embedded paramMap, which gets a deep copy. + * + * This is used for the defaul implementation of [[transform()]]. + */ + override protected def copy(): MyLogisticRegressionModel = { + val m = new MyLogisticRegressionModel(parent, fittingParamMap, weights) + Params.inheritValues(this.paramMap, this, m) + m + } + + /** + * Returns the SQL DataType corresponding to the FeaturesType type parameter. + * This is used by [[ClassifierParams.validateAndTransformSchema()]] to check the input data. + */ + override protected def featuresDataType: DataType = new VectorUDT +} diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala index 44d5b084c269a..1c0d4d2c647ca 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala @@ -73,7 +73,7 @@ object SimpleParamsExample { paramMap.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params. // One can also combine ParamMaps. - val paramMap2 = ParamMap(lr.scoreCol -> "probability") // Change output column name + val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name val paramMapCombined = paramMap ++ paramMap2 // Now learn a new model using the paramMapCombined parameters. @@ -81,18 +81,18 @@ object SimpleParamsExample { val model2 = lr.fit(training, paramMapCombined) println("Model 2 was fit using parameters: " + model2.fittingParamMap) - // Prepare test documents. + // Prepare test data. val test = sparkContext.parallelize(Seq( LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)), LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)), LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)))) - // Make predictions on test documents using the Transformer.transform() method. + // Make predictions on test data using the Transformer.transform() method. // LogisticRegression.transform will only use the 'features' column. - // Note that model2.transform() outputs a 'probability' column instead of the usual 'score' - // column since we renamed the lr.scoreCol parameter previously. + // Note that model2.transform() outputs a 'myProbability' column instead of the usual + // 'probability' column since we renamed the lr.probabilityCol parameter previously. model2.transform(test) - .select('features, 'label, 'probability, 'prediction) + .select('features, 'label, 'myProbability, 'prediction) .collect() .foreach { case Row(features: Vector, label: Double, prob: Double, prediction: Double) => println("(" + features + ", " + label + ") -> prob=" + prob + ", prediction=" + prediction) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala index 92895a05e479a..795852b9efc03 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala @@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._ import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.feature.{HashingTF, Tokenizer} +import org.apache.spark.mllib.linalg.Vector import org.apache.spark.sql.{Row, SQLContext} @BeanInfo @@ -80,10 +81,10 @@ object SimpleTextClassificationPipeline { // Make predictions on test documents. model.transform(test) - .select('id, 'text, 'score, 'prediction) + .select('id, 'text, 'probability, 'prediction) .collect() - .foreach { case Row(id: Long, text: String, score: Double, prediction: Double) => - println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction) + .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) => + println("(" + id + ", " + text + ") --> prob=" + prob + ", prediction=" + prediction) } } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/ml/LabeledPoint.scala deleted file mode 100644 index 8b6b2f3fa2756..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/ml/LabeledPoint.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ml - -import scala.beans.BeanInfo - -import org.apache.spark.annotation.AlphaComponent -import org.apache.spark.mllib.linalg.Vector - -/** - * :: AlphaComponent :: - * Class that represents an instance (data point) for prediction tasks. - * - * @param label Label to predict - * @param features List of features describing this instance - * @param weight Instance weight - */ -@AlphaComponent -@BeanInfo -case class LabeledPoint(label: Double, features: Vector, weight: Double) { - - /** Constructor which sets instance weight to 1.0 */ - def this(label: Double, features: Vector) = this(label, features, 1.0) - - override def toString: String = { - "(%s,%s,%s)".format(label, features, weight) - } -} - -/** - * :: AlphaComponent :: - */ -@AlphaComponent -object LabeledPoint { - /** Constructor which sets instance weight to 1.0 */ - def apply(label: Double, features: Vector) = new LabeledPoint(label, features, 1.0) -} diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala index 2f31beb7303fb..243de234dffdf 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala @@ -17,27 +17,56 @@ package org.apache.spark.ml.classification -import org.apache.spark.annotation.AlphaComponent -import org.apache.spark.api.java.JavaRDD +import scala.reflect.runtime.universe._ + +import org.apache.spark.annotation.{DeveloperApi, AlphaComponent} import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, PredictorParams} -import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.rdd.RDD +import org.apache.spark.ml.param.{Params, ParamMap, HasRawPredictionCol} +import org.apache.spark.mllib.linalg.{Vector, VectorUDT} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.Star /** + * :: DeveloperApi :: * Params for classification. - * Currently empty, but may add functionality later. */ -private[classification] trait ClassifierParams extends PredictorParams +@DeveloperApi +trait ClassifierParams extends PredictorParams + with HasRawPredictionCol { + + override protected def validateAndTransformSchema( + schema: StructType, + paramMap: ParamMap, + fitting: Boolean, + featuresDataType: DataType): StructType = { + val parentSchema = super.validateAndTransformSchema(schema, paramMap, fitting, featuresDataType) + val map = this.paramMap ++ paramMap + addOutputColumn(parentSchema, map(rawPredictionCol), new VectorUDT) + } +} /** - * Single-label binary or multiclass classification + * :: AlphaComponent :: + * Single-label binary or multiclass classification. * Classes are indexed {0, 1, ..., numClasses - 1}. + * + * @tparam FeaturesType Type of input features. E.g., [[Vector]] + * @tparam Learner Concrete Estimator type + * @tparam M Concrete Model type */ @AlphaComponent -abstract class Classifier[Learner <: Classifier[Learner, M], M <: ClassificationModel[M]] - extends Predictor[Learner, M] +abstract class Classifier[ + FeaturesType, + Learner <: Classifier[FeaturesType, Learner, M], + M <: ClassificationModel[FeaturesType, M]] + extends Predictor[FeaturesType, Learner, M] with ClassifierParams { + setRawPredictionCol("") // Do not output by default + + def setRawPredictionCol(value: String): Learner = + set(rawPredictionCol, value).asInstanceOf[Learner] + // TODO: defaultEvaluator (follow-up PR) } @@ -46,42 +75,130 @@ abstract class Classifier[Learner <: Classifier[Learner, M], M <: Classification * Model produced by a [[Classifier]]. * Classes are indexed {0, 1, ..., numClasses - 1}. * - * @tparam M Model type. + * @tparam FeaturesType Type of input features. E.g., [[Vector]] + * @tparam M Concrete Model type */ @AlphaComponent -abstract class ClassificationModel[M <: ClassificationModel[M]] - extends PredictionModel[M] with ClassifierParams { +abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[FeaturesType, M]] + extends PredictionModel[FeaturesType, M] with ClassifierParams { + + setRawPredictionCol("") // Do not output by default + + def setRawPredictionCol(value: String): M = set(rawPredictionCol, value).asInstanceOf[M] /** Number of classes (values which the label can take). */ def numClasses: Int /** + * Transforms dataset by reading from [[featuresCol]], and appending new columns as specified by + * parameters: + * - predicted labels as [[predictionCol]] of type [[Double]] + * - raw predictions (confidences) as [[rawPredictionCol]] of type [[Vector]]. + * + * @param dataset input dataset + * @param paramMap additional parameters, overwrite embedded params + * @return transformed dataset + */ + override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = { + // This default implementation should be overridden as needed. + + // Check schema + transformSchema(dataset.schema, paramMap, logging = true) + val map = this.paramMap ++ paramMap + + // Prepare model + val tmpModel = if (paramMap.size != 0) { + val tmpModel = this.copy() + Params.inheritValues(paramMap, parent, tmpModel) + tmpModel + } else { + this + } + + val (numColsOutput, outputData) = + ClassificationModel.transformColumnsImpl[FeaturesType](dataset, tmpModel, map) + if (numColsOutput == 0) { + logWarning(s"$uid: ClassificationModel.transform() was called as NOOP" + + " since no output columns were set.") + } + outputData + } + + /** + * :: DeveloperApi :: + * * Predict label for the given features. + * This internal method is used to implement [[transform()]] and output [[predictionCol]]. + * * This default implementation for classification predicts the index of the maximum value * from [[predictRaw()]]. */ - override def predict(features: Vector): Double = { + @DeveloperApi + override protected def predict(features: FeaturesType): Double = { predictRaw(features).toArray.zipWithIndex.maxBy(_._1)._2 } /** + * :: DeveloperApi :: + * * Raw prediction for each possible label. * The meaning of a "raw" prediction may vary between algorithms, but it intuitively gives - * a magnitude of confidence in each possible label. + * a measure of confidence in each possible label (where larger = more confident). + * This internal method is used to implement [[transform()]] and output [[rawPredictionCol]]. + * * @return vector where element i is the raw prediction for label i. * This raw prediction may be any real number, where a larger value indicates greater * confidence for that label. */ - def predictRaw(features: Vector): Vector + @DeveloperApi + protected def predictRaw(features: FeaturesType): Vector + +} + +private[ml] object ClassificationModel { + + /** + * Added prediction column(s). This is separated from [[ClassificationModel.transform()]] + * since it is used by [[org.apache.spark.ml.classification.ProbabilisticClassificationModel]]. + * @param dataset Input dataset + * @param map Parameter map. This will NOT be merged with the embedded paramMap; the merge + * should already be done. + * @return (number of columns added, transformed dataset) + */ + private[ml] def transformColumnsImpl[FeaturesType]( + dataset: SchemaRDD, + model: ClassificationModel[FeaturesType, _], + map: ParamMap): (Int, SchemaRDD) = { - /** Batch version of [[predictRaw]] */ - def predictRaw(dataset: RDD[Vector]): RDD[Vector] = dataset.map(predictRaw) + import org.apache.spark.sql.catalyst.dsl._ + import dataset.sqlContext._ - /** Java-friendly batch version of [[predictRaw]] */ - def predictRaw(dataset: JavaRDD[Vector]): JavaRDD[Vector] = { - dataset.rdd.map(predictRaw).toJavaRDD() + // Output selected columns only. + // This is a bit complicated since it tries to avoid repeated computation. + var tmpData = dataset + var numColsOutput = 0 + if (map(model.rawPredictionCol) != "") { + // output raw prediction + val features2raw: FeaturesType => Vector = model.predictRaw + tmpData = tmpData.select(Star(None), + features2raw.call(map(model.featuresCol).attr) as map(model.rawPredictionCol)) + numColsOutput += 1 + if (map(model.predictionCol) != "") { + val raw2pred: Vector => Double = (rawPred) => { + rawPred.toArray.zipWithIndex.maxBy(_._1)._2 + } + tmpData = tmpData.select(Star(None), + raw2pred.call(map(model.rawPredictionCol).attr) as map(model.predictionCol)) + numColsOutput += 1 + } + } else if (map(model.predictionCol) != "") { + // output prediction + val features2pred: FeaturesType => Double = model.predict + tmpData = tmpData.select(Star(None), + features2pred.call(map(model.featuresCol).attr) as map(model.predictionCol)) + numColsOutput += 1 + } + (numColsOutput, tmpData) } - // TODO: accuracy(dataset: RDD[LabeledPoint]): Double (follow-up PR) - } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index f9e8a2277faf9..62b543ef7141c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -18,12 +18,9 @@ package org.apache.spark.ml.classification import org.apache.spark.annotation.AlphaComponent -import org.apache.spark.ml.LabeledPoint -import org.apache.spark.ml.impl.estimator.ProbabilisticClassificationModel import org.apache.spark.ml.param._ import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS -import org.apache.spark.mllib.linalg.{Vectors, BLAS, Vector} -import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{VectorUDT, Vectors, BLAS, Vector} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.Star import org.apache.spark.sql.catalyst.dsl._ @@ -32,22 +29,8 @@ import org.apache.spark.storage.StorageLevel /** * Params for logistic regression. */ -private[classification] trait LogisticRegressionParams extends ClassifierParams - with HasRegParam with HasMaxIter with HasThreshold with HasScoreCol { - - override protected def validateAndTransformSchema( - schema: StructType, - paramMap: ParamMap, - fitting: Boolean): StructType = { - val parentSchema = super.validateAndTransformSchema(schema, paramMap, fitting) - val map = this.paramMap ++ paramMap - val fieldNames = parentSchema.fieldNames - require(!fieldNames.contains(map(scoreCol)), s"Score column ${map(scoreCol)} already exists.") - val outputFields = parentSchema.fields ++ Seq( - StructField(map(scoreCol), DoubleType, nullable = false)) - StructType(outputFields) - } -} +private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams + with HasRegParam with HasMaxIter with HasThreshold /** @@ -56,7 +39,8 @@ private[classification] trait LogisticRegressionParams extends ClassifierParams * Currently, this class only supports binary classification. */ @AlphaComponent -class LogisticRegression extends Classifier[LogisticRegression, LogisticRegressionModel] +class LogisticRegression + extends ProbabilisticClassifier[Vector, LogisticRegression, LogisticRegressionModel] with LogisticRegressionParams { setRegParam(0.1) @@ -66,44 +50,37 @@ class LogisticRegression extends Classifier[LogisticRegression, LogisticRegressi def setRegParam(value: Double): this.type = set(regParam, value) def setMaxIter(value: Int): this.type = set(maxIter, value) def setThreshold(value: Double): this.type = set(threshold, value) - def setScoreCol(value: String): this.type = set(scoreCol, value) - /** - * Same as [[fit()]], but using strong types. - * NOTE: This does NOT support instance weights. - * @param dataset Training data. Instance weights are ignored. - * @param paramMap Parameters for training. - * These values override any specified in this Estimator's embedded ParamMap. - */ - override def train(dataset: RDD[LabeledPoint], paramMap: ParamMap): LogisticRegressionModel = { + override def fit(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = { + // Check schema + transformSchema(dataset.schema, paramMap, logging = true) + + // Extract columns from data. If dataset is persisted, do not persist oldDataset. + val oldDataset = extractLabeledPoints(dataset, paramMap) val map = this.paramMap ++ paramMap - val oldDataset = dataset.map { case LabeledPoint(label: Double, features: Vector, weight) => - org.apache.spark.mllib.regression.LabeledPoint(label, features) - } - // If dataset is persisted, do not persist oldDataset. val handlePersistence = dataset.getStorageLevel == StorageLevel.NONE if (handlePersistence) { oldDataset.persist(StorageLevel.MEMORY_AND_DISK) } + + // Train model val lr = new LogisticRegressionWithLBFGS lr.optimizer .setRegParam(map(regParam)) .setNumIterations(map(maxIter)) - val model = lr.run(oldDataset) - val lrm = new LogisticRegressionModel(this, map, model.weights, model.intercept) + val oldModel = lr.run(oldDataset) + val lrm = new LogisticRegressionModel(this, map, oldModel.weights, oldModel.intercept) + if (handlePersistence) { oldDataset.unpersist() } + + // copy model params + Params.inheritValues(map, this, lrm) lrm } - /** - * Same as [[fit()]], but using strong types. - * NOTE: This does NOT support instance weights. - * @param dataset Training data. Instance weights are ignored. - */ - override def train(dataset: RDD[LabeledPoint]): LogisticRegressionModel = - train(dataset, new ParamMap()) // Override documentation + override protected def featuresDataType: DataType = new VectorUDT } @@ -117,14 +94,12 @@ class LogisticRegressionModel private[ml] ( override val fittingParamMap: ParamMap, val weights: Vector, val intercept: Double) - extends ClassificationModel[LogisticRegressionModel] - with ProbabilisticClassificationModel + extends ProbabilisticClassificationModel[Vector, LogisticRegressionModel] with LogisticRegressionParams { setThreshold(0.5) def setThreshold(value: Double): this.type = set(threshold, value) - def setScoreCol(value: String): this.type = set(scoreCol, value) private val margin: Vector => Double = (features) => { BLAS.dot(features, weights) + intercept @@ -136,42 +111,94 @@ class LogisticRegressionModel private[ml] ( } override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = { + // Check schema transformSchema(dataset.schema, paramMap, logging = true) + import dataset.sqlContext._ val map = this.paramMap ++ paramMap - val t = map(threshold) - val predict: Double => Double = (score) => { - if (score > t) 1.0 else 0.0 + + // Output selected columns only. + // This is a bit complicated since it tries to avoid repeated computation. + // rawPrediction (-margin, margin) + // probability (1.0-score, score) + // prediction (max margin) + var tmpData = dataset + var numColsOutput = 0 + if (map(rawPredictionCol) != "") { + val features2raw: Vector => Vector = predictRaw + tmpData = tmpData.select(Star(None), + features2raw.call(map(featuresCol).attr) as map(rawPredictionCol)) + numColsOutput += 1 } - dataset.select(Star(None), score.call(map(featuresCol).attr) as map(scoreCol)) - .select(Star(None), predict.call(map(scoreCol).attr) as map(predictionCol)) + if (map(probabilityCol) != "") { + if (map(rawPredictionCol) != "") { + val raw2prob: Vector => Vector = (rawPreds) => { + val prob1 = 1.0 / 1.0 + math.exp(-rawPreds(1)) + Vectors.dense(1.0 - prob1, prob1) + } + tmpData = tmpData.select(Star(None), + raw2prob.call(map(rawPredictionCol).attr) as map(probabilityCol)) + } else { + val features2prob: Vector => Vector = predictProbabilities + tmpData = tmpData.select(Star(None), + features2prob.call(map(featuresCol).attr) as map(probabilityCol)) + } + numColsOutput += 1 + } + if (map(predictionCol) != "") { + val t = map(threshold) + if (map(probabilityCol) != "") { + val predict: Vector => Double = (probs) => { + if (probs(1) > t) 1.0 else 0.0 + } + tmpData = tmpData.select(Star(None), + predict.call(map(probabilityCol).attr) as map(predictionCol)) + } else if (map(rawPredictionCol) != "") { + val predict: Vector => Double = (rawPreds) => { + val prob1 = 1.0 / 1.0 + math.exp(-rawPreds(1)) + if (prob1 > t) 1.0 else 0.0 + } + tmpData = tmpData.select(Star(None), + predict.call(map(rawPredictionCol).attr) as map(predictionCol)) + } else { + val predict: Vector => Double = this.predict + tmpData = tmpData.select(Star(None), + predict.call(map(featuresCol).attr) as map(predictionCol)) + } + numColsOutput += 1 + } + if (numColsOutput == 0) { + this.logWarning(s"$uid: LogisticRegressionModel.transform() was called as NOOP" + + " since no output columns were set.") + } + tmpData } override val numClasses: Int = 2 - // TODO: Override batch predict() for efficiency. - /** * Predict label for the given feature vector. * The behavior of this can be adjusted using [[threshold]]. */ - override def predict(features: Vector): Double = { + override protected def predict(features: Vector): Double = { if (score(features) > paramMap(threshold)) 1 else 0 } - override def predictProbabilities(features: Vector): Vector = { + override protected def predictProbabilities(features: Vector): Vector = { val s = score(features) - Vectors.dense(Array(1.0 - s, s)) + Vectors.dense(1.0 - s, s) } - override def predictRaw(features: Vector): Vector = { + override protected def predictRaw(features: Vector): Vector = { val m = margin(features) - Vectors.dense(Array(-m, m)) + Vectors.dense(-m, m) } - private[ml] override def copy(): LogisticRegressionModel = { + override protected def copy(): LogisticRegressionModel = { val m = new LogisticRegressionModel(parent, fittingParamMap, weights, intercept) Params.inheritValues(this.paramMap, this, m) m } + + override protected def featuresDataType: DataType = new VectorUDT } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala new file mode 100644 index 0000000000000..41f9b9601a00b --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.classification + +import scala.reflect.runtime.universe._ + +import org.apache.spark.annotation.{AlphaComponent, DeveloperApi} +import org.apache.spark.ml.param.{HasProbabilityCol, ParamMap, Params} +import org.apache.spark.mllib.linalg.{Vector, VectorUDT} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.Star + +/** + * Params for probabilistic classification. + */ +private[classification] trait ProbabilisticClassifierParams + extends ClassifierParams with HasProbabilityCol { + + override protected def validateAndTransformSchema( + schema: StructType, + paramMap: ParamMap, + fitting: Boolean, + featuresDataType: DataType): StructType = { + val parentSchema = super.validateAndTransformSchema(schema, paramMap, fitting, featuresDataType) + val map = this.paramMap ++ paramMap + addOutputColumn(parentSchema, map(probabilityCol), new VectorUDT) + } +} + +/** + * :: AlphaComponent :: + * Single-label binary or multiclass classifier which can output class conditional probabilities. + * + * @tparam FeaturesType Type of input features. E.g., [[Vector]] + * @tparam Learner Concrete Estimator type + * @tparam M Concrete Model type + */ +@AlphaComponent +abstract class ProbabilisticClassifier[ + FeaturesType, + Learner <: ProbabilisticClassifier[FeaturesType, Learner, M], + M <: ProbabilisticClassificationModel[FeaturesType, M]] + extends Classifier[FeaturesType, Learner, M] with ProbabilisticClassifierParams { + + setProbabilityCol("") // Do not output by default + + def setProbabilityCol(value: String): Learner = set(probabilityCol, value).asInstanceOf[Learner] +} + +/** + * :: AlphaComponent :: + * Model produced by a [[ProbabilisticClassifier]]. + * Classes are indexed {0, 1, ..., numClasses - 1}. + * + * @tparam FeaturesType Type of input features. E.g., [[Vector]] + * @tparam M Concrete Model type + */ +@AlphaComponent +abstract class ProbabilisticClassificationModel[ + FeaturesType, + M <: ProbabilisticClassificationModel[FeaturesType, M]] + extends ClassificationModel[FeaturesType, M] with ProbabilisticClassifierParams { + + setProbabilityCol("") // Do not output by default + + def setProbabilityCol(value: String): M = set(probabilityCol, value).asInstanceOf[M] + + /** + * Transforms dataset by reading from [[featuresCol]], and appending new columns as specified by + * parameters: + * - predicted labels as [[predictionCol]] of type [[Double]] + * - raw predictions (confidences) as [[rawPredictionCol]] of type [[Vector]] + * - probability of each class as [[probabilityCol]] of type [[Vector]]. + * + * @param dataset input dataset + * @param paramMap additional parameters, overwrite embedded params + * @return transformed dataset + */ + override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = { + // This default implementation should be overridden as needed. + import dataset.sqlContext._ + import org.apache.spark.sql.catalyst.dsl._ + + // Check schema + transformSchema(dataset.schema, paramMap, logging = true) + val map = this.paramMap ++ paramMap + + // Prepare model + val tmpModel = if (paramMap.size != 0) { + val tmpModel = this.copy() + Params.inheritValues(paramMap, parent, tmpModel) + tmpModel + } else { + this + } + + val (numColsOutput, outputData) = + ClassificationModel.transformColumnsImpl[FeaturesType](dataset, tmpModel, map) + + // Output selected columns only. + if (map(probabilityCol) != "") { + // output probabilities + val features2probs: FeaturesType => Vector = (features) => { + tmpModel.predictProbabilities(features) + } + outputData.select(Star(None), + features2probs.call(map(featuresCol).attr) as map(probabilityCol)) + } else { + if (numColsOutput == 0) { + this.logWarning(s"$uid: ProbabilisticClassificationModel.transform() was called as NOOP" + + " since no output columns were set.") + } + outputData + } + } + + /** + * :: DeveloperApi :: + * + * Predict the probability of each class given the features. + * These predictions are also called class conditional probabilities. + * + * WARNING: Not all models output well-calibrated probability estimates! These probabilities + * should be treated as confidences, not precise probabilities. + * + * This internal method is used to implement [[transform()]] and output [[probabilityCol]]. + */ + @DeveloperApi + protected def predictProbabilities(features: FeaturesType): Vector +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala index 0b0504e036ec9..602d1fce1fc0f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala @@ -21,6 +21,7 @@ import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml._ import org.apache.spark.ml.param._ import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics +import org.apache.spark.mllib.linalg.{Vector, VectorUDT} import org.apache.spark.sql.{DoubleType, Row, SchemaRDD} /** @@ -29,7 +30,7 @@ import org.apache.spark.sql.{DoubleType, Row, SchemaRDD} */ @AlphaComponent class BinaryClassificationEvaluator extends Evaluator with Params - with HasScoreCol with HasLabelCol { + with HasRawPredictionCol with HasLabelCol { /** param for metric name in evaluation */ val metricName: Param[String] = new Param(this, "metricName", @@ -37,24 +38,21 @@ class BinaryClassificationEvaluator extends Evaluator with Params def getMetricName: String = get(metricName) def setMetricName(value: String): this.type = set(metricName, value) - def setScoreCol(value: String): this.type = set(scoreCol, value) + def setScoreCol(value: String): this.type = set(rawPredictionCol, value) def setLabelCol(value: String): this.type = set(labelCol, value) override def evaluate(dataset: SchemaRDD, paramMap: ParamMap): Double = { val map = this.paramMap ++ paramMap val schema = dataset.schema - val scoreType = schema(map(scoreCol)).dataType - require(scoreType == DoubleType, - s"Score column ${map(scoreCol)} must be double type but found $scoreType") - val labelType = schema(map(labelCol)).dataType - require(labelType == DoubleType, - s"Label column ${map(labelCol)} must be double type but found $labelType") + checkInputColumn(schema, map(rawPredictionCol), new VectorUDT) + checkInputColumn(schema, map(labelCol), DoubleType) import dataset.sqlContext._ - val scoreAndLabels = dataset.select(map(scoreCol).attr, map(labelCol).attr) - .map { case Row(score: Double, label: Double) => - (score, label) + // TODO: When dataset metadata has been implemented, check rawPredictionCol vector length = 2. + val scoreAndLabels = dataset.select(map(rawPredictionCol).attr, map(labelCol).attr) + .map { case Row(rawPrediction: Vector, label: Double) => + (rawPrediction(1), label) } val metrics = new BinaryClassificationMetrics(scoreAndLabels) val metric = map(metricName) match { diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala index 9352f40f372d3..caaca07a4b013 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala @@ -29,11 +29,11 @@ import org.apache.spark.sql.{DataType, StringType, ArrayType} @AlphaComponent class Tokenizer extends UnaryTransformer[String, Seq[String], Tokenizer] { - protected override def createTransformFunc(paramMap: ParamMap): String => Seq[String] = { + override protected def createTransformFunc(paramMap: ParamMap): String => Seq[String] = { _.toLowerCase.split("\\s") } - protected override def validateInputType(inputType: DataType): Unit = { + override protected def validateInputType(inputType: DataType): Unit = { require(inputType == StringType, s"Input type must be string type but got $inputType.") } diff --git a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala index 48cecfefd4c07..35ca5a0bcbe00 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala @@ -17,15 +17,23 @@ package org.apache.spark.ml.impl.estimator -import org.apache.spark.api.java.JavaRDD -import org.apache.spark.ml.{Estimator, LabeledPoint, Model} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.param._ -import org.apache.spark.mllib.linalg.{Vector, VectorUDT} +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.Star -private[ml] trait PredictorParams extends Params + +/** + * :: DeveloperApi :: + * + * Trait for parameters for prediction (regression and classification). + */ +@DeveloperApi +trait PredictorParams extends Params with HasLabelCol with HasFeaturesCol with HasPredictionCol { /** @@ -33,33 +41,41 @@ private[ml] trait PredictorParams extends Params * @param schema input schema * @param paramMap additional parameters * @param fitting whether this is in fitting + * @param featuresDataType SQL DataType for FeaturesType. + * E.g., [[org.apache.spark.mllib.linalg.VectorUDT]] for vector features. * @return output schema */ protected def validateAndTransformSchema( schema: StructType, paramMap: ParamMap, - fitting: Boolean): StructType = { + fitting: Boolean, + featuresDataType: DataType): StructType = { val map = this.paramMap ++ paramMap - val featuresType = schema(map(featuresCol)).dataType - // TODO: Support casting Array[Double] and Array[Float] to Vector. - require(featuresType.isInstanceOf[VectorUDT], - s"Features column ${map(featuresCol)} must be Vector types" + - s" but was actually $featuresType.") + // TODO: Support casting Array[Double] and Array[Float] to Vector when FeaturesType = Vector + checkInputColumn(schema, map(featuresCol), featuresDataType) if (fitting) { - val labelType = schema(map(labelCol)).dataType - require(labelType == DoubleType || labelType == IntegerType, - s"Cannot convert label column ${map(labelCol)} of type $labelType to a Double column.") + // TODO: Allow other numeric types + checkInputColumn(schema, map(labelCol), DoubleType) } - val fieldNames = schema.fieldNames - require(!fieldNames.contains(map(predictionCol)), - s"Prediction column ${map(predictionCol)} already exists.") - val outputFields = schema.fields ++ Seq( - StructField(map(predictionCol), DoubleType, nullable = false)) - StructType(outputFields) + addOutputColumn(schema, map(predictionCol), DoubleType) } } -private[ml] abstract class Predictor[Learner <: Predictor[Learner, M], M <: PredictionModel[M]] +/** + * Abstraction for prediction problems (regression and classification). + * + * @tparam FeaturesType Type of features. + * E.g., [[org.apache.spark.mllib.linalg.VectorUDT]] for vector features. + * @tparam Learner Specialization of this class. If you subclass this type, use this type + * parameter to specify the concrete type. + * @tparam M Specialization of [[PredictionModel]]. If you subclass this type, use this type + * parameter to specify the concrete type for the corresponding model. + */ +@DeveloperApi +abstract class Predictor[ + FeaturesType, + Learner <: Predictor[FeaturesType, Learner, M], + M <: PredictionModel[FeaturesType, M]] extends Estimator[M] with PredictorParams { // TODO: Eliminate asInstanceOf and see if that works. @@ -67,6 +83,8 @@ private[ml] abstract class Predictor[Learner <: Predictor[Learner, M], M <: Pred def setFeaturesCol(value: String): Learner = set(featuresCol, value).asInstanceOf[Learner] def setPredictionCol(value: String): Learner = set(predictionCol, value).asInstanceOf[Learner] + /* + // This will be useful for boosting. protected def selectLabelColumn(dataset: SchemaRDD, paramMap: ParamMap): RDD[Double] = { import dataset.sqlContext._ val map = this.paramMap ++ paramMap @@ -75,113 +93,109 @@ private[ml] abstract class Predictor[Learner <: Predictor[Learner, M], M <: Pred case Row(label: Int) => label.toDouble } } + */ + + /** + * :: DeveloperApi :: + * + * Returns the SQL DataType corresponding to the FeaturesType type parameter. + * + * This is used by [[validateAndTransformSchema()]]. + * This workaround is needed since SQL has different APIs for Scala and Java. + */ + @DeveloperApi + protected def featuresDataType: DataType private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { - validateAndTransformSchema(schema, paramMap, fitting = true) + validateAndTransformSchema(schema, paramMap, fitting = true, featuresDataType) } - override def fit(dataset: SchemaRDD, paramMap: ParamMap): M = { - transformSchema(dataset.schema, paramMap, logging = true) + /** + * Extract [[labelCol]] and [[featuresCol]] from the given dataset, + * and put it in an RDD with strong types. + */ + protected def extractLabeledPoints(dataset: SchemaRDD, paramMap: ParamMap): RDD[LabeledPoint] = { import dataset.sqlContext._ val map = this.paramMap ++ paramMap - val instances = dataset.select(map(labelCol).attr, map(featuresCol).attr) + dataset.select(map(labelCol).attr, map(featuresCol).attr) .map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) } - val model = train(instances, map) - // copy model params - Params.inheritValues(map, this, model) - model } - - /** - * Same as [[fit()]], but using strong types. - * - * @param dataset Training data - * @param paramMap Parameters for training. - * These values override any specified in this Estimator's embedded ParamMap. - */ - def train(dataset: RDD[LabeledPoint], paramMap: ParamMap): M - - /** - * Same as [[fit()]], but using strong types. - * @param dataset Training data - */ - def train(dataset: RDD[LabeledPoint]): M = train(dataset, new ParamMap()) - - /** Java-friendly version of [[train()]]. */ - def train(dataset: JavaRDD[LabeledPoint], paramMap: ParamMap): M = train(dataset.rdd, paramMap) - - /** Java-friendly version of [[train()]]. */ - def train(dataset: JavaRDD[LabeledPoint]): M = train(dataset.rdd) } -private[ml] abstract class PredictionModel[M <: PredictionModel[M]] +private[ml] abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType, M]] extends Model[M] with PredictorParams { def setFeaturesCol(value: String): M = set(featuresCol, value).asInstanceOf[M] def setPredictionCol(value: String): M = set(predictionCol, value).asInstanceOf[M] + /** + * :: DeveloperApi :: + * + * Returns the SQL DataType corresponding to the FeaturesType type parameter. + * + * This is used by [[validateAndTransformSchema()]]. + * This workaround is needed since SQL has different APIs for Scala and Java. + */ + @DeveloperApi + protected def featuresDataType: DataType + private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { - validateAndTransformSchema(schema, paramMap, fitting = false) + validateAndTransformSchema(schema, paramMap, fitting = false, featuresDataType) } /** - * Transforms dataset by reading from [[featuresCol]], calling [[predict( )]], and storing + * Transforms dataset by reading from [[featuresCol]], calling [[predict()]], and storing * the predictions as a new column [[predictionCol]]. - * This default implementation should be overridden as needed. + * * @param dataset input dataset * @param paramMap additional parameters, overwrite embedded params * @return transformed dataset with [[predictionCol]] of type [[Double]] */ override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = { + // This default implementation should be overridden as needed. import org.apache.spark.sql.catalyst.dsl._ import dataset.sqlContext._ + // Check schema transformSchema(dataset.schema, paramMap, logging = true) val map = this.paramMap ++ paramMap - val tmpModel = this.copy() - Params.inheritValues(paramMap, parent, tmpModel) - val pred: Vector => Double = (features) => { - tmpModel.predict(features) + + // Prepare model + val tmpModel = if (paramMap.size != 0) { + val tmpModel = this.copy() + Params.inheritValues(paramMap, parent, tmpModel) + tmpModel + } else { + this } - dataset.select(Star(None), pred.call(map(featuresCol).attr) as map(predictionCol)) - } - /** - * Strongly typed version of [[transform()]]. - * Default implementation using single-instance predict(). - * - * Developers should override this for efficiency. E.g., this does not broadcast the model. - */ - def predict(dataset: RDD[Vector], paramMap: ParamMap): RDD[Double] = { - val tmpModel = this.copy() - Params.inheritValues(paramMap, parent, tmpModel) - dataset.map(tmpModel.predict) + if (map(predictionCol) != "") { + val pred: FeaturesType => Double = (features) => { + tmpModel.predict(features) + } + dataset.select(Star(None), pred.call(map(featuresCol).attr) as map(predictionCol)) + } else { + this.logWarning(s"$uid: Predictor.transform() was called as NOOP" + + " since no output columns were set.") + dataset + } } - /** Strongly typed version of [[transform()]]. */ - def predict(dataset: RDD[Vector]): RDD[Double] = predict(dataset, new ParamMap) - /** + * :: DeveloperApi :: + * * Predict label for the given features. + * This internal method is used to implement [[transform()]] and output [[predictionCol]]. */ - def predict(features: Vector): Double - - /** Java-friendly version of [[predict()]]. */ - def predict(dataset: JavaRDD[Vector], paramMap: ParamMap): JavaRDD[java.lang.Double] = { - predict(dataset.rdd, paramMap).map(_.asInstanceOf[java.lang.Double]).toJavaRDD() - } - - /** Java-friendly version of [[predict()]]. */ - def predict(dataset: JavaRDD[Vector]): JavaRDD[java.lang.Double] = { - predict(dataset.rdd, new ParamMap).map(_.asInstanceOf[java.lang.Double]).toJavaRDD() - } + @DeveloperApi + protected def predict(features: FeaturesType): Double /** * Create a copy of the model. * The copy is shallow, except for the embedded paramMap, which gets a deep copy. */ - private[ml] def copy(): M + protected def copy(): M } diff --git a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/ProbabilisticClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/ProbabilisticClassificationModel.scala deleted file mode 100644 index e534a8c264bb3..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/ProbabilisticClassificationModel.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ml.impl.estimator - -import org.apache.spark.api.java.JavaRDD -import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.rdd.RDD - -/** - * Trait for a [[org.apache.spark.ml.classification.ClassificationModel]] which can output - * class conditional probabilities. - */ -private[ml] trait ProbabilisticClassificationModel { - - /** - * Predict the probability of each class given the features. - * These predictions are also called class conditional probabilities. - * - * WARNING: Not all models output well-calibrated probability estimates! These probabilities - * should be treated as confidences, not precise probabilities. - */ - def predictProbabilities(features: Vector): Vector - - /** Batch version of [[predictProbabilities()]] */ - def predictProbabilities(features: RDD[Vector]): RDD[Vector] = features.map(predictProbabilities) - - /** Java-friendly batch version of [[predictProbabilities()]] */ - def predictProbabilities(features: JavaRDD[Vector]): JavaRDD[Vector] = { - features.rdd.map(predictProbabilities).toJavaRDD() - } -} diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala index 4e18a5c9b7d08..e0e334c9b4f8a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala @@ -19,11 +19,14 @@ package org.apache.spark.ml.param import scala.annotation.varargs import scala.collection.mutable +import scala.reflect.runtime.universe._ import java.lang.reflect.Modifier -import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.annotation.{DeveloperApi, AlphaComponent} import org.apache.spark.ml.Identifiable +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.ScalaReflection /** * :: AlphaComponent :: @@ -158,7 +161,7 @@ trait Params extends Identifiable with Serializable { /** * Sets a parameter in the embedded param map. */ - private[ml] def set[T](param: Param[T], value: T): this.type = { + protected def set[T](param: Param[T], value: T): this.type = { require(param.parent.eq(this)) paramMap.put(param.asInstanceOf[Param[Any]], value) this @@ -167,7 +170,7 @@ trait Params extends Identifiable with Serializable { /** * Gets the value of a parameter in the embedded param map. */ - private[ml] def get[T](param: Param[T]): T = { + protected def get[T](param: Param[T]): T = { require(param.parent.eq(this)) paramMap(param) } @@ -176,9 +179,38 @@ trait Params extends Identifiable with Serializable { * Internal param map. */ protected val paramMap: ParamMap = ParamMap.empty + + /** + * Check whether the given schema contains an input column. + * @param colName Parameter name for the input column. + * @param dataType SQL DataType of the input column. + */ + protected def checkInputColumn(schema: StructType, colName: String, dataType: DataType): Unit = { + val actualDataType = schema(colName).dataType + require(actualDataType.equals(dataType), + s"Input column $colName must be of type $dataType" + + s" but was actually $actualDataType. Column param description: ${getParam(colName)}") + } + + protected def addOutputColumn( + schema: StructType, + colName: String, + dataType: DataType): StructType = { + if (colName.length == 0) return schema + val fieldNames = schema.fieldNames + require(!fieldNames.contains(colName), s"Prediction column $colName already exists.") + val outputFields = schema.fields ++ Seq(StructField(colName, dataType, nullable = false)) + StructType(outputFields) + } } -private[ml] object Params { +/** + * :: DeveloperApi :: + * + * Helper functionality for developers. + */ +@DeveloperApi +object Params { /** * Copies parameter values from the parent estimator to the child model it produced. @@ -304,6 +336,11 @@ class ParamMap private[ml] (private val map: mutable.Map[Param[Any], Any]) exten ParamPair(param, value) } } + + /** + * Number of param pairs in this set. + */ + def size: Int = map.size } object ParamMap { diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/sharedParams.scala index ef141d3eb2b06..bf336f3f7173b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/sharedParams.scala @@ -17,6 +17,10 @@ package org.apache.spark.ml.param +/* NOTE TO DEVELOPERS: + * If you add these parameter traits into your algorithm, you need to add a setter method as well. + */ + private[ml] trait HasRegParam extends Params { /** param for regularization parameter */ val regParam: DoubleParam = new DoubleParam(this, "regParam", "regularization parameter") @@ -42,12 +46,6 @@ private[ml] trait HasLabelCol extends Params { def getLabelCol: String = get(labelCol) } -private[ml] trait HasScoreCol extends Params { - /** param for score column name */ - val scoreCol: Param[String] = new Param(this, "scoreCol", "score column name", Some("score")) - def getScoreCol: String = get(scoreCol) -} - private[ml] trait HasPredictionCol extends Params { /** param for prediction column name */ val predictionCol: Param[String] = @@ -55,6 +53,22 @@ private[ml] trait HasPredictionCol extends Params { def getPredictionCol: String = get(predictionCol) } +private[ml] trait HasRawPredictionCol extends Params { + /** param for raw prediction column name */ + val rawPredictionCol: Param[String] = + new Param(this, "rawPredictionCol", "raw prediction (a.k.a. confidence) column name", + Some("rawPrediction")) + def getRawPredictionCol: String = get(rawPredictionCol) +} + +private[ml] trait HasProbabilityCol extends Params { + /** param for predicted class conditional probabilities column name */ + val probabilityCol: Param[String] = + new Param(this, "probabilityCol", "column name for predicted class conditional probabilities", + Some("probability")) + def getProbabilityCol: String = get(probabilityCol) +} + private[ml] trait HasThreshold extends Params { /** param for threshold in (binary) prediction */ val threshold: DoubleParam = new DoubleParam(this, "threshold", "threshold in prediction") diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index e6abe9b404808..3ff7107221763 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -18,11 +18,10 @@ package org.apache.spark.ml.regression import org.apache.spark.annotation.AlphaComponent -import org.apache.spark.ml.LabeledPoint import org.apache.spark.ml.param.{Params, ParamMap, HasMaxIter, HasRegParam} -import org.apache.spark.mllib.linalg.{BLAS, Vector} +import org.apache.spark.mllib.linalg.{VectorUDT, BLAS, Vector} import org.apache.spark.mllib.regression.LinearRegressionWithSGD -import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ import org.apache.spark.storage.StorageLevel /** @@ -36,7 +35,7 @@ private[regression] trait LinearRegressionParams extends RegressorParams * Logistic regression. */ @AlphaComponent -class LinearRegression extends Regressor[LinearRegression, LinearRegressionModel] +class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegressionModel] with LinearRegressionParams { setRegParam(0.1) @@ -45,41 +44,36 @@ class LinearRegression extends Regressor[LinearRegression, LinearRegressionModel def setRegParam(value: Double): this.type = set(regParam, value) def setMaxIter(value: Int): this.type = set(maxIter, value) - /** - * Same as [[fit()]], but using strong types. - * NOTE: This does NOT support instance weights. - * @param dataset Training data. Instance weights are ignored. - * @param paramMap Parameters for training. - * These values override any specified in this Estimator's embedded ParamMap. - */ - override def train(dataset: RDD[LabeledPoint], paramMap: ParamMap): LinearRegressionModel = { + override def fit(dataset: SchemaRDD, paramMap: ParamMap): LinearRegressionModel = { + // Check schema + transformSchema(dataset.schema, paramMap, logging = true) + + // Extract columns from data. If dataset is persisted, do not persist oldDataset. + val oldDataset = extractLabeledPoints(dataset, paramMap) val map = this.paramMap ++ paramMap - val oldDataset = dataset.map { case LabeledPoint(label: Double, features: Vector, weight) => - org.apache.spark.mllib.regression.LabeledPoint(label, features) - } - val handlePersistence = oldDataset.getStorageLevel == StorageLevel.NONE + val handlePersistence = dataset.getStorageLevel == StorageLevel.NONE if (handlePersistence) { oldDataset.persist(StorageLevel.MEMORY_AND_DISK) } + + // Train model val lr = new LinearRegressionWithSGD() lr.optimizer .setRegParam(map(regParam)) .setNumIterations(map(maxIter)) val model = lr.run(oldDataset) val lrm = new LinearRegressionModel(this, map, model.weights, model.intercept) + if (handlePersistence) { oldDataset.unpersist() } + + // copy model params + Params.inheritValues(map, this, lrm) lrm } - /** - * Same as [[fit()]], but using strong types. - * NOTE: This does NOT support instance weights. - * @param dataset Training data. Instance weights are ignored. - */ - override def train(dataset: RDD[LabeledPoint]): LinearRegressionModel = - train(dataset, new ParamMap()) // Override documentation + override protected def featuresDataType: DataType = new VectorUDT } /** @@ -92,16 +86,18 @@ class LinearRegressionModel private[ml] ( override val fittingParamMap: ParamMap, val weights: Vector, val intercept: Double) - extends RegressionModel[LinearRegressionModel] + extends RegressionModel[Vector, LinearRegressionModel] with LinearRegressionParams { - override def predict(features: Vector): Double = { + override protected def predict(features: Vector): Double = { BLAS.dot(features, weights) + intercept } - private[ml] override def copy(): LinearRegressionModel = { + override protected def copy(): LinearRegressionModel = { val m = new LinearRegressionModel(parent, fittingParamMap, weights, intercept) Params.inheritValues(this.paramMap, this, m) m } + + override protected def featuresDataType: DataType = new VectorUDT } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala index 78086fe16fd60..5f10344456a10 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/Regressor.scala @@ -17,23 +17,31 @@ package org.apache.spark.ml.regression -import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.annotation.{DeveloperApi, AlphaComponent} import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, PredictorParams} -import org.apache.spark.mllib.linalg.Vector /** + * :: DeveloperApi :: * Params for regression. * Currently empty, but may add functionality later. */ -private[regression] trait RegressorParams extends PredictorParams +@DeveloperApi +trait RegressorParams extends PredictorParams /** * :: AlphaComponent :: * Single-label regression + * + * @tparam FeaturesType Type of input features. E.g., [[org.apache.spark.mllib.linalg.Vector]] + * @tparam Learner Concrete Estimator type + * @tparam M Concrete Model type */ @AlphaComponent -abstract class Regressor[Learner <: Regressor[Learner, M], M <: RegressionModel[M]] - extends Predictor[Learner, M] +abstract class Regressor[ + FeaturesType, + Learner <: Regressor[FeaturesType, Learner, M], + M <: RegressionModel[FeaturesType, M]] + extends Predictor[FeaturesType, Learner, M] with RegressorParams { // TODO: defaultEvaluator (follow-up PR) @@ -42,15 +50,21 @@ abstract class Regressor[Learner <: Regressor[Learner, M], M <: RegressionModel[ /** * :: AlphaComponent :: * Model produced by a [[Regressor]]. - * @tparam M Model type. + * + * @tparam FeaturesType Type of input features. E.g., [[org.apache.spark.mllib.linalg.Vector]] + * @tparam M Concrete Model type. */ @AlphaComponent -abstract class RegressionModel[M <: RegressionModel[M]] - extends PredictionModel[M] with RegressorParams { +abstract class RegressionModel[FeaturesType, M <: RegressionModel[FeaturesType, M]] + extends PredictionModel[FeaturesType, M] with RegressorParams { /** + * :: DeveloperApi :: + * * Predict real-valued label for the given features. + * This internal method is used to implement [[transform()]] and output [[predictionCol]]. */ - def predict(features: Vector): Double + @DeveloperApi + protected def predict(features: FeaturesType): Double } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 01f3f90577142..7bc18711e9cbb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -91,7 +91,7 @@ sealed trait Vector extends Serializable { * User-defined type for [[Vector]] which allows easy interaction with SQL * via [[org.apache.spark.sql.SchemaRDD]]. */ -private[spark] class VectorUDT extends UserDefinedType[Vector] { +class VectorUDT extends UserDefinedType[Vector] { override def sqlType: StructType = { // type: 0 = sparse, 1 = dense @@ -147,6 +147,13 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] { override def pyUDT: String = "pyspark.mllib.linalg.VectorUDT" override def userClass: Class[Vector] = classOf[Vector] + + override def equals(o: Any): Boolean = { + o match { + case v: VectorUDT => true + case _ => false + } + } } /**