Skip to content

Commit

Permalink
Merge pull request #2 from mengxr/brkyvz-SPARK-3974
Browse files Browse the repository at this point in the history
Simplify GridPartitioner partitioning
  • Loading branch information
brkyvz committed Jan 28, 2015
2 parents 5eecd48 + feb32a7 commit a8eace2
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,146 +18,111 @@
package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.util.Utils

import org.apache.spark.{Logging, Partitioner}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
 * A grid partitioner, which uses a regular grid to partition coordinates.
 *
 * The `rows` x `cols` coordinate space is cut into cells of `rowsPerPart` x `colsPerPart`
 * coordinates and each cell is one partition. Partitions are numbered in column-major order:
 * the cell in grid row `r` and grid column `c` gets index `r + c * rowPartitions`.
 *
 * @param rows Number of rows.
 * @param cols Number of columns.
 * @param rowsPerPart Number of rows per partition, which may be less at the bottom edge.
 * @param colsPerPart Number of columns per partition, which may be less at the right edge.
 */
private[mllib] class GridPartitioner(
    val rows: Int,
    val cols: Int,
    val rowsPerPart: Int,
    val colsPerPart: Int) extends Partitioner {

  require(rows > 0)
  require(cols > 0)
  require(rowsPerPart > 0)
  require(colsPerPart > 0)

  // Divide in floating point. `rows / rowsPerPart` on two Ints truncates before `ceil` sees the
  // value, which would drop the partial partition at the bottom/right edge and let
  // `getPartitionId` return indices >= numPartitions.
  private val rowPartitions = math.ceil(rows * 1.0 / rowsPerPart).toInt
  private val colPartitions = math.ceil(cols * 1.0 / colsPerPart).toInt

  override val numPartitions: Int = rowPartitions * colPartitions

  /**
   * Returns the index of the partition the input coordinate belongs to.
   *
   * @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in
   *            multiplication. k is ignored in computing partitions.
   * @return The index of the partition, which the coordinate belongs to.
   * @throws IllegalArgumentException if the key is neither an (Int, Int) nor an (Int, Int, Int)
   *                                  tuple, or if a coordinate is out of range.
   */
  override def getPartition(key: Any): Int = {
    key match {
      case (i: Int, j: Int) =>
        getPartitionId(i, j)
      case (i: Int, j: Int, _: Int) =>
        getPartitionId(i, j)
      case _ =>
        throw new IllegalArgumentException(s"Unrecognized key: $key.")
    }
  }

  /** Maps coordinate (i, j) to the column-major index of the grid cell that contains it. */
  private def getPartitionId(i: Int, j: Int): Int = {
    require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).")
    require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).")
    i / rowsPerPart + j / colsPerPart * rowPartitions
  }

  /** Two grid partitioners are equal when they describe the same grid with the same cell size. */
  override def equals(obj: Any): Boolean = {
    obj match {
      case r: GridPartitioner =>
        (this.rows == r.rows) && (this.cols == r.cols) &&
          (this.rowsPerPart == r.rowsPerPart) && (this.colsPerPart == r.colsPerPart)
      case _ =>
        false
    }
  }

  /** Hash over exactly the fields compared in `equals`, so equal partitioners hash alike. */
  override def hashCode: Int = {
    java.util.Arrays.hashCode(Array(rows, cols, rowsPerPart, colsPerPart))
  }
}

private[mllib] object GridPartitioner {

  /** Builds a [[GridPartitioner]] from an explicit grid size and partition size. */
  def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner = {
    new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)
  }

  /**
   * Builds a [[GridPartitioner]] whose partition size is derived from a suggested number of
   * partitions.
   *
   * Each side of the grid is scaled by `1 / sqrt(suggestedNumPartitions)` and rounded, with a
   * minimum of one row/column per partition.
   */
  def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = {
    require(suggestedNumPartitions > 0)
    val shrink = 1.0 / math.sqrt(suggestedNumPartitions)
    // Partition extent along a side of the given length, never smaller than 1.
    def partSize(length: Int): Int = math.round(math.max(shrink * length, 1.0)).toInt
    apply(rows, cols, partSize(rows), partSize(cols))
  }
}

/**
* Represents a distributed matrix in blocks of local matrices.
*
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
* @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero,
* the number of rows will be calculated when `numRows` is invoked.
* @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
* zero, the number of columns will be calculated when `numCols` is invoked.
* @param blocks The RDD of sub-matrix blocks (blockRowIndex, blockColIndex, sub-matrix) that form
* this distributed matrix.
* @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
* rows are not required to have the given number of rows
* @param colsPerBlock Number of columns that make up each block. The blocks forming the final
* columns are not required to have the given number of columns
* @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero,
* the number of rows will be calculated when `numRows` is invoked.
* @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
* zero, the number of columns will be calculated when `numCols` is invoked.
*/
class BlockMatrix(
val rdd: RDD[((Int, Int), Matrix)],
private var nRows: Long,
private var nCols: Long,
val blocks: RDD[((Int, Int), Matrix)],
val rowsPerBlock: Int,
val colsPerBlock: Int) extends DistributedMatrix with Logging {
val colsPerBlock: Int,
private var nRows: Long,
private var nCols: Long) extends DistributedMatrix with Logging {

private type SubMatrix = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), matrix)
private type MatrixBlock = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), sub-matrix)

/**
* Alternate constructor for BlockMatrix without the input of the number of rows and columns.
Expand All @@ -172,45 +137,48 @@ class BlockMatrix(
rdd: RDD[((Int, Int), Matrix)],
rowsPerBlock: Int,
colsPerBlock: Int) = {
this(rdd, 0L, 0L, rowsPerBlock, colsPerBlock)
this(rdd, rowsPerBlock, colsPerBlock, 0L, 0L)
}

/**
 * Number of rows of this matrix. When no positive value was supplied, the dimensions are
 * estimated from the blocks on first use and the result is kept in `nRows`.
 */
override def numRows(): Long = {
  if (nRows <= 0L) estimateDim()
  nRows
}

/**
 * Number of columns of this matrix. When no positive value was supplied, the dimensions are
 * estimated from the blocks on first use and the result is kept in `nCols`.
 */
override def numCols(): Long = {
  if (nCols <= 0L) estimateDim()
  nCols
}

// Number of blocks along each side; the last block on a side may be only partially filled.
val numRowBlocks = math.ceil(numRows() * 1.0 / rowsPerBlock).toInt
val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt

// Partitions the grid of block coordinates, aiming for as many partitions as `blocks` has.
private[mllib] var partitioner: GridPartitioner =
  GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)

/**
 * Scans the blocks once to find the extent they cover and fills in whichever of `nRows` /
 * `nCols` was not supplied (i.e. is non-positive).
 *
 * Fails an assertion if a supplied dimension is smaller than the extent covered by the blocks.
 */
private def estimateDim(): Unit = {
  // One past the last row / column index covered by each block.
  val blockExtents = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
    val rowEnd = blockRowIndex.toLong * rowsPerBlock + mat.numRows
    val colEnd = blockColIndex.toLong * colsPerBlock + mat.numCols
    (rowEnd, colEnd)
  }
  val (maxRowEnd, maxColEnd) = blockExtents.reduce { case ((r0, c0), (r1, c1)) =>
    (math.max(r0, r1), math.max(c0, c1))
  }
  if (nRows <= 0L) nRows = maxRowEnd
  assert(maxRowEnd <= nRows, s"The number of rows $maxRowEnd is more than claimed $nRows.")
  if (nCols <= 0L) nCols = maxColEnd
  assert(maxColEnd <= nCols, s"The number of columns $maxColEnd is more than claimed $nCols.")
}

/**
 * Caches the underlying RDD.
 *
 * @return this matrix, so that calls can be chained.
 */
def cache(): this.type = {
  blocks.cache()
  this
}

/**
 * Persists the underlying RDD with the specified storage level.
 *
 * @param storageLevel The storage level to apply to the underlying RDD.
 * @return this matrix, so that calls can be chained.
 */
def persist(storageLevel: StorageLevel): this.type = {
  blocks.persist(storageLevel)
  this
}

Expand All @@ -222,22 +190,22 @@ class BlockMatrix(
s"Int.MaxValue. Currently numCols: ${numCols()}")
require(numRows() * numCols() < Int.MaxValue, "The length of the values array must be " +
s"less than Int.MaxValue. Currently numRows * numCols: ${numRows() * numCols()}")
val nRows = numRows().toInt
val nCols = numCols().toInt
val mem = nRows * nCols / 125000
val m = numRows().toInt
val n = numCols().toInt
val mem = m * n / 125000
if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!")

val parts = rdd.collect()
val values = new Array[Double](nRows * nCols)
parts.foreach { case ((blockRowIndex, blockColIndex), block) =>
val localBlocks = blocks.collect()
val values = new Array[Double](m * n)
localBlocks.foreach { case ((blockRowIndex, blockColIndex), submat) =>
val rowOffset = blockRowIndex * rowsPerBlock
val colOffset = blockColIndex * colsPerBlock
block.foreachActive { (i, j, v) =>
val indexOffset = (j + colOffset) * nRows + rowOffset + i
submat.foreachActive { (i, j, v) =>
val indexOffset = (j + colOffset) * m + rowOffset + i
values(indexOffset) = v
}
}
new DenseMatrix(nRows, nCols, values)
new DenseMatrix(m, n, values)
}

/** Collects data and assembles a local dense breeze matrix (for test only). */
Expand Down
Loading

0 comments on commit a8eace2

Please sign in to comment.