Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-27858][SQL] Fix for avro deserialization on union types with multiple non-null types (#562)

## What changes were proposed in this pull request?

This PR aims to fix an issue with union Avro types that have more than one non-null member (for instance ["string", "null", "int"]), whose deserialization to a DataFrame would throw a java.lang.ArrayIndexOutOfBoundsException. The root cause was that the fieldWriter was selected using the member index from the full Avro union schema, which was computed before the null members were filtered out, so it no longer lined up with the filtered list of writers.

## How was this patch tested?

A test covering the case of multiple non-null union members was added. The tests were run with sbt via `testOnly org.apache.spark.sql.avro.AvroSuite`.
  • Loading branch information
vinooganesh authored and bulldozer-bot[bot] committed May 28, 2019
commit a2904adf968385d335874b8b9f77ad411902cbe7
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
case (UNION, _) =>
val allTypes = avroType.getTypes.asScala
val nonNullTypes = allTypes.filter(_.getType != NULL)
val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava)
if (nonNullTypes.nonEmpty) {
if (nonNullTypes.length == 1) {
newWriter(nonNullTypes.head, catalystType, path)
Expand Down Expand Up @@ -253,7 +254,7 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
(updater, ordinal, value) => {
val row = new SpecificInternalRow(st)
val fieldUpdater = new RowUpdater(row)
val i = GenericData.get().resolveUnion(avroType, value)
val i = GenericData.get().resolveUnion(nonNullAvroType, value)
fieldWriters(i)(fieldUpdater, i, value)
updater.set(ordinal, row)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,32 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}

test("SPARK-27858 Union type: More than one non-null type") {
  withTempDir { dir =>
    // Union with two non-null branches plus null: ["int", "null", "string"].
    // Before SPARK-27858 this shape made the deserializer index past the
    // filtered writer array and throw ArrayIndexOutOfBoundsException.
    val unionSchema = Schema.createUnion(
      List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
    val recordSchema = Schema.createRecord("name", "docs", "namespace", false)
    recordSchema.setFields(
      Seq(new Field("field1", unionSchema, "doc", null.asInstanceOf[AnyVal])).asJava)

    // Write one record exercising each non-null branch of the union.
    val writer =
      new DataFileWriter[GenericRecord](new GenericDatumWriter[GenericRecord](recordSchema))
    writer.create(recordSchema, new File(s"$dir.avro"))
    val intRecord = new GenericData.Record(recordSchema)
    intRecord.put("field1", 42)
    val stringRecord = new GenericData.Record(recordSchema)
    stringRecord.put("field1", "Alice")
    writer.append(intRecord)
    writer.append(stringRecord)
    writer.flush()
    writer.close()

    // A multi-branch union is read back as a struct with one nullable member
    // per non-null branch; exactly one member is set per row.
    val df = spark.read.format("avro").load(s"$dir.avro")
    assert(df.schema === StructType.fromDDL("field1 struct<member0: int, member1: string>"))
    assert(df.collect().toSet == Set(Row(Row(42, null)), Row(Row(null, "Alice"))))
  }
}

test("Complex Union Type") {
withTempPath { dir =>
val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 4)
Expand Down