diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
index 0aadd476cba63..a2893f78e0fec 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
@@ -101,7 +101,7 @@ object CrossValidatorExample {
     // Make predictions on test documents. cvModel uses the best model found (lrModel).
     cvModel.transform(test)
-      .select('id, 'text, 'probability, 'prediction)
+      .select("id", "text", "probability", "prediction")
       .collect()
       .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
         println(s"($id, $text) --> prob=$prob, prediction=$prediction")
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
index 13a860a86afb0..6f68020bf9ee2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala
@@ -22,7 +22,7 @@ import org.apache.spark.ml.classification.{Classifier, ClassifierParams, Classif
 import org.apache.spark.ml.param.{Params, IntParam, ParamMap}
 import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.sql.{SchemaRDD, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 /**
@@ -68,13 +68,15 @@ object DeveloperApiExample {
     // Make predictions on test data.
     val sumPredictions: Double = model.transform(test)
-      .select('features, 'label, 'prediction)
+      .select("features", "label", "prediction")
       .collect()
       .map { case Row(features: Vector, label: Double, prediction: Double) =>
         prediction
       }.sum
     assert(sumPredictions == 0.0,
       "MyLogisticRegression predicted something other than 0, even though all weights are 0!")
+
+    sc.stop()
   }
 }
@@ -113,7 +115,7 @@ private class MyLogisticRegression
   // This method is used by fit()
   override protected def train(
-      dataset: SchemaRDD,
+      dataset: DataFrame,
       paramMap: ParamMap): MyLogisticRegressionModel = {
     // Extract columns from data using helper method.
     val oldDataset = extractLabeledPoints(dataset, paramMap)
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
index ed969f6b64fdc..79ce9fdf7294c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala
@@ -91,7 +91,7 @@ object SimpleParamsExample {
     // Note that model2.transform() outputs a 'myProbability' column instead of the usual
     // 'probability' column since we renamed the lr.probabilityCol parameter previously.
     model2.transform(test)
-      .select('features, 'label, 'myProbability, 'prediction)
+      .select("features", "label", "myProbability", "prediction")
       .collect()
       .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
        println("($features, $label) -> prob=$prob, prediction=$prediction")
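
[Reviewer note, not part of the patch: with the SchemaRDD -> DataFrame rename, the
examples switch from Scala Symbol projections to plain column-name strings. A minimal
sketch of the equivalent forms, assuming a DataFrame `df` with an "id" column and the
`import org.apache.spark.sql.Dsl._` used elsewhere in this patch:

    df.select("id")      // string name, as the examples now do
    df.select($"id")     // interpolated Column, same Dsl that supplies $"*" below
    df.select(df("id"))  // Column looked up on a specific DataFrame

All three resolve to the same projection; the string form keeps the examples shortest.
Separately, the println calls in the SimpleParamsExample context above and the
SimpleTextClassificationPipeline context below are missing the `s` interpolator, so
"$features" etc. print literally; that is pre-existing and untouched by this patch.]
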
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala
index ab93c4847195e..968cb292120d8 100644
--- a/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala
@@ -80,7 +80,7 @@ object SimpleTextClassificationPipeline {
     // Make predictions on test documents.
     model.transform(test)
-      .select('id, 'text, 'probability, 'prediction)
+      .select("id", "text", "probability", "prediction")
       .collect()
       .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println("($id, $text) --> prob=$prob, prediction=$prediction")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
index ca791ae612132..40b49e37e076d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
@@ -175,15 +175,14 @@ private[ml] object ClassificationModel {
       val features2raw: FeaturesType => Vector = model.predictRaw
       tmpData = tmpData.select($"*", callUDF(features2raw, new VectorUDT,
-        tmpData(map(model.featuresCol))).as(map(model.rawPredictionCol)))
+        col(map(model.featuresCol))).as(map(model.rawPredictionCol)))
       numColsOutput += 1
       if (map(model.predictionCol) != "") {
         val raw2pred: Vector => Double = (rawPred) => {
           rawPred.toArray.zipWithIndex.maxBy(_._1)._2
         }
         tmpData = tmpData.select($"*",
-          callUDF(raw2pred, DoubleType,
-            tmpData(map(model.rawPredictionCol))).as(map(model.predictionCol)))
+          callUDF(raw2pred, col(map(model.rawPredictionCol))).as(map(model.predictionCol)))
         numColsOutput += 1
       }
     } else if (map(model.predictionCol) != "") {
@@ -191,7 +190,7 @@ private[ml] object ClassificationModel {
       val features2pred: FeaturesType => Double = model.predict
       tmpData = tmpData.select($"*", callUDF(features2pred, DoubleType,
-        tmpData(map(model.featuresCol))).as(map(model.predictionCol)))
+        col(map(model.featuresCol))).as(map(model.predictionCol)))
       numColsOutput += 1
     }
     (numColsOutput, tmpData)
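
[Reviewer note, not part of the patch: the mllib changes swap `tmpData(colName)`, a
Column bound to one specific DataFrame instance, for `col(colName)`, which resolves by
name when the enclosing select is analyzed. That matters here because tmpData is
reassigned between selects. A minimal standalone sketch of the pattern (`df`, the "x"
column, and `double2x` are hypothetical; assumes `import org.apache.spark.sql.Dsl._`,
which supplies $, col, and callUDF in these files, plus
`org.apache.spark.sql.types.DoubleType`):

    // Each select keeps all existing columns via $"*" and appends one derived column.
    val double2x: Double => Double = _ * 2.0
    var tmp = df
    tmp = tmp.select($"*", callUDF(double2x, DoubleType, col("x")).as("x2"))
    tmp = tmp.select($"*", callUDF(double2x, DoubleType, col("x2")).as("x4"))
]
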
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 293062fc45915..4492c40aa2bfc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -20,10 +20,9 @@ package org.apache.spark.ml.classification
 import org.apache.spark.annotation.AlphaComponent
 import org.apache.spark.ml.param._
 import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
-import org.apache.spark.mllib.linalg.{VectorUDT, BLAS, Vector, Vectors}
+import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Dsl._
-import org.apache.spark.sql.types.DoubleType
 import org.apache.spark.storage.StorageLevel
@@ -120,7 +119,7 @@ class LogisticRegressionModel private[ml] (
     if (map(rawPredictionCol) != "") {
       val features2raw: Vector => Vector = predictRaw
       tmpData = tmpData.select($"*",
-        callUDF(features2raw, new VectorUDT, tmpData(map(featuresCol))).as(map(rawPredictionCol)))
+        callUDF(features2raw, col(map(featuresCol))).as(map(rawPredictionCol)))
       numColsOutput += 1
     }
     if (map(probabilityCol) != "") {
@@ -130,11 +129,11 @@ class LogisticRegressionModel private[ml] (
           Vectors.dense(1.0 - prob1, prob1)
         }
         tmpData = tmpData.select($"*",
-          callUDF(raw2prob, new VectorUDT, tmpData(map(rawPredictionCol))).as(map(probabilityCol)))
+          callUDF(raw2prob, col(map(rawPredictionCol))).as(map(probabilityCol)))
       } else {
         val features2prob: Vector => Vector = predictProbabilities
         tmpData = tmpData.select($"*",
-          callUDF(features2prob, new VectorUDT, tmpData(map(featuresCol))).as(map(probabilityCol)))
+          callUDF(features2prob, col(map(featuresCol))).as(map(probabilityCol)))
       }
       numColsOutput += 1
     }
@@ -145,18 +144,18 @@ class LogisticRegressionModel private[ml] (
           if (probs(1) > t) 1.0 else 0.0
         }
         tmpData = tmpData.select($"*",
-          callUDF(predict, DoubleType, tmpData(map(probabilityCol))).as(map(predictionCol)))
+          callUDF(predict, col(map(probabilityCol))).as(map(predictionCol)))
       } else if (map(rawPredictionCol) != "") {
         val predict: Vector => Double = (rawPreds) => {
           val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
           if (prob1 > t) 1.0 else 0.0
         }
         tmpData = tmpData.select($"*",
-          callUDF(predict, DoubleType, tmpData(map(rawPredictionCol))).as(map(predictionCol)))
+          callUDF(predict, col(map(rawPredictionCol))).as(map(predictionCol)))
       } else {
         val predict: Vector => Double = this.predict
         tmpData = tmpData.select($"*",
-          callUDF(predict, DoubleType, tmpData(map(featuresCol))).as(map(predictionCol)))
+          callUDF(predict, col(map(featuresCol))).as(map(predictionCol)))
       }
       numColsOutput += 1
     }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
index 330571cf7b91f..f7b8afdc9d380 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
@@ -117,8 +117,7 @@ abstract class ProbabilisticClassificationModel[
         tmpModel.predictProbabilities(features)
       }
       outputData.select($"*",
-        callUDF(features2probs, new VectorUDT,
-          outputData(map(featuresCol))).as(map(probabilityCol)))
+        callUDF(features2probs, new VectorUDT, col(map(featuresCol))).as(map(probabilityCol)))
     } else {
       if (numColsOutput == 0) {
         this.logWarning(s"$uid: ProbabilisticClassificationModel.transform() was called as NOOP" +
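
[Reviewer note, not part of the patch: where a UDF returns an MLlib Vector, as in
ProbabilisticClassifier above, the explicit `new VectorUDT` return type is kept,
since Vector is a user-defined SQL type rather than a built-in one. A minimal sketch
of that pattern (`df` and its "features" column are hypothetical; assumes
`import org.apache.spark.sql.Dsl._` as in these files):

    import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}

    // Prepend a bias term of 1.0 to each feature vector.
    val addBias: Vector => Vector = v => Vectors.dense(1.0 +: v.toArray)
    df.select($"*", callUDF(addBias, new VectorUDT, col("features")).as("biased"))
]
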
were set.")