From 23b013b2060196571ed9c9c2f373379581d5c774 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 18 Mar 2020 18:03:21 +0300
Subject: [PATCH 01/11] Add the SQL config
 `spark.sql.legacy.avro.rebaseDateTime.enabled`

---
 .../org/apache/spark/sql/internal/SQLConf.scala | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b6282715533d2..1c57ea9990767 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2487,6 +2487,20 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_AVRO_REBASE_DATETIME =
+    buildConf("spark.sql.legacy.avro.rebaseDateTime.enabled")
+      .internal()
+      .doc("When true, rebase dates/timestamps from the Proleptic Gregorian calendar " +
+        "to the hybrid calendar (Julian + Gregorian) on write, and " +
+        "from the hybrid calendar to the Proleptic Gregorian calendar on read. " +
+        "The rebasing is performed by converting micros/millis/days to " +
+        "a local date/timestamp in the source calendar, interpreting the resulting date/" +
+        "timestamp in the target calendar, and getting the number of micros/millis/days " +
+        "since the epoch 1970-01-01 00:00:00Z.")
+      .version("3.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
 /**
  * Holds information about keys that have been deprecated.
 *
@@ -3061,6 +3075,8 @@ class SQLConf extends Serializable with Logging {
 
   def integerGroupingIdEnabled: Boolean = getConf(SQLConf.LEGACY_INTEGER_GROUPING_ID)
 
+  def avroRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties.
*/ From 9f1c0ea3b1c3bc987da19012550d6877a1b1245a Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 18 Mar 2020 20:26:58 +0300 Subject: [PATCH 02/11] Add test for read files saved by Spark 2.4.5 --- .../test/resources/before_1582_date_v2_4.avro | Bin 0 -> 202 bytes .../resources/before_1582_ts_micros_v2_4.avro | Bin 0 -> 218 bytes .../spark/sql/avro/AvroLogicalTypeSuite.scala | 18 +++++++++++++++++- 3 files changed, 17 insertions(+), 1 deletion(-) create mode 100644 external/avro/src/test/resources/before_1582_date_v2_4.avro create mode 100644 external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro diff --git a/external/avro/src/test/resources/before_1582_date_v2_4.avro b/external/avro/src/test/resources/before_1582_date_v2_4.avro new file mode 100644 index 0000000000000000000000000000000000000000..96aa7cbf176a537ca3e845aab126eba8ed6aba7c GIT binary patch literal 202 zcmeZI%3@>@ODrqO*DFrWNXij}OQt6@q~5=&Bn8XyJ$g`p;8=9K_NbMn(OlM{17Af|(r)&dPL&B;-U zt&Ob}0lOkOKP5Gpr#LUMprDdrzHQd^1CI?3%Fp`waqSAJTqY3~9)`~|wP*dj)rKwy E0Gm2Ya{vGU literal 0 HcmV?d00001 diff --git a/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro b/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro new file mode 100644 index 0000000000000000000000000000000000000000..efe5e71a58813ecd1bb2c8105341103518c984d3 GIT binary patch literal 218 zcmeZI%3@>@ODrqO*DFrWNX<=bW2#mvsVqoUvQjEaP0lY$QPNS$OUwoFOY#eRQp-|v zf?(olnW;G`#Y$Gu)o>-nK;;naKtZTEIr(|%K;fMH^vvYMoDhiNC7HRY#U+Wk1-iMJ z$wm3aO0__HN^^3QVrye-MZgY9&QD2A<|)ogEGVdC==4kz>^@w+=(}Ubgt!%Rzca~k WDlq(?{OIYcl^1iwrcDY&R{#JKF;cAn literal 0 HcmV?d00001 diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala index 82569653c1f23..6335304892e4c 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala @@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileWriter import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord} import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.{DataFrame, QueryTest, Row} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -348,6 +348,22 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession { assert(msg.contains("Unscaled value too large for precision")) } } + + private def readResourceAvroFile(name: String): DataFrame = { + val url = Thread.currentThread().getContextClassLoader.getResource(name) + spark.read.format("avro").load(url.toString) + } + + test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") { + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") { + checkAnswer( + readResourceAvroFile("before_1582_date_v2_4.avro"), + Row(java.sql.Date.valueOf("1001-01-01"))) + checkAnswer(readResourceAvroFile( + "before_1582_ts_micros_v2_4.avro"), + Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"))) + } + } } class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite { From a9b4b8aa4f13a79341feb1a8a3522801b04c04ee Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 18 Mar 2020 20:27:22 +0300 Subject: [PATCH 03/11] Copy-paste the rebase functions --- .../sql/catalyst/util/DateTimeUtils.scala | 100 +++++++++++++++++- 1 file 
changed, 99 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 9f207ec891e63..e425fd2524516 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.time._
 import java.time.temporal.{ChronoField, ChronoUnit, IsoFields}
-import java.util.{Locale, TimeZone}
+import java.util.{Calendar, Locale, TimeZone}
 import java.util.concurrent.TimeUnit._
 
 import scala.util.control.NonFatal
@@ -974,4 +974,102 @@ object DateTimeUtils {
       }
     }.mkString("'")
   }
+
+  /**
+   * Converts the given microseconds to a local date-time in the default JVM time zone in
+   * Proleptic Gregorian calendar, interprets the result as a local date-time in the Julian
+   * calendar in the same time zone, and takes the number of microseconds since the epoch
+   * from the Julian timestamp.
+   *
+   * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
+   * @return The rebased microseconds since the epoch in Julian calendar.
+   */
+  def rebaseGregorianToJulianMicros(micros: Long): Long = {
+    val ldt = microsToInstant(micros).atZone(ZoneId.systemDefault).toLocalDateTime
+    val cal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setDate(ldt.getYear, ldt.getMonthValue - 1, ldt.getDayOfMonth)
+      .setTimeOfDay(ldt.getHour, ldt.getMinute, ldt.getSecond)
+      .build()
+    millisToMicros(cal.getTimeInMillis) + ldt.get(ChronoField.MICRO_OF_SECOND)
+  }
+
+  /**
+   * Converts the given microseconds to a local date-time in the default JVM time zone in
+   * Julian calendar, interprets the result as a local date-time in Proleptic Gregorian
+   * calendar in the same time zone, and takes the number of microseconds since the epoch
+   * from the Gregorian timestamp.
+   *
+   * @param micros The number of microseconds since the epoch '1970-01-01T00:00:00Z'.
+   * @return The rebased microseconds since the epoch in Proleptic Gregorian calendar.
+   */
+  def rebaseJulianToGregorianMicros(micros: Long): Long = {
+    val cal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setInstant(microsToMillis(micros))
+      .build()
+    val localDateTime = LocalDateTime.of(
+      cal.get(Calendar.YEAR),
+      cal.get(Calendar.MONTH) + 1,
+      cal.get(Calendar.DAY_OF_MONTH),
+      cal.get(Calendar.HOUR_OF_DAY),
+      cal.get(Calendar.MINUTE),
+      cal.get(Calendar.SECOND),
+      (Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
+    instantToMicros(localDateTime.atZone(ZoneId.systemDefault).toInstant)
+  }
+
+  /**
+   * Converts the given number of days since the epoch day 1970-01-01 to
+   * a local date in Julian calendar, interprets the result as a local
+   * date in Proleptic Gregorian calendar, and takes the number of days
+   * since the epoch from the Gregorian date.
+   *
+   * @param days The number of days since the epoch in Julian calendar.
+   * @return The rebased number of days in Gregorian calendar.
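+   *
+   * For example, the Julian local date 1582-01-01 is the epoch day -141704; rebasing maps
+   * it to -141714, the epoch day of the same local date in Proleptic Gregorian calendar.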
+   */
+  def rebaseJulianToGregorianDays(days: Int): Int = {
+    val utcCal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setTimeZone(TimeZoneUTC)
+      .setInstant(Math.multiplyExact(days, MILLIS_PER_DAY))
+      .build()
+    val localDate = LocalDate.of(
+      utcCal.get(Calendar.YEAR),
+      utcCal.get(Calendar.MONTH) + 1,
+      utcCal.get(Calendar.DAY_OF_MONTH))
+    Math.toIntExact(localDate.toEpochDay)
+  }
+
+  /**
+   * Rebases days since the epoch so that the same number of days is stored as by
+   * Spark 2.4 and earlier versions. Spark 3.0 switched to the Proleptic Gregorian
+   * calendar (see SPARK-26651), while Spark 2.4 and earlier versions used the Julian
+   * calendar for dates before 1582-10-15. As a consequence, the same local date may
+   * map to a different number of days since the epoch in the two calendars.
+   *
+   * For example:
+   *   Proleptic Gregorian calendar: 1582-01-01 -> -141714
+   *   Julian calendar: 1582-01-01 -> -141704
+   * The code below converts -141714 to -141704.
+   *
+   * @param days The number of days since the epoch 1970-01-01. It can be negative.
+   * @return The rebased number of days since the epoch in Julian calendar.
+   */
+  def rebaseGregorianToJulianDays(days: Int): Int = {
+    val localDate = LocalDate.ofEpochDay(days)
+    val utcCal = new Calendar.Builder()
+      // `gregory` is a hybrid calendar that supports both
+      // the Julian and Gregorian calendar systems
+      .setCalendarType("gregory")
+      .setTimeZone(TimeZoneUTC)
+      .setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
+      .build()
+    Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, MILLIS_PER_DAY))
+  }
 }

From de064d1ecd6d42461352daa7d30513a73adf5d9a Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Wed, 18 Mar 2020 20:27:45 +0300
Subject: [PATCH 04/11] Apply rebasing in AvroDeserializer

---
 .../spark/sql/avro/AvroDeserializer.scala | 34 ++++++++++++++-----
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 2c17c16f06da7..5ec28ba6b63a3 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -32,8 +32,9 @@ import org.apache.avro.util.Utf8
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -42,6 +43,9 @@ import org.apache.spark.unsafe.types.UTF8String
 class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
   private lazy val decimalConversions = new DecimalConversion()
 
+  // Whether to rebase dates/timestamps from the Julian to the Proleptic Gregorian calendar on read
+  private val rebaseDateTime = SQLConf.get.avroRebaseDateTimeEnabled
+
   private val converter: Any => Any = rootCatalystType match {
     // A shortcut for empty schema.
case st: StructType if st.isEmpty => @@ -88,6 +92,11 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) { case (INT, IntegerType) => (updater, ordinal, value) => updater.setInt(ordinal, value.asInstanceOf[Int]) + case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) => + val days = value.asInstanceOf[Int] + val rebasedDays = DateTimeUtils.rebaseJulianToGregorianDays(days) + updater.setInt(ordinal, rebasedDays) + case (INT, DateType) => (updater, ordinal, value) => updater.setInt(ordinal, value.asInstanceOf[Int]) @@ -95,14 +104,23 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) { updater.setLong(ordinal, value.asInstanceOf[Long]) case (LONG, TimestampType) => avroType.getLogicalType match { - case _: TimestampMillis => (updater, ordinal, value) => - updater.setLong(ordinal, value.asInstanceOf[Long] * 1000) + // For backward compatibility, if the Avro type is Long and it is not logical type + // (the `null` case), the value is processed as timestamp type with millisecond precision. + case null | _: TimestampMillis => (updater, ordinal, value) => + val millis = value.asInstanceOf[Long] + val micros = DateTimeUtils.millisToMicros(millis) + if (rebaseDateTime) { + updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros)) + } else { + updater.setLong(ordinal, micros) + } case _: TimestampMicros => (updater, ordinal, value) => - updater.setLong(ordinal, value.asInstanceOf[Long]) - case null => (updater, ordinal, value) => - // For backward compatibility, if the Avro type is Long and it is not logical type, - // the value is processed as timestamp type with millisecond precision. - updater.setLong(ordinal, value.asInstanceOf[Long] * 1000) + val micros = value.asInstanceOf[Long] + if (rebaseDateTime) { + updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros)) + } else { + updater.setLong(ordinal, micros) + } case other => throw new IncompatibleSchemaException( s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.") } From 5837181ab9cc8ce988d5bdafd4cad3cb41b5cc87 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 18 Mar 2020 20:30:01 +0300 Subject: [PATCH 05/11] Copy-paste tests to DateTimeUtilsSuite --- .../catalyst/util/DateTimeUtilsSuite.scala | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 68a4a24d6c9c7..c810633b79f96 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -686,4 +686,64 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { assert(convertIncompatiblePattern("yyyy-MM-dd'T'HH:mm:ss.SSSz G") === "yyyy-MM-dd'T'HH:mm:ss.SSSz G") } + + test("rebase julian to/from gregorian micros") { + outstandingTimezones.foreach { timeZone => + withDefaultTimeZone(timeZone) { + Seq( + "0001-01-01 01:02:03.654321", + "1000-01-01 03:02:01.123456", + "1582-10-04 00:00:00.000000", + "1582-10-15 00:00:00.999999", // Gregorian cutover day + "1883-11-10 00:00:00.000000", // America/Los_Angeles -7:52:58 zone offset + "1883-11-20 00:00:00.000000", // America/Los_Angeles -08:00 zone offset + "1969-12-31 11:22:33.000100", + "1970-01-01 00:00:00.000001", // The epoch day + "2020-03-14 09:33:01.500000").foreach { ts 
=> + withClue(s"time zone = ${timeZone.getID} ts = $ts") { + val julianTs = Timestamp.valueOf(ts) + val julianMicros = millisToMicros(julianTs.getTime) + + ((julianTs.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS) + val gregorianMicros = instantToMicros(LocalDateTime.parse(ts.replace(' ', 'T')) + .atZone(timeZone.toZoneId) + .toInstant) + + assert(rebaseJulianToGregorianMicros(julianMicros) === gregorianMicros) + assert(rebaseGregorianToJulianMicros(gregorianMicros) === julianMicros) + } + } + } + } + } + + test("rebase gregorian to/from julian days") { + // millisToDays() and fromJavaDate() are taken from Spark 2.4 + def millisToDays(millisUtc: Long, timeZone: TimeZone): Int = { + val millisLocal = millisUtc + timeZone.getOffset(millisUtc) + Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt + } + def fromJavaDate(date: Date): Int = { + millisToDays(date.getTime, defaultTimeZone()) + } + outstandingTimezones.foreach { timeZone => + withDefaultTimeZone(timeZone) { + Seq( + "0001-01-01", + "1000-01-01", + "1582-10-04", + "1582-10-15", // Gregorian cutover day + "1883-11-10", // America/Los_Angeles -7:52:58 zone offset + "1883-11-20", // America/Los_Angeles -08:00 zone offset + "1969-12-31", + "1970-01-01", // The epoch day + "2020-03-14").foreach { date => + val julianDays = fromJavaDate(Date.valueOf(date)) + val gregorianDays = localDateToDays(LocalDate.parse(date)) + + assert(rebaseGregorianToJulianDays(gregorianDays) === julianDays) + assert(rebaseJulianToGregorianDays(julianDays) === gregorianDays) + } + } + } + } } From 61cf83bee7f312f8e1b2966d6da0b25a77fe32bf Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 18 Mar 2020 21:17:20 +0300 Subject: [PATCH 06/11] Add tests for write path --- .../spark/sql/avro/AvroLogicalTypeSuite.scala | 38 ++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala index 6335304892e4c..7cb5f63ad2e18 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.avro import java.io.File -import java.sql.Timestamp +import java.sql.{Date, Timestamp} import org.apache.avro.{LogicalTypes, Schema} import org.apache.avro.Conversions.DecimalConversion @@ -364,6 +364,42 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession { Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"))) } } + + test("SPARK-31183: rebasing timestamps in write") { + val tsStr = "1001-01-01 01:02:03.123" + val nonRebased = "1001-01-07 01:09:05.123" + withTempPath { dir => + val path = dir.getAbsolutePath + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") { + Seq(tsStr).toDF("tsS") + .select($"tsS".cast("timestamp").as("ts")) + .write.format("avro") + .save(path) + + checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr))) + } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") { + checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased))) + } + } + } + + test("SPARK-31183: rebasing dates in write") { + withTempPath { dir => + val path = dir.getAbsolutePath + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") { + Seq("1001-01-01").toDF("dateS") + .select($"dateS".cast("date").as("date")) + .write.format("avro") 
+ .save(path) + + checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01"))) + } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") { + checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07"))) + } + } + } } class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite { From 9a96af08260967554a9ae7c4fa9bd8d66a223e85 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Wed, 18 Mar 2020 21:17:39 +0300 Subject: [PATCH 07/11] Rebase timestamps/dates in write --- .../spark/sql/avro/AvroSerializer.scala | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala index b7bf7e5543033..9d95c84676bef 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala @@ -34,6 +34,8 @@ import org.apache.avro.util.Utf8 import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** @@ -42,6 +44,9 @@ import org.apache.spark.sql.types._ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) extends Logging { + // Whether to rebase datetimes from Gregorian to Julian calendar in write + private val rebaseDateTime: Boolean = SQLConf.get.avroRebaseDateTimeEnabled + def serialize(catalystData: Any): Any = { converter.apply(catalystData) } @@ -135,15 +140,26 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: case (BinaryType, BYTES) => (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal)) + case (DateType, INT) if rebaseDateTime => + (getter, ordinal) => DateTimeUtils.rebaseGregorianToJulianDays(getter.getInt(ordinal)) + case (DateType, INT) => (getter, ordinal) => getter.getInt(ordinal) case (TimestampType, LONG) => avroType.getLogicalType match { - case _: TimestampMillis => (getter, ordinal) => getter.getLong(ordinal) / 1000 - case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal) - // For backward compatibility, if the Avro type is Long and it is not logical type, - // output the timestamp value as with millisecond precision. - case null => (getter, ordinal) => getter.getLong(ordinal) / 1000 + // For backward compatibility, if the Avro type is Long and it is not logical type + // (the `null` case), output the timestamp value as with millisecond precision. 
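+        // When rebasing is enabled, the Gregorian-to-Julian shift is applied at microsecond
+        // precision first; the rebased value is truncated to milliseconds afterwards.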
+ case null | _: TimestampMillis => (getter, ordinal) => + val micros = getter.getLong(ordinal) + val rebasedMicros = if (rebaseDateTime) { + DateTimeUtils.rebaseGregorianToJulianMicros(micros) + } else micros + DateTimeUtils.microsToMillis(rebasedMicros) + case _: TimestampMicros => (getter, ordinal) => + val micros = getter.getLong(ordinal) + if (rebaseDateTime) { + DateTimeUtils.rebaseGregorianToJulianMicros(micros) + } else micros case other => throw new IncompatibleSchemaException( s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}") } From dac03f291ba2175ffcb6b374ddb669f0a60b7f8e Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 19 Mar 2020 13:47:13 +0300 Subject: [PATCH 08/11] Add a test for millis timestamp --- .../spark/sql/avro/AvroLogicalTypeSuite.scala | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala index 7cb5f63ad2e18..a3eaf63bf0722 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala @@ -365,9 +365,9 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession { } } - test("SPARK-31183: rebasing timestamps in write") { - val tsStr = "1001-01-01 01:02:03.123" - val nonRebased = "1001-01-07 01:09:05.123" + test("SPARK-31183: rebasing microseconds timestamps in write") { + val tsStr = "1001-01-01 01:02:03.123456" + val nonRebased = "1001-01-07 01:09:05.123456" withTempPath { dir => val path = dir.getAbsolutePath withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") { @@ -384,6 +384,38 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession { } } + test("SPARK-31183: rebasing milliseconds timestamps in write") { + val tsStr = "1001-01-01 01:02:03.123456" + val rebased = "1001-01-01 01:02:03.123" + val nonRebased = "1001-01-07 01:09:05.123" + val timestampSchema = """ + { + "namespace": "logical", + "type": "record", + "name": "test", + "fields": [ + {"name": "ts", "type": {"type": "long","logicalType": "timestamp-millis"}} + ] + } + """ + withTempPath { dir => + val path = dir.getAbsolutePath + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") { + Seq(tsStr).toDF("tsS") + .select($"tsS".cast("timestamp").as("ts")) + .write + .option("avroSchema", timestampSchema) + .format("avro") + .save(path) + + checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(rebased))) + } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") { + checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased))) + } + } + } + test("SPARK-31183: rebasing dates in write") { withTempPath { dir => val path = dir.getAbsolutePath From dd24d916e96a26869428a23016311d0d1bb84be9 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 19 Mar 2020 21:08:36 +0300 Subject: [PATCH 09/11] Test reading 2.4 timestamps in millis --- .../resources/before_1582_ts_millis_v2_4.avro | Bin 0 -> 244 bytes .../spark/sql/avro/AvroLogicalTypeSuite.scala | 7 +++++-- 2 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro diff --git a/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro b/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro new file mode 100644 index 
0000000000000000000000000000000000000000..dbaec814eb9549543358c68f94429140f60318ba GIT binary patch literal 244 zcmeZI%3@>@ODrqO*DFrWNX<>$!&I$QQdy9yWTjM;nw(#hqNJmgmzWFUm!uY#KzYRl ziOC?Loc#36Z0d-baZvX%Q literal 0 HcmV?d00001 diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala index a3eaf63bf0722..3512bac6d0003 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala @@ -359,9 +359,12 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession { checkAnswer( readResourceAvroFile("before_1582_date_v2_4.avro"), Row(java.sql.Date.valueOf("1001-01-01"))) - checkAnswer(readResourceAvroFile( - "before_1582_ts_micros_v2_4.avro"), + checkAnswer( + readResourceAvroFile("before_1582_ts_micros_v2_4.avro"), Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"))) + checkAnswer( + readResourceAvroFile("before_1582_ts_millis_v2_4.avro"), + Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124"))) } } From 19a5ff7233208d29972bd17b2d491cbc15bc75d4 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 19 Mar 2020 21:11:23 +0300 Subject: [PATCH 10/11] Add stripMargin --- .../spark/sql/avro/AvroLogicalTypeSuite.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala index 3512bac6d0003..ff8c33847fac0 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala @@ -392,15 +392,15 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession { val rebased = "1001-01-01 01:02:03.123" val nonRebased = "1001-01-07 01:09:05.123" val timestampSchema = """ - { - "namespace": "logical", - "type": "record", - "name": "test", - "fields": [ - {"name": "ts", "type": {"type": "long","logicalType": "timestamp-millis"}} - ] - } - """ + |{ + | "namespace": "logical", + | "type": "record", + | "name": "test", + | "fields": [ + | {"name": "ts", "type": {"type": "long","logicalType": "timestamp-millis"}} + | ] + |} + """.stripMargin withTempPath { dir => val path = dir.getAbsolutePath withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") { From 2464c905045856fdbf17d2765ae499b96f912621 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 19 Mar 2020 21:23:15 +0300 Subject: [PATCH 11/11] Test for just long --- .../spark/sql/avro/AvroLogicalTypeSuite.scala | 55 +++++++++++-------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala index ff8c33847fac0..9e89b69c0b33c 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala @@ -391,30 +391,37 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession { val tsStr = "1001-01-01 01:02:03.123456" val rebased = "1001-01-01 01:02:03.123" val nonRebased = "1001-01-07 01:09:05.123" - val timestampSchema = """ - |{ - | "namespace": "logical", - | "type": 
"record", - | "name": "test", - | "fields": [ - | {"name": "ts", "type": {"type": "long","logicalType": "timestamp-millis"}} - | ] - |} - """.stripMargin - withTempPath { dir => - val path = dir.getAbsolutePath - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") { - Seq(tsStr).toDF("tsS") - .select($"tsS".cast("timestamp").as("ts")) - .write - .option("avroSchema", timestampSchema) - .format("avro") - .save(path) - - checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(rebased))) - } - withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") { - checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased))) + Seq( + """{"type": "long","logicalType": "timestamp-millis"}""", + """"long"""").foreach { tsType => + val timestampSchema = s""" + |{ + | "namespace": "logical", + | "type": "record", + | "name": "test", + | "fields": [ + | {"name": "ts", "type": $tsType} + | ] + |}""".stripMargin + withTempPath { dir => + val path = dir.getAbsolutePath + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") { + Seq(tsStr).toDF("tsS") + .select($"tsS".cast("timestamp").as("ts")) + .write + .option("avroSchema", timestampSchema) + .format("avro") + .save(path) + + checkAnswer( + spark.read.schema("ts timestamp").format("avro").load(path), + Row(Timestamp.valueOf(rebased))) + } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") { + checkAnswer( + spark.read.schema("ts timestamp").format("avro").load(path), + Row(Timestamp.valueOf(nonRebased))) + } } } }