
Commit 1817e67

heyihong authored and cloud-fan committed

[SPARK-53524][CONNECT][SQL] Fix temporal value conversion in LiteralValueProtoConverter

### What changes were proposed in this pull request?

This PR fixes temporal value conversion issues in the `LiteralValueProtoConverter` for Spark Connect. The main changes include:

1. **Fixed temporal value conversion in the `getConverter` method**: Updated the conversion logic for the temporal data types (DATE, TIMESTAMP, TIMESTAMP_NTZ, DAY_TIME_INTERVAL, YEAR_MONTH_INTERVAL, TIME) to use the proper utility methods from `SparkDateTimeUtils` and `SparkIntervalUtils` instead of returning the raw protobuf values directly.
2. **Added comprehensive test coverage**: Extended `PlanGenerationTestSuite` with a new test case containing a tuple of all temporal types, to ensure proper conversion and serialization.
3. **Updated test expectations**: Modified the expected explain output and query test files to reflect the corrected temporal value handling.

### Why are the changes needed?

Struct types in `typedlit` don't work with temporal values because of bugs in the type conversions. For example, the code below fails:

```scala
import org.apache.spark.sql.functions.typedlit

spark.sql("select 1").select(typedlit((1, java.time.LocalDate.of(2020, 10, 10)))).collect()

"""
org.apache.spark.SparkIllegalArgumentException: The value (18545) of the type (java.lang.Integer) cannot be converted to the DATE type.
org.apache.spark.sql.catalyst.CatalystTypeConverters$DateConverter$.toCatalystImpl(CatalystTypeConverters.scala:356)
org.apache.spark.sql.catalyst.CatalystTypeConverters$DateConverter$.toCatalystImpl(CatalystTypeConverters.scala:347)
org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:110)
org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:271)
org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:110)
org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$createToCatalystConverter$2(CatalystTypeConverters.scala:532)
org.apache.spark.sql.connect.planner.LiteralExpressionProtoConverter$.toCatalystExpression(LiteralExpressionProtoConverter.scala:116)
"""
```

### Does this PR introduce _any_ user-facing change?

**Yes.** This PR fixes temporal value conversion in `LiteralValueProtoConverter`, allowing struct types in `typedlit` to work with temporal values.

### How was this patch tested?

`build/sbt "connect-client-jvm/testOnly org.apache.spark.sql.PlanGenerationTestSuite"`

`build/sbt "connect/testOnly org.apache.spark.sql.connect.ProtoToParsedPlanTestSuite"`

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Cursor 1.5.11

Closes #52270 from heyihong/SPARK-53524.

Authored-by: Yihong He <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent d017779 commit 1817e67
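
With this fix, the failing example from the description is expected to round-trip cleanly. A minimal sketch of the behavior after the fix (the `spark` session setup and the printed output shape are illustrative, not taken from this commit):

```scala
import org.apache.spark.sql.functions.typedlit

// Sketch: the struct literal with a temporal field now converts without
// raising SparkIllegalArgumentException. Per the diff below, the DATE field
// is decoded via SparkDateTimeUtils.toJavaDate into a java.sql.Date instead
// of the raw epoch-day integer 18545 that previously leaked through.
val row = spark.sql("select 1")
  .select(typedlit((1, java.time.LocalDate.of(2020, 10, 10))))
  .collect()
  .head

println(row) // expected shape: [[1,2020-10-10]]
```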

File tree

5 files changed, +123 -10 lines changed

sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -3404,6 +3404,17 @@ class PlanGenerationTestSuite
     fn.typedLit(java.time.Period.ofDays(100)),
     fn.typedLit(java.time.LocalTime.of(23, 59, 59, 999999999)),
     fn.typedLit(new CalendarInterval(2, 20, 100L)),
+    fn.typedLit(
+      (
+        java.time.LocalDate.of(2020, 10, 10),
+        java.time.Instant.ofEpochMilli(1677155519808L),
+        new java.sql.Timestamp(12345L),
+        java.time.LocalDateTime.of(2023, 2, 23, 20, 36),
+        java.sql.Date.valueOf("2023-02-23"),
+        java.time.Duration.ofSeconds(200L),
+        java.time.Period.ofDays(100),
+        java.time.LocalTime.of(23, 59, 59, 999999999),
+        new CalendarInterval(2, 20, 100L))),

     // Handle parameterized scala types e.g.: List, Seq and Map.
     fn.typedLit(Some(1)),
```

sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/LiteralValueProtoConverter.scala

Lines changed: 16 additions & 9 deletions
```diff
@@ -476,7 +476,7 @@ object LiteralValueProtoConverter {
     }
   }

-  private def getConverter(dataType: proto.DataType): proto.Expression.Literal => Any = {
+  private def getScalaConverter(dataType: proto.DataType): proto.Expression.Literal => Any = {
     dataType.getKindCase match {
       case proto.DataType.KindCase.SHORT => v => v.getShort.toShort
       case proto.DataType.KindCase.INTEGER => v => v.getInteger
@@ -487,11 +487,18 @@ object LiteralValueProtoConverter {
       case proto.DataType.KindCase.BOOLEAN => v => v.getBoolean
       case proto.DataType.KindCase.STRING => v => v.getString
       case proto.DataType.KindCase.BINARY => v => v.getBinary.toByteArray
-      case proto.DataType.KindCase.DATE => v => v.getDate
-      case proto.DataType.KindCase.TIMESTAMP => v => v.getTimestamp
-      case proto.DataType.KindCase.TIMESTAMP_NTZ => v => v.getTimestampNtz
-      case proto.DataType.KindCase.DAY_TIME_INTERVAL => v => v.getDayTimeInterval
-      case proto.DataType.KindCase.YEAR_MONTH_INTERVAL => v => v.getYearMonthInterval
+      case proto.DataType.KindCase.DATE =>
+        v => SparkDateTimeUtils.toJavaDate(v.getDate)
+      case proto.DataType.KindCase.TIMESTAMP =>
+        v => SparkDateTimeUtils.toJavaTimestamp(v.getTimestamp)
+      case proto.DataType.KindCase.TIMESTAMP_NTZ =>
+        v => SparkDateTimeUtils.microsToLocalDateTime(v.getTimestampNtz)
+      case proto.DataType.KindCase.DAY_TIME_INTERVAL =>
+        v => SparkIntervalUtils.microsToDuration(v.getDayTimeInterval)
+      case proto.DataType.KindCase.YEAR_MONTH_INTERVAL =>
+        v => SparkIntervalUtils.monthsToPeriod(v.getYearMonthInterval)
+      case proto.DataType.KindCase.TIME =>
+        v => SparkDateTimeUtils.nanosToLocalTime(v.getTime.getNano)
       case proto.DataType.KindCase.DECIMAL => v => Decimal(v.getDecimal.getValue)
       case proto.DataType.KindCase.CALENDAR_INTERVAL =>
         v =>
@@ -646,7 +653,7 @@ object LiteralValueProtoConverter {
      }
    }

-    makeArrayData(getConverter(arrayType.getElementType))
+    makeArrayData(getScalaConverter(arrayType.getElementType))
   }

   def getProtoArrayType(array: proto.Expression.Literal.Array): proto.DataType.Array = {
@@ -691,7 +698,7 @@ object LiteralValueProtoConverter {
      }
    }

-    makeMapData(getConverter(mapType.getKeyType), getConverter(mapType.getValueType))
+    makeMapData(getScalaConverter(mapType.getKeyType), getScalaConverter(mapType.getValueType))
   }

   def getProtoMapType(map: proto.Expression.Literal.Map): proto.DataType.Map = {
@@ -724,7 +731,7 @@ object LiteralValueProtoConverter {
     val structData = Array.tabulate(struct.getElementsCount) { i =>
       val element = struct.getElements(i)
       val dataType = structType.getFields(i).getDataType
-      getConverter(dataType)(element)
+      getScalaConverter(dataType)(element)
     }
     new GenericRowWithSchema(structData, DataTypeProtoConverter.toCatalystStructType(structType))
   }
```
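
The removed branches returned the raw wire values (epoch days, microseconds, months, nanoseconds) unconverted, which is what surfaced as the "value (18545) of the type (java.lang.Integer)" error. Below is a self-contained sketch of the decodings the utility methods perform, written against plain `java.time` with values taken from the golden files in this commit; it illustrates the assumed epoch-based wire encodings and is not the Spark implementation itself:

```scala
import java.time._

object TemporalWireDecodingSketch extends App {
  // DATE: days since the epoch 1970-01-01 (cf. SparkDateTimeUtils.toJavaDate).
  val date = LocalDate.ofEpochDay(18545L) // 2020-10-10

  // TIMESTAMP: microseconds since the epoch, instant semantics
  // (cf. SparkDateTimeUtils.toJavaTimestamp).
  val micros = 1677155519808000L
  val instant = Instant.ofEpochSecond(micros / 1000000L, (micros % 1000000L) * 1000L)
  // 2023-02-23T12:31:59.808Z; the explain output renders it in the session time zone

  // DAY_TIME_INTERVAL: microseconds (cf. SparkIntervalUtils.microsToDuration).
  val dayTime = Duration.ofNanos(200000000L * 1000L) // PT3M20S, i.e. INTERVAL '0 00:03:20'

  // YEAR_MONTH_INTERVAL: months (cf. SparkIntervalUtils.monthsToPeriod).
  val yearMonth = Period.ofMonths(0) // zero months, i.e. INTERVAL '0-0'

  // TIME: nanoseconds since midnight (cf. SparkDateTimeUtils.nanosToLocalTime).
  val time = LocalTime.ofNanoOfDay(86399999999999L) // 23:59:59.999999999

  println(Seq(date, instant, dayTime, yearMonth, time).mkString("\n"))
}
```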

sql/connect/common/src/test/resources/query-tests/explain-results/function_typedLit.explain

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,2 +1,2 @@
-Project [id#0L, id#0L, 1 AS 1#0, null AS NULL#0, true AS true#0, 68 AS 68#0, 9872 AS 9872#0, -8726532 AS -8726532#0, 7834609328726532 AS 7834609328726532#0L, 2.718281828459045 AS 2.718281828459045#0, -0.8 AS -0.8#0, 89.97620 AS 89.97620#0, 89889.7667231 AS 89889.7667231#0, connect! AS connect!#0, T AS T#0, ABCDEFGHIJ AS ABCDEFGHIJ#0, 0x78797A7B7C7D7E7F808182838485868788898A8B8C8D8E AS X'78797A7B7C7D7E7F808182838485868788898A8B8C8D8E'#0, 0x0806 AS X'0806'#0, [8,6] AS ARRAY(8, 6)#0, null AS NULL#0, 2020-10-10 AS DATE '2020-10-10'#0, 8.997620 AS 8.997620#0, 2023-02-23 04:31:59.808 AS TIMESTAMP '2023-02-23 04:31:59.808'#0, 1969-12-31 16:00:12.345 AS TIMESTAMP '1969-12-31 16:00:12.345'#0, 2023-02-23 20:36:00 AS TIMESTAMP_NTZ '2023-02-23 20:36:00'#0, 2023-02-23 AS DATE '2023-02-23'#0, INTERVAL '0 00:03:20' DAY TO SECOND AS INTERVAL '0 00:03:20' DAY TO SECOND#0, INTERVAL '0-0' YEAR TO MONTH AS INTERVAL '0-0' YEAR TO MONTH#0, 23:59:59.999999999 AS TIME '23:59:59.999999999'#0, 2 months 20 days 0.0001 seconds AS INTERVAL '2 months 20 days 0.0001 seconds'#0, 1 AS 1#0, [1,2,3] AS ARRAY(1, 2, 3)#0, [1,2,3] AS ARRAY(1, 2, 3)#0, map(keys: [a,b], values: [1,2]) AS MAP('a', 1, 'b', 2)#0, [a,2,1.0] AS NAMED_STRUCT('_1', 'a', '_2', 2, '_3', 1.0D)#0, null AS NULL#0, [1] AS ARRAY(1)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, [[1,2,3],[4,5,6],[7,8,9]] AS ARRAY(ARRAY(1, 2, 3), ARRAY(4, 5, 6), ARRAY(7, 8, 9))#0, [keys: [a,b], values: [1,2],keys: [a,b], values: [3,4],keys: [a,b], values: [5,6]] AS ARRAY(MAP('a', 1, 'b', 2), MAP('a', 3, 'b', 4), MAP('a', 5, 'b', 6))#0, [keys: [a,b], values: [[1,2],[3,4]],keys: [a,b], values: [[5,6],[7,8]],keys: [a,b], values: [[],[]]] AS ARRAY(MAP('a', ARRAY('1', '2'), 'b', ARRAY('3', '4')), MAP('a', ARRAY('5', '6'), 'b', ARRAY('7', '8')), MAP('a', ARRAY(), 'b', ARRAY()))#0, map(keys: [1,2], values: [keys: [a,b], values: [1,2],keys: [a,b], values: [3,4]]) AS MAP(1, MAP('a', 1, 'b', 2), 2, MAP('a', 3, 'b', 4))#0, [[1,2,3],keys: [a,b], values: [1,2],[a,keys: [1,2], values: [a,b]]] AS NAMED_STRUCT('_1', ARRAY(1, 2, 3), '_2', MAP('a', 1, 'b', 2), '_3', NAMED_STRUCT('_1', 'a', '_2', MAP(1, 'a', 2, 'b')))#0]
+Project [id#0L, id#0L, 1 AS 1#0, null AS NULL#0, true AS true#0, 68 AS 68#0, 9872 AS 9872#0, -8726532 AS -8726532#0, 7834609328726532 AS 7834609328726532#0L, 2.718281828459045 AS 2.718281828459045#0, -0.8 AS -0.8#0, 89.97620 AS 89.97620#0, 89889.7667231 AS 89889.7667231#0, connect! AS connect!#0, T AS T#0, ABCDEFGHIJ AS ABCDEFGHIJ#0, 0x78797A7B7C7D7E7F808182838485868788898A8B8C8D8E AS X'78797A7B7C7D7E7F808182838485868788898A8B8C8D8E'#0, 0x0806 AS X'0806'#0, [8,6] AS ARRAY(8, 6)#0, null AS NULL#0, 2020-10-10 AS DATE '2020-10-10'#0, 8.997620 AS 8.997620#0, 2023-02-23 04:31:59.808 AS TIMESTAMP '2023-02-23 04:31:59.808'#0, 1969-12-31 16:00:12.345 AS TIMESTAMP '1969-12-31 16:00:12.345'#0, 2023-02-23 20:36:00 AS TIMESTAMP_NTZ '2023-02-23 20:36:00'#0, 2023-02-23 AS DATE '2023-02-23'#0, INTERVAL '0 00:03:20' DAY TO SECOND AS INTERVAL '0 00:03:20' DAY TO SECOND#0, INTERVAL '0-0' YEAR TO MONTH AS INTERVAL '0-0' YEAR TO MONTH#0, 23:59:59.999999999 AS TIME '23:59:59.999999999'#0, 2 months 20 days 0.0001 seconds AS INTERVAL '2 months 20 days 0.0001 seconds'#0, [18545,1677155519808000,12345000,1677184560000000,19411,200000000,0,86399999999999,2 months 20 days 0.0001 seconds] AS NAMED_STRUCT('_1', DATE '2020-10-10', '_2', TIMESTAMP '2023-02-23 04:31:59.808', '_3', TIMESTAMP '1969-12-31 16:00:12.345', '_4', TIMESTAMP_NTZ '2023-02-23 20:36:00', '_5', DATE '2023-02-23', '_6', INTERVAL '0 00:03:20' DAY TO SECOND, '_7', INTERVAL '0-0' YEAR TO MONTH, '_8', TIME '23:59:59.999999999', '_9', INTERVAL '2 months 20 days 0.0001 seconds')#0, 1 AS 1#0, [1,2,3] AS ARRAY(1, 2, 3)#0, [1,2,3] AS ARRAY(1, 2, 3)#0, map(keys: [a,b], values: [1,2]) AS MAP('a', 1, 'b', 2)#0, [a,2,1.0] AS NAMED_STRUCT('_1', 'a', '_2', 2, '_3', 1.0D)#0, null AS NULL#0, [1] AS ARRAY(1)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, [[1,2,3],[4,5,6],[7,8,9]] AS ARRAY(ARRAY(1, 2, 3), ARRAY(4, 5, 6), ARRAY(7, 8, 9))#0, [keys: [a,b], values: [1,2],keys: [a,b], values: [3,4],keys: [a,b], values: [5,6]] AS ARRAY(MAP('a', 1, 'b', 2), MAP('a', 3, 'b', 4), MAP('a', 5, 'b', 6))#0, [keys: [a,b], values: [[1,2],[3,4]],keys: [a,b], values: [[5,6],[7,8]],keys: [a,b], values: [[],[]]] AS ARRAY(MAP('a', ARRAY('1', '2'), 'b', ARRAY('3', '4')), MAP('a', ARRAY('5', '6'), 'b', ARRAY('7', '8')), MAP('a', ARRAY(), 'b', ARRAY()))#0, map(keys: [1,2], values: [keys: [a,b], values: [1,2],keys: [a,b], values: [3,4]]) AS MAP(1, MAP('a', 1, 'b', 2), 2, MAP('a', 3, 'b', 4))#0, [[1,2,3],keys: [a,b], values: [1,2],[a,keys: [1,2], values: [a,b]]] AS NAMED_STRUCT('_1', ARRAY(1, 2, 3), '_2', MAP('a', 1, 'b', 2), '_3', NAMED_STRUCT('_1', 'a', '_2', MAP(1, 'a', 2, 'b')))#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0]
```

sql/connect/common/src/test/resources/query-tests/queries/function_typedLit.json

Lines changed: 95 additions & 0 deletions
```diff
@@ -674,6 +674,101 @@
       }
     }
   }
+}, {
+  "literal": {
+    "struct": {
+      "elements": [{
+        "date": 18545
+      }, {
+        "timestamp": "1677155519808000"
+      }, {
+        "timestamp": "12345000"
+      }, {
+        "timestampNtz": "1677184560000000"
+      }, {
+        "date": 19411
+      }, {
+        "dayTimeInterval": "200000000"
+      }, {
+        "yearMonthInterval": 0
+      }, {
+        "time": {
+          "nano": "86399999999999",
+          "precision": 6
+        }
+      }, {
+        "calendarInterval": {
+          "months": 2,
+          "days": 20,
+          "microseconds": "100"
+        }
+      }],
+      "dataTypeStruct": {
+        "fields": [{
+          "name": "_1",
+          "nullable": true
+        }, {
+          "name": "_2",
+          "nullable": true
+        }, {
+          "name": "_3",
+          "nullable": true
+        }, {
+          "name": "_4",
+          "nullable": true
+        }, {
+          "name": "_5",
+          "nullable": true
+        }, {
+          "name": "_6",
+          "dataType": {
+            "dayTimeInterval": {
+              "startField": 0,
+              "endField": 3
+            }
+          },
+          "nullable": true
+        }, {
+          "name": "_7",
+          "dataType": {
+            "yearMonthInterval": {
+              "startField": 0,
+              "endField": 1
+            }
+          },
+          "nullable": true
+        }, {
+          "name": "_8",
+          "dataType": {
+            "time": {
+              "precision": 6
+            }
+          },
+          "nullable": true
+        }, {
+          "name": "_9",
+          "nullable": true
+        }]
+      }
+    }
+  },
+  "common": {
+    "origin": {
+      "jvmOrigin": {
+        "stackTrace": [{
+          "classLoaderName": "app",
+          "declaringClass": "org.apache.spark.sql.functions$",
+          "methodName": "typedLit",
+          "fileName": "functions.scala"
+        }, {
+          "classLoaderName": "app",
+          "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+          "methodName": "~~trimmed~anonfun~~",
+          "fileName": "PlanGenerationTestSuite.scala"
+        }]
+      }
+    }
+  }
 }, {
   "literal": {
     "integer": 1
```

sql/connect/common/src/test/resources/query-tests/queries/function_typedLit.proto.bin

Binary file not shown.
