11 changes: 5 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -290,9 +290,9 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "Enables Parquet filter push-down optimization when set to true.")

- val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
- key = "spark.sql.parquet.followParquetFormatSpec",
- defaultValue = Some(false),
+ val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
+ key = "spark.sql.parquet.writeLegacyFormat",
+ defaultValue = Some(true),
doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
"Spark SQL schema and vice versa.",
isPublic = false)
@@ -304,8 +304,7 @@ private[spark] object SQLConf {
"subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
"of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " +
"option must be set in Hadoop Configuration. 2. This option overrides " +
"\"spark.sql.sources.outputCommitterClass\"."
)
"\"spark.sql.sources.outputCommitterClass\".")

val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
defaultValue = Some(false),
@@ -497,7 +496,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

- private[spark] def followParquetFormatSpec: Boolean = getConf(PARQUET_FOLLOW_PARQUET_FORMAT_SPEC)
+ private[spark] def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)

private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)

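Note the inverted polarity of this rename: the old flag defaulted to false ("do not follow the parquet-format spec", i.e. write the legacy layout), while the new flag defaults to true ("write the legacy layout"), so the default behavior is unchanged. As a minimal usage sketch that is not part of this PR (the option is still marked isPublic = false, and the sqlContext value is assumed to already exist):

// Hypothetical sketch, Spark 1.5-era SQLContext: "false" selects the standard
// parquet-format layouts (INT32/INT64 decimals, list/element LIST groups), while
// "true" (the default) keeps the Spark 1.4-compatible legacy layout.
sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "false")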
CatalystReadSupport.scala
@@ -265,7 +265,7 @@ private[parquet] object CatalystReadSupport {
private def clipParquetGroupFields(
parquetRecord: GroupType, structType: StructType): Seq[Type] = {
val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
- val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
+ val toParquet = new CatalystSchemaConverter(writeLegacyParquetFormat = false)
structType.map { f =>
parquetFieldMap
.get(f.name)
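The read path constructs the converter with writeLegacyParquetFormat = false even though the flag nominally controls the write path: the Parquet types synthesized here only stand in for requested fields that are missing from the file, so they are never written out and the standard layout is a safe choice. A rough sketch of the clipping idea, paraphrasing (not quoting) the lines around the excerpt above; the convertField fallback is an assumption about the surrounding code:

// Rough sketch only: keep the file's Parquet field when it exists, otherwise synthesize
// a Parquet type for the requested Catalyst field with the standard (non-legacy) converter.
val toParquet = new CatalystSchemaConverter(writeLegacyParquetFormat = false)
structType.map { f =>
  parquetFieldMap.getOrElse(f.name, toParquet.convertField(f))
}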
CatalystSchemaConverter.scala
@@ -41,34 +41,31 @@ import org.apache.spark.sql.{AnalysisException, SQLConf}
* @constructor
* @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
* [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL
- * [[StructType]].
+ * [[StructType]]. This argument only affects Parquet read path.
* @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
* [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL
* [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
* has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS`
- * described in Parquet format spec.
- * @param followParquetFormatSpec Whether to generate standard DECIMAL, LIST, and MAP structure when
- * converting Spark SQL [[StructType]] to Parquet [[MessageType]]. For Spark 1.4.x and
- * prior versions, Spark SQL only supports decimals with a max precision of 18 digits, and
- * uses non-standard LIST and MAP structure. Note that the current Parquet format spec is
- * backwards-compatible with these settings. If this argument is set to `false`, we fallback
- * to old style non-standard behaviors.
+ * described in Parquet format spec. This argument only affects Parquet read path.
+ * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4
+ * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
+ * When set to false, use standard format defined in parquet-format spec. This argument only
+ * affects Parquet write path.
*/
private[parquet] class CatalystSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
- followParquetFormatSpec: Boolean = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get
- ) {
+ writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) {

def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
- followParquetFormatSpec = conf.followParquetFormatSpec)
+ writeLegacyParquetFormat = conf.writeLegacyParquetFormat)

def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
- followParquetFormatSpec = conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean)
+ writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean)

/**
* Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -371,15 +368,15 @@ private[parquet] class CatalystSchemaConverter(
case BinaryType =>
Types.primitive(BINARY, repetition).named(field.name)

- // =====================================
- // Decimals (for Spark version <= 1.4.x)
- // =====================================
+ // ======================
+ // Decimals (legacy mode)
+ // ======================

// Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
// always store decimals in fixed-length byte arrays. To keep compatibility with these older
// versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
// by `DECIMAL`.
- case DecimalType.Fixed(precision, scale) if !followParquetFormatSpec =>
+ case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat =>
Types
.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.as(DECIMAL)
@@ -388,13 +385,13 @@ private[parquet] class CatalystSchemaConverter(
.length(CatalystSchemaConverter.minBytesForPrecision(precision))
.named(field.name)

- // =====================================
- // Decimals (follow Parquet format spec)
- // =====================================
+ // ========================
+ // Decimals (standard mode)
+ // ========================

// Uses INT32 for 1 <= precision <= 9
case DecimalType.Fixed(precision, scale)
- if precision <= MAX_PRECISION_FOR_INT32 && followParquetFormatSpec =>
+ if precision <= MAX_PRECISION_FOR_INT32 && !writeLegacyParquetFormat =>
Types
.primitive(INT32, repetition)
.as(DECIMAL)
@@ -404,7 +401,7 @@

// Uses INT64 for 1 <= precision <= 18
case DecimalType.Fixed(precision, scale)
- if precision <= MAX_PRECISION_FOR_INT64 && followParquetFormatSpec =>
+ if precision <= MAX_PRECISION_FOR_INT64 && !writeLegacyParquetFormat =>
Types
.primitive(INT64, repetition)
.as(DECIMAL)
@@ -413,7 +410,7 @@ private[parquet] class CatalystSchemaConverter(
.named(field.name)

// Uses FIXED_LEN_BYTE_ARRAY for all other precisions
- case DecimalType.Fixed(precision, scale) if followParquetFormatSpec =>
+ case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat =>
Types
.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.as(DECIMAL)
@@ -422,15 +419,15 @@ private[parquet] class CatalystSchemaConverter(
.length(CatalystSchemaConverter.minBytesForPrecision(precision))
.named(field.name)

- // ===================================================
- // ArrayType and MapType (for Spark versions <= 1.4.x)
- // ===================================================
+ // ===================================
+ // ArrayType and MapType (legacy mode)
+ // ===================================

// Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level
// `LIST` structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro
// (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element
// field name "array" is borrowed from parquet-avro.
- case ArrayType(elementType, nullable @ true) if !followParquetFormatSpec =>
+ case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat =>
// <list-repetition> group <name> (LIST) {
// optional group bag {
// repeated <element-type> array;
@@ -448,7 +445,7 @@ private[parquet] class CatalystSchemaConverter(
// Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
// LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is
// covered by the backwards-compatibility rules implemented in `isElementType()`.
- case ArrayType(elementType, nullable @ false) if !followParquetFormatSpec =>
+ case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat =>
// <list-repetition> group <name> (LIST) {
// repeated <element-type> element;
// }
@@ -460,7 +457,7 @@ private[parquet] class CatalystSchemaConverter(

// Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
// MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`.
- case MapType(keyType, valueType, valueContainsNull) if !followParquetFormatSpec =>
+ case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat =>
// <map-repetition> group <name> (MAP) {
// repeated group map (MAP_KEY_VALUE) {
// required <key-type> key;
@@ -473,11 +470,11 @@ private[parquet] class CatalystSchemaConverter(
convertField(StructField("key", keyType, nullable = false)),
convertField(StructField("value", valueType, valueContainsNull)))

- // ==================================================
- // ArrayType and MapType (follow Parquet format spec)
- // ==================================================
+ // =====================================
+ // ArrayType and MapType (standard mode)
+ // =====================================

- case ArrayType(elementType, containsNull) if followParquetFormatSpec =>
+ case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat =>
// <list-repetition> group <name> (LIST) {
// repeated group list {
// <element-repetition> <element-type> element;
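To make the two modes concrete, here is a small sketch that is not part of the PR; it assumes it sits in the same package as the private[parquet] converter, and the comments paraphrase the conversion rules shown in the diff above:

import org.apache.spark.sql.types._

val catalystSchema = StructType(Seq(
  StructField("d", DecimalType(9, 2)),
  StructField("xs", ArrayType(IntegerType, containsNull = true))))

// Legacy mode (Spark <= 1.4 compatible): the decimal becomes FIXED_LEN_BYTE_ARRAY (DECIMAL)
// and the array becomes the 3-level group/bag/array structure.
val legacyParquetSchema =
  new CatalystSchemaConverter(writeLegacyParquetFormat = true).convert(catalystSchema)

// Standard mode: DECIMAL(9, 2) fits into an annotated INT32, and the array uses the
// list/element structure defined in the parquet-format spec.
val standardParquetSchema =
  new CatalystSchemaConverter(writeLegacyParquetFormat = false).convert(catalystSchema)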
ParquetRelation.scala
@@ -287,7 +287,7 @@ private[sql] class ParquetRelation(
val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
- val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+ val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat

// Parquet row group size. We will use this value as the value for
// mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
@@ -305,7 +305,7 @@ private[sql] class ParquetRelation(
parquetFilterPushDown,
assumeBinaryIsString,
assumeInt96IsTimestamp,
- followParquetFormatSpec) _
+ writeLegacyParquetFormat) _

// Create the function to set input paths at the driver side.
val setInputPaths =
@@ -531,7 +531,7 @@ private[sql] object ParquetRelation extends Logging {
parquetFilterPushDown: Boolean,
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean,
- followParquetFormatSpec: Boolean)(job: Job): Unit = {
+ writeLegacyParquetFormat: Boolean)(job: Job): Unit = {
val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)

@@ -561,7 +561,7 @@
// Sets flags for Parquet schema conversion
conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
- conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec)
+ conf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, writeLegacyParquetFormat)

overrideMinSplitSize(parquetBlockSize, conf)
}
@@ -586,7 +586,7 @@ private[sql] object ParquetRelation extends Logging {
val converter = new CatalystSchemaConverter(
sqlContext.conf.isParquetBinaryAsString,
sqlContext.conf.isParquetBinaryAsString,
- sqlContext.conf.followParquetFormatSpec)
+ sqlContext.conf.writeLegacyParquetFormat)

converter.convert(schema)
}
@@ -720,7 +720,7 @@ private[sql] object ParquetRelation extends Logging {
filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = {
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
- val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+ val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)

// !! HACK ALERT !!
@@ -760,7 +760,7 @@ private[sql] object ParquetRelation extends Logging {
new CatalystSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
- followParquetFormatSpec = followParquetFormatSpec)
+ writeLegacyParquetFormat = writeLegacyParquetFormat)

footers.map { footer =>
ParquetRelation.readSchemaFromFooter(footer, converter)
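Finally, a sketch of how the renamed flag travels through a Hadoop Configuration, mirroring initializeLocalJobFunc and the Configuration-based constructor of CatalystSchemaConverter shown above. The flag values are illustrative, and the snippet assumes it sits in the same parquet datasource package as the converter, since both the converter and SQLConf are package-private:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLConf

val hadoopConf = new Configuration()
// All three schema-conversion flags are set explicitly because the Configuration-based
// constructor calls .toBoolean on each of them instead of falling back to defaults.
hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, false)
hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, true)
hadoopConf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, true)

// On the task side, the schema converter is rebuilt from the propagated flags.
val converter = new CatalystSchemaConverter(hadoopConf)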