Commit 9e95e5d

[SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes
This is the follow-up PR to apache#37972 and apache#38212.

### What changes were proposed in this pull request?

1. Move the spark-protobuf error classes into the Spark error-classes framework (core/src/main/resources/error/error-classes.json).
2. Support protobuf imports.
3. Validate protobuf timestamp and duration types.

### Why are the changes needed?

N/A

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests cover the changes in this PR.

CC: rangadi mposdev21 gengliangwang

Closes apache#38344 from SandishKumarHN/SPARK-40777-ProtoErrorCls.

Authored-by: SandishKumarHN <sanysandish@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
1 parent 1661d86 commit 9e95e5d
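The error-classes framework referenced above centralizes user-facing error messages as named JSON templates in core/src/main/resources/error/error-classes.json, each surfaced through a helper on QueryCompilationErrors or QueryExecutionErrors. The sketch below illustrates the shape of one such helper matching a call site in this diff; the error-class name, parameter keys, and constructor form are assumptions for illustration, not the commit's actual additions:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types.DataType

// Hypothetical sketch of the error-class helper pattern. The class name and
// message-parameter keys below are assumed, not copied from this commit.
def cannotConvertProtobufTypeToSqlTypeError(
    protobufColumn: String,
    sqlColumn: Seq[String],
    protobufType: String,
    sqlType: DataType): Throwable = {
  new AnalysisException(
    errorClass = "CANNOT_CONVERT_PROTOBUF_FIELD_TYPE_TO_SQL_TYPE", // assumed
    messageParameters = Map(
      "protobufColumn" -> protobufColumn,
      "sqlColumn" -> sqlColumn.mkString("."),
      "protobufType" -> protobufType,
      "sqlType" -> sqlType.sql))
}
```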

File tree

20 files changed: +625 −191 lines


connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala

Lines changed: 6 additions & 16 deletions
@@ -21,11 +21,10 @@ import scala.util.control.NonFatal
 
 import com.google.protobuf.DynamicMessage
 
-import org.apache.spark.SparkException
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
 import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters}
 import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType}
 
@@ -71,16 +70,11 @@ private[protobuf] case class ProtobufDataToCatalyst(
   @transient private lazy val parseMode: ParseMode = {
     val mode = protobufOptions.parseMode
     if (mode != PermissiveMode && mode != FailFastMode) {
-      throw new AnalysisException(unacceptableModeMessage(mode.name))
+      throw QueryCompilationErrors.parseModeUnsupportedError(prettyName, mode)
     }
     mode
   }
 
-  private def unacceptableModeMessage(name: String): String = {
-    s"from_protobuf() doesn't support the $name mode. " +
-      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
-  }
-
   @transient private lazy val nullResultRow: Any = dataType match {
     case st: StructType =>
       val resultRow = new SpecificInternalRow(st.map(_.dataType))
@@ -98,13 +92,9 @@ private[protobuf] case class ProtobufDataToCatalyst(
       case PermissiveMode =>
         nullResultRow
       case FailFastMode =>
-        throw new SparkException(
-          "Malformed records are detected in record parsing. " +
-            s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
-            "result, try setting the option 'mode' as 'PERMISSIVE'.",
-          e)
+        throw QueryExecutionErrors.malformedProtobufMessageDetectedInMessageParsingError(e)
       case _ =>
-        throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+        throw QueryCompilationErrors.parseModeUnsupportedError(prettyName, parseMode)
     }
   }
 
@@ -119,8 +109,8 @@ private[protobuf] case class ProtobufDataToCatalyst(
       case Some(number) =>
         // Unknown fields contain a field with same number as a known field. Must be due to
         // mismatch of schema between writer and reader here.
-        throw new IllegalArgumentException(s"Type mismatch encountered for field:" +
-          s" ${messageDescriptor.getFields.get(number)}")
+        throw QueryCompilationErrors.protobufFieldTypeMismatchError(
+          messageDescriptor.getFields.get(number).toString)
       case None =>
     }
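This file's changes affect what users see when from_protobuf encounters malformed input. A minimal usage sketch, assuming the connector's options overload and placeholder names (a DataFrame df with a binary value column, a Person message, a descriptor set at /tmp/person.desc):

```scala
import java.util.Collections

import org.apache.spark.sql.protobuf.functions.from_protobuf

// PERMISSIVE (the default) turns a malformed record into a null row;
// FAILFAST raises the malformed-message execution error shown in the hunk above.
val parsed = df.select(
  from_protobuf(
    df("value"),                    // binary column of serialized messages
    "Person",                       // placeholder message name
    "/tmp/person.desc",             // placeholder descriptor-set path
    Collections.singletonMap("mode", "FAILFAST"))
    .as("person"))
```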

connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala

Lines changed: 17 additions & 14 deletions
@@ -22,14 +22,14 @@ import com.google.protobuf.{ByteString, DynamicMessage, Message}
 import com.google.protobuf.Descriptors._
 import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils.ProtoMatchedField
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils.toFieldStr
-import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -61,10 +61,10 @@ private[sql] class ProtobufDeserializer(
         }
       }
     } catch {
-      case ise: IncompatibleSchemaException =>
-        throw new IncompatibleSchemaException(
-          s"Cannot convert Protobuf type ${rootDescriptor.getName} " +
-            s"to SQL type ${rootCatalystType.sql}.",
+      case ise: AnalysisException =>
+        throw QueryCompilationErrors.cannotConvertProtobufTypeToCatalystTypeError(
+          rootDescriptor.getName,
+          rootCatalystType,
           ise)
     }
 
@@ -152,11 +152,6 @@ private[sql] class ProtobufDeserializer(
       catalystType: DataType,
       protoPath: Seq[String],
       catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = {
-    val errorPrefix = s"Cannot convert Protobuf ${toFieldStr(protoPath)} to " +
-      s"SQL ${toFieldStr(catalystPath)} because "
-    val incompatibleMsg = errorPrefix +
-      s"schema is incompatible (protoType = ${protoType} ${protoType.toProto.getLabel} " +
-      s"${protoType.getJavaType} ${protoType.getType}, sqlType = ${catalystType.sql})"
 
     (protoType.getJavaType, catalystType) match {
 
@@ -175,8 +170,9 @@ private[sql] class ProtobufDeserializer(
       case (INT, ShortType) =>
         (updater, ordinal, value) => updater.setShort(ordinal, value.asInstanceOf[Short])
 
-      case (BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
-          ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
+      case (
+          BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
+          ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
         newArrayWriter(protoType, protoPath, catalystPath, dataType, containsNull)
 
       case (LONG, LongType) =>
@@ -199,7 +195,8 @@ private[sql] class ProtobufDeserializer(
         (updater, ordinal, value) =>
           val byte_array = value match {
             case s: ByteString => s.toByteArray
-            case _ => throw new Exception("Invalid ByteString format")
+            case unsupported =>
+              throw QueryCompilationErrors.invalidByteStringFormatError(unsupported)
           }
           updater.set(ordinal, byte_array)
 
@@ -244,7 +241,13 @@ private[sql] class ProtobufDeserializer(
       case (ENUM, StringType) =>
         (updater, ordinal, value) => updater.set(ordinal, UTF8String.fromString(value.toString))
 
-      case _ => throw new IncompatibleSchemaException(incompatibleMsg)
+      case _ =>
+        throw QueryCompilationErrors.cannotConvertProtobufTypeToSqlTypeError(
+          toFieldStr(protoPath),
+          catalystPath,
+          s"${protoType} ${protoType.toProto.getLabel} ${protoType.getJavaType}" +
+            s" ${protoType.getType}",
+          catalystType)
     }
   }
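The ByteString hunk above uses protobuf-java's standard conversion to a JVM byte array, since Catalyst's BinaryType is backed by Array[Byte]. A self-contained illustration of that conversion, independent of Spark:

```scala
import com.google.protobuf.ByteString

// ByteString is protobuf's immutable byte container; toByteArray copies its
// contents into the mutable Array[Byte] that BinaryType expects.
val bs: ByteString = ByteString.copyFromUtf8("hello")
val bytes: Array[Byte] = bs.toByteArray
assert(new String(bytes, "UTF-8") == "hello")
```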

connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala

Lines changed: 21 additions & 17 deletions
@@ -23,13 +23,14 @@ import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
 import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
 import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils.{toFieldStr, ProtoMatchedField}
-import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
 import org.apache.spark.sql.types._
 
 /**
@@ -53,10 +54,10 @@ private[sql] class ProtobufSerializer(
         newStructConverter(st, rootDescriptor, Nil, Nil).asInstanceOf[Any => Any]
       }
     } catch {
-      case ise: IncompatibleSchemaException =>
-        throw new IncompatibleSchemaException(
-          s"Cannot convert SQL type ${rootCatalystType.sql} to Protobuf type " +
-            s"${rootDescriptor.getName}.",
+      case ise: AnalysisException =>
+        throw QueryCompilationErrors.cannotConvertSqlTypeToProtobufError(
+          rootDescriptor.getName,
+          rootCatalystType,
           ise)
     }
     if (nullable) { (data: Any) =>
@@ -77,8 +78,6 @@ private[sql] class ProtobufSerializer(
       fieldDescriptor: FieldDescriptor,
       catalystPath: Seq[String],
       protoPath: Seq[String]): Converter = {
-    val errorPrefix = s"Cannot convert SQL ${toFieldStr(catalystPath)} " +
-      s"to Protobuf ${toFieldStr(protoPath)} because "
     (catalystType, fieldDescriptor.getJavaType) match {
       case (NullType, _) =>
         (getter, ordinal) => null
@@ -104,10 +103,11 @@ private[sql] class ProtobufSerializer(
         (getter, ordinal) =>
           val data = getter.getUTF8String(ordinal).toString
           if (!enumSymbols.contains(data)) {
-            throw new IncompatibleSchemaException(
-              errorPrefix +
-                s""""$data" cannot be written since it's not defined in enum """ +
-                enumSymbols.mkString("\"", "\", \"", "\""))
+            throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufEnumTypeError(
+              catalystPath,
+              toFieldStr(protoPath),
+              data,
+              enumSymbols.mkString("\"", "\", \"", "\""))
           }
           fieldDescriptor.getEnumType.findValueByName(data)
       case (StringType, STRING) =>
@@ -124,7 +124,8 @@ private[sql] class ProtobufSerializer(
       case (TimestampType, MESSAGE) =>
         (getter, ordinal) =>
           val millis = DateTimeUtils.microsToMillis(getter.getLong(ordinal))
-          Timestamp.newBuilder()
+          Timestamp
+            .newBuilder()
             .setSeconds((millis / 1000))
             .setNanos(((millis % 1000) * 1000000).toInt)
             .build()
@@ -201,7 +202,8 @@ private[sql] class ProtobufSerializer(
           val calendarInterval = IntervalUtils.fromIntervalString(dayTimeIntervalString)
 
           val millis = DateTimeUtils.microsToMillis(calendarInterval.microseconds)
-          val duration = Duration.newBuilder()
+          val duration = Duration
+            .newBuilder()
             .setSeconds((millis / 1000))
             .setNanos(((millis % 1000) * 1000000).toInt)
 
@@ -215,10 +217,12 @@ private[sql] class ProtobufSerializer(
           duration.build()
 
         case _ =>
-          throw new IncompatibleSchemaException(
-            errorPrefix +
-              s"schema is incompatible (sqlType = ${catalystType.sql}, " +
-              s"protoType = ${fieldDescriptor.getJavaType})")
+          throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufTypeError(
+            catalystPath,
+            toFieldStr(protoPath),
+            catalystType,
+            s"${fieldDescriptor} ${fieldDescriptor.toProto.getLabel} ${fieldDescriptor.getJavaType}" +
+              s" ${fieldDescriptor.getType}")
     }
   }