Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add explanatory comments
  • Loading branch information
JoshRosen committed May 27, 2015
commit ba55d20ba9c3768d04fb7b39c45a3123b66b8d6f
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.serializer

import java.io.{EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable

import scala.reflect.ClassTag

Expand Down Expand Up @@ -202,26 +203,43 @@ class KryoDeserializationStream(

private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {

private[this] var cachedKryo: Kryo = ks.newKryo()
/**
* A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
* their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
* pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
* not synchronized.
*/
@Nullable private[this] var cachedKryo: Kryo = null

private[spark] def borrowKryo(): Kryo = {
/**
* Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
* otherwise, it allocates a new instance.
*/
private[serializer] def borrowKryo(): Kryo = {
if (cachedKryo != null) {
val kryo = cachedKryo
kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766)
// As a defensive measure, call reset() to clear any Kryo state that might have been modified
// by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
kryo.reset()
cachedKryo = null
kryo
} else {
ks.newKryo()
}
}

private[spark] def releaseKryo(kryo: Kryo): Unit = {
/**
* Releases a borrowed [[Kryo]] instance. If this serializer instance already has a cached Kryo
* instance, then the given Kryo instance is discarded; otherwise, the Kryo is stored for later
* re-use.
*/
private[serializer] def releaseKryo(kryo: Kryo): Unit = {
if (cachedKryo == null) {
cachedKryo = kryo
}
}

// Make these lazy vals to avoid creating a buffer unless we use them
// Make these lazy vals to avoid creating a buffer unless we use them.
private lazy val output = ks.newKryoOutput()
private lazy val input = new KryoInput()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.serializer

import java.io._
import java.nio.ByteBuffer
import javax.annotation.concurrent.NotThreadSafe

import scala.reflect.ClassTag

Expand Down Expand Up @@ -114,8 +115,12 @@ object Serializer {
/**
* :: DeveloperApi ::
* An instance of a serializer, for use by one thread at a time.
*
* It is legal to create multiple serialization / deserialization streams from the same
* SerializerInstance as long as those streams are all used within the same thread.
*/
@DeveloperApi
@NotThreadSafe
abstract class SerializerInstance {
def serialize[T: ClassTag](t: T): ByteBuffer

Expand Down