
Commit 679888a

Simplifies ParquetSchemaConverter and updates outdated comments
1 parent 6bda94b commit 679888a

File tree

2 files changed: +109 -165 lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetSchemaConverter.scala

Lines changed: 105 additions & 159 deletions
@@ -329,10 +329,6 @@ private[parquet] class ParquetSchemaConverter(
     ParquetSchemaConverter.checkFieldName(field.name)
 
     field.dataType match {
-      // ===================
-      // Simple atomic types
-      // ===================
-
       case BooleanType =>
         Types.primitive(BOOLEAN, repetition).named(field.name)
 
@@ -363,173 +359,123 @@ private[parquet] class ParquetSchemaConverter(
       // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec.
       //
       // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
-      // timestamp in Impala for some historical reasons, it's not recommended to be used for any
-      // other types and will probably be deprecated in future Parquet format spec. That's the
-      // reason why Parquet format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` which
-      // are both logical types annotating `INT64`.
+      // timestamp in Impala for some historical reasons. It's not recommended to be used for any
+      // other types and will probably be deprecated in some future version of parquet-format spec.
+      // That's the reason why parquet-format spec only defines `TIMESTAMP_MILLIS` and
+      // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`.
       //
       // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting
-      // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
-      // a timestamp into a `Long`. This design decision is subject to change though, for example,
-      // we may resort to microsecond precision in the future.
+      // from Spark 1.5.0, we resort to microsecond timestamp type.
       //
-      // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
-      // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
-      // hasn't implemented `TIMESTAMP_MICROS` yet.
+      // We plan to write all `TimestampType` values as `TIMESTAMP_MICROS`, but up to writing, the
+      // most recent version of parquet-mr (1.8.1) hasn't implemented `TIMESTAMP_MICROS` yet.
       //
-      // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+      // TODO Converts to `TIMESTAMP_MICROS` once parquet-mr implements that.
       case TimestampType =>
         Types.primitive(INT96, repetition).named(field.name)
 
       case BinaryType =>
         Types.primitive(BINARY, repetition).named(field.name)
 
-      // ======================
-      // Decimals (legacy mode)
-      // ======================
-
-      // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
-      // always store decimals in fixed-length byte arrays. To keep compatibility with these older
-      // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
-      // by `DECIMAL`.
-      case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat =>
-        Types
-          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .length(minBytesForPrecision(precision))
-          .named(field.name)
-
-      // ========================
-      // Decimals (standard mode)
-      // ========================
-
-      // Uses INT32 for 1 <= precision <= 9
-      case DecimalType.Fixed(precision, scale)
-          if precision <= MAX_PRECISION_FOR_INT32 && !writeLegacyParquetFormat =>
-        Types
-          .primitive(INT32, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .named(field.name)
-
-      // Uses INT64 for 10 <= precision <= 18
-      case DecimalType.Fixed(precision, scale)
-          if precision <= MAX_PRECISION_FOR_INT64 && !writeLegacyParquetFormat =>
-        Types
-          .primitive(INT64, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .named(field.name)
-
-      // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
-      case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat =>
-        Types
-          .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
-          .as(DECIMAL)
-          .precision(precision)
-          .scale(scale)
-          .length(minBytesForPrecision(precision))
-          .named(field.name)
-
-      // ===================================
-      // ArrayType and MapType (legacy mode)
-      // ===================================
-
-      // Spark 1.4.x and prior versions convert ArrayType with nullable elements into a 3-level
-      // LIST structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro
-      // (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level anonymous
-      // field name "array" is from parquet-avro. Note that this case is covered by the backwards-
-      // compatibility rules implemented in `isElementType()`.
-      case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat =>
-        // <list-repetition> group <name> (LIST) {
-        //   optional group bag {
-        //     repeated <element-type> array;
-        //   }
-        // }
-        ConversionPatterns.listType(
-          repetition,
-          field.name,
-          Types
-            .buildGroup(REPEATED)
-            // "array" is the name chosen by Spark SQL 1.4.0 and prior versions
-            .addField(convertField(StructField("array", elementType, nullable)))
-            .named("bag"))
-
-      // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
-      // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is
-      // covered by the backwards-compatibility rules implemented in `isElementType()`.
-      case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat =>
-        // <list-repetition> group <name> (LIST) {
-        //   repeated <element-type> array;
-        // }
-        ConversionPatterns.listType(
-          repetition,
-          field.name,
-          // "array" is the name chosen by parquet-avro (1.7.0 and prior version)
-          convertField(StructField("array", elementType, nullable), REPEATED))
-
-      // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
-      // MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`.
-      case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat =>
-        // <map-repetition> group <name> (MAP) {
-        //   repeated group map (MAP_KEY_VALUE) {
-        //     required <key-type> key;
-        //     <value-repetition> <value-type> value;
-        //   }
-        // }
-        ConversionPatterns.mapType(
-          repetition,
-          field.name,
-          convertField(StructField("key", keyType, nullable = false)),
-          convertField(StructField("value", valueType, valueContainsNull)))
-
-      // =====================================
-      // ArrayType and MapType (standard mode)
-      // =====================================
-
-      case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat =>
-        // <list-repetition> group <name> (LIST) {
-        //   repeated group list {
-        //     <element-repetition> <element-type> element;
-        //   }
-        // }
-        Types
-          .buildGroup(repetition).as(LIST)
-          .addField(
-            Types.repeatedGroup()
-              .addField(convertField(StructField("element", elementType, containsNull)))
-              .named("list"))
-          .named(field.name)
-
-      case MapType(keyType, valueType, valueContainsNull) =>
-        // <map-repetition> group <name> (MAP) {
-        //   repeated group key_value {
-        //     required <key-type> key;
-        //     <value-repetition> <value-type> value;
-        //   }
-        // }
-        Types
-          .buildGroup(repetition).as(MAP)
-          .addField(
+      case DecimalType.Fixed(precision, scale) =>
+        val builder = writeLegacyParquetFormat match {
+          // Standard mode, 1 <= precision <= 9, converts to INT32 based DECIMAL
+          case false if precision <= MAX_PRECISION_FOR_INT32 =>
+            Types.primitive(INT32, repetition)
+
+          // Standard mode, 10 <= precision <= 18, converts to INT64 based DECIMAL
+          case false if precision <= MAX_PRECISION_FOR_INT64 =>
+            Types.primitive(INT64, repetition)
+
+          // All other cases:
+          //  - Standard mode, 19 <= precision <= 38, converts to FIXED_LEN_BYTE_ARRAY based DECIMAL
+          //  - Legacy mode, 1 <= precision <= 38, converts to FIXED_LEN_BYTE_ARRAY based DECIMAL
+          case _ =>
+            val numBytes = minBytesForPrecision(precision)
+            Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(numBytes)
+        }
+
+        builder.as(DECIMAL).precision(precision).scale(scale).named(field.name)
+
+      case t: ArrayType =>
+        val repeatedType = (writeLegacyParquetFormat, t.containsNull) match {
+          case (true, true) =>
+            // Legacy mode: Spark 1.4.x and prior versions convert `ArrayType` with nullable
+            // elements into a 3-level `LIST` structure. This behavior is somewhat a hybrid of
+            // parquet-hive and parquet-avro (1.6.0rc3): the 3-level structure is similar to
+            // parquet-hive while the 3rd level anonymous field name "array" is from parquet-avro.
+            //
+            // <list-repetition> group <name> (LIST) {
+            //   repeated group bag {              |
+            //     optional <element-type> array;  |- repeatedType
+            //   }                                 |
+            // }
             Types
               .repeatedGroup()
-              .addField(convertField(StructField("key", keyType, nullable = false)))
-              .addField(convertField(StructField("value", valueType, valueContainsNull)))
-              .named("key_value"))
-          .named(field.name)
-
-      // ===========
-      // Other types
-      // ===========
-
-      case StructType(fields) =>
-        fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) =>
-          builder.addField(convertField(field))
-        }.named(field.name)
+              .addField(convertField(StructField("array", t.elementType, t.containsNull)))
+              .named("bag")
+
+          case (true, false) =>
+            // Legacy mode: Spark 1.4.x and prior versions convert `ArrayType` with non-nullable
+            // elements into a 2-level `LIST` structure. This behavior mimics parquet-avro
+            // (1.6.0rc3).
+            //
+            // <list-repetition> group <name> (LIST) {
+            //   repeated <element-type> array;  <- repeatedType
+            // }
+            convertField(StructField("array", t.elementType, t.containsNull), REPEATED)
+
+          case (false, _) =>
+            // Standard mode:
+            //
+            // <list-repetition> group <name> (LIST) {
+            //   repeated group list {                           |
+            //     <element-repetition> <element-type> element;  |- repeatedType
+            //   }                                               |
+            // }
+            Types
+              .repeatedGroup()
+              .addField(convertField(StructField("element", t.elementType, t.containsNull)))
+              .named("list")
+        }
+
+        Types.buildGroup(repetition).as(LIST).addField(repeatedType).named(field.name)
+
+      case t: MapType =>
+        val repeatedGroupBuilder =
+          Types
+            .repeatedGroup()
+            .addField(convertField(StructField("key", t.keyType, nullable = false)))
+            .addField(convertField(StructField("value", t.valueType, t.valueContainsNull)))
+
+        val repeatedGroup = if (writeLegacyParquetFormat) {
+          // Legacy mode: Spark 1.4.x and prior versions convert MapType into a 3-level group
+          // annotated by MAP_KEY_VALUE.
+          //
+          // <map-repetition> group <name> (MAP) {
+          //   repeated group map (MAP_KEY_VALUE) {      |
+          //     required <key-type> key;                |- repeatedGroup
+          //     <value-repetition> <value-type> value;  |
+          //   }                                         |
+          // }
+          repeatedGroupBuilder.as(MAP_KEY_VALUE).named("map")
+        } else {
+          // Standard mode:
+          //
+          // <map-repetition> group <name> (MAP) {
+          //   repeated group key_value {                |
+          //     required <key-type> key;                |- repeatedGroup
+          //     <value-repetition> <value-type> value;  |
+          //   }                                         |
+          // }
+          repeatedGroupBuilder.named("key_value")
+        }
+
+        Types.buildGroup(repetition).as(MAP).addField(repeatedGroup).named(field.name)
+
+      case t: StructType =>
+        val parquetFields = t.fields.map(convertField)
+        Types.buildGroup(repetition).addFields(parquetFields: _*).named(field.name)
 
       case udt: UserDefinedType[_] =>
         convertField(field.copy(dataType = udt.sqlType))
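
Note on the INT96 case above: it leans on a layout the parquet-format spec never formally blessed. As an illustrative aside, here is a minimal Scala sketch, in the spirit of the Impala convention described in PARQUET-323, of how a microsecond timestamp packs into those 12 bytes: the first 8 hold nanoseconds within the day and the last 4 the Julian day number, both little-endian. The object name, constant, and helper are assumptions for illustration, not code from this commit.

import java.nio.{ByteBuffer, ByteOrder}

object Int96TimestampSketch {
  // Julian day number of the Unix epoch, 1970-01-01.
  val JulianDayOfEpoch = 2440588L
  val MicrosPerDay = 24L * 60 * 60 * 1000 * 1000

  // Packs a microsecond-precision epoch timestamp into the 12-byte INT96 layout:
  // 8 little-endian bytes of nanoseconds-of-day followed by a 4-byte Julian day.
  def toInt96(epochMicros: Long): Array[Byte] = {
    val julianDay = JulianDayOfEpoch + Math.floorDiv(epochMicros, MicrosPerDay)
    val nanosOfDay = Math.floorMod(epochMicros, MicrosPerDay) * 1000L
    ByteBuffer
      .allocate(12)
      .order(ByteOrder.LITTLE_ENDIAN)
      .putLong(nanosOfDay)
      .putInt(julianDay.toInt)
      .array()
  }
}

The indirection through Julian days and a 96-bit physical type is much of why the comment looks forward to plain `TIMESTAMP_MICROS` on `INT64`.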
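
The consolidated `DecimalType` branch encodes one rule: pick the narrowest physical type that can hold the unscaled value. Below is a minimal, self-contained sketch of that rule, assuming `MAX_PRECISION_FOR_INT32`/`MAX_PRECISION_FOR_INT64` are 9 and 18 as the comments state, with a stand-in `PhysicalType` ADT in place of parquet-mr's builders; all names here are illustrative, not Spark's.

object DecimalMappingSketch {
  sealed trait PhysicalType
  case object Int32 extends PhysicalType
  case object Int64 extends PhysicalType
  final case class FixedLenByteArray(numBytes: Int) extends PhysicalType

  // 9 and 18: the largest decimal precisions whose unscaled values always
  // fit in a 4-byte and an 8-byte signed integer, respectively.
  val MaxPrecisionForInt32 = 9
  val MaxPrecisionForInt64 = 18

  // Smallest number of bytes that can hold any unscaled value of the given
  // precision as a signed (two's-complement) integer.
  def minBytesForPrecision(precision: Int): Int =
    Iterator.from(1).find(n => math.pow(10, precision) - 1 <= math.pow(2, 8 * n - 1) - 1).get

  def physicalTypeFor(precision: Int, writeLegacyParquetFormat: Boolean): PhysicalType =
    writeLegacyParquetFormat match {
      // Standard mode, 1 <= precision <= 9: INT32-based DECIMAL
      case false if precision <= MaxPrecisionForInt32 => Int32
      // Standard mode, 10 <= precision <= 18: INT64-based DECIMAL
      case false if precision <= MaxPrecisionForInt64 => Int64
      // Standard mode with higher precision, and legacy mode always:
      // FIXED_LEN_BYTE_ARRAY-based DECIMAL
      case _ => FixedLenByteArray(minBytesForPrecision(precision))
    }

  def main(args: Array[String]): Unit = {
    assert(physicalTypeFor(9, writeLegacyParquetFormat = false) == Int32)
    assert(physicalTypeFor(18, writeLegacyParquetFormat = false) == Int64)
    assert(physicalTypeFor(38, writeLegacyParquetFormat = false) == FixedLenByteArray(16))
    // Legacy mode uses fixed-length byte arrays even for small precisions.
    assert(physicalTypeFor(5, writeLegacyParquetFormat = true) == FixedLenByteArray(3))
  }
}

Legacy mode always lands in the `FIXED_LEN_BYTE_ARRAY` case regardless of precision, which is what lets the former four separate `case` arms collapse into one.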

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetWriteSupport.scala

Lines changed: 4 additions & 6 deletions
@@ -243,18 +243,16 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
     }
 
     writeLegacyParquetFormat match {
-      // Standard mode, writes decimals with precision <= 9 as INT32
+      // Standard mode, 1 <= precision <= 9, writes as INT32
       case false if precision <= MAX_PRECISION_FOR_INT32 => int32Writer
 
-      // Standard mode, writes decimals with precision <= 18 as INT64
+      // Standard mode, 10 <= precision <= 18, writes as INT64
       case false if precision <= MAX_PRECISION_FOR_INT64 => int64Writer
 
-      // Legacy mode, writes decimals with precision <= 18 as FIXED_LEN_BYTE_ARRAY
+      // Legacy mode, 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
       case true if precision <= MAX_PRECISION_FOR_INT64 => binaryWriterUsingUnscaledLong
 
-      // All other cases:
-      //  - Standard mode, writes decimals with precision > 18 as FIXED_LEN_BYTE_ARRAY
-      //  - Legacy mode, writes decimals with precision > 18 as FIXED_LEN_BYTE_ARRAY
+      // Either standard or legacy mode, 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
       case _ => binaryWriterUsingUnscaledBytes
     }
   }
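
A hedged sketch of the work a writer like `binaryWriterUsingUnscaledLong` has to do: for precision <= 18 the unscaled value fits in a `Long`, so it can be emitted as a big-endian two's-complement array of exactly `minBytesForPrecision(precision)` bytes. The helper below is illustrative only, assuming that byte count; it is not the Spark implementation.

object UnscaledLongWriterSketch {
  // Serializes the low `numBytes` bytes of `unscaled` in big-endian order.
  // Arithmetic right shift preserves the sign, so negative values come out
  // correctly sign-extended in two's-complement form.
  def unscaledLongToBytes(unscaled: Long, numBytes: Int): Array[Byte] = {
    val bytes = new Array[Byte](numBytes)
    var shift = 8 * (numBytes - 1)
    for (i <- 0 until numBytes) {
      bytes(i) = ((unscaled >> shift) & 0xFF).toByte
      shift -= 8
    }
    bytes
  }

  def main(args: Array[String]): Unit = {
    // Decimal(5, 2) value 123.45 has unscaled value 12345 and needs 3 bytes.
    val bytes = unscaledLongToBytes(12345L, numBytes = 3)
    assert(bytes.toSeq == Seq(0x00, 0x30, 0x39).map(_.toByte))
    // Negative values rely on sign extension of the two's-complement form.
    assert(unscaledLongToBytes(-1L, numBytes = 3).toSeq == Seq(0xFF, 0xFF, 0xFF).map(_.toByte))
  }
}

Above precision 18 the unscaled value no longer fits in a `Long`, hence the fallback to `binaryWriterUsingUnscaledBytes`, which works from the decimal's raw unscaled bytes.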
