Commit afa5d85

address comments
1 parent 460302c commit afa5d85

File tree

1 file changed (+68 -25 lines)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 68 additions & 25 deletions
@@ -96,7 +96,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   /**
-   * Get the raw table metadata from hive metastore directly.
+   * Get the raw table metadata from hive metastore directly. The raw table metadata may contain
+   * special data source properties and should not be exposed outside of `HiveExternalCatalog`. We
+   * should interpret these special data source properties and restore the original table metadata
+   * before returning it.
    */
   private def getRawTable(db: String, table: String): CatalogTable = withClient {
     client.getTable(db, table)
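As a rough illustration of the restore step described in the new doc comment, the sketch below filters catalog-internal property keys out of a properties map before handing metadata back to callers. The `spark.sql.sources.` prefix and the object/method names are assumptions made for illustration, not the exact keys or code used by `HiveExternalCatalog`.

```scala
// Sketch only: drop catalog-internal properties before exposing table metadata.
// The "spark.sql.sources." prefix is an assumed, illustrative key prefix.
object RestorePropsSketch {
  private val InternalPrefix = "spark.sql.sources."

  def stripInternalProps(props: Map[String, String]): Map[String, String] =
    props.filterNot { case (key, _) => key.startsWith(InternalPrefix) }

  def main(args: Array[String]): Unit = {
    val raw = Map(
      "spark.sql.sources.provider" -> "parquet", // internal, hidden from callers
      "owner" -> "alice")                        // user-visible property
    println(stripInternalProps(raw))             // Map(owner -> alice)
  }
}
```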
@@ -193,12 +196,19 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     if (tableDefinition.tableType == VIEW) {
       client.createTable(tableDefinition, ignoreIfExists)
+    } else if (tableDefinition.provider.get == "hive") {
+      // Here we follow data source tables and put table metadata like provider, schema, etc. in
+      // table properties, so that we can work around the Hive metastore issue about not case
+      // preserving and make Hive serde table support mixed-case column names.
+      val tableWithDataSourceProps = tableDefinition.copy(
+        properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
+      client.createTable(tableWithDataSourceProps, ignoreIfExists)
     } else {
       // To work around some hive metastore issues, e.g. not case-preserving, bad decimal type
       // support, no column nullability, etc., we should do some extra works before saving table
       // metadata into Hive metastore:
       //  1. Put table metadata like provider, schema, etc. in table properties.
-      //  2. Check if this table is hive compatible(Hive serde table is obviously Hive compatible)
+      //  2. Check if this table is hive compatible.
       //  2.1  If it's not hive compatible, set location URI, schema, partition columns and bucket
       //       spec to empty and save table metadata to Hive.
       //  2.2  If it's hive compatible, set serde information in table metadata and try to save
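The new branch above boils down to "copy the table definition with extra properties merged in". A minimal, self-contained sketch of that pattern follows, using a made-up `SimpleTable` type rather than Spark's `CatalogTable`, and a placeholder `tableMetaToProps` that only mimics the shape of `tableMetaToTableProps`.

```scala
// Sketch only: SimpleTable and tableMetaToProps are hypothetical stand-ins.
case class SimpleTable(
    name: String,
    provider: Option[String],
    properties: Map[String, String])

object CreateHiveTableSketch {
  // Derive extra string properties from the table metadata.
  def tableMetaToProps(table: SimpleTable): Map[String, String] =
    table.provider.map(p => "provider" -> p).toMap

  def main(args: Array[String]): Unit = {
    val table = SimpleTable("t1", Some("hive"), Map("owner" -> "alice"))
    // Same copy-with-merged-properties pattern as the new createTable branch.
    val withProps = table.copy(properties = table.properties ++ tableMetaToProps(table))
    println(withProps.properties) // Map(owner -> alice, provider -> hive)
  }
}
```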
@@ -267,12 +277,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       .getOrElse("skipHiveMetadata", "false").toBoolean
 
     val (hiveCompatibleTable, logMessage) = maybeSerde match {
-      case _ if tableDefinition.provider.get == "hive" =>
-        val message = s"Persisting Hive serde table $qualifiedTableName into Hive metastore."
-        val tableWithDataSourceProps = tableDefinition.copy(
-          properties = tableDefinition.properties ++ tableProperties)
-        (Some(tableWithDataSourceProps), message)
-
       case _ if skipHiveMetadata =>
         val message =
           s"Persisting data source table $qualifiedTableName into Hive metastore in" +
@@ -326,6 +330,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   /**
+   * Data source tables may be non Hive compatible and we need to store table metadata in table
+   * properties to work around some Hive metastore limitations.
    * This method puts table provider, partition provider, schema, partition column names, bucket
    * specification into a map, which can be used as table properties later.
    */
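A hedged sketch of what "put table metadata into a map of string properties" can look like; the key names and layout here are invented for illustration and are not the property keys `tableMetaToTableProps` actually writes.

```scala
// Sketch only: the keys below are invented; the real method uses Spark-internal
// property names and splits large schema JSON across several entries.
object TablePropsSketch {
  def toTableProps(
      provider: String,
      schemaJson: String,
      partitionColumnNames: Seq[String]): Map[String, String] = {
    val base = Map(
      "provider" -> provider,
      "schema" -> schemaJson,
      "numPartCols" -> partitionColumnNames.size.toString)
    // One entry per partition column, keyed by its position.
    val partCols = partitionColumnNames.zipWithIndex.map {
      case (col, i) => s"partCol.$i" -> col
    }
    base ++ partCols
  }

  def main(args: Array[String]): Unit =
    println(toTableProps("parquet", """{"fields":[]}""", Seq("year", "month")))
}
```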
@@ -386,8 +392,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   private def saveTableIntoHive(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+    assert(DDLUtils.isDatasourceTable(tableDefinition),
+      "saveTableIntoHive only takes data source table.")
     // If this is an external data source table...
-    if (tableDefinition.provider.get != "hive" && tableDefinition.tableType == EXTERNAL &&
+    if (tableDefinition.tableType == EXTERNAL &&
       // ... that is not persisted as Hive compatible format (external tables in Hive compatible
       // format always set `locationUri` to the actual data location and should NOT be hacked as
       // following.)
@@ -431,6 +439,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
     val rawTable = getRawTable(db, oldName)
 
+    // Note that Hive serde tables don't use path option in storage properties to store the value
+    // of table location, but use `locationUri` field to store it directly. And `locationUri` field
+    // will be updated automatically in Hive metastore by the `alterTable` call at the end of this
+    // method. Here we only update the path option if the path option already exists in storage
+    // properties, to avoid adding an unnecessary path option for Hive serde tables.
     val hasPathOption = new CaseInsensitiveMap(rawTable.storage.properties).contains("path")
     val storageWithNewPath = if (rawTable.tableType == MANAGED && hasPathOption) {
       // If it's a managed table with path option and we are renaming it, then the path option
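The rename logic above only rewrites the `path` storage property when one is already present. A small stand-alone sketch of that rule, using a plain case-insensitive lookup instead of Spark's `CaseInsensitiveMap`; names are illustrative only.

```scala
// Sketch only: a plain case-insensitive check stands in for CaseInsensitiveMap.
object RenamePathSketch {
  def hasPathOption(storageProps: Map[String, String]): Boolean =
    storageProps.keys.exists(_.equalsIgnoreCase("path"))

  // Only rewrite an existing path option; Hive serde tables keep their location
  // in `locationUri` and get no path option added here.
  def updatePathOption(
      storageProps: Map[String, String],
      newPath: String): Map[String, String] =
    storageProps.map {
      case (key, _) if key.equalsIgnoreCase("path") => key -> newPath
      case other => other
    }

  def main(args: Array[String]): Unit = {
    val managedProps = Map("Path" -> "/warehouse/old")
    if (hasPathOption(managedProps)) {
      println(updatePathOption(managedProps, "/warehouse/new")) // Map(Path -> /warehouse/new)
    }
    // No path option present: properties are left untouched.
    println(updatePathOption(Map("serialization.format" -> "1"), "/warehouse/new"))
  }
}
```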
@@ -498,18 +511,42 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val newStorage = if (tableDefinition.provider.get == "hive") {
       tableDefinition.storage
     } else {
+      // We can't alter the table storage of data source table directly for 2 reasons:
+      //   1. internally we use path option in storage properties to store the value of table
+      //      location, but the given `tableDefinition` is from outside and doesn't have the path
+      //      option, we need to add it manually.
+      //   2. this data source table may be created on a file, not a directory, then we can't set
+      //      the `locationUri` field and save it to Hive metastore, because Hive only allows
+      //      directory as table location.
+      //
+      // For example, an external data source table is created with a single file '/path/to/file'.
+      // Internally, we will add a path option with value '/path/to/file' to storage properties,
+      // and set the `locationUri` to a special value due to SPARK-15269 (please see
+      // `saveTableIntoHive` for more details). When users try to get the table metadata back, we
+      // will restore the `locationUri` field from the path option and remove the path option from
+      // storage properties. When users try to alter the table storage, the given
+      // `tableDefinition` will have `locationUri` field with value `/path/to/file` and the path
+      // option is not set.
+      //
+      // Here we need 2 extra steps:
+      //   1. add path option to storage properties, to match the internal format, i.e. using path
+      //      option to store the value of table location.
+      //   2. set the `locationUri` field back to the old one from the existing table metadata,
+      //      if users don't want to alter the table location. This step is necessary as the
+      //      `locationUri` is not always the same as the path option, e.g. in the above example
+      //      `locationUri` is a special value and we should respect it. Note that, if users
+      //      want to alter the table location to a file path, we will fail. This should be fixed
+      //      in the future.
+
       val newLocation = tableDefinition.storage.locationUri
       val storageWithPathOption = tableDefinition.storage.copy(
         properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _))
 
-      // For data source tables, only update the `locationUri` field if the location is really
-      // changed, because this table may be not Hive-compatible and can not set the `locationUri`
-      // field. We should respect the old `locationUri` even it's None.
       val oldLocation = getLocationFromStorageProps(oldTableDef)
       if (oldLocation == newLocation) {
         storageWithPathOption.copy(locationUri = oldTableDef.storage.locationUri)
       } else {
-        storageWithPathOption.copy(locationUri = newLocation)
+        storageWithPathOption
       }
     }
 
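A compact sketch of the two extra steps spelled out in the new comment: fold the requested location into a `path` property, and keep the old `locationUri` when the location did not actually change. `SimpleStorage` is a made-up stand-in for `CatalogStorageFormat`, and the placeholder directory value is purely illustrative.

```scala
// Sketch only: SimpleStorage and the placeholder location are illustrative.
case class SimpleStorage(locationUri: Option[String], properties: Map[String, String])

object AlterStorageSketch {
  def newStorage(
      requested: SimpleStorage,        // storage passed in by the caller of alterTable
      oldLocationUri: Option[String],  // locationUri currently stored in the metastore
      oldPathOption: Option[String]): SimpleStorage = { // path option currently stored
    // Step 1: fold the requested location into the internal "path" property.
    val withPathOption = requested.copy(
      properties = requested.properties ++ requested.locationUri.map("path" -> _))
    if (oldPathOption == requested.locationUri) {
      // Step 2: location unchanged, so respect the old locationUri (it may be a
      // placeholder directory for a table backed by a single file).
      withPathOption.copy(locationUri = oldLocationUri)
    } else {
      withPathOption
    }
  }

  def main(args: Array[String]): Unit = {
    val requested = SimpleStorage(Some("/path/to/file"), Map.empty)
    println(newStorage(requested, Some("/placeholder/dir"), Some("/path/to/file")))
  }
}
```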
@@ -558,14 +595,16 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val tableWithSchema = if (table.tableType == VIEW) {
       table
     } else {
-      getProviderFromTableProperties(table).map { provider =>
-        if (provider == "hive") {
-          restoreHiveSerdeTable(table)
-        } else {
-          restoreDataSourceTable(table, provider)
-        }
-      } getOrElse {
-        table.copy(provider = Some("hive"), tracksPartitionsInCatalog = true)
+      getProviderFromTableProperties(table) match {
+        // No provider in table properties, which means this table is created by Spark prior to
+        // 2.1, or is created at Hive side.
+        case None => table.copy(provider = Some("hive"), tracksPartitionsInCatalog = true)
+
+        // This is a Hive serde table created by Spark 2.1 or higher versions.
+        case Some("hive") => restoreHiveSerdeTable(table)
+
+        // This is a regular data source table.
+        case Some(provider) => restoreDataSourceTable(table, provider)
       }
     }
 
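The rewritten restore path is a three-way dispatch on the provider recorded in table properties. A minimal sketch of that dispatch follows, with placeholder strings standing in for the real `restoreHiveSerdeTable` / `restoreDataSourceTable` calls.

```scala
// Sketch only: the returned strings stand in for the real restore methods.
object RestoreDispatchSketch {
  def describe(provider: Option[String]): String = provider match {
    // No provider recorded: table predates Spark 2.1 or was created on the Hive side.
    case None => "legacy table, treated as hive provider"
    // Hive serde table written by Spark 2.1 or later.
    case Some("hive") => "restore Hive serde table"
    // Regular data source table.
    case Some(other) => s"restore data source table with provider '$other'"
  }

  def main(args: Array[String]): Unit =
    Seq(None, Some("hive"), Some("parquet")).foreach(p => println(describe(p)))
}
```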
@@ -669,8 +708,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
     getTable(db, table).partitionColumnNames.foreach { colName =>
-      // Lowercase the partition column names before passing the partition spec to Hive client, as
-      // Hive metastore is not case preserving.
+      // Hive metastore is not case preserving and keeps partition columns with lower cased names,
+      // and Hive will validate the column names in partition spec to make sure they are partition
+      // columns. Here we lowercase the column names before passing the partition spec to Hive
+      // client, to satisfy Hive.
       orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
     }
 
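The surrounding code builds an ordered, lower-cased partition spec before calling the Hive client. A self-contained sketch of that step, with invented column names and values; the helper name is an assumption for illustration.

```scala
// Sketch only: column names and values are invented for illustration.
import java.util.LinkedHashMap

object PartitionSpecSketch {
  def toHiveSpec(
      partitionColumnNames: Seq[String],
      partition: Map[String, String]): LinkedHashMap[String, String] = {
    val spec = new LinkedHashMap[String, String]()
    partitionColumnNames.foreach { colName =>
      // Hive keeps partition column names lower-cased, so lower-case the keys here.
      spec.put(colName.toLowerCase, partition(colName))
    }
    spec
  }

  def main(args: Array[String]): Unit =
    println(toHiveSpec(Seq("Year", "Month"), Map("Year" -> "2024", "Month" -> "05")))
}
```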
@@ -696,8 +737,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
     getTable(db, table).partitionColumnNames.foreach { colName =>
-      // Lowercase the partition column names before passing the partition spec to Hive client, as
-      // Hive metastore is not case preserving.
+      // Hive metastore is not case preserving and keeps partition columns with lower cased names,
+      // and Hive will validate the column names in partition spec to make sure they are partition
+      // columns. Here we lowercase the column names before passing the partition spec to Hive
+      // client, to satisfy Hive.
       orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
     }
 
0 commit comments
