From 0c45b1e088f7db72045a1efabb1d2017bfe7c755 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 11:54:12 +0300
Subject: [PATCH 01/19] Add DURATION to Encoders

---
 .../src/main/scala/org/apache/spark/sql/Encoders.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
index 5d31b5bbf12a..9e2eff0961b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -128,6 +128,14 @@ object Encoders {
    */
   def INSTANT: Encoder[java.time.Instant] = ExpressionEncoder()
 
+  /**
+   * Creates an encoder that serializes instances of the `java.time.Duration` class
+   * to the internal representation of Catalyst's nullable CalendarIntervalType.
+   *
+   * @since 3.0.0
+   */
+  def DURATION: Encoder[java.time.Duration] = ExpressionEncoder()
+
   /**
    * An encoder for arrays of bytes.
    *

From 55ea764e1eac0bc90acf783b5d8bf20aeaed6e10 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 12:18:48 +0300
Subject: [PATCH 02/19] Add DurationConverter

---
 .../sql/catalyst/CatalystTypeConverters.scala   | 16 ++++++++++++++--
 .../spark/sql/catalyst/util/DateTimeUtils.scala | 12 +++++++++++-
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 34d2f45e715e..0b901293be11 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -21,7 +21,7 @@ import java.lang.{Iterable => JavaIterable}
 import java.math.{BigDecimal => JavaBigDecimal}
 import java.math.{BigInteger => JavaBigInteger}
 import java.sql.{Date, Timestamp}
-import java.time.{Instant, LocalDate}
+import java.time.{Duration, Instant, LocalDate}
 import java.util.{Map => JavaMap}
 import javax.annotation.Nullable
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 /**
  * Functions to convert Scala types to Catalyst types and vice versa.
@@ -74,6 +74,7 @@ object CatalystTypeConverters {
       case LongType => LongConverter
       case FloatType => FloatConverter
       case DoubleType => DoubleConverter
+      case CalendarIntervalType => DurationConverter
       case dataType: DataType => IdentityConverter(dataType)
     }
     converter.asInstanceOf[CatalystTypeConverter[Any, Any, Any]]
@@ -341,6 +342,16 @@ object CatalystTypeConverters {
       DateTimeUtils.microsToInstant(row.getLong(column))
   }
 
+  private object DurationConverter extends CatalystTypeConverter[Duration, Duration, Any] {
+    override def toCatalystImpl(scalaValue: Duration): CalendarInterval =
+      DateTimeUtils.durationToInterval(scalaValue)
+    override def toScala(catalystValue: Any): Duration =
+      if (catalystValue == null) null
+      else DateTimeUtils.intervalToDuration(catalystValue.asInstanceOf[CalendarInterval])
+    override def toScalaImpl(row: InternalRow, column: Int): Duration =
+      DateTimeUtils.intervalToDuration(row.getInterval(column))
+  }
+
   private class DecimalConverter(dataType: DecimalType)
     extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
 
@@ -462,6 +473,7 @@ object CatalystTypeConverters {
         map,
         (key: Any) => convertToCatalyst(key),
         (value: Any) => convertToCatalyst(value))
+    case d: Duration => DurationConverter.toCatalyst(d)
     case other => other
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 79fc45ec8947..6706fcb773ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit._
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.types.Decimal
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 /**
  * Helper functions for converting between internal and external date and time representations.
@@ -959,4 +959,14 @@ object DateTimeUtils {
         None
     }
   }
+
+  def durationToInterval(duration: Duration): CalendarInterval = {
+    val micros = duration.getSeconds * MICROS_PER_SECOND + duration.getNano / NANOS_PER_MICROS
+    new CalendarInterval(0, micros)
+  }
+
+  def intervalToDuration(interval: CalendarInterval): Duration = {
+    val microsDuration = Duration.ofNanos(interval.microseconds * NANOS_PER_MICROS)
+    microsDuration.plusSeconds(interval.months * SECONDS_PER_MONTH)
+  }
 }

From 7e8e4557a50f5d2afef55fc752f2547b15d7df72 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 12:22:59 +0300
Subject: [PATCH 03/19] Add createDeserializerForDuration

---
 .../spark/sql/catalyst/DeserializerBuildHelper.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
index e55c25c4b0c5..5224d0e791d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala
@@ -118,6 +118,15 @@ object DeserializerBuildHelper {
       returnNullable = false)
   }
 
+  def createDeserializerForDuration(path: Expression): Expression = {
+    StaticInvoke(
+      DateTimeUtils.getClass,
+      ObjectType(classOf[java.time.Duration]),
+      "intervalToDuration",
+      path :: Nil,
+      returnNullable = false)
+  }
+
   def createDeserializerForJavaBigDecimal(
       path: Expression,
       returnNullable: Boolean): Expression = {

From 3cdef53f6fc0af9b2b7fb8e6b0a4c849ca134fd0 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 12:47:33 +0300
Subject: [PATCH 04/19] Update InternalRow.getWriter

---
 .../scala/org/apache/spark/sql/catalyst/InternalRow.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
index 1bb13e3368c0..00615922d31e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 /**
  * An abstract class for row used internally in Spark SQL, which only contains the columns as
@@ -58,6 +58,8 @@ abstract class InternalRow extends SpecializedGetters with Serializable {
    */
   def setDecimal(i: Int, value: Decimal, precision: Int): Unit = update(i, value)
 
+  def setInterval(i: Int, value: CalendarInterval): Unit = update(i, value)
+
   /**
    * Make a copy of the current [[InternalRow]] object.
   */
@@ -177,6 +179,8 @@ object InternalRow {
     case _: StructType => (input, v) => input.update(ordinal, v.asInstanceOf[InternalRow].copy())
     case _: ArrayType => (input, v) => input.update(ordinal, v.asInstanceOf[ArrayData].copy())
     case _: MapType => (input, v) => input.update(ordinal, v.asInstanceOf[MapData].copy())
+    case _: CalendarIntervalType =>
+      (input, v) => input.setInterval(ordinal, v.asInstanceOf[CalendarInterval])
     case _ => (input, v) => input.update(ordinal, v)
   }
 }

From d5618fc70e0342b41ce5af8101a565709c70cfb2 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 12:54:49 +0300
Subject: [PATCH 05/19] Add createSerializerForJavaDuration

---
 .../apache/spark/sql/catalyst/JavaTypeInference.scala | 6 ++++++
 .../spark/sql/catalyst/SerializerBuildHelper.scala    | 9 +++++++++
 2 files changed, 15 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index c5be3efc6371..e79b5b488c87 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -106,6 +106,7 @@ object JavaTypeInference {
       case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
       case c: Class[_] if c == classOf[java.time.Instant] => (TimestampType, true)
       case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
+      case c: Class[_] if c == classOf[java.time.Duration] => (CalendarIntervalType, true)
 
       case _ if typeToken.isArray =>
         val (dataType, nullable) = inferDataType(typeToken.getComponentType, seenTypeSet)
@@ -235,6 +236,9 @@ object JavaTypeInference {
       case c if c == classOf[java.sql.Timestamp] =>
         createDeserializerForSqlTimestamp(path)
 
+      case c if c == classOf[java.time.Duration] =>
+        createDeserializerForDuration(path)
+
       case c if c == classOf[java.lang.String] =>
         createDeserializerForString(path, returnNullable = true)
 
@@ -390,6 +394,8 @@ object JavaTypeInference {
       case c if c == classOf[java.sql.Date] => createSerializerForSqlDate(inputObject)
 
+      case c if c == classOf[java.time.Duration] => createSerializerForJavaDuration(inputObject)
+
       case c if c == classOf[java.math.BigDecimal] =>
         createSerializerForJavaBigDecimal(inputObject)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
index 026ff6f2983f..7673ae8bfb6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala
@@ -101,6 +101,15 @@ object SerializerBuildHelper {
       returnNullable = false)
   }
 
+  def createSerializerForJavaDuration(inputObject: Expression): Expression = {
+    StaticInvoke(
+      DateTimeUtils.getClass,
+      CalendarIntervalType,
+      "durationToInterval",
+      inputObject :: Nil,
+      returnNullable = false)
+  }
+
   def createSerializerForJavaBigDecimal(inputObject: Expression): Expression = {
     CheckOverflow(StaticInvoke(
       Decimal.getClass,

From e6dd5c8c806a0fe57447b2f96566c7d257d15d15 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 14:04:25 +0300
Subject: [PATCH 06/19] Update ScalaReflection

---
 .../org/apache/spark/sql/catalyst/ScalaReflection.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index ceb315550038..b4a7f13b087e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -230,6 +230,9 @@ object ScalaReflection extends ScalaReflection {
       case t if isSubtype(t, localTypeOf[java.sql.Timestamp]) =>
         createDeserializerForSqlTimestamp(path)
 
+      case t if isSubtype(t, localTypeOf[java.time.Duration]) =>
+        createDeserializerForDuration(path)
+
       case t if isSubtype(t, localTypeOf[java.lang.String]) =>
         createDeserializerForString(path, returnNullable = false)
 
@@ -496,6 +499,9 @@ object ScalaReflection extends ScalaReflection {
       case t if isSubtype(t, localTypeOf[java.sql.Date]) =>
         createSerializerForSqlDate(inputObject)
 
+      case t if isSubtype(t, localTypeOf[java.time.Duration]) =>
+        createSerializerForJavaDuration(inputObject)
+
       case t if isSubtype(t, localTypeOf[BigDecimal]) =>
         createSerializerForScalaBigDecimal(inputObject)
 
@@ -671,6 +677,8 @@ object ScalaReflection extends ScalaReflection {
       Schema(TimestampType, nullable = true)
     case t if isSubtype(t, localTypeOf[java.time.LocalDate]) => Schema(DateType, nullable = true)
     case t if isSubtype(t, localTypeOf[java.sql.Date]) => Schema(DateType, nullable = true)
+    case t if isSubtype(t, localTypeOf[java.time.Duration]) =>
+      Schema(CalendarIntervalType, nullable = true)
     case t if isSubtype(t, localTypeOf[BigDecimal]) =>
       Schema(DecimalType.SYSTEM_DEFAULT, nullable = true)
     case t if isSubtype(t, localTypeOf[java.math.BigDecimal]) =>

From fc275f30079a8897b967612983c9582d100fee52 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 14:09:38 +0300
Subject: [PATCH 07/19] Update row encoder

---
 .../org/apache/spark/sql/catalyst/encoders/RowEncoder.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index 765018f07d87..c2a39a274a94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -57,6 +57,7 @@ import org.apache.spark.sql.types._
  *   ArrayType -> scala.collection.Seq or Array
  *   MapType -> scala.collection.Map
  *   StructType -> org.apache.spark.sql.Row
+ *   CalendarIntervalType -> java.time.Duration
  * }}}
  */
 object RowEncoder {
@@ -108,6 +109,8 @@ object RowEncoder {
         createSerializerForSqlDate(inputObject)
       }
 
+    case CalendarIntervalType => createSerializerForJavaDuration(inputObject)
+
     case d: DecimalType =>
       CheckOverflow(StaticInvoke(
         Decimal.getClass,
@@ -226,6 +229,7 @@
       } else {
         ObjectType(classOf[java.sql.Date])
       }
+    case CalendarIntervalType => ObjectType(classOf[java.time.Duration])
     case _: DecimalType => ObjectType(classOf[java.math.BigDecimal])
     case StringType => ObjectType(classOf[java.lang.String])
     case _: ArrayType => ObjectType(classOf[scala.collection.Seq[_]])
@@ -281,6 +285,8 @@
         createDeserializerForSqlDate(input)
       }
 
+    case CalendarIntervalType => createDeserializerForDuration(input)
+
     case _: DecimalType => createDeserializerForJavaBigDecimal(input, returnNullable = false)
 
     case StringType => createDeserializerForString(input, returnNullable = false)

From 6f231e5a64143da922ccbd4378bb0fbe14cac749 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 14:28:31 +0300
Subject: [PATCH 08/19] Add the test "converting java.time.Duration to CalendarIntervalType"

---
 .../catalyst/CatalystTypeConvertersSuite.scala | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index b9e7cf304989..9d8f0432588d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst
 
-import java.time.{Instant, LocalDate}
+import java.time.{Duration, Instant, LocalDate}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.Row
 
@@ -216,4 +216,18 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
       }
     }
   }
+
+  test("converting java.time.Duration to CalendarIntervalType") {
+    Seq(
+      "P0D",
+      "PT0.000001S",
+      "PT-0.000001S",
+      "P1DT2H3M4.000001S",
+      "P-1DT2H3M4.000001S").foreach { time =>
+      val input = Duration.parse(time)
+      val result = CatalystTypeConverters.convertToCatalyst(input)
+      val expected = DateTimeUtils.durationToInterval(input)
+      assert(result === expected)
+    }
+  }
 }

From eb052078693b5992044d08b41ddeba639d3e6795 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 14:33:02 +0300
Subject: [PATCH 09/19] Add tests to CatalystTypeConvertersSuite

---
 .../sql/catalyst/CatalystTypeConvertersSuite.scala | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index 9d8f0432588d..07333da7e4e8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
 class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
 
@@ -230,4 +230,16 @@
       assert(result === expected)
     }
   }
+
+  test("converting CalendarIntervalType to java.time.Duration") {
+    Seq(
+      CalendarInterval.fromString("interval 0 days"),
+      CalendarInterval.fromString("interval 1 month"),
+      CalendarInterval.fromString("interval 1 month 1 microsecond"),
+      CalendarInterval.fromString("interval -1 month -1 microsecond"),
+      CalendarInterval.fromString("interval 10000 years -1 microsecond")).foreach { i =>
+      val duration = DateTimeUtils.intervalToDuration(i)
+      assert(CatalystTypeConverters.createToScalaConverter(CalendarIntervalType)(i) === duration)
+    }
+  }
 }

From 0476082bfe0468e8a5a7e935e5448cf3a2d6760a Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 14:45:41 +0300
Subject: [PATCH 10/19] Add tests to RowEncoderSuite

---
 .../apache/spark/sql/catalyst/ScalaReflection.scala |  2 +-
 .../sql/catalyst/encoders/RowEncoderSuite.scala     | 12 ++++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index b4a7f13b087e..8910150dd19e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -138,7 +138,7 @@ object ScalaReflection extends ScalaReflection {
    */
   def isNativeType(dt: DataType): Boolean = dt match {
     case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType |
-        FloatType | DoubleType | BinaryType | CalendarIntervalType => true
+        FloatType | DoubleType | BinaryType => true
     case _ => false
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index 1a1cab823d4f..24101175a860 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -332,6 +332,18 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
     }
   }
 
+  test("encoding/decoding CalendarIntervalType to/from java.time.Duration") {
+    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
+      val schema = new StructType().add("i", CalendarIntervalType)
+      val encoder = RowEncoder(schema).resolveAndBind()
+      val duration = java.time.Duration.parse("P2DT3H4M")
+      val row = encoder.toRow(Row(duration))
+      assert(row.getInterval(0) === DateTimeUtils.durationToInterval(duration))
+      val readback = encoder.fromRow(row)
+      assert(readback.get(0).equals(duration))
+    }
+  }
+
   for {
     elementType <- Seq(IntegerType, StringType)
     containsNull <- Seq(true, false)

From ce397ce52aab62c477dd23e5d40d43cf8d948692 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 14:46:49 +0300
Subject: [PATCH 11/19] Move the row encoder test out of the flag check

---
 .../sql/catalyst/encoders/RowEncoderSuite.scala | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index 24101175a860..4abf2b5cdd12 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -333,15 +333,13 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
   }
 
   test("encoding/decoding CalendarIntervalType to/from java.time.Duration") {
-    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
-      val schema = new StructType().add("i", CalendarIntervalType)
-      val encoder = RowEncoder(schema).resolveAndBind()
-      val duration = java.time.Duration.parse("P2DT3H4M")
-      val row = encoder.toRow(Row(duration))
-      assert(row.getInterval(0) === DateTimeUtils.durationToInterval(duration))
-      val readback = encoder.fromRow(row)
-      assert(readback.get(0).equals(duration))
-    }
+    val schema = new StructType().add("i", CalendarIntervalType)
+    val encoder = RowEncoder(schema).resolveAndBind()
+    val duration = java.time.Duration.parse("P2DT3H4M")
+    val row = encoder.toRow(Row(duration))
+    assert(row.getInterval(0) === DateTimeUtils.durationToInterval(duration))
+    val readback = encoder.fromRow(row)
+    assert(readback.get(0).equals(duration))
   }
 
   for {

From 6198a84cfb171e045d62735ce55ba42a2b8bfa1d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 14:54:40 +0300
Subject: [PATCH 12/19] Add a test to LiteralExpressionSuite

---
 .../sql/catalyst/expressions/LiteralExpressionSuite.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
index 269f1a09ac53..e4e76f30af7f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.nio.charset.StandardCharsets
-import java.time.{Instant, LocalDate, LocalDateTime, ZoneOffset}
+import java.time.{Duration, Instant, LocalDate, LocalDateTime, ZoneOffset}
 import java.util.TimeZone
 
 import scala.reflect.runtime.universe.TypeTag
@@ -70,12 +70,13 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
     withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "false") {
       checkEvaluation(Literal.default(DateType), DateTimeUtils.toJavaDate(0))
       checkEvaluation(Literal.default(TimestampType), DateTimeUtils.toJavaTimestamp(0L))
+      checkEvaluation(Literal.default(CalendarIntervalType), new CalendarInterval(0, 0L))
     }
     withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
       checkEvaluation(Literal.default(DateType), LocalDate.ofEpochDay(0))
       checkEvaluation(Literal.default(TimestampType), Instant.ofEpochSecond(0))
+      checkEvaluation(Literal.default(CalendarIntervalType), Duration.ofSeconds(0))
     }
-    checkEvaluation(Literal.default(CalendarIntervalType), new CalendarInterval(0, 0L))
     checkEvaluation(Literal.default(ArrayType(StringType)), Array())
     checkEvaluation(Literal.default(MapType(IntegerType, StringType)), Map())
     checkEvaluation(Literal.default(StructType(StructField("a", StringType) :: Nil)), Row(""))

From 78b9e5801d473f5010c3ff1bc763250a3e060ad7 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 15:05:28 +0300
Subject: [PATCH 13/19] Add more tests to LiteralExpressionSuite

---
 .../sql/catalyst/expressions/literals.scala  |  6 ++++--
 .../expressions/LiteralExpressionSuite.scala | 18 ++++++++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index f03174babcd9..78997e9bc08a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -27,7 +27,7 @@ import java.lang.{Short => JavaShort}
 import java.math.{BigDecimal => JavaBigDecimal}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
-import java.time.{Instant, LocalDate}
+import java.time.{Duration, Instant, LocalDate}
 import java.util
 import java.util.Objects
 import javax.xml.bind.DatatypeConverter
@@ -42,7 +42,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.instantToMicros
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{durationToInterval, instantToMicros}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types._
@@ -71,6 +71,7 @@ object Literal {
     case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
     case ld: LocalDate => Literal(ld.toEpochDay.toInt, DateType)
     case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
+    case d: Duration => Literal(durationToInterval(d), CalendarIntervalType)
     case a: Array[Byte] => Literal(a, BinaryType)
     case a: collection.mutable.WrappedArray[_] => apply(a.array)
     case a: Array[_] =>
@@ -120,6 +121,7 @@ object Literal {
     case _ if clz == classOf[BigInt] => DecimalType.SYSTEM_DEFAULT
     case _ if clz == classOf[BigDecimal] => DecimalType.SYSTEM_DEFAULT
     case _ if clz == classOf[CalendarInterval] => CalendarIntervalType
+    case _ if clz == classOf[Duration] => CalendarIntervalType
     case _ if clz.isArray => ArrayType(componentTypeToDataType(clz.getComponentType))
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
index e4e76f30af7f..b8547bb2bc64 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
@@ -317,4 +317,22 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
       assert(literalStr === expected)
     }
   }
+
+  test("construct literals from java.time.Duration") {
+    Seq(
+      Duration.ofSeconds(0),
+      Duration.ofSeconds(1, 999999000),
+      Duration.ofSeconds(-1, -999999000),
+      Duration.ofDays(365 * 10000),
+      Duration.ofDays(-365 * 10000)).foreach { duration =>
+      checkEvaluation(Literal(duration), duration)
+    }
+  }
+
+  test("construct literals from arrays of java.time.Duration") {
+    val duration0 = Duration.ofMinutes(10)
+    checkEvaluation(Literal(Array(duration0)), Array(duration0))
+    val duration1 = Duration.ofHours(3)
+    checkEvaluation(Literal(Array(duration0, duration1)), Array(duration0, duration1))
+  }
 }

From 114bbc70b805ad81d8967c28bcf24f23cc620b18 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 15:07:26 +0300
Subject: [PATCH 14/19] Implicit newDurationEncoder

---
 .../src/main/scala/org/apache/spark/sql/SQLImplicits.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
index 71cbc3ab14d9..3da744de1f3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala
@@ -88,6 +88,9 @@ abstract class SQLImplicits extends LowPrioritySQLImplicits {
   /** @since 3.0.0 */
   implicit def newInstantEncoder: Encoder[java.time.Instant] = Encoders.INSTANT
 
+  /** @since 3.0.0 */
+  implicit def newDurationEncoder: Encoder[java.time.Duration] = Encoders.DURATION
+
   // Boxed primitives
 
   /** @since 2.0.0 */

From 728c8015b7e5c218f49a63c0e433c87ddd5b640a Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 15:10:12 +0300
Subject: [PATCH 15/19] Add the test "implicit encoder for Duration"

---
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 9c50e374f74d..15047672b317 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1843,6 +1843,11 @@ class DatasetSuite extends QueryTest with SharedSparkSession {
     assert(spark.range(1).map { _ => instant }.head === instant)
   }
 
+  test("implicit encoder for Duration") {
+    val duration = java.time.Duration.ofMinutes(10)
+    assert(spark.range(1).map { _ => duration }.head === duration)
+  }
+
   val dotColumnTestModes = Table(
     ("caseSensitive", "colName"),
     ("true", "field.1"),

From 938533602175bc759d690f9774fec77c9cf0d0d4 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 15:14:14 +0300
Subject: [PATCH 16/19] Add a test to JavaDatasetSuite

---
 .../java/test/org/apache/spark/sql/JavaDatasetSuite.java | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 1e5f55e494b7..0c1d29b53b89 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -21,6 +21,7 @@
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.util.*;
@@ -411,6 +412,14 @@ public void testLocalDateAndInstantEncoders() {
     Assert.assertEquals(data, ds.collectAsList());
   }
 
+  @Test
+  public void testDurationEncoder() {
+    Encoder<Duration> encoder = Encoders.DURATION();
+    List<Duration> data = Arrays.asList(Duration.ofHours(2));
+    Dataset<Duration> ds = spark.createDataset(data, encoder);
+    Assert.assertEquals(data, ds.collectAsList());
+  }
+
   public static class KryoSerializable {
     String value;
 

From 568a162662ff01aee36364e794cdbe990bcbef21 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 15:16:03 +0300
Subject: [PATCH 17/19] Fix default test

---
 .../sql/catalyst/expressions/LiteralExpressionSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
index b8547bb2bc64..3ed6ef85cd30 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
@@ -70,13 +70,13 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
     withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "false") {
       checkEvaluation(Literal.default(DateType), DateTimeUtils.toJavaDate(0))
       checkEvaluation(Literal.default(TimestampType), DateTimeUtils.toJavaTimestamp(0L))
-      checkEvaluation(Literal.default(CalendarIntervalType), new CalendarInterval(0, 0L))
     }
     withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
       checkEvaluation(Literal.default(DateType), LocalDate.ofEpochDay(0))
       checkEvaluation(Literal.default(TimestampType), Instant.ofEpochSecond(0))
-      checkEvaluation(Literal.default(CalendarIntervalType), Duration.ofSeconds(0))
     }
+    checkEvaluation(Literal.default(CalendarIntervalType), new CalendarInterval(0, 0L))
+    checkEvaluation(Literal.default(CalendarIntervalType), Duration.ofSeconds(0))
     checkEvaluation(Literal.default(ArrayType(StringType)), Array())
     checkEvaluation(Literal.default(MapType(IntegerType, StringType)), Map())
     checkEvaluation(Literal.default(StructType(StructField("a", StringType) :: Nil)), Row(""))

From c188351c774f8d028c61b50e780b1bd0d8f188a0 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 15:26:28 +0300
Subject: [PATCH 18/19] UDF test

---
 .../src/test/scala/org/apache/spark/sql/UDFSuite.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 268fb65144e7..e6477d0956ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -516,6 +516,14 @@ class UDFSuite extends QueryTest with SharedSparkSession {
     }
   }
 
+  test("Using java.time.Duration in UDF") {
+    val expected = java.time.Duration.ofMinutes(40).plusDays(1)
+    val plusDay = udf((i: java.time.Duration) => i.plusDays(1))
+    val df = spark.sql("SELECT interval 40 minutes as i")
+      .select(plusDay('i))
+    assert(df.collect().toSeq === Seq(Row(expected)))
+  }
+
   test("SPARK-28321 0-args Java UDF should not be called only once") {
     val nonDeterministicJavaUDF = udf(
       new UDF0[Int] {

From 4aeb9b0066090a39e1a355d8e4e13ddc53af8af9 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 11 Oct 2019 15:32:00 +0300
Subject: [PATCH 19/19] Add a test to JavaUDFSuite

---
 .../java/test/org/apache/spark/sql/JavaUDFSuite.java | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
index 9af5023acf39..401011cf3229 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java
@@ -18,6 +18,7 @@ package test.org.apache.spark.sql;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.time.LocalDate;
 import java.util.List;
 
@@ -139,4 +140,14 @@ public void udf7Test() {
       spark.conf().set(SQLConf.DATETIME_JAVA8API_ENABLED().key(), originConf);
     }
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void udf8Test() {
+    spark.udf().register(
+      "plusDay",
+      (Duration d) -> d.plusDays(1), DataTypes.CalendarIntervalType);
+    Row result = spark.sql("SELECT plusDay(interval 40 minutes)").head();
+    Assert.assertEquals(Duration.ofMinutes(40).plusDays(1), result.get(0));
+  }
 }
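
Taken together, the series maps java.time.Duration to Catalyst's CalendarIntervalType at every boundary: the public encoder (patches 01 and 14), the Catalyst converters plus the durationToInterval/intervalToDuration helpers (patch 02), the serializer and deserializer builders for Java and Scala type inference (patches 03, 05, and 06), the row encoder (patch 07), literals (patch 13), and the tests. The sketch below shows how those pieces compose from the public API. It is a hedged illustration, not part of the series: it assumes a Spark build with all 19 patches applied (stock Spark does not ship a Duration-to-CalendarIntervalType mapping), and the object name DurationEncoderSketch and the local-mode session are invented for the example.

import java.time.Duration

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

// Minimal end-to-end sketch of the Duration / CalendarIntervalType mapping
// introduced by this series. Assumes the 19 patches are applied to the build.
object DurationEncoderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("duration-encoder-sketch")
      .getOrCreate()
    // Brings the implicit newDurationEncoder (patch 14) into scope.
    import spark.implicits._

    // Patches 01/14: a Dataset[Duration] backed by a CalendarIntervalType column.
    val ds = Seq(Duration.ofHours(2), Duration.ofMinutes(90).negated()).toDS()
    ds.printSchema()

    // Patch 02: collect() converts each CalendarInterval back to a Duration.
    ds.collect().foreach(println)

    // Patch 18-style usage: a Scala UDF that takes and returns Duration
    // (schema inference for Duration comes from patch 06).
    val plusDay = udf((d: Duration) => d.plusDays(1))
    spark.sql("SELECT interval 40 minutes AS i")
      .select(plusDay($"i"))
      .show(truncate = false)

    spark.stop()
  }
}

One design consequence is visible in DateTimeUtils.intervalToDuration (patch 02): CalendarInterval carries a months field with no fixed length, so month-bearing intervals are flattened to seconds with the SECONDS_PER_MONTH approximation, while durationToInterval always writes 0 months, which is what keeps Duration round trips exact.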