From 77c9d58da4edb21b1551dd3482d67830875b0970 Mon Sep 17 00:00:00 2001
From: Yash Datta
Date: Tue, 31 Mar 2015 18:41:40 +0530
Subject: [PATCH 1/4] SPARK-6632: Read schema from each input split in the
 ReadSupport hook, reconciling with the metastore schema at that time

---
 .../sql/parquet/ParquetTableSupport.scala     | 22 ++++++++++++++++++-
 .../apache/spark/sql/parquet/newParquet.scala | 10 +++++-----
 2 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 5a1b15490d27..09fc5ba77eea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -98,12 +98,32 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
     val metadata = new JHashMap[String, String]()
     val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)
 
+    // convert fileSchema to attributes
+    val fileAttributes = ParquetTypesConverter.convertToAttributes(fileSchema, true, true)
+    val fileAttMap = fileAttributes.map(f => f.name.toLowerCase -> f.name).toMap
+
     if (requestedAttributes != null) {
+      // reconcile names of requested Attributes
+      val modRequestedAttributes = requestedAttributes.map(attr => {
+        val lName = attr.name.toLowerCase
+        if (fileAttMap.contains(lName)) {
+          attr.withName(fileAttMap(lName))
+        } else {
+          if (attr.nullable) {
+            attr
+          } else {
+            //field is not nullable but not present in the parquet file schema!!
+            //this is just a safety check since in hive all columns are nullable
+            // throw exception here
+            throw new RuntimeException(s"""Field ${attr.name} is non-nullable,
+              but not found in parquet file schema: ${fileSchema}""".stripMargin)
+          }}})
+
       // If the parquet file is thrift derived, there is a good chance that
       // it will have the thrift class in metadata.
       val isThriftDerived = keyValueMetaData.keySet().contains("thrift.class")
       parquetSchema = ParquetTypesConverter
-        .convertFromAttributes(requestedAttributes, isThriftDerived)
+        .convertFromAttributes(modRequestedAttributes, isThriftDerived)
       metadata.put(
         RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
         ParquetTypesConverter.convertToString(requestedAttributes))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 20fdf5e58ef8..debdd578396a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -316,14 +316,15 @@ private[sql] case class ParquetRelation2(
       }
 
       // To get the schema. We first try to get the schema defined in maybeSchema.
-      // If maybeSchema is not defined, we will try to get the schema from existing parquet data
-      // (through readSchema). If data does not exist, we will try to get the schema defined in
+      // If maybeSchema is not defined, we will try to get the schema defined in
       // maybeMetastoreSchema (defined in the options of the data source).
-      // Finally, if we still could not get the schema. We throw an error.
+      // If that is not supplied, we will try to get the schema from existing parquet data
+      // (through readSchema). If data does not exist and we still could not get the schema,
+      // we throw an error.
       parquetSchema = maybeSchema
-        .orElse(readSchema())
         .orElse(maybeMetastoreSchema)
+        .orElse(readSchema())
         .getOrElse(sys.error("Failed to get the schema."))
 
       partitionKeysIncludedInParquetSchema =
@@ -340,7 +341,6 @@ private[sql] case class ParquetRelation2(
         // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
         // insensitivity issue and possible schema mismatch.
         maybeMetastoreSchema
-          .map(ParquetRelation2.mergeMetastoreParquetSchema(_, fullRelationSchema))
           .getOrElse(fullRelationSchema)
       }
     }

From 958082316a7179b66585bd2c4d0e43bc6dd9c5de Mon Sep 17 00:00:00 2001
From: Yash Datta
Date: Sat, 4 Apr 2015 23:20:56 +0530
Subject: [PATCH 2/4] SPARK-6632: Fix whitespace

---
 .../org/apache/spark/sql/parquet/ParquetTableSupport.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 09fc5ba77eea..205ede9b9a91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -112,8 +112,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
           if (attr.nullable) {
             attr
           } else {
-            //field is not nullable but not present in the parquet file schema!!
-            //this is just a safety check since in hive all columns are nullable
+            // field is not nullable but not present in the parquet file schema!!
+            // this is just a safety check since in hive all columns are nullable
             // throw exception here
             throw new RuntimeException(s"""Field ${attr.name} is non-nullable,
               but not found in parquet file schema: ${fileSchema}""".stripMargin)

From 20e0825f362b4ad5cd788f9782c031e47b89dea3 Mon Sep 17 00:00:00 2001
From: Yash Datta
Date: Sun, 12 Apr 2015 17:18:40 +0530
Subject: [PATCH 3/4] SPARK-6632: maybeMetastoreSchema should only contain
 non-partitioning columns; the full schema is derived later within
 ParquetRelation2

This is done so that partitionKeysIncludedInParquetSchema is computed
correctly later on.
---
 .../org/apache/spark/sql/parquet/newParquet.scala    | 10 +++-------
 .../apache/spark/sql/hive/HiveMetastoreCatalog.scala |  3 ++-
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index debdd578396a..7ca186927915 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -329,19 +329,15 @@ private[sql] case class ParquetRelation2(
       partitionKeysIncludedInParquetSchema =
         isPartitioned &&
-        partitionColumns.forall(f => parquetSchema.fieldNames.contains(f.name))
+          partitionColumns.forall(f => parquetSchema.fieldNames.contains(f.name))
 
+      // Reconcile the schema later
       schema = {
-        val fullRelationSchema = if (partitionKeysIncludedInParquetSchema) {
+        if (partitionKeysIncludedInParquetSchema) {
          parquetSchema
        } else {
          StructType(parquetSchema.fields ++ partitionColumns.fields)
        }
-
-        // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
-        // insensitivity issue and possible schema mismatch.
-        maybeMetastoreSchema
-          .getOrElse(fullRelationSchema)
       }
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3ed5c5b03173..859f1fdc2382 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -235,7 +235,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
-    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+    // We want to pass the schema without partition attributes, as these are passed separately
+    val metastoreSchema = StructType.fromAttributes(metastoreRelation.attributes)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
 
     // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to

From 90d77825922e6ac29051febd60cb8a9ed1b9841f Mon Sep 17 00:00:00 2001
From: Yash Datta
Date: Sun, 12 Apr 2015 17:22:31 +0530
Subject: [PATCH 4/4] SPARK-6632: Fix whitespace

---
 .../main/scala/org/apache/spark/sql/parquet/newParquet.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 7ca186927915..0bec40f2e5db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -329,7 +329,7 @@ private[sql] case class ParquetRelation2(
       partitionKeysIncludedInParquetSchema =
         isPartitioned &&
-          partitionColumns.forall(f => parquetSchema.fieldNames.contains(f.name))
+        partitionColumns.forall(f => parquetSchema.fieldNames.contains(f.name))
 
       // Reconcile the schema later
       schema = {
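
Reviewer note: the heart of PATCH 1 is a case-insensitive rename of the requested (metastore-derived) attributes against each split's own file schema. Below is a minimal, self-contained sketch of that reconciliation; Field and the sample schemas are illustrative stand-ins, not Spark's internal Attribute or ParquetTypesConverter types.

object SchemaReconciliationSketch {
  // Illustrative stand-in for Catalyst's Attribute: just a name and nullability.
  case class Field(name: String, nullable: Boolean)

  // Rename each requested field to the exact casing used in the Parquet file.
  // A nullable field missing from the file passes through unchanged; a missing
  // non-nullable field is an error, mirroring the safety check in RowReadSupport.
  def reconcile(requested: Seq[Field], fileSchema: Seq[Field]): Seq[Field] = {
    val fileNames: Map[String, String] =
      fileSchema.map(f => f.name.toLowerCase -> f.name).toMap
    requested.map { attr =>
      fileNames.get(attr.name.toLowerCase) match {
        case Some(exactName)       => attr.copy(name = exactName)
        case None if attr.nullable => attr
        case None =>
          throw new RuntimeException(
            s"Field ${attr.name} is non-nullable but not found in the file schema")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Hive lower-cases column names; the writer of the Parquet file may not have.
    val fromMetastore = Seq(Field("userid", nullable = true), Field("eventtime", nullable = true))
    val fromFile      = Seq(Field("userId", nullable = true), Field("eventTime", nullable = true))
    println(reconcile(fromMetastore, fromFile).map(_.name)) // List(userId, eventTime)
  }
}

Doing this per input split, rather than once per table, matters because different Parquet files in the same table can disagree on casing.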
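
The reordering in PATCH 1's newParquet.scala change is also easy to state in isolation: schema resolution is an Option fallback chain, and the patch moves the metastore schema ahead of reading Parquet footers. A sketch under the assumption that the three sources behave like the simplified Options below (the names mirror maybeSchema, maybeMetastoreSchema and readSchema(), but the types are not Spark's):

object SchemaResolutionSketch {
  type Schema = List[String] // simplified: just field names

  def resolve(
      maybeSchema: Option[Schema],
      maybeMetastoreSchema: Option[Schema],
      readSchema: () => Option[Schema]): Schema =
    maybeSchema
      .orElse(maybeMetastoreSchema) // after the patch: metastore schema wins
      .orElse(readSchema())         // footers are only read as a last resort
      .getOrElse(sys.error("Failed to get the schema."))

  def main(args: Array[String]): Unit = {
    val metastore = Some(List("userid", "eventtime"))
    // Option.orElse takes its argument by name, so readSchema is never
    // invoked when the metastore schema is available.
    println(resolve(None, metastore, () => sys.error("should not be called")))
  }
}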
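
Finally, the division of labour introduced in PATCH 3 (the metastore passes only non-partitioning columns; ParquetRelation2 assembles the full schema itself) reduces to one conditional append. A small sketch, again with stand-in types for StructType and StructField:

object PartitionSchemaSketch {
  case class Field(name: String)
  case class Struct(fields: Vector[Field]) {
    def fieldNames: Vector[String] = fields.map(_.name)
  }

  // If the data files already contain the partition keys, use the data schema
  // as-is; otherwise append the partition columns, as ParquetRelation2 does.
  def fullSchema(parquetSchema: Struct, partitionColumns: Struct): Struct =
    if (partitionColumns.fields.forall(f => parquetSchema.fieldNames.contains(f.name)))
      parquetSchema
    else
      Struct(parquetSchema.fields ++ partitionColumns.fields)

  def main(args: Array[String]): Unit = {
    val data = Struct(Vector(Field("userId"), Field("eventTime")))
    val part = Struct(Vector(Field("dt")))
    println(fullSchema(data, part).fieldNames) // Vector(userId, eventTime, dt)
  }
}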