24 commits
b693209
Ready for Pull request
Nov 11, 2014
f378e16
[SPARK-3974] Block Matrix Abstractions ready
Nov 11, 2014
aa8f086
[SPARK-3974] Additional comments added
Nov 11, 2014
589fbb6
[SPARK-3974] Code review feedback addressed
Nov 14, 2014
19c17e8
[SPARK-3974] Changed blockIdRow and blockIdCol
Nov 14, 2014
b05aabb
[SPARK-3974] Updated tests to reflect changes
brkyvz Nov 14, 2014
645afbe
[SPARK-3974] Pull latest master
brkyvz Nov 14, 2014
49b9586
[SPARK-3974] Updated testing utils from master
brkyvz Nov 14, 2014
d033861
[SPARK-3974] Removed SubMatrixInfo and added constructor without part…
brkyvz Nov 15, 2014
9ae85aa
[SPARK-3974] Made partitioner a variable inside BlockMatrix instead o…
brkyvz Nov 20, 2014
ab6cde0
[SPARK-3974] Modifications cleaning code up, making size calculation …
brkyvz Jan 14, 2015
ba414d2
[SPARK-3974] fixed frobenius norm
brkyvz Jan 14, 2015
239ab4b
[SPARK-3974] Addressed @jkbradley's comments
brkyvz Jan 19, 2015
1e8bb2a
[SPARK-3974] Change return type of cache and persist
brkyvz Jan 20, 2015
1a63b20
[SPARK-3974] Remove setPartition method. Isn't required
brkyvz Jan 20, 2015
eebbdf7
preliminary changes addressing code review
brkyvz Jan 21, 2015
f9d664b
updated API and modified partitioning scheme
brkyvz Jan 21, 2015
1694c9e
almost finished addressing comments
brkyvz Jan 27, 2015
140f20e
Merge branch 'master' of github.com:apache/spark into SPARK-3974
brkyvz Jan 27, 2015
5eecd48
fixed gridPartitioner and added tests
brkyvz Jan 27, 2015
24ec7b8
update grid partitioner
mengxr Jan 28, 2015
e1d3ee8
minor updates
mengxr Jan 28, 2015
feb32a7
update tests
mengxr Jan 28, 2015
a8eace2
Merge pull request #2 from mengxr/brkyvz-SPARK-3974
brkyvz Jan 28, 2015
preliminary changes addressing code review
brkyvz committed Jan 21, 2015
commit eebbdf742e5ef94e9c5f278a07fd94b625117716
@@ -42,7 +42,8 @@ private[mllib] class GridPartitioner(
override val numPartitions: Int) extends Partitioner {

/**
* Returns the index of the partition the SubMatrix belongs to.
* Returns the index of the partition the SubMatrix belongs to. Tries to achieve block-wise
* partitioning.
*
* @param key The key for the SubMatrix. Can be its position in the grid (its column major index)
* or a tuple of three integers that are the final row index after the multiplication,
@@ -51,22 +52,25 @@ private[mllib] class GridPartitioner(
* @return The index of the partition, which the SubMatrix belongs to.
*/
override def getPartition(key: Any): Int = {
val sqrtPartition = math.round(math.sqrt(numPartitions)).toInt
// numPartitions may not be a perfect square; it can even be prime

key match {
case (rowIndex: Int, colIndex: Int) =>
Utils.nonNegativeMod(rowIndex + colIndex * numRowBlocks, numPartitions)
case (rowIndex: Int, innerIndex: Int, colIndex: Int) =>
Utils.nonNegativeMod(rowIndex + colIndex * numRowBlocks, numPartitions)
case (blockRowIndex: Int, blockColIndex: Int) =>
Utils.nonNegativeMod(blockRowIndex + blockColIndex * numRowBlocks, numPartitions)
case (blockRowIndex: Int, innerIndex: Int, blockColIndex: Int) =>
Utils.nonNegativeMod(blockRowIndex + blockColIndex * numRowBlocks, numPartitions)
case _ =>
throw new IllegalArgumentException("Unrecognized key")
throw new IllegalArgumentException(s"Unrecognized key. key: $key")
}
}
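The cases above map a block's `(blockRowIndex, blockColIndex)` pair to a column-major linear index, then wrap it into `[0, numPartitions)`; the middle `innerIndex` of the three-integer key is ignored for placement. A minimal plain-Python mirror of that indexing logic (not Spark API — `non_negative_mod` stands in for Spark's `Utils.nonNegativeMod`, and the names are illustrative):

```python
def non_negative_mod(x, mod):
    # Mirror of Spark's Utils.nonNegativeMod: a modulo that never goes negative.
    raw = x % mod
    return raw if raw >= 0 else raw + mod

def get_partition(key, num_row_blocks, num_partitions):
    # Mirrors GridPartitioner.getPartition: accept either a (row, col) key or a
    # (row, innerIndex, col) key, and place the block by its column-major index.
    if len(key) == 2:
        block_row, block_col = key
    elif len(key) == 3:
        block_row, _, block_col = key  # innerIndex does not affect placement
    else:
        raise ValueError(f"Unrecognized key. key: {key}")
    return non_negative_mod(block_row + block_col * num_row_blocks, num_partitions)

# A 3x2 grid of blocks spread over 4 partitions:
print(get_partition((2, 1), num_row_blocks=3, num_partitions=4))  # → 1  (2 + 1*3 = 5; 5 % 4 = 1)
```

Note that blocks in the same grid column land on consecutive partitions, which is what the doc comment means by "tries to achieve block-wise partitioning".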

/** Checks whether the partitioners have the same characteristics */
override def equals(obj: Any): Boolean = {
obj match {
case r: GridPartitioner =>
(this.numPartitions == r.numPartitions) && (this.rowsPerBlock == r.rowsPerBlock) &&
(this.colsPerBlock == r.colsPerBlock)
(this.numRowBlocks == r.numRowBlocks) && (this.numColBlocks == r.numColBlocks) &&
(this.rowsPerBlock == r.rowsPerBlock) && (this.colsPerBlock == r.colsPerBlock)
case _ =>
false
}
@@ -85,7 +89,7 @@ class BlockMatrix(
val numColBlocks: Int,
val rdd: RDD[((Int, Int), Matrix)]) extends DistributedMatrix with Logging {
mbhynes commented:
@brkyvz @rezazadeh I'm wondering why you are using (Int, Int) as the block index, as opposed to a class that implements hashCode() to partition the matrix in lexicographic order. Then the call rdd.partitionBy(new HashPartitioner(numRowBlocks * numColBlocks)) would ensure the same partitioning between two distributed matrices. When performing addition or element-wise multiplication with another distributed matrix, the operation A_ij + B_ij would be local, without any shuffle.
Could you please explain the use of (Int, Int) over an Index class?

brkyvz (Contributor, Author) replied:
Hi @mbhynes. Great question. We did have such a class: it was called BlockPartition, and was later renamed to SubMatrix. I did try what you suggested, but here was the catch. Even though the partition IDs matched, the partitions didn't necessarily end up on the same executors. Spark distributes partitions deterministically, so in theory they should land on the same executors, but in practice an executor lags, "something" happens, and the partitions get jumbled across executors. Spark doesn't guarantee that matching partitions end up on the same machine, for fault-tolerance reasons (it's how the scheduler works). Therefore we needed explicit indices as above (which the SubMatrix class had). To ensure that we add the correct blocks to each other, calling a .join was inevitable. Instead of storing the index both inside SubMatrix and as the key, we decided to index blocks just by the key, as above.

Then the question becomes: can users properly index their matrices, i.e. correctly supply ((Int, Int), Matrix) pairs? To address this, we will support conversions from CoordinateMatrix and IndexedRowMatrix, which are convenient storage formats, and have users call .toBlockMatrix from those classes.
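The join-based block addition described in this reply keys every block by its (blockRowIndex, blockColIndex) pair, so a join aligns the matching blocks of two matrices before they are summed. A plain-Python sketch of that idea, with dicts standing in for the keyed RDDs (illustrative names only, not Spark API):

```python
def add_block_matrices(a_blocks, b_blocks):
    # Each "RDD" is a dict keyed by (blockRowIndex, blockColIndex); values are
    # block matrices (here, nested lists). The loop over shared keys plays the
    # role of the .join mentioned above: blocks are matched by key, then added.
    assert a_blocks.keys() == b_blocks.keys(), "grids must line up"
    out = {}
    for key in a_blocks:
        a, b = a_blocks[key], b_blocks[key]
        out[key] = [[x + y for x, y in zip(row_a, row_b)]
                    for row_a, row_b in zip(a, b)]
    return out

A = {(0, 0): [[1, 2], [3, 4]], (0, 1): [[5, 6], [7, 8]]}
B = {(0, 0): [[10, 0], [0, 10]], (0, 1): [[1, 1], [1, 1]]}
print(add_block_matrices(A, B)[(0, 0)])  # → [[11, 2], [3, 14]]
```

In Spark itself the join still shuffles unless both RDDs already share the same partitioner, which is exactly the co-location caveat the reply raises.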


type SubMatrix = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), matrix)
private type SubMatrix = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), matrix)

/**
* Alternate constructor for BlockMatrix without the input of a partitioner. Will use a Grid
@@ -18,7 +18,6 @@
package org.apache.spark.mllib.linalg.distributed

import org.scalatest.FunSuite

import breeze.linalg.{DenseMatrix => BDM}

import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Matrix}