Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public static Object read(
if (dataType instanceof TimestampType) {
return obj.getLong(ordinal);
}
if (dataType instanceof TimestampWithoutTZType) {
return obj.getLong(ordinal);
}
if (dataType instanceof CalendarIntervalType) {
return obj.getInterval(ordinal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,14 @@ object Encoders {
*/
def LOCALDATE: Encoder[java.time.LocalDate] = ExpressionEncoder()

/**
* Creates an encoder that serializes instances of the `java.time.LocalDateTime` class
* to the internal representation of nullable Catalyst's DateType.
*
* @since 3.2.0
*/
def LOCALDATETIME: Encoder[java.time.LocalDateTime] = ExpressionEncoder()

/**
* An encoder for nullable timestamp type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.lang.{Iterable => JavaIterable}
import java.math.{BigDecimal => JavaBigDecimal}
import java.math.{BigInteger => JavaBigInteger}
import java.sql.{Date, Timestamp}
import java.time.{Duration, Instant, LocalDate, Period}
import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
import java.util.{Map => JavaMap}
import javax.annotation.Nullable

Expand Down Expand Up @@ -66,6 +66,7 @@ object CatalystTypeConverters {
case DateType => DateConverter
case TimestampType if SQLConf.get.datetimeJava8ApiEnabled => InstantConverter
case TimestampType => TimestampConverter
case TimestampWithoutTZType => TimestampWithoutTZConverter
case dt: DecimalType => new DecimalConverter(dt)
case BooleanType => BooleanConverter
case ByteType => ByteConverter
Expand Down Expand Up @@ -354,6 +355,23 @@ object CatalystTypeConverters {
DateTimeUtils.microsToInstant(row.getLong(column))
}

private object TimestampWithoutTZConverter
extends CatalystTypeConverter[Any, LocalDateTime, Any] {
override def toCatalystImpl(scalaValue: Any): Any = scalaValue match {
case l: LocalDateTime => DateTimeUtils.localDateTimeToMicros(l)
case other => throw new IllegalArgumentException(
s"The value (${other.toString}) of the type (${other.getClass.getCanonicalName}) "
+ s"cannot be converted to the ${TimestampWithoutTZType.sql} type")
}

override def toScala(catalystValue: Any): LocalDateTime =
if (catalystValue == null) null
else DateTimeUtils.microsToLocalDateTime(catalystValue.asInstanceOf[Long])

override def toScalaImpl(row: InternalRow, column: Int): LocalDateTime =
DateTimeUtils.microsToLocalDateTime(row.getLong(column))
}

private class DecimalConverter(dataType: DecimalType)
extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {

Expand Down Expand Up @@ -489,6 +507,7 @@ object CatalystTypeConverters {
case ld: LocalDate => LocalDateConverter.toCatalyst(ld)
case t: Timestamp => TimestampConverter.toCatalyst(t)
case i: Instant => InstantConverter.toCatalyst(i)
case l: LocalDateTime => TimestampWithoutTZConverter.toCatalyst(l)
case d: BigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
case d: JavaBigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
case seq: Seq[Any] => new GenericArrayData(seq.map(convertToCatalyst).toArray)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ object DeserializerBuildHelper {
returnNullable = false)
}

def createDeserializerForLocalDateTime(path: Expression): Expression = {
StaticInvoke(
DateTimeUtils.getClass,
ObjectType(classOf[java.time.LocalDateTime]),
"microsToLocalDateTime",
path :: Nil,
returnNullable = false)
}

def createDeserializerForJavaBigDecimal(
path: Expression,
returnNullable: Boolean): Expression = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ object InternalRow {
case ShortType => (input, ordinal) => input.getShort(ordinal)
case IntegerType | DateType | YearMonthIntervalType =>
(input, ordinal) => input.getInt(ordinal)
case LongType | TimestampType | DayTimeIntervalType =>
case LongType | TimestampType | TimestampWithoutTZType | DayTimeIntervalType =>
(input, ordinal) => input.getLong(ordinal)
case FloatType => (input, ordinal) => input.getFloat(ordinal)
case DoubleType => (input, ordinal) => input.getDouble(ordinal)
Expand Down Expand Up @@ -171,7 +171,7 @@ object InternalRow {
case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
case IntegerType | DateType | YearMonthIntervalType =>
(input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
case LongType | TimestampType | DayTimeIntervalType =>
case LongType | TimestampType | TimestampWithoutTZType | DayTimeIntervalType =>
(input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ object JavaTypeInference {
case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
case c: Class[_] if c == classOf[java.time.Instant] => (TimestampType, true)
case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
case c: Class[_] if c == classOf[java.time.LocalDateTime] => (TimestampWithoutTZType, true)
case c: Class[_] if c == classOf[java.time.Duration] => (DayTimeIntervalType, true)
case c: Class[_] if c == classOf[java.time.Period] => (YearMonthIntervalType, true)

Expand Down Expand Up @@ -250,6 +251,9 @@ object JavaTypeInference {
case c if c == classOf[java.sql.Timestamp] =>
createDeserializerForSqlTimestamp(path)

case c if c == classOf[java.time.LocalDateTime] =>
createDeserializerForLocalDateTime(path)

case c if c == classOf[java.time.Duration] =>
createDeserializerForDuration(path)

Expand Down Expand Up @@ -409,6 +413,9 @@ object JavaTypeInference {

case c if c == classOf[java.sql.Timestamp] => createSerializerForSqlTimestamp(inputObject)

case c if c == classOf[java.time.LocalDateTime] =>
createSerializerForLocalDateTime(inputObject)

case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)

case c if c == classOf[java.sql.Date] => createSerializerForSqlDate(inputObject)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ object ScalaReflection extends ScalaReflection {
case t if isSubtype(t, localTypeOf[java.sql.Timestamp]) =>
createDeserializerForSqlTimestamp(path)

case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) =>
createDeserializerForLocalDateTime(path)

case t if isSubtype(t, localTypeOf[java.time.Duration]) =>
createDeserializerForDuration(path)

Expand Down Expand Up @@ -524,6 +527,9 @@ object ScalaReflection extends ScalaReflection {
case t if isSubtype(t, localTypeOf[java.sql.Timestamp]) =>
createSerializerForSqlTimestamp(inputObject)

case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) =>
createSerializerForLocalDateTime(inputObject)

case t if isSubtype(t, localTypeOf[java.time.LocalDate]) =>
createSerializerForJavaLocalDate(inputObject)

Expand Down Expand Up @@ -746,6 +752,8 @@ object ScalaReflection extends ScalaReflection {
Schema(TimestampType, nullable = true)
case t if isSubtype(t, localTypeOf[java.sql.Timestamp]) =>
Schema(TimestampType, nullable = true)
case t if isSubtype(t, localTypeOf[java.time.LocalDateTime]) =>
Schema(TimestampWithoutTZType, nullable = true)
case t if isSubtype(t, localTypeOf[java.time.LocalDate]) => Schema(DateType, nullable = true)
case t if isSubtype(t, localTypeOf[java.sql.Date]) => Schema(DateType, nullable = true)
case t if isSubtype(t, localTypeOf[CalendarInterval]) =>
Expand Down Expand Up @@ -850,6 +858,7 @@ object ScalaReflection extends ScalaReflection {
StringType -> classOf[UTF8String],
DateType -> classOf[DateType.InternalType],
TimestampType -> classOf[TimestampType.InternalType],
TimestampWithoutTZType -> classOf[TimestampWithoutTZType.InternalType],
BinaryType -> classOf[BinaryType.InternalType],
CalendarIntervalType -> classOf[CalendarInterval],
DayTimeIntervalType -> classOf[DayTimeIntervalType.InternalType],
Expand All @@ -866,6 +875,7 @@ object ScalaReflection extends ScalaReflection {
DoubleType -> classOf[java.lang.Double],
DateType -> classOf[java.lang.Integer],
TimestampType -> classOf[java.lang.Long],
TimestampWithoutTZType -> classOf[java.lang.Long],
DayTimeIntervalType -> classOf[java.lang.Long],
YearMonthIntervalType -> classOf[java.lang.Integer]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ object SerializerBuildHelper {
returnNullable = false)
}

def createSerializerForLocalDateTime(inputObject: Expression): Expression = {
StaticInvoke(
DateTimeUtils.getClass,
TimestampWithoutTZType,
"localDateTimeToMicros",
inputObject :: Nil,
returnNullable = false)
}

def createSerializerForJavaLocalDate(inputObject: Expression): Expression = {
StaticInvoke(
DateTimeUtils.getClass,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ package object dsl {
/** Creates a new AttributeReference of type timestamp */
def timestamp: AttributeReference = AttributeReference(s, TimestampType, nullable = true)()

/** Creates a new AttributeReference of type timestamp without time zone */
def timestampWithoutTZ: AttributeReference =
AttributeReference(s, TimestampWithoutTZType, nullable = true)()

/** Creates a new AttributeReference of the day-time interval type */
def dayTimeInterval: AttributeReference = {
AttributeReference(s, DayTimeIntervalType, nullable = true)()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ import org.apache.spark.sql.types._
* TimestampType -> java.sql.Timestamp if spark.sql.datetime.java8API.enabled is false
* TimestampType -> java.time.Instant if spark.sql.datetime.java8API.enabled is true
*
* TimestampWithoutTZType -> java.time.LocalDateTime
*
* DayTimeIntervalType -> java.time.Duration
* YearMonthIntervalType -> java.time.Period
*
Expand Down Expand Up @@ -103,6 +105,8 @@ object RowEncoder {
createSerializerForSqlTimestamp(inputObject)
}

case TimestampWithoutTZType => createSerializerForLocalDateTime(inputObject)

case DateType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
createSerializerForJavaLocalDate(inputObject)
Expand Down Expand Up @@ -226,6 +230,8 @@ object RowEncoder {
} else {
ObjectType(classOf[java.sql.Timestamp])
}
case TimestampWithoutTZType =>
ObjectType(classOf[java.time.LocalDateTime])
case DateType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
ObjectType(classOf[java.time.LocalDate])
Expand Down Expand Up @@ -281,6 +287,9 @@ object RowEncoder {
createDeserializerForSqlTimestamp(input)
}

case TimestampWithoutTZType =>
createDeserializerForLocalDateTime(input)

case DateType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
createDeserializerForLocalDate(input)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ object InterpretedUnsafeProjection {
case IntegerType | DateType | YearMonthIntervalType =>
(v, i) => writer.write(i, v.getInt(i))

case LongType | TimestampType | DayTimeIntervalType =>
case LongType | TimestampType | TimestampWithoutTZType | DayTimeIntervalType =>
(v, i) => writer.write(i, v.getLong(i))

case FloatType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGen
private[this] def dataTypeToMutableValue(dataType: DataType): MutableValue = dataType match {
// We use INT for DATE and YearMonthIntervalType internally
case IntegerType | DateType | YearMonthIntervalType => new MutableInt
// We use Long for Timestamp and DayTimeInterval internally
case LongType | TimestampType | DayTimeIntervalType => new MutableLong
// We use Long for Timestamp, Timestamp without time zone and DayTimeInterval internally
case LongType | TimestampType | TimestampWithoutTZType | DayTimeIntervalType => new MutableLong
case FloatType => new MutableFloat
case DoubleType => new MutableDouble
case BooleanType => new MutableBoolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1817,7 +1817,7 @@ object CodeGenerator extends Logging {
case ByteType => JAVA_BYTE
case ShortType => JAVA_SHORT
case IntegerType | DateType | YearMonthIntervalType => JAVA_INT
case LongType | TimestampType | DayTimeIntervalType => JAVA_LONG
case LongType | TimestampType | TimestampWithoutTZType | DayTimeIntervalType => JAVA_LONG
case FloatType => JAVA_FLOAT
case DoubleType => JAVA_DOUBLE
case _: DecimalType => "Decimal"
Expand All @@ -1838,7 +1838,8 @@ object CodeGenerator extends Logging {
case ByteType => java.lang.Byte.TYPE
case ShortType => java.lang.Short.TYPE
case IntegerType | DateType | YearMonthIntervalType => java.lang.Integer.TYPE
case LongType | TimestampType | DayTimeIntervalType => java.lang.Long.TYPE
case LongType | TimestampType | TimestampWithoutTZType | DayTimeIntervalType =>
java.lang.Long.TYPE
case FloatType => java.lang.Float.TYPE
case DoubleType => java.lang.Double.TYPE
case _: DecimalType => classOf[Decimal]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import java.lang.{Short => JavaShort}
import java.math.{BigDecimal => JavaBigDecimal}
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time.{Duration, Instant, LocalDate, Period}
import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
import java.util
import java.util.Objects
import javax.xml.bind.DatatypeConverter
Expand Down Expand Up @@ -80,6 +80,7 @@ object Literal {
case d: Decimal => Literal(d, DecimalType(Math.max(d.precision, d.scale), d.scale))
case i: Instant => Literal(instantToMicros(i), TimestampType)
case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
case l: LocalDateTime => Literal(DateTimeUtils.localDateTimeToMicros(l), TimestampWithoutTZType)
case ld: LocalDate => Literal(ld.toEpochDay.toInt, DateType)
case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
case d: Duration => Literal(durationToMicros(d), DayTimeIntervalType)
Expand Down Expand Up @@ -119,6 +120,7 @@ object Literal {
case _ if clz == classOf[Date] => DateType
case _ if clz == classOf[Instant] => TimestampType
case _ if clz == classOf[Timestamp] => TimestampType
case _ if clz == classOf[LocalDateTime] => TimestampWithoutTZType
case _ if clz == classOf[Duration] => DayTimeIntervalType
case _ if clz == classOf[Period] => YearMonthIntervalType
case _ if clz == classOf[JavaBigDecimal] => DecimalType.SYSTEM_DEFAULT
Expand Down Expand Up @@ -177,6 +179,7 @@ object Literal {
case dt: DecimalType => Literal(Decimal(0, dt.precision, dt.scale))
case DateType => create(0, DateType)
case TimestampType => create(0L, TimestampType)
case TimestampWithoutTZType => create(0L, TimestampWithoutTZType)
case DayTimeIntervalType => create(0L, DayTimeIntervalType)
case YearMonthIntervalType => create(0, YearMonthIntervalType)
case StringType => Literal("")
Expand All @@ -198,7 +201,8 @@ object Literal {
case ByteType => v.isInstanceOf[Byte]
case ShortType => v.isInstanceOf[Short]
case IntegerType | DateType | YearMonthIntervalType => v.isInstanceOf[Int]
case LongType | TimestampType | DayTimeIntervalType => v.isInstanceOf[Long]
case LongType | TimestampType | TimestampWithoutTZType | DayTimeIntervalType =>
v.isInstanceOf[Long]
case FloatType => v.isInstanceOf[Float]
case DoubleType => v.isInstanceOf[Double]
case _: DecimalType => v.isInstanceOf[Decimal]
Expand Down Expand Up @@ -422,7 +426,7 @@ case class Literal (value: Any, dataType: DataType) extends LeafExpression {
}
case ByteType | ShortType =>
ExprCode.forNonNullValue(JavaCode.expression(s"($javaType)$value", dataType))
case TimestampType | LongType | DayTimeIntervalType =>
case TimestampType | TimestampWithoutTZType | LongType | DayTimeIntervalType =>
toExprCode(s"${value}L")
case _ =>
val constRef = ctx.addReferenceObj("literal", value, javaType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ object DateTimeUtils {
instantToMicros(instant)
}

def microsToLocalDateTime(micros: Long): LocalDateTime = {
getLocalDateTime(micros, ZoneOffset.UTC)
}

def localDateTimeToMicros(localDateTime: LocalDateTime): Long = {
instantToMicros(localDateTime.toInstant(ZoneOffset.UTC))
}

/**
* Converts a local date at the default JVM time zone to the number of days since 1970-01-01
* in the hybrid calendar (Julian + Gregorian) by discarding the time part. The resulted days are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ object DataType {
private val otherTypes = {
Seq(NullType, DateType, TimestampType, BinaryType, IntegerType, BooleanType, LongType,
DoubleType, FloatType, ShortType, ByteType, StringType, CalendarIntervalType,
DayTimeIntervalType, YearMonthIntervalType)
DayTimeIntervalType, YearMonthIntervalType, TimestampWithoutTZType)
.map(t => t.typeName -> t).toMap
}

Expand Down
Loading