Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -3691,6 +3691,13 @@
],
"sqlState" : "42K0G"
},
"PROTOBUF_NOT_LOADED_SQL_FUNCTIONS_UNUSABLE" : {
"message" : [
"Cannot call the <functionName> SQL function because the Protobuf data source is not loaded.",
"Please restart your job or session with the 'spark-protobuf' package loaded, such as by using the --packages argument on the command line, and then retry your query or command again."
],
"sqlState" : "22KD3"
},
"PROTOBUF_TYPE_NOT_SUPPORT" : {
"message" : [
"Protobuf type not yet supported: <protobufType>."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2042,6 +2042,166 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
}
}

test("SPARK-49121: from_protobuf and to_protobuf SQL functions") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@itholic It seems that this test fails in the Maven daily test on both Java 17 and Java 21:

image

also cc @HyukjinKwon

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@itholic mind taking a look please?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@itholic @HyukjinKwon Attempting to fix: #47855

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for following up.

// End-to-end check of the to_protobuf / from_protobuf SQL functions:
//   1. build a one-row table holding a struct column,
//   2. serialize it with to_protobuf and read it back with from_protobuf,
//   3. verify that a non-string / non-map argument (the integer literal 42) in the second,
//      third or fourth position raises DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT.
// NOTE(review): withTable presumably drops `protobuf_test_table` once the body finishes --
// confirm against the test helper.
withTable("protobuf_test_table") {
// Source data: a single row whose `complex_struct` column carries one field per scalar type
// exercised below (long, string, int, long, double, float, boolean, binary).
sql(
"""
|CREATE TABLE protobuf_test_table AS
| SELECT named_struct(
| 'id', 1L,
| 'string_value', 'test_string',
| 'int32_value', 32,
| 'int64_value', 64L,
| 'double_value', CAST(123.456 AS DOUBLE),
| 'float_value', CAST(789.01 AS FLOAT),
| 'bool_value', true,
| 'bytes_value', CAST('sample_bytes' AS BINARY)
| ) AS complex_struct
|""".stripMargin)

// Serializes the struct using the 'SimpleMessageJavaTypes' message from the descriptor file
// at `testFileDescFile`, with an empty options map. Reused below as a subquery.
val toProtobufSql =
s"""
|SELECT
| to_protobuf(
| complex_struct, 'SimpleMessageJavaTypes', '$testFileDescFile', map()
| ) AS protobuf_data
|FROM protobuf_test_table
|""".stripMargin

// Only checks that the query runs and the collected result is non-null; the serialized
// bytes themselves are validated indirectly by the round trip below.
val protobufResult = spark.sql(toProtobufSql).collect()
assert(protobufResult != null)

// Deserializes the output of `toProtobufSql` (embedded as a subquery) with the same message
// name and descriptor file.
val fromProtobufSql =
s"""
|SELECT
| from_protobuf(protobuf_data, 'SimpleMessageJavaTypes', '$testFileDescFile', map())
|FROM
| ($toProtobufSql)
|""".stripMargin

// The round trip is expected to reproduce the field values written by the CREATE TABLE above.
checkAnswer(
spark.sql(fromProtobufSql),
Seq(Row(Row(1L, "test_string", 32, 64L, 123.456, 789.01F, true, "sample_bytes".getBytes)))
)

// Negative tests for to_protobuf.
// NOTE(review): in every checkError below, `start`/`stop` look like character offsets of
// `fragment` within the SQL text; if so, the hard-coded `stop` values of the fragments that
// interpolate `testFileDescFile` depend on the length of that path -- confirm.
// Second argument (message name) given as 42 instead of a string.
checkError(
exception = intercept[AnalysisException](sql(
s"""
|SELECT
| to_protobuf(complex_struct, 42, '$testFileDescFile', map())
|FROM protobuf_test_table
|""".stripMargin)),
errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
parameters = Map(
"sqlExpr" -> s"""\"toprotobuf(complex_struct, 42, $testFileDescFile, map())\"""",
"msg" -> ("The second argument of the TO_PROTOBUF SQL function must be a constant " +
"string representing the Protobuf message name"),
"hint" -> ""),
queryContext = Array(ExpectedContext(
fragment = s"to_protobuf(complex_struct, 42, '$testFileDescFile', map())",
start = 10,
stop = 153))
)
// Third argument (descriptor file path) given as 42 instead of a string.
checkError(
exception = intercept[AnalysisException](sql(
s"""
|SELECT
| to_protobuf(complex_struct, 'SimpleMessageJavaTypes', 42, map())
|FROM protobuf_test_table
|""".stripMargin)),
errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
parameters = Map(
"sqlExpr" -> "\"toprotobuf(complex_struct, SimpleMessageJavaTypes, 42, map())\"",
"msg" -> ("The third argument of the TO_PROTOBUF SQL function must be a constant " +
"string representing the Protobuf descriptor file path"),
"hint" -> ""),
queryContext = Array(ExpectedContext(
fragment = "to_protobuf(complex_struct, 'SimpleMessageJavaTypes', 42, map())",
start = 10,
stop = 73))
)
// Fourth argument (options) given as 42 instead of a map.
checkError(
exception = intercept[AnalysisException](sql(
s"""
|SELECT
| to_protobuf(complex_struct, 'SimpleMessageJavaTypes', '$testFileDescFile', 42)
|FROM protobuf_test_table
|""".stripMargin)),
errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
parameters = Map(
"sqlExpr" ->
s"""\"toprotobuf(complex_struct, SimpleMessageJavaTypes, $testFileDescFile, 42)\"""",
"msg" -> ("The fourth argument of the TO_PROTOBUF SQL function must be a constant " +
"map of strings to strings containing the options to use for converting the value " +
"to Protobuf format"),
"hint" -> ""),
queryContext = Array(ExpectedContext(
fragment =
s"to_protobuf(complex_struct, 'SimpleMessageJavaTypes', '$testFileDescFile', 42)",
start = 10,
stop = 172))
)

// Negative tests for from_protobuf.
// These mirror the to_protobuf cases, reading from the `toProtobufSql` subquery.
// Second argument (message name) given as 42 instead of a string.
checkError(
exception = intercept[AnalysisException](sql(
s"""
|SELECT from_protobuf(protobuf_data, 42, '$testFileDescFile', map())
|FROM ($toProtobufSql)
|""".stripMargin)),
errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
parameters = Map(
"sqlExpr" -> s"""\"fromprotobuf(protobuf_data, 42, $testFileDescFile, map())\"""",
"msg" -> ("The second argument of the FROM_PROTOBUF SQL function must be a constant " +
"string representing the Protobuf message name"),
"hint" -> ""),
queryContext = Array(ExpectedContext(
fragment = s"from_protobuf(protobuf_data, 42, '$testFileDescFile', map())",
start = 8,
stop = 152))
)
// Third argument (descriptor file path) given as 42 instead of a string.
checkError(
exception = intercept[AnalysisException](sql(
s"""
|SELECT from_protobuf(protobuf_data, 'SimpleMessageJavaTypes', 42, map())
|FROM ($toProtobufSql)
|""".stripMargin)),
errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
parameters = Map(
"sqlExpr" -> "\"fromprotobuf(protobuf_data, SimpleMessageJavaTypes, 42, map())\"",
"msg" -> ("The third argument of the FROM_PROTOBUF SQL function must be a constant " +
"string representing the Protobuf descriptor file path"),
"hint" -> ""),
queryContext = Array(ExpectedContext(
fragment = "from_protobuf(protobuf_data, 'SimpleMessageJavaTypes', 42, map())",
start = 8,
stop = 72))
)
// Fourth argument (options) given as 42 instead of a map. Unlike the two cases above, the
// function call sits on its own line after SELECT, hence the different `start` value.
checkError(
exception = intercept[AnalysisException](sql(
s"""
|SELECT
| from_protobuf(protobuf_data, 'SimpleMessageJavaTypes', '$testFileDescFile', 42)
|FROM ($toProtobufSql)
|""".stripMargin)),
errorClass = "DATATYPE_MISMATCH.TYPE_CHECK_FAILURE_WITH_HINT",
parameters = Map(
"sqlExpr" ->
s"""\"fromprotobuf(protobuf_data, SimpleMessageJavaTypes, $testFileDescFile, 42)\"""",
"msg" -> ("The fourth argument of the FROM_PROTOBUF SQL function must be a constant " +
"map of strings to strings containing the options to use for converting the value " +
"from Protobuf format"),
"hint" -> ""),
queryContext = Array(ExpectedContext(
fragment =
s"from_protobuf(protobuf_data, 'SimpleMessageJavaTypes', '$testFileDescFile', 42)",
start = 10,
stop = 173))
)
}
}

def testFromProtobufWithOptions(
df: DataFrame,
expectedDf: DataFrame,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,11 @@ object FunctionRegistry {

// Avro
expression[FromAvro]("from_avro"),
expression[ToAvro]("to_avro")
expression[ToAvro]("to_avro"),

// Protobuf
expression[FromProtobuf]("from_protobuf"),
expression[ToProtobuf]("to_protobuf")
)

val builtin: SimpleFunctionRegistry = {
Expand Down
Loading