diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 5ec28ba6b63a..6c60869dad9b 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -106,21 +106,22 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
     case (LONG, TimestampType) => avroType.getLogicalType match {
       // For backward compatibility, if the Avro type is Long and it is not logical type
       // (the `null` case), the value is processed as timestamp type with millisecond precision.
+      case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
+        val millis = value.asInstanceOf[Long]
+        val micros = DateTimeUtils.millisToMicros(millis)
+        val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+        updater.setLong(ordinal, rebasedMicros)
       case null | _: TimestampMillis => (updater, ordinal, value) =>
         val millis = value.asInstanceOf[Long]
         val micros = DateTimeUtils.millisToMicros(millis)
-        if (rebaseDateTime) {
-          updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
-        } else {
-          updater.setLong(ordinal, micros)
-        }
+        updater.setLong(ordinal, micros)
+      case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
+        val micros = value.asInstanceOf[Long]
+        val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+        updater.setLong(ordinal, rebasedMicros)
       case _: TimestampMicros => (updater, ordinal, value) =>
         val micros = value.asInstanceOf[Long]
-        if (rebaseDateTime) {
-          updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
-        } else {
-          updater.setLong(ordinal, micros)
-        }
+        updater.setLong(ordinal, micros)
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
     }
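The deserializer change (and the serializer change below) is a pure refactoring: `rebaseDateTime` used to be tested inside the per-record closure, and is now tested in a match guard while the closure is selected, so the decision is made once per converter instead of once per value. A minimal sketch of the pattern, with hypothetical simplified names rather than the actual Spark converter API:

```scala
object GuardHoistDemo extends App {
  // Hypothetical stand-ins for an Avro logical type and a rebase function,
  // only to illustrate the guard-hoisting pattern used in the patch.
  sealed trait LogicalType
  case object TimestampMicros extends LogicalType

  // The `if rebase` guard is evaluated once, while the converter is chosen;
  // the returned closure carries no branch on the per-record hot path.
  def newConverter(lt: LogicalType, rebase: Boolean)(shift: Long => Long): Long => Long =
    lt match {
      case TimestampMicros if rebase => micros => shift(micros)
      case TimestampMicros => micros => micros
    }

  val convert = newConverter(TimestampMicros, rebase = true)(_ + 1L)
  println(convert(41L)) // 42: the rebase decision was baked in when `convert` was built
}
```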
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 9d95c84676be..16b8eabfb120 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -149,17 +149,15 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     case (TimestampType, LONG) => avroType.getLogicalType match {
       // For backward compatibility, if the Avro type is Long and it is not logical type
       // (the `null` case), output the timestamp value as with millisecond precision.
-      case null | _: TimestampMillis => (getter, ordinal) =>
+      case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
         val micros = getter.getLong(ordinal)
-        val rebasedMicros = if (rebaseDateTime) {
-          DateTimeUtils.rebaseGregorianToJulianMicros(micros)
-        } else micros
+        val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(micros)
         DateTimeUtils.microsToMillis(rebasedMicros)
-      case _: TimestampMicros => (getter, ordinal) =>
-        val micros = getter.getLong(ordinal)
-        if (rebaseDateTime) {
-          DateTimeUtils.rebaseGregorianToJulianMicros(micros)
-        } else micros
+      case null | _: TimestampMillis => (getter, ordinal) =>
+        DateTimeUtils.microsToMillis(getter.getLong(ordinal))
+      case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
+        DateTimeUtils.rebaseGregorianToJulianMicros(getter.getLong(ordinal))
+      case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
     }
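One subtlety of the new shape in both files: Scala tries match alternatives top to bottom, so each guarded `if rebaseDateTime` case has to precede its unguarded twin; in the opposite order the unguarded case would match first and the guarded one would be dead code. A short demonstration (generic Scala, not the Spark API):

```scala
object GuardOrderDemo extends App {
  def pick(flag: Boolean): String = "ts" match {
    case "ts" if flag => "rebased"     // reachable only because it comes first
    case "ts"         => "not rebased" // taken whenever the guard above fails
  }
  assert(pick(true) == "rebased" && pick(false) == "not rebased")
}
```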
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 9e89b69c0b33..82569653c1f2 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.avro
 
 import java.io.File
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -348,100 +348,6 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
       assert(msg.contains("Unscaled value too large for precision"))
     }
   }
-
-  private def readResourceAvroFile(name: String): DataFrame = {
-    val url = Thread.currentThread().getContextClassLoader.getResource(name)
-    spark.read.format("avro").load(url.toString)
-  }
-
-  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
-    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-      checkAnswer(
-        readResourceAvroFile("before_1582_date_v2_4.avro"),
-        Row(java.sql.Date.valueOf("1001-01-01")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
-    }
-  }
-
-  test("SPARK-31183: rebasing microseconds timestamps in write") {
-    val tsStr = "1001-01-01 01:02:03.123456"
-    val nonRebased = "1001-01-07 01:09:05.123456"
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-        Seq(tsStr).toDF("tsS")
-          .select($"tsS".cast("timestamp").as("ts"))
-          .write.format("avro")
-          .save(path)
-
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
-      }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
-      }
-    }
-  }
-
-  test("SPARK-31183: rebasing milliseconds timestamps in write") {
-    val tsStr = "1001-01-01 01:02:03.123456"
-    val rebased = "1001-01-01 01:02:03.123"
-    val nonRebased = "1001-01-07 01:09:05.123"
-    Seq(
-      """{"type": "long","logicalType": "timestamp-millis"}""",
-      """"long"""").foreach { tsType =>
-      val timestampSchema = s"""
-        |{
-        |  "namespace": "logical",
-        |  "type": "record",
-        |  "name": "test",
-        |  "fields": [
-        |    {"name": "ts", "type": $tsType}
-        |  ]
-        |}""".stripMargin
-      withTempPath { dir =>
-        val path = dir.getAbsolutePath
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-          Seq(tsStr).toDF("tsS")
-            .select($"tsS".cast("timestamp").as("ts"))
-            .write
-            .option("avroSchema", timestampSchema)
-            .format("avro")
-            .save(path)
-
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(rebased)))
-        }
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(nonRebased)))
-        }
-      }
-    }
-  }
-
-  test("SPARK-31183: rebasing dates in write") {
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-        Seq("1001-01-01").toDF("dateS")
-          .select($"dateS".cast("date").as("date"))
-          .write.format("avro")
-          .save(path)
-
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
-      }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
-      }
-    }
-  }
 }
 
 class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
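In the AvroSuite changes below, the bare `TimeZone.setDefault(TimeZone.getTimeZone("UTC"))`, which leaked a JVM-wide default into every later test, is replaced with the scoped `DateTimeTestUtils.withDefaultTimeZone`. A minimal sketch of what such a helper has to do (illustrative only, not Spark's actual implementation):

```scala
import java.util.TimeZone

object WithTimeZoneSketch {
  // Install `zone` as the JVM default for the duration of `body`, and restore
  // the previous default even when the body throws.
  def withDefaultTimeZone[T](zone: TimeZone)(body: => T): T = {
    val saved = TimeZone.getDefault
    TimeZone.setDefault(zone)
    try body finally TimeZone.setDefault(saved)
  }
}
```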
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 360160c9c939..34a0e2bf607a 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.net.URL
 import java.nio.file.{Files, Paths}
 import java.sql.{Date, Timestamp}
-import java.util.{Locale, TimeZone, UUID}
+import java.util.{Locale, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -35,9 +35,10 @@ import org.apache.commons.io.FileUtils
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
-import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
+import org.apache.spark.sql.TestingUDT.IntervalData
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -83,6 +84,11 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }
 
+  private def readResourceAvroFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.format("avro").load(url.toString)
+  }
+
   test("resolve avro data source") {
     val databricksAvro = "com.databricks.spark.avro"
     // By default the backward compatibility for com.databricks.spark.avro is enabled.
@@ -402,18 +408,19 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
         StructField("float", FloatType, true),
         StructField("date", DateType, true)
       ))
-      TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
-      val rdd = spark.sparkContext.parallelize(Seq(
-        Row(1f, null),
-        Row(2f, new Date(1451948400000L)),
-        Row(3f, new Date(1460066400500L))
-      ))
-      val df = spark.createDataFrame(rdd, schema)
-      df.write.format("avro").save(dir.toString)
-      assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
-      checkAnswer(
-        spark.read.format("avro").load(dir.toString).select("date"),
-        Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
+      DateTimeTestUtils.withDefaultTimeZone(DateTimeUtils.TimeZoneUTC) {
+        val rdd = spark.sparkContext.parallelize(Seq(
+          Row(1f, null),
+          Row(2f, new Date(1451948400000L)),
+          Row(3f, new Date(1460066400500L))
+        ))
+        val df = spark.createDataFrame(rdd, schema)
+        df.write.format("avro").save(dir.toString)
+        assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
+        checkAnswer(
+          spark.read.format("avro").load(dir.toString).select("date"),
+          Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
+      }
     }
   }
 
@@ -1521,6 +1528,95 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
       assert(deprecatedEvents.size === 1)
     }
   }
+
+  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+      checkAnswer(
+        readResourceAvroFile("before_1582_date_v2_4.avro"),
+        Row(java.sql.Date.valueOf("1001-01-01")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+    }
+  }
+
+  test("SPARK-31183: rebasing microseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val nonRebased = "1001-01-07 01:09:05.123456"
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq(tsStr).toDF("tsS")
+          .select($"tsS".cast("timestamp").as("ts"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing milliseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val rebased = "1001-01-01 01:02:03.123"
+    val nonRebased = "1001-01-07 01:09:05.123"
+    Seq(
+      """{"type": "long","logicalType": "timestamp-millis"}""",
+      """"long"""").foreach { tsType =>
+      val timestampSchema = s"""
+        |{
+        |  "namespace": "logical",
+        |  "type": "record",
+        |  "name": "test",
+        |  "fields": [
+        |    {"name": "ts", "type": $tsType}
+        |  ]
+        |}""".stripMargin
+      withTempPath { dir =>
+        val path = dir.getAbsolutePath
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+          Seq(tsStr).toDF("tsS")
+            .select($"tsS".cast("timestamp").as("ts"))
+            .write
+            .option("avroSchema", timestampSchema)
+            .format("avro")
+            .save(path)
+
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(rebased)))
+        }
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-> "false") { + checkAnswer( + spark.read.schema("ts timestamp").format("avro").load(path), + Row(Timestamp.valueOf(nonRebased))) + } + } + } + } + + test("SPARK-31183: rebasing dates in write") { + withTempPath { dir => + val path = dir.getAbsolutePath + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") { + Seq("1001-01-01").toDF("dateS") + .select($"dateS".cast("date").as("date")) + .write.format("avro") + .save(path) + + checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01"))) + } + withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") { + checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07"))) + } + } + } } class AvroV1Suite extends AvroSuite {