Simplifies ParquetSchemaConverter and updates outdated comments
liancheng committed Aug 3, 2015
commit 679888afa7f4128fb9bbbb021bd5ec06467f88ee
@@ -329,10 +329,6 @@ private[parquet] class ParquetSchemaConverter(
ParquetSchemaConverter.checkFieldName(field.name)

field.dataType match {
// ===================
// Simple atomic types
// ===================

case BooleanType =>
Types.primitive(BOOLEAN, repetition).named(field.name)

@@ -363,173 +359,123 @@ private[parquet] class ParquetSchemaConverter(
// NOTE: Spark SQL TimestampType is NOT a well-defined type in the Parquet format spec.
//
// As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
// timestamp in Impala for some historical reasons, it's not recommended to be used for any
// other types and will probably be deprecated in future Parquet format spec. That's the
// reason why Parquet format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` which
// are both logical types annotating `INT64`.
// timestamp in Impala for some historical reasons. It's not recommended for representing any
// other types and will probably be deprecated in some future version of the parquet-format spec.
// That's why the parquet-format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS`,
// which are both logical types annotating `INT64`.
//
// Originally, Spark SQL used the same nanosecond timestamp type as Impala and Hive. Starting
// from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
// a timestamp into a `Long`. This design decision is subject to change though, for example,
// we may resort to microsecond precision in the future.
// from Spark 1.5.0, we use a timestamp type with microsecond precision.
//
// For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
// currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
// hasn't implemented `TIMESTAMP_MICROS` yet.
// We plan to write all `TimestampType` values as `TIMESTAMP_MICROS`, but as of this writing the
// most recent version of parquet-mr (1.8.1) hasn't implemented `TIMESTAMP_MICROS` yet.
//
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
// TODO Convert to `TIMESTAMP_MICROS` once parquet-mr implements it.
case TimestampType =>
Types.primitive(INT96, repetition).named(field.name)
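
// Illustrative sketch (editor's addition, not part of this patch): INT96 stores a timestamp
// as 8 bytes of nanoseconds-within-day followed by a 4-byte Julian day number. Assuming
// Catalyst's internal microseconds-since-epoch Long representation, the split works out to:
def microsToJulian(micros: Long): (Int, Long) = {
  val julianDayOfEpoch = 2440588 // Julian day number of 1970-01-01
  val microsPerDay = 86400000000L
  val day = Math.floorDiv(micros, microsPerDay) // days since epoch, rounding toward -infinity
  val nanosOfDay = Math.floorMod(micros, microsPerDay) * 1000
  (julianDayOfEpoch + day.toInt, nanosOfDay)
}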

case BinaryType =>
Types.primitive(BINARY, repetition).named(field.name)

// ======================
// Decimals (legacy mode)
// ======================

// Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
// always store decimals in fixed-length byte arrays. To keep compatibility with these older
// versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
// by `DECIMAL`.
case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat =>
Types
.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.as(DECIMAL)
.precision(precision)
.scale(scale)
.length(minBytesForPrecision(precision))
.named(field.name)

// ========================
// Decimals (standard mode)
// ========================

// Uses INT32 for 1 <= precision <= 9
case DecimalType.Fixed(precision, scale)
if precision <= MAX_PRECISION_FOR_INT32 && !writeLegacyParquetFormat =>
Types
.primitive(INT32, repetition)
.as(DECIMAL)
.precision(precision)
.scale(scale)
.named(field.name)

// Uses INT64 for 10 <= precision <= 18
case DecimalType.Fixed(precision, scale)
if precision <= MAX_PRECISION_FOR_INT64 && !writeLegacyParquetFormat =>
Types
.primitive(INT64, repetition)
.as(DECIMAL)
.precision(precision)
.scale(scale)
.named(field.name)

// Uses FIXED_LEN_BYTE_ARRAY for all other precisions
case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat =>
Types
.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.as(DECIMAL)
.precision(precision)
.scale(scale)
.length(minBytesForPrecision(precision))
.named(field.name)

// ===================================
// ArrayType and MapType (legacy mode)
// ===================================

// Spark 1.4.x and prior versions convert ArrayType with nullable elements into a 3-level
// LIST structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro
// (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level anonymous
// field name "array" is from parquet-avro. Note that this case is covered by the backwards-
// compatibility rules implemented in `isElementType()`.
case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat =>
// <list-repetition> group <name> (LIST) {
//   optional group bag {
//     repeated <element-type> array;
//   }
// }
ConversionPatterns.listType(
repetition,
field.name,
Types
.buildGroup(REPEATED)
// "array" is the name chosen by Spark SQL 1.4.0 and prior versions
.addField(convertField(StructField("array", elementType, nullable)))
.named("bag"))

// Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
// LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is
// covered by the backwards-compatibility rules implemented in `isElementType()`.
case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat =>
// <list-repetition> group <name> (LIST) {
//   repeated <element-type> array;
// }
ConversionPatterns.listType(
repetition,
field.name,
// "array" is the name chosen by parquet-avro (1.7.0 and prior version)
convertField(StructField("array", elementType, nullable), REPEATED))

// Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
// MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`.
case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat =>
// <map-repetition> group <name> (MAP) {
//   repeated group map (MAP_KEY_VALUE) {
//     required <key-type> key;
//     <value-repetition> <value-type> value;
//   }
// }
ConversionPatterns.mapType(
repetition,
field.name,
convertField(StructField("key", keyType, nullable = false)),
convertField(StructField("value", valueType, valueContainsNull)))

// =====================================
// ArrayType and MapType (standard mode)
// =====================================

case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat =>
// <list-repetition> group <name> (LIST) {
//   repeated group list {
//     <element-repetition> <element-type> element;
//   }
// }
Types
.buildGroup(repetition).as(LIST)
.addField(
Types.repeatedGroup()
.addField(convertField(StructField("element", elementType, containsNull)))
.named("list"))
.named(field.name)

case MapType(keyType, valueType, valueContainsNull) =>
// <map-repetition> group <name> (MAP) {
//   repeated group key_value {
//     required <key-type> key;
//     <value-repetition> <value-type> value;
//   }
// }
Types
.buildGroup(repetition).as(MAP)
.addField(
case DecimalType.Fixed(precision, scale) =>
val builder = writeLegacyParquetFormat match {
// Standard mode, 1 <= precision <= 9, converts to INT32-based DECIMAL
case false if precision <= MAX_PRECISION_FOR_INT32 =>
Types.primitive(INT32, repetition)

// Standard mode, 10 <= precision <= 18, converts to INT64-based DECIMAL
case false if precision <= MAX_PRECISION_FOR_INT64 =>
Types.primitive(INT64, repetition)

// All other cases:
// - Standard mode, 19 <= precision <= 38, converts to FIXED_LEN_BYTE_ARRAY-based DECIMAL
// - Legacy mode, 1 <= precision <= 38, converts to FIXED_LEN_BYTE_ARRAY-based DECIMAL
case _ =>
val numBytes = minBytesForPrecision(precision)
Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(numBytes)
}

builder.as(DECIMAL).precision(precision).scale(scale).named(field.name)
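
// Illustrative sketch (editor's addition, not part of this patch): `minBytesForPrecision` must
// return the smallest n such that a signed n-byte integer, whose maximum is 2^(8n - 1) - 1, can
// hold any unscaled value of the given precision (at most 10^precision - 1). One closed form:
def minBytesForPrecisionSketch(precision: Int): Int =
  math.ceil((precision * math.log(10) / math.log(2) + 1) / 8).toInt
// e.g. precision 9 -> 4 bytes (INT32 also fits), 18 -> 8 bytes (INT64 fits), 38 -> 16 bytes.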

case t: ArrayType =>
val repeatedType = (writeLegacyParquetFormat, t.containsNull) match {
case (true, true) =>
// Legacy mode: Spark 1.4.x and prior versions convert `ArrayType` with nullable
// elements into a 3-level `LIST` structure. This behavior is something of a hybrid of
// parquet-hive and parquet-avro (1.6.0rc3): the 3-level structure is similar to parquet-hive,
// while the third-level anonymous field name "array" comes from parquet-avro.
//
// <list-repetition> group <name> (LIST) {
//   repeated group bag {               |
//     optional <element-type> array;   |- repeatedType
//   }                                  |
// }
Types
.repeatedGroup()
.addField(convertField(StructField("key", keyType, nullable = false)))
.addField(convertField(StructField("value", valueType, valueContainsNull)))
.named("key_value"))
.named(field.name)

// ===========
// Other types
// ===========

case StructType(fields) =>
fields.foldLeft(Types.buildGroup(repetition)) { (builder, field) =>
builder.addField(convertField(field))
}.named(field.name)
.addField(convertField(StructField("array", t.elementType, t.containsNull)))
.named("bag")

case (true, false) =>
// Legacy mode: Spark 1.4.x and prior versions convert `ArrayType` with non-nullable
// elements into a 2-level `LIST` structure. This behavior mimics parquet-avro
// (1.6.0rc3).
//
// <list-repetition> group <name> (LIST) {
//   repeated <element-type> array;   <- repeatedType
// }
convertField(StructField("array", t.elementType, t.containsNull), REPEATED)

case (false, _) =>
// Standard mode:
//
// <list-repetition> group <name> (LIST) {
//   repeated group list {                          |
//     <element-repetition> <element-type> element; |- repeatedType
//   }                                              |
// }
Types
.repeatedGroup()
.addField(convertField(StructField("element", t.elementType, t.containsNull)))
.named("list")
}

Types.buildGroup(repetition).as(LIST).addField(repeatedType).named(field.name)
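
// Illustrative sketch (editor's addition, not part of this patch; names resolve via this file's
// existing parquet-mr imports): spelled out against the builder API, the standard-mode branch
// above produces the following for an optional array<int> field "f" with nullable elements,
//
//   Types.buildGroup(OPTIONAL).as(LIST)
//     .addField(
//       Types.repeatedGroup()
//         .addField(Types.primitive(INT32, OPTIONAL).named("element"))
//         .named("list"))
//     .named("f")
//
// which prints as the Parquet schema:
//
//   optional group f (LIST) {
//     repeated group list {
//       optional int32 element;
//     }
//   }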

case t: MapType =>
val repeatedGroupBuilder =
Types
.repeatedGroup()
.addField(convertField(StructField("key", t.keyType, nullable = false)))
.addField(convertField(StructField("value", t.valueType, t.valueContainsNull)))

val repeatedGroup = if (writeLegacyParquetFormat) {
// Legacy mode: Spark 1.4.x and prior versions convert MapType into a 3-level group
// annotated by MAP_KEY_VALUE.
//
// <map-repetition> group <name> (MAP) {
//   repeated group map (MAP_KEY_VALUE) {     |
//     required <key-type> key;               |- repeatedGroup
//     <value-repetition> <value-type> value; |
//   }                                        |
// }
repeatedGroupBuilder.as(MAP_KEY_VALUE).named("map")
} else {
// Standard mode:
//
// <map-repetition> group <name> (MAP) {
//   repeated group key_value {               |
//     required <key-type> key;               |- repeatedGroup
//     <value-repetition> <value-type> value; |
//   }                                        |
// }
repeatedGroupBuilder.named("key_value")
}

Types.buildGroup(repetition).as(MAP).addField(repeatedGroup).named(field.name)
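
// Illustrative sketch (editor's addition, not part of this patch): for an optional
// map<string, int> field "m" with nullable values, the two branches above differ only in the
// repeated group's name and annotation:
//
//   Legacy mode:                             Standard mode:
//
//   optional group m (MAP) {                 optional group m (MAP) {
//     repeated group map (MAP_KEY_VALUE) {     repeated group key_value {
//       required binary key (UTF8);              required binary key (UTF8);
//       optional int32 value;                    optional int32 value;
//     }                                        }
//   }                                        }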

case t: StructType =>
val parquetFields = t.fields.map(convertField)
Types.buildGroup(repetition).addFields(parquetFields: _*).named(field.name)
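
// Illustrative sketch (editor's addition, not part of this patch): a struct simply becomes a
// Parquet group with one converted field per struct field. For example, an optional
// struct<a: int, b: string> field "s" with nullable members maps to:
//
//   optional group s {
//     optional int32 a;
//     optional binary b (UTF8);
//   }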

case udt: UserDefinedType[_] =>
convertField(field.copy(dataType = udt.sqlType))
@@ -243,18 +243,16 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
}

writeLegacyParquetFormat match {
// Standard mode, writes decimals with precision <= 9 as INT32
// Standard mode, 1 <= precision <= 9, writes as INT32
case false if precision <= MAX_PRECISION_FOR_INT32 => int32Writer

// Standard mode, writes decimals with precision <= 18 as INT64
// Standard mode, 10 <= precision <= 18, writes as INT64
case false if precision <= MAX_PRECISION_FOR_INT64 => int64Writer

// Legacy mode, writes decimals with precision <= 18 as FIXED_LEN_BYTE_ARRAY
// Legacy mode, 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
case true if precision <= MAX_PRECISION_FOR_INT64 => binaryWriterUsingUnscaledLong

// All other cases:
// - Standard mode, writes decimals with precision > 18 as FIXED_LEN_BYTE_ARRAY
// - Legacy mode, writes decimals with precision > 18 as FIXED_LEN_BYTE_ARRAY
// Either standard or legacy mode, 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY
case _ => binaryWriterUsingUnscaledBytes
}
}
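
// Illustrative sketch (editor's addition, not part of this patch): a writer in the spirit of
// `binaryWriterUsingUnscaledLong` has to render the decimal's unscaled Long as a fixed-length
// big-endian byte array. Arithmetic shifting sign-extends negative values for free:
def unscaledLongToBytes(unscaled: Long, numBytes: Int): Array[Byte] = {
  val bytes = new Array[Byte](numBytes)
  var i = 0
  while (i < numBytes) {
    bytes(i) = (unscaled >> ((numBytes - 1 - i) * 8)).toByte // most significant byte first
    i += 1
  }
  bytes
}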