24 commits
b693209  Ready for Pull request (Nov 11, 2014)
f378e16  [SPARK-3974] Block Matrix Abstractions ready (Nov 11, 2014)
aa8f086  [SPARK-3974] Additional comments added (Nov 11, 2014)
589fbb6  [SPARK-3974] Code review feedback addressed (Nov 14, 2014)
19c17e8  [SPARK-3974] Changed blockIdRow and blockIdCol (Nov 14, 2014)
b05aabb  [SPARK-3974] Updated tests to reflect changes (brkyvz, Nov 14, 2014)
645afbe  [SPARK-3974] Pull latest master (brkyvz, Nov 14, 2014)
49b9586  [SPARK-3974] Updated testing utils from master (brkyvz, Nov 14, 2014)
d033861  [SPARK-3974] Removed SubMatrixInfo and added constructor without part… (brkyvz, Nov 15, 2014)
9ae85aa  [SPARK-3974] Made partitioner a variable inside BlockMatrix instead o… (brkyvz, Nov 20, 2014)
ab6cde0  [SPARK-3974] Modifications cleaning code up, making size calculation … (brkyvz, Jan 14, 2015)
ba414d2  [SPARK-3974] fixed frobenius norm (brkyvz, Jan 14, 2015)
239ab4b  [SPARK-3974] Addressed @jkbradley's comments (brkyvz, Jan 19, 2015)
1e8bb2a  [SPARK-3974] Change return type of cache and persist (brkyvz, Jan 20, 2015)
1a63b20  [SPARK-3974] Remove setPartition method. Isn't required (brkyvz, Jan 20, 2015)
eebbdf7  preliminary changes addressing code review (brkyvz, Jan 21, 2015)
f9d664b  updated API and modified partitioning scheme (brkyvz, Jan 21, 2015)
1694c9e  almost finished addressing comments (brkyvz, Jan 27, 2015)
140f20e  Merge branch 'master' of github.com:apache/spark into SPARK-3974 (brkyvz, Jan 27, 2015)
5eecd48  fixed gridPartitioner and added tests (brkyvz, Jan 27, 2015)
24ec7b8  update grid partitioner (mengxr, Jan 28, 2015)
e1d3ee8  minor updates (mengxr, Jan 28, 2015)
feb32a7  update tests (mengxr, Jan 28, 2015)
a8eace2  Merge pull request #2 from mengxr/brkyvz-SPARK-3974 (brkyvz, Jan 28, 2015)
commit ab6cde0d90b917e89f97c86ef3c84dcdc64a9b57
[SPARK-3974] Modifications cleaning code up, making size calculation more robust
brkyvz committed Jan 14, 2015
@@ -20,97 +20,51 @@ package org.apache.spark.mllib.linalg.distributed
import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark._
[Contributor] Try to be more explicit on the imports.

-import org.apache.spark.mllib.linalg.DenseMatrix
+import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.rdd.RDDFunctions._
[Contributor] This is not necessary.

[Contributor Author] There is treeAggregate in getDim.

[Contributor] Oh, I didn't see mllib ...
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

-/**
-* Represents a local matrix that makes up one block of a distributed BlockMatrix
-*
-* @param blockRowIndex The row index of this block. Must be zero based.
-* @param blockColIndex The column index of this block. Must be zero based.
-* @param mat The underlying local matrix
-*/
-case class SubMatrix(blockRowIndex: Int, blockColIndex: Int, mat: DenseMatrix) extends Serializable

/**
-* A partitioner that decides how the matrix is distributed in the cluster
+* A grid partitioner, which stores every block in a separate partition.
*
-* @param numPartitions Number of partitions
+* @param numRowBlocks Number of blocks that form the rows of the matrix.
+* @param numColBlocks Number of blocks that form the columns of the matrix.
* @param rowPerBlock Number of rows that make up each block.
* @param colPerBlock Number of columns that make up each block.
*/
-private[mllib] abstract class BlockMatrixPartitioner(
-override val numPartitions: Int,
+private[mllib] class GridPartitioner(
+val numRowBlocks: Int,
+val numColBlocks: Int,
val rowPerBlock: Int,
[Member] Rename "rowPerBlock" --> "rowsPerBlock"? Same for "colPerBlock" and in other places in this file.

-val colPerBlock: Int) extends Partitioner {
-val name: String
+val colPerBlock: Int,
+override val numPartitions: Int) extends Partitioner {

/**
* Returns the index of the partition the SubMatrix belongs to.
*
-* @param key The key for the SubMatrix. Can be its row index, column index or position in the
-*            grid.
+* @param key The key for the SubMatrix. Can be its position in the grid (its column major index)
+*            or a tuple of three integers that are the final row index after the multiplication,
+*            the index of the block to multiply with, and the final column index after the
+*            multiplication.
* @return The index of the partition, which the SubMatrix belongs to.
*/
override def getPartition(key: Any): Int = {
-Utils.nonNegativeMod(key.asInstanceOf[Int], numPartitions)
-}
-}

-/**
-* A grid partitioner, which stores every block in a separate partition.
-*
-* @param numRowBlocks Number of blocks that form the rows of the matrix.
-* @param numColBlocks Number of blocks that form the columns of the matrix.
-* @param rowPerBlock Number of rows that make up each block.
-* @param colPerBlock Number of columns that make up each block.
-*/
-class GridPartitioner(
-val numRowBlocks: Int,
-val numColBlocks: Int,
-override val rowPerBlock: Int,
-override val colPerBlock: Int)
-extends BlockMatrixPartitioner(numRowBlocks * numColBlocks, rowPerBlock, colPerBlock) {
-
-override val name = "grid"
-
-override val numPartitions = numRowBlocks * numColBlocks
-
-/** Checks whether the partitioners have the same characteristics */
-override def equals(obj: Any): Boolean = {
-obj match {
-case r: GridPartitioner =>
-(this.numPartitions == r.numPartitions) && (this.rowPerBlock == r.rowPerBlock) &&
-(this.colPerBlock == r.colPerBlock)
+key match {
+case ind: (Int, Int) =>
+Utils.nonNegativeMod(ind._1 + ind._2 * numRowBlocks, numPartitions)
+case indices: (Int, Int, Int) =>
+Utils.nonNegativeMod(indices._1 + indices._3 * numRowBlocks, numPartitions)
case _ =>
-false
+throw new IllegalArgumentException("Unrecognized key")
[Contributor] Put the key inside the error message.
}
}
-}
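
For intuition, a minimal sketch (illustrative, not part of the diff) of how the new keying works, assuming the five-argument constructor shown above: a block at (blockRow, blockCol) is keyed by its column-major index blockRow + blockCol * numRowBlocks, taken modulo numPartitions.

    val part = new GridPartitioner(numRowBlocks = 2, numColBlocks = 3,
      rowPerBlock = 100, colPerBlock = 100, numPartitions = 6)
    part.getPartition((0, 0)) // 0 + 0 * 2 = 0
    part.getPartition((1, 2)) // 1 + 2 * 2 = 5, the last cell of the 2 x 3 grid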

-/**
-* A specialized partitioner that stores all blocks in the same row in just one partition.
-*
-* @param numPartitions Number of partitions. Should be set as the number of blocks that form
-* the rows of the matrix.
-* @param rowPerBlock Number of rows that make up each block.
-* @param colPerBlock Number of columns that make up each block.
-*/
-class RowBasedPartitioner(
-override val numPartitions: Int,
-override val rowPerBlock: Int,
-override val colPerBlock: Int)
-extends BlockMatrixPartitioner(numPartitions, rowPerBlock, colPerBlock) {
-
-override val name = "row"
-
/** Checks whether the partitioners have the same characteristics */
override def equals(obj: Any): Boolean = {
obj match {
-case r: RowBasedPartitioner =>
+case r: GridPartitioner =>
(this.numPartitions == r.numPartitions) && (this.rowPerBlock == r.rowPerBlock) &&
(this.colPerBlock == r.colPerBlock)
case _ =>

@@ -119,36 +73,6 @@ class RowBasedPartitioner(
}
}

-/**
-* A specialized partitioner that stores all blocks in the same column in just one partition.
-*
-* @param numPartitions Number of partitions. Should be set as the number of blocks that form
-* the columns of the matrix.
-* @param rowPerBlock Number of rows that make up each block.
-* @param colPerBlock Number of columns that make up each block.
-*/
-class ColumnBasedPartitioner(
-override val numPartitions: Int,
-override val rowPerBlock: Int,
-override val colPerBlock: Int)
-extends BlockMatrixPartitioner(numPartitions, rowPerBlock, colPerBlock) {
-
-override val name = "column"
-
-/** Checks whether the partitioners have the same characteristics */
-override def equals(obj: Any): Boolean = {
-obj match {
-case p: ColumnBasedPartitioner =>
-(this.numPartitions == p.numPartitions) && (this.rowPerBlock == p.rowPerBlock) &&
-(this.colPerBlock == p.colPerBlock)
-case r: RowBasedPartitioner =>
-(this.numPartitions == r.numPartitions) && (this.colPerBlock == r.rowPerBlock)
-case _ =>
-false
-}
-}
-}

/**
* Represents a distributed matrix in blocks of local matrices.
*
@@ -159,7 +83,9 @@ class ColumnBasedPartitioner(
class BlockMatrix(
val numRowBlocks: Int,
val numColBlocks: Int,
-val rdd: RDD[SubMatrix]) extends DistributedMatrix with Logging {
+val rdd: RDD[((Int, Int), Matrix)]) extends DistributedMatrix with Logging {
[mbhynes] @brkyvz @rezazadeh I'm wondering why you are using (Int, Int) as the block index, as opposed to a class that implements hashCode() to partition the matrix in lexicographic order. Then the call rdd.partitionBy(new HashPartitioner(numRowBlocks * numColBlocks)) would ensure the same partitioning between two distributed matrices. When performing addition or element-wise multiplication with another distributed matrix, the operation A_ij + B_ij would be local, without any shuffle. Could you please explain the use of (Int, Int) over an Index class?

[Contributor Author] Hi @mbhynes. Great question. We did have such a class; it was called BlockPartition, which was later renamed to SubMatrix. I did try what you suggested, but here was the catch: even though the partition ids match, the partitions don't necessarily end up on the same executors. Spark distributes the partitions deterministically, so in theory they should land on the same executors, but in practice an executor lags, "something" happens, and the partitions get jumbled across executors. Spark can't guarantee that matching partitions end up on the same machine, for fault-tolerance reasons (it's how the scheduler works). Therefore we needed explicit indices (which the SubMatrix class carried), and to ensure that we add the correct blocks to each other, calling a .join was inevitable. Instead of storing the index inside SubMatrix and also keeping it as a key, we decided to just key blocks by the index as above.

Then the question becomes: can users index their matrices properly, i.e. supply ((Int, Int), Matrix) correctly? To solve this, we will support conversions from CoordinateMatrix and IndexedRowMatrix, which are convenient storage formats, and have users call .toBlockMatrix from those classes.
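
To illustrate the join-based addition described above (a hypothetical sketch, not code from this PR; it assumes an RDD keyed exactly as in the diff): with (Int, Int) block indices as keys, adding two block matrices reduces to a pair-RDD join that stays correct even when matching partitions land on different executors.

    import org.apache.spark.SparkContext._
    import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
    import org.apache.spark.rdd.RDD

    def addBlocks(a: RDD[((Int, Int), Matrix)],
                  b: RDD[((Int, Int), Matrix)]): RDD[((Int, Int), Matrix)] = {
      // join pairs up blocks with equal (blockRowIndex, blockColIndex)
      a.join(b).mapValues { case (ma, mb) =>
        // element-wise sum of the two local blocks, in column-major order
        val sum = ma.toArray.zip(mb.toArray).map { case (x, y) => x + y }
        new DenseMatrix(ma.numRows, ma.numCols, sum): Matrix
      }
    }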


+type SubMatrix = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), matrix)

[Contributor] private?


/**
* Alternate constructor for BlockMatrix without the input of a partitioner. Will use a Grid
@@ -168,125 +94,92 @@ class BlockMatrix(
* @param numRowBlocks Number of blocks that form the rows of this matrix
* @param numColBlocks Number of blocks that form the columns of this matrix
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
-* @param partitioner A partitioner that specifies how SubMatrices are stored in the cluster
+* @param rowPerBlock Number of rows that make up each block.
+* @param colPerBlock Number of columns that make up each block.
*/
def this(
numRowBlocks: Int,
numColBlocks: Int,
-rdd: RDD[SubMatrix],
-partitioner: BlockMatrixPartitioner) = {
+rdd: RDD[((Int, Int), Matrix)],
+rowPerBlock: Int,
+colPerBlock: Int) = {
this(numRowBlocks, numColBlocks, rdd)
-setPartitioner(partitioner)
+val part = new GridPartitioner(numRowBlocks, numColBlocks, rowPerBlock, colPerBlock, rdd.partitions.length)
+setPartitioner(part)
}
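
A hypothetical usage of this constructor (names and sizes invented for illustration; assumes an active SparkContext sc), building a 2 x 2 grid of 3 x 3 blocks where only the diagonal blocks are present:

    import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
    val ones = new DenseMatrix(3, 3, Array.fill(9)(1.0)) // one dense 3 x 3 block
    val blocks = sc.parallelize(Seq[((Int, Int), Matrix)](((0, 0), ones), ((1, 1), ones)))
    val mat = new BlockMatrix(2, 2, blocks, 3, 3) // rowPerBlock = 3, colPerBlock = 3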

-private[mllib] var partitioner: BlockMatrixPartitioner = {
-val firstSubMatrix = rdd.first().mat
+private[mllib] var partitioner: GridPartitioner = {
+val firstSubMatrix = rdd.first()._2
new GridPartitioner(numRowBlocks, numColBlocks,
-firstSubMatrix.numRows, firstSubMatrix.numCols)
+firstSubMatrix.numRows, firstSubMatrix.numCols, rdd.partitions.length)
}

/**
* Set the partitioner for the matrix. For internal use only. Users should use `repartition`.
* @param part A partitioner that specifies how SubMatrices are stored in the cluster
*/
-private def setPartitioner(part: BlockMatrixPartitioner): Unit = {
+private def setPartitioner(part: GridPartitioner): Unit = {
partitioner = part
}

-// A key-value pair RDD is required to partition properly
-private var matrixRDD: RDD[(Int, SubMatrix)] = keyBy()

private lazy val dims: (Long, Long) = getDim

override def numRows(): Long = dims._1
override def numCols(): Long = dims._2

-if (partitioner.name.equals("column")) {
-require(numColBlocks == partitioner.numPartitions, "The number of column blocks should match" +
-s" the number of partitions of the column partitioner. numColBlocks: $numColBlocks, " +
-s"partitioner.numPartitions: ${partitioner.numPartitions}")
-} else if (partitioner.name.equals("row")) {
-require(numRowBlocks == partitioner.numPartitions, "The number of row blocks should match" +
-s" the number of partitions of the row partitioner. numRowBlocks: $numRowBlocks, " +
-s"partitioner.numPartitions: ${partitioner.numPartitions}")
-} else if (partitioner.name.equals("grid")) {
-require(numRowBlocks * numColBlocks == partitioner.numPartitions, "The number of blocks " +
-s"should match the number of partitions of the grid partitioner. numRowBlocks * " +
-s"numColBlocks: ${numRowBlocks * numColBlocks}, " +
-s"partitioner.numPartitions: ${partitioner.numPartitions}")
-} else {
-throw new IllegalArgumentException("Unrecognized partitioner.")
-}

/** Returns the dimensions of the matrix. */
def getDim: (Long, Long) = {
[Member] Should this be private?

-val firstRowColumn = rdd.filter(block => block.blockRowIndex == 0 || block.blockColIndex == 0).
-map { block =>
-((block.blockRowIndex, block.blockColIndex), (block.mat.numRows, block.mat.numCols))
+// picks the sizes of the matrix with the maximum indices
+def pickSizeByGreaterIndex(example: (Int, Int, Int, Int), base: (Int, Int, Int, Int)): (Int, Int, Int, Int) = {
+if (example._1 > base._1 && example._2 > base._2) {
+(example._1, example._2, example._3, example._4)
+} else if (example._1 > base._1) {
+(example._1, base._2, example._3, base._4)
+} else if (example._2 > base._2) {
+(base._1, example._2, base._3, example._4)
+} else {
+(base._1, base._2, base._3, base._4)
}
}

-firstRowColumn.treeAggregate((0L, 0L))(
-seqOp = (c, v) => (c, v) match { case ((x_dim, y_dim), ((indX, indY), (nRow, nCol))) =>
-if (indX == 0 && indY == 0) {
-(x_dim + nRow, y_dim + nCol)
-} else if (indX == 0) {
-(x_dim, y_dim + nCol)
-} else {
-(x_dim + nRow, y_dim)
-}
+val lastRowCol = rdd.treeAggregate((0, 0, 0, 0))(
+seqOp = (c, v) => (c, v) match { case (base, ((blockXInd, blockYInd), mat)) =>
+pickSizeByGreaterIndex((blockXInd, blockYInd, mat.numRows, mat.numCols), base)
},
combOp = (c1, c2) => (c1, c2) match {
-case ((x_dim1, y_dim1), (x_dim2, y_dim2)) =>
-(x_dim1 + x_dim2, y_dim1 + y_dim2)
+case (res1, res2) =>
+pickSizeByGreaterIndex(res1, res2)
})

+(lastRowCol._1.toLong * partitioner.rowPerBlock + lastRowCol._3,

[Member] I'd add a comment here about how this handles the case when the last row & column of sub-matrices could be smaller than the rest of the sub-matrices.

+lastRowCol._2.toLong * partitioner.colPerBlock + lastRowCol._4)
}
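
Worked example of this size calculation (illustrative): with rowPerBlock = 100, if the block with the greatest indices sits at block (2, 1) and holds a 37 x 100 local matrix, then numRows = 2L * 100 + 37 = 237. Only the full-sized blocks before the last one are multiplied out, and the last block contributes its own dimensions, so a trailing row or column of smaller blocks is measured correctly.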

/** Returns the Frobenius Norm of the matrix */
def normFro(): Double = {
[Member] I vote for norm(name: String) with default "Fro" ... or maybe something numeric for entry-wise norms. What do other libraries do?

[Contributor] Remove this function. We can add it back later.

-math.sqrt(rdd.map(lm => lm.mat.values.map(x => math.pow(x, 2)).sum).reduce(_ + _))
+math.sqrt(rdd.map {
+case sparse: ((Int, Int), SparseMatrix) =>
+sparse._2.values.map(x => math.pow(x, 2)).sum
+case dense: ((Int, Int), DenseMatrix) =>
+dense._2.values.map(x => math.pow(x, 2)).sum
+}.reduce(_ + _))
}
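
One caveat worth noting about this match: the tuple's type parameters are erased at runtime, so the first case matches every block; the result is still right only because both arms compute the same sum. A sketch of an equivalent form that matches on the Matrix value itself (illustrative, not part of the PR):

    math.sqrt(rdd.map { case (_, mat) =>
      mat match {
        case sm: SparseMatrix => sm.values.map(v => v * v).sum // only stored entries
        case dm: DenseMatrix => dm.values.map(v => v * v).sum
      }
    }.reduce(_ + _))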

/** Cache the underlying RDD. */
def cache(): DistributedMatrix = {
-matrixRDD.cache()
+rdd.cache()
this
}

/** Set the storage level for the underlying RDD. */
def persist(storageLevel: StorageLevel): DistributedMatrix = {
-matrixRDD.persist(storageLevel)
-this
-}
-
-/** Add a key to the underlying rdd for partitioning and joins. */
-private def keyBy(part: BlockMatrixPartitioner = partitioner): RDD[(Int, SubMatrix)] = {
-rdd.map { block =>
-part match {
-case r: RowBasedPartitioner => (block.blockRowIndex, block)
-case c: ColumnBasedPartitioner => (block.blockColIndex, block)
-case g: GridPartitioner => (block.blockRowIndex + numRowBlocks * block.blockColIndex, block)
-case _ => throw new IllegalArgumentException("Unrecognized partitioner")
-}
-}
-}
-
-/**
-* Repartition the BlockMatrix using a different partitioner.
-*
-* @param part The partitioner to partition by
-* @return The repartitioned BlockMatrix
-*/
-def repartition(part: BlockMatrixPartitioner): DistributedMatrix = {
-matrixRDD = keyBy(part)
-setPartitioner(part)
+rdd.persist(storageLevel)
this
}

/** Collect the distributed matrix on the driver. */
-def collect(): DenseMatrix = {
-val parts = rdd.map(x => ((x.blockRowIndex, x.blockColIndex), x.mat)).
-collect().sortBy(x => (x._1._2, x._1._1))
+def toLocalMatrix(): Matrix = {

[Member] Rename toLocal?

+val parts = rdd.collect().sortBy(x => (x._1._2, x._1._1))
val nRows = numRows().toInt
[Member] I'd check numRows and numCols here before converting to Int and throw an error if the matrix is too large.

val nCols = numCols().toInt
val values = new Array[Double](nRows * nCols)
Expand All @@ -301,7 +194,7 @@ class BlockMatrix(
val indStart = (j + colStart) * nRows + rowStart
val indEnd = block.numRows
[Member] "indEnd" --> "iEnd" since it matches with i, not indStart.

val matStart = j * block.numRows
-val mat = block.values
+val mat = block.toArray

[Member] Move outside loop.

while (i < indEnd) {
[Member] This could be shortened with an Array.copy call, but either is OK with me.

values(indStart + i) = mat(matStart + i)
i += 1
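
To make the index arithmetic concrete (assuming rowStart and colStart, defined in the folded lines above, are the element offsets of this block within the full matrix): with nRows = 6 and a 3 x 3 block at rowStart = 3, colStart = 0, column j = 1 of the block starts writing at values((1 + 0) * 6 + 3) = values(9), directly below the block stacked above it in the column-major output array.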
@@ -316,7 +209,7 @@

/** Collects data and assembles a local dense breeze matrix (for test only). */
private[mllib] def toBreeze(): BDM[Double] = {
[Member] If this is just for testing, then I'd make it private[distributed]. If it should fit with other APIs, then it should return a Matrix, not a DenseMatrix.

[Contributor Author] This is how it is currently for all distributed matrices, and each returns a BDM. Maybe we can change all of them later.

[Member] Oh I see. Yeah, I think we should change that later, but later is fine since it's internal.

-val localMat = collect()
-new BDM[Double](localMat.numRows, localMat.numCols, localMat.values)
+val localMat = toLocalMatrix()
+new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
}
}