Use Spark BitSet instead of RoaringBitmap to reduce memory usage.
viirya committed Oct 23, 2015
commit 392975d3b5c48bc61bfa6caff7bfd23b9e095cde
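For intuition on the memory claim in the commit title: Spark's BitSet is a single flat Array[Long] with one bit per reduce partition and no container bookkeeping, so its footprint is predictable. A rough sketch of that footprint (illustrative arithmetic; bitSetBytes is hypothetical and not part of this patch):

```scala
// Illustrative only: approximate footprint of Spark's fixed-size BitSet.
// It allocates ((numBits - 1) >> 6) + 1 longs, i.e. one 8-byte word per
// 64 bits, regardless of how many bits are actually set.
def bitSetBytes(numBits: Int): Long = {
  val numWords = ((numBits - 1) >> 6) + 1
  numWords.toLong * 8
}

// e.g. tracking 200000 reduce partitions costs about 25 KB per map status:
// bitSetBytes(200000) == 25000
```

The trade-off is that, unlike RoaringBitmap, the cost is fixed at one bit per partition whether or not the set is sparse.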
13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,9 +19,8 @@ package org.apache.spark.scheduler

import java.io.{Externalizable, ObjectInput, ObjectOutput}

-import org.roaringbitmap.RoaringBitmap
-
import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.Utils

/**
@@ -133,7 +132,7 @@ private[spark] class CompressedMapStatus(
private[spark] class HighlyCompressedMapStatus private (
    private[this] var loc: BlockManagerId,
    private[this] var numNonEmptyBlocks: Int,
-    private[this] var emptyBlocks: RoaringBitmap,
+    private[this] var emptyBlocks: BitSet,
    private[this] var avgSize: Long)
  extends MapStatus with Externalizable {

@@ -146,7 +145,7 @@ private[spark] class HighlyCompressedMapStatus private (
  override def location: BlockManagerId = loc

  override def getSizeForBlock(reduceId: Int): Long = {
-    if (emptyBlocks.contains(reduceId)) {
+    if (emptyBlocks.get(reduceId)) {
      0
    } else {
      avgSize
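This hunk is truncated by the collapsed context, but the semantics are unchanged by the patch: emptyBlocks now answers membership via get rather than RoaringBitmap.contains, so a reducer asking for a block's size gets 0 for an empty block and the single stored average for everything else. A hypothetical illustration (numbers invented for the example):

```scala
// Hypothetical: with uncompressed sizes (0, 1000, 3000), block 0 is recorded
// in emptyBlocks and the rest collapse to avgSize = (1000 + 3000) / 2 = 2000.
// status.getSizeForBlock(0)  // 0    -- bit 0 is set in emptyBlocks
// status.getSizeForBlock(1)  // 2000 -- not empty, so the shared average
// status.getSizeForBlock(2)  // 2000 -- the exact size was never stored
```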
@@ -161,7 +160,7 @@

  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
    loc = BlockManagerId(in)
-    emptyBlocks = new RoaringBitmap()
+    emptyBlocks = new BitSet(0)
Member commented:

LGTM except that I'd make this a call to the no-arg constructor. It's almost a plus, in that it verifies it exists for exactly the purpose it's needed.

Any objections? I'll merge soon anyway.
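For reference, the no-arg constructor being requested would presumably be a one-line delegation (a sketch of the suggested follow-up, not code from this commit):

```scala
// Hypothetical follow-up: Externalizable deserialization creates the instance
// through a no-arg constructor before calling readExternal, which re-allocates
// `words`, so starting from zero capacity is safe.
def this() = this(0)
```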

Member Author (viirya) commented:

I've updated it.
    emptyBlocks.readExternal(in)
    avgSize = in.readLong()
  }
@@ -177,15 +176,15 @@ private[spark] object HighlyCompressedMapStatus {
    // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
    // blocks. From a performance standpoint, we benefit from tracking empty blocks because
    // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
-    val emptyBlocks = new RoaringBitmap()
    val totalNumBlocks = uncompressedSizes.length
+    val emptyBlocks = new BitSet(totalNumBlocks)
    while (i < totalNumBlocks) {
      var size = uncompressedSizes(i)
      if (size > 0) {
        numNonEmptyBlocks += 1
        totalSize += size
      } else {
-        emptyBlocks.add(i)
+        emptyBlocks.set(i)
      }
      i += 1
    }
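The collapsed tail of apply is not shown here; in Spark at this point it presumably just averages the non-empty sizes and constructs the status, roughly (a sketch, not part of the diff):

```scala
// Sketch of the collapsed, unchanged tail of HighlyCompressedMapStatus.apply:
// derive the average size of non-empty blocks and build the status object.
val avgSize = if (numNonEmptyBlocks > 0) {
  totalSize / numNonEmptyBlocks
} else {
  0
}
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
```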
26 changes: 23 additions & 3 deletions core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -17,14 +17,19 @@

package org.apache.spark.util.collection

+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import org.apache.spark.util.{Utils => UUtils}
+
+
/**
 * A simple, fixed-size bit set implementation. This implementation is fast because it avoids
 * safety/bound checking.
 */
-class BitSet(numBits: Int) extends Serializable {
+class BitSet(private[this] var numBits: Int) extends Externalizable {

-  private val words = new Array[Long](bit2words(numBits))
-  private val numWords = words.length
+  private var words = new Array[Long](bit2words(numBits))
+  private def numWords = words.length

/**
 * Compute the capacity (number of bits) that can be represented
@@ -230,4 +235,19 @@ class BitSet(numBits: Int) extends Serializable {

  /** Return the number of longs it would take to hold numBits. */
  private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1

+  override def writeExternal(out: ObjectOutput): Unit = UUtils.tryOrIOException {
+    out.writeInt(numBits)
+    words.foreach(out.writeLong(_))
+  }
+
+  override def readExternal(in: ObjectInput): Unit = UUtils.tryOrIOException {
+    numBits = in.readInt()
+    words = new Array[Long](bit2words(numBits))
+    var index = 0
+    while (index < words.length) {
+      words(index) = in.readLong()
+      index += 1
+    }
+  }
}
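Two details of the added serialization code are easy to miss: the wire format is just an Int (numBits) followed by bit2words(numBits) raw longs, and bit2words rounds up to whole 64-bit words. A quick worked check of the rounding (standalone illustration, not part of the patch):

```scala
// Standalone check of the word-count arithmetic used by BitSet.
def bit2words(numBits: Int): Int = ((numBits - 1) >> 6) + 1

assert(bit2words(1) == 1)    // a single bit still occupies one long
assert(bit2words(64) == 1)   // 64 bits fill one long exactly
assert(bit2words(65) == 2)   // the 65th bit spills into a second long
assert(bit2words(100) == 2)  // the 100-bit set in the test below needs 2 longs
assert(bit2words(0) == 0)    // hence new BitSet(0) allocates an empty array,
                             // a safe placeholder before readExternal runs
```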
core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
@@ -17,7 +17,10 @@

package org.apache.spark.util.collection

+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+
import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.{Utils => UUtils}

class BitSetSuite extends SparkFunSuite {

@@ -152,4 +155,50 @@ class BitSetSuite extends SparkFunSuite {
    assert(bitsetDiff.nextSetBit(85) === 85)
    assert(bitsetDiff.nextSetBit(86) === -1)
  }

+  test("read and write externally") {
+    val tempDir = UUtils.createTempDir()
+    val outputFile = File.createTempFile("bits", null, tempDir)
+
+    val fos = new FileOutputStream(outputFile)
+    val oos = new ObjectOutputStream(fos)
+
+    // Create BitSet
+    val setBits = Seq(0, 9, 1, 10, 90, 96)
+    val bitset = new BitSet(100)
+
+    for (i <- 0 until 100) {
+      assert(!bitset.get(i))
+    }
+
+    setBits.foreach(i => bitset.set(i))
+
+    for (i <- 0 until 100) {
+      if (setBits.contains(i)) {
+        assert(bitset.get(i))
+      } else {
+        assert(!bitset.get(i))
+      }
+    }
+    assert(bitset.cardinality() === setBits.size)
+
+    bitset.writeExternal(oos)
+    oos.close()
+
+    val fis = new FileInputStream(outputFile)
+    val ois = new ObjectInputStream(fis)
+
+    // Read BitSet from the file
+    val bitset2 = new BitSet(0)
+    bitset2.readExternal(ois)
+
+    for (i <- 0 until 100) {
+      if (setBits.contains(i)) {
+        assert(bitset2.get(i))
+      } else {
+        assert(!bitset2.get(i))
+      }
+    }
+    assert(bitset2.cardinality() === setBits.size)
+  }
}
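One side note on the test: it drives writeExternal/readExternal by hand rather than going through writeObject/readObject. Under standard Java serialization, ObjectInputStream recreates an Externalizable instance reflectively through a public no-arg constructor before calling readExternal, which is exactly why the reviewer above asked for one. A hypothetical variant of the round trip under that assumption:

```scala
// Hypothetical variant: round-trip through standard Java serialization.
// Requires BitSet to expose a public no-arg constructor, since ObjectInputStream
// instantiates Externalizable classes reflectively before calling readExternal.
val oos2 = new ObjectOutputStream(new FileOutputStream(outputFile))
oos2.writeObject(bitset)  // delegates to bitset.writeExternal
oos2.close()

val ois2 = new ObjectInputStream(new FileInputStream(outputFile))
val restored = ois2.readObject().asInstanceOf[BitSet]
ois2.close()
assert(restored.cardinality() === setBits.size)
```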