Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -304,18 +304,23 @@ object CatalystTypeConverters {
row.getUTF8String(column).toString
}

/**
 * Converts external date values to/from Catalyst's internal representation
 * (an `Int` counting days since the Unix epoch).
 *
 * `toCatalystImpl` accepts both `java.sql.Date` and `java.time.LocalDate` so the
 * converter works regardless of which external date type callers supply; any
 * other input is rejected with an `IllegalArgumentException`.
 */
private object DateConverter extends CatalystTypeConverter[Any, Date, Any] {
  override def toCatalystImpl(scalaValue: Any): Int = scalaValue match {
    case d: Date => DateTimeUtils.fromJavaDate(d)
    case l: LocalDate => DateTimeUtils.localDateToDays(l)
    case other => throw new IllegalArgumentException(
      s"The value (${other.toString}) of the type (${other.getClass.getCanonicalName}) "
      + s"cannot be converted to the ${DateType.sql} type")
  }
  // null-safe: Catalyst NULL maps to a Scala null reference.
  override def toScala(catalystValue: Any): Date =
    if (catalystValue == null) null else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
  override def toScalaImpl(row: InternalRow, column: Int): Date =
    DateTimeUtils.toJavaDate(row.getInt(column))
}

/**
 * Converts external `java.time.LocalDate` values to/from Catalyst's internal
 * representation (an `Int` counting days since the Unix epoch).
 *
 * Input conversion delegates to [[DateConverter]], which accepts both
 * `java.sql.Date` and `java.time.LocalDate`, so both converters reject
 * unsupported inputs with the same error message.
 */
private object LocalDateConverter extends CatalystTypeConverter[Any, LocalDate, Any] {
  override def toCatalystImpl(scalaValue: Any): Int =
    DateConverter.toCatalystImpl(scalaValue)
  // null-safe: Catalyst NULL maps to a Scala null reference.
  override def toScala(catalystValue: Any): LocalDate = {
    if (catalystValue == null) null
    else DateTimeUtils.daysToLocalDate(catalystValue.asInstanceOf[Int])
  }
  // NOTE(review): the signature below was hidden behind a collapsed diff hunk;
  // reconstructed from the parallel DateConverter definition — confirm against
  // the full file.
  override def toScalaImpl(row: InternalRow, column: Int): LocalDate =
    DateTimeUtils.daysToLocalDate(row.getInt(column))
}

/**
 * Converts external timestamp values to/from Catalyst's internal representation
 * (a `Long` counting microseconds since the Unix epoch).
 *
 * `toCatalystImpl` accepts both `java.sql.Timestamp` and `java.time.Instant` so
 * the converter works regardless of which external timestamp type callers
 * supply; any other input is rejected with an `IllegalArgumentException`.
 */
private object TimestampConverter extends CatalystTypeConverter[Any, Timestamp, Any] {
  override def toCatalystImpl(scalaValue: Any): Long = scalaValue match {
    case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t)
    case i: Instant => DateTimeUtils.instantToMicros(i)
    case other => throw new IllegalArgumentException(
      s"The value (${other.toString}) of the type (${other.getClass.getCanonicalName}) "
      + s"cannot be converted to the ${TimestampType.sql} type")
  }
  // null-safe: Catalyst NULL maps to a Scala null reference.
  override def toScala(catalystValue: Any): Timestamp =
    if (catalystValue == null) null
    else DateTimeUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
  override def toScalaImpl(row: InternalRow, column: Int): Timestamp =
    DateTimeUtils.toJavaTimestamp(row.getLong(column))
}

/**
 * Converts external `java.time.Instant` values to/from Catalyst's internal
 * representation (a `Long` counting microseconds since the Unix epoch).
 *
 * Input conversion delegates to [[TimestampConverter]], which accepts both
 * `java.sql.Timestamp` and `java.time.Instant`, so both converters reject
 * unsupported inputs with the same error message.
 */
private object InstantConverter extends CatalystTypeConverter[Any, Instant, Any] {
  override def toCatalystImpl(scalaValue: Any): Long =
    TimestampConverter.toCatalystImpl(scalaValue)
  // null-safe: Catalyst NULL maps to a Scala null reference.
  override def toScala(catalystValue: Any): Instant =
    if (catalystValue == null) null
    else DateTimeUtils.microsToInstant(catalystValue.asInstanceOf[Long])
  // NOTE(review): the member below was hidden behind a collapsed diff hunk
  // ("Expand Down"); reconstructed from the parallel TimestampConverter
  // definition — confirm against the full file.
  override def toScalaImpl(row: InternalRow, column: Int): Instant =
    DateTimeUtils.microsToInstant(row.getLong(column))
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData, IntervalUtils}
import org.apache.spark.sql.catalyst.util.{DateTimeConstants, DateTimeUtils, GenericArrayData, IntervalUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -197,8 +197,8 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
"1970-01-01",
"1972-12-31",
"2019-02-16",
"2119-03-16").foreach { timestamp =>
val input = LocalDate.parse(timestamp)
"2119-03-16").foreach { date =>
val input = LocalDate.parse(date)
val result = CatalystTypeConverters.convertToCatalyst(input)
val expected = DateTimeUtils.localDateToDays(input)
assert(result === expected)
Expand Down Expand Up @@ -294,4 +294,44 @@ class CatalystTypeConvertersSuite extends SparkFunSuite with SQLHelper {
}
}
}

test("SPARK-35204: createToCatalystConverter for date") {
  // A DateType converter must accept both java.time.LocalDate and
  // java.sql.Date inputs, whichever Java datetime API mode is configured.
  Seq(true, false).foreach { java8Enabled =>
    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Enabled.toString) {
      Seq(-1234, 0, 1234).foreach { epochDays =>
        val toCatalyst = CatalystTypeConverters.createToCatalystConverter(DateType)

        val localDate = LocalDate.ofEpochDay(epochDays)
        val sqlDate = java.sql.Date.valueOf(localDate)
        val expectedDays = DateTimeUtils.localDateToDays(localDate)

        // Both external representations convert to the same day count.
        assert(toCatalyst(localDate) === expectedDays)
        assert(toCatalyst(sqlDate) === expectedDays)
      }
    }
  }
}

test("SPARK-35204: createToCatalystConverter for timestamp") {
  // A TimestampType converter must accept both java.time.Instant and
  // java.sql.Timestamp inputs, whichever Java datetime API mode is configured.
  Seq(true, false).foreach { java8Enabled =>
    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Enabled.toString) {
      Seq(-1234, 0, 1234).foreach { epochSeconds =>
        val toCatalyst = CatalystTypeConverters.createToCatalystConverter(TimestampType)

        val instant = Instant.ofEpochSecond(epochSeconds)
        val timestamp = new java.sql.Timestamp(epochSeconds * DateTimeConstants.MILLIS_PER_SECOND)
        val expectedMicros = epochSeconds * DateTimeConstants.MICROS_PER_SECOND

        // Both external representations convert to the same microsecond count.
        assert(toCatalyst(instant) === expectedMicros)
        assert(toCatalyst(timestamp) === expectedMicros)
      }
    }
  }
}
}