diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 161be053b296..d672f4e97aaa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.time._
 import java.time.temporal.{ChronoField, ChronoUnit, IsoFields}
-import java.util.{Locale, TimeZone}
+import java.util.{Calendar, Locale, TimeZone}
 import java.util.concurrent.TimeUnit._

 import scala.util.control.NonFatal
@@ -148,7 +148,9 @@ object DateTimeUtils {
   def fromJulianDay(day: Int, nanoseconds: Long): SQLTimestamp = {
     // use Long to avoid rounding errors
     val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY
-    SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
+    val micros = SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds)
+    val rebased = rebaseJulianToGregorianMicros(micros)
+    rebased
   }

   /**
@@ -157,7 +159,7 @@ object DateTimeUtils {
    * Note: support timestamp since 4717 BC (without negative nanoseconds, compatible with Hive).
    */
   def toJulianDay(us: SQLTimestamp): (Int, Long) = {
-    val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
+    val julian_us = rebaseGregorianToJulianMicros(us) + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
     val day = julian_us / MICROS_PER_DAY
     val micros = julian_us % MICROS_PER_DAY
     (day.toInt, MICROSECONDS.toNanos(micros))
@@ -936,4 +938,102 @@ object DateTimeUtils {
     val days = period.getDays
     new CalendarInterval(months, days, 0)
   }
+
+  /**
+   * Converts the given microseconds to a local date-time in UTC time zone in Proleptic Gregorian
+   * calendar, interprets the result as a local date-time in Julian calendar in UTC time zone.
+   * And takes microseconds since the epoch from the Julian timestamp.
+   *
+   * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
+   * @return The rebased microseconds since the epoch in Julian calendar.
+   */
+  def rebaseGregorianToJulianMicros(micros: Long): Long = {
+    val ldt = microsToInstant(micros).atZone(ZoneId.systemDefault).toLocalDateTime
+    val cal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
+      .setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
+      .build()
+    millisToMicros(cal.getTimeInMillis) + ldt.get(ChronoField.MICRO_OF_SECOND)
+  }
+
+  /**
+   * Converts the given microseconds to a local date-time in UTC time zone in Julian calendar,
+   * interprets the result as a local date-time in Proleptic Gregorian calendar in UTC time zone.
+   * And takes microseconds since the epoch from the Gregorian timestamp.
+   *
+   * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
+   * @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
+   */
+  def rebaseJulianToGregorianMicros(micros: Long): Long = {
+    val cal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setInstant(microsToMillis(micros))
+      .build()
+    val localDateTime = LocalDateTime.of(
+      cal.get(Calendar.YEAR),
+      cal.get(Calendar.MONTH) + 1,
+      cal.get(Calendar.DAY_OF_MONTH),
+      cal.get(Calendar.HOUR_OF_DAY),
+      cal.get(Calendar.MINUTE),
+      cal.get(Calendar.SECOND),
+      (Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
+    instantToMicros(localDateTime.atZone(ZoneId.systemDefault).toInstant)
+  }
+
+  /**
+   * Converts the given number of days since the epoch day 1970-01-01 to
+   * a local date in Julian calendar, interprets the result as a local
+   * date in Proleptic Gregorian calendar, and take the number of days
+   * since the epoch from the Gregorian date.
+   *
+   * @param days The number of days since the epoch in Julian calendar.
+   * @return The rebased number of days in Gregorian calendar.
+   */
+  def rebaseJulianToGregorianDays(days: Int): Int = {
+    val utcCal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setTimeZone(TimeZoneUTC)
+      .setInstant(Math.multiplyExact(days, MILLIS_PER_DAY))
+      .build()
+    val localDate = LocalDate.of(
+      utcCal.get(Calendar.YEAR),
+      utcCal.get(Calendar.MONTH) + 1,
+      utcCal.get(Calendar.DAY_OF_MONTH))
+    Math.toIntExact(localDate.toEpochDay)
+  }
+
+  /**
+   * Rebasing days since the epoch to store the same number of days
+   * as by Spark 2.4 and earlier versions. Spark 3.0 switched to
+   * Proleptic Gregorian calendar (see SPARK-26651), and as a consequence of that,
+   * this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
+   * Julian calendar for dates before 1582-10-15. So, the same local date may
+   * be mapped to different number of days since the epoch in different calendars.
+   *
+   * For example:
+   *   Proleptic Gregorian calendar: 1582-01-01 -> -141714
+   *   Julian calendar: 1582-01-01 -> -141704
+   * The code below converts -141714 to -141704.
+   *
+   * @param days The number of days since the epoch 1970-01-01. It can be negative.
+   * @return The rebased number of days since the epoch in Julian calendar.
+   */
+  def rebaseGregorianToJulianDays(days: Int): Int = {
+    val localDate = LocalDate.ofEpochDay(days)
+    val utcCal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setTimeZone(TimeZoneUTC)
+      .setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
+      .build()
+    Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, MILLIS_PER_DAY))
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1cd2fcf28e44..1ccbd3573e77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2487,6 +2487,20 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val LEGACY_PARQUET_REBASE_DATETIME =
+    buildConf("spark.sql.legacy.parquet.rebaseDateTime.enabled")
+      .internal()
+      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the hybrid calendar (Julian + Gregorian) in write and " +
+        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
+        "The rebasing is performed by converting micros/millis/days to " +
+        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
+        "timestamp in the target calendar, and getting the number of micros/millis/days " +
+        "since the epoch 1970-01-01 00:00:00Z.")
+      .version("3.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3064,6 +3078,8 @@ class SQLConf extends Serializable with Logging {

   def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)

+  def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)
+
   /** ********************** SQLConf functionality methods ************ */

   /** Set Spark SQL configuration properties. */
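A quick illustration of what the day rebasing above does (a sketch, not part of the patch; it assumes the new `DateTimeUtils` methods are on the classpath). The -141714/-141704 figures are the ones quoted in the Scaladoc of `rebaseGregorianToJulianDays`:

```scala
import java.time.LocalDate
import org.apache.spark.sql.catalyst.util.DateTimeUtils._

// 1582-01-01 in the Proleptic Gregorian calendar is -141714 days before the epoch.
val gregorianDays = LocalDate.of(1582, 1, 1).toEpochDay.toInt   // -141714
// The same local date labelled in the hybrid Julian calendar is 10 days later in
// absolute time, so it is 10 days closer to the epoch.
val julianDays = rebaseGregorianToJulianDays(gregorianDays)     // expected: -141704
// Rebasing back should restore the original value, as the new DateTimeUtilsSuite tests assert.
assert(rebaseJulianToGregorianDays(julianDays) == gregorianDays)
```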
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index b3ad168bb051..39caa63df12f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -670,4 +670,64 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
       assert(toDate("tomorrow CET ", zoneId).get === today + 1)
     }
   }
+
+  test("rebase julian to/from gregorian micros") {
+    outstandingTimezones.foreach { timeZone =>
+      withDefaultTimeZone(timeZone) {
+        Seq(
+          "0001-01-01 01:02:03.654321",
+          "1000-01-01 03:02:01.123456",
+          "1582-10-04 00:00:00.000000",
+          "1582-10-15 00:00:00.999999", // Gregorian cutover day
+          "1883-11-10 00:00:00.000000", // America/Los_Angeles -7:52:58 zone offset
+          "1883-11-20 00:00:00.000000", // America/Los_Angeles -08:00 zone offset
+          "1969-12-31 11:22:33.000100",
+          "1970-01-01 00:00:00.000001", // The epoch day
+          "2020-03-14 09:33:01.500000").foreach { ts =>
+          withClue(s"time zone = ${timeZone.getID} ts = $ts") {
+            val julianTs = Timestamp.valueOf(ts)
+            val julianMicros = millisToMicros(julianTs.getTime) +
+              ((julianTs.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS)
+            val gregorianMicros = instantToMicros(LocalDateTime.parse(ts.replace(' ', 'T'))
+              .atZone(timeZone.toZoneId)
+              .toInstant)
+
+            assert(rebaseJulianToGregorianMicros(julianMicros) === gregorianMicros)
+            assert(rebaseGregorianToJulianMicros(gregorianMicros) === julianMicros)
+          }
+        }
+      }
+    }
+  }
+
+  test("rebase gregorian to/from julian days") {
+    // millisToDays() and fromJavaDate() are taken from Spark 2.4
+    def millisToDays(millisUtc: Long, timeZone: TimeZone): Int = {
+      val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
+      Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
+    }
+    def fromJavaDate(date: Date): Int = {
+      millisToDays(date.getTime, defaultTimeZone())
+    }
+    outstandingTimezones.foreach { timeZone =>
+      withDefaultTimeZone(timeZone) {
+        Seq(
+          "0001-01-01",
+          "1000-01-01",
+          "1582-10-04",
+          "1582-10-15", // Gregorian cutover day
+          "1883-11-10", // America/Los_Angeles -7:52:58 zone offset
+          "1883-11-20", // America/Los_Angeles -08:00 zone offset
+          "1969-12-31",
+          "1970-01-01", // The epoch day
+          "2020-03-14").foreach { date =>
+          val julianDays = fromJavaDate(Date.valueOf(date))
+          val gregorianDays = localDateToDays(LocalDate.parse(date))
+
+          assert(rebaseGregorianToJulianDays(gregorianDays) === julianDays)
+          assert(rebaseJulianToGregorianDays(julianDays) === gregorianDays)
+        }
+      }
+    }
+  }
 }
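For readers who want to see the calendar gap without Spark, a minimal self-contained sketch of the same idea using only the JDK (the 6-day gap for year 1001 matches the `1001-01-01` vs `1001-01-07` expectations in the Parquet tests further below):

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.util.{Calendar, TimeZone}

val utc = TimeZone.getTimeZone("UTC")
// Epoch millis of 1001-01-01 00:00:00 UTC in the hybrid Julian+Gregorian calendar,
// the calendar Spark 2.4 used for wall-clock values before the 1582 cutover.
val hybridMillis = new Calendar.Builder()
  .setCalendarType("gregory")
  .setTimeZone(utc)
  .setDate(1001, 0, 1)      // java.util.Calendar months are 0-based
  .setTimeOfDay(0, 0, 0)
  .build()
  .getTimeInMillis
// Epoch millis of the same wall-clock value in the Proleptic Gregorian calendar (java.time).
val prolepticMillis =
  LocalDateTime.of(1001, 1, 1, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli
// Prints 6: the two calendars disagree by 6 days for dates in the year 1001.
println((hybridMillis - prolepticMillis) / (24L * 60 * 60 * 1000))
```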
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 6fccb629a381..13225b2f1ebb 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -37,6 +37,7 @@
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;

@@ -101,6 +102,7 @@ public class VectorizedColumnReader {
   // The timezone conversion to apply to int96 timestamps. Null if no conversion.
   private final ZoneId convertTz;
   private static final ZoneId UTC = ZoneOffset.UTC;
+  private final boolean rebaseDateTime;

   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
@@ -129,6 +131,7 @@ public VectorizedColumnReader(
     if (totalValueCount == 0) {
       throw new IOException("totalValueCount == 0");
     }
+    this.rebaseDateTime = SQLConf.get().parquetRebaseDateTimeEnabled();
   }

   /**
@@ -407,7 +410,7 @@ private void readBooleanBatch(int rowId, int num, WritableColumnVector column)
   private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
-    if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
+    if (column.dataType() == DataTypes.IntegerType ||
         DecimalType.is32BitDecimalType(column.dataType())) {
       defColumn.readIntegers(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
@@ -417,6 +420,21 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw
     } else if (column.dataType() == DataTypes.ShortType) {
       defColumn.readShorts(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+    } else if (column.dataType() == DataTypes.DateType) {
+      if (rebaseDateTime) {
+        for (int i = 0; i < num; i++) {
+          if (defColumn.readInteger() == maxDefLevel) {
+            column.putInt(
+              rowId + i,
+              DateTimeUtils.rebaseJulianToGregorianDays(dataColumn.readInteger()));
+          } else {
+            column.putNull(rowId + i);
+          }
+        }
+      } else {
+        defColumn.readIntegers(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+      }
     } else {
       throw constructConvertNotSupportedException(descriptor, column);
     }
@@ -425,14 +443,32 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw
   private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     if (column.dataType() == DataTypes.LongType ||
-        DecimalType.is64BitDecimalType(column.dataType()) ||
-        originalType == OriginalType.TIMESTAMP_MICROS) {
+        DecimalType.is64BitDecimalType(column.dataType())) {
       defColumn.readLongs(
           num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+    } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
+      if (rebaseDateTime) {
+        for (int i = 0; i < num; i++) {
+          if (defColumn.readInteger() == maxDefLevel) {
+            column.putLong(
+              rowId + i,
+              DateTimeUtils.rebaseJulianToGregorianMicros(dataColumn.readLong()));
+          } else {
+            column.putNull(rowId + i);
+          }
+        }
+      } else {
+        defColumn.readLongs(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+      }
     } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
       for (int i = 0; i < num; i++) {
         if (defColumn.readInteger() == maxDefLevel) {
-          column.putLong(rowId + i, DateTimeUtils.millisToMicros(dataColumn.readLong()));
+          long micros = DateTimeUtils.millisToMicros(dataColumn.readLong());
+          if (rebaseDateTime) {
+            micros = DateTimeUtils.rebaseJulianToGregorianMicros(micros);
+          }
+          column.putLong(rowId + i, micros);
         } else {
           column.putNull(rowId + i);
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index e4f7d77ac731..93264854fd49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -130,6 +130,9 @@ private[parquet] class ParquetRowConverter(
     updater: ParentContainerUpdater)
   extends ParquetGroupConverter(updater) with Logging {

+  // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
+  private val rebaseDateTime = SQLConf.get.parquetRebaseDateTimeEnabled
+
   assert(
     parquetType.getFieldCount <= catalystType.length,
     s"""Field count of the Parquet schema is greater than the field count of the Catalyst schema:
@@ -271,16 +274,35 @@ private[parquet] class ParquetRowConverter(
         new ParquetStringConverter(updater)

       case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS =>
-        new ParquetPrimitiveConverter(updater) {
-          override def addLong(value: Long): Unit = {
-            updater.setLong(value)
+        if (rebaseDateTime) {
+          new ParquetPrimitiveConverter(updater) {
+            override def addLong(value: Long): Unit = {
+              val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(value)
+              updater.setLong(rebased)
+            }
+          }
+        } else {
+          new ParquetPrimitiveConverter(updater) {
+            override def addLong(value: Long): Unit = {
+              updater.setLong(value)
+            }
           }
         }

       case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS =>
-        new ParquetPrimitiveConverter(updater) {
-          override def addLong(value: Long): Unit = {
-            updater.setLong(DateTimeUtils.millisToMicros(value))
+        if (rebaseDateTime) {
+          new ParquetPrimitiveConverter(updater) {
+            override def addLong(value: Long): Unit = {
+              val micros = DateTimeUtils.millisToMicros(value)
+              val rebased = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+              updater.setLong(rebased)
+            }
+          }
+        } else {
+          new ParquetPrimitiveConverter(updater) {
+            override def addLong(value: Long): Unit = {
+              updater.setLong(DateTimeUtils.millisToMicros(value))
+            }
           }
         }

@@ -305,10 +327,17 @@ private[parquet] class ParquetRowConverter(
       }

       case DateType =>
-        new ParquetPrimitiveConverter(updater) {
-          override def addInt(value: Int): Unit = {
-            // DateType is not specialized in `SpecificMutableRow`, have to box it here.
-            updater.set(value.asInstanceOf[DateType#InternalType])
+        if (rebaseDateTime) {
+          new ParquetPrimitiveConverter(updater) {
+            override def addInt(value: Int): Unit = {
+              updater.set(DateTimeUtils.rebaseJulianToGregorianDays(value))
+            }
+          }
+        } else {
+          new ParquetPrimitiveConverter(updater) {
+            override def addInt(value: Int): Unit = {
+              updater.set(value)
+            }
           }
         }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index bfa33ea23739..5fe0c337c081 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -77,6 +77,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
   private val decimalBuffer =
     new Array[Byte](Decimal.minBytesForPrecision(DecimalType.MAX_PRECISION))

+  // Whether to rebase datetimes from Gregorian to Julian calendar in write
+  private val rebaseDateTime: Boolean = SQLConf.get.parquetRebaseDateTimeEnabled
+
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
     this.schema = StructType.fromString(schemaString)
@@ -147,6 +150,11 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
       (row: SpecializedGetters, ordinal: Int) =>
         recordConsumer.addInteger(row.getShort(ordinal))

+      case DateType if rebaseDateTime =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          val rebasedDays = DateTimeUtils.rebaseGregorianToJulianDays(row.getInt(ordinal))
+          recordConsumer.addInteger(rebasedDays)
+
       case IntegerType | DateType =>
         (row: SpecializedGetters, ordinal: Int) =>
           recordConsumer.addInteger(row.getInt(ordinal))
@@ -177,10 +185,21 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
             buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
             recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))

+          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS if rebaseDateTime =>
+            (row: SpecializedGetters, ordinal: Int) =>
+              val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(row.getLong(ordinal))
+              recordConsumer.addLong(rebasedMicros)
+
           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS =>
             (row: SpecializedGetters, ordinal: Int) =>
               recordConsumer.addLong(row.getLong(ordinal))

+          case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS if rebaseDateTime =>
+            (row: SpecializedGetters, ordinal: Int) =>
+              val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(row.getLong(ordinal))
+              val millis = DateTimeUtils.microsToMillis(rebasedMicros)
+              recordConsumer.addLong(millis)
+
           case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS =>
             (row: SpecializedGetters, ordinal: Int) =>
               val millis = DateTimeUtils.microsToMillis(row.getLong(ordinal))
diff --git a/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet
new file mode 100644
index 000000000000..7d5cc12eefe0
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet differ
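The four `*_v2_4.snappy.parquet` resources added here and below are read back by the compatibility test in `ParquetIOSuite`. Judging by their names they were written with Spark 2.4; a sketch of the kind of snippet that could produce the `timestamp_micros` file in a Spark 2.4.x shell (the output path and the exact generation commands are assumptions, not part of the patch):

```scala
// Spark 2.4.x spark-shell: timestamps are written in the hybrid Julian+Gregorian calendar.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
Seq("1001-01-01 01:02:03.123456").toDF("tsS")
  .select($"tsS".cast("timestamp").as("ts"))
  .write
  .parquet("/tmp/before_1582_timestamp_micros_v2_4.snappy.parquet")
```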
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet
new file mode 100644
index 000000000000..13254bd93a5e
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet
new file mode 100644
index 000000000000..7d2b46e9bea4
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet differ
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet
new file mode 100644
index 000000000000..e9825455c201
Binary files /dev/null and b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 1550b3bbb624..7f85fd2a1629 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.execution.datasources.parquet

+import java.sql.{Date, Timestamp}
 import java.util.Locale

 import scala.collection.JavaConverters._
@@ -879,6 +880,71 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       assert(metaData.get(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
     }
   }
+
+  test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") {
+    Seq(false, true).foreach { vectorized =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
+        withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
+          checkAnswer(
+            readResourceParquetFile("test-data/before_1582_date_v2_4.snappy.parquet"),
+            Row(java.sql.Date.valueOf("1001-01-01")))
+          checkAnswer(readResourceParquetFile(
+            "test-data/before_1582_timestamp_micros_v2_4.snappy.parquet"),
+            Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+          checkAnswer(readResourceParquetFile(
+            "test-data/before_1582_timestamp_millis_v2_4.snappy.parquet"),
+            Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123")))
+        }
+        checkAnswer(readResourceParquetFile(
+          "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"),
+          Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+      }
+    }
+  }
+
+  test("SPARK-31159: rebasing timestamps in write") {
+    Seq(
+      ("TIMESTAMP_MILLIS", "1001-01-01 01:02:03.123", "1001-01-07 01:09:05.123"),
+      ("TIMESTAMP_MICROS", "1001-01-01 01:02:03.123456", "1001-01-07 01:09:05.123456"),
+      ("INT96", "1001-01-01 01:02:03.123456", "1001-01-01 01:02:03.123456")
+    ).foreach { case (outType, tsStr, nonRebased) =>
+      withClue(s"output type $outType") {
+        withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> outType) {
+          withTempPath { dir =>
+            val path = dir.getAbsolutePath
+            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
+              Seq(tsStr).toDF("tsS")
+                .select($"tsS".cast("timestamp").as("ts"))
+                .write
+                .parquet(path)
+
+              checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(tsStr)))
+            }
+            withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
+              checkAnswer(spark.read.parquet(path), Row(Timestamp.valueOf(nonRebased)))
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-31159: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "true") {
+        Seq("1001-01-01").toDF("dateS")
+          .select($"dateS".cast("date").as("date"))
+          .write
+          .parquet(path)
+
+        checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-01")))
+      }
+      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.parquet(path), Row(Date.valueOf("1001-01-07")))
+      }
+    }
+  }
 }

 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
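Taken together, the write and read paths give the behaviour exercised by the new `ParquetIOSuite` tests. A minimal interactive sketch of the flag on Spark 3.0 (the path is illustrative; `1001-01-07` is the value the tests above expect when the rebased file is read back with the flag off):

```scala
// Spark 3.0 spark-shell
spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", "true")
Seq("1001-01-01").toDF("dateS")
  .select($"dateS".cast("date").as("date"))
  .write
  .parquet("/tmp/rebased_dates")
spark.read.parquet("/tmp/rebased_dates").show()   // 1001-01-01, stored as Spark 2.4 would store it

// With the default (false), the stored day number is taken as Proleptic Gregorian,
// so the same file surfaces the date shifted by the calendar difference.
spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled", "false")
spark.read.parquet("/tmp/rebased_dates").show()   // 1001-01-07
```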