similar classes merged
yaooqinn committed Nov 9, 2015
commit e1d11067e304f3f2131848d568034ec4cae4e3af
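This commit merges the two near-identical classes MapStatusTrackingNoEmptyBlocks and MapStatusTrackingEmptyBlocks into a single MapStatusTrackingEmptyBlocks whose HashSet holds either the non-empty block ids (sparse case) or the empty ones (dense case), selected by a new isSparse flag.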
core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala (95 changes: 19 additions & 76 deletions)
@@ -121,94 +121,34 @@ private[spark] class CompressedMapStatus(
}
}



/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus a hashset for tracking which blocks are not empty.
* In this case, non-empty blocks are very sparse,
* plus a hashset for tracking which blocks are empty (dense) / non-empty (sparse);
* using a HashSet[Int] saves memory compared to a BitSet.
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
* @param nonEmptyBlocks a hashset tracking which blocks are not empty
* @param avgSize average size of the non-empty blocks
*/
private[spark] class MapStatusTrackingNoEmptyBlocks private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var nonEmptyBlocks: mutable.HashSet[Int],
private[this] var avgSize: Long)
extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, -1, null, -1) // For deserialization only

override def location: BlockManagerId = loc

override def getSizeForBlock(reduceId: Int): Long = {
if (nonEmptyBlocks.contains(reduceId)) {
avgSize
} else {
0
}
}

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeObject(nonEmptyBlocks)
out.writeLong(avgSize)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
nonEmptyBlocks = new mutable.HashSet[Int]
nonEmptyBlocks = in.readObject().asInstanceOf[mutable.HashSet[Int]]
avgSize = in.readLong()
}
}

private[spark] object MapStatusTrackingNoEmptyBlocks {
def apply(
loc: BlockManagerId,
numNonEmptyBlocks: Int,
nonEmptyBlocks: mutable.HashSet[Int],
avgSize: Long): MapStatusTrackingNoEmptyBlocks = {
new MapStatusTrackingNoEmptyBlocks(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize )
}
}

/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus a hashset for tracking which blocks are empty.
* In this case, non-empty blocks are very dense,
* using a HashSet[Int] saves memory compared to a BitSet.
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
* @param emptyBlocksHashSet a bitmap tracking which blocks are empty
* @param markedBlocks a HashSet tracking which blocks are empty (dense) / non-empty (sparse)
* @param avgSize average size of the non-empty blocks
*/
private[spark] class MapStatusTrackingEmptyBlocks private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocksHashSet: mutable.HashSet[Int],
private[this] var avgSize: Long)
private[this] var markedBlocks: mutable.HashSet[Int],
private[this] var avgSize: Long,
private[this] var isSparse: Boolean)
extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, -1, null, -1) // For deserialization only
protected def this() = this(null, -1, null, -1, false) // For deserialization only

override def location: BlockManagerId = loc

override def getSizeForBlock(reduceId: Int): Long = {
if (emptyBlocksHashSet.contains(reduceId)) {
if (isSparse ^ markedBlocks.contains(reduceId)) {
0
} else {
avgSize
@@ -217,14 +157,14 @@ private[spark] class MapStatusTrackingEmptyBlocks private (

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeObject(emptyBlocksHashSet)
out.writeObject(markedBlocks)
out.writeLong(avgSize)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
emptyBlocksHashSet = new mutable.HashSet[Int]
emptyBlocksHashSet = in.readObject().asInstanceOf[mutable.HashSet[Int]]
markedBlocks = new mutable.HashSet[Int]
markedBlocks = in.readObject().asInstanceOf[mutable.HashSet[Int]]
Contributor comment:

did you mean to immediately override the assignment on the previous line?

avgSize = in.readLong()
}
}
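For reference, the isSparse ^ markedBlocks.contains(reduceId) branch above folds both set interpretations into a single test: in sparse mode the set holds the non-empty block ids, in dense mode the empty ones. A minimal standalone sketch of that rule (object and method names here are illustrative, not from the PR):

import scala.collection.mutable

object XorLookupDemo {
  // isSparse = true : contains => non-empty => avgSize; absent => 0
  // isSparse = false: contains => empty     => 0;       absent => avgSize
  def sizeFor(reduceId: Int, marked: mutable.HashSet[Int],
              avgSize: Long, isSparse: Boolean): Long =
    if (isSparse ^ marked.contains(reduceId)) 0L else avgSize

  def main(args: Array[String]): Unit = {
    val sparse = mutable.HashSet(0)    // only block 0 is non-empty
    assert(sizeFor(0, sparse, 10L, isSparse = true) == 10L)
    assert(sizeFor(7, sparse, 10L, isSparse = true) == 0L)
    val dense = mutable.HashSet(3)     // only block 3 is empty
    assert(sizeFor(3, dense, 10L, isSparse = false) == 0L)
    assert(sizeFor(7, dense, 10L, isSparse = false) == 10L)
  }
}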
@@ -233,9 +173,10 @@ private[spark] object MapStatusTrackingEmptyBlocks {
def apply(
loc: BlockManagerId,
numNonEmptyBlocks: Int,
emptyBlocksHashSet: mutable.HashSet[Int],
avgSize: Long): MapStatusTrackingEmptyBlocks = {
new MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, emptyBlocksHashSet, avgSize )
markedBlocks: mutable.HashSet[Int],
avgSize: Long,
flag: Boolean): MapStatusTrackingEmptyBlocks = {
Contributor comment:

indent these by 2 more spaces

new MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, markedBlocks, avgSize, flag)
}
}

@@ -261,7 +202,7 @@ private[spark] class HighlyCompressedMapStatus private (
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, -1, null, -1) // For deserialization only

Contributor comment:

revert

override def location: BlockManagerId = loc

override def getSizeForBlock(reduceId: Int): Long = {
@@ -317,11 +258,13 @@ private[spark] object HighlyCompressedMapStatus {
} else {
0
}
var isSparse = true
if(numNonEmptyBlocks * 32 < totalNumBlocks){
MapStatusTrackingNoEmptyBlocks(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize )
MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse)
}
else if ((totalNumBlocks - numNonEmptyBlocks) * 32 < totalNumBlocks){
MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, emptyBlocksHashSet, avgSize)
isSparse = false
Contributor comment:

this use of a var is unnecessarily complicated. Just do

if (numNonEmptyBlocks * 32 < totalNumBlocks) {
  MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true)
} else if (...) {
  MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, emptyBlocksHashSet, avgSize, isSparse = false)
} else {
  ...
}

MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, emptyBlocksHashSet, avgSize, isSparse)
}
else {
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
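For intuition on the * 32 factor used above: a BitSet costs one bit per block, while each JVM HashSet[Int] entry costs on the order of tens of bytes (a boxed Integer plus hash-table overhead), so a HashSet only wins when it holds a small fraction of the block ids. A standalone restatement of the selection rule (function name and labels are illustrative; the constant matches the PR's heuristic):

object EncodingChoiceDemo {
  // Assumes, like the PR, that one HashSet entry is worth roughly 32 BitSet bits.
  def chooseEncoding(totalNumBlocks: Int, numNonEmptyBlocks: Int): String = {
    val numEmptyBlocks = totalNumBlocks - numNonEmptyBlocks
    if (numNonEmptyBlocks * 32 < totalNumBlocks) {
      "sparse: HashSet of non-empty block ids (isSparse = true)"
    } else if (numEmptyBlocks * 32 < totalNumBlocks) {
      "dense: HashSet of empty block ids (isSparse = false)"
    } else {
      "neither: BitSet-based HighlyCompressedMapStatus"
    }
  }

  def main(args: Array[String]): Unit = {
    println(chooseEncoding(2001, 1))    // sparse: 1 * 32 < 2001 (the suite's test case)
    println(chooseEncoding(2001, 2000)) // dense: 1 empty block, 1 * 32 < 2001
    println(chooseEncoding(100, 50))    // neither: both sets would be too large
  }
}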
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -35,7 +35,7 @@ import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{MapStatusTrackingEmptyBlocks, MapStatusTrackingNoEmptyBlocks, CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.scheduler.{MapStatusTrackingEmptyBlocks, CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.{BitSet, CompactBuffer}
@@ -363,7 +363,6 @@ private[serializer] object KryoSerializer {
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
classOf[BitSet],
classOf[MapStatusTrackingNoEmptyBlocks],
classOf[MapStatusTrackingEmptyBlocks],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -78,11 +78,11 @@ class MapStatusSuite extends SparkFunSuite {
}

test("large tasks with sparse non-empty blocks should use " +
classOf[MapStatusTrackingNoEmptyBlocks].getName) {
classOf[MapStatusTrackingEmptyBlocks].getName) {
val sizes = Array.fill[Long](2001)(0L)
sizes(0) = 1L
val status = MapStatus(null, sizes)
assert(status.isInstanceOf[MapStatusTrackingNoEmptyBlocks])
assert(status.isInstanceOf[MapStatusTrackingEmptyBlocks])
assert(status.getSizeForBlock(0) === 1L)
assert(status.getSizeForBlock(50) === 0L)
assert(status.getSizeForBlock(99) === 0L)
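Only the sparse-path assertions appear in this diff; a dense-path counterpart might look like the following (hypothetical sketch, not part of this commit, assuming the 32x threshold shown above):

test("large tasks with dense non-empty blocks should use " +
  classOf[MapStatusTrackingEmptyBlocks].getName) {
  val sizes = Array.fill[Long](2001)(1L) // nearly every block non-empty
  sizes(0) = 0L                          // a single empty block
  val status = MapStatus(null, sizes)
  assert(status.isInstanceOf[MapStatusTrackingEmptyBlocks])
  assert(status.getSizeForBlock(0) === 0L)  // marked empty
  assert(status.getSizeForBlock(50) === 1L) // average of the non-empty sizes
}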