diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 5a1b15490d27..205ede9b9a91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -98,12 +98,32 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
     val metadata = new JHashMap[String, String]()
     val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)
 
+    // Convert fileSchema to attributes so we can look up field names.
+    val fileAttributes = ParquetTypesConverter.convertToAttributes(fileSchema, true, true)
+    val fileAttMap = fileAttributes.map(f => f.name.toLowerCase -> f.name).toMap
+
     if (requestedAttributes != null) {
+      // Reconcile the names of the requested attributes against the file schema,
+      // matching case-insensitively and adopting the file's exact spelling.
+      val modRequestedAttributes = requestedAttributes.map { attr =>
+        val lName = attr.name.toLowerCase
+        if (fileAttMap.contains(lName)) {
+          attr.withName(fileAttMap(lName))
+        } else if (attr.nullable) {
+          attr
+        } else {
+          // The field is non-nullable but absent from the Parquet file schema.
+          // This is only a safety check, since in Hive all columns are nullable.
+          throw new RuntimeException(
+            s"Field ${attr.name} is non-nullable but not found in Parquet file schema: $fileSchema")
+        }
+      }
+
       // If the parquet file is thrift derived, there is a good chance that
       // it will have the thrift class in metadata.
       val isThriftDerived = keyValueMetaData.keySet().contains("thrift.class")
       parquetSchema = ParquetTypesConverter
-        .convertFromAttributes(requestedAttributes, isThriftDerived)
+        .convertFromAttributes(modRequestedAttributes, isThriftDerived)
       metadata.put(
         RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
         ParquetTypesConverter.convertToString(requestedAttributes))
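
For context, here is a minimal, self-contained sketch of the reconciliation rule introduced in RowReadSupport.init above: requested attribute names are matched case-insensitively against the Parquet file schema and rewritten to the file's exact spelling, and attributes missing from the file are kept only when nullable. Attr is a hypothetical stand-in for Spark's AttributeReference, which carries more state (exprId, qualifiers):

    // Hypothetical stand-in for Spark's AttributeReference.
    case class Attr(name: String, nullable: Boolean) {
      def withName(newName: String): Attr = copy(name = newName)
    }

    // Rewrite each requested attribute to the exact field name found in the
    // file, matching case-insensitively (Hive lower-cases column names).
    def reconcileNames(requested: Seq[Attr], fileFieldNames: Seq[String]): Seq[Attr] = {
      val byLowerCase = fileFieldNames.map(n => n.toLowerCase -> n).toMap
      requested.map { attr =>
        byLowerCase.get(attr.name.toLowerCase) match {
          case Some(exactName) => attr.withName(exactName)
          case None if attr.nullable => attr // absent from this file; reads as null
          case None => throw new RuntimeException(
            s"Field ${attr.name} is non-nullable but not found in the file schema")
        }
      }
    }

    // Example: Hive's lower-cased "userid" is matched to the file's "userId".
    // reconcileNames(Seq(Attr("userid", nullable = true)), Seq("userId", "score"))
    //   == Seq(Attr("userId", nullable = true))
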
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 20fdf5e58ef8..0bec40f2e5db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -316,32 +316,28 @@ private[sql] case class ParquetRelation2(
       }
 
       // To get the schema. We first try to get the schema defined in maybeSchema.
-      // If maybeSchema is not defined, we will try to get the schema from existing parquet data
-      // (through readSchema). If data does not exist, we will try to get the schema defined in
+      // If maybeSchema is not defined, we will try to get the schema defined in
       // maybeMetastoreSchema (defined in the options of the data source).
-      // Finally, if we still could not get the schema. We throw an error.
+      // If that is not supplied either, we will try to get the schema from existing
+      // parquet data (through readSchema). Finally, if we still could not get a
+      // schema, we throw an error.
       parquetSchema = maybeSchema
-        .orElse(readSchema())
        .orElse(maybeMetastoreSchema)
+        .orElse(readSchema())
        .getOrElse(sys.error("Failed to get the schema."))
 
       partitionKeysIncludedInParquetSchema =
         isPartitioned &&
           partitionColumns.forall(f => parquetSchema.fieldNames.contains(f.name))
 
+      // The schema is now reconciled at read time (see RowReadSupport), not here.
       schema = {
-        val fullRelationSchema = if (partitionKeysIncludedInParquetSchema) {
+        if (partitionKeysIncludedInParquetSchema) {
           parquetSchema
         } else {
           StructType(parquetSchema.fields ++ partitionColumns.fields)
         }
-
-        // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
-        // insensitivity issue and possible schema mismatch.
-        maybeMetastoreSchema
-          .map(ParquetRelation2.mergeMetastoreParquetSchema(_, fullRelationSchema))
-          .getOrElse(fullRelationSchema)
       }
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3ed5c5b03173..859f1fdc2382 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -235,7 +235,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
-    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+    // We want to pass the schema without partition attributes, as these are passed separately.
+    val metastoreSchema = StructType.fromAttributes(metastoreRelation.attributes)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
 
     // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
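
The newParquet.scala hunk reorders schema resolution so that a metastore schema supplied through the data source options takes precedence over the schema read back from existing Parquet files. A minimal sketch of the new precedence chain, using String as a simplified stand-in for Spark's StructType:

    // Simplified precedence chain; String stands in for Spark's StructType.
    def resolveSchema(
        maybeSchema: Option[String],          // schema passed in explicitly
        maybeMetastoreSchema: Option[String], // schema from the data source options
        readSchema: () => Option[String]      // schema read from existing Parquet data
      ): String = {
      maybeSchema
        .orElse(maybeMetastoreSchema) // metastore schema now wins over the file schema
        .orElse(readSchema())         // only consulted when neither is supplied
        .getOrElse(sys.error("Failed to get the schema."))
    }

Because Option.orElse takes its alternative by name, readSchema() is only invoked when neither an explicit nor a metastore schema is available, which avoids reading Parquet footers in the common Hive conversion path.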