Skip to content
Closed
Prev Previous commit
Next Next commit
MapStatus Using RoaringBitmap More Properly
  • Loading branch information
yaooqinn committed Nov 12, 2015
commit 75c3209c4bba8902eb0a4c1649864106f015c39f
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.4.5</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
Expand Down
34 changes: 22 additions & 12 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package org.apache.spark.scheduler
import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.Utils
import org.roaringbitmap.RoaringBitmap

/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
Expand Down Expand Up @@ -126,26 +126,27 @@ private[spark] class CompressedMapStatus(
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
* @param emptyBlocks a bitmap tracking which blocks are empty
* @param markedBlocks a bitmap tracking which blocks are empty
* @param avgSize average size of the non-empty blocks
*/
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: BitSet,
private[this] var avgSize: Long)
private[this] var markedBlocks: RoaringBitmap,
private[this] var avgSize: Long,
private[this] var isSparse: Boolean)
extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, -1, null, -1) // For deserialization only
protected def this() = this(null, -1, null, -1, false) // For deserialization only

override def location: BlockManagerId = loc

override def getSizeForBlock(reduceId: Int): Long = {
if (emptyBlocks.get(reduceId)) {
if (isSparse ^ markedBlocks.contains(reduceId)) {
0
} else {
avgSize
Expand All @@ -154,15 +155,17 @@ private[spark] class HighlyCompressedMapStatus private (

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
emptyBlocks.writeExternal(out)
markedBlocks.writeExternal(out)
out.writeLong(avgSize)
out.writeBoolean(isSparse)
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
emptyBlocks = new BitSet
emptyBlocks.readExternal(in)
markedBlocks = new RoaringBitmap()
markedBlocks.readExternal(in)
avgSize = in.readLong()
isSparse = in.readBoolean()
}
}

Expand All @@ -176,15 +179,17 @@ private[spark] object HighlyCompressedMapStatus {
// From a compression standpoint, it shouldn't matter whether we track empty or non-empty
// blocks. From a performance standpoint, we benefit from tracking empty blocks because
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
val emptyBlocks = new RoaringBitmap()
val nonEmptyBlocks = new RoaringBitmap()
val totalNumBlocks = uncompressedSizes.length
val emptyBlocks = new BitSet(totalNumBlocks)
while (i < totalNumBlocks) {
var size = uncompressedSizes(i)
if (size > 0) {
numNonEmptyBlocks += 1
nonEmptyBlocks.add(i)
totalSize += size
} else {
emptyBlocks.set(i)
emptyBlocks.add(i)
}
i += 1
}
Expand All @@ -193,6 +198,11 @@ private[spark] object HighlyCompressedMapStatus {
} else {
0
}
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
if (numNonEmptyBlocks < totalNumBlocks / 2) {
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true)
} else {
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, isSparse = false)
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.io.{EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable

import org.roaringbitmap.{BitmapContainer, RoaringBitmap, RoaringArray, ArrayContainer}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
Expand Down Expand Up @@ -362,6 +364,12 @@ private[serializer] object KryoSerializer {
classOf[StorageLevel],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
classOf[RoaringBitmap],
classOf[RoaringArray],
classOf[RoaringArray.Element],
classOf[Array[RoaringArray.Element]],
classOf[ArrayContainer],
classOf[BitmapContainer],
classOf[BitSet],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.4.5</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
Expand Down