
Commit 5a70fa8

cloud-fan authored and HyukjinKwon committed
[SPARK-31361][SQL] Rebase datetime in parquet/avro according to file metadata
### What changes were proposed in this pull request?

This PR adds a new parquet/avro file metadata key: `org.apache.spark.legacyDatetime`. It indicates that the file was written with the "rebaseInWrite" config enabled, so Spark needs to rebase the datetime values when reading it. This lets Spark decide whether to rebase more intelligently:
1. If we don't know which Spark version wrote the file, rebase if the "rebaseInRead" config is true.
2. If the file was written by Spark 2.4 or earlier, always rebase.
3. If the file was written by Spark 3.0 or later, rebase only if `org.apache.spark.legacyDatetime` exists in the file metadata.

### Why are the changes needed?

It's very easy to end up with mixed-calendar parquet/avro files. For example, a user upgrades to Spark 3.0 and writes some parquet files into an existing directory, then realizes the directory contains legacy datetime values before 1582. At that point it's too late: the user has to find all the legacy files manually and read them separately. To support mixed-calendar parquet/avro files, we need to decide whether to rebase based on the file metadata.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Updated test

Closes #28137 from cloud-fan/datetime.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit a5ebbac)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
1 parent 756e85e · commit 5a70fa8
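As a quick illustration of the mixed-calendar scenario described above (not part of this commit): a minimal sketch, assuming a local SparkSession, the built-in Avro data source on the classpath, and a hypothetical directory /tmp/events that already contains Avro files written by Spark 2.4. Only the SQLConf constant names come from this change; everything else is illustrative.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Append new files into the same directory with rebase enabled in write; each new file
// gets the org.apache.spark.legacyDatetime key stamped into its metadata.
spark.conf.set(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key, "true")
Seq("1001-01-01").toDF("str")
  .select($"str".cast("date").as("date"))
  .write.format("avro").mode("append").save("/tmp/events")

// On read, the rebase decision is made per file from its metadata, so the old 2.4 files
// and the new 3.0 files all come back as the expected pre-1582 date.
spark.read.format("avro").load("/tmp/events").show()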

File tree

21 files changed (+299, -105 lines)

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala

Lines changed: 7 additions & 5 deletions
@@ -41,12 +41,14 @@ import org.apache.spark.unsafe.types.UTF8String
 /**
  * A deserializer to deserialize data in avro format to data in catalyst format.
  */
-class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
-  private lazy val decimalConversions = new DecimalConversion()
+class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType, rebaseDateTime: Boolean) {
+
+  def this(rootAvroType: Schema, rootCatalystType: DataType) {
+    this(rootAvroType, rootCatalystType,
+      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ))
+  }

-  // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
-  private val rebaseDateTime =
-    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
+  private lazy val decimalConversions = new DecimalConversion()

   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

Lines changed: 8 additions & 3 deletions
@@ -34,7 +34,8 @@ import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -123,8 +124,12 @@ private[sql] class AvroFileFormat extends FileFormat
       reader.sync(file.start)
       val stop = file.start + file.length

-      val deserializer =
-        new AvroDeserializer(userProvidedSchema.getOrElse(reader.getSchema), requiredSchema)
+      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
+      }
+      val deserializer = new AvroDeserializer(
+        userProvidedSchema.getOrElse(reader.getSchema), requiredSchema, rebaseDateTime)

       new Iterator[InternalRow] {
         private[this] var completed = false

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala

Lines changed: 13 additions & 4 deletions
@@ -29,9 +29,10 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}

 import org.apache.spark.SPARK_VERSION_SHORT
-import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
+import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
@@ -41,16 +42,24 @@ private[avro] class AvroOutputWriter(
     schema: StructType,
     avroSchema: Schema) extends OutputWriter {

+  // Whether to rebase datetimes from Gregorian to Julian calendar in write
+  private val rebaseDateTime: Boolean =
+    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
+
   // The input rows will never be null.
-  private lazy val serializer = new AvroSerializer(schema, avroSchema, nullable = false)
+  private lazy val serializer =
+    new AvroSerializer(schema, avroSchema, nullable = false, rebaseDateTime)

   /**
    * Overrides the couple of methods responsible for generating the output streams / files so
    * that the data can be correctly partitioned
    */
   private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = {
-    val sparkVersion = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT).asJava
-    new SparkAvroKeyOutputFormat(sparkVersion) {
+    val fileMeta = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT) ++ {
+      if (rebaseDateTime) Some(SPARK_LEGACY_DATETIME -> "") else None
+    }
+
+    new SparkAvroKeyOutputFormat(fileMeta.asJava) {

       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         new Path(path)
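For reference, a small illustrative sketch (not part of this change) of how the marker written above could be inspected with Avro's own DataFileReader API. The file name part-00000.avro is a placeholder; the version key is assumed to be "org.apache.spark.version" (SPARK_VERSION_METADATA_KEY) and the legacy key comes from the commit description.

import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

val reader = new DataFileReader[GenericRecord](
  new File("/tmp/events/part-00000.avro"), new GenericDatumReader[GenericRecord]())
// The version key records the writer's Spark version; the mere presence of
// "org.apache.spark.legacyDatetime" (written with an empty value) is the rebase signal.
println(reader.getMetaString("org.apache.spark.version"))
println(reader.getMetaString("org.apache.spark.legacyDatetime") != null)
reader.close()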

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala

Lines changed: 9 additions & 5 deletions
@@ -42,12 +42,16 @@ import org.apache.spark.sql.types._
 /**
  * A serializer to serialize data in catalyst format to data in avro format.
  */
-class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
-  extends Logging {
+class AvroSerializer(
+    rootCatalystType: DataType,
+    rootAvroType: Schema,
+    nullable: Boolean,
+    rebaseDateTime: Boolean) extends Logging {

-  // Whether to rebase datetimes from Gregorian to Julian calendar in write
-  private val rebaseDateTime: Boolean =
-    SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE)
+  def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) {
+    this(rootCatalystType, rootAvroType, nullable,
+      SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE))
+  }

   def serialize(catalystData: Any): Any = {
     converter.apply(catalystData)

external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala

Lines changed: 7 additions & 3 deletions
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.PartitionReader
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader, FilePartitionReaderFactory, PartitionReaderWithPartitionValues}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -88,8 +88,12 @@ case class AvroPartitionReaderFactory(
       reader.sync(partitionedFile.start)
       val stop = partitionedFile.start + partitionedFile.length

-      val deserializer =
-        new AvroDeserializer(userProvidedSchema.getOrElse(reader.getSchema), readDataSchema)
+      val rebaseDateTime = DataSourceUtils.needRebaseDateTime(
+        reader.asInstanceOf[DataFileReader[_]].getMetaString).getOrElse {
+        SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ)
+      }
+      val deserializer = new AvroDeserializer(
+        userProvidedSchema.getOrElse(reader.getSchema), readDataSchema, rebaseDateTime)

       val fileReader = new PartitionReader[InternalRow] {
         private[this] var completed = false

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -288,7 +288,7 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
       """.stripMargin
     val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
     val dataType = SchemaConverters.toSqlType(avroSchema).dataType
-    val deserializer = new AvroDeserializer(avroSchema, dataType)
+    val deserializer = new AvroDeserializer(avroSchema, dataType, rebaseDateTime = false)

     def checkDeserialization(data: GenericData.Record, expected: Any): Unit = {
       assert(checkResult(

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

Lines changed: 80 additions & 23 deletions
@@ -84,9 +84,8 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }

-  private def readResourceAvroFile(name: String): DataFrame = {
-    val url = Thread.currentThread().getContextClassLoader.getResource(name)
-    spark.read.format("avro").load(url.toString)
+  private def getResourceAvroFilePath(name: String): String = {
+    Thread.currentThread().getContextClassLoader.getResource(name).toString
   }

   test("resolve avro data source") {
@@ -1530,16 +1529,50 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
   }

   test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
+    // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
+    def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = {
+      withTempPaths(2) { paths =>
+        paths.foreach(_.delete())
+        val path2_4 = getResourceAvroFilePath(fileName)
+        val path3_0 = paths(0).getCanonicalPath
+        val path3_0_rebase = paths(1).getCanonicalPath
+        if (dt == "date") {
+          val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
+          df.write.format("avro").save(path3_0)
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
+            df.write.format("avro").save(path3_0_rebase)
+          }
+          checkAnswer(
+            spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+            1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
+        } else {
+          val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
+          val avroSchema =
+            s"""
+              |{
+              |  "type" : "record",
+              |  "name" : "test_schema",
+              |  "fields" : [
+              |    {"name": "ts", "type": {"type": "long", "logicalType": "$dt"}}
+              |  ]
+              |}""".stripMargin
+          df.write.format("avro").option("avroSchema", avroSchema).save(path3_0)
+          withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_WRITE.key -> "true") {
+            df.write.format("avro").option("avroSchema", avroSchema).save(path3_0_rebase)
+          }
+          checkAnswer(
+            spark.read.format("avro").load(path2_4, path3_0, path3_0_rebase),
+            1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr))))
+        }
+      }
+    }
+
     withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
-      checkAnswer(
-        readResourceAvroFile("before_1582_date_v2_4.avro"),
-        Row(java.sql.Date.valueOf("1001-01-01")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+      checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01")
+      checkReadMixedFiles(
+        "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456")
+      checkReadMixedFiles(
+        "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124")
     }
   }

@@ -1554,10 +1587,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
          .write.format("avro")
          .save(path)
       }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+
+      // The file metadata indicates if it needs rebase or not, so we can always get the correct
+      // result regardless of the "rebaseInRead" config.
+      Seq(true, false).foreach { rebase =>
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+          checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+        }
       }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") {
+
+      // Force to not rebase to prove the written datetime values are rebased and we will get
+      // wrong result if we don't rebase while reading.
+      withSQLConf("spark.test.forceNoRebase" -> "true") {
         checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
       }
     }
@@ -1589,12 +1630,20 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
           .format("avro")
           .save(path)
       }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
-        checkAnswer(
-          spark.read.schema("ts timestamp").format("avro").load(path),
-          Row(Timestamp.valueOf(rebased)))
+
+      // The file metadata indicates if it needs rebase or not, so we can always get the correct
+      // result regardless of the "rebaseInRead" config.
+      Seq(true, false).foreach { rebase =>
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(rebased)))
+        }
       }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") {
+
+      // Force to not rebase to prove the written datetime values are rebased and we will get
+      // wrong result if we don't rebase while reading.
+      withSQLConf("spark.test.forceNoRebase" -> "true") {
         checkAnswer(
           spark.read.schema("ts timestamp").format("avro").load(path),
           Row(Timestamp.valueOf(nonRebased)))
@@ -1612,10 +1661,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
         .write.format("avro")
         .save(path)
       }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "true") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+
+      // The file metadata indicates if it needs rebase or not, so we can always get the correct
+      // result regardless of the "rebaseInRead" config.
+      Seq(true, false).foreach { rebase =>
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
+          checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+        }
       }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME_IN_READ.key -> "false") {
+
+      // Force to not rebase to prove the written datetime values are rebased and we will get
+      // wrong result if we don't rebase while reading.
+      withSQLConf("spark.test.forceNoRebase" -> "true") {
         checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
       }
     }

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java

Lines changed: 3 additions & 3 deletions
@@ -38,7 +38,6 @@
 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;

@@ -109,7 +108,8 @@ public VectorizedColumnReader(
       ColumnDescriptor descriptor,
       OriginalType originalType,
       PageReader pageReader,
-      ZoneId convertTz) throws IOException {
+      ZoneId convertTz,
+      boolean rebaseDateTime) throws IOException {
     this.descriptor = descriptor;
     this.pageReader = pageReader;
     this.convertTz = convertTz;
@@ -132,7 +132,7 @@ public VectorizedColumnReader(
     if (totalValueCount == 0) {
       throw new IOException("totalValueCount == 0");
     }
-    this.rebaseDateTime = SQLConf.get().parquetRebaseDateTimeInReadEnabled();
+    this.rebaseDateTime = rebaseDateTime;
   }

   /**

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java

Lines changed: 15 additions & 3 deletions
@@ -86,7 +86,12 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    * The timezone that timestamp INT96 values should be converted to. Null if no conversion. Here to
    * workaround incompatibilities between different engines when writing timestamp values.
    */
-  private ZoneId convertTz = null;
+  private final ZoneId convertTz;
+
+  /**
+   * true if need to rebase date/timestamp from Julian to Proleptic Gregorian calendar.
+   */
+  private final boolean rebaseDateTime;

   /**
    * columnBatch object that is used for batch decoding. This is created on first use and triggers
@@ -116,12 +121,19 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
   private final MemoryMode MEMORY_MODE;

-  public VectorizedParquetRecordReader(ZoneId convertTz, boolean useOffHeap, int capacity) {
+  public VectorizedParquetRecordReader(
+      ZoneId convertTz, boolean rebaseDateTime, boolean useOffHeap, int capacity) {
     this.convertTz = convertTz;
+    this.rebaseDateTime = rebaseDateTime;
     MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
     this.capacity = capacity;
   }

+  // For test only.
+  public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) {
+    this(null, false, useOffHeap, capacity);
+  }
+
   /**
    * Implementation of RecordReader API.
    */
@@ -309,7 +321,7 @@ private void checkEndOfRowGroup() throws IOException {
     for (int i = 0; i < columns.size(); ++i) {
       if (missingColumns[i]) continue;
       columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(),
-        pages.getPageReader(columns.get(i)), convertTz);
+        pages.getPageReader(columns.get(i)), convertTz, rebaseDateTime);
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala

Lines changed: 17 additions & 0 deletions
@@ -21,8 +21,11 @@ import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization

+import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils


 object DataSourceUtils {
@@ -64,4 +67,18 @@ object DataSourceUtils {

   private[sql] def isDataFile(fileName: String) =
     !(fileName.startsWith("_") || fileName.startsWith("."))
+
+  def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
+    if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
+      return Some(false)
+    }
+    // If there is no version, we return None and let the caller side to decide.
+    Option(lookupFileMeta(SPARK_VERSION_METADATA_KEY)).map { version =>
+      // Files written by Spark 2.4 and earlier follow the legacy hybrid calendar and we need to
+      // rebase the datetime values.
+      // Files written by Spark 3.0 and latter may also need the rebase if they were written with
+      // the "rebaseInWrite" config enabled.
+      version < "3.0.0" || lookupFileMeta(SPARK_LEGACY_DATETIME) != null
+    }
+  }
 }
