Skip to content
Prev Previous commit
Next Next commit
Cleanup for submitting as standalone patch.
  • Loading branch information
JoshRosen committed May 5, 2015
commit 123b99286e3a5e6586818d302ec79abc4f0e958e
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class KryoSerializer(conf: SparkConf)
new KryoSerializerInstance(this)
}

override def supportsRelocationOfSerializedObjects: Boolean = {
private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
// If auto-flush is disabled, then Kryo may store references to duplicate occurrences of objects
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this say "auto-reset" instead of "auto-flush"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; good catch.

// in the stream rather than writing those objects' serialized bytes, breaking relocation. See
// https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details.
Expand Down
44 changes: 26 additions & 18 deletions core/src/main/scala/org/apache/spark/serializer/Serializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.nio.ByteBuffer
import scala.reflect.ClassTag

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.annotation.{Private, Experimental, DeveloperApi}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: alphabetize

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argh, I really need to fix my IntelliJ settings. I switched versions and didn't port all of my import sorting settings over, so stuff like this keeps happening :(

import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator}

/**
Expand Down Expand Up @@ -65,28 +65,36 @@ abstract class Serializer {
def newInstance(): SerializerInstance

/**
* :: Private ::
* Returns true if this serializer supports relocation of its serialized objects and false
* otherwise. This should return true if and only if reordering the bytes of serialized objects
* in serialization stream output results in re-ordered input that can be read with the
* deserializer. For instance, the following should work if the serializer supports relocation:
* otherwise. This should return true if and only if reordering the bytes of serialized objects
* in serialization stream output is equivalent to having re-ordered those elements prior to
* serializing them. More specifically, the following should hold if a serializer supports
* relocation:
*
* serOut.open()
* position = 0
* serOut.write(obj1)
* serOut.flush()
* position = # of bytes writen to stream so far
* obj1Bytes = [bytes 0 through position of stream]
* serOut.write(obj2)
* serOut.flush
* position2 = # of bytes written to stream so far
* obj2Bytes = bytes[position through position2 of stream]
* {{{
* serOut.open()
* position = 0
* serOut.write(obj1)
* serOut.flush()
* position = # of bytes writen to stream so far
* obj1Bytes = output[0:position-1]
* serOut.write(obj2)
* serOut.flush()
* position2 = # of bytes written to stream so far
* obj2Bytes = output[position:position2-1]
* serIn.open([obj2bytes] concatenate [obj1bytes]) should return (obj2, obj1)
* }}}
*
* serIn.open([obj2bytes] concatenate [obj1bytes]) should return (obj2, obj1)
* In general, this property should hold for serializers that are stateless.
*
* See SPARK-7311 for more discussion.
* This API is private to Spark; this method should not be overridden in third-party subclasses
* or called in user code and is subject to removal in future Spark releases.
*
* See SPARK-7311 for more details.
*/
@Experimental
def supportsRelocationOfSerializedObjects: Boolean = false
@Private
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that we want to commit to this as a stable public API, which is why I've chosen to mark this as private and leave comments warning users that this API is private and subject to change. If someone can think of a better way to restrict use / implementation of this method, I'd be happy to incorporate that change.

private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not set on this name, by the way; happy to change if someone thinks of a less verbose name that's not misleading.

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@ import org.scalatest.{Assertions, FunSuite}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoTest.RegistratorWithoutAutoReset


/**
* Tests to ensure that [[Serializer]] implementations obey the API contracts for methods that
* describe properties of the serialized stream, such as
* [[Serializer.supportsRelocationOfSerializedObjects]].
*/
class SerializerPropertiesSuite extends FunSuite {

import SerializerPropertiesSuite._

test("JavaSerializer does not support relocation") {
// Per a comment on the SPARK-4550 JIRA ticket, Java serialization appears to write out the
// full class name the first time an object is written to an output stream, but subsequent
// references to the class write a more compact identifier; this prevents relocation.
val ser = new JavaSerializer(new SparkConf())
testSupportsRelocationOfSerializedObjects(ser, generateRandomItem)
}
Expand Down Expand Up @@ -76,7 +83,7 @@ object SerializerPropertiesSuite extends Assertions {
if (!serializer.supportsRelocationOfSerializedObjects) {
return
}
val NUM_TRIALS = 10
val NUM_TRIALS = 5
val rand = new Random(42)
for (_ <- 1 to NUM_TRIALS) {
val items = {
Expand Down