
Commit 29e154b

Gabbi Merz authored and dongjoon-hyun committed
[SPARK-27858][SQL] Fix for avro deserialization on union types with multiple non-null types
## What changes were proposed in this pull request?

This PR aims to fix an issue with union Avro types that have more than one non-null member (for instance `["string", "null", "int"]`), whose deserialization to a DataFrame would throw a `java.lang.ArrayIndexOutOfBoundsException`. The issue was that the `fieldWriter` relied on the index from the Avro schema before nulls were filtered out.

## How was this patch tested?

A test for the case of multiple non-null values was added, and the tests were run with sbt via `testOnly org.apache.spark.sql.avro.AvroSuite`.

Closes #24722 from gcmerz/master.

Authored-by: Gabbi Merz <gmerz@palantir.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent 00a8c85 commit 29e154b
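
To make the failure mode concrete before the diffs below, here is a minimal, REPL-style sketch (not part of this patch; the datum `42` and the variable names are illustrative) showing how `GenericData.resolveUnion` indexes the full union, including the `null` branch, while the deserializer's `fieldWriters` only cover the non-null branches:

```scala
import scala.collection.JavaConverters._

import org.apache.avro.Schema
import org.apache.avro.Schema.Type
import org.apache.avro.generic.GenericData

// A union as declared in the Avro schema: ["string", "null", "int"]
val fullUnion = Schema.createUnion(
  List(Schema.create(Type.STRING), Schema.create(Type.NULL), Schema.create(Type.INT)).asJava)

// resolveUnion indexes into the *full* union, so an Int datum resolves to branch 2.
val resolvedIndex = GenericData.get().resolveUnion(fullUnion, 42)

// But the writers are built from the null-filtered branches [string, int], so only
// indices 0 and 1 exist -- indexing with 2 is what triggered the
// ArrayIndexOutOfBoundsException before this fix.
val nonNullBranches = fullUnion.getTypes.asScala.filter(_.getType != Type.NULL)
println(s"resolved index = $resolvedIndex, non-null writers = ${nonNullBranches.length}")
```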

File tree

2 files changed: +28, -1 lines

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 2 additions & 1 deletion

@@ -225,6 +225,7 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
     case (UNION, _) =>
       val allTypes = avroType.getTypes.asScala
       val nonNullTypes = allTypes.filter(_.getType != NULL)
+      val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava)
       if (nonNullTypes.nonEmpty) {
         if (nonNullTypes.length == 1) {
           newWriter(nonNullTypes.head, catalystType, path)
@@ -253,7 +254,7 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
             (updater, ordinal, value) => {
               val row = new SpecificInternalRow(st)
               val fieldUpdater = new RowUpdater(row)
-              val i = GenericData.get().resolveUnion(avroType, value)
+              val i = GenericData.get().resolveUnion(nonNullAvroType, value)
               fieldWriters(i)(fieldUpdater, i, value)
               updater.set(ordinal, row)
             }
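
For context on the change above, a tiny sketch (plain Avro APIs; names are illustrative, not taken from the patch) of why resolving against the null-filtered union keeps the index aligned with `fieldWriters`:

```scala
import scala.collection.JavaConverters._

import org.apache.avro.Schema
import org.apache.avro.Schema.Type
import org.apache.avro.generic.GenericData

val fullUnion = Schema.createUnion(
  List(Schema.create(Type.STRING), Schema.create(Type.NULL), Schema.create(Type.INT)).asJava)

// Mirror the patch: rebuild the union from only the non-null branches.
val nonNullTypes = fullUnion.getTypes.asScala.filter(_.getType != Type.NULL)
val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava)

// Indices now line up with writers built from nonNullTypes: 0 -> string, 1 -> int.
assert(GenericData.get().resolveUnion(nonNullAvroType, "Alice") == 0)
assert(GenericData.get().resolveUnion(nonNullAvroType, 42) == 1)
```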

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

Lines changed: 26 additions & 0 deletions

@@ -247,6 +247,32 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }

+  test("SPARK-27858 Union type: More than one non-null type") {
+    withTempDir { dir =>
+      val complexNullUnionType = Schema.createUnion(
+        List(Schema.create(Type.INT), Schema.create(Type.NULL), Schema.create(Type.STRING)).asJava)
+      val fields = Seq(
+        new Field("field1", complexNullUnionType, "doc", null.asInstanceOf[AnyVal])).asJava
+      val schema = Schema.createRecord("name", "docs", "namespace", false)
+      schema.setFields(fields)
+      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
+      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+      dataFileWriter.create(schema, new File(s"$dir.avro"))
+      val avroRec = new GenericData.Record(schema)
+      avroRec.put("field1", 42)
+      dataFileWriter.append(avroRec)
+      val avroRec2 = new GenericData.Record(schema)
+      avroRec2.put("field1", "Alice")
+      dataFileWriter.append(avroRec2)
+      dataFileWriter.flush()
+      dataFileWriter.close()
+
+      val df = spark.read.format("avro").load(s"$dir.avro")
+      assert(df.schema === StructType.fromDDL("field1 struct<member0: int, member1: string>"))
+      assert(df.collect().toSet == Set(Row(Row(42, null)), Row(Row(null, "Alice"))))
+    }
+  }
+
   test("Complex Union Type") {
     withTempPath { dir =>
       val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 4)
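
A brief usage follow-up (hedged: the path below is illustrative and `spark` is assumed to be an active SparkSession with the Avro data source available): once such files deserialize correctly, the multi-branch union surfaces as a struct with one column per non-null branch, matching the schema asserted in the test above, and the branches can be selected directly:

```scala
val df = spark.read.format("avro").load("/tmp/union-example.avro")

// Per the asserted schema: field1 struct<member0: int, member1: string>,
// where member0 holds the int branch and member1 holds the string branch.
df.select("field1.member0", "field1.member1").show()
```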
