@@ -106,7 +106,9 @@ final class Decimal extends Ordered[Decimal] with Serializable {
*/
def set(decimal: BigDecimal, precision: Int, scale: Int): Decimal = {
this.decimalVal = decimal.setScale(scale, ROUNDING_MODE)
require(decimalVal.precision <= precision, "Overflowed precision")
require(
decimalVal.precision <= precision,
s"Precision overflow. Max precision: $precision, got: ${decimalVal.precision}")
this.longVal = 0L
this._precision = precision
this._scale = scale
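
The new message reports both the allowed and the actual precision, which makes overflow failures much easier to diagnose. A minimal sketch of how it surfaces, assuming the public `Decimal(value, precision, scale)` factory (which delegates to `set`):

    import org.apache.spark.sql.types.Decimal

    // 12.3456 needs precision 6 at scale 4, so a (precision = 5, scale = 4) target overflows.
    try {
      Decimal(BigDecimal("12.3456"), precision = 5, scale = 4)
    } catch {
      case e: IllegalArgumentException =>
        // With this change the message reads along the lines of:
        //   requirement failed: Precision overflow. Max precision: 5, got: 6
        println(e.getMessage)
    }
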
12 changes: 6 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -283,12 +283,12 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "Enables Parquet filter push-down optimization when set to true.")

val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
key = "spark.sql.parquet.followParquetFormatSpec",
val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
key = "spark.sql.parquet.writeLegacyParquetFormat",
defaultValue = Some(false),
doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
"Spark SQL schema and vice versa.",
isPublic = false)
doc = "When true, writes Parquet data in legacy format compatible with Spark 1.4.0 and prior " +
"versions, instead of the standard one defined in parquet-format spec.",
isPublic = true)

val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
key = "spark.sql.parquet.output.committer.class",
@@ -493,7 +493,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

private[spark] def followParquetFormatSpec: Boolean = getConf(PARQUET_FOLLOW_PARQUET_FORMAT_SPEC)
private[spark] def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)

private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)

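
Because the option is now public, it can be toggled per session from user code. A sketch of typical usage (`sqlContext` and `df` stand for an existing SQLContext and DataFrame; the output path is illustrative):

    // Write Parquet in the legacy layout that Spark 1.4.0 and prior produced, for
    // readers that still expect it; the default (false) follows the parquet-format spec.
    sqlContext.setConf("spark.sql.parquet.writeLegacyParquetFormat", "true")
    df.write.parquet("/tmp/legacy-layout-output") // illustrative path

    // Switch back to the standard layout for new data.
    sqlContext.setConf("spark.sql.parquet.writeLegacyParquetFormat", "false")
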
@@ -39,7 +39,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO
*
* NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
* no safe way to undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
* left * empty).
* left empty).
*/
private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {
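
For reference, the committer is opted into through the `spark.sql.parquet.output.committer.class` key defined in SQLConf above; a hedged sketch (the fully qualified class name is an assumption about this source tree, and the setting is only safe for non-append writes):

    // Never enable this for append jobs: a failed append cannot be rolled back,
    // because abortTask()/abortJob() are intentionally left empty.
    sqlContext.setConf(
      "spark.sql.parquet.output.committer.class",
      "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter") // package assumed
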

This file was deleted.

@@ -31,44 +31,44 @@ import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with Logging {
override def prepareForRead(
conf: Configuration,
keyValueMetaData: JMap[String, String],
fileSchema: MessageType,
readContext: ReadContext): RecordMaterializer[InternalRow] = {
log.debug(s"Preparing for read Parquet file with message type: $fileSchema")

val toCatalyst = new CatalystSchemaConverter(conf)
val toCatalyst = new ParquetSchemaConverter(conf)
val parquetRequestedSchema = readContext.getRequestedSchema

val catalystRequestedSchema =
Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
metadata
// First tries to read requested schema, which may result from projections
.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
// If not available, tries to read Catalyst schema from file metadata. It's only
// available if the target file is written by Spark SQL.
.orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
.orElse(metadata.get(ParquetReadSupport.SPARK_METADATA_KEY))
}.map(StructType.fromString).getOrElse {
logDebug("Catalyst schema not available, falling back to Parquet schema")
toCatalyst.convert(parquetRequestedSchema)
}

logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
new ParquetRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
}

override def init(context: InitContext): ReadContext = {
val conf = context.getConfiguration

// If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
// schema of this file from its metadata.
val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
val maybeRowSchema = Option(conf.get(ParquetWriteSupport.SPARK_ROW_SCHEMA))

// Optional schema of requested columns, in the form of a string serialized from a Catalyst
// `StructType` containing all requested columns.
val maybeRequestedSchema = Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
val maybeRequestedSchema = Option(conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA))

// Below we construct a Parquet schema containing all requested columns. This schema tells
// Parquet which columns to read.
@@ -110,7 +110,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
// different physical structures.
val parquetRequestedSchema =
maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
val toParquet = new CatalystSchemaConverter(conf)
val toParquet = new ParquetSchemaConverter(conf)
val fileSchema = context.getFileSchema.asGroupType()
val fileFieldNames = fileSchema.getFields.map(_.getName).toSet

@@ -121,7 +121,9 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
if (fileFieldNames.contains(field.name)) {
// If the field exists in the target Parquet file, extracts the field type from the
// full file schema and makes a single-field Parquet schema
new MessageType("root", fileSchema.getType(field.name))
new MessageType(
ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME,
fileSchema.getType(field.name))
} else {
// Otherwise, just resorts to `CatalystSchemaConverter`
toParquet.convert(StructType(Array(field)))
@@ -131,22 +133,22 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
// columns. Note that it's possible that no columns are requested at all (e.g., count
// some partition column of a partitioned Parquet table). That's why `fold` is used here
// and we always fall back to an empty Parquet schema.
.fold(new MessageType("root")) {
.fold(new MessageType(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)) {
_ union _
}
}

val metadata =
Map.empty[String, String] ++
maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
maybeRequestedSchema.map(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
maybeRowSchema.map(ParquetWriteSupport.SPARK_ROW_SCHEMA -> _)

logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
new ReadContext(parquetRequestedSchema, metadata)
}
}

private[parquet] object CatalystReadSupport {
private[parquet] object ParquetReadSupport {
val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"

val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
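
The schema resolution in `prepareForRead` boils down to a three-step precedence, summarized in the sketch below (`metadata` stands for the read-support metadata map and `convertParquetSchema` for a call into `ParquetSchemaConverter`; `StructType.fromString` is the same helper used above and may not be accessible outside Spark's own packages):

    import org.apache.spark.sql.types.StructType

    // Precedence used to pick the Catalyst schema for a read.
    def resolveCatalystSchema(
        metadata: Map[String, String],
        convertParquetSchema: () => StructType): StructType = {
      metadata
        .get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)          // 1. projection pushed down by Spark SQL
        .orElse(metadata.get(ParquetReadSupport.SPARK_METADATA_KEY)) // 2. schema Spark SQL wrote into the file
        .map(StructType.fromString)                                  // deserialize the JSON-serialized StructType
        .getOrElse(convertParquetSchema())                           // 3. fall back to converting the Parquet schema
    }
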
@@ -29,11 +29,11 @@ import org.apache.spark.sql.types.StructType
* @param parquetSchema Parquet schema of the records to be read
* @param catalystSchema Catalyst schema of the rows to be constructed
*/
private[parquet] class CatalystRecordMaterializer(
private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType, catalystSchema: StructType)
extends RecordMaterializer[InternalRow] {

private val rootConverter = new CatalystRowConverter(parquetSchema, catalystSchema, NoopUpdater)
private val rootConverter = new ParquetRowConverter(parquetSchema, catalystSchema, NoopUpdater)

override def getCurrentRecord: InternalRow = rootConverter.currentRow

@@ -37,16 +37,16 @@ import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, _
import org.apache.parquet.schema.MessageType
import org.apache.parquet.{Log => ParquetLog}

import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
import org.apache.spark.rdd.RDD._
import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}


private[sql] class DefaultSource extends HadoopFsRelationProvider {
@@ -228,18 +228,13 @@ private[sql] class ParquetRelation(
// bundled with `ParquetOutputFormat[Row]`.
job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])

// TODO There's no need to use two kinds of WriteSupport
// We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
// complex types.
val writeSupportClass =
if (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
classOf[MutableRowWriteSupport]
} else {
classOf[RowWriteSupport]
}
ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
ParquetWriteSupport.setSchema(dataSchema, conf)

ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
RowWriteSupport.setSchema(dataSchema.toAttributes, conf)
// Sets flag for Parquet schema converter (converting Catalyst schema to Parquet schema)
conf.set(
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
sqlContext.conf.writeLegacyParquetFormat.toString)

// Sets compression scheme
conf.set(
@@ -267,7 +262,6 @@ private[sql] class ParquetRelation(
val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec

// Create the function to set variable Parquet confs at both driver and executor side.
val initLocalJobFuncOpt =
@@ -278,8 +272,7 @@ private[sql] class ParquetRelation(
useMetadataCache,
parquetFilterPushDown,
assumeBinaryIsString,
assumeInt96IsTimestamp,
followParquetFormatSpec) _
assumeInt96IsTimestamp) _

// Create the function to set input paths at the driver side.
val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles) _
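
These per-session flags are what `initLocalJobFuncOpt` ships to the executors, so a read like the sketch below picks them up on both driver and executor side (key strings follow SQLConf but are not shown in this hunk; path and column names are illustrative):

    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
    sqlContext.setConf("spark.sql.parquet.binaryAsString", "false")
    sqlContext.setConf("spark.sql.parquet.int96AsTimestamp", "true")

    val df = sqlContext.read.parquet("/data/events") // illustrative path
    // The filter and projection end up in the requested schema and, when the
    // predicate is convertible, in the pushed-down Parquet filter.
    df.filter("id > 10").select("id", "name").show()
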
@@ -479,10 +472,9 @@ private[sql] object ParquetRelation extends Logging {
useMetadataCache: Boolean,
parquetFilterPushDown: Boolean,
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean,
followParquetFormatSpec: Boolean)(job: Job): Unit = {
assumeInt96IsTimestamp: Boolean)(job: Job): Unit = {
val conf = job.getConfiguration
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)

// Try to push down filters when filter push-down is enabled.
if (parquetFilterPushDown) {
@@ -495,22 +487,21 @@ private[sql] object ParquetRelation extends Logging {
.foreach(ParquetInputFormat.setFilterPredicate(conf, _))
}

conf.set(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, {
val requestedSchema = StructType(requiredColumns.map(dataSchema(_)))
CatalystSchemaConverter.checkFieldNames(requestedSchema).json
ParquetSchemaConverter.checkFieldNames(requestedSchema).json
})

conf.set(
RowWriteSupport.SPARK_ROW_SCHEMA,
CatalystSchemaConverter.checkFieldNames(dataSchema).json)
ParquetWriteSupport.SPARK_ROW_SCHEMA,
ParquetSchemaConverter.checkFieldNames(dataSchema).json)

// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)

// Sets flags for Parquet schema conversion
// Sets flags for Parquet schema converter (converting Parquet schema to Catalyst schema)
conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec)
}

/** This closure sets input paths at the driver side. */
@@ -527,10 +518,10 @@ private[sql] object ParquetRelation extends Logging {
footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {

def parseParquetSchema(schema: MessageType): StructType = {
val converter = new CatalystSchemaConverter(
val converter = new ParquetSchemaConverter(
sqlContext.conf.isParquetBinaryAsString,
sqlContext.conf.isParquetBinaryAsString,
sqlContext.conf.followParquetFormatSpec)
sqlContext.conf.writeLegacyParquetFormat)

converter.convert(schema)
}
@@ -541,7 +532,7 @@ private[sql] object ParquetRelation extends Logging {
val serializedSchema = metadata
.getKeyValueMetaData
.toMap
.get(CatalystReadSupport.SPARK_METADATA_KEY)
.get(ParquetReadSupport.SPARK_METADATA_KEY)
if (serializedSchema.isEmpty) {
// Falls back to Parquet schema if no Spark SQL schema found.
Some(parseParquetSchema(metadata.getSchema))
@@ -664,7 +655,7 @@ private[sql] object ParquetRelation extends Logging {
filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = {
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)

// HACK ALERT:
@@ -701,10 +692,10 @@ private[sql] object ParquetRelation extends Logging {

// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter =
new CatalystSchemaConverter(
new ParquetSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
followParquetFormatSpec = followParquetFormatSpec)
writeLegacyParquetFormat = writeLegacyParquetFormat)

footers.map { footer =>
ParquetRelation.readSchemaFromFooter(footer, converter)
@@ -720,12 +711,12 @@ private[sql] object ParquetRelation extends Logging {
* a [[StructType]] converted from the [[MessageType]] stored in this footer.
*/
def readSchemaFromFooter(
footer: Footer, converter: CatalystSchemaConverter): StructType = {
footer: Footer, converter: ParquetSchemaConverter): StructType = {
val fileMetaData = footer.getParquetMetadata.getFileMetaData
fileMetaData
.getKeyValueMetaData
.toMap
.get(CatalystReadSupport.SPARK_METADATA_KEY)
.get(ParquetReadSupport.SPARK_METADATA_KEY)
.flatMap(deserializeSchemaString)
.getOrElse(converter.convert(fileMetaData.getSchema))
}
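
Putting the pieces together, each footer contributes a schema by the same two-step rule: prefer the Spark SQL schema serialized into the footer metadata, otherwise convert the raw Parquet schema with the flags above. A compact sketch (`deserializeSchemaString` is the private helper in this file):

    import scala.collection.JavaConverters._
    import org.apache.parquet.hadoop.Footer
    import org.apache.spark.sql.types.StructType

    // Per-footer schema choice, mirroring readSchemaFromFooter.
    def schemaForFooter(footer: Footer, converter: ParquetSchemaConverter): StructType = {
      val fileMetaData = footer.getParquetMetadata.getFileMetaData
      fileMetaData.getKeyValueMetaData.asScala.toMap
        .get(ParquetReadSupport.SPARK_METADATA_KEY)          // schema serialized by Spark SQL, if present
        .flatMap(deserializeSchemaString)                     // tolerates corrupt or legacy schema strings
        .getOrElse(converter.convert(fileMetaData.getSchema)) // otherwise convert the Parquet schema
    }
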