From d3d47f736286af906e66c3faf3dae7995eb28c9c Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Mon, 20 Sep 2021 12:19:50 +1200
Subject: [PATCH 1/3] update parquet row converter for arrays

---
 .../parquet/ParquetRowConverter.scala         |  4 +-
 .../parquet/ParquetSchemaConverter.scala      |  2 +-
 .../ParquetInteroperabilitySuite.scala        | 53 +++++++++++++++++++
 3 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index e0af5d8dd869..0682266c4a90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -602,8 +602,10 @@ private[parquet] class ParquetRowConverter(
     // matches the Catalyst array element type. If it doesn't match, then it's case 1; otherwise,
     // it's case 2.
     val guessedElementType = schemaConverter.convertField(repeatedType)
+    // We also need to check if the list element follows the backward compatible pattern.
+    val isLegacy = schemaConverter.isElementType(repeatedType, parquetSchema.getName())
 
-    if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)) {
+    if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) || isLegacy) {
       // If the repeated field corresponds to the element type, creates a new converter using the
       // type of the repeated field.
       newConverter(repeatedType, elementType, new ParentContainerUpdater {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index f3bfd99368de..f3ecd790761a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -266,7 +266,7 @@ class ParquetToSparkSchemaConverter(
   // Here we implement Parquet LIST backwards-compatibility rules.
   // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
   // scalastyle:on
-  private def isElementType(repeatedType: Type, parentName: String): Boolean = {
+  private[parquet] def isElementType(repeatedType: Type, parentName: String): Boolean = {
     {
       // For legacy 2-level list types with primitive element type, e.g.:
       //
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
index b1d930acb097..f8d1d98f7bf5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{ArrayType, IntegerType, StructField, StructType}
 
 class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedSparkSession {
   test("parquet files with different physical schemas but share the same logical schema") {
@@ -96,6 +97,58 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
     }
   }
 
+  test("parquet files with legacy mode and schema evolution") {
+    // This test case writes arrays in Parquet legacy mode with schema evolution and verifies
+    // that the data can be correctly read back.
+
+    Seq(false, true).foreach { legacyMode =>
+      withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyMode.toString) {
+        withTempPath { tableDir =>
+          val schema1 = StructType(
+            StructField("col-0", ArrayType(
+              StructType(
+                StructField("col-0", IntegerType, true) ::
+                Nil
+              ),
+              containsNull = false // allows creating a 2-level Parquet LIST type in legacy mode
+            )) ::
+            Nil
+          )
+          val row1 = Row(Seq(Row(1)))
+          val df1 = spark.createDataFrame(spark.sparkContext.parallelize(row1 :: Nil, 1), schema1)
+          df1.write.parquet(tableDir.getAbsolutePath)
+
+          val schema2 = StructType(
+            StructField("col-0", ArrayType(
+              StructType(
+                StructField("col-0", IntegerType, true) ::
+                StructField("col-1", IntegerType, true) :: // additional field
+                Nil
+              ),
+              containsNull = false
+            )) ::
+            Nil
+          )
+          val row2 = Row(Seq(Row(1, 2)))
+          val df2 = spark.createDataFrame(spark.sparkContext.parallelize(row2 :: Nil, 1), schema2)
+          df2.write.mode("append").parquet(tableDir.getAbsolutePath)
+
+          // Reading the data should succeed and should not fail with
+          // java.lang.ClassCastException: optional int32 col-0 is not a group
+          withAllParquetReaders {
+            checkAnswer(
+              spark.read.schema(schema2).parquet(tableDir.getAbsolutePath),
+              Seq(
+                Row(Seq(Row(1, null))),
+                Row(Seq(Row(1, 2)))
+              )
+            )
+          }
+        }
+      }
+    }
+  }
+
   test("parquet timestamp conversion") {
     // Make a table with one parquet file written by impala, and one parquet file written by spark.
     // We should only adjust the timestamps in the impala file, and only if the conf is set
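Note on the mechanism behind patch 1: `isElementType` encodes the Parquet LIST backward-compatibility rules linked in the comment above. The following is a minimal, self-contained sketch of those rules against parquet-mr's schema API. It is an illustration only; the helper name `looksLikeElementType` is hypothetical, and Spark's real `isElementType` covers more cases:

    import org.apache.parquet.schema.{MessageTypeParser, Type}

    // Hypothetical helper: a simplified rendition of the parquet-format LIST
    // backward-compatibility rules (NOT Spark's actual isElementType).
    def looksLikeElementType(repeatedType: Type, parentName: String): Boolean = {
      if (repeatedType.isPrimitive) {
        // Rule: "repeated int32 element;" is a legacy 2-level list of ints.
        true
      } else {
        val group = repeatedType.asGroupType()
        // Rule: a repeated group with several fields is itself the element struct.
        group.getFieldCount > 1 ||
          // Rule: legacy writers name the element group "array" or "<parent>_tuple".
          group.getName == "array" ||
          group.getName == parentName + "_tuple"
      }
    }

    // The ambiguous layout discussed in the converter comment:
    val schema = MessageTypeParser.parseMessageType(
      """message spark_schema {
        |  optional group f (LIST) {
        |    repeated group list {
        |      optional group element {
        |        optional int32 element;
        |      }
        |    }
        |  }
        |}""".stripMargin)
    val repeated = schema.getType("f").asGroupType().getType(0)
    // A single-field group named "list" matches none of the legacy rules,
    // so it is the syntactic group of a standard 3-level list.
    assert(!looksLikeElementType(repeated, "f"))

With schema evolution, as in the new test, the Catalyst type guessed from the repeated group no longer equals the requested element type, so a pattern check of this kind is what lets the converter still recognize a genuine legacy 2-level list.
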
From 33fa362553e85e5362c32854d8eb0f2904d48db6 Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Mon, 20 Sep 2021 19:19:18 +1200
Subject: [PATCH 2/3] address comments

---
 .../datasources/parquet/ParquetRowConverter.scala     | 14 ++++++++++----
 .../parquet/ParquetInteroperabilitySuite.scala        |  2 +-
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 0682266c4a90..25ea07e067a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -574,7 +574,7 @@ private[parquet] class ParquetRowConverter(
     val repeatedType = parquetSchema.getType(0)
     val elementType = catalystSchema.elementType
 
-    // At this stage, we're not sure whether the repeated field maps to the element type or is
+    // At this stage, we need to figure out if the repeated field maps to the element type or is
     // just the syntactic repeated group of the 3-level standard LIST layout. Take the following
     // Parquet LIST-annotated group type as an example:
     //
@@ -598,11 +598,17 @@ private[parquet] class ParquetRowConverter(
     //
     //      ARRAY<STRUCT<element: STRUCT<element: INT>>>
     //
+    //
     // Here we try to convert field `list` into a Catalyst type to see whether the converted type
-    // matches the Catalyst array element type. If it doesn't match, then it's case 1; otherwise,
-    // it's case 2.
+    // matches the Catalyst array element type.
+    //
+    // If the guessed element type from the above does not match the Catalyst type (for example,
+    // in case of schema evolution), we need to check if the repeated type matches one of the
+    // backward-compatibility rules for legacy LIST types (see the link above).
+    //
+    // If the element type does not match the Catalyst type and the underlying repeated type
+    // does not belong to the legacy LIST type, then it is case 1; otherwise, it is case 2.
     val guessedElementType = schemaConverter.convertField(repeatedType)
-    // We also need to check if the list element follows the backward compatible pattern.
     val isLegacy = schemaConverter.isElementType(repeatedType, parquetSchema.getName())
 
     if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) || isLegacy) {
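The rewritten comment leans on `DataType.equalsIgnoreCompatibleNullability`, an internal Spark helper. Conceptually it checks whether values of one type can be read as another type that is at least as nullable. A rough sketch of that idea, assuming only the behaviour the name describes rather than Spark's actual code, and omitting map types for brevity:

    import org.apache.spark.sql.types._

    // Sketch only: can data written as `from` be read as `to`, treating `to`
    // as compatible when it is at least as nullable as `from`?
    def compatibleIgnoringNullability(from: DataType, to: DataType): Boolean =
      (from, to) match {
        case (ArrayType(f, fNull), ArrayType(t, tNull)) =>
          (tNull || !fNull) && compatibleIgnoringNullability(f, t)
        case (StructType(fs), StructType(ts)) =>
          fs.length == ts.length && fs.zip(ts).forall { case (f, t) =>
            f.name == t.name && (t.nullable || !f.nullable) &&
              compatibleIgnoringNullability(f.dataType, t.dataType)
          }
        case (f, t) => f == t
      }

This is exactly why schema evolution defeats the original guess: the struct converted from the file has one field while the requested Catalyst element struct has two, so the comparison fails, and only the new `isLegacy` check keeps the 2-level interpretation alive.
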
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
index f8d1d98f7bf5..a7395a61992d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -97,7 +97,7 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
     }
   }
 
-  test("parquet files with legacy mode and schema evolution") {
+  test("SPARK-36803: parquet files with legacy mode and schema evolution") {
     // This test case writes arrays in Parquet legacy mode with schema evolution and verifies
     // that the data can be correctly read back.
 
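For readers who want to reproduce the scenario outside the test harness, a rough equivalent using public APIs is shown below. The output path and column names (`arr`, `a`, `b`) are made up for illustration; before this fix, the final read failed with a `ClassCastException` of the kind quoted in the test:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    // Legacy mode writes a 2-level LIST when the array forbids null elements.
    spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")
    val path = "/tmp/spark-36803-demo" // hypothetical output location

    val elemV1 = StructType(Seq(StructField("a", IntegerType)))
    val schemaV1 = StructType(Seq(StructField("arr", ArrayType(elemV1, containsNull = false))))
    val dfV1 = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(Row(Seq(Row(1))))), schemaV1)
    dfV1.write.parquet(path)

    // Evolved schema: the element struct gains a second field.
    val elemV2 = StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType)))
    val schemaV2 = StructType(Seq(StructField("arr", ArrayType(elemV2, containsNull = false))))
    val dfV2 = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(Row(Seq(Row(1, 2))))), schemaV2)
    dfV2.write.mode("append").parquet(path)

    // Reading both files with the evolved schema now yields rows containing
    // [1, null] and [1, 2] instead of throwing.
    spark.read.schema(schemaV2).parquet(path).show(truncate = false)
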
From 8a21e051130f1c360ea4432a9c1ffda9cb27865c Mon Sep 17 00:00:00 2001
From: Ivan Sadikov
Date: Wed, 22 Sep 2021 12:32:00 +1200
Subject: [PATCH 3/3] fix parquetSchema.getName

---
 .../sql/execution/datasources/parquet/ParquetRowConverter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 25ea07e067a5..583b4bab1ba6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -609,7 +609,7 @@ private[parquet] class ParquetRowConverter(
     // If the element type does not match the Catalyst type and the underlying repeated type
     // does not belong to the legacy LIST type, then it is case 1; otherwise, it is case 2.
     val guessedElementType = schemaConverter.convertField(repeatedType)
-    val isLegacy = schemaConverter.isElementType(repeatedType, parquetSchema.getName())
+    val isLegacy = schemaConverter.isElementType(repeatedType, parquetSchema.getName)
 
     if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) || isLegacy) {
       // If the repeated field corresponds to the element type, creates a new converter using the
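The last patch is purely stylistic: `getName` is a zero-argument accessor on parquet's `Type`, and Scala allows (and Spark's style prefers) dropping the empty parentheses when calling a side-effect-free getter. A tiny illustration:

    import org.apache.parquet.schema.MessageTypeParser

    val schema = MessageTypeParser.parseMessageType(
      "message m { optional group f (LIST) { repeated int32 element; } }")
    val field = schema.getType("f")
    // Java's Type.getName() can be called with or without parentheses from
    // Scala; both forms resolve to the same method.
    assert(field.getName() == field.getName)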