From bcb9549f57d9f8cff5ee534e897bb2772cc966bf Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Thu, 29 Jan 2015 18:26:37 -0800
Subject: [PATCH] Fixed issues after rebasing from master (after move from SchemaRDD to DataFrame)

---
 .../spark/ml/classification/Classifier.scala  | 30 +++++------
 .../classification/LogisticRegression.scala   | 51 +++++++------------
 .../ProbabilisticClassifier.scala             | 13 +++--
 .../BinaryClassificationEvaluator.scala       |  6 +--
 .../spark/ml/impl/estimator/Predictor.scala   | 20 ++++----
 .../ml/regression/LinearRegression.scala      |  6 +--
 .../JavaLogisticRegressionSuite.java          | 27 +++-------
 .../JavaLinearRegressionSuite.java            | 18 ++-----
 .../LogisticRegressionSuite.scala             | 27 ++++------
 .../ml/regression/LinearRegressionSuite.scala | 15 ++----
 10 files changed, 79 insertions(+), 134 deletions(-)
 rename mllib/src/test/java/org/apache/spark/ml/{classification => regression}/JavaLinearRegressionSuite.java (83%)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
index 58a48bc5ecce3..ca791ae612132 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
@@ -21,9 +21,9 @@ import org.apache.spark.annotation.{DeveloperApi, AlphaComponent}
 import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, PredictorParams}
 import org.apache.spark.ml.param.{Params, ParamMap, HasRawPredictionCol}
 import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Star
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.{DataType, DoubleType, StructType}
 
 
 /**
@@ -95,7 +95,7 @@ abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[Featur
    * @param paramMap additional parameters, overwrite embedded params
    * @return transformed dataset
    */
-  override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
+  override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
     // This default implementation should be overridden as needed.
 
     // Check schema
@@ -162,12 +162,9 @@ private[ml] object ClassificationModel {
    * @return (number of columns added, transformed dataset)
    */
   private[ml] def transformColumnsImpl[FeaturesType](
-      dataset: SchemaRDD,
+      dataset: DataFrame,
       model: ClassificationModel[FeaturesType, _],
-      map: ParamMap): (Int, SchemaRDD) = {
-
-    import org.apache.spark.sql.catalyst.dsl._
-    import dataset.sqlContext._
+      map: ParamMap): (Int, DataFrame) = {
 
     // Output selected columns only.
     // This is a bit complicated since it tries to avoid repeated computation.
@@ -176,22 +173,25 @@ private[ml] object ClassificationModel {
     if (map(model.rawPredictionCol) != "") {
       // output raw prediction
       val features2raw: FeaturesType => Vector = model.predictRaw
-      tmpData = tmpData.select(Star(None),
-        features2raw.call(map(model.featuresCol).attr) as map(model.rawPredictionCol))
+      tmpData = tmpData.select($"*",
+        callUDF(features2raw, new VectorUDT,
+          tmpData(map(model.featuresCol))).as(map(model.rawPredictionCol)))
       numColsOutput += 1
       if (map(model.predictionCol) != "") {
         val raw2pred: Vector => Double = (rawPred) => {
           rawPred.toArray.zipWithIndex.maxBy(_._1)._2
         }
-        tmpData = tmpData.select(Star(None),
-          raw2pred.call(map(model.rawPredictionCol).attr) as map(model.predictionCol))
+        tmpData = tmpData.select($"*",
+          callUDF(raw2pred, DoubleType,
+            tmpData(map(model.rawPredictionCol))).as(map(model.predictionCol)))
         numColsOutput += 1
       }
     } else if (map(model.predictionCol) != "") {
       // output prediction
       val features2pred: FeaturesType => Double = model.predict
-      tmpData = tmpData.select(Star(None),
-        features2pred.call(map(model.featuresCol).attr) as map(model.predictionCol))
+      tmpData = tmpData.select($"*",
+        callUDF(features2pred, DoubleType,
+          tmpData(map(model.featuresCol))).as(map(model.predictionCol)))
       numColsOutput += 1
     }
     (numColsOutput, tmpData)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 7dcc6b1bf9ecd..293062fc45915 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -20,12 +20,10 @@ package org.apache.spark.ml.classification
 import org.apache.spark.annotation.AlphaComponent
 import org.apache.spark.ml.param._
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
-import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
-import org.apache.spark.sql._
+import org.apache.spark.mllib.linalg.{VectorUDT, BLAS, Vector, Vectors}
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Dsl._
-import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
-import org.apache.spark.sql.catalyst.analysis.Star
-import org.apache.spark.sql.catalyst.dsl._
+import org.apache.spark.sql.types.DoubleType
 import org.apache.spark.storage.StorageLevel
 
 
@@ -55,10 +53,10 @@ class LogisticRegression
   def setMaxIter(value: Int): this.type = set(maxIter, value)
   def setThreshold(value: Double): this.type = set(threshold, value)
 
-  override protected def train(dataset: SchemaRDD, paramMap: ParamMap): LogisticRegressionModel = {
+  override protected def train(dataset: DataFrame, paramMap: ParamMap): LogisticRegressionModel = {
     // Extract columns from data.  If dataset is persisted, do not persist oldDataset.
     val oldDataset = extractLabeledPoints(dataset, paramMap)
-    val handlePersistence = dataset.getStorageLevel == StorageLevel.NONE
+    val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
     if (handlePersistence) {
       oldDataset.persist(StorageLevel.MEMORY_AND_DISK)
     }
@@ -106,25 +104,10 @@ class LogisticRegressionModel private[ml] (
     1.0 / (1.0 + math.exp(-m))
   }
 
-/*  override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
-    transformSchema(dataset.schema, paramMap, logging = true)
-    val map = this.paramMap ++ paramMap
-    val scoreFunction = udf { v: Vector =>
-      val margin = BLAS.dot(v, weights)
-      1.0 / (1.0 + math.exp(-margin))
-    }
-    val t = map(threshold)
-    val predictFunction: Double => Double = (score) => { if (score > t) 1.0 else 0.0 }
-    dataset
-      .select($"*", callUDF(scoreFunction, col(map(featuresCol))).as(map(scoreCol)))
-      .select($"*", callUDF(predictFunction, col(map(scoreCol))).as(map(predictionCol)))
-*/
-  override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
+  override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
     // Check schema
     transformSchema(dataset.schema, paramMap, logging = true)
-    import dataset.sqlContext._
 
     val map = this.paramMap ++ paramMap
 
     // Output selected columns only.
@@ -136,8 +119,8 @@ class LogisticRegressionModel private[ml] (
     var numColsOutput = 0
     if (map(rawPredictionCol) != "") {
       val features2raw: Vector => Vector = predictRaw
-      tmpData = tmpData.select(Star(None),
-        features2raw.call(map(featuresCol).attr) as map(rawPredictionCol))
+      tmpData = tmpData.select($"*",
+        callUDF(features2raw, new VectorUDT, tmpData(map(featuresCol))).as(map(rawPredictionCol)))
       numColsOutput += 1
     }
     if (map(probabilityCol) != "") {
@@ -146,12 +129,12 @@ class LogisticRegressionModel private[ml] (
         val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
         Vectors.dense(1.0 - prob1, prob1)
       }
-      tmpData = tmpData.select(Star(None),
-        raw2prob.call(map(rawPredictionCol).attr) as map(probabilityCol))
+      tmpData = tmpData.select($"*",
+        callUDF(raw2prob, new VectorUDT, tmpData(map(rawPredictionCol))).as(map(probabilityCol)))
     } else {
       val features2prob: Vector => Vector = predictProbabilities
-      tmpData = tmpData.select(Star(None),
-        features2prob.call(map(featuresCol).attr) as map(probabilityCol))
+      tmpData = tmpData.select($"*",
+        callUDF(features2prob, new VectorUDT, tmpData(map(featuresCol))).as(map(probabilityCol)))
     }
     numColsOutput += 1
   }
@@ -161,19 +144,19 @@ class LogisticRegressionModel private[ml] (
       val predict: Vector => Double = (probs) => {
         if (probs(1) > t) 1.0 else 0.0
       }
-      tmpData = tmpData.select(Star(None),
-        predict.call(map(probabilityCol).attr) as map(predictionCol))
+      tmpData = tmpData.select($"*",
+        callUDF(predict, DoubleType, tmpData(map(probabilityCol))).as(map(predictionCol)))
     } else if (map(rawPredictionCol) != "") {
       val predict: Vector => Double = (rawPreds) => {
         val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
         if (prob1 > t) 1.0 else 0.0
       }
-      tmpData = tmpData.select(Star(None),
-        predict.call(map(rawPredictionCol).attr) as map(predictionCol))
+      tmpData = tmpData.select($"*",
+        callUDF(predict, DoubleType, tmpData(map(rawPredictionCol))).as(map(predictionCol)))
     } else {
       val predict: Vector => Double = this.predict
-      tmpData = tmpData.select(Star(None),
-        predict.call(map(featuresCol).attr) as map(predictionCol))
+      tmpData = tmpData.select($"*",
+        callUDF(predict, DoubleType, tmpData(map(featuresCol))).as(map(predictionCol)))
     }
     numColsOutput += 1
   }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
index d7c75ef6e364b..330571cf7b91f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
@@ -20,8 +20,8 @@ package org.apache.spark.ml.classification
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi}
 import org.apache.spark.ml.param.{HasProbabilityCol, ParamMap, Params}
 import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Star
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Dsl._
 import org.apache.spark.sql.types.{DataType, StructType}
 
 
@@ -91,10 +91,8 @@ abstract class ProbabilisticClassificationModel[
    * @param paramMap additional parameters, overwrite embedded params
    * @return transformed dataset
    */
-  override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
+  override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
     // This default implementation should be overridden as needed.
-    import dataset.sqlContext._
-    import org.apache.spark.sql.catalyst.dsl._
 
     // Check schema
     transformSchema(dataset.schema, paramMap, logging = true)
@@ -118,8 +116,9 @@ abstract class ProbabilisticClassificationModel[
       val features2probs: FeaturesType => Vector = (features) => {
         tmpModel.predictProbabilities(features)
       }
-      outputData.select(Star(None),
-        features2probs.call(map(featuresCol).attr) as map(probabilityCol))
+      outputData.select($"*",
+        callUDF(features2probs, new VectorUDT,
+          outputData(map(featuresCol))).as(map(probabilityCol)))
     } else {
       if (numColsOutput == 0) {
         this.logWarning(s"$uid: ProbabilisticClassificationModel.transform() was called as NOOP" +
diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
index aa1caff0c0931..f21a30627e540 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
@@ -18,11 +18,11 @@ package org.apache.spark.ml.evaluation
 
 import org.apache.spark.annotation.AlphaComponent
-import org.apache.spark.ml._
+import org.apache.spark.ml.Evaluator
 import org.apache.spark.ml.param._
 import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
-import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.types.DoubleType
 
 
@@ -52,7 +52,7 @@ class BinaryClassificationEvaluator extends Evaluator with Params
     checkInputColumn(schema, map(labelCol), DoubleType)
 
     // TODO: When dataset metadata has been implemented, check rawPredictionCol vector length = 2.
-    val scoreAndLabels = dataset.select(map(rawPredictionCol).attr, map(labelCol).attr)
+    val scoreAndLabels = dataset.select(map(rawPredictionCol), map(labelCol))
       .map { case Row(rawPrediction: Vector, label: Double) =>
         (rawPrediction(1), label)
       }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala
index 5e6a8912dabb8..cdf458f584ae4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/impl/estimator/Predictor.scala
@@ -23,8 +23,8 @@ import org.apache.spark.ml.param._
 import org.apache.spark.mllib.linalg.{VectorUDT, Vector}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Star
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.Dsl._
 import org.apache.spark.sql.types.{DataType, DoubleType, StructType}
 
 
@@ -85,7 +85,7 @@ abstract class Predictor[
   def setFeaturesCol(value: String): Learner = set(featuresCol, value).asInstanceOf[Learner]
   def setPredictionCol(value: String): Learner = set(predictionCol, value).asInstanceOf[Learner]
 
-  override def fit(dataset: SchemaRDD, paramMap: ParamMap): M = {
+  override def fit(dataset: DataFrame, paramMap: ParamMap): M = {
     // This handles a few items such as schema validation.
     // Developers only need to implement train().
     transformSchema(dataset.schema, paramMap, logging = true)
@@ -108,7 +108,7 @@ abstract class Predictor[
    * @return Fitted model
    */
   @DeveloperApi
-  protected def train(dataset: SchemaRDD, paramMap: ParamMap): M
+  protected def train(dataset: DataFrame, paramMap: ParamMap): M
 
   /**
    * :: DeveloperApi ::
@@ -131,10 +131,9 @@ abstract class Predictor[
    * Extract [[labelCol]] and [[featuresCol]] from the given dataset,
    * and put it in an RDD with strong types.
    */
-  protected def extractLabeledPoints(dataset: SchemaRDD, paramMap: ParamMap): RDD[LabeledPoint] = {
-    import dataset.sqlContext._
+  protected def extractLabeledPoints(dataset: DataFrame, paramMap: ParamMap): RDD[LabeledPoint] = {
     val map = this.paramMap ++ paramMap
-    dataset.select(map(labelCol).attr, map(featuresCol).attr)
+    dataset.select(map(labelCol), map(featuresCol))
       .map { case Row(label: Double, features: Vector) =>
         LabeledPoint(label, features)
       }
@@ -184,10 +183,8 @@ abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType,
    * @param paramMap additional parameters, overwrite embedded params
    * @return transformed dataset with [[predictionCol]] of type [[Double]]
    */
-  override def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD = {
+  override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
     // This default implementation should be overridden as needed.
-    import org.apache.spark.sql.catalyst.dsl._
-    import dataset.sqlContext._
 
     // Check schema
     transformSchema(dataset.schema, paramMap, logging = true)
@@ -206,7 +203,8 @@ abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType,
       val pred: FeaturesType => Double = (features) => {
         tmpModel.predict(features)
       }
-      dataset.select(Star(None), pred.call(map(featuresCol).attr) as map(predictionCol))
+      dataset.select($"*",
+        callUDF(pred, DoubleType, dataset(map(featuresCol))).as(map(predictionCol)))
     } else {
       this.logWarning(s"$uid: Predictor.transform() was called as NOOP" +
         " since no output columns were set.")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 29080cb0100fd..d5a7bdafcb623 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.AlphaComponent
 import org.apache.spark.ml.param.{Params, ParamMap, HasMaxIter, HasRegParam}
 import org.apache.spark.mllib.linalg.{BLAS, Vector}
 import org.apache.spark.mllib.regression.LinearRegressionWithSGD
-import org.apache.spark.sql._
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.storage.StorageLevel
 
 
@@ -47,10 +47,10 @@ class LinearRegression extends Regressor[Vector, LinearRegression, LinearRegress
   def setRegParam(value: Double): this.type = set(regParam, value)
   def setMaxIter(value: Int): this.type = set(maxIter, value)
 
-  override protected def train(dataset: SchemaRDD, paramMap: ParamMap): LinearRegressionModel = {
+  override protected def train(dataset: DataFrame, paramMap: ParamMap): LinearRegressionModel = {
     // Extract columns from data.  If dataset is persisted, do not persist oldDataset.
     val oldDataset = extractLabeledPoints(dataset, paramMap)
-    val handlePersistence = dataset.getStorageLevel == StorageLevel.NONE
+    val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
     if (handlePersistence) {
       oldDataset.persist(StorageLevel.MEMORY_AND_DISK)
     }
diff --git a/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java b/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java
index 11acaa3a0d357..26284023b0f69 100644
--- a/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/ml/classification/JavaLogisticRegressionSuite.java
@@ -19,7 +19,6 @@
 
 import java.io.Serializable;
 import java.lang.Math;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.junit.After;
@@ -28,12 +27,11 @@
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.sql.DataFrame;
-import org.apache.spark.sql.SQLContext;
 import static org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInputAsList;
 import org.apache.spark.mllib.linalg.Vector;
 import org.apache.spark.mllib.regression.LabeledPoint;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.Row;
 
 
@@ -50,11 +48,7 @@ public class JavaLogisticRegressionSuite implements Serializable {
   public void setUp() {
     jsc = new JavaSparkContext("local", "JavaLogisticRegressionSuite");
     jsql = new SQLContext(jsc);
-    List<LabeledPoint> points = new ArrayList<LabeledPoint>();
-    for (org.apache.spark.mllib.regression.LabeledPoint lp:
-        generateLogisticInputAsList(1.0, 1.0, 100, 42)) {
-      points.add(new LabeledPoint(lp.label(), lp.features()));
-    }
+    List<LabeledPoint> points = generateLogisticInputAsList(1.0, 1.0, 100, 42);
     datasetRDD = jsc.parallelize(points, 2);
     dataset = jsql.applySchema(datasetRDD, LabeledPoint.class);
     dataset.registerTempTable("dataset");
@@ -98,21 +92,14 @@ public void logisticRegressionWithSetters() {
     // Modify model params, and check that the params worked.
     model.setThreshold(1.0);
     model.transform(dataset).registerTempTable("predAllZero");
-    SchemaRDD predAllZero = jsql.sql("SELECT prediction, myProbability FROM predAllZero");
+    DataFrame predAllZero = jsql.sql("SELECT prediction, myProbability FROM predAllZero");
     for (Row r: predAllZero.collectAsList()) {
       assert(r.getDouble(0) == 0.0);
     }
     // Call transform with params, and check that the params worked.
-    /* TODO: USE THIS
-    model.transform(dataset, model.threshold().w(0.8)) // overwrite threshold
-      .registerTempTable("prediction");
-    DataFrame predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
-    predictions.collectAsList();
-    */
-
     model.transform(dataset, model.threshold().w(0.0), model.probabilityCol().w("myProb"))
       .registerTempTable("predNotAllZero");
-    SchemaRDD predNotAllZero = jsql.sql("SELECT prediction, myProb FROM predNotAllZero");
+    DataFrame predNotAllZero = jsql.sql("SELECT prediction, myProb FROM predNotAllZero");
     boolean foundNonZero = false;
     for (Row r: predNotAllZero.collectAsList()) {
       if (r.getDouble(0) != 0.0) foundNonZero = true;
@@ -137,7 +124,7 @@ public void logisticRegressionPredictorClassifierMethods() {
     assert(model.numClasses() == 2);
 
     model.transform(dataset).registerTempTable("transformed");
-    SchemaRDD trans1 = jsql.sql("SELECT rawPrediction, probability FROM transformed");
+    DataFrame trans1 = jsql.sql("SELECT rawPrediction, probability FROM transformed");
     for (Row row: trans1.collect()) {
       Vector raw = (Vector)row.get(0);
       Vector prob = (Vector)row.get(1);
@@ -148,7 +135,7 @@ public void logisticRegressionPredictorClassifierMethods() {
       assert(Math.abs(prob.apply(0) - (1.0 - probFromRaw1)) < eps);
     }
 
-    SchemaRDD trans2 = jsql.sql("SELECT prediction, probability FROM transformed");
+    DataFrame trans2 = jsql.sql("SELECT prediction, probability FROM transformed");
     for (Row row: trans2.collect()) {
       double pred = row.getDouble(0);
       Vector prob = (Vector)row.get(1);
diff --git a/mllib/src/test/java/org/apache/spark/ml/classification/JavaLinearRegressionSuite.java b/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
similarity index 83%
rename from mllib/src/test/java/org/apache/spark/ml/classification/JavaLinearRegressionSuite.java
rename to mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
index d918fc7caf6a0..5bd616e74d86c 100644
--- a/mllib/src/test/java/org/apache/spark/ml/classification/JavaLinearRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/ml/regression/JavaLinearRegressionSuite.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
-package org.apache.spark.ml.classification;
+package org.apache.spark.ml.regression;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.junit.After;
@@ -27,32 +26,25 @@
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.ml.regression.LinearRegression;
-import org.apache.spark.ml.regression.LinearRegressionModel;
 import static org.apache.spark.mllib.classification.LogisticRegressionSuite
   .generateLogisticInputAsList;
 import org.apache.spark.mllib.regression.LabeledPoint;
+import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SchemaRDD;
 
 
 public class JavaLinearRegressionSuite implements Serializable {
 
   private transient JavaSparkContext jsc;
   private transient SQLContext jsql;
-  private transient SchemaRDD dataset;
+  private transient DataFrame dataset;
   private transient JavaRDD<LabeledPoint> datasetRDD;
-  private double eps = 1e-5;
 
   @Before
   public void setUp() {
     jsc = new JavaSparkContext("local", "JavaLinearRegressionSuite");
     jsql = new SQLContext(jsc);
-    List<LabeledPoint> points = new ArrayList<LabeledPoint>();
-    for (org.apache.spark.mllib.regression.LabeledPoint lp:
-        generateLogisticInputAsList(1.0, 1.0, 100, 42)) {
-      points.add(new LabeledPoint(lp.label(), lp.features()));
-    }
+    List<LabeledPoint> points = generateLogisticInputAsList(1.0, 1.0, 100, 42);
     datasetRDD = jsc.parallelize(points, 2);
     dataset = jsql.applySchema(datasetRDD, LabeledPoint.class);
     dataset.registerTempTable("dataset");
@@ -70,7 +62,7 @@ public void linearRegressionDefaultParams() {
     assert(lr.getLabelCol().equals("label"));
     LinearRegressionModel model = lr.fit(dataset);
     model.transform(dataset).registerTempTable("prediction");
-    SchemaRDD predictions = jsql.sql("SELECT label, prediction FROM prediction");
+    DataFrame predictions = jsql.sql("SELECT label, prediction FROM prediction");
     predictions.collect();
     // Check defaults
     assert(model.getFeaturesCol().equals("features"));
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index f412622572c1b..b3d1bfcfbee0f 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -42,14 +42,18 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext {
   test("logistic regression: default params") {
     val lr = new LogisticRegression
     assert(lr.getLabelCol == "label")
+    assert(lr.getFeaturesCol == "features")
+    assert(lr.getPredictionCol == "prediction")
+    assert(lr.getRawPredictionCol == "rawPrediction")
+    assert(lr.getProbabilityCol == "probability")
     val model = lr.fit(dataset)
     model.transform(dataset)
-      .select('label, 'probability, 'prediction)
+      .select("label", "probability", "prediction", "rawPrediction")
      .collect()
-    // Check defaults
     assert(model.getThreshold === 0.5)
     assert(model.getFeaturesCol == "features")
     assert(model.getPredictionCol == "prediction")
+    assert(model.getRawPredictionCol == "rawPrediction")
     assert(model.getProbabilityCol == "probability")
   }
 
@@ -61,16 +65,6 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext {
       .setThreshold(0.6)
       .setProbabilityCol("myProbability")
     val model = lr.fit(dataset)
-    model.transform(dataset, model.threshold -> 0.8) // overwrite threshold
-      .select("label", "score", "prediction")
-      .collect()
-  }
-
-  test("logistic regression fit and transform with varargs") {
-    val lr = new LogisticRegression
-    val model = lr.fit(dataset, lr.maxIter -> 10, lr.regParam -> 1.0)
-    model.transform(dataset, model.threshold -> 0.8, model.scoreCol -> "probability")
-      .select("label", "probability", "prediction")
     assert(model.fittingParamMap.get(lr.maxIter) === Some(10))
     assert(model.fittingParamMap.get(lr.regParam) === Some(1.0))
     assert(model.fittingParamMap.get(lr.threshold) === Some(0.6))
@@ -79,7 +73,7 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext {
     // Modify model params, and check that the params worked.
     model.setThreshold(1.0)
     val predAllZero = model.transform(dataset)
-      .select('prediction, 'myProbability)
+      .select("prediction", "myProbability")
       .collect()
       .map { case Row(pred: Double, prob: Vector) => pred }
     assert(predAllZero.forall(_ === 0),
@@ -88,7 +82,7 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext {
     // Call transform with params, and check that the params worked.
     val predNotAllZero = model.transform(dataset, model.threshold -> 0.0,
       model.probabilityCol -> "myProb")
-      .select('prediction, 'myProb)
+      .select("prediction", "myProb")
       .collect()
       .map { case Row(pred: Double, prob: Vector) => pred }
     assert(predNotAllZero.exists(_ !== 0.0))
@@ -105,7 +99,6 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext {
 
   test("logistic regression: Predictor, Classifier methods") {
     val sqlContext = this.sqlContext
-    import sqlContext._
     val lr = new LogisticRegression
 
     val model = lr.fit(dataset)
@@ -115,7 +108,7 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext {
     val results = model.transform(dataset)
 
     // Compare rawPrediction with probability
-    results.select('rawPrediction, 'probability).collect().map {
+    results.select("rawPrediction", "probability").collect().map {
       case Row(raw: Vector, prob: Vector) =>
         assert(raw.size === 2)
         assert(prob.size === 2)
@@ -125,7 +118,7 @@ class LogisticRegressionSuite extends FunSuite with MLlibTestSparkContext {
     }
 
     // Compare prediction with probability
-    results.select('prediction, 'probability).collect().map {
+    results.select("prediction", "probability").collect().map {
       case Row(pred: Double, prob: Vector) =>
        val predFromProb = prob.toArray.zipWithIndex.maxBy(_._1)._2
        assert(pred == predFromProb)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
index b5876cc96c2b8..bbb44c3e2dfc2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -20,32 +20,27 @@ package org.apache.spark.ml.regression
 import org.scalatest.FunSuite
 
 import org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInput
-import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.mllib.util.TestingUtils._
-import org.apache.spark.sql.{Row, SQLContext, SchemaRDD}
+import org.apache.spark.sql.{DataFrame, SQLContext}
 
 class LinearRegressionSuite extends FunSuite with MLlibTestSparkContext {
 
   @transient var sqlContext: SQLContext = _
-  @transient var dataset: SchemaRDD = _
+  @transient var dataset: DataFrame = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
     sqlContext = new SQLContext(sc)
-    dataset = sqlContext.createSchemaRDD(
+    dataset = sqlContext.createDataFrame(
       sc.parallelize(generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42), 2))
   }
 
   test("linear regression: default params") {
-    val sqlContext = this.sqlContext
-    import sqlContext._
     val lr = new LinearRegression
     assert(lr.getLabelCol == "label")
     val model = lr.fit(dataset)
     model.transform(dataset)
-      .select('label, 'prediction)
+      .select("label", "prediction")
       .collect()
     // Check defaults
     assert(model.getFeaturesCol == "features")
@@ -54,8 +49,6 @@ class LinearRegressionSuite extends FunSuite with MLlibTestSparkContext {
 
   test("linear regression with setters") {
     // Set params, train, and check as many as we can.
-    val sqlContext = this.sqlContext
-    import sqlContext._
     val lr = new LinearRegression()
       .setMaxIter(10)
       .setRegParam(1.0)
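
Illustrative note (not part of the patch): a minimal sketch of the DataFrame-based UDF pattern this change migrates to, assuming a hypothetical `df` with a vector-typed "features" column and a hypothetical helper name; it only mirrors the `$"*"` / `callUDF(f, dataType, col)` / `.as(...)` usage that the hunks above introduce.

```scala
// Sketch only: the Catalyst-internal Star(None)/.call/.attr DSL is replaced by the
// public DataFrame API -- keep all existing columns with $"*" and append a column
// computed by a Scala function via callUDF, giving the return DataType explicitly.
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dsl._            // brings $"..." and callUDF into scope (Spark 1.3-era API)
import org.apache.spark.sql.types.DoubleType

def appendPrediction(df: DataFrame, predict: Vector => Double): DataFrame = {
  // DoubleType here; a vector-valued function would pass new VectorUDT instead.
  df.select($"*", callUDF(predict, DoubleType, df("features")).as("prediction"))
}
```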