Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ private[spark] class CompressedMapStatus(

/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
* is compressed.
* plus a bitmap for tracking which blocks are empty.
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
Expand Down Expand Up @@ -194,6 +193,8 @@ private[spark] object HighlyCompressedMapStatus {
} else {
0
}
emptyBlocks.trim()
emptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@

package org.apache.spark.serializer

import java.io.{EOFException, IOException, InputStream, OutputStream, DataInput, DataOutput}
import java.io.{DataInput, DataOutput, EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.roaringbitmap.RoaringBitmap
Expand All @@ -38,8 +38,8 @@ import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}

/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.storage.BlockManagerId

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.JavaSerializer
import org.roaringbitmap.RoaringBitmap

import scala.util.Random

Expand Down Expand Up @@ -97,4 +98,34 @@ class MapStatusSuite extends SparkFunSuite {
val buf = ser.newInstance().serialize(status)
ser.newInstance().deserialize[MapStatus](buf)
}

test("RoaringBitmap: runOptimize succeeded") {
val r = new RoaringBitmap
(1 to 200000).foreach(i =>
if (i % 200 != 0) {
r.add(i)
}
)
val size1 = r.getSizeInBytes
val success = r.runOptimize()
r.trim()
val size2 = r.getSizeInBytes
assert(size1 > size2)
assert(success)
}

test("RoaringBitmap: runOptimize failed") {
val r = new RoaringBitmap
(1 to 200000).foreach(i =>
if (i % 200 == 0) {
r.add(i)
}
)
val size1 = r.getSizeInBytes
val success = r.runOptimize()
r.trim()
val size2 = r.getSizeInBytes
assert(size1 === size2)
assert(!success)
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.4.5</version>
<version>0.5.11</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
Expand Down