Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix
  • Loading branch information
panbingkun committed Nov 18, 2024
commit d58719c97568b2917b6ad68c82bf5a7e316bbc85
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.protobuf.utils.{ProtobufUtils, SchemaConverters}
import org.apache.spark.sql.sources.{EqualTo, Not}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{ProtobufUtils => CommonProtobufUtils}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.ArrayImplicits._

Expand All @@ -39,15 +40,15 @@ class ProtobufCatalystDataConversionSuite
with ProtobufTestBase {

private val testFileDescFile = protobufDescriptorFile("catalyst_types.desc")
private val testFileDesc = ProtobufUtils.readDescriptorFileContent(testFileDescFile)
private val testFileDesc = CommonProtobufUtils.readDescriptorFileContent(testFileDescFile)
private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.CatalystTypes$"

private def checkResultWithEval(
data: Literal,
descFilePath: String,
messageName: String,
expected: Any): Unit = {
val descBytes = ProtobufUtils.readDescriptorFileContent(descFilePath)
val descBytes = CommonProtobufUtils.readDescriptorFileContent(descFilePath)
withClue("(Eval check with Java class name)") {
val className = s"$javaClassNamePrefix$messageName"
checkEvaluation(
Expand All @@ -72,7 +73,7 @@ class ProtobufCatalystDataConversionSuite
actualSchema: String,
badSchema: String): Unit = {

val descBytes = ProtobufUtils.readDescriptorFileContent(descFilePath)
val descBytes = CommonProtobufUtils.readDescriptorFileContent(descFilePath)
val binary = CatalystDataToProtobuf(data, actualSchema, Some(descBytes))

intercept[Exception] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,19 @@ import org.apache.spark.sql.protobuf.utils.ProtobufOptions
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.{ProtobufUtils => CommonProtobufUtils}

class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with ProtobufTestBase
with Serializable {

import testImplicits._

val testFileDescFile = protobufDescriptorFile("functions_suite.desc")
private val testFileDesc = ProtobufUtils.readDescriptorFileContent(testFileDescFile)
private val testFileDesc = CommonProtobufUtils.readDescriptorFileContent(testFileDescFile)
private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.SimpleMessageProtos$"

val proto2FileDescFile = protobufDescriptorFile("proto2_messages.desc")
val proto2FileDesc = ProtobufUtils.readDescriptorFileContent(proto2FileDescFile)
val proto2FileDesc = CommonProtobufUtils.readDescriptorFileContent(proto2FileDescFile)
private val proto2JavaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.Proto2Messages$"

private def emptyBinaryDF = Seq(Array[Byte]()).toDF("binary")
Expand Down Expand Up @@ -467,7 +468,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot

test("Handle extra fields : oldProducer -> newConsumer") {
val catalystTypesFile = protobufDescriptorFile("catalyst_types.desc")
val descBytes = ProtobufUtils.readDescriptorFileContent(catalystTypesFile)
val descBytes = CommonProtobufUtils.readDescriptorFileContent(catalystTypesFile)

val oldProducer = ProtobufUtils.buildDescriptor(descBytes, "oldProducer")
val newConsumer = ProtobufUtils.buildDescriptor(descBytes, "newConsumer")
Expand Down Expand Up @@ -509,7 +510,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot

test("Handle extra fields : newProducer -> oldConsumer") {
val catalystTypesFile = protobufDescriptorFile("catalyst_types.desc")
val descBytes = ProtobufUtils.readDescriptorFileContent(catalystTypesFile)
val descBytes = CommonProtobufUtils.readDescriptorFileContent(catalystTypesFile)

val newProducer = ProtobufUtils.buildDescriptor(descBytes, "newProducer")
val oldConsumer = ProtobufUtils.buildDescriptor(descBytes, "oldConsumer")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.util.{ProtobufUtils => CommonProtobufUtils}

/**
* Tests for [[ProtobufSerializer]] and [[ProtobufDeserializer]] with a more specific focus on
Expand All @@ -37,12 +38,12 @@ class ProtobufSerdeSuite extends SharedSparkSession with ProtobufTestBase {
import ProtoSerdeSuite.MatchType._

private val testFileDescFile = protobufDescriptorFile("serde_suite.desc")
private val testFileDesc = ProtobufUtils.readDescriptorFileContent(testFileDescFile)
private val testFileDesc = CommonProtobufUtils.readDescriptorFileContent(testFileDescFile)

private val javaClassNamePrefix = "org.apache.spark.sql.protobuf.protos.SerdeSuiteProtos$"

private val proto2DescFile = protobufDescriptorFile("proto2_messages.desc")
private val proto2Desc = ProtobufUtils.readDescriptorFileContent(proto2DescFile)
private val proto2Desc = CommonProtobufUtils.readDescriptorFileContent(proto2DescFile)

test("Test basic conversion") {
withFieldMatchType { fieldMatch =>
Expand Down Expand Up @@ -215,7 +216,7 @@ class ProtobufSerdeSuite extends SharedSparkSession with ProtobufTestBase {

val e1 = intercept[AnalysisException] {
ProtobufUtils.buildDescriptor(
ProtobufUtils.readDescriptorFileContent(fileDescFile),
CommonProtobufUtils.readDescriptorFileContent(fileDescFile),
"SerdeBasicMessage"
)
}
Expand All @@ -225,7 +226,7 @@ class ProtobufSerdeSuite extends SharedSparkSession with ProtobufTestBase {
condition = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR")

val basicMessageDescWithoutImports = descriptorSetWithoutImports(
ProtobufUtils.readDescriptorFileContent(
CommonProtobufUtils.readDescriptorFileContent(
protobufDescriptorFile("basicmessage.desc")
),
"BasicMessage"
Expand Down