Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Let MapStatus be smaller in sparse/dense cases 1
  • Loading branch information
yaooqinn committed Nov 9, 2015
commit cb4bce57fdfad98a64821870025b8173447cf882
15 changes: 9 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ private[spark] class CompressedMapStatus(
/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus a hashset for tracking which blocks are not empty.
* In this case, non-empty blocks are very sparse,
* so tracking them with a HashSet[Int] uses less memory than a BitSet
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
Expand Down Expand Up @@ -178,8 +180,9 @@ private[spark] object MapStatusTrackingNoEmptyBlocks {

/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
* is compressed.
* plus a hashset for tracking which blocks are empty.
* In this case, non-empty blocks are very dense, so tracking the
* (few) empty blocks with a HashSet[Int] uses less memory than a BitSet
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
Expand Down Expand Up @@ -241,10 +244,10 @@ private[spark] object MapStatusTrackingEmptyBlocks {
* @param avgSize average size of the non-empty blocks
*/
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: BitSet,
private[this] var avgSize: Long)
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: BitSet,
private[this] var avgSize: Long)
extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,49 @@ class MapStatusSuite extends SparkFunSuite {
}
}

test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) {
test("large tasks with dense non-empty blocks should use " + classOf[MapStatusTrackingEmptyBlocks].getName) {
val sizes = Array.fill[Long](2001)(150L)
val status = MapStatus(null, sizes)
assert(status.isInstanceOf[HighlyCompressedMapStatus])
assert(status.isInstanceOf[MapStatusTrackingEmptyBlocks])
assert(status.getSizeForBlock(10) === 150L)
assert(status.getSizeForBlock(50) === 150L)
assert(status.getSizeForBlock(99) === 150L)
assert(status.getSizeForBlock(2000) === 150L)
}

test("HighlyCompressedMapStatus: estimated size should be the average non-empty block size") {
test("large tasks with sparse non-empty blocks should use " + classOf[MapStatusTrackingNoEmptyBlocks].getName) {
  // Exactly one non-empty block out of 2001 -> extremely sparse, so the
  // HashSet-of-non-empty-blocks representation should be selected.
  val sizes = Array.tabulate[Long](2001)(i => if (i == 0) 1L else 0L)
  val status = MapStatus(null, sizes)
  assert(status.isInstanceOf[MapStatusTrackingNoEmptyBlocks])
  assert(status.getSizeForBlock(0) === 1L)
  assert(status.getSizeForBlock(50) === 0L)
  assert(status.getSizeForBlock(99) === 0L)
  assert(status.getSizeForBlock(2000) === 0L)
}

test("large tasks with not dense or sparse non-empty blocks should use " + classOf[HighlyCompressedMapStatus].getName) {
  // Alternate empty and non-empty blocks (~50% density): too dense for the
  // sparse tracker and too sparse for the dense tracker, so the generic
  // HighlyCompressedMapStatus representation should be selected.
  // Idiomatic construction: Array.tabulate instead of mutating a
  // pre-filled array with an imperative index loop.
  val sizes = Array.tabulate[Long](2001)(i => if (i % 2 == 1) 1L else 0L)
  val status = MapStatus(null, sizes)
  assert(status.isInstanceOf[HighlyCompressedMapStatus])
  assert(status.getSizeForBlock(0) === 0L)
  assert(status.getSizeForBlock(1) === 1L)
  assert(status.getSizeForBlock(1999) === 1L)
  assert(status.getSizeForBlock(2000) === 0L)
}

test("MapStatusTrackingEmptyBlocks: estimated size should be the average non-empty block size") {
val sizes = Array.tabulate[Long](3000) { i => i.toLong }
val avg = sizes.sum / sizes.filter(_ != 0).length
val loc = BlockManagerId("a", "b", 10)
val status = MapStatus(loc, sizes)
val status1 = compressAndDecompressMapStatus(status)
assert(status1.isInstanceOf[HighlyCompressedMapStatus])
assert(status1.isInstanceOf[MapStatusTrackingEmptyBlocks])
assert(status1.location == loc)
for (i <- 0 until 3000) {
val estimate = status1.getSizeForBlock(i)
Expand Down