From 9ae85aa1ebabdc099d7f655bc1d9021d34d2910f Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Thu, 20 Nov 2014 11:58:58 -0800 Subject: [PATCH] [SPARK-3974] Made partitioner a variable inside BlockMatrix instead of a constructor variable --- .../linalg/distributed/BlockMatrix.scala | 33 +++++++++++++++---- .../linalg/distributed/BlockMatrixSuite.scala | 14 ++++++++ 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index 17fcfaf4d0fe1..7b4e61b534454 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -155,13 +155,11 @@ class ColumnBasedPartitioner( * @param numRowBlocks Number of blocks that form the rows of this matrix * @param numColBlocks Number of blocks that form the columns of this matrix * @param rdd The RDD of SubMatrices (local matrices) that form this matrix - * @param partitioner A partitioner that specifies how SubMatrices are stored in the cluster */ class BlockMatrix( val numRowBlocks: Int, val numColBlocks: Int, - val rdd: RDD[SubMatrix], - val partitioner: BlockMatrixPartitioner) extends DistributedMatrix with Logging { + val rdd: RDD[SubMatrix]) extends DistributedMatrix with Logging { /** * Alternate constructor for BlockMatrix without the input of a partitioner. Will use a Grid @@ -170,11 +168,31 @@ class BlockMatrix( * @param numRowBlocks Number of blocks that form the rows of this matrix * @param numColBlocks Number of blocks that form the columns of this matrix * @param rdd The RDD of SubMatrices (local matrices) that form this matrix + * @param partitioner A partitioner that specifies how SubMatrices are stored in the cluster */ - def this(numRowBlocks: Int, numColBlocks: Int, rdd: RDD[SubMatrix]) = { - this(numRowBlocks, numColBlocks, rdd, new GridPartitioner(numRowBlocks, numColBlocks, - rdd.first().mat.numRows, rdd.first().mat.numCols)) + def this( + numRowBlocks: Int, + numColBlocks: Int, + rdd: RDD[SubMatrix], + partitioner: BlockMatrixPartitioner) = { + this(numRowBlocks, numColBlocks, rdd) + setPartitioner(partitioner) } + + private[mllib] var partitioner: BlockMatrixPartitioner = { + val firstSubMatrix = rdd.first().mat + new GridPartitioner(numRowBlocks, numColBlocks, + firstSubMatrix.numRows, firstSubMatrix.numCols) + } + + /** + * Set the partitioner for the matrix. For internal use only. Users should use `repartition`. + * @param part A partitioner that specifies how SubMatrices are stored in the cluster + */ + private def setPartitioner(part: BlockMatrixPartitioner): Unit = { + partitioner = part + } + // A key-value pair RDD is required to partition properly private var matrixRDD: RDD[(Int, SubMatrix)] = keyBy() @@ -259,8 +277,9 @@ class BlockMatrix( * @param part The partitioner to partition by * @return The repartitioned BlockMatrix */ - def repartition(part: BlockMatrixPartitioner = partitioner): DistributedMatrix = { + def repartition(part: BlockMatrixPartitioner): DistributedMatrix = { matrixRDD = keyBy(part) + setPartitioner(part) this } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala index 5c74a15f64bf9..1d9ff1112ddb4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala @@ -66,6 +66,20 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext { assert(gridBasedMat.numCols() === n) } + test("partitioner and repartition") { + assert(colBasedMat.partitioner.name === "column") + assert(rowBasedMat.partitioner.name === "row") + assert(gridBasedMat.partitioner.name === "grid") + + val colPart = new ColumnBasedPartitioner(numColBlocks, rowPerPart, colPerPart) + val rowPart = new RowBasedPartitioner(numRowBlocks, rowPerPart, colPerPart) + gridBasedMat.repartition(rowPart).asInstanceOf[BlockMatrix] + assert(gridBasedMat.partitioner.name === "row") + + gridBasedMat.repartition(colPart).asInstanceOf[BlockMatrix] + assert(gridBasedMat.partitioner.name === "column") + } + test("toBreeze and collect") { val expected = BDM( (1.0, 0.0, 0.0, 0.0),