Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b693209
Ready for Pull request
Nov 11, 2014
f378e16
[SPARK-3974] Block Matrix Abstractions ready
Nov 11, 2014
aa8f086
[SPARK-3974] Additional comments added
Nov 11, 2014
589fbb6
[SPARK-3974] Code review feedback addressed
Nov 14, 2014
19c17e8
[SPARK-3974] Changed blockIdRow and blockIdCol
Nov 14, 2014
b05aabb
[SPARK-3974] Updated tests to reflect changes
brkyvz Nov 14, 2014
645afbe
[SPARK-3974] Pull latest master
brkyvz Nov 14, 2014
49b9586
[SPARK-3974] Updated testing utils from master
brkyvz Nov 14, 2014
d033861
[SPARK-3974] Removed SubMatrixInfo and added constructor without part…
brkyvz Nov 15, 2014
9ae85aa
[SPARK-3974] Made partitioner a variable inside BlockMatrix instead o…
brkyvz Nov 20, 2014
ab6cde0
[SPARK-3974] Modifications cleaning code up, making size calculation …
brkyvz Jan 14, 2015
ba414d2
[SPARK-3974] fixed frobenius norm
brkyvz Jan 14, 2015
239ab4b
[SPARK-3974] Addressed @jkbradley's comments
brkyvz Jan 19, 2015
1e8bb2a
[SPARK-3974] Change return type of cache and persist
brkyvz Jan 20, 2015
1a63b20
[SPARK-3974] Remove setPartition method. Isn't required
brkyvz Jan 20, 2015
eebbdf7
preliminary changes addressing code review
brkyvz Jan 21, 2015
f9d664b
updated API and modified partitioning scheme
brkyvz Jan 21, 2015
1694c9e
almost finished addressing comments
brkyvz Jan 27, 2015
140f20e
Merge branch 'master' of github.com:apache/spark into SPARK-3974
brkyvz Jan 27, 2015
5eecd48
fixed gridPartitioner and added tests
brkyvz Jan 27, 2015
24ec7b8
update grid partitioner
mengxr Jan 28, 2015
e1d3ee8
minor updates
mengxr Jan 28, 2015
feb32a7
update tests
mengxr Jan 28, 2015
a8eace2
Merge pull request #2 from mengxr/brkyvz-SPARK-3974
brkyvz Jan 28, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-3974] Removed SubMatrixInfo and added constructor without part…
…itioner
  • Loading branch information
brkyvz committed Nov 15, 2014
commit d033861d5a2f88b223f601feb4445308399901e8
Original file line number Diff line number Diff line change
Expand Up @@ -21,47 +21,28 @@ import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to be more explicit on the imports.

import org.apache.spark.mllib.linalg.DenseMatrix
import org.apache.spark.mllib.rdd.RDDFunctions._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is treeAggregate in getDim

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I didn't see mllib ...

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
* Represents a local matrix that makes up one block of a distributed BlockMatrix
*
* @param blockRowIndex The row index of this block
* @param blockColIndex The column index of this block
* @param blockRowIndex The row index of this block. Must be zero based.
* @param blockColIndex The column index of this block. Must be zero based.
* @param mat The underlying local matrix
*/
case class SubMatrix(blockRowIndex: Int, blockColIndex: Int, mat: DenseMatrix) extends Serializable

/**
* Information of the submatrices of the BlockMatrix maintained on the driver
*
* @param partitionId The id of the partition the block is found in
* @param blockRowIndex The row index of this block
* @param blockColIndex The column index of this block
* @param startRow The starting row index with respect to the distributed BlockMatrix
* @param numRows The number of rows in this block
* @param startCol The starting column index with respect to the distributed BlockMatrix
* @param numCols The number of columns in this block
*/
case class SubMatrixInfo(
partitionId: Int,
blockRowIndex: Int,
blockColIndex: Int,
startRow: Long,
numRows: Int,
startCol: Long,
numCols: Int) extends Serializable

/**
* A partitioner that decides how the matrix is distributed in the cluster
*
* @param numPartitions Number of partitions
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
abstract class BlockMatrixPartitioner(
private[mllib] abstract class BlockMatrixPartitioner(
override val numPartitions: Int,
val rowPerBlock: Int,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename "rowPerBlock" --> "rowsPerBlock"? Same for "colPerBlock" and in other places in this file.

val colPerBlock: Int) extends Partitioner {
Expand Down Expand Up @@ -173,20 +154,30 @@ class ColumnBasedPartitioner(
*
* @param numRowBlocks Number of blocks that form the rows of this matrix
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it be derived from nRows and rowsPerBlock? Having nRows, numRowBlocks, and rowsPerBlock would leave space for inconsistent inputs.

* @param numColBlocks Number of blocks that form the columns of this matrix
* @param rdd The RDD of SubMatrixs (local matrices) that form this matrix
* @param partitioner A partitioner that specifies how SubMatrixs are stored in the cluster
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
* @param partitioner A partitioner that specifies how SubMatrices are stored in the cluster
*/
class BlockMatrix(
val numRowBlocks: Int,
val numColBlocks: Int,
val rdd: RDD[SubMatrix],
val partitioner: BlockMatrixPartitioner) extends DistributedMatrix with Logging {

/**
* Alternate constructor for BlockMatrix without the input of a partitioner. Will use a Grid
* Partitioner by default.
*
* @param numRowBlocks Number of blocks that form the rows of this matrix
* @param numColBlocks Number of blocks that form the columns of this matrix
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
*/
def this(numRowBlocks: Int, numColBlocks: Int, rdd: RDD[SubMatrix]) = {
this(numRowBlocks, numColBlocks, rdd, new GridPartitioner(numRowBlocks, numColBlocks,
rdd.first().mat.numRows, rdd.first().mat.numCols))
}
// A key-value pair RDD is required to partition properly
private var matrixRDD: RDD[(Int, SubMatrix)] = keyBy()

@transient var blockInfo_ : Map[(Int, Int), SubMatrixInfo] = null

private lazy val dims: (Long, Long) = getDim

override def numRows(): Long = dims._1
Expand All @@ -211,55 +202,26 @@ class BlockMatrix(

/** Returns the dimensions of the matrix. */
def getDim: (Long, Long) = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be private?

val bi = getBlockInfo
val xDim = bi.map { x =>
(x._1._1, x._2.numRows.toLong)
}.groupBy(x => x._1).values.map(_.head._2.toLong).reduceLeft(_ + _)

val yDim = bi.map { x =>
(x._1._2, x._2.numCols.toLong)
}.groupBy(x => x._1).values.map(_.head._2.toLong).reduceLeft(_ + _)

(xDim, yDim)
}

/** Calculates the information for each block and collects it on the driver */
private def calculateBlockInfo(): Unit = {
// collect may cause akka frameSize errors
val blockStartRowColsParts = matrixRDD.mapPartitionsWithIndex { case (partId, iter) =>
iter.map { case (id, block) =>
((block.blockRowIndex, block.blockColIndex), (partId, block.mat.numRows, block.mat.numCols))
val firstRowColumn = rdd.filter(block => block.blockRowIndex == 0 || block.blockColIndex == 0).
map { block =>
((block.blockRowIndex, block.blockColIndex), (block.mat.numRows, block.mat.numCols))
}
}.collect()
val blockStartRowCols = blockStartRowColsParts.sortBy(_._1)

// Group blockInfo by rowId, pick the first row and sort on rowId
val rowReps = blockStartRowCols.groupBy(_._1._1).values.map(_.head).toSeq.sortBy(_._1._1)

// Group blockInfo by columnId, pick the first column and sort on columnId
val colReps = blockStartRowCols.groupBy(_._1._2).values.map(_.head).toSeq.sortBy(_._1._2)

// Calculate startRows
val cumulativeRowSum = rowReps.scanLeft((0, 0L)) { case (x1, x2) =>
(x1._1 + 1, x1._2 + x2._2._2)
}.toMap

val cumulativeColSum = colReps.scanLeft((0, 0L)) { case (x1, x2) =>
(x1._1 + 1, x1._2 + x2._2._3)
}.toMap

blockInfo_ = blockStartRowCols.map{ case ((rowId, colId), (partId, numRow, numCol)) =>
((rowId, colId), new SubMatrixInfo(partId, rowId, colId, cumulativeRowSum(rowId),
numRow, cumulativeColSum(colId), numCol))
}.toMap
}

/** Returns a map of the information of the blocks that form the distributed matrix. */
def getBlockInfo: Map[(Int, Int), SubMatrixInfo] = {
if (blockInfo_ == null) {
calculateBlockInfo()
}
blockInfo_
firstRowColumn.treeAggregate((0L, 0L))(
seqOp = (c, v) => (c, v) match { case ((x_dim, y_dim), ((indX, indY), (nRow, nCol))) =>
if (indX == 0 && indY == 0) {
(x_dim + nRow, y_dim + nCol)
} else if (indX == 0) {
(x_dim, y_dim + nCol)
} else {
(x_dim + nRow, y_dim)
}
},
combOp = (c1, c2) => (c1, c2) match {
case ((x_dim1, y_dim1), (x_dim2, y_dim2)) =>
(x_dim1 + x_dim2, y_dim1 + y_dim2)
})
}

/** Returns the Frobenius Norm of the matrix */
Expand Down Expand Up @@ -309,23 +271,26 @@ class BlockMatrix(
val nRows = numRows().toInt
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd check numRows and numCols here before converting to Int and throw an error if the matrix is too large.

val nCols = numCols().toInt
val values = new Array[Double](nRows * nCols)
val blockInfos = getBlockInfo
var rowStart = 0
var colStart = 0
parts.foreach { part =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a match statement so you don't have to use the ._1._1 style of indexing

val blockInfo = blockInfos((part._1._1, part._1._2))
// Figure out where this part should be put
if (part._1._1 == 0) rowStart = 0
val block = part._2
var j = 0
while (j < blockInfo.numCols) {
while (j < block.numCols) {
var i = 0
val indStart = (j + blockInfo.startCol.toInt) * nRows + blockInfo.startRow.toInt
val indEnd = blockInfo.numRows
val matStart = j * blockInfo.numRows
val mat = part._2.values
val indStart = (j + colStart) * nRows + rowStart
val indEnd = block.numRows
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"indEnd" --> "iEnd" since it matches with i, not indStart

val matStart = j * block.numRows
val mat = block.values
while (i < indEnd) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be shortened with an Array.copy call, but either is OK with me.

values(indStart + i) = mat(matStart + i)
i += 1
}
j += 1
}
rowStart += block.numRows
if (part._1._1 == numRowBlocks - 1) colStart += block.numCols
}
new DenseMatrix(nRows, nCols, values)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,13 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {

val colPart = new ColumnBasedPartitioner(numColBlocks, rowPerPart, colPerPart)
val rowPart = new RowBasedPartitioner(numRowBlocks, rowPerPart, colPerPart)
val gridPart = new GridPartitioner(numRowBlocks, numColBlocks, rowPerPart, colPerPart)

colBasedMat =
new BlockMatrix(numRowBlocks, numColBlocks, sc.parallelize(entries, numColBlocks), colPart)
rowBasedMat =
new BlockMatrix(numRowBlocks, numColBlocks, sc.parallelize(entries, numRowBlocks), rowPart)
gridBasedMat =
new BlockMatrix(numRowBlocks, numColBlocks,
sc.parallelize(entries, numRowBlocks * numColBlocks), gridPart)
gridBasedMat = new BlockMatrix(numRowBlocks, numColBlocks,
sc.parallelize(entries, numRowBlocks * numColBlocks))
}

test("size") {
Expand Down Expand Up @@ -84,37 +82,4 @@ class BlockMatrixSuite extends FunSuite with MLlibTestSparkContext {
assert(rowBasedMat.collect() === dense)
assert(gridBasedMat.collect() === dense)
}

test("blockInfo") {
val colMatInfo = colBasedMat.getBlockInfo
val rowMatInfo = rowBasedMat.getBlockInfo
val gridMatInfo = gridBasedMat.getBlockInfo

assert(colMatInfo((0, 1)).numRows === 2)
assert(colMatInfo((0, 1)).numCols === 2)
assert(colMatInfo((0, 1)).startRow === 0)
assert(colMatInfo((0, 1)).startCol === 2)
assert(colMatInfo((2, 0)).numRows === 1)
assert(colMatInfo((2, 0)).numCols === 2)
assert(colMatInfo((2, 0)).startRow === 4)
assert(colMatInfo((2, 0)).startCol === 0)

assert(rowMatInfo((0, 1)).numRows === 2)
assert(rowMatInfo((0, 1)).numCols === 2)
assert(rowMatInfo((0, 1)).startRow === 0)
assert(rowMatInfo((0, 1)).startCol === 2)
assert(rowMatInfo((2, 0)).numRows === 1)
assert(rowMatInfo((2, 0)).numCols === 2)
assert(rowMatInfo((2, 0)).startRow === 4)
assert(rowMatInfo((2, 0)).startCol === 0)

assert(gridMatInfo((0, 1)).numRows === 2)
assert(gridMatInfo((0, 1)).numCols === 2)
assert(gridMatInfo((0, 1)).startRow === 0)
assert(gridMatInfo((0, 1)).startCol === 2)
assert(gridMatInfo((2, 0)).numRows === 1)
assert(gridMatInfo((2, 0)).numCols === 2)
assert(gridMatInfo((2, 0)).startRow === 4)
assert(gridMatInfo((2, 0)).startCol === 0)
}
}