address comments
gengliangwang committed Jul 20, 2018
commit 0a6dd0bb67edcd7ebe552486efb9bd7c1e6ef893
@@ -17,27 +17,29 @@

package org.apache.spark.sql

import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.io.{BinaryDecoder, DecoderFactory}

-import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters, SerializableSchema}
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}

Contributor: I would add ExpressionDescription and javadoc here.

Contributor: This is not a function expression like the ones in SQL core, so ExpressionDescription can't apply here. I think we can leave it for now.
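For reference, a minimal sketch of the kind of SQL-registered expression the annotation targets; the PlusOne expression here is hypothetical and not part of this patch, and the usage/examples fields assume the Spark 2.4-era annotation:

import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, UnaryExpression}
import org.apache.spark.sql.types.{DataType, LongType}

// Hypothetical SQL function expression: the annotation text is surfaced by
// DESCRIBE FUNCTION, which only applies to expressions registered in the SQL
// function registry; AvroDataToCatalyst is not one of them.
@ExpressionDescription(
  usage = "_FUNC_(expr) - Returns `expr` plus one.",
  examples = """
    Examples:
      > SELECT _FUNC_(1);
       2
  """)
case class PlusOne(child: Expression) extends UnaryExpression {
  override def dataType: DataType = LongType
  override protected def nullSafeEval(input: Any): Any = input.asInstanceOf[Long] + 1L
  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, c => s"$c + 1L")
}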

-case class AvroDataToCatalyst(child: Expression, avroType: SerializableSchema)
+case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
extends UnaryExpression with ExpectsInputTypes {

override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)

-  override lazy val dataType: DataType =
-    SchemaConverters.toSqlType(avroType.value).dataType
+  override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType

override def nullable: Boolean = true

-  @transient private lazy val reader = new GenericDatumReader[Any](avroType.value)
+  @transient private lazy val avroSchema = new Schema.Parser().parse(jsonFormatSchema)

-  @transient private lazy val deserializer = new AvroDeserializer(avroType.value, dataType)
+  @transient private lazy val reader = new GenericDatumReader[Any](avroSchema)

+  @transient private lazy val deserializer = new AvroDeserializer(avroSchema, dataType)

@transient private var decoder: BinaryDecoder = _

@@ -51,7 +53,7 @@ case class AvroDataToCatalyst(child: Expression, avroType: SerializableSchema)
}

override def simpleString: String = {
s"from_avro(${child.sql}, ${dataType.simpleString})"
s"from_avro(${child.sql}, ${dataType.catalogString})"
}

override def sql: String = simpleString
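The eval body is collapsed in this view; as a rough standalone sketch of the decode path the lazy fields above support (plain Avro APIs only, not the expression's actual code):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.io.DecoderFactory

// Parse the JSON-format schema once, then reuse the datum reader across rows;
// this mirrors why the fields above are lazy and @transient.
def decode(payload: Array[Byte], jsonFormatSchema: String): Any = {
  val schema = new Schema.Parser().parse(jsonFormatSchema)
  val reader = new GenericDatumReader[Any](schema)
  val decoder = DecoderFactory.get().binaryDecoder(payload, null)
  reader.read(null, decoder)
}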
@@ -54,7 +54,7 @@ case class CatalystDataToAvro(child: Expression) extends UnaryExpression {
}

override def simpleString: String = {
s"to_avro(${child.sql}, ${child.dataType.simpleString})"
s"to_avro(${child.sql}, ${child.dataType.catalogString})"
}

override def sql: String = simpleString
@@ -47,17 +47,21 @@ package object avro {
* arbitrary result.
*
Member: Shall we add @since?

Contributor: +1

* @param data the binary column.
-   * @param avroType the avro type.
+   * @param jsonFormatSchema the avro schema in JSON string format.
+   *
+   * @since 2.4.0
*/
@Experimental
-  def from_avro(data: Column, avroType: Schema): Column = {
-    new Column(AvroDataToCatalyst(data.expr, new SerializableSchema(avroType)))
+  def from_avro(data: Column, jsonFormatSchema: String): Column = {
+    new Column(AvroDataToCatalyst(data.expr, jsonFormatSchema))
}
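A quick usage sketch of the new string-based signature (toy example; assumes an active SparkSession named spark):

import org.apache.spark.sql.avro._

// Hypothetical round trip: encode a long column to Avro binary, then decode it
// back using a JSON-format schema string instead of a parsed Schema object.
import spark.implicits._
val df = spark.range(3).select(to_avro('id).as("a"))
val decoded = df.select(from_avro('a, """{"type": "long", "name": "id"}""").as("id"))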

/**
* Converts a column into binary of avro format.
*
* @param data the data column.
+   *
+   * @since 2.4.0
*/
@Experimental
def to_avro(data: Column): Column = {
@@ -31,18 +31,18 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper

private def roundTripTest(data: Literal): Unit = {
val avroType = SchemaConverters.toAvroType(data.dataType, data.nullable)
-    checkResult(data, avroType, data.eval())
+    checkResult(data, avroType.toString, data.eval())
}

-  private def checkResult(data: Literal, avroType: Schema, expected: Any): Unit = {
+  private def checkResult(data: Literal, schema: String, expected: Any): Unit = {
checkEvaluation(
-      AvroDataToCatalyst(CatalystDataToAvro(data), new SerializableSchema(avroType)),
+      AvroDataToCatalyst(CatalystDataToAvro(data), schema),
prepareExpectedResult(expected))
}

-  private def assertFail(data: Literal, avroType: Schema): Unit = {
+  private def assertFail(data: Literal, schema: String): Unit = {
intercept[java.io.EOFException] {
-      AvroDataToCatalyst(CatalystDataToAvro(data), new SerializableSchema(avroType)).eval()
+      AvroDataToCatalyst(CatalystDataToAvro(data), schema).eval()
}
}

@@ -126,7 +126,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper
""".stripMargin

    // When reading an int as string, the Avro reader is not able to parse the binary and fails.
-    assertFail(data, new Schema.Parser().parse(avroTypeJson))
+    assertFail(data, avroTypeJson)
}

test("read string as int") {
@@ -141,7 +141,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper

    // When reading string data as int, the Avro reader cannot detect the type mismatch and reads
    // the string length as the int value.
-    checkResult(data, new Schema.Parser().parse(avroTypeJson), 3)
+    checkResult(data, avroTypeJson, 3)
}
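Why the expected value is 3: Avro's binary format encodes a string as a zig-zag varint length followed by the UTF-8 bytes, so decoding the payload as an int consumes only the length prefix. A standalone sketch with a made-up three-character input (the test's actual literal is collapsed in this view):

import java.io.ByteArrayOutputStream
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Encode "abc": the zig-zag varint length (3 -> 0x06) precedes the UTF-8 bytes.
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
encoder.writeString("abc")
encoder.flush()

// Reading the same bytes back as an int consumes just the varint length prefix.
val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
val length = decoder.readInt()  // 3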

test("read float as double") {
@@ -156,7 +156,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper

    // When reading float data as double, the Avro reader fails (it tries to read 8 bytes while
    // the data has only 4 bytes).
-    assertFail(data, new Schema.Parser().parse(avroTypeJson))
+    assertFail(data, avroTypeJson)
}

test("read double as float") {
@@ -170,6 +170,6 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper
""".stripMargin

    // The Avro reader reads the first 4 bytes of a double as a float; the result is undefined.
-    checkResult(data, new Schema.Parser().parse(avroTypeJson), 5.848603E35f)
+    checkResult(data, avroTypeJson, 5.848603E35f)
}
}
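For the arbitrary-looking expected float above, a standalone sketch of the reinterpretation with a made-up input (the test's actual double literal is collapsed in this view):

import java.io.ByteArrayOutputStream
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Writing a double emits 8 little-endian bytes; readFloat() consumes only the
// first 4 and reinterprets them, so the result is effectively garbage.
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
encoder.writeDouble(1.1)
encoder.flush()

val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
val garbled = decoder.readFloat()  // not 1.1f; half of the double's bit pattern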
@@ -30,56 +30,52 @@ class AvroFunctionsSuite extends QueryTest with SharedSQLContext {
val df = spark.range(10).select('id, 'id.cast("string").as("str"))

val avroDF = df.select(to_avro('id).as("a"), to_avro('str).as("b"))
-    val avroTypeLong = new Schema.Parser().parse(
-      s"""
-        |{
-        |  "type": "int",
-        |  "name": "id"
-        |}
-      """.stripMargin)
-    val avroTypeStr = new Schema.Parser().parse(
-      s"""
-        |{
-        |  "type": "string",
-        |  "name": "str"
-        |}
-      """.stripMargin)
+    val avroTypeLong = s"""
+      |{
+      |  "type": "int",
+      |  "name": "id"
+      |}
+    """.stripMargin
+    val avroTypeStr = s"""
+      |{
+      |  "type": "string",
+      |  "name": "str"
+      |}
+    """.stripMargin
checkAnswer(avroDF.select(from_avro('a, avroTypeLong), from_avro('b, avroTypeStr)), df)
}

test("roundtrip in to_avro and from_avro - struct") {
val df = spark.range(10).select(struct('id, 'id.cast("string").as("str")).as("struct"))
val avroStructDF = df.select(to_avro('struct).as("avro"))
-    val avroTypeStruct = new Schema.Parser().parse(
-      s"""
-        |{
-        |  "type": "record",
-        |  "name": "struct",
-        |  "fields": [
-        |    {"name": "col1", "type": "long"},
-        |    {"name": "col2", "type": "string"}
-        |  ]
-        |}
-      """.stripMargin)
+    val avroTypeStruct = s"""
+      |{
+      |  "type": "record",
+      |  "name": "struct",
+      |  "fields": [
+      |    {"name": "col1", "type": "long"},
+      |    {"name": "col2", "type": "string"}
+      |  ]
+      |}
+    """.stripMargin
checkAnswer(avroStructDF.select(from_avro('avro, avroTypeStruct)), df)
}

test("roundtrip in to_avro and from_avro - array with null") {
val dfOne = Seq(Tuple1(Tuple1(1) :: Nil), Tuple1(null :: Nil)).toDF("array")
-    val avroTypeArrStruct = new Schema.Parser().parse(
-      s"""
-        |[ {
-        |  "type" : "array",
-        |  "items" : [ {
-        |    "type" : "record",
-        |    "name" : "x",
-        |    "fields" : [ {
-        |      "name" : "y",
-        |      "type" : "int"
-        |    } ]
-        |  }, "null" ]
-        |}, "null" ]
-      """.stripMargin)
+    val avroTypeArrStruct = s"""
+      |[ {
+      |  "type" : "array",
+      |  "items" : [ {
+      |    "type" : "record",
+      |    "name" : "x",
+      |    "fields" : [ {
+      |      "name" : "y",
+      |      "type" : "int"
+      |    } ]
+      |  }, "null" ]
+      |}, "null" ]
+    """.stripMargin
val readBackOne = dfOne.select(to_avro($"array").as("avro"))
.select(from_avro($"avro", avroTypeArrStruct).as("array"))
checkAnswer(dfOne, readBackOne)