Let MapStatus be smaller in sparse/tense cases
yaooqinn committed Nov 9, 2015
commit 492adeb58871c179bf5f6b553111c6f2a2b3ee3b
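
This commit makes HighlyCompressedMapStatus.apply choose among three encodings of the per-reduce block sizes, instead of always recording empty blocks in a bitmap. A minimal sketch of the selection rule, using the names and the 32x threshold from the apply hunk at the bottom of the diff (the String return type is purely illustrative; the per-entry cost reading in the comment is an inference, not the author's statement):

// Sketch of the three-way choice made in HighlyCompressedMapStatus.apply below.
// The 32x factor treats a hashset entry as costing roughly 32 bits versus one
// bit per block in the bitmap; real JVM hashsets of boxed Ints are heavier, so
// this is a rough heuristic rather than an exact size comparison.
def pickEncoding(numNonEmptyBlocks: Int, totalNumBlocks: Int): String = {
  val numEmptyBlocks = totalNumBlocks - numNonEmptyBlocks
  if (numNonEmptyBlocks * 32 < totalNumBlocks) {
    "MapStatusTrackingNoEmptyBlocks" // sparse output: track the few non-empty blocks
  } else if (numEmptyBlocks * 32 < totalNumBlocks) {
    "MapStatusTrackingEmptyBlocks"   // dense output: track the few empty blocks
  } else {
    "HighlyCompressedMapStatus"      // in between: compressed bitmap of empty blocks
  }
}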
84 changes: 50 additions & 34 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -121,20 +121,21 @@ private[spark] class CompressedMapStatus(
   }
 }
 
+
+
 /**
  * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
- * plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
- * is compressed.
+ * plus a hashset for tracking which blocks are not empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
- * @param emptyBlocks a bitmap tracking which blocks are empty
+ * @param nonEmptyBlocks a hashset tracking which blocks are not empty
  * @param avgSize average size of the non-empty blocks
  */
-private[spark] class HighlyCompressedMapStatus private (
+private[spark] class MapStatusTrackingNoEmptyBlocks private (
   private[this] var loc: BlockManagerId,
   private[this] var numNonEmptyBlocks: Int,
-  private[this] var emptyBlocks: BitSet,
+  private[this] var nonEmptyBlocks: mutable.HashSet[Int],
   private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
 
@@ -147,40 +148,48 @@ private[spark] class HighlyCompressedMapStatus private (
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
-    if (emptyBlocks.get(reduceId)) {
-      0
-    } else {
+    if (nonEmptyBlocks.contains(reduceId)) {
       avgSize
+    } else {
+      0
     }
   }
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
-    emptyBlocks.writeExternal(out)
+    out.writeObject(nonEmptyBlocks)
     out.writeLong(avgSize)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    emptyBlocks = new BitSet
-    emptyBlocks.readExternal(in)
+    // the hashset is written with writeObject above, so restore it the same way
+    nonEmptyBlocks = in.readObject().asInstanceOf[mutable.HashSet[Int]]
     avgSize = in.readLong()
   }
 }
 
+private[spark] object MapStatusTrackingNoEmptyBlocks {
+  def apply(loc: BlockManagerId, numNonEmptyBlocks: Int, nonEmptyBlocks: mutable.HashSet[Int],
+      avgSize: Long): MapStatusTrackingNoEmptyBlocks = {
+    new MapStatusTrackingNoEmptyBlocks(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize)
+  }
+}
+
 /**
  * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
- * plus a hashset for tracking which blocks are not empty.
+ * plus a hashset for tracking which blocks are empty, which stays small when almost
+ * every block is non-empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
- * @param nonEmptyBlocks a hashset tracking which blocks are not empty
+ * @param emptyBlocksHashSet a hashset tracking which blocks are empty
  * @param avgSize average size of the non-empty blocks
  */
-private[spark] class MapStatusTrackingNoEmptyBlocks private (
+private[spark] class MapStatusTrackingEmptyBlocks private (
   private[this] var loc: BlockManagerId,
   private[this] var numNonEmptyBlocks: Int,
-  private[this] var nonEmptyBlocks: mutable.HashSet[Int],
+  private[this] var emptyBlocksHashSet: mutable.HashSet[Int],
   private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
 
@@ -193,42 +202,49 @@ private[spark] class MapStatusTrackingNoEmptyBlocks private (
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
-    if (nonEmptyBlocks.contains(reduceId)) {
-      avgSize
-    } else {
+    if (emptyBlocksHashSet.contains(reduceId)) {
       0
+    } else {
+      avgSize
     }
   }
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
-    out.writeObject(nonEmptyBlocks)
+    out.writeObject(emptyBlocksHashSet)
     out.writeLong(avgSize)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    nonEmptyBlocks = new mutable.HashSet[Int]
-    nonEmptyBlocks = in.readObject().asInstanceOf[mutable.HashSet[Int]]
+    // the hashset is written with writeObject above, so restore it the same way
+    emptyBlocksHashSet = in.readObject().asInstanceOf[mutable.HashSet[Int]]
     avgSize = in.readLong()
   }
 }
 
+private[spark] object MapStatusTrackingEmptyBlocks {
+  def apply(loc: BlockManagerId, numNonEmptyBlocks: Int, emptyBlocksHashSet: mutable.HashSet[Int],
+      avgSize: Long): MapStatusTrackingEmptyBlocks = {
+    new MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, emptyBlocksHashSet, avgSize)
+  }
+}
+
 /**
  * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
  * plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
  * is compressed.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
- * @param emptyBlocksHashSet a bitmap tracking which blocks are empty
+ * @param emptyBlocks a bitmap tracking which blocks are empty
  * @param avgSize average size of the non-empty blocks
  */
-private[spark] class MapStatusTrackingEmptyBlocks private (
-  private[this] var loc: BlockManagerId,
-  private[this] var numNonEmptyBlocks: Int,
-  private[this] var emptyBlocksHashSet: mutable.HashSet[Int],
-  private[this] var avgSize: Long)
+private[spark] class HighlyCompressedMapStatus private (
+  private[this] var loc: BlockManagerId,
+  private[this] var numNonEmptyBlocks: Int,
+  private[this] var emptyBlocks: BitSet,
+  private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
@@ -240,7 +256,7 @@ private[spark] class MapStatusTrackingEmptyBlocks private (
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
-    if (emptyBlocksHashSet.contains(reduceId)) {
+    if (emptyBlocks.get(reduceId)) {
       0
     } else {
       avgSize
@@ -249,20 +265,20 @@ private[spark] class MapStatusTrackingEmptyBlocks private (

   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
-    out.writeObject(emptyBlocksHashSet)
+    emptyBlocks.writeExternal(out)
     out.writeLong(avgSize)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    emptyBlocksHashSet = new mutable.HashSet[Int]
-    emptyBlocksHashSet = in.readObject().asInstanceOf[mutable.HashSet[Int]]
+    emptyBlocks = new BitSet
+    emptyBlocks.readExternal(in)
     avgSize = in.readLong()
   }
 }

 private[spark] object HighlyCompressedMapStatus {
-  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
+  def apply[T >: MapStatus](loc: BlockManagerId, uncompressedSizes: Array[Long]): T = {
     // We must keep track of which blocks are empty so that we don't report a zero-sized
     // block as being non-empty (or vice-versa) when using the average block size.
     var i = 0
@@ -293,10 +309,10 @@ private[spark] object HighlyCompressedMapStatus {
       0
     }
     if (numNonEmptyBlocks * 32 < totalNumBlocks) {
-      new MapStatusTrackingNoEmptyBlocks(loc, nonEmptyBlocks, nonEmptyBlocks, avgSize )
+      MapStatusTrackingNoEmptyBlocks(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize)
     }
     else if ((totalNumBlocks - numNonEmptyBlocks) * 32 < totalNumBlocks) {
-      new MapStatusTrackingEmptyBlocks(loc, nonEmptyBlocks, emptyBlocksHashSet, avgSize )
+      MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, emptyBlocksHashSet, avgSize)
     }
     else {
       new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
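
For intuition about when each branch fires, a back-of-envelope example under the diff's implicit assumption of roughly 32 bits per hashset entry and one bit per bitmap slot (the partition counts below are hypothetical, not from the PR):

// Hypothetical shuffle with 100,000 reduce partitions.
object ThresholdExample extends App {
  val totalNumBlocks = 100000

  // Sparse: 1,000 non-empty blocks. 1000 * 32 = 32,000 < 100,000, so the
  // non-empty hashset is picked: ~1,000 entries * 32 bits ~ 4 KB, versus a
  // 100,000-bit bitmap ~ 12.5 KB.
  assert(1000 * 32 < totalNumBlocks)

  // Dense: 99,000 non-empty blocks, i.e. only 1,000 empty ones.
  // (100,000 - 99,000) * 32 = 32,000 < 100,000, so the empty-block hashset wins.
  assert((totalNumBlocks - 99000) * 32 < totalNumBlocks)

  // Middle: 50,000 non-empty blocks. Either hashset would hold ~50,000 entries
  // (~200 KB even at 32 bits each), so the compressed bitmap stays smallest.
  assert(!(50000 * 32 < totalNumBlocks))
  assert(!((totalNumBlocks - 50000) * 32 < totalNumBlocks))

  println("threshold behaves as described")
}

The saving per status is modest, but the driver holds one MapStatus per map task per shuffle, so across tens of thousands of tasks the encoding choice adds up; that appears to be what the commit title's "sparse/tense cases" refers to.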