Closed
24 commits
b693209
Ready for Pull request
Nov 11, 2014
f378e16
[SPARK-3974] Block Matrix Abstractions ready
Nov 11, 2014
aa8f086
[SPARK-3974] Additional comments added
Nov 11, 2014
589fbb6
[SPARK-3974] Code review feedback addressed
Nov 14, 2014
19c17e8
[SPARK-3974] Changed blockIdRow and blockIdCol
Nov 14, 2014
b05aabb
[SPARK-3974] Updated tests to reflect changes
brkyvz Nov 14, 2014
645afbe
[SPARK-3974] Pull latest master
brkyvz Nov 14, 2014
49b9586
[SPARK-3974] Updated testing utils from master
brkyvz Nov 14, 2014
d033861
[SPARK-3974] Removed SubMatrixInfo and added constructor without part…
brkyvz Nov 15, 2014
9ae85aa
[SPARK-3974] Made partitioner a variable inside BlockMatrix instead o…
brkyvz Nov 20, 2014
ab6cde0
[SPARK-3974] Modifications cleaning code up, making size calculation …
brkyvz Jan 14, 2015
ba414d2
[SPARK-3974] fixed frobenius norm
brkyvz Jan 14, 2015
239ab4b
[SPARK-3974] Addressed @jkbradley's comments
brkyvz Jan 19, 2015
1e8bb2a
[SPARK-3974] Change return type of cache and persist
brkyvz Jan 20, 2015
1a63b20
[SPARK-3974] Remove setPartition method. Isn't required
brkyvz Jan 20, 2015
eebbdf7
preliminary changes addressing code review
brkyvz Jan 21, 2015
f9d664b
updated API and modified partitioning scheme
brkyvz Jan 21, 2015
1694c9e
almost finished addressing comments
brkyvz Jan 27, 2015
140f20e
Merge branch 'master' of github.com:apache/spark into SPARK-3974
brkyvz Jan 27, 2015
5eecd48
fixed gridPartitioner and added tests
brkyvz Jan 27, 2015
24ec7b8
update grid partitioner
mengxr Jan 28, 2015
e1d3ee8
minor updates
mengxr Jan 28, 2015
feb32a7
update tests
mengxr Jan 28, 2015
a8eace2
Merge pull request #2 from mengxr/brkyvz-SPARK-3974
brkyvz Jan 28, 2015
[SPARK-3974] Block Matrix Abstractions ready
Burak Yavuz committed Nov 11, 2014
commit f378e163b04dad88f6e4fe309e45a5a632aa4101
@@ -20,18 +20,32 @@ package org.apache.spark.mllib.linalg.distributed
import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark._
Review comment (Contributor):
Try to be more explicit on the imports.

import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices}
import org.apache.spark.mllib.linalg.DenseMatrix
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

case class BlockPartition(
blockIdRow: Int,
blockIdCol: Int,
mat: DenseMatrix) extends Serializable
/**
* Represents a local matrix that makes up one block of a distributed BlockMatrix
*
* @param blockIdRow The row index of this block
* @param blockIdCol The column index of this block
* @param mat The underlying local matrix
*/
case class BlockPartition(blockIdRow: Int, blockIdCol: Int, mat: DenseMatrix) extends Serializable
Review comment (Contributor):
The name BlockPartition is a little confusing. Is it a partition? Do we allow multiple blocks per partition? If this is just talking about a block, we can call it Submatrix (see: http://en.wikipedia.org/wiki/Block_matrix).

Should the name be blockRowIndex instead of blockIdRow? Id is not the same as Index.
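For concreteness, a hypothetical construction of a single block (the 2 x 2 size, coordinates, and values are illustrative, not from the PR; mllib's DenseMatrix(numRows, numCols, values) stores values column-major):

// A 2 x 2 local matrix placed at block coordinates (0, 1) of the
// distributed matrix.
val block = BlockPartition(blockIdRow = 0, blockIdCol = 1,
  mat = new DenseMatrix(2, 2, Array(1.0, 2.0, 3.0, 4.0)))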


// Information about BlockMatrix maintained on the driver
/**
* Information about the BlockMatrix maintained on the driver
Review comment (Contributor):
"Information about a submatrix of a block matrix."

*
* @param partitionId The id of the partition the block is found in
* @param blockIdRow The row index of this block
* @param blockIdCol The column index of this block
* @param startRow The starting row index with respect to the distributed BlockMatrix
* @param numRows The number of rows in this block
* @param startCol The starting column index with respect to the distributed BlockMatrix
* @param numCols The number of columns in this block
*/
case class BlockPartitionInfo(
partitionId: Int,
blockIdRow: Int,
@@ -41,6 +55,13 @@ case class BlockPartitionInfo(
startCol: Long,
numCols: Int) extends Serializable
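
As an illustration of this driver-side metadata (all values hypothetical), the record for the top-left 1000 x 1000 block of a matrix stored in partition 0 would be:

// Block (0, 0) starts at global row 0 and global column 0 and spans
// 1000 rows and 1000 columns.
val info = BlockPartitionInfo(partitionId = 0, blockIdRow = 0, blockIdCol = 0,
  startRow = 0L, numRows = 1000, startCol = 0L, numCols = 1000)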

/**
* A partitioner that decides how the matrix is distributed in the cluster
*
* @param numPartitions Number of partitions
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
abstract class BlockMatrixPartitioner(
override val numPartitions: Int,
val rowPerBlock: Int,
@@ -52,6 +73,14 @@ abstract class BlockMatrixPartitioner(
}
}

/**
* A grid partitioner, which stores every block in a separate partition.
*
* @param numRowBlocks Number of blocks that form the rows of the matrix.
* @param numColBlocks Number of blocks that form the columns of the matrix.
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
class GridPartitioner(
val numRowBlocks: Int,
val numColBlocks: Int,
@@ -74,6 +103,14 @@ class GridPartitioner(
}
}
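
The grid layout stores one block per partition. A standalone sketch of the coordinate-to-partition mapping, assuming it matches the column-major keying used in keyBy further down (the actual getPartition body is elided in this diff):

// Column-major flattening of block coordinates into a partition id.
def gridIndex(blockIdRow: Int, blockIdCol: Int, numRowBlocks: Int): Int =
  blockIdRow + numRowBlocks * blockIdCol

// In a 2 x 3 block grid, block (1, 2) maps to partition 1 + 2 * 2 = 5.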

/**
* A specialized partitioner that stores all blocks in the same row in just one partition.
*
* @param numPartitions Number of partitions. Should be set as the number of blocks that form
* the rows of the matrix.
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
class RowBasedPartitioner(
override val numPartitions: Int,
override val rowPerBlock: Int,
@@ -93,6 +130,14 @@ class RowBasedPartitioner(
}
}

/**
* A specialized partitioner that stores all blocks in the same column in just one partition.
*
* @param numPartitions Number of partitions. Should be set as the number of blocks that form
* the columns of the matrix.
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
class ColumnBasedPartitioner(
override val numPartitions: Int,
override val rowPerBlock: Int,
@@ -114,39 +159,44 @@ class ColumnBasedPartitioner(
}
}
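
A minimal usage sketch for the two specialized partitioners, assuming only the constructor signatures shown above (block counts and sizes are made up): a matrix with 4 block-rows and 6 block-columns could be laid out either way.

// One partition per block-row: 4 partitions for 4 block-rows.
val byRow = new RowBasedPartitioner(numPartitions = 4, rowPerBlock = 1000, colPerBlock = 1000)
// One partition per block-column: 6 partitions for 6 block-columns.
val byCol = new ColumnBasedPartitioner(numPartitions = 6, rowPerBlock = 1000, colPerBlock = 1000)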

/**
* Represents a distributed matrix in blocks of local matrices.
*
* @param numRowBlocks Number of blocks that form the rows of this matrix
Review comment (Contributor):

Could it be derived from nRows and rowsPerBlock? Having nRows, numRowBlocks, and rowsPerBlock would leave space for inconsistent inputs.

* @param numColBlocks Number of blocks that form the columns of this matrix
* @param rdd The RDD of BlockPartitions (local matrices) that form this matrix
* @param partitioner A partitioner that specifies how BlockPartitions are stored in the cluster
*/
class BlockMatrix(
val numRowBlocks: Int,
val numColBlocks: Int,
val rdd: RDD[BlockPartition],
val partitioner: BlockMatrixPartitioner) extends DistributedMatrix with Logging {

// We need a key-value pair RDD to partition properly
private var matrixRDD = rdd.map { block =>
partitioner match {
case r: RowBasedPartitioner => (block.blockIdRow, block)
case c: ColumnBasedPartitioner => (block.blockIdCol, block)
case g: GridPartitioner => (block.blockIdRow + numRowBlocks * block.blockIdCol, block)
case _ => throw new IllegalArgumentException("Unrecognized partitioner")
}
}
// A key-value pair RDD is required to partition properly
private var matrixRDD: RDD[(Int, BlockPartition)] = keyBy()

@transient var blockInfo_ : Map[(Int, Int), BlockPartitionInfo] = null

lazy val dims: (Long, Long) = getDim
private lazy val dims: (Long, Long) = getDim

override def numRows(): Long = dims._1
override def numCols(): Long = dims._2

if (partitioner.name.equals("column")) {
require(numColBlocks == partitioner.numPartitions)
require(numColBlocks == partitioner.numPartitions, "The number of column blocks should match" +
Review comment (Contributor):
Output the non-equal parameters here?

" the number of partitions of the column partitioner.")
} else if (partitioner.name.equals("row")) {
require(numRowBlocks == partitioner.numPartitions)
require(numRowBlocks == partitioner.numPartitions, "The number of row blocks should match" +
" the number of partitions of the row partitioner.")
} else if (partitioner.name.equals("grid")) {
require(numRowBlocks * numColBlocks == partitioner.numPartitions)
require(numRowBlocks * numColBlocks == partitioner.numPartitions, "The number of blocks " +
"should match the number of partitions of the grid partitioner.")
} else {
throw new IllegalArgumentException("Unrecognized partitioner.")
}

/* Returns the dimensions of the matrix. */
def getDim: (Long, Long) = {
Review comment (Member):
Should this be private?

val bi = getBlockInfo
val xDim = bi.map { x =>
@@ -194,18 +244,20 @@ class BlockMatrix(
}.toMap

blockInfo_ = blockStartRowCols.map{ case ((rowId, colId), (partId, numRow, numCol)) =>
((rowId, colId), new BlockPartitionInfo(partId, rowId, colId, cumulativeRowSum(rowId), numRow,
cumulativeColSum(colId), numCol))
((rowId, colId), new BlockPartitionInfo(partId, rowId, colId, cumulativeRowSum(rowId),
numRow, cumulativeColSum(colId), numCol))
}.toMap
}

/* Returns a map from block coordinates to information about each block of the distributed matrix. */
def getBlockInfo: Map[(Int, Int), BlockPartitionInfo] = {
if (blockInfo_ == null) {
calculateBlockInfo()
}
Review comment (Member):
style: indentation

blockInfo_
}

/* Returns the Frobenius norm of the matrix. */
def normFro(): Double = {
Review comment (Contributor):
Remove this function. We can add it back later.

math.sqrt(rdd.map(lm => lm.mat.values.map(x => math.pow(x, 2)).sum).reduce(_ + _))
}
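
normFro sums the squares of every entry across all blocks and takes a square root. A local single-array equivalent, as a hypothetical helper for intuition (not part of the PR):

// Frobenius norm of a dense value array: sqrt of the sum of squares.
def frobeniusLocal(values: Array[Double]): Double =
  math.sqrt(values.map(x => x * x).sum)

// frobeniusLocal(Array(3.0, 4.0)) == 5.0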
@@ -222,8 +274,19 @@ class BlockMatrix(
this
}

private def keyBy(part: BlockMatrixPartitioner = partitioner): RDD[(Int, BlockPartition)] = {
rdd.map { block =>
part match {
case r: RowBasedPartitioner => (block.blockIdRow, block)
case c: ColumnBasedPartitioner => (block.blockIdCol, block)
case g: GridPartitioner => (block.blockIdRow + numRowBlocks * block.blockIdCol, block)
case _ => throw new IllegalArgumentException("Unrecognized partitioner")
}
}
}

def repartition(part: BlockMatrixPartitioner = partitioner): DistributedMatrix = {
matrixRDD = matrixRDD.partitionBy(part)
matrixRDD = keyBy(part)
this
}
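
A sketch of switching layouts via repartition, assuming the four-argument GridPartitioner constructor above and an existing BlockMatrix `mat` with a 2 x 2 block grid (all sizes illustrative):

// Re-key the blocks for a grid layout, one block per partition.
val grid = new GridPartitioner(numRowBlocks = 2, numColBlocks = 2,
  rowPerBlock = 1000, colPerBlock = 1000)
val regridded = mat.repartition(grid)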

@@ -259,80 +322,4 @@ class BlockMatrix(
val localMat = collect()
new BDM[Double](localMat.numRows, localMat.numCols, localMat.values)
}

def add(other: DistributedMatrix): DistributedMatrix = {
other match {
// We really need a function to check if two matrices are partitioned similarly
case otherBlocked: BlockMatrix =>
if (checkPartitioning(otherBlocked, OperationNames.add)) {
val addedBlocks = rdd.zip(otherBlocked.rdd).map{ case (a, b) =>
val result = a.mat.toBreeze + b.mat.toBreeze
new BlockPartition(a.blockIdRow, a.blockIdCol,
Matrices.fromBreeze(result).asInstanceOf[DenseMatrix])
}
new BlockMatrix(numRowBlocks, numColBlocks, addedBlocks, partitioner)
} else {
throw new SparkException(
"Cannot add matrices with non-matching partitioners")
}
case _ =>
throw new IllegalArgumentException("Cannot add matrices of different types")
}
}

def multiply(other: DistributedMatrix): BlockMatrix = {
other match {
case otherBlocked: BlockMatrix =>
if (checkPartitioning(otherBlocked, OperationNames.multiply)) {

val resultPartitioner = new GridPartitioner(numRowBlocks, otherBlocked.numColBlocks,
partitioner.rowPerBlock, otherBlocked.partitioner.colPerBlock)

val multiplyBlocks = matrixRDD.join(otherBlocked.matrixRDD, partitioner).
map { case (key, (mat1, mat2)) =>
val C = mat1.mat multiply mat2.mat
(mat1.blockIdRow + numRowBlocks * mat2.blockIdCol, C.toBreeze)
}.reduceByKey(resultPartitioner, (a, b) => a + b)

val newBlocks = multiplyBlocks.map{ case (index, mat) =>
val colId = index / numRowBlocks
val rowId = index - colId * numRowBlocks
new BlockPartition(rowId, colId, Matrices.fromBreeze(mat).asInstanceOf[DenseMatrix])
}
new BlockMatrix(numRowBlocks, otherBlocked.numColBlocks, newBlocks, resultPartitioner)
} else {
throw new SparkException(
"Cannot multiply matrices with non-matching partitioners")
}
case _ =>
throw new IllegalArgumentException("Cannot multiply matrices of different types")
}
}
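
multiply keys each product block as rowId + numRowBlocks * colId and recovers the coordinates by integer division. A quick standalone round-trip check of that arithmetic (numRowBlocks = 2 chosen for illustration):

val numRowBlocks = 2
val (rowId, colId) = (1, 3)
val key = rowId + numRowBlocks * colId            // 1 + 2 * 3 = 7
val decodedCol = key / numRowBlocks               // 7 / 2 = 3 (integer division)
val decodedRow = key - decodedCol * numRowBlocks  // 7 - 3 * 2 = 1
// The round trip holds whenever rowId < numRowBlocks.
assert((decodedRow, decodedCol) == (rowId, colId))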

private def checkPartitioning(other: BlockMatrix, operation: Int): Boolean = {
val otherPartitioner = other.partitioner
operation match {
case OperationNames.add =>
partitioner.equals(otherPartitioner)
case OperationNames.multiply =>
partitioner.name == "column" && otherPartitioner.name == "row" &&
partitioner.numPartitions == otherPartitioner.numPartitions &&
partitioner.colPerBlock == otherPartitioner.rowPerBlock &&
numColBlocks == other.numRowBlocks
case _ =>
throw new IllegalArgumentException("Unsupported operation")
}
}
}
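
For multiply, checkPartitioning requires a column-partitioned left operand, a row-partitioned right operand, equal partition counts, and matching inner block sizes. A hypothetical pair of partitioners that satisfies all of these (dimensions made up):

// Left matrix A stored column-wise, right matrix B row-wise; A's colPerBlock
// (200) equals B's rowPerBlock (200) and both use 3 partitions, so the check
// passes provided A.numColBlocks == B.numRowBlocks == 3.
val aPart = new ColumnBasedPartitioner(numPartitions = 3, rowPerBlock = 500, colPerBlock = 200)
val bPart = new RowBasedPartitioner(numPartitions = 3, rowPerBlock = 200, colPerBlock = 400)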

/**
* Maintains supported and default block matrix operation names.
*
* Currently supported operations: `add`, `multiply`.
*/
private object OperationNames {

val add: Int = 1
val multiply: Int = 2

}