Skip to content

Commit

Permalink
[SPARK-3974] Made partitioner a variable inside BlockMatrix instead o…
Browse files Browse the repository at this point in the history
…f a constructor variable
  • Loading branch information
brkyvz committed Nov 20, 2014
1 parent d033861 commit 9ae85aa
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,11 @@ class ColumnBasedPartitioner(
* @param numRowBlocks Number of blocks that form the rows of this matrix
* @param numColBlocks Number of blocks that form the columns of this matrix
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
* @param partitioner A partitioner that specifies how SubMatrices are stored in the cluster
*/
class BlockMatrix(
val numRowBlocks: Int,
val numColBlocks: Int,
val rdd: RDD[SubMatrix],
val partitioner: BlockMatrixPartitioner) extends DistributedMatrix with Logging {
val rdd: RDD[SubMatrix]) extends DistributedMatrix with Logging {

/**
* Alternate constructor for BlockMatrix without the input of a partitioner. Will use a Grid
Expand All @@ -170,11 +168,31 @@ class BlockMatrix(
* @param numRowBlocks Number of blocks that form the rows of this matrix
* @param numColBlocks Number of blocks that form the columns of this matrix
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
* @param partitioner A partitioner that specifies how SubMatrices are stored in the cluster
*/
def this(numRowBlocks: Int, numColBlocks: Int, rdd: RDD[SubMatrix]) = {
this(numRowBlocks, numColBlocks, rdd, new GridPartitioner(numRowBlocks, numColBlocks,
rdd.first().mat.numRows, rdd.first().mat.numCols))
def this(
numRowBlocks: Int,
numColBlocks: Int,
rdd: RDD[SubMatrix],
partitioner: BlockMatrixPartitioner) = {
this(numRowBlocks, numColBlocks, rdd)
setPartitioner(partitioner)
}

private[mllib] var partitioner: BlockMatrixPartitioner = {
val firstSubMatrix = rdd.first().mat
new GridPartitioner(numRowBlocks, numColBlocks,
firstSubMatrix.numRows, firstSubMatrix.numCols)
}

/**
* Set the partitioner for the matrix. For internal use only. Users should use `repartition`.
* @param part A partitioner that specifies how SubMatrices are stored in the cluster
*/
private def setPartitioner(part: BlockMatrixPartitioner): Unit = {
partitioner = part
}

// A key-value pair RDD is required to partition properly
private var matrixRDD: RDD[(Int, SubMatrix)] = keyBy()

Expand Down Expand Up @@ -259,8 +277,9 @@ class BlockMatrix(
* @param part The partitioner to partition by
* @return The repartitioned BlockMatrix
*/
def repartition(part: BlockMatrixPartitioner = partitioner): DistributedMatrix = {
def repartition(part: BlockMatrixPartitioner): DistributedMatrix = {
matrixRDD = keyBy(part)
setPartitioner(part)
this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,20 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
assert(gridBasedMat.numCols() === n)
}

test("partitioner and repartition") {
assert(colBasedMat.partitioner.name === "column")
assert(rowBasedMat.partitioner.name === "row")
assert(gridBasedMat.partitioner.name === "grid")

val colPart = new ColumnBasedPartitioner(numColBlocks, rowPerPart, colPerPart)
val rowPart = new RowBasedPartitioner(numRowBlocks, rowPerPart, colPerPart)
gridBasedMat.repartition(rowPart).asInstanceOf[BlockMatrix]
assert(gridBasedMat.partitioner.name === "row")

gridBasedMat.repartition(colPart).asInstanceOf[BlockMatrix]
assert(gridBasedMat.partitioner.name === "column")
}

test("toBreeze and collect") {
val expected = BDM(
(1.0, 0.0, 0.0, 0.0),
Expand Down

0 comments on commit 9ae85aa

Please sign in to comment.