diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 5e45b375ddd4..ddb74a657fd3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -22,6 +22,8 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
+import com.esotericsoftware.kryo.io.{Input, Output}
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.SparkEnv
@@ -142,7 +144,7 @@ private[spark] class HighlyCompressedMapStatus private (
     private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long,
     private var hugeBlockSizes: Map[Int, Byte])
-  extends MapStatus with Externalizable {
+  extends MapStatus with Externalizable with KryoSerializable {
 
   // loc could be null when the default constructor is called during deserialization
   require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
@@ -189,6 +191,15 @@ private[spark] class HighlyCompressedMapStatus private (
       }
       hugeBlockSizes = hugeBlockSizesArray.toMap
     }
+
+  override def write(kryo: Kryo, output: Output): Unit = {
+    kryo.writeClassAndObject(output, hugeBlockSizes)
+  }
+
+  override def read(kryo: Kryo, input: Input): Unit = {
+    hugeBlockSizes = kryo.readClassAndObject(input).asInstanceOf[Map[Int, Byte]]
+  }
+
 }
 
 private[spark] object HighlyCompressedMapStatus {
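
For reference, the pattern the patch uses works because, in a plain Kryo setup, a class that mixes in `KryoSerializable` resolves to Kryo's `KryoSerializableSerializer`, so Kryo calls the `write`/`read` hooks directly instead of falling back to field-based serialization. Below is a minimal, self-contained sketch of that round trip. It is not Spark code: `HugeBlockSizesHolder` and `KryoRoundTrip` are hypothetical names, and it assumes Twitter Chill's `ScalaKryoInstantiator` (the library Spark's own `KryoSerializer` builds on) is on the classpath so the Scala `Map[Int, Byte]` payload serializes cleanly.

```scala
import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.twitter.chill.ScalaKryoInstantiator

// Hypothetical stand-in for HighlyCompressedMapStatus whose only state is
// the hugeBlockSizes map, so the write/read pair below covers everything.
class HugeBlockSizesHolder(private var hugeBlockSizes: Map[Int, Byte])
  extends KryoSerializable {

  // Kryo instantiates the object before invoking read(), so a no-arg
  // constructor is needed -- the same reason the patched class's require()
  // tolerates a null loc during deserialization.
  def this() = this(Map.empty)

  def sizes: Map[Int, Byte] = hugeBlockSizes

  override def write(kryo: Kryo, output: Output): Unit = {
    // Record the concrete Map class alongside the data so read() can
    // restore it without knowing the implementation in advance.
    kryo.writeClassAndObject(output, hugeBlockSizes)
  }

  override def read(kryo: Kryo, input: Input): Unit = {
    hugeBlockSizes = kryo.readClassAndObject(input).asInstanceOf[Map[Int, Byte]]
  }
}

object KryoRoundTrip {
  def main(args: Array[String]): Unit = {
    // Chill's instantiator yields a Kryo preconfigured for Scala types.
    val kryo = (new ScalaKryoInstantiator).newKryo()

    val original = new HugeBlockSizesHolder(Map(0 -> 7.toByte, 3 -> 42.toByte))

    // Serialize into a growable in-memory buffer. Because the class is
    // KryoSerializable, Kryo routes through write() above.
    val output = new Output(4096, -1)
    kryo.writeClassAndObject(output, original)
    output.close()

    val restored = {
      val input = new Input(output.toBytes)
      try kryo.readClassAndObject(input).asInstanceOf[HugeBlockSizesHolder]
      finally input.close()
    }

    assert(restored.sizes == original.sizes)
    println(s"restored hugeBlockSizes: ${restored.sizes}")
  }
}
```

In this sketch the map is the whole state, so the two hooks are sufficient; the patch itself additionally keeps its `Externalizable` methods, so the Java-serialization path through `writeExternal`/`readExternal` remains intact alongside the new Kryo path.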