Skip to content

Commit fd77ec6

Browse files
cashmand authored and cloud-fan committed
[SPARK-53291][SQL] Fix nullability for value column
### What changes were proposed in this pull request? For shredded Variant, we currently always set the `value` column to be nullable. But when there is no corresponding `typed_value`, and the value doesn't represent an object field (where null implies missing from the object), the `value` is never null, and we can set the column to be required. ### Why are the changes needed? This shouldn't affect results as read by Spark, but it may cause the parquet file to be marginally larger, and the [spec](https://github.com/apache/parquet-format/blob/master/VariantShredding.md) wording indicates that `value` must be required in these situations, so a strict reader could reject the schema as it's currently being produced. ### Does this PR introduce _any_ user-facing change? Variant parquet file schema may change slightly. ### How was this patch tested? Unit test extended to cover this case. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52043 from cashmand/fix_nullability. Authored-by: cashmand <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 923d70f commit fd77ec6

File tree

2 files changed

+48
-7
lines changed

2 files changed

+48
-7
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -473,13 +473,15 @@ case object SparkShreddingUtils {
473473
* b: struct<typed_value: string, value: binary>>>
474474
*
475475
*/
476-
def variantShreddingSchema(dataType: DataType, isTopLevel: Boolean = true): StructType = {
476+
def variantShreddingSchema(dataType: DataType,
477+
isTopLevel: Boolean = true,
478+
isObjectField : Boolean = false): StructType = {
477479
val fields = dataType match {
478480
case ArrayType(elementType, _) =>
479481
// Always set containsNull to false. One of value or typed_value must always be set for
480482
// array elements.
481483
val arrayShreddingSchema =
482-
ArrayType(variantShreddingSchema(elementType, false), containsNull = false)
484+
ArrayType(variantShreddingSchema(elementType, false, false), containsNull = false)
483485
Seq(
484486
StructField(VariantValueFieldName, BinaryType, nullable = true),
485487
StructField(TypedValueFieldName, arrayShreddingSchema, nullable = true)
@@ -489,15 +491,17 @@ case object SparkShreddingUtils {
489491
// "value" columna as "00", and missing values are represented by setting both "value" and
490492
// "typed_value" to null.
491493
val objectShreddingSchema = StructType(fields.map(f =>
492-
f.copy(dataType = variantShreddingSchema(f.dataType, false), nullable = false)))
494+
f.copy(dataType = variantShreddingSchema(f.dataType, false, true), nullable = false)))
493495
Seq(
494496
StructField(VariantValueFieldName, BinaryType, nullable = true),
495497
StructField(TypedValueFieldName, objectShreddingSchema, nullable = true)
496498
)
497499
case VariantType =>
498-
// For Variant, we don't need a typed column
500+
// For Variant, we don't need a typed column. If there is no typed column, value is required
501+
// for array elements or top-level fields, but optional for objects (where a null represents
502+
// a missing field).
499503
Seq(
500-
StructField(VariantValueFieldName, BinaryType, nullable = true)
504+
StructField(VariantValueFieldName, BinaryType, nullable = isObjectField)
501505
)
502506
case _: NumericType | BooleanType | _: StringType | BinaryType | _: DatetimeType =>
503507
Seq(

sql/core/src/test/scala/org/apache/spark/sql/VariantWriteShreddingSuite.scala

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ class VariantWriteShreddingSuite extends SparkFunSuite with ExpressionEvalHelper
7777
StructField("value", BinaryType, nullable = true),
7878
StructField("typed_value", IntegerType, nullable = true))))
7979

80+
// If typed_value is not provided, value is required.
81+
assert(SparkShreddingUtils.variantShreddingSchema(VariantType) ==
82+
StructType(Seq(
83+
StructField("metadata", BinaryType, nullable = false),
84+
StructField("value", BinaryType, nullable = false))))
85+
8086
val fieldA = StructType(Seq(
8187
StructField("value", BinaryType, nullable = true),
8288
StructField("typed_value", TimestampNTZType, nullable = true)))
@@ -86,10 +92,22 @@ class VariantWriteShreddingSuite extends SparkFunSuite with ExpressionEvalHelper
8692
val fieldB = StructType(Seq(
8793
StructField("value", BinaryType, nullable = true),
8894
StructField("typed_value", arrayType, nullable = true)))
95+
// If typed_value is not provided for an object field, value is still optional.
96+
val fieldC = StructType(Seq(
97+
StructField("value", BinaryType, nullable = true)))
98+
// If typed_value is not provided for an array element, value is required.
99+
val untypedArrayType = ArrayType(StructType(Seq(
100+
StructField("value", BinaryType, nullable = false))), containsNull = false)
101+
val fieldD = StructType(Seq(
102+
StructField("value", BinaryType, nullable = true),
103+
StructField("typed_value", untypedArrayType, nullable = true)))
89104
val objectType = StructType(Seq(
90105
StructField("a", fieldA, nullable = false),
91-
StructField("b", fieldB, nullable = false)))
92-
val structSchema = DataType.fromDDL("a timestamp_ntz, b array<string>")
106+
StructField("b", fieldB, nullable = false),
107+
StructField("c", fieldC, nullable = false),
108+
StructField("d", fieldD, nullable = false)))
109+
val structSchema = DataType.fromDDL(
110+
"a timestamp_ntz, b array<string>, c variant, d array<variant>")
93111
assert(SparkShreddingUtils.variantShreddingSchema(structSchema) ==
94112
StructType(Seq(
95113
StructField("metadata", BinaryType, nullable = false),
@@ -185,6 +203,8 @@ class VariantWriteShreddingSuite extends SparkFunSuite with ExpressionEvalHelper
185203
testWithSchema(obj, t, Row(obj.getMetadata, untypedValue(obj), null))
186204
}
187205

206+
testWithSchema(obj, VariantType, Row(obj.getMetadata, untypedValue(obj)))
207+
188208
// Happy path
189209
testWithSchema(obj, StructType.fromDDL("a int, b string"),
190210
Row(obj.getMetadata, null, Row(Row(null, 1), Row(null, "hello"))))
@@ -210,6 +230,11 @@ class VariantWriteShreddingSuite extends SparkFunSuite with ExpressionEvalHelper
210230
testWithSchema(obj, ArrayType(StructType.fromDDL("a int, b string")),
211231
Row(obj.getMetadata, untypedValue(obj), null))
212232

233+
// Shred with no typed_value in field schema
234+
testWithSchema(obj, StructType.fromDDL("a variant, b variant"),
235+
Row(obj.getMetadata, null,
236+
Row(Row(untypedValue("1")), Row(untypedValue("\"hello\"")))))
237+
213238
// Similar to the case above where "b" was not in the shredding schema, but with the unshredded
214239
// value being an object. Check that the copied value has correct dictionary IDs.
215240
val obj2 = parseJson("""{"a": 1, "b": {"c": "hello"}}""")
@@ -230,6 +255,9 @@ class VariantWriteShreddingSuite extends SparkFunSuite with ExpressionEvalHelper
230255
StructType.fromDDL("a int, b string")).foreach { t =>
231256
testWithSchema(arr, t, Row(arr.getMetadata, untypedValue(arr), null))
232257
}
258+
259+
testWithSchema(arr, VariantType, Row(arr.getMetadata, untypedValue(arr)))
260+
233261
// First element is shredded
234262
testWithSchema(arr, ArrayType(StructType.fromDDL("a int, b string")),
235263
Row(arr.getMetadata, null, Array(
@@ -254,6 +282,15 @@ class VariantWriteShreddingSuite extends SparkFunSuite with ExpressionEvalHelper
254282
Row(null, 2),
255283
Row(null, 3)
256284
)))
285+
286+
// No typed_value in element schema
287+
testWithSchema(arr, ArrayType(VariantType),
288+
Row(arr.getMetadata, null, Array(
289+
Row(untypedValue("""{"a": 1, "b": "hello"}""")),
290+
Row(untypedValue("2")),
291+
Row(untypedValue("null")),
292+
Row(untypedValue("4"))
293+
)))
257294
}
258295

259296
}

0 commit comments

Comments (0)