Proper fix.
HiveExternalCatalog.alterTableSchema takes a shortcut by modifying the raw
Hive table metadata instead of the full Spark view; that means it needs to
be aware of whether the table is Hive-compatible or not.

For compatible tables, the current "replace the schema" code is the correct
path, except that an exception in that path should result in an error, and
not in retrying in a different way.

For non-compatible tables, Spark should just update the table properties,
and leave the schema stored in the raw table untouched.

Because Spark doesn't explicitly store metadata about whether a table is
Hive-compatible or not, a new property was added just to make that explicit.
The code tries to detect old DS tables that don't have the property and do
the right thing.

These changes also uncovered a problem with the way case-sensitive DS tables
were being saved to the Hive metastore; the metastore is case-insensitive,
and the code was treating these tables as Hive-compatible if the data source
had a Hive counterpart (e.g. for parquet). In this scenario, the schema
could be corrupted when updated from Spark if it contained columns whose
names conflict when case is ignored. The change fixes this by making
case-sensitive DS tables not Hive-compatible.
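
As a quick illustration of that last scenario (not part of the commit; hypothetical column names): with spark.sql.caseSensitive=true, Spark accepts a schema like the one below, while the case-insensitive Hive metastore sees a duplicate column name and cannot represent it faithfully.

import org.apache.spark.sql.types._

// Two columns that differ only by case: distinct for Spark when case sensitivity is
// enabled, ambiguous for the case-insensitive Hive metastore.
val conflictingSchema = StructType(Seq(
  StructField("value", LongType),
  StructField("VALUE", StringType)))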
Marcelo Vanzin committed Aug 4, 2017
commit 7ccf4743024a8a447a4b05369f6ebf237cf88c4f
@@ -32,7 +32,8 @@ import org.apache.thrift.TException

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
@@ -43,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types.{DataType, StructType}

@@ -257,6 +258,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}

/**
* Retrieve a configuration value for the current active session, if any.
*/
private def currentSessionConf[T](entry: ConfigEntry[T]): T = {
SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession).map { session =>
session.conf.get(entry)
}.getOrElse {
// If there's no active session, try to read from the SparkConf object instead. Normally
// there should be an active session, but unit tests invoke methods on the catalog directly,
// so that might not be true in some cases.
conf.get(entry)
}
}

private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
// data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
val provider = table.provider.get
@@ -288,6 +303,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// bucket specification to empty. Note that partition columns are retained, so that we can
// call partition-related Hive API later.
def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "false")
Contributor:
This would be a good idea if we had done it from the first version. But now, for backward compatibility, we have to handle the case without this special flag on the read path anyway, so I can't see the point of having this flag.

table.copy(
// Hive only allows directory paths as location URIs while Spark SQL data source tables
// also allow file paths. For non-hive-compatible format, we should not set location URI
@@ -297,11 +313,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
properties = storagePropsWithLocation),
schema = table.partitionSchema,
bucketSpec = None,
properties = table.properties ++ tableProperties)
properties = table.properties ++ tableProperties ++ hiveCompatible)
}

// converts the table metadata to Hive compatible format, i.e. set the serde information.
def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = {
val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "true")
val location = if (table.tableType == EXTERNAL) {
// When we hit this branch, we are saving an external data source table with hive
// compatible format, which means the data source is file-based and must have a `path`.
@@ -320,7 +337,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
serde = serde.serde,
properties = storagePropsWithLocation
),
properties = table.properties ++ tableProperties)
properties = table.properties ++ tableProperties ++ hiveCompatible)
}

val qualifiedTableName = table.identifier.quotedString
@@ -342,6 +359,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
(None, message)

case _ if currentSessionConf(SQLConf.CASE_SENSITIVE) =>
Contributor:
I think we should look at the schema instead of looking at the config. It's possible that, even when the case-sensitive config is on, the column names are all lowercase and the table is still Hive-compatible.

My proposal: check schema.asLowerCased == schema; if that's false, the table is not Hive-compatible. We would need to add StructType.asLowerCased, though.
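
A minimal sketch of that proposal (not code from this PR; StructType.asLowerCased would be a new, hypothetical helper):

import org.apache.spark.sql.types.{StructField, StructType}

// Hypothetical StructType.asLowerCased equivalent: lower-case every field name,
// recursing into nested structs so nested columns are covered as well.
def asLowerCased(schema: StructType): StructType = StructType(schema.fields.map { f =>
  val loweredType = f.dataType match {
    case s: StructType => asLowerCased(s)
    case other => other
  }
  f.copy(name = f.name.toLowerCase, dataType = loweredType)
})

// A schema is a Hive-compatibility candidate only if lower-casing changes nothing.
def isCaseInsensitiveSafe(schema: StructType): Boolean = asLowerCased(schema) == schema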

Contributor:
Actually, is this a useful change? On the read path we still need to handle the case where a Hive-compatible table has an inconsistent schema between the table properties and the metastore metadata.

Contributor Author:
Ok, I'll remove this change. The write-path change you propose isn't necessary because if you have an "invalid" schema (same column name with different case), the Hive metastore will complain and the table will be stored as non-Hive-compatible.

The problem this was trying to avoid is related to the changes in alterTableSchema; if you create a Hive-compatible table here and then later try to update it with an invalid schema, you'd end up with a frankentable because the code in alterTableSchema was wrong.

But since this change is mainly about fixing alterTableSchema, you'll now get a proper error in that case instead of ending up with a potentially corrupted table.

val message =
s"Persisting case sensitive data source table $qualifiedTableName into " +
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
(None, message)

case Some(serde) =>
val message =
s"Persisting file based data source table $qualifiedTableName into " +
@@ -386,6 +409,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* can be used as table properties later.
*/
private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
tableMetaToTableProps(table, table.schema)
}

private def tableMetaToTableProps(
table: CatalogTable,
schema: StructType): mutable.Map[String, String] = {
val partitionColumns = table.partitionColumnNames
val bucketSpec = table.bucketSpec

@@ -394,7 +423,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// property. In this case, we split the JSON string and store each part as a separate table
// property.
val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
val schemaJsonString = table.schema.json
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
@@ -611,30 +640,39 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
requireTableExists(db, table)
val rawTable = getRawTable(db, table)
val withNewSchema = rawTable.copy(schema = schema)
verifyColumnNames(withNewSchema)
// Add table metadata such as table schema, partition columns, etc. to table properties.
val updatedTable = withNewSchema.copy(
properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))

// If it's a data source table, make sure the original schema is left unchanged; the
// actual schema is recorded as a table property.
val tableToStore = if (DDLUtils.isDatasourceTable(updatedTable)) {
updatedTable.copy(schema = rawTable.schema)
val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema)

// Detect whether this is a Hive-compatible table.
val provider = rawTable.properties.get(DATASOURCE_PROVIDER)
val isHiveCompatible = if (provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)) {
Member:
The whole check might not support all previous versions. We have changed these flags multiple times, so we might break support for table metadata created by previous versions of Spark.

How about directly comparing the schemas and checking whether they are Hive-compatible? cc @cloud-fan WDYT?

Contributor:
+1. Since we still need to handle the case without the special flag for old Spark versions, it makes more sense to just detect Hive compatibility by comparing the raw table schema with the table schema from the table properties.

Contributor Author:
I think you mean the check below in the case _ => case, right?

I see that both compatible and non-compatible tables set that property, at least in 2.1, so let me see if there's an easy way to differentiate that without having to replicate all the original checks (which may be hard to do at this point).

Contributor Author:
I changed the check to use the serde instead. The new tests pass even without the explicit check for DATASOURCE_HIVE_COMPATIBLE when doing that, although I prefer leaving the explicit property for clarity.
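
Roughly what a serde-based check could look like (a sketch only; not the code in this commit, which still keys off the explicit property plus a "path" fallback). The idea is to compare the serde recorded in the raw metastore entry with the Hive serde that Spark registers for the table's data source provider:

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.internal.HiveSerDe

// Sketch: a data source table was stored as Hive-compatible only if its raw metastore
// entry carries the serde registered for its provider (e.g. parquet, orc). Providers
// with no Hive counterpart (sourceToSerDe returns None) are never Hive-compatible.
def isHiveCompatible(rawTable: CatalogTable, provider: String): Boolean = {
  HiveSerDe.sourceToSerDe(provider) match {
    case Some(hiveSerde) => hiveSerde.serde.isDefined && rawTable.storage.serde == hiveSerde.serde
    case None => false
  }
}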

Contributor Author:
I also checked 2.0 and 1.6 and both seem to do the same thing (both set the provider, and both use a different serde for non-compatible tables), so the check should work for those versions too.

Member (@gatorsmile, Aug 8, 2017):
Could you add a test case for the cross-version compatibility checking? I am just afraid it might not work as expected

Member:
We plan to submit a separate PR for verifying all the related cross-version issues. That needs to verify most DDL statements. You can ignore my previous comment. Thanks!

Contributor Author:
Too late now, already added the tests.

Member:
Could you create a separate utility function for isHiveCompatible in HiveExternalCatalog.scala?

rawTable.properties.get(DATASOURCE_HIVE_COMPATIBLE) match {
case Some(value) =>
value.toBoolean
case _ =>
// If the property is not set, the table may have been created by an old version
// of Spark. Those versions set a "path" property in the table's storage descriptor
// for non-Hive-compatible tables, so use that to detect compatibility.
rawTable.storage.properties.get("path").isDefined
}
} else {
updatedTable
// All non-DS tables are treated as regular Hive tables.
true
}

try {
client.alterTable(tableToStore)
} catch {
case NonFatal(e) =>
val warningMessage =
s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
"compatible way. Updating Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
client.alterTable(updatedTable.copy(schema = tableToStore.partitionSchema))
val updatedTable = if (isHiveCompatible) {
val _updated = rawTable.copy(properties = updatedProperties, schema = schema)
verifyColumnNames(_updated)
_updated
} else {
// If the table is not Hive-compatible, the schema of the table should not be overwritten with
// the updated schema. The previous value stored in the metastore should be preserved; that
// will be either the table's original partition schema, or a placeholder schema inserted by
// the Hive client wrapper if the partition schema was empty.
rawTable.copy(properties = updatedProperties)
}

client.alterTable(updatedTable)
}
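
For context (not part of the diff): the main caller of this path is ALTER TABLE ... ADD COLUMNS, which Spark routes through the session catalog into ExternalCatalog.alterTableSchema. A minimal way to exercise it, assuming spark is a Hive-enabled SparkSession and the table name is hypothetical:

// Creates a data source table, then widens its schema; the ALTER statement ends up
// calling HiveExternalCatalog.alterTableSchema with the merged schema.
spark.sql("CREATE TABLE ds_tbl (id BIGINT) USING parquet")
spark.sql("ALTER TABLE ds_tbl ADD COLUMNS (extra STRING)")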

override def alterTableStats(
@@ -1202,6 +1240,7 @@ object HiveExternalCatalog {
val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
val DATASOURCE_HIVE_COMPATIBLE = SPARK_SQL_PREFIX + "hive.compatibility"
Member:
Use DATASOURCE_PREFIX ?


val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics."
val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
@@ -414,10 +414,7 @@ private[hive] class HiveClientImpl(
unsupportedFeatures += "partitioned view"
}

val properties = Option(h.getParameters).map(_.asScala.toMap).getOrElse(Map())

val provider = properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER)
.orElse(Some(DDLUtils.HIVE_PROVIDER))
val properties = Option(h.getParameters).map(_.asScala.toMap).orNull

// Hive-generated Statistics are also recorded in ignoredProperties
val ignoredProperties = scala.collection.mutable.Map.empty[String, String]
@@ -472,7 +469,6 @@ private[hive] class HiveClientImpl(
throw new AnalysisException("Hive index table is not supported.")
},
schema = schema,
provider = provider,
partitionColumnNames = partCols.map(_.name),
// If the table is written by Spark, we will put bucketing information in table properties,
// and will always overwrite the bucket spec in hive metastore by the bucketing information
@@ -913,7 +909,13 @@ private[hive] object HiveClientImpl {
}
// after SPARK-19279, it is not allowed to create a hive table with an empty schema,
// so here we should not add a default col schema
Member (@viirya, Aug 9, 2017):
This comment looks like it needs to move accordingly?

if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) {
//
// Because HiveExternalCatalog sometimes writes back "raw" tables that have not been
// completely translated to Spark's view, the provider information needs to be looked
// up in two places.
val provider = table.provider.orElse(
Contributor Author:
This change would have fixed the second exception in the bug (about storing an empty schema); but the code was just ending up in that situation because of the other problems this PR is fixing. This change shouldn't be needed for the fix, but I included it for further correctness.

Member:
What is the second exception? Could you explain more? If this is fixing a different bug, could you open a new JIRA and put it in the PR title?

Contributor Author:
If you look at the bug, there are two exceptions. One gets logged; the second is thrown and caused the test to fail in my 2.1-based branch.

The exception happened because alterTableSchema is writing back the result of getRawTable. That raw table does not have the provider set; instead, it's in the table's properties. This check looks at both places, so that other code that uses getRawTable can properly pass this check.

As I explained in a previous comment, this doesn't happen anymore for alterTableSchema because of the other changes. But there's still code in the catalog class that writes back tables fetched with getRawTable, so this feels safer.

table.properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER))
if (schema.isEmpty && provider != Some(DDLUtils.HIVE_PROVIDER)) {
// This is a hack to preserve existing behavior. Before Spark 2.0, we do not
// set a default serde here (this was done in Hive), and so if the user provides
// an empty schema Hive would automatically populate the schema with a single