77 changes: 73 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -20,8 +20,8 @@ package org.apache.spark.scheduler
import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.{BitSet, OpenHashSet}

/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
@@ -80,7 +80,6 @@ private[spark] object MapStatus {
}
}


/**
* A [[MapStatus]] implementation that tracks the size of each block. Size for each block is
* represented using a single byte.
@@ -119,6 +118,66 @@ private[spark] class CompressedMapStatus(
}
}

/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus an OpenHashSet that tracks either the non-empty blocks (sparse case) or the empty
* blocks (dense case). For such skewed distributions, an OpenHashSet[Int] uses less
* memory than a BitSet.
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
* @param markedBlocks an OpenHashSet holding the non-empty block ids (sparse) or the
* empty block ids (dense)
* @param avgSize average size of the non-empty blocks
* @param isSparse true if markedBlocks holds the non-empty ids, false if it holds the empty ids
*/
private[spark] class MapStatusTrackingEmptyBlocks private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var markedBlocks: OpenHashSet[Int],
private[this] var avgSize: Long,
private[this] var isSparse: Boolean)
extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, -1, null, -1, false) // For deserialization only

override def location: BlockManagerId = loc

override def getSizeForBlock(reduceId: Int): Long = {
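// markedBlocks holds the NON-empty block ids when isSparse is true, and the
// EMPTY block ids when it is false; the XOR folds both cases into one test.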
if (isSparse ^ markedBlocks.contains(reduceId)) {
0
} else {
avgSize
}
}

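// Note: numNonEmptyBlocks is not written out below; it is only consulted by the
// require check at construction time and is not needed after deserialization.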
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeObject(markedBlocks)
out.writeLong(avgSize)
out.writeBoolean(isSparse)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
markedBlocks = in.readObject().asInstanceOf[OpenHashSet[Int]]
avgSize = in.readLong()
isSparse = in.readBoolean()
}
}

private[spark] object MapStatusTrackingEmptyBlocks {
def apply(
loc: BlockManagerId,
numNonEmptyBlocks: Int,
markedBlocks: OpenHashSet[Int],
avgSize: Long,
isSparse: Boolean): MapStatusTrackingEmptyBlocks = {
new MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, markedBlocks, avgSize, isSparse)
}
}
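
To make the XOR lookup above concrete, here is a minimal standalone sketch (not part
of the patch; it substitutes scala.collection.mutable.HashSet for Spark's
private[spark] OpenHashSet, but the membership logic is identical):

import scala.collection.mutable

object XorLookupDemo {
  // Stand-in for MapStatusTrackingEmptyBlocks.getSizeForBlock.
  def sizeFor(isSparse: Boolean, marked: mutable.HashSet[Int],
      avgSize: Long, reduceId: Int): Long =
    if (isSparse ^ marked.contains(reduceId)) 0L else avgSize

  def main(args: Array[String]): Unit = {
    val nonEmpty = mutable.HashSet(7)                 // sparse: set holds the NON-empty ids
    assert(sizeFor(true, nonEmpty, 150L, 7) == 150L)  // member -> has data
    assert(sizeFor(true, nonEmpty, 150L, 8) == 0L)    // non-member -> empty

    val empty = mutable.HashSet(3)                    // dense: set holds the EMPTY ids
    assert(sizeFor(false, empty, 150L, 3) == 0L)      // member -> empty
    assert(sizeFor(false, empty, 150L, 4) == 150L)    // non-member -> has data
  }
}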

/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
@@ -167,7 +226,7 @@ private[spark] class HighlyCompressedMapStatus private (
}

private[spark] object HighlyCompressedMapStatus {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
// We must keep track of which blocks are empty so that we don't report a zero-sized
// block as being non-empty (or vice-versa) when using the average block size.
var i = 0
@@ -178,13 +237,17 @@ private[spark] object HighlyCompressedMapStatus {
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
val totalNumBlocks = uncompressedSizes.length
val emptyBlocks = new BitSet(totalNumBlocks)
val emptyBlocksHashSet = new OpenHashSet[Int]
val nonEmptyBlocks = new OpenHashSet[Int]
while (i < totalNumBlocks) {
val size = uncompressedSizes(i)
if (size > 0) {
numNonEmptyBlocks += 1
totalSize += size
nonEmptyBlocks.add(i)
} else {
emptyBlocks.set(i)
emptyBlocksHashSet.add(i)
}
i += 1
}
@@ -193,6 +256,12 @@
} else {
0
}
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
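// Pick the cheapest representation (rough estimate: an OpenHashSet[Int] entry
// costs ~32 bits versus 1 bit per block in the BitSet): hash the non-empty ids
// when fewer than 1/32 of the blocks are non-empty, hash the empty ids when
// fewer than 1/32 are empty, and fall back to the BitSet otherwise.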
if (numNonEmptyBlocks * 32 < totalNumBlocks) {
MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true)
} else if ((totalNumBlocks - numNonEmptyBlocks) * 32 < totalNumBlocks) {
MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, emptyBlocksHashSet, avgSize, isSparse = false)
Contributor review comment: isSparse is the wrong name, I think -- both cases are sparse; it's a question of whether or not you are storing the empty blocks.

} else {
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
}
}
}
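
As a back-of-envelope check of that 1/32 cutoff (my estimate, assuming roughly 32
bits per stored OpenHashSet[Int] entry and ignoring load-factor overhead, versus
exactly 1 bit per block for the BitSet):

object CutoffDemo {
  def bitSetBits(totalBlocks: Int): Long = totalBlocks.toLong  // BitSet: 1 bit per block
  def hashSetBits(storedIds: Int): Long = 32L * storedIds      // OpenHashSet: ~32 bits per id

  def main(args: Array[String]): Unit = {
    val total = 2001
    println(bitSetBits(total))    // 2001 bits, paid unconditionally by the BitSet
    println(hashSetBits(1))       // 32 bits for a single non-empty id: sparse branch wins
    println(hashSetBits(1000))    // 32000 bits for ~half non-empty: BitSet wins
  }
}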
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -35,7 +35,7 @@ import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus, MapStatusTrackingEmptyBlocks}
import org.apache.spark.storage._
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.{BitSet, CompactBuffer}
@@ -363,6 +363,7 @@ private[serializer] object KryoSerializer {
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
classOf[BitSet],
classOf[MapStatusTrackingEmptyBlocks],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -66,23 +66,52 @@ class MapStatusSuite extends SparkFunSuite {
}
}

test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) {
test("large tasks with dense non-empty blocks should use" +
classOf[MapStatusTrackingEmptyBlocks].getName) {
val sizes = Array.fill[Long](2001)(150L)
val status = MapStatus(null, sizes)
assert(status.isInstanceOf[HighlyCompressedMapStatus])
assert(status.isInstanceOf[MapStatusTrackingEmptyBlocks])
assert(status.getSizeForBlock(10) === 150L)
assert(status.getSizeForBlock(50) === 150L)
assert(status.getSizeForBlock(99) === 150L)
assert(status.getSizeForBlock(2000) === 150L)
}

test("HighlyCompressedMapStatus: estimated size should be the average non-empty block size") {
test("large tasks with sparse non-empty blocks should use " +
classOf[MapStatusTrackingEmptyBlocks].getName) {
val sizes = Array.fill[Long](2001)(0L)
sizes(0) = 1L
val status = MapStatus(null, sizes)
assert(status.isInstanceOf[MapStatusTrackingEmptyBlocks])
assert(status.getSizeForBlock(0) === 1L)
assert(status.getSizeForBlock(50) === 0L)
assert(status.getSizeForBlock(99) === 0L)
assert(status.getSizeForBlock(2000) === 0L)
}

test("large tasks with not dense or sparse non-empty blocks should use " +
classOf[HighlyCompressedMapStatus].getName) {
val sizes = Array.fill[Long](2001)(0L)
for (i <- sizes.indices if i % 2 == 1) {
sizes(i) = 1L
}
val status = MapStatus(null, sizes)
assert(status.isInstanceOf[HighlyCompressedMapStatus])
assert(status.getSizeForBlock(0) === 0L)
assert(status.getSizeForBlock(1) === 1L)
assert(status.getSizeForBlock(1999) === 1L)
assert(status.getSizeForBlock(2000) === 0L)
}

test("MapStatusTrackingEmptyBlocks: estimated size should be the average non-empty block size") {
val sizes = Array.tabulate[Long](3000) { i => i.toLong }
val avg = sizes.sum / sizes.count(_ != 0)
val loc = BlockManagerId("a", "b", 10)
val status = MapStatus(loc, sizes)
val status1 = compressAndDecompressMapStatus(status)
assert(status1.isInstanceOf[HighlyCompressedMapStatus])
assert(status1.isInstanceOf[MapStatusTrackingEmptyBlocks])
assert(status1.location == loc)
for (i <- 0 until 3000) {
val estimate = status1.getSizeForBlock(i)