[SPARK-8848] [SQL] Refactors Parquet write path to follow parquet-format #8988
CatalystReadSupport.scala

```diff
@@ -95,7 +95,9 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
       """.stripMargin
     }
 
-    new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
+    new CatalystRecordMaterializer(
+      parquetRequestedSchema,
+      CatalystReadSupport.expandUDT(catalystRequestedSchema))
   }
 }
```
|
```diff
@@ -110,7 +112,10 @@ private[parquet] object CatalystReadSupport {
    */
   def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
     val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
-    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+    Types
+      .buildMessage()
+      .addFields(clippedParquetFields: _*)
+      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }
 
   private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
```
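To make the clipping behavior concrete, here is a rough usage sketch. The file schema and requested schema below are made up for illustration and are not from this PR (note that `clipParquetSchema` is package-private, so this only compiles from within the parquet package):

```scala
import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.types._

// A Parquet file schema with three columns...
val fileSchema = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  required int32 a;
    |  optional binary b (UTF8);
    |  optional double c;
    |}""".stripMargin)

// ...and a Catalyst requested schema asking for only one of them.
val requestedSchema = StructType(Seq(StructField("b", StringType)))

// clipParquetSchema prunes the file schema down to the requested column and,
// after this change, names the resulting message "spark_schema" instead of "root":
//   message spark_schema {
//     optional binary b (UTF8);
//   }
val clipped = CatalystReadSupport.clipParquetSchema(fileSchema, requestedSchema)
```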
|
|
```diff
@@ -271,4 +276,30 @@ private[parquet] object CatalystReadSupport {
         .getOrElse(toParquet.convertField(f))
     }
   }
+
+  def expandUDT(schema: StructType): StructType = {
+    def expand(dataType: DataType): DataType = {
+      dataType match {
+        case t: ArrayType =>
+          t.copy(elementType = expand(t.elementType))
+
+        case t: MapType =>
+          t.copy(
+            keyType = expand(t.keyType),
+            valueType = expand(t.valueType))
+
+        case t: StructType =>
+          val expandedFields = t.fields.map(f => f.copy(dataType = expand(f.dataType)))
+          t.copy(fields = expandedFields)
+
+        case t: UserDefinedType[_] =>
+          t.sqlType
+
+        case t =>
+          t
+      }
+    }
+
+    expand(schema).asInstanceOf[StructType]
+  }
 }
```

**Contributor (author):** Not sure whether this method is useful enough to be added as methods of all complex data types.

**Contributor:** Maybe not.
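To illustrate what `expandUDT` does, here is a minimal sketch. `PointUDT` is a hypothetical UDT invented for this example, not something in the PR:

```scala
import org.apache.spark.sql.types._

// A toy UDT whose underlying storage type (sqlType) is a two-field struct.
class PointUDT extends UserDefinedType[(Double, Double)] {
  override def sqlType: DataType = StructType(Seq(
    StructField("x", DoubleType, nullable = false),
    StructField("y", DoubleType, nullable = false)))
  override def serialize(obj: Any): Any = obj
  override def deserialize(datum: Any): (Double, Double) =
    datum.asInstanceOf[(Double, Double)]
  override def userClass: Class[(Double, Double)] = classOf[(Double, Double)]
}

val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("points", ArrayType(new PointUDT))))

// expandUDT recursively replaces every UDT with its sqlType, so the result is:
//   StructType(Seq(
//     StructField("id", IntegerType),
//     StructField("points", ArrayType(StructType(Seq(
//       StructField("x", DoubleType, nullable = false),
//       StructField("y", DoubleType, nullable = false)))))))
val expanded = CatalystReadSupport.expandUDT(schema)
```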
CatalystSchemaConverter.scala

```diff
@@ -121,7 +121,7 @@ private[parquet] class CatalystSchemaConverter(
         val precision = field.getDecimalMetadata.getPrecision
         val scale = field.getDecimalMetadata.getScale
 
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           maxPrecision == -1 || 1 <= precision && precision <= maxPrecision,
           s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)")
```
```diff
@@ -155,7 +155,7 @@ private[parquet] class CatalystSchemaConverter(
         }
 
       case INT96 =>
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           assumeInt96IsTimestamp,
           "INT96 is not supported unless it's interpreted as timestamp. " +
             s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
```
```diff
@@ -197,11 +197,11 @@ private[parquet] class CatalystSchemaConverter(
       //
       // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
       case LIST =>
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           field.getFieldCount == 1, s"Invalid list type $field")
 
         val repeatedType = field.getType(0)
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           repeatedType.isRepetition(REPEATED), s"Invalid list type $field")
 
         if (isElementType(repeatedType, field.getName)) {
```
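For reference, the standard 3-level list layout this branch validates (per the LogicalTypes.md rules linked above) looks like the following; the sketch uses parquet-mr's schema parser and a made-up field name:

```scala
import org.apache.parquet.schema.MessageTypeParser

val standardList = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  optional group f (LIST) {
    |    repeated group list {
    |      optional binary element (UTF8);
    |    }
    |  }
    |}""".stripMargin)
// Here `f` has exactly one field, and that field is a repeated group, which
// satisfies both checkConversionRequirement calls above. The legacy 2-level
// layouts are what isElementType() detects.
```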
```diff
@@ -217,17 +217,17 @@ private[parquet] class CatalystSchemaConverter(
       // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
       // scalastyle:on
       case MAP | MAP_KEY_VALUE =>
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           field.getFieldCount == 1 && !field.getType(0).isPrimitive,
           s"Invalid map type: $field")
 
         val keyValueType = field.getType(0).asGroupType()
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2,
           s"Invalid map type: $field")
 
         val keyType = keyValueType.getType(0)
-        CatalystSchemaConverter.analysisRequire(
+        CatalystSchemaConverter.checkConversionRequirement(
           keyType.isPrimitive,
           s"Map key type is expected to be a primitive type, but found: $keyType")
```
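The map shape these three checks accept, again per LogicalTypes.md (illustration only, with made-up field names):

```scala
import org.apache.parquet.schema.MessageTypeParser

val standardMap = MessageTypeParser.parseMessageType(
  """message spark_schema {
    |  optional group f (MAP) {
    |    repeated group key_value {
    |      required binary key (UTF8);
    |      optional int32 value;
    |    }
    |  }
    |}""".stripMargin)
// A single non-primitive child, a repeated key_value group with exactly two
// fields, and a primitive key: the three requirements validated above.
```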
```diff
@@ -299,7 +299,10 @@ private[parquet] class CatalystSchemaConverter(
    * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
    */
   def convert(catalystSchema: StructType): MessageType = {
-    Types.buildMessage().addFields(catalystSchema.map(convertField): _*).named("root")
+    Types
+      .buildMessage()
+      .addFields(catalystSchema.map(convertField): _*)
+      .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
   }
 
   /**
```
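A quick sketch of the observable effect, assuming the converter's default constructor arguments (illustrative, not code from the PR; the class is package-private):

```scala
import org.apache.spark.sql.types._

val converter = new CatalystSchemaConverter()
val message = converter.convert(StructType(Seq(StructField("id", IntegerType))))

// Before this change the root message was named "root"; now:
assert(message.getName == CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) // "spark_schema"
```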
```diff
@@ -347,21 +350,21 @@ private[parquet] class CatalystSchemaConverter(
       // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec.
       //
       // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond
-      // timestamp in Impala for some historical reasons, it's not recommended to be used for any
-      // other types and will probably be deprecated in future Parquet format spec. That's the
-      // reason why Parquet format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` which
-      // are both logical types annotating `INT64`.
+      // timestamp in Impala for some historical reasons. It's not recommended to be used for any
+      // other types and will probably be deprecated in some future version of parquet-format spec.
+      // That's the reason why parquet-format spec only defines `TIMESTAMP_MILLIS` and
+      // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`.
       //
       // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting
-      // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store
+      // from Spark 1.4.0, we resort to a timestamp type with 100 ns precision so that we can store
       // a timestamp into a `Long`. This design decision is subject to change though, for example,
       // we may resort to microsecond precision in the future.
       //
       // For Parquet, we plan to write all `TimestampType` value as `TIMESTAMP_MICROS`, but it's
       // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using)
       // hasn't implemented `TIMESTAMP_MICROS` yet.
       //
-      // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
+      // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that.
       case TimestampType =>
         Types.primitive(INT96, repetition).named(field.name)
```

**Contributor** (on the `from Spark 1.4.0` line): This should be 1.5.
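For readers unfamiliar with the Impala convention mentioned in this comment block: an INT96 timestamp is conventionally 12 little-endian bytes, 8 bytes of nanoseconds within the day followed by a 4-byte Julian day number. A minimal decoding sketch (assumed layout, not code from this PR):

```scala
import java.nio.{ByteBuffer, ByteOrder}

def decodeInt96(bytes: Array[Byte]): (Int, Long) = {
  require(bytes.length == 12, "INT96 timestamps are exactly 12 bytes")
  val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
  val nanosOfDay = buf.getLong // first 8 bytes: nanoseconds within the day
  val julianDay = buf.getInt   // last 4 bytes: Julian day number
  (julianDay, nanosOfDay)
}
```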
|
```diff
@@ -523,11 +526,12 @@ private[parquet] class CatalystSchemaConverter(
     }
   }
 
 private[parquet] object CatalystSchemaConverter {
+  val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"
+
   def checkFieldName(name: String): Unit = {
     // ,;{}()\n\t= and space are special characters in Parquet schema
-    analysisRequire(
+    checkConversionRequirement(
       !name.matches(".*[ ,;{}()\n\t=].*"),
       s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
          |Please use alias to rename it.
```
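An illustration of the check, with behavior inferred from the regex above:

```scala
// checkFieldName is on the package-private companion object; shown as if
// called from within the parquet package.
CatalystSchemaConverter.checkFieldName("price")       // passes
CatalystSchemaConverter.checkFieldName("unit price")  // throws AnalysisException
```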
```diff
@@ -539,7 +543,7 @@ private[parquet] object CatalystSchemaConverter {
     schema
   }
 
-  def analysisRequire(f: => Boolean, message: String): Unit = {
+  def checkConversionRequirement(f: => Boolean, message: String): Unit = {
     if (!f) {
       throw new AnalysisException(message)
     }
```
```diff
@@ -553,16 +557,8 @@ private[parquet] object CatalystSchemaConverter {
     numBytes
   }
 
-  private val MIN_BYTES_FOR_PRECISION = Array.tabulate[Int](39)(computeMinBytesForPrecision)
-
   // Returns the minimum number of bytes needed to store a decimal with a given `precision`.
-  def minBytesForPrecision(precision : Int) : Int = {
-    if (precision < MIN_BYTES_FOR_PRECISION.length) {
-      MIN_BYTES_FOR_PRECISION(precision)
-    } else {
-      computeMinBytesForPrecision(precision)
-    }
-  }
+  val minBytesForPrecision = Array.tabulate[Int](39)(computeMinBytesForPrecision)
 
   val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4) /* 9 */
```
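For context, the precomputed table relies on finding the smallest byte count whose signed two's-complement range covers 10^precision - 1. A sketch of `computeMinBytesForPrecision` consistent with the context lines above (the `numBytes` line is visible in the hunk):

```scala
def computeMinBytesForPrecision(precision: Int): Int = {
  var numBytes = 1
  // Grow until a signed numBytes-byte integer can hold 10^precision - 1.
  while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
    numBytes += 1
  }
  numBytes
}

// e.g. precision 9 needs 4 bytes (hence MAX_PRECISION_FOR_INT32 = 9), and the
// table covers precisions 0 through 38, the maximum a Spark decimal supports.
```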
**Contributor (author)** (on the `expandUDT` call in `CatalystReadSupport`): Expands UDTs early so that `CatalystRowConverter` always receives a Catalyst schema without UDTs.