@@ -98,12 +98,32 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
     val metadata = new JHashMap[String, String]()
     val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)
 
+    // convert fileSchema to attributes
+    val fileAttributes = ParquetTypesConverter.convertToAttributes(fileSchema, true, true)

Contributor:
I'm afraid we can't always set the last two arguments to true; they should be determined according to the corresponding SQLConf configurations.

Author:
These booleans are only used to determine the data types of the attributes, whereas here we are just interested in the names of the columns, to reconcile them with the Metastore schema. Hence it is safe to always pass these parameters as true, since we do not have a SQLContext here from which to derive them.
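
For illustration, a minimal standalone sketch of this name-only reconciliation (invented column names, not the PR's code). The Hive Metastore reports column names in lower case, while the Parquet file may use mixed case; indexing the file's columns by their lowercase form recovers the file's exact casing without touching data types:

object NameReconciliationSketch extends App {
  // Column names as written in the Parquet file (mixed case).
  val fileColumns = Seq("UserId", "EventTime", "payload")

  // Index the file's names by lowercase form, mirroring fileAttMap below.
  val fileAttMap = fileColumns.map(c => c.toLowerCase -> c).toMap

  // Requested names as the Metastore reports them (lower case).
  val requested = Seq("userid", "eventtime")

  // Recover the file's exact casing where a case-insensitive match exists.
  val reconciled = requested.map(n => fileAttMap.getOrElse(n, n))
  println(reconciled) // List(UserId, EventTime)
}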

Contributor:
I see, thanks for the explanation. Would you please also add a comment explaining this?

+    val fileAttMap = fileAttributes.map(f => f.name.toLowerCase -> f.name).toMap
+
     if (requestedAttributes != null) {
+      // reconcile names of requested Attributes
+      val modRequestedAttributes = requestedAttributes.map(attr => {
+        val lName = attr.name.toLowerCase
+        if (fileAttMap.contains(lName)) {
+          attr.withName(fileAttMap(lName))
+        } else {
+          if (attr.nullable) {
+            attr
+          } else {
+            // field is not nullable but not present in the parquet file schema!!
+            // this is just a safety check since in hive all columns are nullable
+            // throw exception here
+            throw new RuntimeException(s"""Field ${attr.name} is non-nullable,
+              but not found in parquet file schema: ${fileSchema}""".stripMargin)

Contributor:
This exception message would look pretty weird when printed, since there isn't a "margin" character (|) in the string. Also, we don't want to split this message into multiple lines. Maybe this:

throw new RuntimeException(
  s"Field ${attr.name} is non-nullable, " +
    s"but not found in parquet file schema: ${fileSchema}")

+      }}})

Contributor:
Are we essentially duplicating ParquetRelation2.mergeMetastoreParquetSchema here?

Author:
Yes; the difference is that this happens within each task, whereas ParquetRelation2.mergeMetastoreParquetSchema happens on the driver. This eliminates the need for the mergeMetastoreParquetSchema method.

Contributor:
ParquetRelation2.mergeMetastoreParquetSchema is just a static method; can we reuse it here? In particular, the comments on that method and on ParquetRelation2.mergeMissingNullableFields are pretty useful, and I would like to keep them.

And please don't put multiple } / ) on a single line.

       // If the parquet file is thrift derived, there is a good chance that
       // it will have the thrift class in metadata.
       val isThriftDerived = keyValueMetaData.keySet().contains("thrift.class")
       parquetSchema = ParquetTypesConverter
-        .convertFromAttributes(requestedAttributes, isThriftDerived)
+        .convertFromAttributes(modRequestedAttributes, isThriftDerived)
       metadata.put(
         RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
         ParquetTypesConverter.convertToString(requestedAttributes))
@@ -316,32 +316,28 @@ private[sql] case class ParquetRelation2(
     }
 
     // To get the schema. We first try to get the schema defined in maybeSchema.
-    // If maybeSchema is not defined, we will try to get the schema from existing parquet data
-    // (through readSchema). If data does not exist, we will try to get the schema defined in
+    // If maybeSchema is not defined, we will try to get the schema defined in
     // maybeMetastoreSchema (defined in the options of the data source).
-    // Finally, if we still could not get the schema. We throw an error.
+    // If that is not supplied, we will try to get the schema from existing parquet data
+    // (through readSchema). If data does not exist and we still could not get the schema,
+    // we throw an error.
     parquetSchema =
       maybeSchema
-        .orElse(readSchema())
         .orElse(maybeMetastoreSchema)
+        .orElse(readSchema())
         .getOrElse(sys.error("Failed to get the schema."))
 
     partitionKeysIncludedInParquetSchema =
       isPartitioned &&
         partitionColumns.forall(f => parquetSchema.fieldNames.contains(f.name))
 
+    // Reconcile the schema later
     schema = {
-      val fullRelationSchema = if (partitionKeysIncludedInParquetSchema) {
+      if (partitionKeysIncludedInParquetSchema) {
         parquetSchema
       } else {
         StructType(parquetSchema.fields ++ partitionColumns.fields)
       }
-
-      // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
-      // insensitivity issue and possible schema mismatch.
-      maybeMetastoreSchema
-        .map(ParquetRelation2.mergeMetastoreParquetSchema(_, fullRelationSchema))
-        .getOrElse(fullRelationSchema)
     }
   }
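
One detail worth noting about the reordering in this hunk: Option.orElse takes its alternative by name, so readSchema(), which has to read Parquet footers, is now only evaluated when neither maybeSchema nor the Metastore schema is defined. A standalone sketch (invented names, not the PR's code):

object OrElseShortCircuit extends App {
  def readSchemaFromFooters(): Option[String] = {
    println("reading parquet footers...") // never printed in this run
    Some("schema-from-files")
  }

  // orElse's parameter is by-name, so the chain short-circuits: with a
  // Metastore schema defined, readSchemaFromFooters() is never invoked.
  val schema = Option.empty[String]            // maybeSchema not defined
    .orElse(Some("schema-from-metastore"))     // maybeMetastoreSchema defined
    .orElse(readSchemaFromFooters())           // skipped entirely
    .getOrElse(sys.error("Failed to get the schema."))

  println(schema) // schema-from-metastore
}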

@@ -235,7 +235,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
-    val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
+    // We want to pass the schema without partition attributes, as these are passed separately
+    val metastoreSchema = StructType.fromAttributes(metastoreRelation.attributes)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
 
     // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
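
To illustrate why the partition attributes are excluded here (a standalone sketch with invented column names, not Spark internals): ParquetRelation2 appends its partitionColumns itself when assembling the full relation schema, as in StructType(parquetSchema.fields ++ partitionColumns.fields) in the previous hunk, so including the partition keys in the Metastore schema as well would duplicate them:

import org.apache.spark.sql.types._

object PartitionColumnsSketch extends App {
  // Invented data columns, as they would come from the Metastore schema.
  val dataColumns = StructType(Seq(
    StructField("id", IntegerType, nullable = true),
    StructField("value", StringType, nullable = true)))

  // The partition key is tracked separately by the relation.
  val partitionColumns = StructType(Seq(
    StructField("ds", StringType, nullable = true)))

  // The relation assembles the full schema roughly like this:
  val fullSchema = StructType(dataColumns.fields ++ partitionColumns.fields)
  println(fullSchema.fieldNames.mkString(", ")) // id, value, ds
}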