Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix KryoSerializer
  • Loading branch information
zsxwing committed Dec 2, 2015
commit bfb336048f4383702315f2de352437c75218cb92
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.roaringbitmap.RoaringBitmap
import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.collection.CompactBuffer
Expand Down Expand Up @@ -307,7 +307,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
val kryo = borrowKryo()
try {
input.setBuffer(JavaUtils.bufferToArray(bytes))
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary for this change, but at some point it might be worth it to change this to use Kryo's ByteBufferInput.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kryo will use the array as an internal buffer. Why it's not necessary?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm saying that the change I proposed is not necessary, not that your change is not necessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm saying that the change I proposed is not necessary, not that your change is not necessary.

Got it. Sorry for my misunderstanding.

kryo.readClassAndObject(input).asInstanceOf[T]
} finally {
releaseKryo(kryo)
Expand All @@ -319,7 +319,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
val oldClassLoader = kryo.getClassLoader
try {
kryo.setClassLoader(loader)
input.setBuffer(JavaUtils.bufferToArray(bytes))
input.setBuffer(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
kryo.readClassAndObject(input).asInstanceOf[T]
} finally {
kryo.setClassLoader(oldClassLoader)
Expand Down