Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Renames the root Parquet message name
  • Loading branch information
liancheng committed Aug 2, 2015
commit 0395e9505942647ce8e4e282223ae7fcdde872a3
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with
if (fileFieldNames.contains(field.name)) {
// If the field exists in the target Parquet file, extracts the field type from the
// full file schema and makes a single-field Parquet schema
new MessageType("root", fileSchema.getType(field.name))
new MessageType(
ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME,
fileSchema.getType(field.name))
} else {
// Otherwise, just resorts to `CatalystSchemaConverter`
toParquet.convert(StructType(Array(field)))
Expand All @@ -131,7 +133,7 @@ private[parquet] class ParquetReadSupport extends ReadSupport[InternalRow] with
// columns. Note that it's possible that no columns are requested at all (e.g., count
// some partition column of a partitioned Parquet table). That's why `fold` is used here
// and always fall back to an empty Parquet schema.
.fold(new MessageType("root")) {
.fold(new MessageType(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)) {
_ union _
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,10 @@ private[parquet] class ParquetSchemaConverter(
* Converts a Spark SQL [[StructType]] to a Parquet [[MessageType]].
*/
def convert(catalystSchema: StructType): MessageType = {
  // Build the root Parquet group from the converted Spark fields. The root
  // message is named with the shared SPARK_PARQUET_SCHEMA_NAME constant
  // ("spark_schema") instead of a hard-coded "root", so every code path that
  // constructs a MessageType uses the same, consistent root name.
  Types
    .buildMessage()
    .addFields(catalystSchema.map(convertField): _*)
    .named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME)
}

/**
Expand Down Expand Up @@ -541,6 +544,8 @@ private[parquet] class ParquetSchemaConverter(


private[parquet] object ParquetSchemaConverter {
val SPARK_PARQUET_SCHEMA_NAME = "spark_schema"

def checkFieldName(name: String): Unit = {
// ,;{}()\n\t= and space are special characters in Parquet schema
analysisRequire(
Expand Down