Skip to content
Closed
Prev Previous commit
Next Next commit
update roaringbitmap to 0.5.10 and runOptimize
  • Loading branch information
yaooqinn committed Nov 13, 2015
commit 8f3eb7d25d0ca4465a5dc50aca5954a444ef82e0
1 change: 0 additions & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.4.5</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
Expand Down
13 changes: 7 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,11 @@ private[spark] class CompressedMapStatus(

/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
* is compressed.
* plus a bitmap for tracking which blocks are non-empty (isSparse) or empty (!isSparse).
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
* @param markedBlocks a bitmap tracking which blocks are empty
* @param markedBlocks a bitmap tracking non-empty blocks (isSparse) or empty blocks (!isSparse)
* @param avgSize average size of the non-empty blocks
*/
private[spark] class HighlyCompressedMapStatus private (
Expand Down Expand Up @@ -176,9 +175,7 @@ private[spark] object HighlyCompressedMapStatus {
var i = 0
var numNonEmptyBlocks: Int = 0
var totalSize: Long = 0
// From a compression standpoint, it shouldn't matter whether we track empty or non-empty
// blocks. From a performance standpoint, we benefit from tracking empty blocks because
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.

val emptyBlocks = new RoaringBitmap()
val nonEmptyBlocks = new RoaringBitmap()
val totalNumBlocks = uncompressedSizes.length
Expand All @@ -199,8 +196,12 @@ private[spark] object HighlyCompressedMapStatus {
0
}
if (numNonEmptyBlocks < totalNumBlocks / 2) {
// If non-empty blocks are sparse, we track non-empty blocks and set `isSparse` to true.
nonEmptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true)
} else {
// If non-empty blocks are dense, we track empty blocks and set `isSparse` to false.
emptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, isSparse = false)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,24 @@ import java.io.{EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable

import org.roaringbitmap.{BitmapContainer, RoaringBitmap, RoaringArray, ArrayContainer}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}

import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.{BitSet, CompactBuffer}
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
import org.roaringbitmap.RoaringBitmap

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
Expand Down Expand Up @@ -365,11 +363,6 @@ private[serializer] object KryoSerializer {
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
classOf[RoaringBitmap],
classOf[RoaringArray],
classOf[RoaringArray.Element],
classOf[Array[RoaringArray.Element]],
classOf[ArrayContainer],
classOf[BitmapContainer],
classOf[BitSet],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.4.5</version>
<version>0.5.10</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
Expand Down