update parquet row converter for arrays
sadikovi committed Sep 20, 2021
commit d3d47f736286af906e66c3faf3dae7995eb28c9c
@@ -602,8 +602,10 @@ private[parquet] class ParquetRowConverter(
// matches the Catalyst array element type. If it doesn't match, then it's case 1; otherwise,
// it's case 2.
val guessedElementType = schemaConverter.convertField(repeatedType)
// We also need to check if the list element follows the backward compatible pattern.
Contributor:

shall we also update the long code comment above?

Contributor Author:

Yes, we can. I will update, thanks.

val isLegacy = schemaConverter.isElementType(repeatedType, parquetSchema.getName())
Member:

Interesting: does it mean that in the parquet-mr read path Spark was not able to handle the legacy list format? Also, do we need to do something similar for the legacy map format?

BTW: you can remove the () in parquetSchema.getName() since it is an accessor method.

Member:

I see, the existing schemaConverter.convertField(repeatedType) already covered legacy-format lists, but this particular issue is about schema evolution with newly added struct fields. I wonder whether it's better to just expand equalsIgnoreCompatibleNullability and allow the element type to contain guessedElementType.

Contributor Author:

Yes, that is correct: the legacy format would still be read by Spark; it was schema evolution of a list element that could trigger this issue. If all of the files have the same schema, everything should work just fine.

I considered having something like "contains" instead of "equals", but I had a concern that this might introduce issues when the schema "contains" the element type but should still be treated as a 3-level LIST. Also, I could not find a "contains" method for DataType in the codebase. IMHO, it is better to check Parquet compatibility using the Parquet schema rather than the Catalyst schema, which was meant to reconcile those types anyway.
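
For illustration only (this sketch is not part of the PR), the mismatch described above can be shown at the Catalyst level. Using the field names from the test added in this change, the element type guessed from an old Parquet file no longer equals the element type requested by the evolved read schema, so an equality check alone cannot decide whether the repeated group is the element of a legacy 2-level list:

import org.apache.spark.sql.types._

// Element type inferred from a file written before the new field was added.
val guessedElementType = StructType(StructField("col-0", IntegerType) :: Nil)

// Element type requested by the evolved read schema, which has the extra field.
val requestedElementType = StructType(
  StructField("col-0", IntegerType) ::
  StructField("col-1", IntegerType) ::
  Nil)

// The types differ (even ignoring nullability), which is why the converter also
// consults the Parquet-level legacy-list check (isLegacy) before falling back.
println(guessedElementType == requestedElementType) // false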

Contributor Author:

I know, making the method non-private is not ideal, but neither is adding a new DataType."contains" method or duplicating the code in another function. Let me know what you think would be a better (or the least intrusive) approach. Thanks.

Member:

Got it. Yeah, it's just a nit from me, and this looks OK to me too.


-    if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType)) {
+    if (DataType.equalsIgnoreCompatibleNullability(guessedElementType, elementType) || isLegacy) {
// If the repeated field corresponds to the element type, creates a new converter using the
// type of the repeated field.
newConverter(repeatedType, elementType, new ParentContainerUpdater {
@@ -266,7 +266,7 @@ class ParquetToSparkSchemaConverter(
// Here we implement Parquet LIST backwards-compatibility rules.
// See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
// scalastyle:on
-  private def isElementType(repeatedType: Type, parentName: String): Boolean = {
+  private[parquet] def isElementType(repeatedType: Type, parentName: String): Boolean = {
{
// For legacy 2-level list types with primitive element type, e.g.:
//
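For illustration only (not part of the PR), here is a rough sketch of the two Parquet LIST shapes that the backward-compatibility rules behind isElementType have to distinguish, built with parquet-mr's schema parser; the field names are made up:

import org.apache.parquet.schema.MessageTypeParser

// Legacy 2-level list: the repeated group named "array" is itself the element
// (here, a struct with a single int field).
val legacyList = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  optional group f0 (LIST) {
    |    repeated group array {
    |      optional int32 a;
    |    }
    |  }
    |}""".stripMargin)

// Standard 3-level list: the repeated group "list" only wraps the real element.
val standardList = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  optional group f0 (LIST) {
    |    repeated group list {
    |      optional group element {
    |        optional int32 a;
    |      }
    |    }
    |  }
    |}""".stripMargin)

// In both schemas the repeated group has exactly one child, so only the
// name-based rules (e.g. a repeated group named "array") reveal whether the
// repeated group itself is the element.
println(legacyList)
println(standardList)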
@@ -29,6 +29,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{ArrayType, IntegerType, StructField, StructType}

class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedSparkSession {
test("parquet files with different physical schemas but share the same logical schema") {
@@ -96,6 +97,58 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedSparkSession {
}
}

test("parquet files with legacy mode and schema evolution") {
Member:

Suggested change
test("parquet files with legacy mode and schema evolution") {
test("SPARK-36803: parquet files with legacy mode and schema evolution") {

// This test case writes arrays in Parquet legacy mode and schema evolution and verifies that
// the data can be correctly read back.

Seq(false, true).foreach { legacyMode =>
withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyMode.toString) {
withTempPath { tableDir =>
val schema1 = StructType(
StructField("col-0", ArrayType(
StructType(
StructField("col-0", IntegerType, true) ::
Nil
),
containsNull = false // allows to create 2-level Parquet LIST type in legacy mode
)) ::
Nil
)
val row1 = Row(Seq(Row(1)))
val df1 = spark.createDataFrame(spark.sparkContext.parallelize(row1 :: Nil, 1), schema1)
df1.write.parquet(tableDir.getAbsolutePath)

val schema2 = StructType(
StructField("col-0", ArrayType(
StructType(
StructField("col-0", IntegerType, true) ::
StructField("col-1", IntegerType, true) :: // additional field
Nil
),
containsNull = false
)) ::
Nil
)
val row2 = Row(Seq(Row(1, 2)))
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(row2 :: Nil, 1), schema2)
df2.write.mode("append").parquet(tableDir.getAbsolutePath)

// Reading of data should succeed and should not fail with
// java.lang.ClassCastException: optional int32 col-0 is not a group
withAllParquetReaders {
checkAnswer(
spark.read.schema(schema2).parquet(tableDir.getAbsolutePath),
Seq(
Row(Seq(Row(1, null))),
Row(Seq(Row(1, 2)))
)
)
}
}
}
}
}

test("parquet timestamp conversion") {
// Make a table with one parquet file written by impala, and one parquet file written by spark.
// We should only adjust the timestamps in the impala file, and only if the conf is set