From 207a8b69642e024dd0658adb90b6e947eb40afe0 Mon Sep 17 00:00:00 2001
From: Charles Allen
Date: Fri, 9 Oct 2015 12:12:34 -0700
Subject: [PATCH 1/7] [SPARK-11016] Move RoaringBitmap to explicit Kryo
 serializer

---
 .../spark/serializer/KryoSerializer.scala          | 64 ++++++++++++++++---
 1 file changed, 55 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c5195c1143a8..ac6de258dfb0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.serializer
 
-import java.io.{EOFException, IOException, InputStream, OutputStream}
+import java.io.{EOFException, IOException, InputStream, OutputStream, DataInput, DataOutput}
 import java.nio.ByteBuffer
 import javax.annotation.Nullable
 
@@ -25,12 +25,12 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import com.esotericsoftware.kryo.{Kryo, KryoException}
+import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
 import org.apache.avro.generic.{GenericData, GenericRecord}
-import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
+import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
@@ -94,6 +94,9 @@ class KryoSerializer(conf: SparkConf)
     for (cls <- KryoSerializer.toRegister) {
       kryo.register(cls)
     }
+    for ((cls, ser) <- KryoSerializer.toRegisterSerializer) {
+      kryo.register(cls, ser)
+    }
 
     // For results returned by asJavaIterable. See JavaIterableWrapperSerializer.
     kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer)
@@ -363,12 +366,6 @@ private[serializer] object KryoSerializer {
     classOf[StorageLevel],
     classOf[CompressedMapStatus],
     classOf[HighlyCompressedMapStatus],
-    classOf[RoaringBitmap],
-    classOf[RoaringArray],
-    classOf[RoaringArray.Element],
-    classOf[Array[RoaringArray.Element]],
-    classOf[ArrayContainer],
-    classOf[BitmapContainer],
     classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
     classOf[Array[Byte]],
@@ -377,6 +374,55 @@ private[serializer] object KryoSerializer {
     classOf[BoundedPriorityQueue[_]],
     classOf[SparkConf]
   )
+
+  private val toRegisterSerializer = Map[Class[_], KryoClassSerializer[_]](
+    classOf[RoaringBitmap] -> new KryoClassSerializer[RoaringBitmap]() {
+      override def write(kryo: Kryo, output: KryoOutput, bitmap: RoaringBitmap): Unit =
+        bitmap.serialize(new KryoOutputDataOutputBridge(output))
+      override def read(kryo: Kryo, input: KryoInput, clazz: Class[RoaringBitmap]):
+        RoaringBitmap = {
+        val ret = new RoaringBitmap
+        ret.deserialize(new KryoInputDataInputBridge(input))
+        ret
+      }
+    }
+  )
+}
+
+private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
+  override def readLong(): Long = input.readLong()
+  override def readChar(): Char = input.readChar()
+  override def readFloat(): Float = input.readFloat()
+  override def readByte(): Byte = input.readByte()
+  override def readShort(): Short = input.readShort()
+  override def readUTF(): String = input.readString() // readString in kryo does utf8
+  override def readInt(): Int = input.readInt()
+  override def readUnsignedShort(): Int = input.readShortUnsigned()
+  override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
+  override def readFully(b: Array[Byte]): Unit = input.read(b)
+  override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
+  override def readLine(): String = throw new UnsupportedOperationException("readLine")
+  override def readBoolean(): Boolean = input.readBoolean()
+  override def readUnsignedByte(): Int = input.readByteUnsigned()
+  override def readDouble(): Double = input.readDouble()
+}
+
+private[serializer] class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput {
+  override def writeFloat(v: Float): Unit = output.writeFloat(v)
+  // There is no "readChars" counterpart, except maybe "readLine", which is not supported
+  override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars")
+  override def writeDouble(v: Double): Unit = output.writeDouble(v)
+  override def writeUTF(s: String): Unit = output.writeString(s) // writeString in kryo does UTF8
+  override def writeShort(v: Int): Unit = output.writeShort(v)
+  override def writeInt(v: Int): Unit = output.writeInt(v)
+  override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v)
+  override def write(b: Int): Unit = output.write(b)
+  override def write(b: Array[Byte]): Unit = output.write(b)
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = output.write(b, off, len)
+  override def writeBytes(s: String): Unit = output.writeString(s)
+  override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
+  override def writeLong(v: Long): Unit = output.writeLong(v)
+  override def writeByte(v: Int): Unit = output.writeByte(v)
 }
 
 /**

From 75c3209c4bba8902eb0a4c1649864106f015c39f Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 12 Nov 2015 19:25:02 +0800
Subject: [PATCH 2/7] MapStatus Using RoaringBitmap More Properly

---
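Notes: `HighlyCompressedMapStatus` previously stored a fixed-size `BitSet` over all
blocks. This patch stores a `RoaringBitmap` over whichever side (empty or non-empty
blocks) is smaller and records the choice in an `isSparse` flag, so that
`getSizeForBlock` can fold both cases into a single XOR test. A minimal standalone
sketch of that lookup, with names taken from the patch (illustrative only, not code
from this commit):

    import org.roaringbitmap.RoaringBitmap

    // If isSparse, markedBlocks holds the NON-empty block ids, so "empty" means the
    // id is absent; if !isSparse, markedBlocks holds the EMPTY block ids, so "empty"
    // means the id is present. XOR covers both cases at once: the block is empty
    // exactly when isSparse differs from markedBlocks.contains(reduceId).
    def sizeForBlock(reduceId: Int, isSparse: Boolean,
        markedBlocks: RoaringBitmap, avgSize: Long): Long = {
      if (isSparse ^ markedBlocks.contains(reduceId)) 0L else avgSize
    }

    val nonEmpty = RoaringBitmap.bitmapOf(0, 7) // blocks 0 and 7 produced output
    assert(sizeForBlock(7, isSparse = true, markedBlocks = nonEmpty, avgSize = 1024L) == 1024L)
    assert(sizeForBlock(3, isSparse = true, markedBlocks = nonEmpty, avgSize = 1024L) == 0L)
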
 core/pom.xml                                       |  5 +++
 .../apache/spark/scheduler/MapStatus.scala         | 34 ++++++++++++-------
 .../spark/serializer/KryoSerializer.scala          |  8 +++++
 pom.xml                                            |  5 +++
 4 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 570a25cf325a..fa4685db985c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -173,6 +173,11 @@
       <groupId>net.jpountz.lz4</groupId>
       <artifactId>lz4</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+      <version>0.4.5</version>
+    </dependency>
     <dependency>
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 180c8d1827e1..a09cff36de66 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -20,8 +20,8 @@ package org.apache.spark.scheduler
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.Utils
+import org.roaringbitmap.RoaringBitmap
 
 /**
  * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
@@ -126,26 +126,27 @@ private[spark] class CompressedMapStatus(
  *
  * @param loc location where the task is being executed
 * @param numNonEmptyBlocks the number of non-empty blocks
- * @param emptyBlocks a bitmap tracking which blocks are empty
+ * @param markedBlocks a bitmap tracking which blocks are empty
 * @param avgSize average size of the non-empty blocks
 */
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
-    private[this] var emptyBlocks: BitSet,
-    private[this] var avgSize: Long)
+    private[this] var markedBlocks: RoaringBitmap,
+    private[this] var avgSize: Long,
+    private[this] var isSparse: Boolean)
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")
 
-  protected def this() = this(null, -1, null, -1)  // For deserialization only
+  protected def this() = this(null, -1, null, -1, false)  // For deserialization only
 
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
-    if (emptyBlocks.get(reduceId)) {
+    if (isSparse ^ markedBlocks.contains(reduceId)) {
       0
     } else {
       avgSize
@@ -154,15 +155,17 @@ private[spark] class HighlyCompressedMapStatus private (
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
-    emptyBlocks.writeExternal(out)
+    markedBlocks.writeExternal(out)
     out.writeLong(avgSize)
+    out.writeBoolean(isSparse)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    emptyBlocks = new BitSet
-    emptyBlocks.readExternal(in)
+    markedBlocks = new RoaringBitmap()
+    markedBlocks.readExternal(in)
     avgSize = in.readLong()
+    isSparse = in.readBoolean()
   }
 }
@@ -176,15 +179,17 @@ private[spark] object HighlyCompressedMapStatus {
     // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
     // blocks. From a performance standpoint, we benefit from tracking empty blocks because
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+    val emptyBlocks = new RoaringBitmap()
+    val nonEmptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val emptyBlocks = new BitSet(totalNumBlocks)
     while (i < totalNumBlocks) {
       var size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
+        nonEmptyBlocks.add(i)
         totalSize += size
       } else {
-        emptyBlocks.set(i)
+        emptyBlocks.add(i)
       }
       i += 1
     }
@@ -193,6 +198,11 @@ private[spark] object HighlyCompressedMapStatus {
     } else {
       0
     }
-    new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
+    if (numNonEmptyBlocks < totalNumBlocks / 2) {
+      new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true)
+    } else {
+      new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, isSparse = false)
+    }
+
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index bc51d4f2820c..0bdaca288f2d 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -21,6 +21,8 @@ import java.io.{EOFException, IOException, InputStream, OutputStream}
 import java.nio.ByteBuffer
 import javax.annotation.Nullable
 
+import org.roaringbitmap.{BitmapContainer, RoaringBitmap, RoaringArray, ArrayContainer}
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
@@ -362,6 +364,12 @@ private[serializer] object KryoSerializer {
     classOf[StorageLevel],
     classOf[CompressedMapStatus],
     classOf[HighlyCompressedMapStatus],
+    classOf[RoaringBitmap],
+    classOf[RoaringArray],
+    classOf[RoaringArray.Element],
+    classOf[Array[RoaringArray.Element]],
+    classOf[ArrayContainer],
+    classOf[BitmapContainer],
     classOf[BitSet],
     classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
diff --git a/pom.xml b/pom.xml
index 4ed1c0c82dee..2d5348d9afd2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -623,6 +623,11 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.roaringbitmap</groupId>
+        <artifactId>RoaringBitmap</artifactId>
+        <version>0.4.5</version>
+      </dependency>
       <dependency>
         <groupId>commons-net</groupId>
         <artifactId>commons-net</artifactId>

From 8f3eb7d25d0ca4465a5dc50aca5954a444ef82e0 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Fri, 13 Nov 2015 10:24:13 +0800
Subject: [PATCH 3/7] update roaringbitmap to 0.5.10 and runOptimize

---
 core/pom.xml                                       |  1 -
 .../apache/spark/scheduler/MapStatus.scala         | 13 ++++++------
 .../spark/serializer/KryoSerializer.scala          | 21 +++++++------------
 pom.xml                                            |  2 +-
 4 files changed, 15 insertions(+), 22 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index fa4685db985c..5e9e758d72b7 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -176,7 +176,6 @@
     <dependency>
       <groupId>org.roaringbitmap</groupId>
       <artifactId>RoaringBitmap</artifactId>
-      <version>0.4.5</version>
     </dependency>
     <dependency>
       <groupId>commons-net</groupId>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index a09cff36de66..392867467172 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -121,12 +121,11 @@ private[spark] class CompressedMapStatus(
 
 /**
  * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
- * plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
- * is compressed.
+ * plus a bitmap for tracking which blocks are empty(isSparse)/non-empty(!isSparse).
 *
 * @param loc location where the task is being executed
 * @param numNonEmptyBlocks the number of non-empty blocks
- * @param markedBlocks a bitmap tracking which blocks are empty
+ * @param markedBlocks a bitmap tracking which blocks are empty(isSparse)/non-empty(!isSparse)
 * @param avgSize average size of the non-empty blocks
 */
 private[spark] class HighlyCompressedMapStatus private (
@@ -176,9 +175,7 @@ private[spark] object HighlyCompressedMapStatus {
     var i = 0
     var numNonEmptyBlocks: Int = 0
     var totalSize: Long = 0
-    // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
-    // blocks. From a performance standpoint, we benefit from tracking empty blocks because
-    // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+
     val emptyBlocks = new RoaringBitmap()
     val nonEmptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
@@ -199,8 +196,12 @@ private[spark] object HighlyCompressedMapStatus {
       0
     }
     if (numNonEmptyBlocks < totalNumBlocks / 2) {
+      // If non-empty blocks are sparse, we track non-empty blocks and set `isSparse` to true.
+      nonEmptyBlocks.runOptimize()
       new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true)
     } else {
+      // If non-empty blocks are dense, we track empty blocks and set `isSparse` to false.
+      emptyBlocks.runOptimize()
       new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, isSparse = false)
     }
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 0bdaca288f2d..183639c5407c 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -21,26 +21,24 @@ import java.io.{EOFException, IOException, InputStream, OutputStream}
 import java.nio.ByteBuffer
 import javax.annotation.Nullable
 
-import org.roaringbitmap.{BitmapContainer, RoaringBitmap, RoaringArray, ArrayContainer}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.ClassTag
-
-import com.esotericsoftware.kryo.{Kryo, KryoException}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
 import org.apache.avro.generic.{GenericData, GenericRecord}
-
 import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
 import org.apache.spark.broadcast.HttpBroadcast
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
 import org.apache.spark.storage._
-import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
 import org.apache.spark.util.collection.{BitSet, CompactBuffer}
+import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
+import org.roaringbitmap.RoaringBitmap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
 
 /**
  * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
@@ -365,11 +363,6 @@ private[serializer] object KryoSerializer {
     classOf[CompressedMapStatus],
     classOf[HighlyCompressedMapStatus],
     classOf[RoaringBitmap],
-    classOf[RoaringArray],
-    classOf[RoaringArray.Element],
-    classOf[Array[RoaringArray.Element]],
-    classOf[ArrayContainer],
-    classOf[BitmapContainer],
     classOf[BitSet],
     classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
diff --git a/pom.xml b/pom.xml
index 2d5348d9afd2..c965bcb0056a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -626,7 +626,7 @@
       <dependency>
         <groupId>org.roaringbitmap</groupId>
         <artifactId>RoaringBitmap</artifactId>
-        <version>0.4.5</version>
+        <version>0.5.10</version>
       </dependency>
       <dependency>
         <groupId>commons-net</groupId>

From b4b7a1c835235826d512f1993c00ca35e445fcba Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Fri, 13 Nov 2015 16:38:29 +0800
Subject: [PATCH 4/7] add roaringbitmap runOptimize tests

---
 .../apache/spark/scheduler/MapStatus.scala         |  8 ++---
 .../spark/scheduler/MapStatusSuite.scala           | 29 +++++++++++++++++++
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 392867467172..03e2062236bc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -175,7 +175,6 @@ private[spark] object HighlyCompressedMapStatus {
     var i = 0
     var numNonEmptyBlocks: Int = 0
     var totalSize: Long = 0
-
     val emptyBlocks = new RoaringBitmap()
     val nonEmptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
@@ -196,14 +195,15 @@ private[spark] object HighlyCompressedMapStatus {
       0
     }
     if (numNonEmptyBlocks < totalNumBlocks / 2) {
-      // If non-empty blocks are sparse, we track non-empty blocks and set `isSparse` to true.
+      // If non-empty blocks are sparse, we track non-empty blocks and set `isSparse` true.
       nonEmptyBlocks.runOptimize()
+      nonEmptyBlocks.trim()
       new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true)
     } else {
-      // If non-empty blocks are dense, we track empty blocks and set `isSparse` to false.
+      // If non-empty blocks are dense, we track empty blocks and set `isSparse` false.
       emptyBlocks.runOptimize()
+      emptyBlocks.trim()
       new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, isSparse = false)
     }
-
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index b8e466fab450..b6dcad61e118 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.serializer.JavaSerializer
+import org.roaringbitmap.RoaringBitmap
 
 import scala.util.Random
 
@@ -97,4 +98,32 @@ class MapStatusSuite extends SparkFunSuite {
     val buf = ser.newInstance().serialize(status)
     ser.newInstance().deserialize[MapStatus](buf)
   }
+
+  test("RoaringBitmap: runOptimize succeeded") {
+    val r = new RoaringBitmap
+    for (i <- 1 to 200000) {
+      if(i % 200 != 0)
+        r.add(i)
+    }
+    val size1 = r.getSizeInBytes
+    val success = r.runOptimize()
+    r.trim()
+    val size2 = r.getSizeInBytes
+    assert(size1 > size2)
+    assert(success)
+  }
+
+  test("RoaringBitmap: runOptimize failed") {
+    val r = new RoaringBitmap
+    for (i <- 1 to 200000) {
+      if(i % 200 == 0)
+        r.add(i)
+    }
+    val size1 = r.getSizeInBytes
+    val success = r.runOptimize()
+    r.trim()
+    val size2 = r.getSizeInBytes
+    assert(size1 === size2)
+    assert(!success)
+  }
 }

From b9cc41439203f6924e75836a39fc603ea0714ceb Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 16 Nov 2015 11:05:51 +0800
Subject: [PATCH 5/7] runOptimize and trim only

---
 .../apache/spark/scheduler/MapStatus.scala         | 37 ++++++------------
 1 file changed, 12 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 03e2062236bc..42c6788773b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -121,31 +121,30 @@ private[spark] class CompressedMapStatus(
 
 /**
  * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
- * plus a bitmap for tracking which blocks are empty(isSparse)/non-empty(!isSparse).
+ * plus a bitmap for tracking which blocks are empty.
 *
 * @param loc location where the task is being executed
 * @param numNonEmptyBlocks the number of non-empty blocks
- * @param markedBlocks a bitmap tracking which blocks are empty(isSparse)/non-empty(!isSparse)
+ * @param emptyBlocks a bitmap tracking which blocks are empty
 * @param avgSize average size of the non-empty blocks
 */
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
-    private[this] var markedBlocks: RoaringBitmap,
-    private[this] var avgSize: Long,
-    private[this] var isSparse: Boolean)
+    private[this] var emptyBlocks: RoaringBitmap,
+    private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")
 
-  protected def this() = this(null, -1, null, -1, false)  // For deserialization only
+  protected def this() = this(null, -1, null, -1)  // For deserialization only
 
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
-    if (isSparse ^ markedBlocks.contains(reduceId)) {
+    if (emptyBlocks.contains(reduceId)) {
       0
     } else {
       avgSize
@@ -154,17 +153,15 @@ private[spark] class HighlyCompressedMapStatus private (
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     loc.writeExternal(out)
-    markedBlocks.writeExternal(out)
+    emptyBlocks.writeExternal(out)
     out.writeLong(avgSize)
-    out.writeBoolean(isSparse)
   }
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    markedBlocks = new RoaringBitmap()
-    markedBlocks.readExternal(in)
+    emptyBlocks = new RoaringBitmap()
+    emptyBlocks.readExternal(in)
     avgSize = in.readLong()
-    isSparse = in.readBoolean()
   }
 }
@@ -176,13 +173,11 @@ private[spark] object HighlyCompressedMapStatus {
     var numNonEmptyBlocks: Int = 0
     var totalSize: Long = 0
     val emptyBlocks = new RoaringBitmap()
-    val nonEmptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
     while (i < totalNumBlocks) {
       var size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
-        nonEmptyBlocks.add(i)
         totalSize += size
       } else {
         emptyBlocks.add(i)
@@ -194,16 +189,8 @@ private[spark] object HighlyCompressedMapStatus {
     } else {
       0
     }
-    if (numNonEmptyBlocks < totalNumBlocks / 2) {
-      // If non-empty blocks are sparse, we track non-empty blocks and set `isSparse` true.
-      nonEmptyBlocks.runOptimize()
-      nonEmptyBlocks.trim()
-      new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true)
-    } else {
-      // If non-empty blocks are dense, we track empty blocks and set `isSparse` false.
-      emptyBlocks.runOptimize()
-      emptyBlocks.trim()
-      new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, isSparse = false)
-    }
+    emptyBlocks.runOptimize()
+    emptyBlocks.trim()
+    new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
   }
 }

From 64e7295c7fd0f193e8b74f07a2823a478e6c1796 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 16 Nov 2015 16:01:00 +0800
Subject: [PATCH 6/7] fix style

---
 .../apache/spark/scheduler/MapStatusSuite.scala    | 14 ++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index b6dcad61e118..15c8de61b824 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -101,10 +101,11 @@ class MapStatusSuite extends SparkFunSuite {
 
   test("RoaringBitmap: runOptimize succeeded") {
     val r = new RoaringBitmap
-    for (i <- 1 to 200000) {
-      if(i % 200 != 0)
+    (1 to 200000).foreach(i =>
+      if (i % 200 != 0) {
         r.add(i)
-    }
+      }
+    )
     val size1 = r.getSizeInBytes
     val success = r.runOptimize()
     r.trim()
@@ -115,10 +116,11 @@ class MapStatusSuite extends SparkFunSuite {
 
   test("RoaringBitmap: runOptimize failed") {
     val r = new RoaringBitmap
-    for (i <- 1 to 200000) {
-      if(i % 200 == 0)
+    (1 to 200000).foreach(i =>
+      if (i % 200 == 0) {
         r.add(i)
-    }
+      }
+    )
     val size1 = r.getSizeInBytes
     val success = r.runOptimize()
     r.trim()

From f69061dc615a94f03bafeddc64dd504c4bf48b12 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Mon, 16 Nov 2015 22:37:33 -0800
Subject: [PATCH 7/7] update to 0.5.11

---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 3853ce78cbb4..940e2d8740bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -637,7 +637,7 @@
       <dependency>
         <groupId>org.roaringbitmap</groupId>
         <artifactId>RoaringBitmap</artifactId>
-        <version>0.5.10</version>
+        <version>0.5.11</version>
       </dependency>
       <dependency>
         <groupId>commons-net</groupId>
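
Notes on the series: PATCH 1/7 is the piece that survives throughout. RoaringBitmap
is serialized through an explicit Kryo serializer that delegates to the library's own
serialize/deserialize, rather than through field-by-field registration, so Spark no
longer depends on RoaringBitmap internals such as RoaringArray.Element that changed
in the 0.5.x line. The registration pattern can be exercised end to end with a
self-contained round trip. The sketch below is illustrative only, not part of the
patches: it inlines the serializer and substitutes the JDK's DataOutputStream and
DataInputStream for the patch's private[serializer] bridge classes, which works here
because Kryo's Output and Input extend the JDK stream types.

    import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream}

    import com.esotericsoftware.kryo.{Kryo, Serializer => KryoClassSerializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.roaringbitmap.RoaringBitmap

    object RoaringBitmapKryoRoundTrip {
      def main(args: Array[String]): Unit = {
        val kryo = new Kryo()
        // Same shape as KryoSerializer.toRegisterSerializer in PATCH 1/7: delegate to
        // RoaringBitmap's own serialize/deserialize, which speak DataOutput/DataInput
        // rather than Kryo's Output/Input.
        kryo.register(classOf[RoaringBitmap], new KryoClassSerializer[RoaringBitmap]() {
          override def write(kryo: Kryo, output: Output, bitmap: RoaringBitmap): Unit =
            bitmap.serialize(new DataOutputStream(output))
          override def read(kryo: Kryo, input: Input, clazz: Class[RoaringBitmap]): RoaringBitmap = {
            val ret = new RoaringBitmap
            ret.deserialize(new DataInputStream(input))
            ret
          }
        })

        val original = RoaringBitmap.bitmapOf(1, 2, 3, 100000)
        original.runOptimize() // as in HighlyCompressedMapStatus: compress runs before shipping

        val bytes = new ByteArrayOutputStream()
        val output = new Output(bytes)
        kryo.writeObject(output, original)
        output.close()

        val copy = kryo.readObject(new Input(bytes.toByteArray), classOf[RoaringBitmap])
        assert(copy == original) // the bitmap survives the Kryo round trip intact
      }
    }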