@@ -574,7 +574,7 @@ private[parquet] class ParquetRowConverter(
val repeatedType = parquetSchema.getType(0)
val elementType = catalystSchema.elementType

// At this stage, we're not sure whether the repeated field maps to the element type or is
// At this stage, we need to figure out if the repeated field maps to the element type or is
// just the syntactic repeated group of the 3-level standard LIST layout. Take the following
// Parquet LIST-annotated group type as an example:
//
@@ -598,12 +598,20 @@
//
// ARRAY<STRUCT<element: STRUCT<element: INT>>>
//
//
// Here we try to convert field `list` into a Catalyst type to see whether the converted type
// matches the Catalyst array element type. If it doesn't match, then it's case 1; otherwise,
// it's case 2.
// matches the Catalyst array element type.
//
// If the guessed element type from the above does not match the Catalyst type (for example,
// in case of schema evolution), we need to check if the repeated type matches one of the
// backward-compatibility rules for legacy LIST types (see the link above).
//
// If the element type does not match the Catalyst type and the underlying repeated type
// does not belong to the legacy LIST type, then it is case 1; otherwise, it is case 2.
val guessedElementType = schemaConverter.convertField(repeatedType)
val isLegacy = schemaConverter.isElementType(repeatedType, parquetSchema.getName)

if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)) {
if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) || isLegacy) {
// If the repeated field corresponds to the element type, create a new converter using the
// type of the repeated field.
newConverter(repeatedType, elementType, new ParentContainerUpdater {
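To make the ambiguity concrete, here is a minimal standalone sketch (not part of the patch) that builds the LIST-annotated shape the comment above describes, using Parquet's MessageTypeParser. The schema string is an assumption reconstructed from the visible part of the comment; only the parquet-column API is assumed.

import org.apache.parquet.schema.MessageTypeParser

// One LIST-annotated group that is genuinely ambiguous: `list` can be read either as the
// syntactic repeated group of the standard 3-level layout (case 1, giving
// ARRAY<STRUCT<element: INT>>) or as the element type itself of a legacy 2-level layout
// (case 2, giving ARRAY<STRUCT<element: STRUCT<element: INT>>>).
val ambiguousListType = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  optional group f (LIST) {
    |    repeated group list {
    |      optional group element {
    |        optional int32 element;
    |      }
    |    }
    |  }
    |}""".stripMargin)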
@@ -266,7 +266,7 @@ class ParquetToSparkSchemaConverter(
// Here we implement Parquet LIST backwards-compatibility rules.
// See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
// scalastyle:on
private def isElementType(repeatedType: Type, parentName: String): Boolean = {
private[parquet] def isElementType(repeatedType: Type, parentName: String): Boolean = {
{
// For legacy 2-level list types with primitive element type, e.g.:
//
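The backward-compatibility rules referenced above amount to a few structural checks on the repeated field. A condensed sketch, assuming the parquet-column Type/GroupType API; the helper name isLegacyElementType is made up for illustration and is not the method in the patch.

import org.apache.parquet.schema.Type

// Returns true when the repeated field should be treated as the element type itself
// (legacy 2-level LIST) rather than as the syntactic wrapper of the 3-level layout.
def isLegacyElementType(repeatedType: Type, parentName: String): Boolean =
  repeatedType.isPrimitive ||                        // repeated int32 element;
  repeatedType.asGroupType().getFieldCount > 1 ||    // repeated group with two or more fields
  repeatedType.getName == "array" ||                 // parquet-avro style 2-level lists
  repeatedType.getName == s"${parentName}_tuple"     // parquet-thrift style 2-level lists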
@@ -29,6 +29,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructField, StructType}

class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedSparkSession {
test("parquet files with different physical schemas but share the same logical schema") {
@@ -96,6 +97,58 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedSparkSession {
}
}

test("SPARK-36803: parquet files with legacy mode and schema evolution") {
// This test writes arrays in both legacy and standard Parquet mode with an evolving schema and
// verifies that the data can be read back correctly.

Seq(false, true).foreach { legacyMode =>
withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyMode.toString) {
withTempPath { tableDir =>
val schema1 = StructType(
StructField("col-0", ArrayType(
StructType(
StructField("col-0", IntegerType, true) ::
Nil
),
containsNull = false // allows writing a 2-level Parquet LIST type in legacy mode
)) ::
Nil
)
val row1 = Row(Seq(Row(1)))
val df1 = spark.createDataFrame(spark.sparkContext.parallelize(row1 :: Nil, 1), schema1)
df1.write.parquet(tableDir.getAbsolutePath)

val schema2 = StructType(
StructField("col-0", ArrayType(
StructType(
StructField("col-0", IntegerType, true) ::
StructField("col-1", IntegerType, true) :: // additional field
Nil
),
containsNull = false
)) ::
Nil
)
val row2 = Row(Seq(Row(1, 2)))
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(row2 :: Nil, 1), schema2)
df2.write.mode("append").parquet(tableDir.getAbsolutePath)

// Reading the data back should succeed instead of failing with
// java.lang.ClassCastException: optional int32 col-0 is not a group
withAllParquetReaders {
checkAnswer(
spark.read.schema(schema2).parquet(tableDir.getAbsolutePath),
Seq(
Row(Seq(Row(1, null))),
Row(Seq(Row(1, 2)))
)
)
}
}
}
}
}

test("parquet timestamp conversion") {
// Make a table with one parquet file written by impala, and one parquet file written by spark.
// We should only adjust the timestamps in the impala file, and only if the conf is set