-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-11583] [Core]MapStatus Using RoaringBitmap More Properly #9661
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
75c3209
8f3eb7d
b4b7a1c
372dc4c
b9cc414
64e7295
0c4bcba
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -121,12 +121,11 @@ private[spark] class CompressedMapStatus( | |
|
|
||
| /** | ||
| * A [[MapStatus]] implementation that only stores the average size of non-empty blocks, | ||
| * plus a bitmap for tracking which blocks are empty. During serialization, this bitmap | ||
| * is compressed. | ||
| * plus a bitmap for tracking which blocks are empty(isSparse)/non-empty(!isSparse). | ||
| * | ||
| * @param loc location where the task is being executed | ||
| * @param numNonEmptyBlocks the number of non-empty blocks | ||
| * @param markedBlocks a bitmap tracking which blocks are empty | ||
| * @param markedBlocks a bitmap tracking which blocks are empty(isSparse)/non-empty(!isSparse) | ||
| * @param avgSize average size of the non-empty blocks | ||
| */ | ||
| private[spark] class HighlyCompressedMapStatus private ( | ||
|
|
@@ -176,9 +175,7 @@ private[spark] object HighlyCompressedMapStatus { | |
| var i = 0 | ||
| var numNonEmptyBlocks: Int = 0 | ||
| var totalSize: Long = 0 | ||
| // From a compression standpoint, it shouldn't matter whether we track empty or non-empty | ||
| // blocks. From a performance standpoint, we benefit from tracking empty blocks because | ||
| // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions. | ||
|
|
||
| val emptyBlocks = new RoaringBitmap() | ||
| val nonEmptyBlocks = new RoaringBitmap() | ||
| val totalNumBlocks = uncompressedSizes.length | ||
|
|
@@ -199,8 +196,12 @@ private[spark] object HighlyCompressedMapStatus { | |
| 0 | ||
| } | ||
| if (numNonEmptyBlocks < totalNumBlocks / 2) { | ||
| // If non-empty blocks are sparse, we track non-empty blocks and set `isSparse` to true. | ||
| nonEmptyBlocks.runOptimize() | ||
| new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true) | ||
| } else { | ||
| // If non-empty blocks are dense, we track empty blocks and set `isSparse` to false. | ||
| emptyBlocks.runOptimize() | ||
| new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, isSparse = false) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (a) |
||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,26 +21,24 @@ import java.io.{EOFException, IOException, InputStream, OutputStream} | |
| import java.nio.ByteBuffer | ||
| import javax.annotation.Nullable | ||
|
|
||
| import org.roaringbitmap.{BitmapContainer, RoaringBitmap, RoaringArray, ArrayContainer} | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
| import scala.collection.mutable.ArrayBuffer | ||
| import scala.reflect.ClassTag | ||
|
|
||
| import com.esotericsoftware.kryo.{Kryo, KryoException} | ||
| import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} | ||
| import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} | ||
| import com.esotericsoftware.kryo.{Kryo, KryoException} | ||
| import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} | ||
| import org.apache.avro.generic.{GenericData, GenericRecord} | ||
|
|
||
| import org.apache.spark._ | ||
| import org.apache.spark.api.python.PythonBroadcast | ||
| import org.apache.spark.broadcast.HttpBroadcast | ||
| import org.apache.spark.network.util.ByteUnit | ||
| import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} | ||
| import org.apache.spark.storage._ | ||
| import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf} | ||
| import org.apache.spark.util.collection.{BitSet, CompactBuffer} | ||
| import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils} | ||
| import org.roaringbitmap.RoaringBitmap | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
| import scala.collection.mutable.ArrayBuffer | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you update your import rules to place scale package before third parties (as before this PR)?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK |
||
| import scala.reflect.ClassTag | ||
|
|
||
| /** | ||
| * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]]. | ||
|
|
@@ -365,11 +363,6 @@ private[serializer] object KryoSerializer { | |
| classOf[CompressedMapStatus], | ||
| classOf[HighlyCompressedMapStatus], | ||
| classOf[RoaringBitmap], | ||
| classOf[RoaringArray], | ||
| classOf[RoaringArray.Element], | ||
| classOf[Array[RoaringArray.Element]], | ||
| classOf[ArrayContainer], | ||
| classOf[BitmapContainer], | ||
| classOf[BitSet], | ||
| classOf[CompactBuffer[_]], | ||
| classOf[BlockManagerId], | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment still make sense, could you keep it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK