Skip to content

Commit e651900

Browse files
liancheng authored and yhuai committed
[SPARK-16344][SQL] Decoding Parquet array of struct with a single field named "element"
## What changes were proposed in this pull request? Due to backward-compatibility reasons, the following Parquet schema is ambiguous: ``` optional group f (LIST) { repeated group list { optional group element { optional int32 element; } } } ``` According to the parquet-format spec, when interpreted as a standard 3-level layout, this type is equivalent to the following SQL type: ``` ARRAY<STRUCT<element: INT>> ``` However, when interpreted as a legacy 2-level layout, it's equivalent to ``` ARRAY<STRUCT<element: STRUCT<element: INT>>> ``` Historically, to disambiguate these cases, we employed two methods: - `ParquetSchemaConverter.isElementType()` Used to disambiguate the above cases while converting Parquet types to Spark types. - `ParquetRowConverter.isElementType()` Used to disambiguate the above cases while instantiating row converters that convert Parquet records to Spark rows. Unfortunately, these two methods make different decisions about the above problematic Parquet type, which caused SPARK-16344. `ParquetRowConverter.isElementType()` is necessary for Spark 1.4 and earlier versions because Parquet requested schemata are directly converted from Spark schemata in these versions. The converted Parquet schemata may be incompatible with actual schemata of the underlying physical files when the files are written by a system/library that uses a schema conversion scheme that is different from Spark when writing Parquet LIST and MAP fields. In Spark 1.5, Parquet requested schemata are always properly tailored from schemata of physical files to be read. Thus `ParquetRowConverter.isElementType()` is no longer necessary. This PR replaces this method with a simple yet accurate scheme: whenever an ambiguous Parquet type is hit, convert the type in question back to a Spark type using `ParquetSchemaConverter` and check whether it matches the corresponding Spark type. ## How was this patch tested? New test cases added in `ParquetHiveCompatibilitySuite` and `ParquetQuerySuite`. 
Author: Cheng Lian <lian@databricks.com> Closes apache#14014 from liancheng/spark-16344-for-master-and-2.0.
1 parent e3cd5b3 commit e651900

File tree

6 files changed

+73
-47
lines changed

6 files changed

+73
-47
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with
9696

9797
new ParquetRecordMaterializer(
9898
parquetRequestedSchema,
99-
ParquetReadSupport.expandUDT(catalystRequestedSchema))
99+
ParquetReadSupport.expandUDT(catalystRequestedSchema),
100+
new ParquetSchemaConverter(conf))
100101
}
101102
}
102103

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@ import org.apache.spark.sql.types.StructType
2828
*
2929
* @param parquetSchema Parquet schema of the records to be read
3030
* @param catalystSchema Catalyst schema of the rows to be constructed
31+
* @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
3132
*/
3233
private[parquet] class ParquetRecordMaterializer(
33-
parquetSchema: MessageType, catalystSchema: StructType)
34+
parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
3435
extends RecordMaterializer[InternalRow] {
3536

36-
private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater)
37+
private val rootConverter =
38+
new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)
3739

3840
override def getCurrentRecord: InternalRow = rootConverter.currentRecord
3941

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala

Lines changed: 42 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
2525

2626
import org.apache.parquet.column.Dictionary
2727
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
28-
import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
28+
import org.apache.parquet.schema.{GroupType, MessageType, Type}
2929
import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8}
3030
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE, FIXED_LEN_BYTE_ARRAY, INT32, INT64}
3131

@@ -113,12 +113,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
113113
* When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
114114
* any "parent" container.
115115
*
116+
* @param schemaConverter A utility converter used to convert Parquet types to Catalyst types.
116117
* @param parquetType Parquet schema of Parquet records
117118
* @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
118119
* types should have been expanded.
119120
* @param updater An updater which propagates converted field values to the parent container
120121
*/
121122
private[parquet] class ParquetRowConverter(
123+
schemaConverter: ParquetSchemaConverter,
122124
parquetType: GroupType,
123125
catalystType: StructType,
124126
updater: ParentContainerUpdater)
@@ -292,9 +294,10 @@ private[parquet] class ParquetRowConverter(
292294
new ParquetMapConverter(parquetType.asGroupType(), t, updater)
293295

294296
case t: StructType =>
295-
new ParquetRowConverter(parquetType.asGroupType(), t, new ParentContainerUpdater {
296-
override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
297-
})
297+
new ParquetRowConverter(
298+
schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater {
299+
override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
300+
})
298301

299302
case t =>
300303
throw new RuntimeException(
@@ -442,13 +445,46 @@ private[parquet] class ParquetRowConverter(
442445
private val elementConverter: Converter = {
443446
val repeatedType = parquetSchema.getType(0)
444447
val elementType = catalystSchema.elementType
445-
val parentName = parquetSchema.getName
446448

447-
if (isElementType(repeatedType, elementType, parentName)) {
449+
// At this stage, we're not sure whether the repeated field maps to the element type or is
450+
// just the syntactic repeated group of the 3-level standard LIST layout. Take the following
451+
// Parquet LIST-annotated group type as an example:
452+
//
453+
// optional group f (LIST) {
454+
// repeated group list {
455+
// optional group element {
456+
// optional int32 element;
457+
// }
458+
// }
459+
// }
460+
//
461+
// This type is ambiguous:
462+
//
463+
// 1. When interpreted as a standard 3-level layout, the `list` field is just the syntactic
464+
// group, and the entire type should be translated to:
465+
//
466+
// ARRAY<STRUCT<element: INT>>
467+
//
468+
// 2. On the other hand, when interpreted as a non-standard 2-level layout, the `list` field
469+
// represents the element type, and the entire type should be translated to:
470+
//
471+
// ARRAY<STRUCT<element: STRUCT<element: INT>>>
472+
//
473+
// Here we try to convert field `list` into a Catalyst type to see whether the converted type
474+
// matches the Catalyst array element type. If it doesn't match, then it's case 1; otherwise,
475+
// it's case 2.
476+
val guessedElementType = schemaConverter.convertField(repeatedType)
477+
478+
if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)) {
479+
// If the repeated field corresponds to the element type, creates a new converter using the
480+
// type of the repeated field.
448481
newConverter(repeatedType, elementType, new ParentContainerUpdater {
449482
override def set(value: Any): Unit = currentArray += value
450483
})
451484
} else {
485+
// If the repeated field corresponds to the syntactic group in the standard 3-level Parquet
486+
// LIST layout, creates a new converter using the only child field of the repeated field.
487+
assert(!repeatedType.isPrimitive && repeatedType.asGroupType().getFieldCount == 1)
452488
new ElementConverter(repeatedType.asGroupType().getType(0), elementType)
453489
}
454490
}
@@ -462,37 +498,6 @@ private[parquet] class ParquetRowConverter(
462498
// in row cells.
463499
override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
464500

465-
// scalastyle:off
466-
/**
467-
* Returns whether the given type is the element type of a list or is a syntactic group with
468-
* one field that is the element type. This is determined by checking whether the type can be
469-
* a syntactic group and by checking whether a potential syntactic group matches the expected
470-
* schema.
471-
* {{{
472-
* <list-repetition> group <name> (LIST) {
473-
* repeated group list { <-- repeatedType points here
474-
* <element-repetition> <element-type> element;
475-
* }
476-
* }
477-
* }}}
478-
* In short, here we handle Parquet list backwards-compatibility rules on the read path. This
479-
* method is based on `AvroIndexedRecordConverter.isElementType`.
480-
*
481-
* @see https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
482-
*/
483-
// scalastyle:on
484-
private def isElementType(
485-
parquetRepeatedType: Type, catalystElementType: DataType, parentName: String): Boolean = {
486-
(parquetRepeatedType, catalystElementType) match {
487-
case (t: PrimitiveType, _) => true
488-
case (t: GroupType, _) if t.getFieldCount > 1 => true
489-
case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == "array" => true
490-
case (t: GroupType, _) if t.getFieldCount == 1 && t.getName == parentName + "_tuple" => true
491-
case (t: GroupType, StructType(Array(f))) if f.name == t.getFieldName(0) => true
492-
case _ => false
493-
}
494-
}
495-
496501
/** Array element converter */
497502
private final class ElementConverter(parquetType: Type, catalystType: DataType)
498503
extends GroupConverter {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ private[parquet] class ParquetSchemaConverter(
260260
{
261261
// For legacy 2-level list types with primitive element type, e.g.:
262262
//
263-
// // List<Integer> (nullable list, non-null elements)
263+
// // ARRAY<INT> (nullable list, non-null elements)
264264
// optional group my_list (LIST) {
265265
// repeated int32 element;
266266
// }
@@ -270,7 +270,7 @@ private[parquet] class ParquetSchemaConverter(
270270
// For legacy 2-level list types whose element type is a group type with 2 or more fields,
271271
// e.g.:
272272
//
273-
// // List<Tuple<String, Integer>> (nullable list, non-null elements)
273+
// // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
274274
// optional group my_list (LIST) {
275275
// repeated group element {
276276
// required binary str (UTF8);
@@ -282,7 +282,7 @@ private[parquet] class ParquetSchemaConverter(
282282
} || {
283283
// For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), e.g.:
284284
//
285-
// // List<OneTuple<String>> (nullable list, non-null elements)
285+
// // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
286286
// optional group my_list (LIST) {
287287
// repeated group array {
288288
// required binary str (UTF8);
@@ -293,7 +293,7 @@ private[parquet] class ParquetSchemaConverter(
293293
} || {
294294
// For Parquet data generated by parquet-thrift, e.g.:
295295
//
296-
// // List<OneTuple<String>> (nullable list, non-null elements)
296+
// // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
297297
// optional group my_list (LIST) {
298298
// repeated group my_list_tuple {
299299
// required binary str (UTF8);

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
2626
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
2727
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
2828
import org.apache.spark.sql.execution.BatchedDataSourceScanExec
29-
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
29+
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
3030
import org.apache.spark.sql.internal.SQLConf
3131
import org.apache.spark.sql.test.SharedSQLContext
3232
import org.apache.spark.sql.types._
@@ -668,9 +668,23 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
668668
}
669669
}
670670
}
671+
672+
test("SPARK-16344: array of struct with a single field named 'element'") {
673+
withTempPath { dir =>
674+
val path = dir.getCanonicalPath
675+
Seq(Tuple1(Array(SingleElement(42)))).toDF("f").write.parquet(path)
676+
677+
checkAnswer(
678+
sqlContext.read.parquet(path),
679+
Row(Array(Row(42)))
680+
)
681+
}
682+
}
671683
}
672684

673685
object TestingUDT {
686+
case class SingleElement(element: Long)
687+
674688
@SQLUserDefinedType(udt = classOf[NestedStructUDT])
675689
case class NestedStruct(a: Integer, b: Long, c: Double)
676690

sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive
1919

2020
import java.sql.Timestamp
2121

22-
import org.apache.hadoop.hive.conf.HiveConf
23-
2422
import org.apache.spark.sql.Row
2523
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
2624
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -137,4 +135,10 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi
137135
Row(Row(1, Seq("foo", "bar", null))),
138136
"STRUCT<f0: INT, f1: ARRAY<STRING>>")
139137
}
138+
139+
test("SPARK-16344: array of struct with a single field named 'array_element'") {
140+
testParquetHiveCompatibility(
141+
Row(Seq(Row(1))),
142+
"ARRAY<STRUCT<array_element: INT>>")
143+
}
140144
}

0 commit comments

Comments
 (0)