Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public static Object read(
if (dataType instanceof TimestampType) {
return obj.getLong(ordinal);
}
if (dataType instanceof TimeType) {
return obj.getLong(ordinal);
}
if (dataType instanceof CalendarIntervalType) {
return obj.getInterval(ordinal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ public static int calculateBitSetWidthInBytes(int numFields) {
FloatType,
DoubleType,
DateType,
TimestampType
TimestampType,
TimeType
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Problem is that you have to define the mapping for PySpark and SparkR too that's probably one of the tricky part.
I think it's best to loop the dev mailing list and see if there's a big support for this. Otherwise, I'm not super supportive of it due to the maintenance cost.

})));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public class DataTypes {
*/
public static final DataType TimestampType = TimestampType$.MODULE$;

/**
* Gets the TimeType object.
*/
public static final DataType TimeType = TimeType$.MODULE$;

/**
* Gets the CalendarIntervalType object.
*/
Expand Down
9 changes: 9 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ trait Row extends Serializable {
*
* DateType -> java.sql.Date
* TimestampType -> java.sql.Timestamp
* TimeType -> java.sql.Time
*
* BinaryType -> byte array
* ArrayType -> scala.collection.Seq (use getList for java.util.List)
Expand All @@ -192,6 +193,7 @@ trait Row extends Serializable {
*
* DateType -> java.sql.Date
* TimestampType -> java.sql.Timestamp
* TimeType -> java.sql.Time
*
* BinaryType -> byte array
* ArrayType -> scala.collection.Seq (use getList for java.util.List)
Expand Down Expand Up @@ -296,6 +298,13 @@ trait Row extends Serializable {
*/
def getTimestamp(i: Int): java.sql.Timestamp = getAs[java.sql.Timestamp](i)

/**
* Returns the value at position i of date type as java.sql.Time.
*
* @throws ClassCastException when data type does not match.
*/
def getTime(i: Int): java.sql.Time = getAs[java.sql.Time](i)

/**
* Returns the value at position i of date type as java.time.Instant.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
import java.lang.{Iterable => JavaIterable}
import java.math.{BigDecimal => JavaBigDecimal}
import java.math.{BigInteger => JavaBigInteger}
import java.sql.{Date, Timestamp}
import java.sql.{Date, Time, Timestamp}
import java.time.{Instant, LocalDate}
import java.util.{Map => JavaMap}
import javax.annotation.Nullable
Expand Down Expand Up @@ -331,6 +331,16 @@ object CatalystTypeConverters {
DateTimeUtils.toJavaTimestamp(row.getLong(column))
}

private object TimeConverter extends CatalystTypeConverter[Time, Time, Any] {
override def toCatalystImpl(scalaValue: Time): Long =
DateTimeUtils.fromJavaTime(scalaValue)
override def toScala(catalystValue: Any): Time =
if (catalystValue == null) null
else DateTimeUtils.toJavaTime(catalystValue.asInstanceOf[Long])
override def toScalaImpl(row: InternalRow, column: Int): Time =
DateTimeUtils.toJavaTime(row.getLong(column))
}

private object InstantConverter extends CatalystTypeConverter[Instant, Instant, Any] {
override def toCatalystImpl(scalaValue: Instant): Long =
DateTimeUtils.instantToMicros(scalaValue)
Expand Down Expand Up @@ -451,6 +461,7 @@ object CatalystTypeConverters {
case d: Date => DateConverter.toCatalyst(d)
case ld: LocalDate => LocalDateConverter.toCatalyst(ld)
case t: Timestamp => TimestampConverter.toCatalyst(t)
case ti: Time => TimeConverter.toCatalyst(ti);
case i: Instant => InstantConverter.toCatalyst(i)
case d: BigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
case d: JavaBigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ object DeserializerBuildHelper {
returnNullable = false)
}

def createDeserializerForSqlTime(path: Expression): Expression = {
StaticInvoke(
DateTimeUtils.getClass,
ObjectType(classOf[java.sql.Time]),
"toJavaTime",
path :: Nil,
returnNullable = false)
}

def createDeserializerForJavaBigDecimal(
path: Expression,
returnNullable: Boolean): Expression = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ object InternalRow {
case ByteType => (input, ordinal) => input.getByte(ordinal)
case ShortType => (input, ordinal) => input.getShort(ordinal)
case IntegerType | DateType => (input, ordinal) => input.getInt(ordinal)
case LongType | TimestampType => (input, ordinal) => input.getLong(ordinal)
case LongType | TimestampType | TimeType => (input, ordinal) => input.getLong(ordinal)
case FloatType => (input, ordinal) => input.getFloat(ordinal)
case DoubleType => (input, ordinal) => input.getDouble(ordinal)
case StringType => (input, ordinal) => input.getUTF8String(ordinal)
Expand Down Expand Up @@ -168,7 +168,8 @@ object InternalRow {
case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
case LongType | TimestampType | TimeType => (input, v) =>
input.setLong(ordinal, v.asInstanceOf[Long])
case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
case CalendarIntervalType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ object JavaTypeInference {

case c if c == classOf[java.sql.Timestamp] => createSerializerForSqlTimestamp(inputObject)

case c if c == classOf[java.sql.Time] => createSerializerForSqlTime(inputObject)

case c if c == classOf[java.time.LocalDate] => createSerializerForJavaLocalDate(inputObject)

case c if c == classOf[java.sql.Date] => createSerializerForSqlDate(inputObject)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ object SerializerBuildHelper {
returnNullable = false)
}

def createSerializerForSqlTime(inputObject: Expression): Expression = {
StaticInvoke(
DateTimeUtils.getClass,
TimeType,
"fromJavaTime",
inputObject :: Nil,
returnNullable = false)
}

def createSerializerForJavaLocalDate(inputObject: Expression): Expression = {
StaticInvoke(
DateTimeUtils.getClass,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ object RowEncoder {
createSerializerForSqlTimestamp(inputObject)
}

case TimeType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
createSerializerForJavaInstant(inputObject)
} else {
createSerializerForSqlTime(inputObject)
}

case DateType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
createSerializerForJavaLocalDate(inputObject)
Expand Down Expand Up @@ -220,6 +227,12 @@ object RowEncoder {
} else {
ObjectType(classOf[java.sql.Timestamp])
}
case TimeType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
ObjectType(classOf[java.time.Instant])
} else {
ObjectType(classOf[java.sql.Time])
}
case DateType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
ObjectType(classOf[java.time.LocalDate])
Expand Down Expand Up @@ -274,6 +287,13 @@ object RowEncoder {
createDeserializerForSqlTimestamp(input)
}

case TimeType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
createDeserializerForInstant(input)
} else {
createDeserializerForSqlTime(input)
}

case DateType =>
if (SQLConf.get.datetimeJava8ApiEnabled) {
createDeserializerForLocalDate(input)
Expand Down
Loading