-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-11016] Move RoaringBitmap to explicit Kryo serializer #9748
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
207a8b6
9947bdc
f91a22f
ba0967c
7fcbf66
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,20 +17,20 @@ | |
|
|
||
| package org.apache.spark.serializer | ||
|
|
||
| import java.io.{EOFException, IOException, InputStream, OutputStream} | ||
| import java.io.{EOFException, IOException, InputStream, OutputStream, DataInput, DataOutput} | ||
| import java.nio.ByteBuffer | ||
| import javax.annotation.Nullable | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
| import scala.collection.mutable.ArrayBuffer | ||
| import scala.reflect.ClassTag | ||
|
|
||
| import com.esotericsoftware.kryo.{Kryo, KryoException} | ||
| import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer} | ||
| import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} | ||
| import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} | ||
| import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} | ||
| import org.apache.avro.generic.{GenericData, GenericRecord} | ||
| import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap} | ||
| import org.roaringbitmap.RoaringBitmap | ||
|
|
||
| import org.apache.spark._ | ||
| import org.apache.spark.api.python.PythonBroadcast | ||
|
|
@@ -94,6 +94,9 @@ class KryoSerializer(conf: SparkConf) | |
| for (cls <- KryoSerializer.toRegister) { | ||
| kryo.register(cls) | ||
| } | ||
| for ((cls, ser) <- KryoSerializer.toRegisterSerializer) { | ||
| kryo.register(cls, ser) | ||
| } | ||
|
|
||
| // For results returned by asJavaIterable. See JavaIterableWrapperSerializer. | ||
| kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer) | ||
|
|
@@ -363,12 +366,6 @@ private[serializer] object KryoSerializer { | |
| classOf[StorageLevel], | ||
| classOf[CompressedMapStatus], | ||
| classOf[HighlyCompressedMapStatus], | ||
| classOf[RoaringBitmap], | ||
| classOf[RoaringArray], | ||
| classOf[RoaringArray.Element], | ||
| classOf[Array[RoaringArray.Element]], | ||
| classOf[ArrayContainer], | ||
| classOf[BitmapContainer], | ||
| classOf[CompactBuffer[_]], | ||
| classOf[BlockManagerId], | ||
| classOf[Array[Byte]], | ||
|
|
@@ -377,6 +374,55 @@ private[serializer] object KryoSerializer { | |
| classOf[BoundedPriorityQueue[_]], | ||
| classOf[SparkConf] | ||
| ) | ||
|
|
||
| // Classes paired with the explicit Kryo serializer they are registered with | ||
| // (each pair is passed to kryo.register(cls, ser)). | ||
| private val toRegisterSerializer = Map[Class[_], KryoClassSerializer[_]]( | ||
| classOf[RoaringBitmap] -> new KryoClassSerializer[RoaringBitmap]() { | ||
| // Writes the bitmap through its own serialize method, exposing the Kryo output as DataOutput. | ||
| override def write(kryo: Kryo, output: KryoOutput, bitmap: RoaringBitmap): Unit = { | ||
| val sink = new KryoOutputDataOutputBridge(output) | ||
| bitmap.serialize(sink) | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indentation is off. |
||
| // Creates an empty bitmap and fills it from the Kryo input exposed as DataInput. | ||
| override def read( | ||
| kryo: Kryo, | ||
| input: KryoInput, | ||
| clazz: Class[RoaringBitmap]): RoaringBitmap = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indentation is off. And needn't to break the line here. |
||
| val restored = new RoaringBitmap | ||
| val source = new KryoInputDataInputBridge(input) | ||
| restored.deserialize(source) | ||
| restored | ||
| } | ||
| } | ||
| ) | ||
| } | ||
|
|
||
| /** | ||
| * Exposes a Kryo `Input` through the `java.io.DataInput` interface so that code written | ||
| * against `DataInput` can read from a Kryo stream. Every supported method delegates to the | ||
| * corresponding Kryo read call; `readLine` is not supported and throws | ||
| * `UnsupportedOperationException`. | ||
| */ | ||
| private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends DataInput { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove the space before |
||
| override def readLong(): Long = input.readLong() | ||
| override def readChar(): Char = input.readChar() | ||
| override def readFloat(): Float = input.readFloat() | ||
| override def readByte(): Byte = input.readByte() | ||
| override def readShort(): Short = input.readShort() | ||
| override def readUTF(): String = input.readString() // readString in kryo does utf8 | ||
| override def readInt(): Int = input.readInt() | ||
| override def readUnsignedShort(): Int = input.readShortUnsigned() | ||
| // Use Kryo's skip(int) rather than skip(long): the long overload is reported to compute | ||
| // its chunk size incorrectly in the Kryo version Spark used, which surfaces as a | ||
| // "Buffer underflow" during deserialization (see SPARK-12222). DataInput.skipBytes never | ||
| // skips for a non-positive count, so guard that case explicitly. | ||
| override def skipBytes(n: Int): Int = { | ||
| if (n <= 0) { | ||
| 0 | ||
| } else { | ||
| input.skip(n) | ||
| n | ||
| } | ||
| } | ||
| // DataInput.readFully must fill the whole range or fail. Kryo's read(...) may return after | ||
| // a short read, whereas readBytes(...) fills the range or throws on underflow. | ||
| override def readFully(b: Array[Byte]): Unit = input.readBytes(b) | ||
| override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.readBytes(b, off, len) | ||
| override def readLine(): String = throw new UnsupportedOperationException("readLine") | ||
| override def readBoolean(): Boolean = input.readBoolean() | ||
| override def readUnsignedByte(): Int = input.readByteUnsigned() | ||
| override def readDouble(): Double = input.readDouble() | ||
| } | ||
|
|
||
| /** | ||
| * Exposes a Kryo `Output` through the `java.io.DataOutput` interface so that code written | ||
| * against `DataOutput` can write to a Kryo stream. Every supported method delegates to the | ||
| * corresponding Kryo write call; `writeChars` is not supported and throws | ||
| * `UnsupportedOperationException`. | ||
| */ | ||
| private[serializer] class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput { | ||
| override def writeFloat(v: Float): Unit = output.writeFloat(v) | ||
| // There is no "readChars" counterpart, except maybe "readLine", which is not supported | ||
| override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars") | ||
| override def writeDouble(v: Double): Unit = output.writeDouble(v) | ||
| override def writeUTF(s: String): Unit = output.writeString(s) // writeString in kryo does UTF8 | ||
| override def writeShort(v: Int): Unit = output.writeShort(v) | ||
| override def writeInt(v: Int): Unit = output.writeInt(v) | ||
| override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v) | ||
| override def write(b: Int): Unit = output.write(b) | ||
| override def write(b: Array[Byte]): Unit = output.write(b) | ||
| override def write(b: Array[Byte], off: Int, len: Int): Unit = output.write(b, off, len) | ||
| // DataOutput.writeBytes emits exactly one byte per character (its low-order eight bits) | ||
| // with no length prefix, so write the bytes one by one instead of using writeString. | ||
| override def writeBytes(s: String): Unit = s.foreach(c => output.writeByte(c.toInt)) | ||
| override def writeChar(v: Int): Unit = output.writeChar(v.toChar) | ||
| override def writeLong(v: Long): Unit = output.writeLong(v) | ||
| override def writeByte(v: Int): Unit = output.writeByte(v) | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Space before `{`.