From e1db950e91c7d9526519626aa252cd711307d857 Mon Sep 17 00:00:00 2001 From: Li Pu Date: Tue, 3 Jun 2014 18:05:18 -0700 Subject: [PATCH] SPARK-1782: svd for sparse matrix using ARPACK copy ARPACK dsaupd/dseupd code from latest breeze change RowMatrix to use sparse SVD change tests for sparse SVD --- .../linalg/EigenValueDecomposition.scala | 120 ++++++++++++++++++ .../mllib/linalg/distributed/RowMatrix.scala | 42 ++++-- .../linalg/distributed/RowMatrixSuite.scala | 10 +- 3 files changed, 153 insertions(+), 19 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala new file mode 100644 index 0000000000000..7f07eb8768e97 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.annotation.Experimental
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
+import org.netlib.util.{intW, doubleW}
+import com.github.fommil.netlib.ARPACK
+
+/**
+ * :: Experimental ::
+ * Represents eigenvalue decomposition factors.
+ */
+@Experimental
+case class EigenValueDecomposition[VType](s: Vector, V: VType)
+
+object EigenValueDecomposition {
+  /**
+   * Compute the leading k eigenvalues and eigenvectors on a symmetric square matrix using ARPACK.
+   * The caller needs to ensure that the input matrix is real symmetric. This function requires
+   * memory for `n*(4*k+4)` doubles.
+   *
+   * @param mul a function that multiplies the symmetric matrix with a Vector.
+   * @param n dimension of the square matrix (maximum Int.MaxValue).
+   * @param k number of leading eigenvalues required; must be smaller than n.
+   * @param tol tolerance of the eigs computation.
+   * @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors
+   *         (columns of the matrix). The number of computed eigenvalues might be smaller than k.
+   * @throws IllegalStateException if ARPACK reports an unexpected `ido` or a non-zero `info`.
+   */
+  private[mllib] def symmetricEigs(mul: Vector => Vector, n: Int, k: Int, tol: Double)
+    : (BDV[Double], BDM[Double]) = {
+    require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n")
+
+    val arpack = ARPACK.getInstance()
+
+    val tolW = new doubleW(tol)
+    val nev = new intW(k)
+    val ncv = scala.math.min(2 * k, n)
+
+    val bmat = "I"
+    val which = "LM"
+
+    val iparam = new Array[Int](11)
+    iparam(0) = 1    // presumably ISHIFT: use exact shifts (confirm in dsaupd docs)
+    iparam(2) = 300  // presumably MXITER: iteration cap (confirm in dsaupd docs)
+    iparam(6) = 1    // presumably MODE 1: standard problem (confirm in dsaupd docs)
+
+    val ido = new intW(0)
+    val info = new intW(0)
+    val resid = new Array[Double](n)
+    val v = new Array[Double](n * ncv)
+    val workd = new Array[Double](3 * n)
+    val workl = new Array[Double](ncv * (ncv + 8))
+    val ipntr = new Array[Int](11)
+
+    // first call to ARPACK; each dsaupd call updates ido/ipntr to say what it needs next
+    arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd,
+      workl, workl.length, info)
+
+    val w = BDV(workd)  // assumes BDV wraps workd without copying, so writes via w reach ARPACK
+
+    while (ido.`val` != 99) {
+      if (ido.`val` != -1 && ido.`val` != 1)
+        throw new IllegalStateException("ARPACK returns ido = " + ido.`val`)
+      // multiply: read n entries of workd at ipntr(0) - 1, write the product at ipntr(1) - 1
+      val inputOffset = ipntr(0) - 1
+      val outputOffset = ipntr(1) - 1
+      val x = w(inputOffset until inputOffset + n)
+      val y = w(outputOffset until outputOffset + n)
+      y := BDV(mul(Vectors.fromBreeze(x)).toArray)
+      // hand the product back to ARPACK and get the next request
+      arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr,
+        workd, workl, workl.length, info)
+    }
+
+    if (info.`val` != 0)
+      throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val`)
+
+    val d = new Array[Double](nev.`val`)
+    val select = new Array[Boolean](ncv)
+    val z = java.util.Arrays.copyOfRange(v, 0, nev.`val` * n)
+
+    arpack.dseupd(true, "A", select, d, z, n, 0.0, bmat, n, which, nev, tol, resid, ncv, v, n,
+      iparam, ipntr, workd, workl, workl.length, info)
+    if (info.`val` != 0)  // dseupd reports failure through info as well; do not ignore it
+      throw new IllegalStateException("ARPACK dseupd returns non-zero info = " + info.`val`)
+
+    val computed = iparam(4)
+
+    val s = BDV(d)(0 until computed)
+    val U = new BDM(n, computed, z)
+
+    val sortedEigenValuesWithIndex = s.toArray.zipWithIndex.sortBy(-1 * _._1).zipWithIndex
+    val sorteds = BDV(sortedEigenValuesWithIndex.map(_._1._1))
+    val sortedU = BDM.zeros[Double](n, computed)
+
+    // copy eigenvectors in descending order of eigenvalues
+    sortedEigenValuesWithIndex.foreach { case ((_, oldCol), newCol) =>
+      sortedU(::, newCol) := U(::, oldCol)
+    }
+
+    (sorteds, sortedU)
+  }
+}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 07dfadf2f7869..fd653a367a81d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -200,6 +200,19 @@ class RowMatrix(
     nRows
   }
 
+  /**
+   * Multiply the Gramian matrix `A^T A` by a Vector on the right.
+   *
+   * @param v a local vector whose length must match the number of columns of this matrix
+   * @return a local vector holding the product `A^T A v`
+   */
+  private[mllib] def multiplyGramianMatrix(v: Vector): Vector = {
+    val bv = rows.map{
+      row => row.toBreeze * row.toBreeze.dot(v.toBreeze)
+    }.reduce( (x: BV[Double], y: BV[Double]) => x + y )
+    Vectors.fromBreeze(bv)
+  }
+
   /**
    * Computes the Gramian matrix `A^T A`.
    */
@@ -221,15 +234,17 @@ class RowMatrix(
 
   /**
    * Computes the singular value decomposition of this matrix.
-   * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'.
+   * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A ~= U * S * V',
+   * where S contains the leading singular values, U and V contain the corresponding singular
+   * vectors.
    *
-   * There is no restriction on m, but we require `n^2` doubles to fit in memory.
-   * Further, n should be less than m.
-
-   * The decomposition is computed by first computing A'A = V S^2 V',
-   * computing svd locally on that (since n x n is small), from which we recover S and V.
- * Then we compute U via easy matrix multiplication as U = A * (V * S^-1). - * Note that this approach requires `O(n^3)` time on the master node. + * There is no restriction on m, but we require `n*(6*k+4)` doubles to fit in memory on the master + * node. Further, n should be less than m. + * + * The decomposition is computed by providing a function that multiplies a vector with A'A to + * ARPACK, and iteratively invoking ARPACK-dsaupd on master node, from which we recover S and V. + * Then we compute U via easy matrix multiplication as U = A * (V * S^-1). + * Note that this approach requires `O(nnz(A))` time. * * At most k largest non-zero singular values and associated vectors are returned. * If there are k such values, then the dimensions of the return will be: @@ -243,20 +258,19 @@ class RowMatrix( * @param computeU whether to compute U * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) * are treated as zero, where sigma(0) is the largest singular value. + * @param tol the tolerance of the svd computation. * @return SingularValueDecomposition(U, s, V) */ def computeSVD( k: Int, computeU: Boolean = false, - rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { + rCond: Double = 1e-9, + tol: Double = 1e-6): SingularValueDecomposition[RowMatrix, Matrix] = { val n = numCols().toInt require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") - val G = computeGramianMatrix() - - // TODO: Use sparse SVD instead. - val (u: BDM[Double], sigmaSquares: BDV[Double], v: BDM[Double]) = - brzSvd(G.toBreeze.asInstanceOf[BDM[Double]]) + val (sigmaSquares: BDV[Double], u: BDM[Double]) = + EigenValueDecomposition.symmetricEigs(multiplyGramianMatrix, n, k, tol) val sigmas: BDV[Double] = brzSqrt(sigmaSquares) // Determine effective rank. 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index c9f9acf4c1335..9c346de3d2fbe 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -99,7 +99,7 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { val localMat = mat.toBreeze() val (localU, localSigma, localVt) = brzSvd(localMat) val localV: BDM[Double] = localVt.t.toDenseMatrix - for (k <- 1 to n) { + for (k <- 1 to (n - 1)) { val svd = mat.computeSVD(k, computeU = true) val U = svd.U val s = svd.s @@ -113,19 +113,19 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) } - val svdWithoutU = mat.computeSVD(n) + val svdWithoutU = mat.computeSVD(n - 1) assert(svdWithoutU.U === null) } } test("svd of a low-rank matrix") { - val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2) - val mat = new RowMatrix(rows, 4, 2) + val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) + val mat = new RowMatrix(rows, 4, 3) val svd = mat.computeSVD(2, computeU = true) assert(svd.s.size === 1, "should not return zero singular values") assert(svd.U.numRows() === 4) assert(svd.U.numCols() === 1) - assert(svd.V.numRows === 2) + assert(svd.V.numRows === 3) assert(svd.V.numCols === 1) }