From d1583f88507a32afb509d33313d0cbe02de4897a Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 5 Oct 2015 15:42:44 -0700 Subject: [PATCH 01/12] Refactors Parquet write path to follow parquet-format --- .../org/apache/spark/sql/types/Decimal.scala | 4 +- .../parquet/CatalystReadSupport.scala | 5 +- .../parquet/CatalystRowConverter.scala | 42 +- .../parquet/CatalystSchemaConverter.scala | 56 ++- .../parquet/CatalystWriteSupport.scala | 434 ++++++++++++++++++ .../DirectParquetOutputCommitter.scala | 2 +- .../parquet/ParquetConverter.scala | 39 -- .../datasources/parquet/ParquetRelation.scala | 42 +- .../parquet/ParquetTableSupport.scala | 321 ------------- .../parquet/ParquetTypesConverter.scala | 160 ------- .../datasources/parquet/ParquetIOSuite.scala | 72 ++- .../parquet/ParquetSchemaSuite.scala | 22 +- .../datasources/parquet/ParquetTest.scala | 44 +- 13 files changed, 614 insertions(+), 629 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala index bfcf111385b7..662001c8986e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala @@ -108,7 +108,9 @@ final class Decimal extends Ordered[Decimal] with Serializable { */ def set(decimal: BigDecimal, precision: Int, scale: Int): Decimal = { this.decimalVal = decimal.setScale(scale, ROUNDING_MODE) - require(decimalVal.precision <= precision, "Overflowed precision") + require( + decimalVal.precision <= precision, + s"Decimal precision ${decimalVal.precision} exceeds max precision $precision") this.longVal = 0L this._precision = precision this._scale = scale diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index 532569803409..628dd11ca81b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -110,7 +110,10 @@ private[parquet] object CatalystReadSupport { */ def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = { val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema) - Types.buildMessage().addFields(clippedParquetFields: _*).named("root") + Types + .buildMessage() + .addFields(clippedParquetFields: _*) + .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) } private def clipParquetType(parquetType: Type, catalystType: DataType): Type = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index 050d3610a641..f7615f5d7e8e 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -340,30 +340,36 @@ private[parquet] class CatalystRowConverter( val scale = decimalType.scale if (precision <= CatalystSchemaConverter.MAX_PRECISION_FOR_INT64) { - // Constructs a `Decimal` with an unscaled `Long` value if possible. The underlying - // `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here we are using - // `Binary.toByteBuffer.array()` to steal the underlying byte array without copying it. - val buffer = value.toByteBuffer - val bytes = buffer.array() - val start = buffer.position() - val end = buffer.limit() - - var unscaled = 0L - var i = start - - while (i < end) { - unscaled = (unscaled << 8) | (bytes(i) & 0xff) - i += 1 - } - - val bits = 8 * (end - start) - unscaled = (unscaled << (64 - bits)) >> (64 - bits) + // Constructs a `Decimal` with an unscaled `Long` value if possible. + val unscaled = binaryToUnscaledLong(value) Decimal(unscaled, precision, scale) } else { // Otherwise, resorts to an unscaled `BigInteger` instead. Decimal(new BigDecimal(new BigInteger(value.getBytes), scale), precision, scale) } } + + private def binaryToUnscaledLong(binary: Binary): Long = { + // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here + // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without + // copying it. + val buffer = binary.toByteBuffer + val bytes = buffer.array() + val start = buffer.position() + val end = buffer.limit() + + var unscaled = 0L + var i = start + + while (i < end) { + unscaled = (unscaled << 8) | (bytes(i) & 0xff) + i += 1 + } + + val bits = 8 * (end - start) + unscaled = (unscaled << (64 - bits)) >> (64 - bits) + unscaled + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index 6904fc736c10..77eabb9bd76a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.{AnalysisException, SQLConf} * [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which * has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS` * described in Parquet format spec. This argument only affects Parquet read path. - * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4 + * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.5 * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]]. * When set to false, use standard format defined in parquet-format spec. This argument only * affects Parquet write path. 
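[Editor's note, not part of the patch] The `binaryToUnscaledLong` helper factored out in the CatalystRowConverter hunk above folds a big-endian, two's-complement Parquet binary of at most 8 bytes into an unscaled `Long` by accumulating bytes and then sign-extending, which avoids allocating a `BigInteger` for decimals whose precision fits in `MAX_PRECISION_FOR_INT64`. The standalone sketch below is illustrative only: it works on a plain `Array[Byte]` instead of parquet-mr's `Binary` so it needs no dependencies, and the object and method names are made up for the example.

    // Standalone sketch of the shift-and-sign-extend logic used by `binaryToUnscaledLong`.
    // Operates on a plain big-endian byte array instead of Parquet's `Binary`.
    object UnscaledLongSketch {
      def bigEndianToUnscaledLong(bytes: Array[Byte]): Long = {
        require(bytes.length <= 8, "at most 8 bytes fit into a Long")
        var unscaled = 0L
        var i = 0
        while (i < bytes.length) {
          // Accumulate big-endian bytes into the low-order bits.
          unscaled = (unscaled << 8) | (bytes(i) & 0xff)
          i += 1
        }
        // Shift the value up to the top of the Long and arithmetic-shift it back down so the
        // sign bit of the most significant input byte propagates through the unused high bits.
        // (JVM shift counts are taken mod 64, so 0- and 8-byte inputs degenerate harmlessly.)
        val bits = 8 * bytes.length
        (unscaled << (64 - bits)) >> (64 - bits)
      }

      def main(args: Array[String]): Unit = {
        println(bigEndianToUnscaledLong(Array(0xff.toByte, 0x85.toByte))) // -123 (0xFF85 as 16-bit two's complement)
        println(bigEndianToUnscaledLong(Array(0x01.toByte, 0x00.toByte))) // 256
      }
    }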
@@ -121,7 +121,7 @@ private[parquet] class CatalystSchemaConverter( val precision = field.getDecimalMetadata.getPrecision val scale = field.getDecimalMetadata.getScale - CatalystSchemaConverter.analysisRequire( + CatalystSchemaConverter.checkConversionRequirement( maxPrecision == -1 || 1 <= precision && precision <= maxPrecision, s"Invalid decimal precision: $typeName cannot store $precision digits (max $maxPrecision)") @@ -155,7 +155,7 @@ private[parquet] class CatalystSchemaConverter( } case INT96 => - CatalystSchemaConverter.analysisRequire( + CatalystSchemaConverter.checkConversionRequirement( assumeInt96IsTimestamp, "INT96 is not supported unless it's interpreted as timestamp. " + s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.") @@ -197,11 +197,11 @@ private[parquet] class CatalystSchemaConverter( // // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists case LIST => - CatalystSchemaConverter.analysisRequire( + CatalystSchemaConverter.checkConversionRequirement( field.getFieldCount == 1, s"Invalid list type $field") val repeatedType = field.getType(0) - CatalystSchemaConverter.analysisRequire( + CatalystSchemaConverter.checkConversionRequirement( repeatedType.isRepetition(REPEATED), s"Invalid list type $field") if (isElementType(repeatedType, field.getName)) { @@ -217,17 +217,17 @@ private[parquet] class CatalystSchemaConverter( // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1 // scalastyle:on case MAP | MAP_KEY_VALUE => - CatalystSchemaConverter.analysisRequire( + CatalystSchemaConverter.checkConversionRequirement( field.getFieldCount == 1 && !field.getType(0).isPrimitive, s"Invalid map type: $field") val keyValueType = field.getType(0).asGroupType() - CatalystSchemaConverter.analysisRequire( + CatalystSchemaConverter.checkConversionRequirement( keyValueType.isRepetition(REPEATED) && keyValueType.getFieldCount == 2, s"Invalid map type: $field") val keyType = keyValueType.getType(0) - CatalystSchemaConverter.analysisRequire( + CatalystSchemaConverter.checkConversionRequirement( keyType.isPrimitive, s"Map key type is expected to be a primitive type, but found: $keyType") @@ -299,7 +299,10 @@ private[parquet] class CatalystSchemaConverter( * Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]]. */ def convert(catalystSchema: StructType): MessageType = { - Types.buildMessage().addFields(catalystSchema.map(convertField): _*).named("root") + Types + .buildMessage() + .addFields(catalystSchema.map(convertField): _*) + .named(CatalystSchemaConverter.SPARK_PARQUET_SCHEMA_NAME) } /** @@ -347,10 +350,10 @@ private[parquet] class CatalystSchemaConverter( // NOTE: Spark SQL TimestampType is NOT a well defined type in Parquet format spec. // // As stated in PARQUET-323, Parquet `INT96` was originally introduced to represent nanosecond - // timestamp in Impala for some historical reasons, it's not recommended to be used for any - // other types and will probably be deprecated in future Parquet format spec. That's the - // reason why Parquet format spec only defines `TIMESTAMP_MILLIS` and `TIMESTAMP_MICROS` which - // are both logical types annotating `INT64`. + // timestamp in Impala for some historical reasons. It's not recommended to be used for any + // other types and will probably be deprecated in some future version of parquet-format spec. 
+ // That's the reason why parquet-format spec only defines `TIMESTAMP_MILLIS` and + // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`. // // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store @@ -361,7 +364,7 @@ private[parquet] class CatalystSchemaConverter( // currently not implemented yet because parquet-mr 1.7.0 (the version we're currently using) // hasn't implemented `TIMESTAMP_MICROS` yet. // - // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. + // TODO Converts `TIMESTAMP_MICROS` once parquet-mr implements that. case TimestampType => Types.primitive(INT96, repetition).named(field.name) @@ -372,7 +375,7 @@ private[parquet] class CatalystSchemaConverter( // Decimals (legacy mode) // ====================== - // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and + // Spark 1.5.x and prior versions only support decimals with a maximum precision of 18 and // always store decimals in fixed-length byte arrays. To keep compatibility with these older // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated // by `DECIMAL`. @@ -423,7 +426,7 @@ private[parquet] class CatalystSchemaConverter( // ArrayType and MapType (legacy mode) // =================================== - // Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level + // Spark 1.5.x and prior versions convert `ArrayType` with nullable elements into a 3-level // `LIST` structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro // (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element // field name "array" is borrowed from parquet-avro. @@ -442,7 +445,7 @@ private[parquet] class CatalystSchemaConverter( .addField(convertField(StructField("array", elementType, nullable))) .named("bag")) - // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level + // Spark 1.5.x and prior versions convert ArrayType with non-nullable elements into a 2-level // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is // covered by the backwards-compatibility rules implemented in `isElementType()`. case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat => @@ -455,7 +458,7 @@ private[parquet] class CatalystSchemaConverter( // "array" is the name chosen by parquet-avro (1.7.0 and prior version) convertField(StructField("array", elementType, nullable), REPEATED)) - // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by + // Spark 1.5.x and prior versions convert MapType into a 3-level group annotated by // MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`. case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat => // group (MAP) { @@ -523,11 +526,12 @@ private[parquet] class CatalystSchemaConverter( } } - private[parquet] object CatalystSchemaConverter { + val SPARK_PARQUET_SCHEMA_NAME = "spark_schema" + def checkFieldName(name: String): Unit = { // ,;{}()\n\t= and space are special characters in Parquet schema - analysisRequire( + checkConversionRequirement( !name.matches(".*[ ,;{}()\n\t=].*"), s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=". |Please use alias to rename it. 
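[Editor's note, not part of the patch] The legacy-mode conversion rules above ("bag"/"array" 3-level lists, MAP_KEY_VALUE maps) versus the standard parquet-format layouts can be compared directly by converting the same Catalyst schema twice. The sketch below is a hedged illustration, not part of this change: it assumes `CatalystSchemaConverter`'s primary constructor has default arguments (the test code elsewhere in this patch calls `new CatalystSchemaConverter()`) and a parameter named `writeLegacyParquetFormat` as its scaladoc states; the demo object name is hypothetical, and it sits in the same package because the converter is `private[parquet]`.

    // Hedged sketch: print the standard vs. legacy Parquet schema produced for one array column.
    package org.apache.spark.sql.execution.datasources.parquet

    import org.apache.spark.sql.types._

    object ListLayoutComparison {
      def main(args: Array[String]): Unit = {
        val catalystSchema = StructType(Seq(
          StructField("f1", ArrayType(IntegerType, containsNull = true))))

        // Standard mode: 3-level LIST with repeated group "list" and element field "element".
        val standard = new CatalystSchemaConverter(writeLegacyParquetFormat = false)
        println(standard.convert(catalystSchema))

        // Legacy mode (Spark 1.5 and prior): LIST using group "bag" with element field "array".
        val legacy = new CatalystSchemaConverter(writeLegacyParquetFormat = true)
        println(legacy.convert(catalystSchema))
      }
    }

Printing both message types makes the compatibility concern concrete: readers written against the legacy layout must still recognize the standard one, which is what the backwards-compatibility rules referenced above are for.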
@@ -539,7 +543,7 @@ private[parquet] object CatalystSchemaConverter { schema } - def analysisRequire(f: => Boolean, message: String): Unit = { + def checkConversionRequirement(f: => Boolean, message: String): Unit = { if (!f) { throw new AnalysisException(message) } @@ -553,16 +557,8 @@ private[parquet] object CatalystSchemaConverter { numBytes } - private val MIN_BYTES_FOR_PRECISION = Array.tabulate[Int](39)(computeMinBytesForPrecision) - // Returns the minimum number of bytes needed to store a decimal with a given `precision`. - def minBytesForPrecision(precision : Int) : Int = { - if (precision < MIN_BYTES_FOR_PRECISION.length) { - MIN_BYTES_FOR_PRECISION(precision) - } else { - computeMinBytesForPrecision(precision) - } - } + val minBytesForPrecision = Array.tabulate[Int](39)(computeMinBytesForPrecision) val MAX_PRECISION_FOR_INT32 = maxPrecisionForBytes(4) /* 9 */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala new file mode 100644 index 000000000000..00318c958acb --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.nio.{ByteBuffer, ByteOrder} +import java.util + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.column.ParquetProperties +import org.apache.parquet.hadoop.ParquetOutputFormat +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.hadoop.api.WriteSupport.WriteContext +import org.apache.parquet.io.api.{Binary, RecordConsumer} + +import org.apache.spark.Logging +import org.apache.spark.sql.SQLConf +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificMutableRow} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, minBytesForPrecision} +import org.apache.spark.sql.types._ + +/** + * A Parquet [[WriteSupport]] implementation that writes Catalyst [[InternalRow]]s as Parquet + * messages. This class can write Parquet data in two modes: + * + * - Standard mode: Parquet data are written in standard format defined in parquet-format spec. + * - Legacy mode: Parquet data are written in legacy format compatible with Spark 1.5 and prior. + * + * This behavior can be controlled by SQL option `spark.sql.parquet.writeLegacyParquetFormat`. 
The + * value of the option is propagated to this class by the `init()` method and its Hadoop + * configuration argument. + */ +private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging { + // A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer. + // Here we are using `SpecializedGetters` rather than `InternalRow` so that we can directly access + // data in `ArrayData` without the help of `SpecificMutableRow`. + private type ValueWriter = (SpecializedGetters, Int) => Unit + + // Schema of the `InternalRow`s to be written + private var schema: StructType = _ + + // `ValueWriter`s for all fields of the schema + private var rootFieldWriters: Seq[ValueWriter] = _ + + // The Parquet `RecordConsumer` to which all `InternalRow`s are written + private var recordConsumer: RecordConsumer = _ + + // Whether to write data in legacy Parquet format compatible with Spark 1.5 and prior versions + private var writeLegacyParquetFormat: Boolean = _ + + // Reusable byte array used to write timestamps as Parquet INT96 values + private val timestampBuffer = new Array[Byte](12) + + // Reusable byte array used to write decimal values + private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION)) + + override def init(configuration: Configuration): WriteContext = { + val schemaString = configuration.get(CatalystWriteSupport.SPARK_ROW_SCHEMA) + this.schema = StructType.fromString(schemaString) + this.writeLegacyParquetFormat = { + // `SQLConf.PARQUET_WRITE_LEGACY_FORMAT` should always be explicitly set in ParquetRelation + assert(configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key) != null) + configuration.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean + } + this.rootFieldWriters = schema.map(_.dataType).map(makeWriter) + + val messageType = new CatalystSchemaConverter(configuration).convert(schema) + val metadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schemaString).asJava + + logDebug( + s"""Initialized Parquet WriteSupport with Catalyst schema: + |${schema.prettyJson} + |and corresponding Parquet message type: + |$messageType + """.stripMargin) + + new WriteContext(messageType, metadata) + } + + override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { + this.recordConsumer = recordConsumer + } + + override def write(row: InternalRow): Unit = { + consumeMessage(writeFields(row, schema, rootFieldWriters)) + } + + private def writeFields( + row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = { + var i = 0 + while (i < row.numFields) { + if (!row.isNullAt(i)) { + consumeField(schema(i).name, i) { + fieldWriters(i).apply(row, i) + } + } + i += 1 + } + } + + private def makeWriter(dataType: DataType): ValueWriter = { + dataType match { + case BooleanType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addBoolean(row.getBoolean(ordinal)) + + case ByteType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addInteger(row.getByte(ordinal)) + + case ShortType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addInteger(row.getShort(ordinal)) + + case IntegerType | DateType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addInteger(row.getInt(ordinal)) + + case LongType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addLong(row.getLong(ordinal)) + + case FloatType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addFloat(row.getFloat(ordinal)) + + case 
DoubleType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addDouble(row.getDouble(ordinal)) + + case StringType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addBinary(Binary.fromByteArray(row.getUTF8String(ordinal).getBytes)) + + case TimestampType => + (row: SpecializedGetters, ordinal: Int) => { + // TODO Writes `TimestampType` values as `TIMESTAMP_MICROS` once parquet-mr implements it + // Currently we only support timestamps stored as INT96, which is compatible with Hive + // and Impala. However, INT96 is to be deprecated. We plan to support `TIMESTAMP_MICROS` + // defined in the parquet-format spec. But up until writing, the most recent parquet-mr + // version (1.8.1) hasn't implemented it yet. + + // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond + // precision. Nanosecond parts of timestamp values read from INT96 are simply stripped. + val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal)) + val buf = ByteBuffer.wrap(timestampBuffer) + buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) + recordConsumer.addBinary(Binary.fromByteArray(timestampBuffer)) + } + + case BinaryType => + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal))) + + case DecimalType.Fixed(precision, scale) => + makeDecimalWriter(precision, scale) + + case t: StructType => + val fieldWriters = t.map(_.dataType).map(makeWriter) + (row: SpecializedGetters, ordinal: Int) => + consumeGroup(writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)) + + case t: ArrayType => makeArrayWriter(t) + + case t: MapType => makeMapWriter(t) + + case t: UserDefinedType[_] => makeWriter(t.sqlType) + + // TODO Adds IntervalType support + case _ => sys.error(s"Unsupported data type $dataType.") + } + } + + private def makeDecimalWriter(precision: Int, scale: Int): ValueWriter = { + assert( + precision <= DecimalType.MAX_PRECISION, + s"Decimal precision $precision exceeds max precision ${DecimalType.MAX_PRECISION}") + + val numBytes = minBytesForPrecision(precision) + + val int32Writer = + (row: SpecializedGetters, ordinal: Int) => { + val unscaledInt = row.getDecimal(ordinal, precision, scale).toUnscaledLong.toInt + recordConsumer.addInteger(unscaledInt) + } + + val int64Writer = + (row: SpecializedGetters, ordinal: Int) => { + val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong + recordConsumer.addLong(unscaledLong) + } + + val binaryWriterUsingUnscaledLong = + (row: SpecializedGetters, ordinal: Int) => { + // When the precision is low enough (<= 18) to squeeze the decimal value into a `Long`, we + // can build a fixed-length byte array with length `numBytes` using the unscaled `Long` + // value and the `decimalBuffer` for better performance. 
+ val unscaled = row.getDecimal(ordinal, precision, scale).toUnscaledLong + var i = 0 + var shift = 8 * (numBytes - 1) + + while (i < numBytes) { + decimalBuffer(i) = (unscaled >> shift).toByte + i += 1 + shift -= 8 + } + + recordConsumer.addBinary(Binary.fromByteArray(decimalBuffer, 0, numBytes)) + } + + val binaryWriterUsingUnscaledBytes = + (row: SpecializedGetters, ordinal: Int) => { + val decimal = row.getDecimal(ordinal, precision, scale) + val bytes = decimal.toJavaBigDecimal.unscaledValue().toByteArray + val fixedLengthBytes = if (bytes.length == numBytes) { + // If the length of the underlying byte array of the unscaled `BigInteger` happens to be + // `numBytes`, just reuse it, so that we don't bother copying it to `decimalBuffer`. + bytes + } else { + // Otherwise, the length must be less than `numBytes`. In this case we copy contents of + // the underlying bytes with padding sign bytes to `decimalBuffer` to form the result + // fixed-length byte array. + val signByte = if (bytes.head < 0) -1: Byte else 0: Byte + util.Arrays.fill(decimalBuffer, 0, numBytes - bytes.length, signByte) + System.arraycopy(bytes, 0, decimalBuffer, numBytes - bytes.length, bytes.length) + decimalBuffer + } + + recordConsumer.addBinary(Binary.fromByteArray(fixedLengthBytes, 0, numBytes)) + } + + writeLegacyParquetFormat match { + // Standard mode, 1 <= precision <= 9, writes as INT32 + case false if precision <= MAX_PRECISION_FOR_INT32 => int32Writer + + // Standard mode, 10 <= precision <= 18, writes as INT64 + case false if precision <= MAX_PRECISION_FOR_INT64 => int64Writer + + // Legacy mode, 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY + case true if precision <= MAX_PRECISION_FOR_INT64 => binaryWriterUsingUnscaledLong + + // Either standard or legacy mode, 19 <= precision <= 38, writes as FIXED_LEN_BYTE_ARRAY + case _ => binaryWriterUsingUnscaledBytes + } + } + + def makeArrayWriter(arrayType: ArrayType): ValueWriter = { + val elementWriter = makeWriter(arrayType.elementType) + + def threeLevelArrayWriter(repeatedGroupName: String, elementFieldName: String): ValueWriter = + (row: SpecializedGetters, ordinal: Int) => { + val array = row.getArray(ordinal) + consumeGroup { + // Only creates the repeated field if the array is non-empty. + if (array.numElements() > 0) { + consumeField(repeatedGroupName, 0) { + var i = 0 + while (i < array.numElements()) { + consumeGroup { + // Only creates the element field if the current array element is not null. + if (!array.isNullAt(i)) { + consumeField(elementFieldName, 0) { + elementWriter.apply(array, i) + } + } + } + i += 1 + } + } + } + } + } + + def twoLevelArrayWriter(repeatedFieldName: String): ValueWriter = + (row: SpecializedGetters, ordinal: Int) => { + val array = row.getArray(ordinal) + consumeGroup { + // Only creates the repeated field if the array is non-empty. 
+ if (array.numElements() > 0) { + consumeField(repeatedFieldName, 0) { + var i = 0 + while (i < array.numElements()) { + elementWriter.apply(array, i) + i += 1 + } + } + } + } + } + + (writeLegacyParquetFormat, arrayType.containsNull) match { + case (legacyMode @ false, _) => + // Standard mode: + // + // group (LIST) { + // repeated group list { + // ^~~~ repeatedGroupName + // element; + // ^~~~~~~ elementFieldName + // } + // } + threeLevelArrayWriter(repeatedGroupName = "list", elementFieldName = "element") + + case (legacyMode @ true, nullableElements @ true) => + // Legacy mode, with nullable elements: + // + // group (LIST) { + // optional group bag { + // ^~~ repeatedGroupName + // repeated array; + // ^~~~~ elementFieldName + // } + // } + threeLevelArrayWriter(repeatedGroupName = "bag", elementFieldName = "array") + + case (legacyMode @ true, nullableElements @ false) => + // Legacy mode, with non-nullable elements: + // + // group (LIST) { + // repeated array; + // ^~~~~ repeatedFieldName + // } + twoLevelArrayWriter(repeatedFieldName = "array") + } + } + + private def makeMapWriter(mapType: MapType): ValueWriter = { + val keyType = mapType.keyType + val valueType = mapType.valueType + val keyWriter = makeWriter(keyType) + val valueWriter = makeWriter(valueType) + val mutableRow = new SpecificMutableRow(keyType :: valueType :: Nil) + val repeatedGroupName = if (writeLegacyParquetFormat) { + // Legacy mode: + // + // group (MAP) { + // repeated group map (MAP_KEY_VALUE) { + // ^~~ repeatedGroupName + // required key; + // value; + // } + // } + "map" + } else { + // Standard mode: + // + // group (MAP) { + // repeated group key_value { + // ^~~~~~~~~ repeatedGroupName + // required key; + // value; + // } + // } + "key_value" + } + + (row: SpecializedGetters, ordinal: Int) => { + val map = row.getMap(ordinal) + consumeGroup { + // Only creates the repeated field if the map is non-empty. 
+ if (map.numElements() > 0) { + consumeField(repeatedGroupName, 0) { + var i = 0 + while (i < map.numElements()) { + consumeGroup { + mutableRow.update(0, map.keyArray().get(i, keyType)) + consumeField("key", 0) { + keyWriter.apply(mutableRow, 0) + } + + // Only creates the "value" field if the value if non-empty + if (!map.valueArray().isNullAt(i)) { + mutableRow.update(1, map.valueArray().get(i, valueType)) + consumeField("value", 1) { + valueWriter.apply(mutableRow, 1) + } + } + } + i += 1 + } + } + } + } + } + } + + private def consumeMessage(f: => Unit): Unit = { + recordConsumer.startMessage() + f + recordConsumer.endMessage() + } + + private def consumeGroup(f: => Unit): Unit = { + recordConsumer.startGroup() + f + recordConsumer.endGroup() + } + + private def consumeField(field: String, index: Int)(f: => Unit): Unit = { + recordConsumer.startField(field, index) + f + recordConsumer.endField(field, index) + } +} + +private[parquet] object CatalystWriteSupport { + val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes" + + def setSchema(schema: StructType, configuration: Configuration): Unit = { + schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName) + configuration.set(SPARK_ROW_SCHEMA, schema.json) + configuration.set( + ParquetOutputFormat.WRITER_VERSION, + ParquetProperties.WriterVersion.PARQUET_1_0.toString) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala index de1fd0166ac5..300e8677b312 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala @@ -39,7 +39,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO * * NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's * no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are - * left * empty). + * left empty). */ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) extends ParquetOutputCommitter(outputPath, context) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala deleted file mode 100644 index ccd7ebf319af..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetConverter.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.parquet - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types.{MapData, ArrayData} - -// TODO Removes this while fixing SPARK-8848 -private[sql] object CatalystConverter { - // This is mostly Parquet convention (see, e.g., `ConversionPatterns`). - // Note that "array" for the array elements is chosen by ParquetAvro. - // Using a different value will result in Parquet silently dropping columns. - val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag" - val ARRAY_ELEMENTS_SCHEMA_NAME = "array" - - val MAP_KEY_SCHEMA_NAME = "key" - val MAP_VALUE_SCHEMA_NAME = "value" - val MAP_SCHEMA_NAME = "map" - - // TODO: consider using Array[T] for arrays to avoid boxing of primitive types - type ArrayScalaType = ArrayData - type StructScalaType = InternalRow - type MapScalaType = MapData -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 8a9c0e733a9a..77d851ca486b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -218,8 +218,8 @@ private[sql] class ParquetRelation( } // SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible - val committerClassname = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) - if (committerClassname == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") { + val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) + if (committerClassName == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") { conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key, classOf[DirectParquetOutputCommitter].getCanonicalName) } @@ -248,18 +248,22 @@ private[sql] class ParquetRelation( // bundled with `ParquetOutputFormat[Row]`. job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) - // TODO There's no need to use two kinds of WriteSupport - // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and - // complex types. - val writeSupportClass = - if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) { - classOf[MutableRowWriteSupport] - } else { - classOf[RowWriteSupport] - } + ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport]) + CatalystWriteSupport.setSchema(dataSchema, conf) + + // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema) + // and `CatalystWriteSupport` (writing actual rows to Parquet files). 
+ conf.set( + SQLConf.PARQUET_BINARY_AS_STRING.key, + sqlContext.conf.isParquetBinaryAsString.toString) - ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass) - RowWriteSupport.setSchema(dataSchema.toAttributes, conf) + conf.set( + SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, + sqlContext.conf.isParquetINT96AsTimestamp.toString) + + conf.set( + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, + sqlContext.conf.writeLegacyParquetFormat.toString) // Sets compression scheme conf.set( @@ -287,7 +291,6 @@ private[sql] class ParquetRelation( val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp - val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat // Parquet row group size. We will use this value as the value for // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value @@ -304,8 +307,7 @@ private[sql] class ParquetRelation( useMetadataCache, parquetFilterPushDown, assumeBinaryIsString, - assumeInt96IsTimestamp, - writeLegacyParquetFormat) _ + assumeInt96IsTimestamp) _ // Create the function to set input paths at the driver side. val setInputPaths = @@ -530,8 +532,7 @@ private[sql] object ParquetRelation extends Logging { useMetadataCache: Boolean, parquetFilterPushDown: Boolean, assumeBinaryIsString: Boolean, - assumeInt96IsTimestamp: Boolean, - writeLegacyParquetFormat: Boolean)(job: Job): Unit = { + assumeInt96IsTimestamp: Boolean)(job: Job): Unit = { val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job) conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) @@ -552,16 +553,15 @@ private[sql] object ParquetRelation extends Logging { }) conf.set( - RowWriteSupport.SPARK_ROW_SCHEMA, + CatalystWriteSupport.SPARK_ROW_SCHEMA, CatalystSchemaConverter.checkFieldNames(dataSchema).json) // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache) - // Sets flags for Parquet schema conversion + // Sets flags for `CatalystSchemaConverter` conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString) conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp) - conf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, writeLegacyParquetFormat) overrideMinSplitSize(parquetBlockSize, conf) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala deleted file mode 100644 index ed89aa27aa1f..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.parquet - -import java.math.BigInteger -import java.nio.{ByteBuffer, ByteOrder} -import java.util.{HashMap => JHashMap} - -import org.apache.hadoop.conf.Configuration -import org.apache.parquet.column.ParquetProperties -import org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.io.api._ - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - -/** - * A `parquet.hadoop.api.WriteSupport` for Row objects. - */ -private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Logging { - - private[parquet] var writer: RecordConsumer = null - private[parquet] var attributes: Array[Attribute] = null - - override def init(configuration: Configuration): WriteSupport.WriteContext = { - val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA) - val metadata = new JHashMap[String, String]() - metadata.put(CatalystReadSupport.SPARK_METADATA_KEY, origAttributesStr) - - if (attributes == null) { - attributes = ParquetTypesConverter.convertFromString(origAttributesStr).toArray - } - - log.debug(s"write support initialized for requested schema $attributes") - new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata) - } - - override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { - writer = recordConsumer - log.debug(s"preparing for write with schema $attributes") - } - - override def write(record: InternalRow): Unit = { - val attributesSize = attributes.size - if (attributesSize > record.numFields) { - throw new IndexOutOfBoundsException("Trying to write more fields than contained in row " + - s"($attributesSize > ${record.numFields})") - } - - var index = 0 - writer.startMessage() - while(index < attributesSize) { - // null values indicate optional fields but we do not check currently - if (!record.isNullAt(index)) { - writer.startField(attributes(index).name, index) - writeValue(attributes(index).dataType, record.get(index, attributes(index).dataType)) - writer.endField(attributes(index).name, index) - } - index = index + 1 - } - writer.endMessage() - } - - private[parquet] def writeValue(schema: DataType, value: Any): Unit = { - if (value != null) { - schema match { - case t: UserDefinedType[_] => writeValue(t.sqlType, value) - case t @ ArrayType(_, _) => writeArray( - t, - value.asInstanceOf[CatalystConverter.ArrayScalaType]) - case t @ MapType(_, _, _) => writeMap( - t, - value.asInstanceOf[CatalystConverter.MapScalaType]) - case t @ StructType(_) => writeStruct( - t, - value.asInstanceOf[CatalystConverter.StructScalaType]) - case _ => writePrimitive(schema.asInstanceOf[AtomicType], value) - } - } - } - - private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = { - if (value != null) { - schema match { - case BooleanType 
=> writer.addBoolean(value.asInstanceOf[Boolean]) - case ByteType => writer.addInteger(value.asInstanceOf[Byte]) - case ShortType => writer.addInteger(value.asInstanceOf[Short]) - case IntegerType | DateType => writer.addInteger(value.asInstanceOf[Int]) - case LongType => writer.addLong(value.asInstanceOf[Long]) - case TimestampType => writeTimestamp(value.asInstanceOf[Long]) - case FloatType => writer.addFloat(value.asInstanceOf[Float]) - case DoubleType => writer.addDouble(value.asInstanceOf[Double]) - case StringType => writer.addBinary( - Binary.fromByteArray(value.asInstanceOf[UTF8String].getBytes)) - case BinaryType => writer.addBinary( - Binary.fromByteArray(value.asInstanceOf[Array[Byte]])) - case DecimalType.Fixed(precision, _) => - writeDecimal(value.asInstanceOf[Decimal], precision) - case _ => sys.error(s"Do not know how to writer $schema to consumer") - } - } - } - - private[parquet] def writeStruct( - schema: StructType, - struct: CatalystConverter.StructScalaType): Unit = { - if (struct != null) { - val fields = schema.fields.toArray - writer.startGroup() - var i = 0 - while(i < fields.length) { - if (!struct.isNullAt(i)) { - writer.startField(fields(i).name, i) - writeValue(fields(i).dataType, struct.get(i, fields(i).dataType)) - writer.endField(fields(i).name, i) - } - i = i + 1 - } - writer.endGroup() - } - } - - private[parquet] def writeArray( - schema: ArrayType, - array: CatalystConverter.ArrayScalaType): Unit = { - val elementType = schema.elementType - writer.startGroup() - if (array.numElements() > 0) { - if (schema.containsNull) { - writer.startField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0) - var i = 0 - while (i < array.numElements()) { - writer.startGroup() - if (!array.isNullAt(i)) { - writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - writeValue(elementType, array.get(i, elementType)) - writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - } - writer.endGroup() - i = i + 1 - } - writer.endField(CatalystConverter.ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME, 0) - } else { - writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - var i = 0 - while (i < array.numElements()) { - writeValue(elementType, array.get(i, elementType)) - i = i + 1 - } - writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) - } - } - writer.endGroup() - } - - private[parquet] def writeMap( - schema: MapType, - map: CatalystConverter.MapScalaType): Unit = { - writer.startGroup() - val length = map.numElements() - if (length > 0) { - writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0) - map.foreach(schema.keyType, schema.valueType, (key, value) => { - writer.startGroup() - writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) - writeValue(schema.keyType, key) - writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) - if (value != null) { - writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) - writeValue(schema.valueType, value) - writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) - } - writer.endGroup() - }) - writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0) - } - writer.endGroup() - } - - // Scratch array used to write decimals as fixed-length byte array - private[this] var reusableDecimalBytes = new Array[Byte](16) - - private[parquet] def writeDecimal(decimal: Decimal, precision: Int): Unit = { - val numBytes = CatalystSchemaConverter.minBytesForPrecision(precision) - - def longToBinary(unscaled: Long): Binary = { - var i = 0 - var shift = 8 * (numBytes - 1) - while (i < numBytes) { - 
reusableDecimalBytes(i) = (unscaled >> shift).toByte - i += 1 - shift -= 8 - } - Binary.fromByteArray(reusableDecimalBytes, 0, numBytes) - } - - def bigIntegerToBinary(unscaled: BigInteger): Binary = { - unscaled.toByteArray match { - case bytes if bytes.length == numBytes => - Binary.fromByteArray(bytes) - - case bytes if bytes.length <= reusableDecimalBytes.length => - val signedByte = (if (bytes.head < 0) -1 else 0).toByte - java.util.Arrays.fill(reusableDecimalBytes, 0, numBytes - bytes.length, signedByte) - System.arraycopy(bytes, 0, reusableDecimalBytes, numBytes - bytes.length, bytes.length) - Binary.fromByteArray(reusableDecimalBytes, 0, numBytes) - - case bytes => - reusableDecimalBytes = new Array[Byte](bytes.length) - bigIntegerToBinary(unscaled) - } - } - - val binary = if (numBytes <= 8) { - longToBinary(decimal.toUnscaledLong) - } else { - bigIntegerToBinary(decimal.toJavaBigDecimal.unscaledValue()) - } - - writer.addBinary(binary) - } - - // array used to write Timestamp as Int96 (fixed-length binary) - private[this] val int96buf = new Array[Byte](12) - - private[parquet] def writeTimestamp(ts: Long): Unit = { - val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts) - val buf = ByteBuffer.wrap(int96buf) - buf.order(ByteOrder.LITTLE_ENDIAN) - buf.putLong(timeOfDayNanos) - buf.putInt(julianDay) - writer.addBinary(Binary.fromByteArray(int96buf)) - } -} - -// Optimized for non-nested rows -private[parquet] class MutableRowWriteSupport extends RowWriteSupport { - override def write(record: InternalRow): Unit = { - val attributesSize = attributes.size - if (attributesSize > record.numFields) { - throw new IndexOutOfBoundsException("Trying to write more fields than contained in row " + - s"($attributesSize > ${record.numFields})") - } - - var index = 0 - writer.startMessage() - while(index < attributesSize) { - // null values indicate optional fields but we do not check currently - if (!record.isNullAt(index) && !record.isNullAt(index)) { - writer.startField(attributes(index).name, index) - consumeType(attributes(index).dataType, record, index) - writer.endField(attributes(index).name, index) - } - index = index + 1 - } - writer.endMessage() - } - - private def consumeType( - ctype: DataType, - record: InternalRow, - index: Int): Unit = { - ctype match { - case BooleanType => writer.addBoolean(record.getBoolean(index)) - case ByteType => writer.addInteger(record.getByte(index)) - case ShortType => writer.addInteger(record.getShort(index)) - case IntegerType | DateType => writer.addInteger(record.getInt(index)) - case LongType => writer.addLong(record.getLong(index)) - case TimestampType => writeTimestamp(record.getLong(index)) - case FloatType => writer.addFloat(record.getFloat(index)) - case DoubleType => writer.addDouble(record.getDouble(index)) - case StringType => - writer.addBinary(Binary.fromByteArray(record.getUTF8String(index).getBytes)) - case BinaryType => - writer.addBinary(Binary.fromByteArray(record.getBinary(index))) - case DecimalType.Fixed(precision, scale) => - writeDecimal(record.getDecimal(index, precision, scale), precision) - case _ => sys.error(s"Unsupported datatype $ctype, cannot write to consumer") - } - } -} - -private[parquet] object RowWriteSupport { - val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes" - - def getSchema(configuration: Configuration): Seq[Attribute] = { - val schemaString = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA) - if (schemaString == null) { - throw new RuntimeException("Missing schema!") - 
} - ParquetTypesConverter.convertFromString(schemaString) - } - - def setSchema(schema: Seq[Attribute], configuration: Configuration) { - val encoded = ParquetTypesConverter.convertToString(schema) - configuration.set(SPARK_ROW_SCHEMA, encoded) - configuration.set( - ParquetOutputFormat.WRITER_VERSION, - ParquetProperties.WriterVersion.PARQUET_1_0.toString) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala deleted file mode 100644 index b647bb6116af..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.parquet - -import java.io.IOException -import java.util.{Collections, Arrays} - -import scala.util.Try - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.mapreduce.Job -import org.apache.parquet.format.converter.ParquetMetadataConverter -import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata} -import org.apache.parquet.hadoop.util.ContextUtil -import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter} -import org.apache.parquet.schema.MessageType - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.types._ - - -private[parquet] object ParquetTypesConverter extends Logging { - def isPrimitiveType(ctype: DataType): Boolean = ctype match { - case _: NumericType | BooleanType | DateType | TimestampType | StringType | BinaryType => true - case _ => false - } - - /** - * Compute the FIXED_LEN_BYTE_ARRAY length needed to represent a given DECIMAL precision. 
- */ - private[parquet] val BYTES_FOR_PRECISION = Array.tabulate[Int](38) { precision => - var length = 1 - while (math.pow(2.0, 8 * length - 1) < math.pow(10.0, precision)) { - length += 1 - } - length - } - - def convertFromAttributes(attributes: Seq[Attribute]): MessageType = { - val converter = new CatalystSchemaConverter() - converter.convert(StructType.fromAttributes(attributes)) - } - - def convertFromString(string: String): Seq[Attribute] = { - Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match { - case s: StructType => s.toAttributes - case other => sys.error(s"Can convert $string to row") - } - } - - def convertToString(schema: Seq[Attribute]): String = { - schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName) - StructType.fromAttributes(schema).json - } - - def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = { - if (origPath == null) { - throw new IllegalArgumentException("Unable to write Parquet metadata: path is null") - } - val fs = origPath.getFileSystem(conf) - if (fs == null) { - throw new IllegalArgumentException( - s"Unable to write Parquet metadata: path $origPath is incorrectly formatted") - } - val path = origPath.makeQualified(fs) - if (fs.exists(path) && !fs.getFileStatus(path).isDir) { - throw new IllegalArgumentException(s"Expected to write to directory $path but found file") - } - val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE) - if (fs.exists(metadataPath)) { - try { - fs.delete(metadataPath, true) - } catch { - case e: IOException => - throw new IOException(s"Unable to delete previous PARQUET_METADATA_FILE at $metadataPath") - } - } - val extraMetadata = new java.util.HashMap[String, String]() - extraMetadata.put( - CatalystReadSupport.SPARK_METADATA_KEY, - ParquetTypesConverter.convertToString(attributes)) - // TODO: add extra data, e.g., table name, date, etc.? - - val parquetSchema: MessageType = ParquetTypesConverter.convertFromAttributes(attributes) - val metaData: FileMetaData = new FileMetaData( - parquetSchema, - extraMetadata, - "Spark") - - ParquetFileWriter.writeMetadataFile( - conf, - path, - Arrays.asList(new Footer(path, new ParquetMetadata(metaData, Collections.emptyList())))) - } - - /** - * Try to read Parquet metadata at the given Path. We first see if there is a summary file - * in the parent directory. If so, this is used. Else we read the actual footer at the given - * location. - * @param origPath The path at which we expect one (or more) Parquet files. - * @param configuration The Hadoop configuration to use. - * @return The `ParquetMetadata` containing among other things the schema. - */ - def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = { - if (origPath == null) { - throw new IllegalArgumentException("Unable to read Parquet metadata: path is null") - } - val job = new Job() - val conf = { - // scalastyle:off jobcontext - configuration.getOrElse(ContextUtil.getConfiguration(job)) - // scalastyle:on jobcontext - } - val fs: FileSystem = origPath.getFileSystem(conf) - if (fs == null) { - throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath") - } - val path = origPath.makeQualified(fs) - - val children = - fs - .globStatus(path) - .flatMap { status => if (status.isDir) fs.listStatus(status.getPath) else List(status) } - .filterNot { status => - val name = status.getPath.getName - (name(0) == '.' 
|| name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE - } - - // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row - // groups. Since Parquet schema is replicated among all row groups, we only need to touch a - // single row group to read schema related metadata. Notice that we are making assumptions that - // all data in a single Parquet file have the same schema, which is normally true. - children - // Try any non-"_metadata" file first... - .find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE) - // ... and fallback to "_metadata" if no such file exists (which implies the Parquet file is - // empty, thus normally the "_metadata" file is expected to be fairly small). - .orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE)) - .map(ParquetFileReader.readFooter(conf, _, ParquetMetadataConverter.NO_FILTER)) - .getOrElse( - throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index cd552e83372f..4c4e3a45811f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -28,10 +28,10 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.parquet.example.data.simple.SimpleGroup import org.apache.parquet.example.data.{Group, GroupWriter} +import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.api.WriteSupport.WriteContext -import org.apache.parquet.hadoop.metadata.{BlockMetaData, CompressionCodecName, FileMetaData, ParquetMetadata} -import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter} +import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} @@ -99,16 +99,17 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data)) } - test("fixed-length decimals") { + testStandardAndLegacyModes("fixed-length decimals") { def makeDecimalRDD(decimal: DecimalType): DataFrame = sparkContext .parallelize(0 to 1000) - .map(i => Tuple1(i / 100.0)) + .map(i => Tuple1((i - 500) / 100.0)) .toDF() // Parquet doesn't allow column names with spaces, have to add an alias here .select($"_1" cast decimal as "dec") - for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37))) { + val combinations = Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37)) + for ((precision, scale) <- combinations) { withTempPath { dir => val data = makeDecimalRDD(DecimalType(precision, scale)) data.write.parquet(dir.getCanonicalPath) @@ -132,22 +133,22 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } - test("map") { + testStandardAndLegacyModes("map") { val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i"))) checkParquetFile(data) } - test("array") { + testStandardAndLegacyModes("array") { val data = (1 to 4).map(i => 
Tuple1(Seq(i, i + 1))) checkParquetFile(data) } - test("array and double") { + testStandardAndLegacyModes("array and double") { val data = (1 to 4).map(i => (i.toDouble, Seq(i.toDouble, (i + 1).toDouble))) checkParquetFile(data) } - test("struct") { + testStandardAndLegacyModes("struct") { val data = (1 to 4).map(i => Tuple1((i, s"val_$i"))) withParquetDataFrame(data) { df => // Structs are converted to `Row`s @@ -157,7 +158,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } - test("nested struct with array of array as field") { + testStandardAndLegacyModes("nested struct with array of array as field") { val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i"))))) withParquetDataFrame(data) { df => // Structs are converted to `Row`s @@ -167,7 +168,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } - test("nested map with struct as value type") { + testStandardAndLegacyModes("nested map with struct as value type") { val data = (1 to 4).map(i => Tuple1(Map(i -> (i, s"val_$i")))) withParquetDataFrame(data) { df => checkAnswer(df, data.map { case Tuple1(m) => @@ -176,6 +177,28 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } + test("read standard Parquet file under legacy mode") { + withTempPath { dir => + val path = dir.getCanonicalPath + + withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") { + sqlContext + .range(4) + .selectExpr("NAMED_STRUCT('a', id, 'b', ARRAY(id, id + 1, id + 2)) AS s") + .write + .parquet(path) + } + + withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") { + checkAnswer( + sqlContext.read.parquet(path), + (0 until 4).map { + id => Row(Row(id, Seq(id, id + 1, id + 2))) + }) + } + } + } + test("nulls") { val allNulls = ( null.asInstanceOf[java.lang.Boolean], @@ -205,14 +228,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } test("compression codec") { - def compressionCodecFor(path: String): String = { - val codecs = ParquetTypesConverter - .readMetaData(new Path(path), Some(hadoopConfiguration)).getBlocks.asScala - .flatMap(_.getColumns.asScala) - .map(_.getCodec.name()) - .distinct - - assert(codecs.size === 1) + def compressionCodecFor(path: String, codecName: String): String = { + val codecs = for { + footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConfiguration) + block <- footer.getParquetMetadata.getBlocks.asScala + column <- block.getColumns.asScala + } yield column.getCodec.name() + + assert(codecs.distinct === Seq(codecName)) codecs.head } @@ -222,7 +245,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) { withParquetFile(data) { path => assertResult(sqlContext.conf.parquetCompressionCodec.toUpperCase) { - compressionCodecFor(path) + compressionCodecFor(path, codec.name()) } } } @@ -278,15 +301,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { withTempPath { file => val path = new Path(file.toURI.toString) val fs = FileSystem.getLocal(hadoopConfiguration) - val attributes = ScalaReflection.attributesFor[(Int, String)] - ParquetTypesConverter.writeMetaData(attributes, path, hadoopConfiguration) + val schema = StructType.fromAttributes(ScalaReflection.attributesFor[(Int, String)]) + writeMetadata(schema, path, hadoopConfiguration) assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE))) assert(fs.exists(new Path(path, 
ParquetFileWriter.PARQUET_METADATA_FILE))) - val metaData = ParquetTypesConverter.readMetaData(path, Some(hadoopConfiguration)) - val actualSchema = metaData.getFileMetaData.getSchema - val expectedSchema = ParquetTypesConverter.convertFromAttributes(attributes) + val expectedSchema = new CatalystSchemaConverter().convert(schema) + val actualSchema = readFooter(path, hadoopConfiguration).getFileMetaData.getSchema actualSchema.checkContains(expectedSchema) expectedSchema.checkContains(actualSchema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index f17fb36f25fe..128b59edede4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -357,8 +357,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest { val jsonString = """{"type":"struct","fields":[{"name":"c1","type":"integer","nullable":false,"metadata":{}},{"name":"c2","type":"binary","nullable":true,"metadata":{}}]}""" // scalastyle:on - val fromCaseClassString = ParquetTypesConverter.convertFromString(caseClassString) - val fromJson = ParquetTypesConverter.convertFromString(jsonString) + val fromCaseClassString = StructType.fromString(caseClassString) + val fromJson = StructType.fromString(jsonString) (fromCaseClassString, fromJson).zipped.foreach { (a, b) => assert(a.name == b.name) @@ -665,7 +665,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = false) testCatalystToParquet( - "Backwards-compatibility: LIST with nullable element type - 2 - prior to 1.4.x", + "Backwards-compatibility: LIST with nullable element type - 2 - prior to 1.5.x", StructType(Seq( StructField( "f1", @@ -703,7 +703,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = false) testCatalystToParquet( - "Backwards-compatibility: LIST with non-nullable element type - 2 - prior to 1.4.x", + "Backwards-compatibility: LIST with non-nullable element type - 2 - prior to 1.5.x", StructType(Seq( StructField( "f1", @@ -764,7 +764,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = true) testParquetToCatalyst( - "Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x", + "Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.5.x", StructType(Seq( StructField( "f1", @@ -868,7 +868,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = false) testCatalystToParquet( - "Backwards-compatibility: MAP with non-nullable value type - 2 - prior to 1.4.x", + "Backwards-compatibility: MAP with non-nullable value type - 2 - prior to 1.5.x", StructType(Seq( StructField( "f1", @@ -908,7 +908,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = false) testCatalystToParquet( - "Backwards-compatibility: MAP with nullable value type - 3 - prior to 1.4.x", + "Backwards-compatibility: MAP with nullable value type - 3 - prior to 1.5.x", StructType(Seq( StructField( "f1", @@ -987,7 +987,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = false) testSchema( - "DECIMAL(1, 0) - prior to 1.4.x", + "DECIMAL(1, 0) - prior to 1.5.x", StructType(Seq(StructField("f1", DecimalType(1, 0)))), """message root { | optional fixed_len_byte_array(1) f1 
(DECIMAL(1, 0)); @@ -998,7 +998,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = true) testSchema( - "DECIMAL(8, 3) - prior to 1.4.x", + "DECIMAL(8, 3) - prior to 1.5.x", StructType(Seq(StructField("f1", DecimalType(8, 3)))), """message root { | optional fixed_len_byte_array(4) f1 (DECIMAL(8, 3)); @@ -1009,7 +1009,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = true) testSchema( - "DECIMAL(9, 3) - prior to 1.4.x", + "DECIMAL(9, 3) - prior to 1.5.x", StructType(Seq(StructField("f1", DecimalType(9, 3)))), """message root { | optional fixed_len_byte_array(5) f1 (DECIMAL(9, 3)); @@ -1020,7 +1020,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = true) testSchema( - "DECIMAL(18, 3) - prior to 1.4.x", + "DECIMAL(18, 3) - prior to 1.5.x", StructType(Seq(StructField("f1", DecimalType(18, 3)))), """message root { | optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3)); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index 442fafb12f20..9840ad919e51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -19,11 +19,19 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.File +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.metadata.{BlockMetaData, FileMetaData, ParquetMetadata} +import org.apache.parquet.hadoop.{Footer, ParquetFileReader, ParquetFileWriter} + import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, SQLConf, SaveMode} /** * A helper trait that provides convenient facilities for Parquet testing. 
@@ -97,4 +105,38 @@ private[sql] trait ParquetTest extends SQLTestUtils { assert(partDir.mkdirs(), s"Couldn't create directory $partDir") partDir } + + protected def writeMetadata( + schema: StructType, path: Path, configuration: Configuration): Unit = { + val parquetSchema = new CatalystSchemaConverter().convert(schema) + val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schema.json).asJava + val createdBy = s"Apache Spark ${org.apache.spark.SPARK_VERSION}" + val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, createdBy) + val parquetMetadata = new ParquetMetadata(fileMetadata, Seq.empty[BlockMetaData].asJava) + val footer = new Footer(path, parquetMetadata) + ParquetFileWriter.writeMetadataFile(configuration, path, Seq(footer).asJava) + } + + protected def readAllFootersWithoutSummaryFiles( + path: Path, configuration: Configuration): Seq[Footer] = { + val fs = path.getFileSystem(configuration) + ParquetFileReader.readAllFootersInParallel(configuration, fs.getFileStatus(path)).asScala.toSeq + } + + protected def readFooter(path: Path, configuration: Configuration): ParquetMetadata = { + ParquetFileReader.readFooter( + configuration, + new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE), + ParquetMetadataConverter.NO_FILTER) + } + + protected def testStandardAndLegacyModes(testName: String)(f: => Unit): Unit = { + test(s"Standard mode - $testName") { + withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") { f } + } + + test(s"Legacy mode - $testName") { + withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") { f } + } + } } From 6fd20f70baa535b1772c1c30a3f651ea673560f2 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 5 Oct 2015 22:28:40 -0700 Subject: [PATCH 02/12] Optimizes writing low precision decimals --- .../datasources/parquet/CatalystWriteSupport.scala | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index 00318c958acb..196ac156ea81 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -197,16 +197,12 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi val numBytes = minBytesForPrecision(precision) val int32Writer = - (row: SpecializedGetters, ordinal: Int) => { - val unscaledInt = row.getDecimal(ordinal, precision, scale).toUnscaledLong.toInt - recordConsumer.addInteger(unscaledInt) - } + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addInteger(row.getLong(ordinal).toInt) val int64Writer = - (row: SpecializedGetters, ordinal: Int) => { - val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong - recordConsumer.addLong(unscaledLong) - } + (row: SpecializedGetters, ordinal: Int) => + recordConsumer.addLong(row.getLong(ordinal)) val binaryWriterUsingUnscaledLong = (row: SpecializedGetters, ordinal: Int) => { From d395bfd0bb73a7522e807e43f80e4307a65001a2 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 6 Oct 2015 11:06:18 -0700 Subject: [PATCH 03/12] Fixes bug in reading UDTs under standard mode --- .../parquet/CatalystReadSupport.scala | 30 +++++++++++- .../parquet/CatalystRowConverter.scala | 16 +++---- .../parquet/CatalystWriteSupport.scala | 2 +- 
.../spark/sql/UserDefinedTypeSuite.scala | 32 +++++++++---- .../parquet/ParquetQuerySuite.scala | 46 ++++++++++++++++++- 5 files changed, 105 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index 628dd11ca81b..a958373eb769 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -95,7 +95,9 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with """.stripMargin } - new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema) + new CatalystRecordMaterializer( + parquetRequestedSchema, + CatalystReadSupport.expandUDT(catalystRequestedSchema)) } } @@ -274,4 +276,30 @@ private[parquet] object CatalystReadSupport { .getOrElse(toParquet.convertField(f)) } } + + def expandUDT(schema: StructType): StructType = { + def expand(dataType: DataType): DataType = { + dataType match { + case t: ArrayType => + t.copy(elementType = expand(t.elementType)) + + case t: MapType => + t.copy( + keyType = expand(t.keyType), + valueType = expand(t.valueType)) + + case t: StructType => + val expandedFields = t.fields.map(f => f.copy(dataType = expand(f.dataType))) + t.copy(fields = expandedFields) + + case t: UserDefinedType[_] => + t.sqlType + + case t => + t + } + } + + expand(schema).asInstanceOf[StructType] + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index f7615f5d7e8e..fb1b60a889dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -114,7 +114,8 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp * any "parent" container. * * @param parquetType Parquet schema of Parquet records - * @param catalystType Spark SQL schema that corresponds to the Parquet record type + * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined + * types should have been expanded. 
* @param updater An updater which propagates converted field values to the parent container */ private[parquet] class CatalystRowConverter( @@ -133,6 +134,12 @@ private[parquet] class CatalystRowConverter( |${catalystType.prettyJson} """.stripMargin) + assert( + !catalystType.existsRecursively(_.isInstanceOf[UserDefinedType[_]]), + s"""User-defined types in Catalyst schema should have already been expanded: + |${catalystType.prettyJson} + """.stripMargin) + logDebug( s"""Building row converter for the following schema: | @@ -268,13 +275,6 @@ private[parquet] class CatalystRowConverter( override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy()) }) - case t: UserDefinedType[_] => - val catalystTypeForUDT = t.sqlType - val nullable = parquetType.isRepetition(Repetition.OPTIONAL) - val field = StructField("udt", catalystTypeForUDT, nullable) - val parquetTypeForUDT = new CatalystSchemaConverter().convertField(field) - newConverter(parquetTypeForUDT, catalystTypeForUDT, updater) - case _ => throw new RuntimeException( s"Unable to create Parquet converter for data type ${catalystType.json}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index 196ac156ea81..20e0fcd1e18c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -85,7 +85,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi val messageType = new CatalystSchemaConverter(configuration).convert(schema) val metadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> schemaString).asJava - logDebug( + logInfo( s"""Initialized Parquet WriteSupport with Catalyst schema: |${schema.prettyJson} |and corresponding Parquet message type: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala index 7992fd59ff4b..d17671d48a2f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala @@ -24,6 +24,7 @@ import com.clearspring.analytics.stream.cardinality.HyperLogLog import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.CatalystTypeConverters import org.apache.spark.sql.catalyst.expressions.{OpenHashSetUDT, HyperLogLogUDT} +import org.apache.spark.sql.execution.datasources.parquet.ParquetTest import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -68,7 +69,7 @@ private[sql] class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] { private[spark] override def asNullable: MyDenseVectorUDT = this } -class UserDefinedTypeSuite extends QueryTest with SharedSQLContext { +class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetTest { import testImplicits._ private lazy val pointsRDD = Seq( @@ -98,17 +99,28 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext { Seq(Row(true), Row(true))) } - - test("UDTs with Parquet") { - val tempDir = Utils.createTempDir() - tempDir.delete() - pointsRDD.write.parquet(tempDir.getCanonicalPath) + testStandardAndLegacyModes("UDTs with Parquet") { + withTempPath { dir => + val path = 
dir.getCanonicalPath + pointsRDD.write.parquet(path) + checkAnswer( + sqlContext.read.parquet(path), + Seq( + Row(1.0, new MyDenseVector(Array(0.1, 1.0))), + Row(0.0, new MyDenseVector(Array(0.2, 2.0))))) + } } - test("Repartition UDTs with Parquet") { - val tempDir = Utils.createTempDir() - tempDir.delete() - pointsRDD.repartition(1).write.parquet(tempDir.getCanonicalPath) + testStandardAndLegacyModes("Repartition UDTs with Parquet") { + withTempPath { dir => + val path = dir.getCanonicalPath + pointsRDD.repartition(1).write.parquet(path) + checkAnswer( + sqlContext.read.parquet(path), + Seq( + Row(1.0, new MyDenseVector(Array(0.1, 1.0))), + Row(0.0, new MyDenseVector(Array(0.2, 2.0))))) + } } // Tests to make sure that all operators correctly convert types on the way out. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 1c1cfa34ad04..cc02ef81c9f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -484,7 +484,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } - test("SPARK-10301 requested schema clipping - UDT") { + testStandardAndLegacyModes("SPARK-10301 requested schema clipping - UDT") { withTempPath { dir => val path = dir.getCanonicalPath @@ -517,6 +517,50 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext Row(Row(NestedStruct(1, 2L, 3.5D)))) } } + + test("expand UDT in StructType") { + val schema = new StructType().add("n", new NestedStructUDT, nullable = true) + val expected = new StructType().add("n", new NestedStructUDT().sqlType, nullable = true) + assert(CatalystReadSupport.expandUDT(schema) === expected) + } + + test("expand UDT in ArrayType") { + val schema = new StructType().add( + "n", + ArrayType( + elementType = new NestedStructUDT, + containsNull = false), + nullable = true) + + val expected = new StructType().add( + "n", + ArrayType( + elementType = new NestedStructUDT().sqlType, + containsNull = false), + nullable = true) + + assert(CatalystReadSupport.expandUDT(schema) === expected) + } + + test("expand UDT in MapType") { + val schema = new StructType().add( + "n", + MapType( + keyType = IntegerType, + valueType = new NestedStructUDT, + valueContainsNull = false), + nullable = true) + + val expected = new StructType().add( + "n", + MapType( + keyType = IntegerType, + valueType = new NestedStructUDT().sqlType, + valueContainsNull = false), + nullable = true) + + assert(CatalystReadSupport.expandUDT(schema) === expected) + } } object TestingUDT { From a680f09320aabd65f64fe071ed6b697862500eb9 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 6 Oct 2015 12:01:13 -0700 Subject: [PATCH 04/12] Stops using MutableRow when writing maps to avoid boxing --- .../parquet/CatalystWriteSupport.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index 20e0fcd1e18c..d3ee9436308e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -339,11 +339,8 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi } private def makeMapWriter(mapType: MapType): ValueWriter = { - val keyType = mapType.keyType - val valueType = mapType.valueType - val keyWriter = makeWriter(keyType) - val valueWriter = makeWriter(valueType) - val mutableRow = new SpecificMutableRow(keyType :: valueType :: Nil) + val keyWriter = makeWriter(mapType.keyType) + val valueWriter = makeWriter(mapType.valueType) val repeatedGroupName = if (writeLegacyParquetFormat) { // Legacy mode: // @@ -370,6 +367,9 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi (row: SpecializedGetters, ordinal: Int) => { val map = row.getMap(ordinal) + val keyArray = map.keyArray() + val valueArray = map.valueArray() + consumeGroup { // Only creates the repeated field if the map is non-empty. if (map.numElements() > 0) { @@ -377,16 +377,14 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi var i = 0 while (i < map.numElements()) { consumeGroup { - mutableRow.update(0, map.keyArray().get(i, keyType)) consumeField("key", 0) { - keyWriter.apply(mutableRow, 0) + keyWriter.apply(keyArray, i) } // Only creates the "value" field if the value if non-empty if (!map.valueArray().isNullAt(i)) { - mutableRow.update(1, map.valueArray().get(i, valueType)) consumeField("value", 1) { - valueWriter.apply(mutableRow, 1) + valueWriter.apply(valueArray, i) } } } From 5b08a20ead9d401232855bb2af54ae54696ef17c Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 7 Oct 2015 16:23:38 -0700 Subject: [PATCH 05/12] Removes more dead code --- .../parquet/CatalystRowConverter.scala | 1 - .../datasources/parquet/ParquetFilters.scala | 34 ------------------- 2 files changed, 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index fb1b60a889dc..247d35363b86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -27,7 +27,6 @@ import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} import org.apache.parquet.schema.OriginalType.{INT_32, LIST, UTF8} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE -import org.apache.parquet.schema.Type.Repetition import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type} import org.apache.spark.Logging diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index c6b3fe7900da..7a3d1248cb6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -18,18 +18,13 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.Serializable -import java.nio.ByteBuffer -import com.google.common.io.BaseEncoding -import org.apache.hadoop.conf.Configuration import org.apache.parquet.filter2.predicate.FilterApi._ 
import org.apache.parquet.filter2.predicate._ import org.apache.parquet.io.api.Binary import org.apache.parquet.schema.OriginalType import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName -import org.apache.spark.SparkEnv -import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.sources import org.apache.spark.sql.types._ @@ -282,33 +277,4 @@ private[sql] object ParquetFilters { addMethod.setAccessible(true) addMethod.invoke(null, classOf[Binary], enumTypeDescriptor) } - - /** - * Note: Inside the Hadoop API we only have access to `Configuration`, not to - * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey - * the actual filter predicate. - */ - def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = { - if (filters.nonEmpty) { - val serialized: Array[Byte] = - SparkEnv.get.closureSerializer.newInstance().serialize(filters).array() - val encoded: String = BaseEncoding.base64().encode(serialized) - conf.set(PARQUET_FILTER_DATA, encoded) - } - } - - /** - * Note: Inside the Hadoop API we only have access to `Configuration`, not to - * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey - * the actual filter predicate. - */ - def deserializeFilterExpressions(conf: Configuration): Seq[Expression] = { - val data = conf.get(PARQUET_FILTER_DATA) - if (data != null) { - val decoded: Array[Byte] = BaseEncoding.base64().decode(data) - SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(decoded)) - } else { - Seq() - } - } } From f03ef93e06c1241c69b49dd89d0b155b7ef87019 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 7 Oct 2015 16:54:52 -0700 Subject: [PATCH 06/12] Fixes a bug when writing small decimals coming from rows that are not UnsafeRow --- .../parquet/CatalystWriteSupport.scala | 12 ++++++++---- .../datasources/parquet/ParquetIOSuite.scala | 15 ++++++++------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index d3ee9436308e..2134f148c101 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -197,12 +197,16 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi val numBytes = minBytesForPrecision(precision) val int32Writer = - (row: SpecializedGetters, ordinal: Int) => - recordConsumer.addInteger(row.getLong(ordinal).toInt) + (row: SpecializedGetters, ordinal: Int) => { + val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong + recordConsumer.addInteger(unscaledLong.toInt) + } val int64Writer = - (row: SpecializedGetters, ordinal: Int) => - recordConsumer.addLong(row.getLong(ordinal)) + (row: SpecializedGetters, ordinal: Int) => { + val unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong + recordConsumer.addLong(unscaledLong) + } val binaryWriterUsingUnscaledLong = (row: SpecializedGetters, ordinal: Int) => { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 4c4e3a45811f..76b60eadb60c 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -100,13 +100,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } testStandardAndLegacyModes("fixed-length decimals") { - def makeDecimalRDD(decimal: DecimalType): DataFrame = - sparkContext - .parallelize(0 to 1000) - .map(i => Tuple1((i - 500) / 100.0)) - .toDF() - // Parquet doesn't allow column names with spaces, have to add an alias here - .select($"_1" cast decimal as "dec") + def makeDecimalRDD(decimal: DecimalType): DataFrame = { + sqlContext + .range(1000) + // Parquet doesn't allow column names with spaces, have to add an alias here. + // Minus 500 here so that negative decimals are also tested. + .select((('id - 500) / 100.0) cast decimal as 'dec) + .coalesce(1) + } val combinations = Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37)) for ((precision, scale) <- combinations) { From 2bc5ebcf8c473817af68b056f772eb77a48acb07 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 7 Oct 2015 17:00:42 -0700 Subject: [PATCH 07/12] Addresses comments --- .../scala/org/apache/spark/sql/SQLConf.scala | 5 ++--- .../parquet/CatalystSchemaConverter.scala | 12 ++++++------ .../parquet/CatalystWriteSupport.scala | 4 ++-- .../parquet/ParquetSchemaSuite.scala | 18 +++++++++--------- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index e7bbc7d5db49..b2ac19ad48fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -292,10 +292,9 @@ private[spark] object SQLConf { val PARQUET_WRITE_LEGACY_FORMAT = booleanConf( key = "spark.sql.parquet.writeLegacyFormat", - defaultValue = Some(true), + defaultValue = Some(false), doc = "Whether to follow Parquet's format specification when converting Parquet schema to " + - "Spark SQL schema and vice versa.", - isPublic = false) + "Spark SQL schema and vice versa.") val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf( key = "spark.sql.parquet.output.committer.class", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index 77eabb9bd76a..96e622f8b33a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.{AnalysisException, SQLConf} * [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which * has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS` * described in Parquet format spec. This argument only affects Parquet read path. - * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.5 + * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4 * and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]]. * When set to false, use standard format defined in parquet-format spec. This argument only * affects Parquet write path. 
@@ -356,7 +356,7 @@ private[parquet] class CatalystSchemaConverter( // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`. // // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting - // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store + // from Spark 1.4.0, we resort to a timestamp type with 100 ns precision so that we can store // a timestamp into a `Long`. This design decision is subject to change though, for example, // we may resort to microsecond precision in the future. // @@ -375,7 +375,7 @@ private[parquet] class CatalystSchemaConverter( // Decimals (legacy mode) // ====================== - // Spark 1.5.x and prior versions only support decimals with a maximum precision of 18 and + // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and // always store decimals in fixed-length byte arrays. To keep compatibility with these older // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated // by `DECIMAL`. @@ -426,7 +426,7 @@ private[parquet] class CatalystSchemaConverter( // ArrayType and MapType (legacy mode) // =================================== - // Spark 1.5.x and prior versions convert `ArrayType` with nullable elements into a 3-level + // Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level // `LIST` structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro // (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element // field name "array" is borrowed from parquet-avro. @@ -445,7 +445,7 @@ private[parquet] class CatalystSchemaConverter( .addField(convertField(StructField("array", elementType, nullable))) .named("bag")) - // Spark 1.5.x and prior versions convert ArrayType with non-nullable elements into a 2-level + // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is // covered by the backwards-compatibility rules implemented in `isElementType()`. case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat => @@ -458,7 +458,7 @@ private[parquet] class CatalystSchemaConverter( // "array" is the name chosen by parquet-avro (1.7.0 and prior version) convertField(StructField("array", elementType, nullable), REPEATED)) - // Spark 1.5.x and prior versions convert MapType into a 3-level group annotated by + // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by // MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`. case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat => // group (MAP) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index 2134f148c101..7b58ef390af7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.types._ * messages. This class can write Parquet data in two modes: * * - Standard mode: Parquet data are written in standard format defined in parquet-format spec. 
- * - Legacy mode: Parquet data are written in legacy format compatible with Spark 1.5 and prior. + * - Legacy mode: Parquet data are written in legacy format compatible with Spark 1.4 and prior. * * This behavior can be controlled by SQL option `spark.sql.parquet.writeLegacyParquetFormat`. The * value of the option is propagated to this class by the `init()` method and its Hadoop @@ -63,7 +63,7 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi // The Parquet `RecordConsumer` to which all `InternalRow`s are written private var recordConsumer: RecordConsumer = _ - // Whether to write data in legacy Parquet format compatible with Spark 1.5 and prior versions + // Whether to write data in legacy Parquet format compatible with Spark 1.4 and prior versions private var writeLegacyParquetFormat: Boolean = _ // Reusable byte array used to write timestamps as Parquet INT96 values diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 128b59edede4..60fa81b1ab81 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -665,7 +665,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = false) testCatalystToParquet( - "Backwards-compatibility: LIST with nullable element type - 2 - prior to 1.5.x", + "Backwards-compatibility: LIST with nullable element type - 2 - prior to 1.4.x", StructType(Seq( StructField( "f1", @@ -703,7 +703,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = false) testCatalystToParquet( - "Backwards-compatibility: LIST with non-nullable element type - 2 - prior to 1.5.x", + "Backwards-compatibility: LIST with non-nullable element type - 2 - prior to 1.4.x", StructType(Seq( StructField( "f1", @@ -764,7 +764,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = true) testParquetToCatalyst( - "Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.5.x", + "Backwards-compatibility: MAP with non-nullable value type - 3 - prior to 1.4.x", StructType(Seq( StructField( "f1", @@ -868,7 +868,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = false) testCatalystToParquet( - "Backwards-compatibility: MAP with non-nullable value type - 2 - prior to 1.5.x", + "Backwards-compatibility: MAP with non-nullable value type - 2 - prior to 1.4.x", StructType(Seq( StructField( "f1", @@ -908,7 +908,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = false) testCatalystToParquet( - "Backwards-compatibility: MAP with nullable value type - 3 - prior to 1.5.x", + "Backwards-compatibility: MAP with nullable value type - 3 - prior to 1.4.x", StructType(Seq( StructField( "f1", @@ -987,7 +987,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = false) testSchema( - "DECIMAL(1, 0) - prior to 1.5.x", + "DECIMAL(1, 0) - prior to 1.4.x", StructType(Seq(StructField("f1", DecimalType(1, 0)))), """message root { | optional fixed_len_byte_array(1) f1 (DECIMAL(1, 0)); @@ -998,7 +998,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = true) testSchema( - "DECIMAL(8, 3) - prior to 1.5.x", + "DECIMAL(8, 3) - prior to 1.4.x", 
StructType(Seq(StructField("f1", DecimalType(8, 3)))), """message root { | optional fixed_len_byte_array(4) f1 (DECIMAL(8, 3)); @@ -1009,7 +1009,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = true) testSchema( - "DECIMAL(9, 3) - prior to 1.5.x", + "DECIMAL(9, 3) - prior to 1.4.x", StructType(Seq(StructField("f1", DecimalType(9, 3)))), """message root { | optional fixed_len_byte_array(5) f1 (DECIMAL(9, 3)); @@ -1020,7 +1020,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest { writeLegacyParquetFormat = true) testSchema( - "DECIMAL(18, 3) - prior to 1.5.x", + "DECIMAL(18, 3) - prior to 1.4.x", StructType(Seq(StructField("f1", DecimalType(18, 3)))), """message root { | optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3)); From c542ae93eef917f310a3331a0bdf90cb6260db4f Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 7 Oct 2015 17:29:08 -0700 Subject: [PATCH 08/12] Fixes comment typo and removes an unused import --- .../datasources/parquet/CatalystWriteSupport.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index 7b58ef390af7..d768126c6ffa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -32,7 +32,7 @@ import org.apache.parquet.io.api.{Binary, RecordConsumer} import org.apache.spark.Logging import org.apache.spark.sql.SQLConf import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificMutableRow} +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, minBytesForPrecision} import org.apache.spark.sql.types._ @@ -44,9 +44,9 @@ import org.apache.spark.sql.types._ * - Standard mode: Parquet data are written in standard format defined in parquet-format spec. * - Legacy mode: Parquet data are written in legacy format compatible with Spark 1.4 and prior. * - * This behavior can be controlled by SQL option `spark.sql.parquet.writeLegacyParquetFormat`. The - * value of the option is propagated to this class by the `init()` method and its Hadoop - * configuration argument. + * This behavior can be controlled by SQL option `spark.sql.parquet.writeLegacyFormat`. The value + * of this option is propagated to this class by the `init()` method and its Hadoop configuration + * argument. */ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] with Logging { // A `ValueWriter` is responsible for writing a field of an `InternalRow` to the record consumer. 
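
The `spark.sql.parquet.writeLegacyFormat` option referenced in the scaladoc above is an ordinary SQL conf, so the two output layouts can be selected per write. A minimal usage sketch, assuming a Spark 1.5/1.6-era `sqlContext` and an existing DataFrame `df` (the `df` name and output paths are illustrative only, not part of these patches):

    // Standard layout defined by the parquet-format spec (the default after PATCH 07 in this series).
    sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "false")
    df.write.parquet("/tmp/parquet-standard")

    // Legacy layout compatible with Spark 1.4 and prior, for older readers.
    sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")
    df.write.parquet("/tmp/parquet-legacy")

As the scaladoc notes, this conf only affects the write path; files written in either mode remain readable, since the read path relies on the backwards-compatibility rules in `CatalystSchemaConverter` rather than on this setting.
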
From af50f9c96f11f2413d0ba59cba3dc104f2a159be Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 7 Oct 2015 17:57:13 -0700 Subject: [PATCH 09/12] One more dead code :) --- .../sql/execution/datasources/parquet/ParquetFilters.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 7a3d1248cb6b..78040d99fb0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -29,8 +29,6 @@ import org.apache.spark.sql.sources import org.apache.spark.sql.types._ private[sql] object ParquetFilters { - val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter" - case class SetInFilter[T <: Comparable[T]]( valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable { From e67d0b1082b9a9a0b268875826889c8a945e0109 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 7 Oct 2015 23:22:35 -0700 Subject: [PATCH 10/12] Fixes test cases --- .../sql/hive/HiveMetastoreCatalogSuite.scala | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 107457f79ec0..d63f3d399652 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -20,11 +20,11 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{QueryTest, Row, SaveMode} import org.apache.spark.sql.hive.client.{ExternalTable, ManagedTable} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils} import org.apache.spark.sql.types.{DecimalType, StringType, StructType} +import org.apache.spark.sql.{SQLConf, QueryTest, Row, SaveMode} class HiveMetastoreCatalogSuite extends SparkFunSuite with TestHiveSingleton { import hiveContext.implicits._ @@ -74,11 +74,13 @@ class DataSourceWithHiveMetastoreCatalogSuite ).foreach { case (provider, (inputFormat, outputFormat, serde)) => test(s"Persist non-partitioned $provider relation into metastore as managed table") { withTable("t") { - testDF - .write - .mode(SaveMode.Overwrite) - .format(provider) - .saveAsTable("t") + withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") { + testDF + .write + .mode(SaveMode.Overwrite) + .format(provider) + .saveAsTable("t") + } val hiveTable = catalog.client.getTable("default", "t") assert(hiveTable.inputFormat === Some(inputFormat)) @@ -102,12 +104,14 @@ class DataSourceWithHiveMetastoreCatalogSuite withTable("t") { val path = dir.getCanonicalFile - testDF - .write - .mode(SaveMode.Overwrite) - .format(provider) - .option("path", path.toString) - .saveAsTable("t") + withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") { + testDF + .write + .mode(SaveMode.Overwrite) + .format(provider) + .option("path", path.toString) + .saveAsTable("t") + } val hiveTable = catalog.client.getTable("default", "t") assert(hiveTable.inputFormat === Some(inputFormat)) From db79fb6dad46dd502017099fe883a29ee9941a9b Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 8 Oct 
2015 10:54:15 -0700 Subject: [PATCH 11/12] Addresses comments --- .../datasources/parquet/CatalystSchemaConverter.scala | 2 +- .../datasources/parquet/CatalystWriteSupport.scala | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index 96e622f8b33a..7f3394c20ed3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -356,7 +356,7 @@ private[parquet] class CatalystSchemaConverter( // `TIMESTAMP_MICROS` which are both logical types annotating `INT64`. // // Originally, Spark SQL uses the same nanosecond timestamp type as Impala and Hive. Starting - // from Spark 1.4.0, we resort to a timestamp type with 100 ns precision so that we can store + // from Spark 1.5.0, we resort to a timestamp type with 100 ns precision so that we can store // a timestamp into a `Long`. This design decision is subject to change though, for example, // we may resort to microsecond precision in the future. // diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index d768126c6ffa..483363d2c1a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -100,7 +100,9 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi } override def write(row: InternalRow): Unit = { - consumeMessage(writeFields(row, schema, rootFieldWriters)) + consumeMessage { + writeFields(row, schema, rootFieldWriters) + } } private def writeFields( @@ -176,7 +178,9 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi case t: StructType => val fieldWriters = t.map(_.dataType).map(makeWriter) (row: SpecializedGetters, ordinal: Int) => - consumeGroup(writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)) + consumeGroup { + writeFields(row.getStruct(ordinal, t.length), t, fieldWriters) + } case t: ArrayType => makeArrayWriter(t) From fb6ee9fc2d39688dcbdc55398013324ede708848 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 8 Oct 2015 11:45:16 -0700 Subject: [PATCH 12/12] Removes redundant test case --- .../datasources/parquet/ParquetIOSuite.scala | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 76b60eadb60c..599cf948e76a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -178,28 +178,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } - test("read standard Parquet file under legacy mode") { - withTempPath { dir => - val path = dir.getCanonicalPath - - withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") { - sqlContext - 
.range(4) - .selectExpr("NAMED_STRUCT('a', id, 'b', ARRAY(id, id + 1, id + 2)) AS s") - .write - .parquet(path) - } - - withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "true") { - checkAnswer( - sqlContext.read.parquet(path), - (0 until 4).map { - id => Row(Row(id, Seq(id, id + 1, id + 2))) - }) - } - } - } - test("nulls") { val allNulls = ( null.asInstanceOf[java.lang.Boolean],