Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3404,6 +3404,17 @@ class PlanGenerationTestSuite
fn.typedLit(java.time.Period.ofDays(100)),
fn.typedLit(java.time.LocalTime.of(23, 59, 59, 999999999)),
fn.typedLit(new CalendarInterval(2, 20, 100L)),
fn.typedLit(
(
java.time.LocalDate.of(2020, 10, 10),
java.time.Instant.ofEpochMilli(1677155519808L),
new java.sql.Timestamp(12345L),
java.time.LocalDateTime.of(2023, 2, 23, 20, 36),
java.sql.Date.valueOf("2023-02-23"),
java.time.Duration.ofSeconds(200L),
java.time.Period.ofDays(100),
java.time.LocalTime.of(23, 59, 59, 999999999),
new CalendarInterval(2, 20, 100L))),

// Handle parameterized scala types e.g.: List, Seq and Map.
fn.typedLit(Some(1)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ object LiteralValueProtoConverter {
}
}

private def getConverter(dataType: proto.DataType): proto.Expression.Literal => Any = {
private def getScalaConverter(dataType: proto.DataType): proto.Expression.Literal => Any = {
dataType.getKindCase match {
case proto.DataType.KindCase.SHORT => v => v.getShort.toShort
case proto.DataType.KindCase.INTEGER => v => v.getInteger
Expand All @@ -487,11 +487,18 @@ object LiteralValueProtoConverter {
case proto.DataType.KindCase.BOOLEAN => v => v.getBoolean
case proto.DataType.KindCase.STRING => v => v.getString
case proto.DataType.KindCase.BINARY => v => v.getBinary.toByteArray
case proto.DataType.KindCase.DATE => v => v.getDate
case proto.DataType.KindCase.TIMESTAMP => v => v.getTimestamp
case proto.DataType.KindCase.TIMESTAMP_NTZ => v => v.getTimestampNtz
case proto.DataType.KindCase.DAY_TIME_INTERVAL => v => v.getDayTimeInterval
case proto.DataType.KindCase.YEAR_MONTH_INTERVAL => v => v.getYearMonthInterval
case proto.DataType.KindCase.DATE =>
v => SparkDateTimeUtils.toJavaDate(v.getDate)
Copy link
Contributor

@cloud-fan cloud-fan Sep 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need a bit more context to understand it better. So at the client side, fn.typedLit takes java classes like LocalDate, and convert it to int before putting it into the protobuf message. At the server side, v.getDate returns int and we need to convert it back to the LocalDate.

This looks a bit messy to me. If the client side already knows how to convert external java objects to internal catalyst values, why does server need this conversion?

Copy link
Contributor Author

@heyihong heyihong Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan I am not sure if the client side knows how to convert external Java objects into internal Catalyst values. The conversion occurs here in LiteralValueProtoConverter and the server just converts the Proto value back to the Java object.

This code path is also needed in the Scala client, since the client may need to convert temporal values from proto observed metrics received from the server into Java objects.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's rename def getConverter to getScalaConverter

case proto.DataType.KindCase.TIMESTAMP =>
v => SparkDateTimeUtils.toJavaTimestamp(v.getTimestamp)
case proto.DataType.KindCase.TIMESTAMP_NTZ =>
v => SparkDateTimeUtils.microsToLocalDateTime(v.getTimestampNtz)
case proto.DataType.KindCase.DAY_TIME_INTERVAL =>
v => SparkIntervalUtils.microsToDuration(v.getDayTimeInterval)
case proto.DataType.KindCase.YEAR_MONTH_INTERVAL =>
v => SparkIntervalUtils.monthsToPeriod(v.getYearMonthInterval)
case proto.DataType.KindCase.TIME =>
v => SparkDateTimeUtils.nanosToLocalTime(v.getTime.getNano)
case proto.DataType.KindCase.DECIMAL => v => Decimal(v.getDecimal.getValue)
case proto.DataType.KindCase.CALENDAR_INTERVAL =>
v =>
Expand Down Expand Up @@ -646,7 +653,7 @@ object LiteralValueProtoConverter {
}
}

makeArrayData(getConverter(arrayType.getElementType))
makeArrayData(getScalaConverter(arrayType.getElementType))
}

def getProtoArrayType(array: proto.Expression.Literal.Array): proto.DataType.Array = {
Expand Down Expand Up @@ -691,7 +698,7 @@ object LiteralValueProtoConverter {
}
}

makeMapData(getConverter(mapType.getKeyType), getConverter(mapType.getValueType))
makeMapData(getScalaConverter(mapType.getKeyType), getScalaConverter(mapType.getValueType))
}

def getProtoMapType(map: proto.Expression.Literal.Map): proto.DataType.Map = {
Expand Down Expand Up @@ -724,7 +731,7 @@ object LiteralValueProtoConverter {
val structData = Array.tabulate(struct.getElementsCount) { i =>
val element = struct.getElements(i)
val dataType = structType.getFields(i).getDataType
getConverter(dataType)(element)
getScalaConverter(dataType)(element)
}
new GenericRowWithSchema(structData, DataTypeProtoConverter.toCatalystStructType(structType))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Project [id#0L, id#0L, 1 AS 1#0, null AS NULL#0, true AS true#0, 68 AS 68#0, 9872 AS 9872#0, -8726532 AS -8726532#0, 7834609328726532 AS 7834609328726532#0L, 2.718281828459045 AS 2.718281828459045#0, -0.8 AS -0.8#0, 89.97620 AS 89.97620#0, 89889.7667231 AS 89889.7667231#0, connect! AS connect!#0, T AS T#0, ABCDEFGHIJ AS ABCDEFGHIJ#0, 0x78797A7B7C7D7E7F808182838485868788898A8B8C8D8E AS X'78797A7B7C7D7E7F808182838485868788898A8B8C8D8E'#0, 0x0806 AS X'0806'#0, [8,6] AS ARRAY(8, 6)#0, null AS NULL#0, 2020-10-10 AS DATE '2020-10-10'#0, 8.997620 AS 8.997620#0, 2023-02-23 04:31:59.808 AS TIMESTAMP '2023-02-23 04:31:59.808'#0, 1969-12-31 16:00:12.345 AS TIMESTAMP '1969-12-31 16:00:12.345'#0, 2023-02-23 20:36:00 AS TIMESTAMP_NTZ '2023-02-23 20:36:00'#0, 2023-02-23 AS DATE '2023-02-23'#0, INTERVAL '0 00:03:20' DAY TO SECOND AS INTERVAL '0 00:03:20' DAY TO SECOND#0, INTERVAL '0-0' YEAR TO MONTH AS INTERVAL '0-0' YEAR TO MONTH#0, 23:59:59.999999999 AS TIME '23:59:59.999999999'#0, 2 months 20 days 0.0001 seconds AS INTERVAL '2 months 20 days 0.0001 seconds'#0, 1 AS 1#0, [1,2,3] AS ARRAY(1, 2, 3)#0, [1,2,3] AS ARRAY(1, 2, 3)#0, map(keys: [a,b], values: [1,2]) AS MAP('a', 1, 'b', 2)#0, [a,2,1.0] AS NAMED_STRUCT('_1', 'a', '_2', 2, '_3', 1.0D)#0, null AS NULL#0, [1] AS ARRAY(1)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, [[1,2,3],[4,5,6],[7,8,9]] AS ARRAY(ARRAY(1, 2, 3), ARRAY(4, 5, 6), ARRAY(7, 8, 9))#0, [keys: [a,b], values: [1,2],keys: [a,b], values: [3,4],keys: [a,b], values: [5,6]] AS ARRAY(MAP('a', 1, 'b', 2), MAP('a', 3, 'b', 4), MAP('a', 5, 'b', 6))#0, [keys: [a,b], values: [[1,2],[3,4]],keys: [a,b], values: [[5,6],[7,8]],keys: [a,b], values: [[],[]]] AS ARRAY(MAP('a', ARRAY('1', '2'), 'b', ARRAY('3', '4')), MAP('a', ARRAY('5', '6'), 'b', ARRAY('7', '8')), MAP('a', ARRAY(), 'b', ARRAY()))#0, map(keys: [1,2], values: [keys: [a,b], values: [1,2],keys: [a,b], values: [3,4]]) AS MAP(1, MAP('a', 1, 'b', 2), 2, MAP('a', 3, 'b', 4))#0, [[1,2,3],keys: [a,b], values: [1,2],[a,keys: [1,2], values: [a,b]]] AS NAMED_STRUCT('_1', ARRAY(1, 2, 3), '_2', MAP('a', 1, 'b', 2), '_3', NAMED_STRUCT('_1', 'a', '_2', MAP(1, 'a', 2, 'b')))#0]
Project [id#0L, id#0L, 1 AS 1#0, null AS NULL#0, true AS true#0, 68 AS 68#0, 9872 AS 9872#0, -8726532 AS -8726532#0, 7834609328726532 AS 7834609328726532#0L, 2.718281828459045 AS 2.718281828459045#0, -0.8 AS -0.8#0, 89.97620 AS 89.97620#0, 89889.7667231 AS 89889.7667231#0, connect! AS connect!#0, T AS T#0, ABCDEFGHIJ AS ABCDEFGHIJ#0, 0x78797A7B7C7D7E7F808182838485868788898A8B8C8D8E AS X'78797A7B7C7D7E7F808182838485868788898A8B8C8D8E'#0, 0x0806 AS X'0806'#0, [8,6] AS ARRAY(8, 6)#0, null AS NULL#0, 2020-10-10 AS DATE '2020-10-10'#0, 8.997620 AS 8.997620#0, 2023-02-23 04:31:59.808 AS TIMESTAMP '2023-02-23 04:31:59.808'#0, 1969-12-31 16:00:12.345 AS TIMESTAMP '1969-12-31 16:00:12.345'#0, 2023-02-23 20:36:00 AS TIMESTAMP_NTZ '2023-02-23 20:36:00'#0, 2023-02-23 AS DATE '2023-02-23'#0, INTERVAL '0 00:03:20' DAY TO SECOND AS INTERVAL '0 00:03:20' DAY TO SECOND#0, INTERVAL '0-0' YEAR TO MONTH AS INTERVAL '0-0' YEAR TO MONTH#0, 23:59:59.999999999 AS TIME '23:59:59.999999999'#0, 2 months 20 days 0.0001 seconds AS INTERVAL '2 months 20 days 0.0001 seconds'#0, [18545,1677155519808000,12345000,1677184560000000,19411,200000000,0,86399999999999,2 months 20 days 0.0001 seconds] AS NAMED_STRUCT('_1', DATE '2020-10-10', '_2', TIMESTAMP '2023-02-23 04:31:59.808', '_3', TIMESTAMP '1969-12-31 16:00:12.345', '_4', TIMESTAMP_NTZ '2023-02-23 20:36:00', '_5', DATE '2023-02-23', '_6', INTERVAL '0 00:03:20' DAY TO SECOND, '_7', INTERVAL '0-0' YEAR TO MONTH, '_8', TIME '23:59:59.999999999', '_9', INTERVAL '2 months 20 days 0.0001 seconds')#0, 1 AS 1#0, [1,2,3] AS ARRAY(1, 2, 3)#0, [1,2,3] AS ARRAY(1, 2, 3)#0, map(keys: [a,b], values: [1,2]) AS MAP('a', 1, 'b', 2)#0, [a,2,1.0] AS NAMED_STRUCT('_1', 'a', '_2', 2, '_3', 1.0D)#0, null AS NULL#0, [1] AS ARRAY(1)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, map(keys: [1], values: [0]) AS MAP(1, 0)#0, [[1,2,3],[4,5,6],[7,8,9]] AS ARRAY(ARRAY(1, 2, 3), ARRAY(4, 5, 6), ARRAY(7, 8, 9))#0, [keys: [a,b], values: [1,2],keys: [a,b], values: [3,4],keys: [a,b], values: [5,6]] AS ARRAY(MAP('a', 1, 'b', 2), MAP('a', 3, 'b', 4), MAP('a', 5, 'b', 6))#0, [keys: [a,b], values: [[1,2],[3,4]],keys: [a,b], values: [[5,6],[7,8]],keys: [a,b], values: [[],[]]] AS ARRAY(MAP('a', ARRAY('1', '2'), 'b', ARRAY('3', '4')), MAP('a', ARRAY('5', '6'), 'b', ARRAY('7', '8')), MAP('a', ARRAY(), 'b', ARRAY()))#0, map(keys: [1,2], values: [keys: [a,b], values: [1,2],keys: [a,b], values: [3,4]]) AS MAP(1, MAP('a', 1, 'b', 2), 2, MAP('a', 3, 'b', 4))#0, [[1,2,3],keys: [a,b], values: [1,2],[a,keys: [1,2], values: [a,b]]] AS NAMED_STRUCT('_1', ARRAY(1, 2, 3), '_2', MAP('a', 1, 'b', 2), '_3', NAMED_STRUCT('_1', 'a', '_2', MAP(1, 'a', 2, 'b')))#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0]
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,101 @@
}
}
}
}, {
"literal": {
"struct": {
"elements": [{
"date": 18545
}, {
"timestamp": "1677155519808000"
}, {
"timestamp": "12345000"
}, {
"timestampNtz": "1677184560000000"
}, {
"date": 19411
}, {
"dayTimeInterval": "200000000"
}, {
"yearMonthInterval": 0
}, {
"time": {
"nano": "86399999999999",
"precision": 6
}
}, {
"calendarInterval": {
"months": 2,
"days": 20,
"microseconds": "100"
}
}],
"dataTypeStruct": {
"fields": [{
"name": "_1",
"nullable": true
}, {
"name": "_2",
"nullable": true
}, {
"name": "_3",
"nullable": true
}, {
"name": "_4",
"nullable": true
}, {
"name": "_5",
"nullable": true
}, {
"name": "_6",
"dataType": {
"dayTimeInterval": {
"startField": 0,
"endField": 3
}
},
"nullable": true
}, {
"name": "_7",
"dataType": {
"yearMonthInterval": {
"startField": 0,
"endField": 1
}
},
"nullable": true
}, {
"name": "_8",
"dataType": {
"time": {
"precision": 6
}
},
"nullable": true
}, {
"name": "_9",
"nullable": true
}]
}
}
},
"common": {
"origin": {
"jvmOrigin": {
"stackTrace": [{
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.functions$",
"methodName": "typedLit",
"fileName": "functions.scala"
}, {
"classLoaderName": "app",
"declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
"methodName": "~~trimmed~anonfun~~",
"fileName": "PlanGenerationTestSuite.scala"
}]
}
}
}
}, {
"literal": {
"integer": 1
Expand Down
Binary file not shown.