From dc2deb379c56104794beb2a2be27246c24875012 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 6 Apr 2020 22:09:25 +0800 Subject: [PATCH 1/2] Rebase datetime in parquet/avro according to file metadata --- .../spark/sql/avro/AvroDeserializer.scala | 12 +- .../spark/sql/avro/AvroFileFormat.scala | 11 +- .../spark/sql/avro/AvroOutputWriter.scala | 17 ++- .../spark/sql/avro/AvroSerializer.scala | 14 ++- .../v2/avro/AvroPartitionReaderFactory.scala | 10 +- .../AvroCatalystDataConversionSuite.scala | 2 +- .../org/apache/spark/sql/avro/AvroSuite.scala | 117 ++++++++++++++---- .../parquet/VectorizedColumnReader.java | 6 +- .../VectorizedParquetRecordReader.java | 18 ++- .../datasources/DataSourceUtils.scala | 17 +++ .../parquet/ParquetFileFormat.scala | 13 +- .../parquet/ParquetReadSupport.scala | 15 ++- .../parquet/ParquetRecordMaterializer.scala | 10 +- .../parquet/ParquetRowConverter.scala | 8 +- .../parquet/ParquetWriteSupport.scala | 6 +- .../ParquetPartitionReaderFactory.scala | 26 ++-- .../scala/org/apache/spark/sql/package.scala | 6 + .../benchmark/DataSourceReadBenchmark.scala | 6 +- .../parquet/ParquetEncodingSuite.scala | 6 +- .../datasources/parquet/ParquetIOSuite.scala | 95 +++++++++++--- .../datasources/parquet/ParquetTest.scala | 7 +- 21 files changed, 318 insertions(+), 104 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 8d78cf40a2d5..f32fe46bb6e1 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -41,12 +41,14 @@ import org.apache.spark.unsafe.types.UTF8String /** * A deserializer to deserialize data in avro format to data in catalyst format. */ -class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) { - private lazy val decimalConversions = new DecimalConversion() +class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) { + + def this(rootAvroType: Schema, rootCatalystType: DataType) { + this(rootAvroType, rootCatalystType, + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)) + } - // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar - private val rebaseDateTime = - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ) + private lazy val decimalConversions = new DecimalConversion() private val converter: Any => Any = rootCatalystType match { // A shortcut for empty schema. diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index 123669ba1376..e69c95b797c7 100755 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -34,7 +34,8 @@ import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile} +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileFormat, OutputWriterFactory, PartitionedFile} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{DataSourceRegister, Filter} import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration @@ -123,8 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat reader.sync(file.start) val stop = file.start + file.length - val deserializer = - new AvroDeserializer(userProvidedSchema.getOrElse(reader.getSchema), requiredSchema) + val rebaseDateTime = DataSourceUtils.needRebaseDateTime( + reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse { + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ) + } + val deserializer = new AvroDeserializer( + userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, rebaseDateTime) new Iterator[InternalRow] { private[this] var completed = false diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala index 2cfa3a4826ed..82a568049990 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala @@ -29,9 +29,10 @@ import org.apache.hadoop.io.NullWritable import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext} import org.apache.spark.SPARK_VERSION_SHORT -import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY +import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.OutputWriter +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ // NOTE: This class is instantiated and used on executor side only, no need to be serializable. @@ -41,16 +42,24 @@ private[avro] class AvroOutputWriter( schema: StructType, avroSchema: Schema) extends OutputWriter { + // Whether to rebase datetimes from Gregorian to Julian calendar in write + private val rebaseDateTime: Boolean = + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE) + // The input rows will never be null. - private lazy val serializer = new AvroSerializer(schema, avroSchema, nullable = false) + private lazy val serializer = + new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime) /** * Overrides the couple of methods responsible for generating the output streams / files so * that the data can be correctly partitioned */ private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = { - val sparkVersion = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT).asJava - new SparkAvroKeyOutputFormat(sparkVersion) { + val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ { + if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None + } + + new SparkAvroKeyOutputFormat(fileMeta.asJava) { override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { new Path(path) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala index daa9c7d611ee..c87249e29fbd 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala @@ -42,12 +42,16 @@ import org.apache.spark.sql.types._ /** * A serializer to serialize data in catalyst format to data in avro format. */ -class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) - extends Logging { +class AvroSerializer( + rootCatalystType: DataType, + rootAvroType: Schema, + nullable: Boolean, + rebaseDateTime: Boolean) extends Logging { - // Whether to rebase datetimes from Gregorian to Julian calendar in write - private val rebaseDateTime: Boolean = - SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE) + def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) { + this(rootCatalystType, rootAvroType, nullable, + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)) + } def serialize(catalystData: Any): Any = { converter.apply(catalystData) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala index 8230dbaf8ea6..712aec6acbd5 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.PartitionReader -import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile} import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader, FilePartitionReaderFactory, PartitionReaderWithPartitionValues} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -88,8 +88,12 @@ case class AvroPartitionReaderFactory( reader.sync(partitionedFile.start) val stop = partitionedFile.start + partitionedFile.length - val deserializer = - new AvroDeserializer(userProvidedSchema.getOrElse(reader.getSchema), readDataSchema) + val rebaseDateTime = DataSourceUtils.needRebaseDateTime( + reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse { + SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ) + } + val deserializer = new AvroDeserializer( + userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, rebaseDateTime) val fileReader = new PartitionReader[InternalRow] { private[this] var completed = false diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala index c8a1f670bda9..64d790bc4acd 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala @@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite """.stripMargin val avroSchema = new Schema.Parser().parse(jsonFormatSchema) val dataType = SchemaConverters.toSqlType(avroSchema).dataType - val deserializer = new AvroDeserializer(avroSchema, dataType) + val deserializer = new AvroDeserializer(avroSchema, dataType, rebaseDateTime = false) def checkDeserialization(data: GenericData.Record, expected: Any): Unit = { assert(checkResult( diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index a5224fd104ea..cf8e2bd7d561 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -84,9 +84,8 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { }, new GenericDatumReader[Any]()).getSchema.toString(false) } - private def readResourceAvroFile(name: String): DataFrame = { - val url = Thread.currentThread().getContextClassLoader.getResource(name) - spark.read.format("avro").load(url.toString) + private def getResourceAvroFilePath(name: String): String = { + Thread.currentThread().getContextClassLoader.getResource(name).toString } test("resolve avro data source") { @@ -1531,15 +1530,63 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") { withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") { - checkAnswer( - readResourceAvroFile("before_1582_date_v2_4.avro"), - Row(java.sql.Date.valueOf("1001-01-01"))) - checkAnswer( - readResourceAvroFile("before_1582_ts_micros_v2_4.avro"), - Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"))) - checkAnswer( - readResourceAvroFile("before_1582_ts_millis_v2_4.avro"), - Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124"))) + // test reading the existing 2.4 files and 3.0 newly written files together. + withTempPath { path => + val path2_4 = getResourceAvroFilePath("before_1582_date_v2_4.avro") + val path3_0 = path.getCanonicalPath + val dateStr = "1001-01-01" + Seq(dateStr).toDF("str").select($"str".cast("date").as("date")) + .write.format("avro").save(path3_0) + checkAnswer( + spark.read.format("avro").load(path2_4, path3_0), + Seq( + Row(java.sql.Date.valueOf(dateStr)), + Row(java.sql.Date.valueOf(dateStr)))) + } + + withTempPath { path => + val path2_4 = getResourceAvroFilePath("before_1582_ts_micros_v2_4.avro") + val path3_0 = path.getCanonicalPath + val avroSchema = + """ + |{ + | "type" : "record", + | "name" : "test_schema", + | "fields" : [ + | {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-micros"}} + | ] + |}""".stripMargin + val tsStr = "1001-01-01 01:02:03.123456" + Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts")) + .write.format("avro").option("avroSchema", avroSchema).save(path3_0) + checkAnswer( + spark.read.format("avro").load(path2_4, path3_0), + Seq( + Row(java.sql.Timestamp.valueOf(tsStr)), + Row(java.sql.Timestamp.valueOf(tsStr)))) + } + + withTempPath { path => + val path2_4 = getResourceAvroFilePath("before_1582_ts_millis_v2_4.avro") + val path3_0 = path.getCanonicalPath + val avroSchema = + """ + |{ + | "type" : "record", + | "name" : "test_schema", + | "fields" : [ + | {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-millis"}} + | ] + |}""".stripMargin + val tsStr = "1001-01-01 01:02:03.124" + Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts")) + .write.format("avro").option("avroSchema", avroSchema).save(path3_0) + checkAnswer( + spark.read.format("avro").load(path2_4, path3_0), + Seq( + Row(java.sql.Timestamp.valueOf(tsStr)), + Row(java.sql.Timestamp.valueOf(tsStr)))) + } } } @@ -1554,10 +1601,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { .write.format("avro") .save(path) } - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") { - checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr))) + + // The file metadata indicates if it needs rebase or not, so we can always get the correct + // result regardless of the "rebaseInRead" config. + Seq(true, false).foreach { rebase => + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr))) + } } - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") { + + // Force to not rebase to prove the written datetime values are rebased and we will get + // wrong result if we don't rebase while reading. + withSQLConf("spark.test.forceNoRebase" -> "true") { checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased))) } } @@ -1589,12 +1644,20 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { .format("avro") .save(path) } - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") { - checkAnswer( - spark.read.schema("ts timestamp").format("avro").load(path), - Row(Timestamp.valueOf(rebased))) + + // The file metadata indicates if it needs rebase or not, so we can always get the correct + // result regardless of the "rebaseInRead" config. + Seq(true, false).foreach { rebase => + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + checkAnswer( + spark.read.schema("ts timestamp").format("avro").load(path), + Row(Timestamp.valueOf(rebased))) + } } - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") { + + // Force to not rebase to prove the written datetime values are rebased and we will get + // wrong result if we don't rebase while reading. + withSQLConf("spark.test.forceNoRebase" -> "true") { checkAnswer( spark.read.schema("ts timestamp").format("avro").load(path), Row(Timestamp.valueOf(nonRebased))) @@ -1612,10 +1675,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { .write.format("avro") .save(path) } - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") { - checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01"))) + + // The file metadata indicates if it needs rebase or not, so we can always get the correct + // result regardless of the "rebaseInRead" config. + Seq(true, false).foreach { rebase => + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01"))) + } } - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") { + + // Force to not rebase to prove the written datetime values are rebased and we will get + // wrong result if we don't rebase while reading. + withSQLConf("spark.test.forceNoRebase" -> "true") { checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07"))) } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index c50619a4c12e..cfb873ff3737 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; -import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.DecimalType; @@ -109,7 +108,8 @@ public VectorizedColumnReader( ColumnDescriptor descriptor, OriginalType originalType, PageReader pageReader, - ZoneId convertTz) throws IOException { + ZoneId convertTz, + boolean rebaseDateTime) throws IOException { this.descriptor = descriptor; this.pageReader = pageReader; this.convertTz = convertTz; @@ -132,7 +132,7 @@ public VectorizedColumnReader( if (totalValueCount == 0) { throw new IOException("totalValueCount == 0"); } - this.rebaseDateTime = SQLConf.get().parquetRebaseDateTimeInReadEnabled(); + this.rebaseDateTime = rebaseDateTime; } /** diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 7306709a79c3..c9590b97ce9c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -86,7 +86,12 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa * The timezone that timestamp INT96 values should be converted to. Null if no conversion. Here to * workaround incompatibilities between different engines when writing timestamp values. */ - private ZoneId convertTz = null; + private final ZoneId convertTz; + + /** + * true if need to rebase date/timestamp from Julian to Proleptic Gregorian calendar. + */ + private final boolean rebaseDateTime; /** * columnBatch object that is used for batch decoding. This is created on first use and triggers @@ -116,12 +121,19 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa */ private final MemoryMode MEMORY_MODE; - public VectorizedParquetRecordReader(ZoneId convertTz, boolean useOffHeap, int capacity) { + public VectorizedParquetRecordReader( + ZoneId convertTz, boolean rebaseDateTime, boolean useOffHeap, int capacity) { this.convertTz = convertTz; + this.rebaseDateTime = rebaseDateTime; MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP; this.capacity = capacity; } + // For test only. + public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) { + this(null, false, useOffHeap, capacity); + } + /** * Implementation of RecordReader API. */ @@ -309,7 +321,7 @@ private void checkEndOfRowGroup() throws IOException { for (int i = 0; i < columns.size(); ++i) { if (missingColumns[i]) continue; columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(), - pages.getPageReader(columns.get(i)), convertTz); + pages.getPageReader(columns.get(i)), convertTz, rebaseDateTime); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala index bd56635084c3..b19de6d3deb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala @@ -21,8 +21,11 @@ import org.apache.hadoop.fs.Path import org.json4s.NoTypeHints import org.json4s.jackson.Serialization +import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils object DataSourceUtils { @@ -64,4 +67,18 @@ object DataSourceUtils { private[sql] def isDataFile(fileName: String) = !(fileName.startsWith("_") || fileName.startsWith(".")) + + def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = { + if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") { + return Some(false) + } + // If there is no version, we return None and let the caller side to decide. + Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version => + // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to + // rebase the datetime values. + // Files written by Spark 3.0 and latter may also need the rebase if they were written with + // the "rebaseInWrite" config enabled. + version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 29dbd8dfbca8..c6d9ddf370e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -300,6 +300,11 @@ class ParquetFileFormat None } + val rebaseDateTime = DataSourceUtils.needRebaseDateTime( + footerFileMetaData.getKeyValueMetaData.get).getOrElse { + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ) + } + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) @@ -312,7 +317,10 @@ class ParquetFileFormat val taskContext = Option(TaskContext.get()) if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader( - convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity) + convertTz.orNull, + rebaseDateTime, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) val iter = new RecordReaderIterator(vectorizedReader) // SPARK-23457 Register a task completion listener before `initialization`. taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) @@ -328,7 +336,8 @@ class ParquetFileFormat } else { logDebug(s"Falling back to parquet-mr") // ParquetRecordReader returns InternalRow - val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false) + val readSupport = new ParquetReadSupport( + convertTz, enableVectorizedReader = false, rebaseDateTime) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index c05ecf16311a..28165e0bbecd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -50,16 +50,18 @@ import org.apache.spark.sql.types._ * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. */ -class ParquetReadSupport(val convertTz: Option[ZoneId], - enableVectorizedReader: Boolean) +class ParquetReadSupport( + val convertTz: Option[ZoneId], + enableVectorizedReader: Boolean, + rebaseDateTime: Boolean) extends ReadSupport[InternalRow] with Logging { private var catalystRequestedSchema: StructType = _ def this() { // We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only - // used in the vectorized reader, where we get the convertTz value directly, and the value here - // is ignored. - this(None, enableVectorizedReader = true) + // used in the vectorized reader, where we get the convertTz/rebaseDateTime value directly, + // and the values here are ignored. + this(None, enableVectorizedReader = true, rebaseDateTime = false) } /** @@ -127,7 +129,8 @@ class ParquetReadSupport(val convertTz: Option[ZoneId], parquetRequestedSchema, ParquetReadSupport.expandUDT(catalystRequestedSchema), new ParquetToSparkSchemaConverter(conf), - convertTz) + convertTz, + rebaseDateTime) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index 5622169df128..ba7511c5b8ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -31,16 +31,20 @@ import org.apache.spark.sql.types.StructType * @param parquetSchema Parquet schema of the records to be read * @param catalystSchema Catalyst schema of the rows to be constructed * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters + * @param convertTz the optional time zone to convert to for int96 data + * @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian + * calendar */ private[parquet] class ParquetRecordMaterializer( parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetToSparkSchemaConverter, - convertTz: Option[ZoneId]) + convertTz: Option[ZoneId], + rebaseDateTime: Boolean) extends RecordMaterializer[InternalRow] { - private val rootConverter = - new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, convertTz, NoopUpdater) + private val rootConverter = new ParquetRowConverter( + schemaConverter, parquetSchema, catalystSchema, convertTz, rebaseDateTime, NoopUpdater) override def getCurrentRecord: InternalRow = rootConverter.currentRecord diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 6072db102fe3..705995fee33f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -121,6 +121,8 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined * types should have been expanded. * @param convertTz the optional time zone to convert to for int96 data + * @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian + * calendar * @param updater An updater which propagates converted field values to the parent container */ private[parquet] class ParquetRowConverter( @@ -128,12 +130,10 @@ private[parquet] class ParquetRowConverter( parquetType: GroupType, catalystType: StructType, convertTz: Option[ZoneId], + rebaseDateTime: Boolean, updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { - // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar - private val rebaseDateTime = SQLConf.get.parquetRebaseDateTimeInReadEnabled - assert( parquetType.getFieldCount <= catalystType.length, s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema: @@ -386,7 +386,7 @@ private[parquet] class ParquetRowConverter( } } new ParquetRowConverter( - schemaConverter, parquetType.asGroupType(), t, convertTz, wrappedUpdater) + schemaConverter, parquetType.asGroupType(), t, convertTz, rebaseDateTime, wrappedUpdater) case t => throw new RuntimeException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 7317a250060b..b135611dd641 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -31,7 +31,7 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer} import org.apache.spark.SPARK_VERSION_SHORT import org.apache.spark.internal.Logging -import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY +import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -103,7 +103,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { val metadata = Map( SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT, ParquetReadSupport.SPARK_METADATA_KEY -> schemaString - ).asJava + ) ++ (if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None) logInfo( s"""Initialized Parquet WriteSupport with Catalyst schema: @@ -112,7 +112,7 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging { |$messageType """.stripMargin) - new WriteContext(messageType, metadata) + new WriteContext(messageType, metadata.asJava) } override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 047bc74a8d81..1925fa1796d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} -import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator} +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator} import org.apache.spark.sql.execution.datasources.parquet._ import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf @@ -117,7 +117,7 @@ case class ParquetPartitionReaderFactory( file: PartitionedFile, buildReaderFunc: ( ParquetInputSplit, InternalRow, TaskAttemptContextImpl, Option[FilterPredicate], - Option[ZoneId]) => RecordReader[Void, T]): RecordReader[Void, T] = { + Option[ZoneId], Boolean) => RecordReader[Void, T]): RecordReader[Void, T] = { val conf = broadcastedConf.value.value val filePath = new Path(new URI(file.filePath)) @@ -169,8 +169,12 @@ case class ParquetPartitionReaderFactory( if (pushed.isDefined) { ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) } - val reader = - buildReaderFunc(split, file.partitionValues, hadoopAttemptContext, pushed, convertTz) + val rebaseDatetime = DataSourceUtils.needRebaseDateTime( + footerFileMetaData.getKeyValueMetaData.get).getOrElse { + SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ) + } + val reader = buildReaderFunc( + split, file.partitionValues, hadoopAttemptContext, pushed, convertTz, rebaseDatetime) reader.initialize(split, hadoopAttemptContext) reader } @@ -184,11 +188,13 @@ case class ParquetPartitionReaderFactory( partitionValues: InternalRow, hadoopAttemptContext: TaskAttemptContextImpl, pushed: Option[FilterPredicate], - convertTz: Option[ZoneId]): RecordReader[Void, InternalRow] = { + convertTz: Option[ZoneId], + needDateTimeRebase: Boolean): RecordReader[Void, InternalRow] = { logDebug(s"Falling back to parquet-mr") val taskContext = Option(TaskContext.get()) // ParquetRecordReader returns InternalRow - val readSupport = new ParquetReadSupport(convertTz, enableVectorizedReader = false) + val readSupport = new ParquetReadSupport( + convertTz, enableVectorizedReader = false, needDateTimeRebase) val reader = if (pushed.isDefined && enableRecordFilter) { val parquetFilter = FilterCompat.get(pushed.get, null) new ParquetRecordReader[InternalRow](readSupport, parquetFilter) @@ -213,10 +219,14 @@ case class ParquetPartitionReaderFactory( partitionValues: InternalRow, hadoopAttemptContext: TaskAttemptContextImpl, pushed: Option[FilterPredicate], - convertTz: Option[ZoneId]): VectorizedParquetRecordReader = { + convertTz: Option[ZoneId], + rebaseDatetime: Boolean): VectorizedParquetRecordReader = { val taskContext = Option(TaskContext.get()) val vectorizedReader = new VectorizedParquetRecordReader( - convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity) + convertTz.orNull, + rebaseDatetime, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) val iter = new RecordReaderIterator(vectorizedReader) // SPARK-23457 Register a task completion listener before `initialization`. taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala index 58de6758f4d7..18975c52d0bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala @@ -54,4 +54,10 @@ package object sql { * Note that Hive table property `spark.sql.create.version` also has Spark version. */ private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version" + + /** + * Parquet/Avro file metadata key to indicate that the file was written with legacy datetime + * values. + */ + private[sql] val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDatetime" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala index a084bec98551..d29c5e3f8801 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala @@ -169,7 +169,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark { files.map(_.asInstanceOf[String]).foreach { p => val reader = new VectorizedParquetRecordReader( - null, enableOffHeapColumnVector, vectorizedReaderBatchSize) + enableOffHeapColumnVector, vectorizedReaderBatchSize) try { reader.initialize(p, ("id" :: Nil).asJava) val batch = reader.resultBatch() @@ -203,7 +203,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark { files.map(_.asInstanceOf[String]).foreach { p => val reader = new VectorizedParquetRecordReader( - null, enableOffHeapColumnVector, vectorizedReaderBatchSize) + enableOffHeapColumnVector, vectorizedReaderBatchSize) try { reader.initialize(p, ("id" :: Nil).asJava) val batch = reader.resultBatch() @@ -458,7 +458,7 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark { var sum = 0 files.map(_.asInstanceOf[String]).foreach { p => val reader = new VectorizedParquetRecordReader( - null, enableOffHeapColumnVector, vectorizedReaderBatchSize) + enableOffHeapColumnVector, vectorizedReaderBatchSize) try { reader.initialize(p, ("c1" :: "c2" :: Nil).asJava) val batch = reader.resultBatch() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala index 6d681afd23b1..fbfedf02dc87 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala @@ -42,7 +42,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess val conf = sqlContext.conf val reader = new VectorizedParquetRecordReader( - null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) + conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) reader.initialize(file.asInstanceOf[String], null) val batch = reader.resultBatch() assert(reader.nextBatch()) @@ -69,7 +69,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess val conf = sqlContext.conf val reader = new VectorizedParquetRecordReader( - null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) + conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) reader.initialize(file.asInstanceOf[String], null) val batch = reader.resultBatch() assert(reader.nextBatch()) @@ -100,7 +100,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSparkSess val conf = sqlContext.conf val reader = new VectorizedParquetRecordReader( - null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) + conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) reader.initialize(file, null /* set columns to null to project all columns */) val column = reader.resultBatch().column(0) assert(reader.nextBatch()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index f901ce16f096..b79f019c9a0c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.sql.{Date, Timestamp} +import java.time._ import java.util.Locale import scala.collection.JavaConverters._ @@ -720,7 +721,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession { val conf = sqlContext.conf val reader = new VectorizedParquetRecordReader( - null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) + conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) try { reader.initialize(file, null) val result = mutable.ArrayBuffer.empty[(Int, String)] @@ -739,7 +740,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession { val conf = sqlContext.conf val reader = new VectorizedParquetRecordReader( - null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) + conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) try { reader.initialize(file, ("_2" :: Nil).asJava) val result = mutable.ArrayBuffer.empty[(String)] @@ -757,7 +758,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession { val conf = sqlContext.conf val reader = new VectorizedParquetRecordReader( - null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) + conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) try { reader.initialize(file, ("_2" :: "_1" :: Nil).asJava) val result = mutable.ArrayBuffer.empty[(String, Int)] @@ -776,7 +777,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession { val conf = sqlContext.conf val reader = new VectorizedParquetRecordReader( - null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) + conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) try { reader.initialize(file, List[String]().asJava) var result = 0 @@ -817,7 +818,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession val schema = StructType(StructField("pcol", dt) :: Nil) val conf = sqlContext.conf val vectorizedReader = new VectorizedParquetRecordReader( - null, conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) + conf.offHeapColumnVectorEnabled, conf.parquetVectorizedReaderBatchSize) val partitionValues = new GenericInternalRow(Array(v)) val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0) @@ -885,16 +886,55 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession Seq(false, true).foreach { vectorized => withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") { - checkAnswer( - readResourceParquetFile("test-data/before_1582_date_v2_4.snappy.parquet"), - Row(java.sql.Date.valueOf("1001-01-01"))) - checkAnswer(readResourceParquetFile( - "test-data/before_1582_timestamp_micros_v2_4.snappy.parquet"), - Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"))) - checkAnswer(readResourceParquetFile( - "test-data/before_1582_timestamp_millis_v2_4.snappy.parquet"), - Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123"))) + // test reading the existing 2.4 files and 3.0 newly written files together. + withTempPath { path => + val path2_4 = getResourceParquetFilePath( + "test-data/before_1582_date_v2_4.snappy.parquet") + val path3_0 = path.getCanonicalPath + val dateStr = "1001-01-01" + Seq(dateStr).toDF("str").select($"str".cast("date").as("date")) + .write.parquet(path3_0) + checkAnswer( + spark.read.format("parquet").load(path2_4, path3_0), + Seq( + Row(java.sql.Date.valueOf(dateStr)), + Row(java.sql.Date.valueOf(dateStr)))) + } + + withTempPath { path => + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") { + val path2_4 = getResourceParquetFilePath( + "test-data/before_1582_timestamp_micros_v2_4.snappy.parquet") + val path3_0 = path.getCanonicalPath + val tsStr = "1001-01-01 01:02:03.123456" + Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts")) + .write.parquet(path3_0) + checkAnswer( + spark.read.format("parquet").load(path2_4, path3_0), + Seq( + Row(java.sql.Timestamp.valueOf(tsStr)), + Row(java.sql.Timestamp.valueOf(tsStr)))) + } + } + + withTempPath { path => + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MILLIS") { + val path2_4 = getResourceParquetFilePath( + "test-data/before_1582_timestamp_millis_v2_4.snappy.parquet") + val path3_0 = path.getCanonicalPath + val tsStr = "1001-01-01 01:02:03.123" + Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts")) + .write.parquet(path3_0) + checkAnswer( + spark.read.format("parquet").load(path2_4, path3_0), + Seq( + Row(java.sql.Timestamp.valueOf(tsStr)), + Row(java.sql.Timestamp.valueOf(tsStr)))) + } + } } + + // INT96 is a legacy timestamp format and we always rebase the seconds for it. checkAnswer(readResourceParquetFile( "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"), Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"))) @@ -918,10 +958,17 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession .write .parquet(path) } - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") { - checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(tsStr))) + // The file metadata indicates if it needs rebase or not, so we can always get the + // correct result regardless of the "rebaseInRead" config. + Seq(true, false).foreach { rebase => + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(tsStr))) + } } - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "false") { + + // Force to not rebase to prove the written datetime values are rebased and we will get + // wrong result if we don't rebase while reading. + withSQLConf("spark.test.forceNoRebase" -> "true") { checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(nonRebased))) } } @@ -939,10 +986,18 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession .write .parquet(path) } - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") { - checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-01"))) + + // The file metadata indicates if it needs rebase or not, so we can always get the correct + // result regardless of the "rebaseInRead" config. + Seq(true, false).foreach { rebase => + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-01"))) + } } - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "false") { + + // Force to not rebase to prove the written datetime values are rebased and we will get + // wrong result if we don't rebase while reading. + withSQLConf("spark.test.forceNoRebase" -> "true") { checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-07"))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index f2dbc536ac56..c833d5f1ab1f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -156,7 +156,10 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest { } protected def readResourceParquetFile(name: String): DataFrame = { - val url = Thread.currentThread().getContextClassLoader.getResource(name) - spark.read.parquet(url.toString) + spark.read.parquet(getResourceParquetFilePath(name)) + } + + protected def getResourceParquetFilePath(name: String): String = { + Thread.currentThread().getContextClassLoader.getResource(name).toString } } From 6905247094d22e939d9bd24177934573cc04be99 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 21 Apr 2020 15:52:03 +0800 Subject: [PATCH 2/2] address comments --- .../org/apache/spark/sql/avro/AvroSuite.scala | 98 ++++++++----------- .../parquet/ParquetRecordMaterializer.scala | 2 +- .../parquet/ParquetRowConverter.scala | 2 +- .../scala/org/apache/spark/sql/package.scala | 2 +- .../datasources/parquet/ParquetIOSuite.scala | 84 ++++++++-------- 5 files changed, 84 insertions(+), 104 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index cf8e2bd7d561..3e754f02911d 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -1529,64 +1529,50 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession { } test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") { - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") { - // test reading the existing 2.4 files and 3.0 newly written files together. - withTempPath { path => - val path2_4 = getResourceAvroFilePath("before_1582_date_v2_4.avro") - val path3_0 = path.getCanonicalPath - val dateStr = "1001-01-01" - Seq(dateStr).toDF("str").select($"str".cast("date").as("date")) - .write.format("avro").save(path3_0) - checkAnswer( - spark.read.format("avro").load(path2_4, path3_0), - Seq( - Row(java.sql.Date.valueOf(dateStr)), - Row(java.sql.Date.valueOf(dateStr)))) - } - - withTempPath { path => - val path2_4 = getResourceAvroFilePath("before_1582_ts_micros_v2_4.avro") - val path3_0 = path.getCanonicalPath - val avroSchema = - """ - |{ - | "type" : "record", - | "name" : "test_schema", - | "fields" : [ - | {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-micros"}} - | ] - |}""".stripMargin - val tsStr = "1001-01-01 01:02:03.123456" - Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts")) - .write.format("avro").option("avroSchema", avroSchema).save(path3_0) - checkAnswer( - spark.read.format("avro").load(path2_4, path3_0), - Seq( - Row(java.sql.Timestamp.valueOf(tsStr)), - Row(java.sql.Timestamp.valueOf(tsStr)))) + // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together. + def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = { + withTempPaths(2) { paths => + paths.foreach(_.delete()) + val path2_4 = getResourceAvroFilePath(fileName) + val path3_0 = paths(0).getCanonicalPath + val path3_0_rebase = paths(1).getCanonicalPath + if (dt == "date") { + val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date")) + df.write.format("avro").save(path3_0) + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") { + df.write.format("avro").save(path3_0_rebase) + } + checkAnswer( + spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase), + 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr)))) + } else { + val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts")) + val avroSchema = + s""" + |{ + | "type" : "record", + | "name" : "test_schema", + | "fields" : [ + | {"name": "ts", "type": {"type": "long", "logicalType": "$dt"}} + | ] + |}""".stripMargin + df.write.format("avro").option("avroSchema", avroSchema).save(path3_0) + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") { + df.write.format("avro").option("avroSchema", avroSchema).save(path3_0_rebase) + } + checkAnswer( + spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase), + 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr)))) + } } + } - withTempPath { path => - val path2_4 = getResourceAvroFilePath("before_1582_ts_millis_v2_4.avro") - val path3_0 = path.getCanonicalPath - val avroSchema = - """ - |{ - | "type" : "record", - | "name" : "test_schema", - | "fields" : [ - | {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-millis"}} - | ] - |}""".stripMargin - val tsStr = "1001-01-01 01:02:03.124" - Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts")) - .write.format("avro").option("avroSchema", avroSchema).save(path3_0) - checkAnswer( - spark.read.format("avro").load(path2_4, path3_0), - Seq( - Row(java.sql.Timestamp.valueOf(tsStr)), - Row(java.sql.Timestamp.valueOf(tsStr)))) - } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") { + checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01") + checkReadMixedFiles( + "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456") + checkReadMixedFiles( + "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index ba7511c5b8ae..ec037130aa7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType * @param parquetSchema Parquet schema of the records to be read * @param catalystSchema Catalyst schema of the rows to be constructed * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters - * @param convertTz the optional time zone to convert to for int96 data + * @param convertTz the optional time zone to convert to int96 data * @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian * calendar */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 705995fee33f..8376b7b137ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -120,7 +120,7 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * @param parquetType Parquet schema of Parquet records * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined * types should have been expanded. - * @param convertTz the optional time zone to convert to for int96 data + * @param convertTz the optional time zone to convert to int96 data * @param rebaseDateTime true if need to rebase date/timestamp from Julian to Proleptic Gregorian * calendar * @param updater An updater which propagates converted field values to the parent container diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala index 18975c52d0bd..c0397010acba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala @@ -59,5 +59,5 @@ package object sql { * Parquet/Avro file metadata key to indicate that the file was written with legacy datetime * values. */ - private[sql] val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDatetime" + private[sql] val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDateTime" } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index b79f019c9a0c..239db7d4f30d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -883,55 +883,49 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") { - Seq(false, true).foreach { vectorized => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") { - // test reading the existing 2.4 files and 3.0 newly written files together. - withTempPath { path => - val path2_4 = getResourceParquetFilePath( - "test-data/before_1582_date_v2_4.snappy.parquet") - val path3_0 = path.getCanonicalPath - val dateStr = "1001-01-01" - Seq(dateStr).toDF("str").select($"str".cast("date").as("date")) - .write.parquet(path3_0) - checkAnswer( - spark.read.format("parquet").load(path2_4, path3_0), - Seq( - Row(java.sql.Date.valueOf(dateStr)), - Row(java.sql.Date.valueOf(dateStr)))) + // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together. + def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = { + withTempPaths(2) { paths => + paths.foreach(_.delete()) + val path2_4 = getResourceParquetFilePath("test-data/" + fileName) + val path3_0 = paths(0).getCanonicalPath + val path3_0_rebase = paths(1).getCanonicalPath + if (dt == "date") { + val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date")) + df.write.parquet(path3_0) + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") { + df.write.parquet(path3_0_rebase) } - - withTempPath { path => - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") { - val path2_4 = getResourceParquetFilePath( - "test-data/before_1582_timestamp_micros_v2_4.snappy.parquet") - val path3_0 = path.getCanonicalPath - val tsStr = "1001-01-01 01:02:03.123456" - Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts")) - .write.parquet(path3_0) - checkAnswer( - spark.read.format("parquet").load(path2_4, path3_0), - Seq( - Row(java.sql.Timestamp.valueOf(tsStr)), - Row(java.sql.Timestamp.valueOf(tsStr)))) + checkAnswer( + spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), + 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr)))) + } else { + val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts")) + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) { + df.write.parquet(path3_0) + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> "true") { + df.write.parquet(path3_0_rebase) } } + checkAnswer( + spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), + 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr)))) + } + } + } - withTempPath { path => - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MILLIS") { - val path2_4 = getResourceParquetFilePath( - "test-data/before_1582_timestamp_millis_v2_4.snappy.parquet") - val path3_0 = path.getCanonicalPath - val tsStr = "1001-01-01 01:02:03.123" - Seq(tsStr).toDF("str").select($"str".cast("timestamp").as("ts")) - .write.parquet(path3_0) - checkAnswer( - spark.read.format("parquet").load(path2_4, path3_0), - Seq( - Row(java.sql.Timestamp.valueOf(tsStr)), - Row(java.sql.Timestamp.valueOf(tsStr)))) - } - } + Seq(false, true).foreach { vectorized => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> "true") { + checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01") + checkReadMixedFiles( + "before_1582_timestamp_micros_v2_4.snappy.parquet", + "TIMESTAMP_MICROS", + "1001-01-01 01:02:03.123456") + checkReadMixedFiles( + "before_1582_timestamp_millis_v2_4.snappy.parquet", + "TIMESTAMP_MILLIS", + "1001-01-01 01:02:03.123") } // INT96 is a legacy timestamp format and we always rebase the seconds for it.