Closed
Changes from 1 commit
Commits
25 commits
03c3bd9
Refactors Parquet read path to implement backwards-compatibility rules
liancheng Jul 5, 2015
0525346
Removes old Parquet record converters
liancheng Jul 5, 2015
a74fb2c
More comments
liancheng Jul 5, 2015
bcac49f
Removes the 16-byte restriction of decimals
liancheng Jul 5, 2015
6437d4b
Assembles requested schema from Parquet file schema
liancheng Jul 5, 2015
1781dff
Adds test case for SPARK-8811
liancheng Jul 5, 2015
7fb21f1
Reverts an unnecessary debugging change
liancheng Jul 5, 2015
38fe1e7
Adds explicit return type
liancheng Jul 6, 2015
802cbd7
Fixes bugs related to schema merging and empty requested columns
liancheng Jul 6, 2015
884d3e6
Fixes styling issue and reverts unnecessary changes
liancheng Jul 6, 2015
0cc1b37
Fixes MiMa checks
liancheng Jul 6, 2015
a099d3e
More comments
liancheng Jul 6, 2015
06cfe9d
Adds comments about TimestampType handling
liancheng Jul 6, 2015
13b9121
Adds ParquetAvroCompatibilitySuite
liancheng Jul 7, 2015
440f7b3
Adds generated files to .rat-excludes
liancheng Jul 7, 2015
1d390aa
Adds parquet-thrift compatibility test
liancheng Jul 7, 2015
f2208cd
Adds README.md for Thrift/Avro code generation
liancheng Jul 7, 2015
a8f13bb
Using Parquet writer API to do compatibility tests
liancheng Jul 7, 2015
3d7ab36
Fixes .rat-excludes
liancheng Jul 7, 2015
7946ee1
Fixes Scala styling issues
liancheng Jul 7, 2015
926af87
Simplifies Parquet compatibility test suites
liancheng Jul 8, 2015
598c3e8
Adds extra Maven repo for hadoop-lzo, which is a transitive dependenc…
liancheng Jul 8, 2015
b8c1295
Excludes the whole parquet package from MiMa
liancheng Jul 8, 2015
c6fbc06
Removes WIP file committed by mistake
liancheng Jul 8, 2015
360fe18
Adds ParquetHiveCompatibilitySuite
liancheng Jul 8, 2015
Fixes bugs related to schema merging and empty requested columns
liancheng committed Jul 8, 2015
commit 802cbd75f3105a7ba2191f3423d9d7ab5dd8ca16
@@ -63,46 +63,108 @@ private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with Logg
log.debug(s"Preparing for read Parquet file with message type: $fileSchema")

val toCatalyst = new CatalystSchemaConverter(conf)
val parquetSchema = readContext.getRequestedSchema
val catalystSchema =
Option(readContext.getReadSupportMetadata)
.map(_.toMap)
.flatMap { metadata =>
metadata
// First tries to read requested schema, which may result from projections
.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
// If not available, tries to read Catalyst schema from file metadata. It's only
// available if the target file is written by Spark SQL.
.orElse(metadata.get(RowReadSupport.SPARK_METADATA_KEY))
}
.map(StructType.fromString)
.getOrElse {
logDebug("Catalyst schema not available, falling back to Parquet message type")
toCatalyst.convert(parquetSchema)
}
val parquetRequestedSchema = readContext.getRequestedSchema

val catalystRequestedSchema =
Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
metadata
// First tries to read requested schema, which may result from projections
.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
// If not available, tries to read Catalyst schema from file metadata. It's only
// available if the target file is written by Spark SQL.
.orElse(metadata.get(RowReadSupport.SPARK_METADATA_KEY))
}.map(StructType.fromString).getOrElse {
logDebug("Catalyst schema not available, falling back to Parquet schema")
toCatalyst.convert(parquetRequestedSchema)
}

logDebug(s"Catalyst schema used to read Parquet files: $catalystSchema")
new RowRecordMaterializer(parquetSchema, catalystSchema)
logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
new RowRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
}

override def init(context: InitContext): ReadContext = {
val conf = context.getConfiguration

// If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
// schema of this file from its metadata.
val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))

// Optional schema of requested columns, in the form of a string serialized from a Catalyst
// `StructType` containing all requested columns.
val maybeRequestedSchema = Option(conf.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA))

// Below we construct a Parquet schema containing all requested columns. This schema tells
// Parquet which columns to read.
//
// If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet schema. Otherwise,
// we have to fall back to the full file schema, which contains all columns in the file.
// Obviously, this may waste I/O bandwidth since it may read more columns than requested.
//
// Two things to note:
//
// 1. It's possible that some requested columns don't exist in the target Parquet file. For
// example, in the case of schema merging, the globally merged schema may contain extra
// columns gathered from other Parquet files. These columns will be simply filled with nulls
// when actually reading the target Parquet file.
//
// 2. When `maybeRequestedSchema` is available, we can't simply convert the Catalyst schema to
// Parquet schema using `CatalystSchemaConverter`, because the mapping is not unique due to
// non-standard behaviors of some Parquet libraries/tools. For example, a Parquet file
// containing a single integer array field `f1` may have the following legacy 2-level
// structure:
//
// message root {
// optional group f1 (LIST) {
// repeated INT32 element;
// }
// }
//
// while `CatalystSchemaConverter` may generate a standard 3-level structure:
//
// message root {
// optional group f1 (LIST) {
// repeated group list {
// required INT32 element;
// }
// }
// }
//
// Clearly, we can't use the second schema to read the target Parquet file, since the two
// have different physical structures.
val parquetRequestedSchema =
maybeRequestedSchema.map { schemaString =>
StructType.fromString(schemaString).map { field =>
val fieldType = context.getFileSchema.asGroupType().getType(field.name)
new MessageType("root", fieldType)
}.reduce(_ union _)
}.getOrElse(context.getFileSchema)
maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
val toParquet = new CatalystSchemaConverter(conf)
val fileSchema = context.getFileSchema.asGroupType()
val fileFieldNames = fileSchema.getFields.map(_.getName).toSet

StructType
// Deserializes the Catalyst schema of requested columns
.fromString(schemaString)
.map { field =>
if (fileFieldNames.contains(field.name)) {
// If the field exists in the target Parquet file, extracts the field type from the
// full file schema and makes a single-field Parquet schema
new MessageType("root", fileSchema.getType(field.name))
} else {
// Otherwise, just resorts to `CatalystSchemaConverter`
toParquet.convert(StructType(Array(field)))
}
}
// Merges all single-field Parquet schemas to form a complete schema for all requested
// columns. Note that it's possible that no columns are requested at all (e.g., when counting
// some partition column of a partitioned Parquet table). That's why `fold` is used here, so
// that we always fall back to an empty Parquet schema.
.fold(new MessageType("root")) {
_ union _
}
}

val metadata =
Map.empty[String, String] ++
maybeRequestedSchema.map(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)

logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema")
new ReadContext(parquetRequestedSchema, metadata)
}
}
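
To make the clipping logic in `init` easier to follow outside the diff, here is a minimal standalone sketch of the same idea, written against the `org.apache.parquet.schema` API of parquet-mr 1.7 (older builds use the `parquet.schema` package instead). The names `RequestedSchemaSketch`, `clipFileSchema`, and `requestedColumns` are made up for illustration, and the `INT32` placeholder stands in for the Catalyst-to-Parquet conversion that the real code delegates to `CatalystSchemaConverter`:

```scala
import scala.collection.JavaConverters._

import org.apache.parquet.schema.{MessageType, MessageTypeParser, PrimitiveType, Type}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

object RequestedSchemaSketch {

  /**
   * Builds a Parquet schema containing only the requested top-level columns:
   *  - a column that exists in the file reuses the file's own field definition, so any
   *    legacy physical layout (e.g. 2-level LIST structures) is preserved as-is;
   *  - a column missing from the file (e.g. introduced by schema merging) gets a stand-in
   *    type here, where the real code converts the Catalyst field type instead;
   *  - `fold` starts from an empty "root" message, so requesting zero columns is legal.
   */
  def clipFileSchema(fileSchema: MessageType, requestedColumns: Seq[String]): MessageType = {
    val fileFieldNames = fileSchema.getFields.asScala.map(_.getName).toSet

    requestedColumns
      .map { name =>
        if (fileFieldNames.contains(name)) {
          // Reuses the physical structure recorded in the file for this column
          new MessageType("root", fileSchema.getType(name))
        } else {
          // Placeholder only: Spark derives this field from the Catalyst schema instead
          new MessageType(
            "root", new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT32, name))
        }
      }
      .fold(new MessageType("root"))(_ union _)
  }

  def main(args: Array[String]): Unit = {
    val fileSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required int32 id;
        |  optional binary name (UTF8);
        |}""".stripMargin)

    // "age" is not in the file; the reader would null-fill it when scanning this file
    println(clipFileSchema(fileSchema, Seq("id", "age")))
    // Requesting no columns at all still yields a valid (empty) message type
    println(clipFileSchema(fileSchema, Seq.empty))
  }
}
```

Passing `Seq.empty` exercises the corner case this commit fixes: `fold` starts from an empty `message root {}`, so an empty column list still produces a valid requested schema instead of failing the way the old `reduce(_ union _)` would.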
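
The 2-level versus 3-level LIST layouts discussed in the comment block can be checked directly with parquet-mr's schema parser. The sketch below (same assumed package as above; `ListLayoutSketch` is a made-up name) parses one schema of each style and shows that they are not equal, which is why the requested schema reuses the file's own field definitions rather than a freshly converted, standard-layout schema:

```scala
import org.apache.parquet.schema.MessageTypeParser

object ListLayoutSketch {
  def main(args: Array[String]): Unit = {
    // Legacy 2-level layout written by some older Parquet libraries: the repeated
    // field directly under the LIST group is the element itself
    val legacyTwoLevel = MessageTypeParser.parseMessageType(
      """message root {
        |  optional group f1 (LIST) {
        |    repeated int32 element;
        |  }
        |}""".stripMargin)

    // Standard 3-level layout that a schema converter following parquet-format
    // would generate for the same logical type, array<int>
    val standardThreeLevel = MessageTypeParser.parseMessageType(
      """message root {
        |  optional group f1 (LIST) {
        |    repeated group list {
        |      required int32 element;
        |    }
        |  }
        |}""".stripMargin)

    println(legacyTwoLevel)
    println(standardThreeLevel)
    // Prints "false": the physical structures differ, so the converter-generated
    // schema cannot be used to read a file written with the legacy layout
    println(legacyTwoLevel == standardThreeLevel)
  }
}
```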