From d7a37bcaf123389fb0828eefb92659c6d9cb3460 Mon Sep 17 00:00:00 2001 From: Oleg Sidorkin Date: Sun, 10 May 2015 01:31:34 -0700 Subject: [PATCH 1/8] [SPARK-7345][SQL] Spark cannot detect renamed columns using JDBC connector Issue appears when one tries to create DataFrame using sqlContext.load("jdbc"...) statement when "dbtable" contains query with renamed columns. If original column is used in SQL query once the resulting DataFrame will contain non-renamed column. If original column is used in SQL query several times with different aliases, sqlContext.load will fail. Original implementation of JDBCRDD.resolveTable uses getColumnName to detect column names in RDD schema. Suggested implementation uses getColumnLabel to handle column renames in SQL statement which is aware of SQL "AS" statement. Readings: http://stackoverflow.com/questions/4271152/getcolumnlabel-vs-getcolumnname http://stackoverflow.com/questions/12259829/jdbc-getcolumnname-getcolumnlabel-db2 Official documentation unfortunately a bit misleading in definition of "suggested title" purpose however clearly defines behavior of AS keyword in SQL statement. http://docs.oracle.com/javase/7/docs/api/java/sql/ResultSetMetaData.html getColumnLabel - Gets the designated column's suggested title for use in printouts and displays. The suggested title is usually specified by the SQL AS clause. If a SQL AS is not specified, the value returned from getColumnLabel will be the same as the value returned by the getColumnName method. Author: Oleg Sidorkin Closes #6032 from osidorkin/master and squashes the following commits: 10fc44b [Oleg Sidorkin] [SPARK-7345][SQL] Regression test for JDBCSuite (resolved scala style test error) 2aaf6f7 [Oleg Sidorkin] [SPARK-7345][SQL] Regression test for JDBCSuite (renamed fields in JDBC query) b7d5b22 [Oleg Sidorkin] [SPARK-7345][SQL] Regression test for JDBCSuite 09559a0 [Oleg Sidorkin] [SPARK-7345][SQL] Spark cannot detect renamed columns using JDBC connector --- .../org/apache/spark/sql/jdbc/JDBCRDD.scala | 2 +- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index 1a5083dbe0f61..a03ade3881f59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -109,7 +109,7 @@ private[sql] object JDBCRDD extends Logging { val fields = new Array[StructField](ncols) var i = 0 while (i < ncols) { - val columnName = rsmd.getColumnName(i + 1) + val columnName = rsmd.getColumnLabel(i + 1) val dataType = rsmd.getColumnType(i + 1) val typeName = rsmd.getColumnTypeName(i + 1) val fieldSize = rsmd.getPrecision(i + 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 021affafe36a6..2abfe7f167f77 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -204,6 +204,22 @@ class JDBCSuite extends FunSuite with BeforeAndAfter { assert(ids(2) === 3) } + test("Register JDBC query with renamed fields") { + // Regression test for bug SPARK-7345 + sql( + s""" + |CREATE TEMPORARY TABLE renamed + |USING org.apache.spark.sql.jdbc + |OPTIONS (url '$url', dbtable '(select NAME as NAME1, NAME as NAME2 from TEST.PEOPLE)', + |user 'testUser', password 'testPass') + """.stripMargin.replaceAll("\n", " ")) + + val df = sql("SELECT * FROM renamed") + assert(df.schema.fields.size == 2) + assert(df.schema.fields(0).name == "NAME1") + assert(df.schema.fields(1).name == "NAME2") + } + test("Basic API") { assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE").collect().size === 3) } From 6bf9352fa5d740d01ffdafbbb23d9732752a8d87 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 10 May 2015 21:26:36 +0800 Subject: [PATCH 2/8] [MINOR] [SQL] Fixes variable name typo [Review on Reviewable](https://reviewable.io/reviews/apache/spark/6038) Author: Cheng Lian Closes #6038 from liancheng/fix-typo and squashes the following commits: 572c2a4 [Cheng Lian] Fixes variable name typo --- .../apache/spark/sql/sources/CreateTableAsSelectSuite.scala | 2 +- .../scala/org/apache/spark/sql/sources/DDLTestSuite.scala | 2 +- .../scala/org/apache/spark/sql/sources/DataSourceTest.scala | 4 ++-- .../org/apache/spark/sql/sources/FilteredScanSuite.scala | 2 +- .../test/scala/org/apache/spark/sql/sources/InsertSuite.scala | 2 +- .../scala/org/apache/spark/sql/sources/PrunedScanSuite.scala | 2 +- .../scala/org/apache/spark/sql/sources/SaveLoadSuite.scala | 2 +- .../scala/org/apache/spark/sql/sources/TableScanSuite.scala | 2 +- 8 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 20a23b3bd6aa9..54f2f3cdec298 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.Utils class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ var path: File = null diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala index ca25751b9583d..6664e8d64c13a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala @@ -64,7 +64,7 @@ case class SimpleDDLScan(from: Int, to: Int, table: String)(@transient val sqlCo } class DDLTestSuite extends DataSourceTest { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ before { sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala index 9d3090c19b4e8..24ed665c67d2e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala @@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfter abstract class DataSourceTest extends QueryTest with BeforeAndAfter { // We want to test some edge cases. - implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext) + implicit val caseInsensitiveContext = new SQLContext(TestSQLContext.sparkContext) - caseInsensisitiveContext.setConf(SQLConf.CASE_SENSITIVE, "false") + caseInsensitiveContext.setConf(SQLConf.CASE_SENSITIVE, "false") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index cb5e5147ff189..cce747e7dbf64 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -97,7 +97,7 @@ object FiltersPushed { class FilteredScanSuite extends DataSourceTest { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ before { sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 50629ea4dc066..d1d427e1790bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.Utils class InsertSuite extends DataSourceTest with BeforeAndAfterAll { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ var path: File = null diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala index 6a1ddf2f8e98b..c2bc52e2120c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala @@ -52,7 +52,7 @@ case class SimplePrunedScan(from: Int, to: Int)(@transient val sqlContext: SQLCo } class PrunedScanSuite extends DataSourceTest { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ before { sql( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala index cb287ba85c1f8..6567d1acd7644 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.util.Utils class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ var originalDefaultSource: String = null diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index 3b47b8adf313b..77af04a491742 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -88,7 +88,7 @@ case class AllDataTypesScan( } class TableScanSuite extends DataSourceTest { - import caseInsensisitiveContext._ + import caseInsensitiveContext._ var tableWithSchemaExpected = (1 to 10).map { i => Row( From 3038443e58b9320c56f7785d9e36d4f85a563e6b Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Sun, 10 May 2015 13:29:27 -0700 Subject: [PATCH 3/8] [SPARK-7431] [ML] [PYTHON] Made CrossValidatorModel call parent init in PySpark Fixes bug with PySpark cvModel not having UID Also made small PySpark fixes: Evaluator should inherit from Params. MockModel should inherit from Model. CC: mengxr Author: Joseph K. Bradley Closes #5968 from jkbradley/pyspark-cv-uid and squashes the following commits: 57f13cd [Joseph K. Bradley] Made CrossValidatorModel call parent init in PySpark --- python/pyspark/ml/pipeline.py | 2 +- python/pyspark/ml/tests.py | 4 ++-- python/pyspark/ml/tuning.py | 1 + 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py index c1b2077c985cf..fdbae06405f6a 100644 --- a/python/pyspark/ml/pipeline.py +++ b/python/pyspark/ml/pipeline.py @@ -179,7 +179,7 @@ def transform(self, dataset, params={}): return dataset -class Evaluator(object): +class Evaluator(Params): """ Base class for evaluators that compute metrics from predictions. """ diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 3a42bcf723894..75bb5d749ca87 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -34,7 +34,7 @@ from pyspark.sql import DataFrame from pyspark.ml.param import Param from pyspark.ml.param.shared import HasMaxIter, HasInputCol -from pyspark.ml.pipeline import Transformer, Estimator, Pipeline +from pyspark.ml.pipeline import Estimator, Model, Pipeline, Transformer class MockDataset(DataFrame): @@ -77,7 +77,7 @@ def fit(self, dataset, params={}): return model -class MockModel(MockTransformer, Transformer): +class MockModel(MockTransformer, Model): def __init__(self): super(MockModel, self).__init__() diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 28e3727f2c064..86f4dc7368be0 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -236,6 +236,7 @@ class CrossValidatorModel(Model): """ def __init__(self, bestModel): + super(CrossValidatorModel, self).__init__() #: best model from cross validation self.bestModel = bestModel From 8c07c75c9831d6c34f69fe840edb6470d4dfdfef Mon Sep 17 00:00:00 2001 From: "Kirill A. Korinskiy" Date: Sun, 10 May 2015 13:34:00 -0700 Subject: [PATCH 4/8] [SPARK-5521] PCA wrapper for easy transform vectors I implement a simple PCA wrapper for easy transform of vectors by PCA for example LabeledPoint or another complicated structure. Example of usage: ``` import org.apache.spark.mllib.regression.LinearRegressionWithSGD import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.feature.PCA val data = sc.textFile("data/mllib/ridge-data/lpsa.data").map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) }.cache() val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) val pca = PCA.create(training.first().features.size/2, data.map(_.features)) val training_pca = training.map(p => p.copy(features = pca.transform(p.features))) val test_pca = test.map(p => p.copy(features = pca.transform(p.features))) val numIterations = 100 val model = LinearRegressionWithSGD.train(training, numIterations) val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations) val valuesAndPreds = test.map { point => val score = model.predict(point.features) (score, point.label) } val valuesAndPreds_pca = test_pca.map { point => val score = model_pca.predict(point.features) (score, point.label) } val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean() val MSE_pca = valuesAndPreds_pca.map{case(v, p) => math.pow((v - p), 2)}.mean() println("Mean Squared Error = " + MSE) println("PCA Mean Squared Error = " + MSE_pca) ``` Author: Kirill A. Korinskiy Author: Joseph K. Bradley Closes #4304 from catap/pca and squashes the following commits: 501bcd9 [Joseph K. Bradley] Small updates: removed k from Java-friendly PCA fit(). In PCASuite, converted results to set for comparison. Added an error message for bad k in PCA. 9dcc02b [Kirill A. Korinskiy] [SPARK-5521] fix scala style 1892a06 [Kirill A. Korinskiy] [SPARK-5521] PCA wrapper for easy transform vectors --- docs/mllib-dimensionality-reduction.md | 19 +++- docs/mllib-feature-extraction.md | 55 ++++++++++- .../org/apache/spark/mllib/feature/PCA.scala | 93 +++++++++++++++++++ .../apache/spark/mllib/feature/PCASuite.scala | 48 ++++++++++ 4 files changed, 213 insertions(+), 2 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala diff --git a/docs/mllib-dimensionality-reduction.md b/docs/mllib-dimensionality-reduction.md index 870fed6cc5024..05f51168d837c 100644 --- a/docs/mllib-dimensionality-reduction.md +++ b/docs/mllib-dimensionality-reduction.md @@ -137,7 +137,7 @@ statistical method to find a rotation such that the first coordinate has the lar possible, and each succeeding coordinate in turn has the largest variance possible. The columns of the rotation matrix are called principal components. PCA is used widely in dimensionality reduction. -MLlib supports PCA for tall-and-skinny matrices stored in row-oriented format. +MLlib supports PCA for tall-and-skinny matrices stored in row-oriented format and any Vectors.
@@ -157,6 +157,23 @@ val pc: Matrix = mat.computePrincipalComponents(10) // Principal components are // Project the rows to the linear space spanned by the top 10 principal components. val projected: RowMatrix = mat.multiply(pc) {% endhighlight %} + +The following code demonstrates how to compute principal components on source vectors +and use them to project the vectors into a low-dimensional space while keeping associated labels: + +{% highlight scala %} +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.feature.PCA + +val data: RDD[LabeledPoint] = ... + +// Compute the top 10 principal components. +val pca = new PCA(10).fit(data.map(_.features)) + +// Project vectors to the linear space spanned by the top 10 principal components, keeping the label +val projected = data.map(p => p.copy(features = pca.transform(p.features))) +{% endhighlight %} +
diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md index 03fedd01016b9..f723cd6b9dfab 100644 --- a/docs/mllib-feature-extraction.md +++ b/docs/mllib-feature-extraction.md @@ -507,7 +507,6 @@ v_N This example below demonstrates how to load a simple vectors file, extract a set of vectors, then transform those vectors using a transforming vector value. -
{% highlight scala %} @@ -531,3 +530,57 @@ val transformedData2 = parsedData.map(x => transformer.transform(x))
+## PCA + +A feature transformer that projects vectors to a low-dimensional space using PCA. +Details you can read at [dimensionality reduction](mllib-dimensionality-reduction.html). + +### Example + +The following code demonstrates how to compute principal components on a `Vector` +and use them to project the vectors into a low-dimensional space while keeping associated labels +for calculation a [Linear Regression]((mllib-linear-methods.html)) + +
+
+{% highlight scala %} +import org.apache.spark.mllib.regression.LinearRegressionWithSGD +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.feature.PCA + +val data = sc.textFile("data/mllib/ridge-data/lpsa.data").map { line => + val parts = line.split(',') + LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) +}.cache() + +val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) +val training = splits(0).cache() +val test = splits(1) + +val pca = new PCA(training.first().features.size/2).fit(data.map(_.features)) +val training_pca = training.map(p => p.copy(features = pca.transform(p.features))) +val test_pca = test.map(p => p.copy(features = pca.transform(p.features))) + +val numIterations = 100 +val model = LinearRegressionWithSGD.train(training, numIterations) +val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations) + +val valuesAndPreds = test.map { point => + val score = model.predict(point.features) + (score, point.label) +} + +val valuesAndPreds_pca = test_pca.map { point => + val score = model_pca.predict(point.features) + (score, point.label) +} + +val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean() +val MSE_pca = valuesAndPreds_pca.map{case(v, p) => math.pow((v - p), 2)}.mean() + +println("Mean Squared Error = " + MSE) +println("PCA Mean Squared Error = " + MSE_pca) +{% endhighlight %} +
+
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala new file mode 100644 index 0000000000000..4e01e402b4283 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.distributed.RowMatrix +import org.apache.spark.rdd.RDD + +/** + * A feature transformer that projects vectors to a low-dimensional space using PCA. + * + * @param k number of principal components + */ +class PCA(val k: Int) { + require(k >= 1, s"PCA requires a number of principal components k >= 1 but was given $k") + + /** + * Computes a [[PCAModel]] that contains the principal components of the input vectors. + * + * @param sources source vectors + */ + def fit(sources: RDD[Vector]): PCAModel = { + require(k <= sources.first().size, + s"source vector size is ${sources.first().size} must be greater than k=$k") + + val mat = new RowMatrix(sources) + val pc = mat.computePrincipalComponents(k) match { + case dm: DenseMatrix => + dm + case sm: SparseMatrix => + /* Convert a sparse matrix to dense. + * + * RowMatrix.computePrincipalComponents always returns a dense matrix. + * The following code is a safeguard. + */ + sm.toDense + case m => + throw new IllegalArgumentException("Unsupported matrix format. Expected " + + s"SparseMatrix or DenseMatrix. Instead got: ${m.getClass}") + + } + new PCAModel(k, pc) + } + + /** Java-friendly version of [[fit()]] */ + def fit(sources: JavaRDD[Vector]): PCAModel = fit(sources.rdd) +} + +/** + * Model fitted by [[PCA]] that can project vectors to a low-dimensional space using PCA. + * + * @param k number of principal components. + * @param pc a principal components Matrix. Each column is one principal component. + */ +class PCAModel private[mllib] (val k: Int, val pc: DenseMatrix) extends VectorTransformer { + /** + * Transform a vector by computed Principal Components. + * + * @param vector vector to be transformed. + * Vector must be the same length as the source vectors given to [[PCA.fit()]]. + * @return transformed vector. Vector will be of length k. + */ + override def transform(vector: Vector): Vector = { + vector match { + case dv: DenseVector => + pc.transpose.multiply(dv) + case SparseVector(size, indices, values) => + /* SparseVector -> single row SparseMatrix */ + val sm = Matrices.sparse(size, 1, Array(0, indices.length), indices, values).transpose + val projection = sm.multiply(pc) + Vectors.dense(projection.values) + case _ => + throw new IllegalArgumentException("Unsupported vector format. Expected " + + s"SparseVector or DenseVector. Instead got: ${vector.getClass}") + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala new file mode 100644 index 0000000000000..758af588f1c69 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.distributed.RowMatrix +import org.apache.spark.mllib.util.MLlibTestSparkContext + +class PCASuite extends FunSuite with MLlibTestSparkContext { + + private val data = Array( + Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))), + Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0), + Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0) + ) + + private lazy val dataRDD = sc.parallelize(data, 2) + + test("Correct computing use a PCA wrapper") { + val k = dataRDD.count().toInt + val pca = new PCA(k).fit(dataRDD) + + val mat = new RowMatrix(dataRDD) + val pc = mat.computePrincipalComponents(k) + + val pca_transform = pca.transform(dataRDD).collect() + val mat_multiply = mat.multiply(pc).rows.collect() + + assert(pca_transform.toSet === mat_multiply.toSet) + } +} From c5aca0c27be31e94ffdb01ef2eb29d3b373d7f4c Mon Sep 17 00:00:00 2001 From: Glenn Weidner Date: Sun, 10 May 2015 19:18:32 -0700 Subject: [PATCH 5/8] [SPARK-7427] [PYSPARK] Make sharedParams match in Scala, Python Modified 2 files: python/pyspark/ml/param/_shared_params_code_gen.py python/pyspark/ml/param/shared.py Generated shared.py on Linux using Python 2.6.6 on Redhat Enterprise Linux Server 6.6. python _shared_params_code_gen.py > shared.py Only changed maxIter, regParam, rawPredictionCol based on strings from SharedParamsCodeGen.scala. Note warning was displayed when committing shared.py: warning: LF will be replaced by CRLF in python/pyspark/ml/param/shared.py. Author: Glenn Weidner Closes #6023 from gweidner/br-7427 and squashes the following commits: db72e32 [Glenn Weidner] [SPARK-7427] [PySpark] Make sharedParams match in Scala, Python 825e4a9 [Glenn Weidner] [SPARK-7427] [PySpark] Make sharedParams match in Scala, Python e6a865e [Glenn Weidner] [SPARK-7427] [PySpark] Make sharedParams match in Scala, Python 1eee702 [Glenn Weidner] Merge remote-tracking branch 'upstream/master' 1ac10e5 [Glenn Weidner] Merge remote-tracking branch 'upstream/master' cafd104 [Glenn Weidner] Merge remote-tracking branch 'upstream/master' 9bea1eb [Glenn Weidner] Merge remote-tracking branch 'upstream/master' 4a35c20 [Glenn Weidner] Merge remote-tracking branch 'upstream/master' 9790cbe [Glenn Weidner] Merge remote-tracking branch 'upstream/master' d9c30f4 [Glenn Weidner] [SPARK-7275] [SQL] [WIP] Make LogicalRelation public --- .../ml/param/_shared_params_code_gen.py | 6 ++-- python/pyspark/ml/param/shared.py | 30 +++++++++---------- python/pyspark/ml/tests.py | 4 +-- tox.ini | 2 +- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index ed3171b6976d3..3be0979b92013 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -88,12 +88,12 @@ def get$Name(self): print("\n# DO NOT MODIFY THIS FILE! It was generated by _shared_params_code_gen.py.\n") print("from pyspark.ml.param import Param, Params\n\n") shared = [ - ("maxIter", "max number of iterations", None), - ("regParam", "regularization constant", None), + ("maxIter", "max number of iterations (>= 0)", None), + ("regParam", "regularization parameter (>= 0)", None), ("featuresCol", "features column name", "'features'"), ("labelCol", "label column name", "'label'"), ("predictionCol", "prediction column name", "'prediction'"), - ("rawPredictionCol", "raw prediction column name", "'rawPrediction'"), + ("rawPredictionCol", "raw prediction (a.k.a. confidence) column name", "'rawPrediction'"), ("inputCol", "input column name", None), ("inputCols", "input column names", None), ("outputCol", "output column name", None), diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index d0bcadee22347..4b22322b895b4 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -22,16 +22,16 @@ class HasMaxIter(Params): """ - Mixin for param maxIter: max number of iterations. + Mixin for param maxIter: max number of iterations (>= 0). """ # a placeholder to make it appear in the generated doc - maxIter = Param(Params._dummy(), "maxIter", "max number of iterations") + maxIter = Param(Params._dummy(), "maxIter", "max number of iterations (>= 0)") def __init__(self): super(HasMaxIter, self).__init__() - #: param for max number of iterations - self.maxIter = Param(self, "maxIter", "max number of iterations") + #: param for max number of iterations (>= 0) + self.maxIter = Param(self, "maxIter", "max number of iterations (>= 0)") if None is not None: self._setDefault(maxIter=None) @@ -51,16 +51,16 @@ def getMaxIter(self): class HasRegParam(Params): """ - Mixin for param regParam: regularization constant. + Mixin for param regParam: regularization parameter (>= 0). """ # a placeholder to make it appear in the generated doc - regParam = Param(Params._dummy(), "regParam", "regularization constant") + regParam = Param(Params._dummy(), "regParam", "regularization parameter (>= 0)") def __init__(self): super(HasRegParam, self).__init__() - #: param for regularization constant - self.regParam = Param(self, "regParam", "regularization constant") + #: param for regularization parameter (>= 0) + self.regParam = Param(self, "regParam", "regularization parameter (>= 0)") if None is not None: self._setDefault(regParam=None) @@ -167,16 +167,16 @@ def getPredictionCol(self): class HasRawPredictionCol(Params): """ - Mixin for param rawPredictionCol: raw prediction column name. + Mixin for param rawPredictionCol: raw prediction (a.k.a. confidence) column name. """ # a placeholder to make it appear in the generated doc - rawPredictionCol = Param(Params._dummy(), "rawPredictionCol", "raw prediction column name") + rawPredictionCol = Param(Params._dummy(), "rawPredictionCol", "raw prediction (a.k.a. confidence) column name") def __init__(self): super(HasRawPredictionCol, self).__init__() - #: param for raw prediction column name - self.rawPredictionCol = Param(self, "rawPredictionCol", "raw prediction column name") + #: param for raw prediction (a.k.a. confidence) column name + self.rawPredictionCol = Param(self, "rawPredictionCol", "raw prediction (a.k.a. confidence) column name") if 'rawPrediction' is not None: self._setDefault(rawPredictionCol='rawPrediction') @@ -403,14 +403,12 @@ class HasStepSize(Params): """ # a placeholder to make it appear in the generated doc - stepSize = Param(Params._dummy(), "stepSize", - "Step size to be used for each iteration of optimization.") + stepSize = Param(Params._dummy(), "stepSize", "Step size to be used for each iteration of optimization.") def __init__(self): super(HasStepSize, self).__init__() #: param for Step size to be used for each iteration of optimization. - self.stepSize = Param(self, "stepSize", - "Step size to be used for each iteration of optimization.") + self.stepSize = Param(self, "stepSize", "Step size to be used for each iteration of optimization.") if None is not None: self._setDefault(stepSize=None) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 75bb5d749ca87..ba6478dcd58a9 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -128,7 +128,7 @@ def test_param(self): testParams = TestParams() maxIter = testParams.maxIter self.assertEqual(maxIter.name, "maxIter") - self.assertEqual(maxIter.doc, "max number of iterations") + self.assertEqual(maxIter.doc, "max number of iterations (>= 0)") self.assertTrue(maxIter.parent is testParams) def test_params(self): @@ -156,7 +156,7 @@ def test_params(self): self.assertEquals( testParams.explainParams(), "\n".join(["inputCol: input column name (undefined)", - "maxIter: max number of iterations (default: 10, current: 100)"])) + "maxIter: max number of iterations (>= 0) (default: 10, current: 100)"])) if __name__ == "__main__": diff --git a/tox.ini b/tox.ini index b568029a204cc..76e3f42cde62d 100644 --- a/tox.ini +++ b/tox.ini @@ -15,4 +15,4 @@ [pep8] max-line-length=100 -exclude=cloudpickle.py,heapq3.py +exclude=cloudpickle.py,heapq3.py,shared.py From 0835f1edd4c9c05439df85c248faf6787d45f7b7 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Sun, 10 May 2015 19:49:42 -0700 Subject: [PATCH 6/8] [SPARK-7512] [SPARKR] Fix RDD's show method to use getJRDD Since the RDD object might be a Pipelined RDD we should use `getJRDD` to get the right handle to the Java object. Fixes the bug reported at http://stackoverflow.com/questions/30057702/sparkr-filterrdd-and-flatmap-not-working cc concretevitamin Author: Shivaram Venkataraman Closes #6035 from shivaram/sparkr-show-bug and squashes the following commits: d70145c [Shivaram Venkataraman] Fix RDD's show method to use getJRDD Fixes the bug reported at http://stackoverflow.com/questions/30057702/sparkr-filterrdd-and-flatmap-not-working --- R/pkg/R/RDD.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index 73999a6737032..9138629cac9c0 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -67,8 +67,8 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode, }) setMethod("show", "RDD", - function(.Object) { - cat(paste(callJMethod(.Object@jrdd, "toString"), "\n", sep="")) + function(object) { + cat(paste(callJMethod(getJRDD(object), "toString"), "\n", sep="")) }) setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) { From 2242ab31e99227a102b0918d73db67e99899fd24 Mon Sep 17 00:00:00 2001 From: tianyi Date: Mon, 11 May 2015 14:08:15 +0800 Subject: [PATCH 7/8] [SPARK-7519] [SQL] fix minor bugs in thrift server UI Bugs description: 1. There are extra commas on the top of session list. 2. The format of time in "Start at:" part is not the same as others. 3. The total number of online sessions is wrong. Author: tianyi Closes #6048 from tianyi/SPARK-7519 and squashes the following commits: ed366b7 [tianyi] fix bug --- .../sql/hive/thriftserver/HiveThriftServer2.scala | 4 +++- .../sql/hive/thriftserver/ui/ThriftServerPage.scala | 10 +++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 0be5a92c2546c..3458b04bfba0f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -147,7 +147,7 @@ object HiveThriftServer2 extends Logging { override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { server.stop() } - + var onlineSessionNum: Int = 0 val sessionList = new mutable.LinkedHashMap[String, SessionInfo] val executionList = new mutable.LinkedHashMap[String, ExecutionInfo] val retainedStatements = @@ -170,11 +170,13 @@ object HiveThriftServer2 extends Logging { def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = { val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName) sessionList.put(sessionId, info) + onlineSessionNum += 1 trimSessionIfNecessary() } def onSessionClosed(sessionId: String): Unit = { sessionList(sessionId).finishTimestamp = System.currentTimeMillis + onlineSessionNum -= 1 } def onStatementStart( diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala index 71b16b6bebffb..6a2be4a58e5cb 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala @@ -29,7 +29,7 @@ import org.apache.spark.ui.UIUtils._ import org.apache.spark.ui._ -/** Page for Spark Web UI that shows statistics of a streaming job */ +/** Page for Spark Web UI that shows statistics of a thrift server */ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") with Logging { private val listener = parent.listener @@ -42,7 +42,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" generateBasicStats() ++
++

- {listener.sessionList.size} session(s) are online, + {listener.onlineSessionNum} session(s) are online, running {listener.totalRunning} SQL statement(s)

++ generateSessionStatsTable() ++ @@ -50,12 +50,12 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" UIUtils.headerSparkPage("ThriftServer", content, parent, Some(5000)) } - /** Generate basic stats of the streaming program */ + /** Generate basic stats of the thrift server program */ private def generateBasicStats(): Seq[Node] = { val timeSinceStart = System.currentTimeMillis() - startTime.getTime
  • - Started at: {startTime.toString} + Started at: {formatDate(startTime)}
  • Time since start: {formatDurationVerbose(timeSinceStart)} @@ -148,7 +148,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" {session.userName} {session.ip} - {session.sessionId} , + {session.sessionId} {formatDate(session.startTimestamp)} {if(session.finishTimestamp > 0) formatDate(session.finishTimestamp)} {formatDurationOption(Some(session.totalTime))} From d70a076892e0677acceccaba665908cdf664f1b4 Mon Sep 17 00:00:00 2001 From: Wesley Miao Date: Mon, 11 May 2015 12:20:06 +0100 Subject: [PATCH 8/8] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time tdas https://issues.apache.org/jira/browse/SPARK-7326 The problem most likely resides in DStream.slice() implementation, as shown below. def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { if (!isInitialized) { throw new SparkException(this + " has not been initialized") } if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") } if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" + slideDuration + ")") } val alignedToTime = toTime.floor(slideDuration, zeroTime) val alignedFromTime = fromTime.floor(slideDuration, zeroTime) logInfo("Slicing from " + fromTime + " to " + toTime + " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => { if (time >= zeroTime) getOrCompute(time) else None }) } Here after performing floor() on both fromTime and toTime, the result (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiple of the slidingDuration, thus making isTimeValid() check failed for all the remaining computation. The fix is to add a new floor() function in Time.scala to respect the zeroTime while performing the floor : def floor(that: Duration, zeroTime: Time): Time = { val t = that.milliseconds new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds) } And then change the DStream.slice to call this new floor function by passing in its zeroTime. val alignedToTime = toTime.floor(slideDuration, zeroTime) val alignedFromTime = fromTime.floor(slideDuration, zeroTime) This way the alignedToTime and alignedFromTime are *really* aligned in respect to zeroTime whose value is not really a 0. Author: Wesley Miao Author: Wesley Closes #5871 from wesleymiao/spark-7326 and squashes the following commits: 82a4d8c [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream dosen't work all the time 48b4dc0 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time 6ade399 [Wesley] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time 2611745 [Wesley Miao] [SPARK-7326] [STREAMING] Performing window() on a WindowedDStream doesn't work all the time --- .../org/apache/spark/streaming/Time.scala | 5 +++++ .../spark/streaming/dstream/DStream.scala | 22 ++++++++++++------- .../apache/spark/streaming/TimeSuite.scala | 3 +++ 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala index 42c49678d24f0..92cfd7d40338c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala @@ -63,6 +63,11 @@ case class Time(private val millis: Long) { new Time((this.millis / t) * t) } + def floor(that: Duration, zeroTime: Time): Time = { + val t = that.milliseconds + new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds) + } + def isMultipleOf(that: Duration): Boolean = (this.millis % that.milliseconds == 0) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index f1f8a70655996..7092a3d3f0b86 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -763,16 +763,22 @@ abstract class DStream[T: ClassTag] ( if (!isInitialized) { throw new SparkException(this + " has not been initialized") } - if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" - + slideDuration + ")") + + val alignedToTime = if ((toTime - zeroTime).isMultipleOf(slideDuration)) { + toTime + } else { + logWarning("toTime (" + toTime + ") is not a multiple of slideDuration (" + + slideDuration + ")") + toTime.floor(slideDuration, zeroTime) } - if (!(toTime - zeroTime).isMultipleOf(slideDuration)) { - logWarning("toTime (" + fromTime + ") is not a multiple of slideDuration (" - + slideDuration + ")") + + val alignedFromTime = if ((fromTime - zeroTime).isMultipleOf(slideDuration)) { + fromTime + } else { + logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration (" + + slideDuration + ")") + fromTime.floor(slideDuration, zeroTime) } - val alignedToTime = toTime.floor(slideDuration) - val alignedFromTime = fromTime.floor(slideDuration) logInfo("Slicing from " + fromTime + " to " + toTime + " (aligned to " + alignedFromTime + " and " + alignedToTime + ")") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala index 5579ac364346c..e6a01656f479d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TimeSuite.scala @@ -69,6 +69,9 @@ class TimeSuite extends TestSuiteBase { assert(new Time(1200).floor(new Duration(200)) == new Time(1200)) assert(new Time(199).floor(new Duration(200)) == new Time(0)) assert(new Time(1).floor(new Duration(1)) == new Time(1)) + assert(new Time(1350).floor(new Duration(200), new Time(50)) == new Time(1250)) + assert(new Time(1350).floor(new Duration(200), new Time(150)) == new Time(1350)) + assert(new Time(1350).floor(new Duration(200), new Time(200)) == new Time(1200)) } test("isMultipleOf") {