From 4d6e4b6aae7e2371bb0469894a2e382fe1ba12a8 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 23 Apr 2021 17:17:42 +0800
Subject: [PATCH 1/2] CatalystTypeConverters of date/timestamp should accept
 both the old and new Java time classes

---
 .../sql/catalyst/CatalystTypeConverters.scala | 34 +++++++++-----
 .../CatalystTypeConvertersSuite.scala         | 46 +++++++++++++++++--
 2 files changed, 65 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index b55d1b725f56..251d23b12f33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -304,18 +304,23 @@ object CatalystTypeConverters {
     row.getUTF8String(column).toString
   }
 
-  private object DateConverter extends CatalystTypeConverter[Date, Date, Any] {
-    override def toCatalystImpl(scalaValue: Date): Int = DateTimeUtils.fromJavaDate(scalaValue)
+  private object DateConverter extends CatalystTypeConverter[Any, Date, Any] {
+    override def toCatalystImpl(scalaValue: Any): Int = scalaValue match {
+      case d: Date => DateTimeUtils.fromJavaDate(d)
+      case l: LocalDate => DateTimeUtils.localDateToDays(l)
+      case other => throw new IllegalArgumentException(
+        s"The value (${other.toString}) of the type (${other.getClass.getCanonicalName}) "
+          + s"cannot be converted to the date type")
+    }
     override def toScala(catalystValue: Any): Date =
       if (catalystValue == null) null else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
     override def toScalaImpl(row: InternalRow, column: Int): Date =
       DateTimeUtils.toJavaDate(row.getInt(column))
   }
 
-  private object LocalDateConverter extends CatalystTypeConverter[LocalDate, LocalDate, Any] {
-    override def toCatalystImpl(scalaValue: LocalDate): Int = {
-      DateTimeUtils.localDateToDays(scalaValue)
-    }
+  private object LocalDateConverter extends CatalystTypeConverter[Any, LocalDate, Any] {
+    override def toCatalystImpl(scalaValue: Any): Int =
+      DateConverter.toCatalystImpl(scalaValue)
     override def toScala(catalystValue: Any): LocalDate = {
       if (catalystValue == null) null
       else DateTimeUtils.daysToLocalDate(catalystValue.asInstanceOf[Int])
@@ -324,9 +329,14 @@ object CatalystTypeConverters {
       DateTimeUtils.daysToLocalDate(row.getInt(column))
   }
 
-  private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
-    override def toCatalystImpl(scalaValue: Timestamp): Long =
-      DateTimeUtils.fromJavaTimestamp(scalaValue)
+  private object TimestampConverter extends CatalystTypeConverter[Any, Timestamp, Any] {
+    override def toCatalystImpl(scalaValue: Any): Long = scalaValue match {
+      case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
+      case i: Instant => DateTimeUtils.instantToMicros(i)
+      case other => throw new IllegalArgumentException(
+        s"The value (${other.toString}) of the type (${other.getClass.getCanonicalName}) "
+          + s"cannot be converted to the timestamp type")
+    }
     override def toScala(catalystValue: Any): Timestamp =
       if (catalystValue == null) null
       else DateTimeUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
@@ -334,9 +344,9 @@ object CatalystTypeConverters {
       DateTimeUtils.toJavaTimestamp(row.getLong(column))
   }
 
-  private object InstantConverter extends CatalystTypeConverter[Instant, Instant, Any] {
-    override def toCatalystImpl(scalaValue: Instant): Long =
-      DateTimeUtils.instantToMicros(scalaValue)
+  private object InstantConverter extends CatalystTypeConverter[Any, Instant, Any] {
+    override def toCatalystImpl(scalaValue: Any): Long =
+      TimestampConverter.toCatalystImpl(scalaValue)
     override def toScala(catalystValue: Any): Instant =
       if (catalystValue == null) null
       else DateTimeUtils.microsToInstant(catalystValue.asInstanceOf[Long])
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index 0dbae707a4a3..67ec3a30c68b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
 import org.apache.spark.sql.catalyst.plans.SQLHelper
-import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData, IntervalUtils}
+import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils, GenericArrayData, IntervalUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -197,8 +197,8 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
       "1970-01-01",
       "1972-12-31",
       "2019-02-16",
-      "2119-03-16").foreach { timestamp =>
-      val input = LocalDate.parse(timestamp)
+      "2119-03-16").foreach { date =>
+      val input = LocalDate.parse(date)
       val result = CatalystTypeConverters.convertToCatalyst(input)
       val expected = DateTimeUtils.localDateToDays(input)
       assert(result === expected)
@@ -294,4 +294,44 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
       }
     }
   }
+
+  test("SPARK-35204: createToCatalystConverter for date") {
+    Seq(true, false).foreach { enable =>
+      withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> enable.toString) {
+        Seq(-1234, 0, 1234).foreach { days =>
+          val converter = CatalystTypeConverters.createToCatalystConverter(DateType)
+
+          val ld = LocalDate.ofEpochDay(days)
+          val result1 = converter(ld)
+
+          val d = java.sql.Date.valueOf(ld)
+          val result2 = converter(d)
+
+          val expected = DateTimeUtils.localDateToDays(ld)
+          assert(result1 === expected)
+          assert(result2 === expected)
+        }
+      }
+    }
+  }
+
+  test("SPARK-35204: createToCatalystConverter for timestamp") {
+    Seq(true, false).foreach { enable =>
+      withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> enable.toString) {
+        Seq(-1234, 0, 1234).foreach { seconds =>
+          val converter = CatalystTypeConverters.createToCatalystConverter(TimestampType)
+
+          val i = Instant.ofEpochSecond(seconds)
+          val result1 = converter(i)
+
+          val t = new java.sql.Timestamp(seconds * 1000)
+          val result2 = converter(t)
+
+          val expected = seconds * DateTimeConstants.MICROS_PER_SECOND
+          assert(result1 === expected)
+          assert(result2 === expected)
+        }
+      }
+    }
+  }
 }

From c8dbc1a9fe845632f7143d9c50ec5f154182210b Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 23 Apr 2021 22:22:19 +0800
Subject: [PATCH 2/2] address comments

---
 .../apache/spark/sql/catalyst/CatalystTypeConverters.scala  | 4 ++--
 .../spark/sql/catalyst/CatalystTypeConvertersSuite.scala    | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 251d23b12f33..ccf0a50b734f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -310,7 +310,7 @@ object CatalystTypeConverters {
       case l: LocalDate => DateTimeUtils.localDateToDays(l)
       case other => throw new IllegalArgumentException(
         s"The value (${other.toString}) of the type (${other.getClass.getCanonicalName}) "
-          + s"cannot be converted to the date type")
+          + s"cannot be converted to the ${DateType.sql} type")
     }
     override def toScala(catalystValue: Any): Date =
       if (catalystValue == null) null else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
@@ -335,7 +335,7 @@ object CatalystTypeConverters {
       case i: Instant => DateTimeUtils.instantToMicros(i)
       case other => throw new IllegalArgumentException(
         s"The value (${other.toString}) of the type (${other.getClass.getCanonicalName}) "
-          + s"cannot be converted to the timestamp type")
+          + s"cannot be converted to the ${TimestampType.sql} type")
     }
     override def toScala(catalystValue: Any): Timestamp =
       if (catalystValue == null) null
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index 67ec3a30c68b..169c5d6a3197 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -324,7 +324,7 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
       val i = Instant.ofEpochSecond(seconds)
       val result1 = converter(i)
 
-      val t = new java.sql.Timestamp(seconds * 1000)
+      val t = new java.sql.Timestamp(seconds * DateTimeConstants.MILLIS_PER_SECOND)
      val result2 = converter(t)
 
       val expected = seconds * DateTimeConstants.MICROS_PER_SECOND
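
Reviewer note: the sketch below is not part of the patch; it mirrors the new SPARK-35204
tests and illustrates the behavior this series enables. After these commits, a single
converter obtained from CatalystTypeConverters.createToCatalystConverter accepts both the
legacy java.sql classes and the java.time classes, independent of the
spark.sql.datetime.java8API.enabled setting. The Sketch object and its sample values are
hypothetical scaffolding for illustration only.

import java.time.{Instant, LocalDate}

import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types.{DateType, TimestampType}

// Hypothetical scaffolding; the converter calls mirror the new test cases in the patch.
object Sketch {
  def main(args: Array[String]): Unit = {
    val toCatalystDate = CatalystTypeConverters.createToCatalystConverter(DateType)
    // Legacy and Java 8 date classes now convert to the same internal value
    // (days since the epoch), as asserted by the new date test.
    assert(toCatalystDate(LocalDate.ofEpochDay(1234)) ==
      toCatalystDate(java.sql.Date.valueOf(LocalDate.ofEpochDay(1234))))

    val toCatalystTs = CatalystTypeConverters.createToCatalystConverter(TimestampType)
    // Likewise for timestamps: both inputs map to the same internal value
    // (microseconds since the epoch), as asserted by the new timestamp test.
    assert(toCatalystTs(Instant.ofEpochSecond(42)) ==
      toCatalystTs(new java.sql.Timestamp(42 * 1000L)))

    // Any other input type is rejected with an IllegalArgumentException whose
    // message, after the second commit, names the SQL type (e.g. DATE, TIMESTAMP).
  }
}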