[SPARK-746][CORE] Added Avro Serialization to Kryo #7004
Changes from 1 commit
KryoSerializer.scala

```diff
@@ -21,6 +21,8 @@ import java.io.{EOFException, IOException, InputStream, OutputStream}
 import java.nio.ByteBuffer
 import javax.annotation.Nullable
 
+import org.apache.spark.io.CompressionCodec
+
 import scala.reflect.ClassTag
 
 import org.apache.avro.generic.{GenericData, GenericRecord}
```
Contributor comment: nit: order
```diff
@@ -76,6 +78,7 @@ class KryoSerializer(conf: SparkConf)
     .filter(!_.isEmpty)
 
   private val avroSchemas = conf.getAvroSchema
+  private val codec = CompressionCodec.createCodec(conf)
 
   def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
```
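The new `codec` field is handed to `GenericAvroSerializer`, which, judging by the suite's "schema compression and decompression" test, uses it to shrink a schema's JSON form before caching or writing it. The sketch below only illustrates that idea; the object name `SchemaCompression` is hypothetical, the serializer's real internals may differ, and only `compressedOutputStream` / `compressedInputStream` from Spark's `CompressionCodec` trait are assumed as-is.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.avro.Schema
import org.apache.spark.io.CompressionCodec

// Hypothetical helper mirroring what the serializer is assumed to do with the
// injected codec: compress a schema's JSON representation on the write side
// and recover the Schema on the read side.
object SchemaCompression {

  def compress(codec: CompressionCodec, schema: Schema): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(bos)
    try {
      out.write(schema.toString.getBytes("UTF-8"))
    } finally {
      out.close()
    }
    bos.toByteArray
  }

  def decompress(codec: CompressionCodec, bytes: Array[Byte]): Schema = {
    val in = codec.compressedInputStream(new ByteArrayInputStream(bytes))
    val json = try {
      scala.io.Source.fromInputStream(in, "UTF-8").mkString
    } finally {
      in.close()
    }
    new Schema.Parser().parse(json)
  }
}
```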
```diff
@@ -103,8 +106,8 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
-    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
-    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas, codec))
+    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas, codec))
 
     try {
       // Use the default classloader when calling the user registrator.
```
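For context, this is roughly how an application would exercise the registrations above: enable Kryo, pre-register the Avro schema on the `SparkConf` with the `registerAvroSchemas` call the test suite relies on, and shuffle `GenericData.Record` values. The app name, field name, and object name are illustrative only; this is a sketch, not code from the PR.

```scala
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.GenericData
import org.apache.spark.{SparkConf, SparkContext}

object AvroKryoExample {
  def main(args: Array[String]): Unit = {
    // Schema for the records we plan to shuffle; the field name is illustrative.
    val schema: Schema = SchemaBuilder
      .record("testRecord").fields()
      .requiredString("data")
      .endRecord()
    val schemaJson = schema.toString // captured by task closures as a plain String

    val conf = new SparkConf()
      .setAppName("avro-kryo-example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Pre-registering the schema is what allows the serializer to send a short
    // fingerprint instead of the full schema with every record.
    conf.registerAvroSchemas(schema)

    val sc = new SparkContext(conf)
    val records = sc.parallelize(1 to 100).map { i =>
      // Re-parse the schema inside the task so the closure only captures a
      // String (older Avro Schema objects are not java-serializable).
      val taskSchema = new Schema.Parser().parse(schemaJson)
      val record = new GenericData.Record(taskSchema)
      record.put("data", s"value-$i")
      record
    }

    // The shuffle forces each GenericData.Record through Kryo, which now
    // routes it to GenericAvroSerializer.
    val grouped = records.map(r => (r.get("data").toString.length % 4, r)).groupByKey()
    println(grouped.count())

    sc.stop()
  }
}
```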
GenericAvroSerializerSuite.scala

```diff
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import com.esotericsoftware.kryo.io.{Output, Input}
 import org.apache.avro.{SchemaBuilder, Schema}
 import org.apache.avro.generic.GenericData.Record
+import org.apache.spark.io.CompressionCodec
 
 import org.apache.spark.{SparkFunSuite, SharedSparkContext}
```
Contributor comment: nit: empty line between other imports and spark imports
|
|
||
|
|
@@ -37,12 +38,14 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext { | |
| record.put("data", "test data") | ||
|
|
||
| test("schema compression and decompression") { | ||
| val genericSer = new GenericAvroSerializer(conf.getAvroSchema) | ||
| val genericSer = new GenericAvroSerializer(conf.getAvroSchema, | ||
| CompressionCodec.createCodec(conf)) | ||
| assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema)))) | ||
| } | ||
|
|
||
| test("record serialization and deserialization") { | ||
| val genericSer = new GenericAvroSerializer(conf.getAvroSchema) | ||
| val genericSer = new GenericAvroSerializer(conf.getAvroSchema, | ||
| CompressionCodec.createCodec(conf)) | ||
|
|
||
| val outputStream = new ByteArrayOutputStream() | ||
| val output = new Output(outputStream) | ||
|
|
```diff
@@ -55,7 +58,8 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("uses schema fingerprint to decrease message size") {
-    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema,
+      CompressionCodec.createCodec(conf))
 
     val output = new Output(new ByteArrayOutputStream())
```
```diff
@@ -65,7 +69,8 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
     val normalLength = output.total - beginningNormalPosition
 
     conf.registerAvroSchemas(schema)
-    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
+    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema,
+      CompressionCodec.createCodec(conf))
     val beginningFingerprintPosition = output.total()
     genericSerFinger.serializeDatum(record, output)
     val fingerprintLength = output.total - beginningFingerprintPosition
```
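The two hunks above compare the bytes written once the schema is registered against the bytes written when it is not. The snippet below illustrates the size argument using Avro's 64-bit parsing fingerprint; whether `GenericAvroSerializer` uses exactly this fingerprint is an assumption here, but comparing eight bytes against the full schema JSON is the point the test makes.

```scala
import org.apache.avro.{Schema, SchemaBuilder, SchemaNormalization}

object FingerprintSizeDemo {
  def main(args: Array[String]): Unit = {
    val schema: Schema = SchemaBuilder
      .record("testRecord").fields()
      .requiredString("data")
      .endRecord()

    // 64-bit fingerprint of the schema's canonical ("parsing") form.
    val fingerprint: Long = SchemaNormalization.parsingFingerprint64(schema)

    val fullSize = schema.toString.getBytes("UTF-8").length
    // Shipping 8 bytes per record instead of the full schema JSON is what
    // "uses schema fingerprint to decrease message size" verifies.
    println(s"full schema: $fullSize bytes, fingerprint: 8 bytes (value $fingerprint)")
  }
}
```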
```diff
@@ -74,7 +79,8 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("caches previously seen schemas") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema,
+      CompressionCodec.createCodec(conf))
     val compressedSchema = genericSer.compress(schema)
     val decompressedScheam = genericSer.decompress(ByteBuffer.wrap(compressedSchema))
```
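The "caches previously seen schemas" test checks that compressing the same schema twice returns the cached result rather than redoing the work. A tiny sketch of that memoization pattern, with a hypothetical `compressOnce` function standing in for the real compression path:

```scala
import scala.collection.mutable

import org.apache.avro.Schema

// Hypothetical memoizing wrapper: the first compression of a schema is
// computed; later requests for the same schema return the cached byte array.
class SchemaCompressionCache(compressOnce: Schema => Array[Byte]) {
  private val cache = mutable.HashMap.empty[Schema, Array[Byte]]

  def compress(schema: Schema): Array[Byte] =
    cache.getOrElseUpdate(schema, compressOnce(schema))
}
```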
Review comment: nit: import order, should be below scala imports