[SPARK-21617][SQL] Store correct table metadata when altering schema in Hive metastore. #18849
Changes from 7 commits
HiveExternalCatalog.scala
@@ -32,7 +32,8 @@ import org.apache.thrift.TException | |
|
|
||
| import org.apache.spark.{SparkConf, SparkException} | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.internal.config.ConfigEntry | ||
| import org.apache.spark.sql.{AnalysisException, SparkSession} | ||
| import org.apache.spark.sql.catalyst.TableIdentifier | ||
| import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException | ||
| import org.apache.spark.sql.catalyst.catalog._ | ||
|
|
@@ -43,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap | |
| import org.apache.spark.sql.execution.command.DDLUtils | ||
| import org.apache.spark.sql.execution.datasources.PartitioningUtils | ||
| import org.apache.spark.sql.hive.client.HiveClient | ||
| import org.apache.spark.sql.internal.HiveSerDe | ||
| import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} | ||
| import org.apache.spark.sql.internal.StaticSQLConf._ | ||
| import org.apache.spark.sql.types.{DataType, StructType} | ||
|
|
||
|
|
@@ -114,7 +115,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat | |
| * should interpret these special data source properties and restore the original table metadata | ||
| * before returning it. | ||
| */ | ||
| private def getRawTable(db: String, table: String): CatalogTable = withClient { | ||
| private[hive] def getRawTable(db: String, table: String): CatalogTable = withClient { | ||
| client.getTable(db, table) | ||
| } | ||
|
|
||
|
|
@@ -257,6 +258,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Retrieve a configuration value for the current active session, if any. | ||
| */ | ||
| private def currentSessionConf[T](entry: ConfigEntry[T]): T = { | ||
| SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession).map { session => | ||
| session.conf.get(entry) | ||
| }.getOrElse { | ||
| // If there's no active session, try to read from the SparkConf object instead. Normally | ||
| // there should be an active session, but unit tests invoke methods on the catalog directly, | ||
| // so that might not be true in some cases. | ||
| conf.get(entry) | ||
| } | ||
| } | ||
|
|
||
| private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = { | ||
| // data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`. | ||
| val provider = table.provider.get | ||
|
|
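For readers unfamiliar with the session lookup used by the new `currentSessionConf` helper above, here is a minimal, hedged sketch of the same fallback pattern in isolation (the helper name and the string-keyed variant are illustrative; the PR's version uses the internal `ConfigEntry` overload available inside the `org.apache.spark.sql` packages):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch of the lookup order used above: active session first, then the default
// session, and finally the catalog's own SparkConf (e.g. unit tests that call the
// external catalog directly, with no SparkSession at all).
def sessionConfOrDefault(key: String, fallback: SparkConf): String =
  SparkSession.getActiveSession
    .orElse(SparkSession.getDefaultSession)
    .map(_.conf.get(key))
    .getOrElse(fallback.get(key))
```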
@@ -288,6 +303,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat | |
| // bucket specification to empty. Note that partition columns are retained, so that we can | ||
| // call partition-related Hive API later. | ||
| def newSparkSQLSpecificMetastoreTable(): CatalogTable = { | ||
| val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "false") | ||
| table.copy( | ||
| // Hive only allows directory paths as location URIs while Spark SQL data source tables | ||
| // also allow file paths. For non-hive-compatible format, we should not set location URI | ||
|
|
@@ -297,11 +313,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat | |
| properties = storagePropsWithLocation), | ||
| schema = table.partitionSchema, | ||
| bucketSpec = None, | ||
| properties = table.properties ++ tableProperties) | ||
| properties = table.properties ++ tableProperties ++ hiveCompatible) | ||
| } | ||
|
|
||
| // converts the table metadata to Hive compatible format, i.e. set the serde information. | ||
| def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = { | ||
| val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "true") | ||
| val location = if (table.tableType == EXTERNAL) { | ||
| // When we hit this branch, we are saving an external data source table with hive | ||
| // compatible format, which means the data source is file-based and must have a `path`. | ||
|
|
@@ -320,7 +337,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat | |
| serde = serde.serde, | ||
| properties = storagePropsWithLocation | ||
| ), | ||
| properties = table.properties ++ tableProperties) | ||
| properties = table.properties ++ tableProperties ++ hiveCompatible) | ||
| } | ||
|
|
||
| val qualifiedTableName = table.identifier.quotedString | ||
|
|
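Both helpers above now stamp the persisted metadata with the new `DATASOURCE_HIVE_COMPATIBLE` property (defined near the end of this diff). As a hedged illustration of how that flag can be read back, with tables that predate the flag defaulting to compatible (the helper name is made up, and the sketch assumes package-internal access to `HiveExternalCatalog`):

```scala
import org.apache.spark.sql.hive.HiveExternalCatalog

// Illustration only: read the compatibility flag back from the raw metastore table.
// Tables written by older Spark versions carry no flag; those are handled by the
// serde-based fallback in alterTableSchema further down in this diff.
def readCompatibilityFlag(catalog: HiveExternalCatalog, db: String, table: String): Boolean =
  catalog.getRawTable(db, table).properties
    .get(HiveExternalCatalog.DATASOURCE_HIVE_COMPATIBLE)  // "spark.sql.sources.hive.compatibility"
    .map(_.toBoolean)
    .getOrElse(true)
```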
@@ -342,6 +359,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat | |
| "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " | ||
| (None, message) | ||
|
|
||
| case _ if currentSessionConf(SQLConf.CASE_SENSITIVE) => | ||
|
||
| val message = | ||
| s"Persisting case sensitive data source table $qualifiedTableName into " + | ||
| "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " | ||
| (None, message) | ||
|
|
||
| case Some(serde) => | ||
| val message = | ||
| s"Persisting file based data source table $qualifiedTableName into " + | ||
|
|
@@ -386,6 +409,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat | |
| * can be used as table properties later. | ||
| */ | ||
| private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = { | ||
| tableMetaToTableProps(table, table.schema) | ||
| } | ||
|
|
||
| private def tableMetaToTableProps( | ||
| table: CatalogTable, | ||
| schema: StructType): mutable.Map[String, String] = { | ||
| val partitionColumns = table.partitionColumnNames | ||
| val bucketSpec = table.bucketSpec | ||
|
|
||
|
|
@@ -394,7 +423,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat | |
| // property. In this case, we split the JSON string and store each part as a separate table | ||
| // property. | ||
| val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD) | ||
| val schemaJsonString = table.schema.json | ||
| val schemaJsonString = schema.json | ||
| // Split the JSON string. | ||
| val parts = schemaJsonString.grouped(threshold).toSeq | ||
| properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString) | ||
|
|
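The `tableMetaToTableProps` overload above now serializes an explicitly supplied schema rather than always using `table.schema`. For context, a hedged sketch of how the schema JSON is split across table properties and reassembled (the helper names are illustrative; the property keys match the `DATASOURCE_SCHEMA_*` constants used in this file):

```scala
import scala.collection.mutable
import org.apache.spark.sql.types.{DataType, StructType}

// Write side (sketch): a schema JSON longer than the configured threshold is split
// into numbered chunks so that no single metastore property value gets too large.
def putSchemaProps(props: mutable.Map[String, String], schema: StructType, threshold: Int): Unit = {
  val parts = schema.json.grouped(threshold).toSeq
  props.put("spark.sql.sources.schema.numParts", parts.size.toString)
  parts.zipWithIndex.foreach { case (part, index) =>
    props.put(s"spark.sql.sources.schema.part.$index", part)
  }
}

// Read side (sketch): concatenate the chunks and parse the JSON back into a StructType.
def getSchemaFromProps(props: Map[String, String]): StructType = {
  val numParts = props("spark.sql.sources.schema.numParts").toInt
  val json = (0 until numParts).map(i => props(s"spark.sql.sources.schema.part.$i")).mkString
  DataType.fromJson(json).asInstanceOf[StructType]
}
```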
@@ -611,21 +640,41 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat | |
| override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient { | ||
| requireTableExists(db, table) | ||
| val rawTable = getRawTable(db, table) | ||
| val withNewSchema = rawTable.copy(schema = schema) | ||
| verifyColumnNames(withNewSchema) | ||
| // Add table metadata such as table schema, partition columns, etc. to table properties. | ||
| val updatedTable = withNewSchema.copy( | ||
| properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema)) | ||
| try { | ||
| client.alterTable(updatedTable) | ||
| } catch { | ||
| case NonFatal(e) => | ||
| val warningMessage = | ||
| s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " + | ||
| "compatible way. Updating Hive metastore in Spark SQL specific format." | ||
| logWarning(warningMessage, e) | ||
| client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema)) | ||
| val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema) | ||
|
|
||
| // Detect whether this is a Hive-compatible table. | ||
| val provider = rawTable.properties.get(DATASOURCE_PROVIDER) | ||
| val isHiveCompatible = if (provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)) { | ||
|
||
| rawTable.properties.get(DATASOURCE_HIVE_COMPATIBLE) match { | ||
| case Some(value) => | ||
| value.toBoolean | ||
| case _ => | ||
| // If the property is not set, the table may have been created by an old version | ||
| // of Spark. Detect Hive compatibility by comparing the table's serde with the | ||
| // serde for the table's data source. If they match, the table is Hive-compatible. | ||
| // If they don't match, the table is not Hive-compatible, because some other table | ||
| // property made it not Hive-compatible when it was first created. | ||
| HiveSerDe.sourceToSerDe(provider.get) == rawTable.storage.serde | ||
| } | ||
| } else { | ||
| // All non-DS tables are treated as regular Hive tables. | ||
| true | ||
| } | ||
|
|
||
| val updatedTable = if (isHiveCompatible) { | ||
| val _updated = rawTable.copy(properties = updatedProperties, schema = schema) | ||
| verifyColumnNames(_updated) | ||
| _updated | ||
| } else { | ||
| // If the table is not Hive-compatible, the schema of the table should not be overwritten with | ||
| // the updated schema. The previous value stored in the metastore should be preserved; that | ||
| // will be either the table's original partition schema, or a placeholder schema inserted by | ||
| // the Hive client wrapper if the partition schema was empty. | ||
| rawTable.copy(properties = updatedProperties) | ||
| } | ||
|
|
||
| client.alterTable(updatedTable) | ||
| } | ||
|
|
||
| override def alterTableStats( | ||
|
|
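Since the new `alterTableSchema` logic above is spread across a fairly long hunk, here is a hedged, consolidated sketch of the compatibility decision it makes (the standalone helper name is illustrative, the literal keys correspond to the `DATASOURCE_*` constants, and the serde check here compares serde class names, which differs slightly from the PR's exact expression):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.internal.HiveSerDe

// Sketch of the decision above: data source tables are Hive-compatible only if they
// carry the new flag set to "true", or (for tables written by older Spark versions,
// which have no flag) if their stored serde matches the serde Spark would choose for
// the provider. Everything else, including plain Hive tables, is treated as compatible.
def isHiveCompatible(rawTable: CatalogTable): Boolean = {
  val provider = rawTable.properties.get("spark.sql.sources.provider")
  if (provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)) {
    rawTable.properties.get("spark.sql.sources.hive.compatibility") match {
      case Some(value) => value.toBoolean
      case None =>
        HiveSerDe.sourceToSerDe(provider.get).flatMap(_.serde) == rawTable.storage.serde
    }
  } else {
    true
  }
}
```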
@@ -1193,6 +1242,7 @@ object HiveExternalCatalog { | |
| val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol." | ||
| val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol." | ||
| val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol." | ||
| val DATASOURCE_HIVE_COMPATIBLE = DATASOURCE_PREFIX + "hive.compatibility" | ||
|
|
||
| val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics." | ||
| val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize" | ||
|
|
||
HiveClientImpl.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression | |
| import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} | ||
| import org.apache.spark.sql.execution.QueryExecutionException | ||
| import org.apache.spark.sql.execution.command.DDLUtils | ||
| import org.apache.spark.sql.hive.HiveExternalCatalog | ||
| import org.apache.spark.sql.hive.client.HiveClientImpl._ | ||
| import org.apache.spark.sql.types._ | ||
| import org.apache.spark.util.{CircularBuffer, Utils} | ||
|
|
@@ -908,7 +909,13 @@ private[hive] object HiveClientImpl { | |
| } | ||
| // after SPARK-19279, it is not allowed to create a hive table with an empty schema, | ||
| // so here we should not add a default col schema | ||
|
Member: This comment looks like it needs to be moved accordingly?
||
| if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) { | ||
| // | ||
| // Because HiveExternalCatalog sometimes writes back "raw" tables that have not been | ||
| // completely translated to Spark's view, the provider information needs to be looked | ||
| // up in two places. | ||
| val provider = table.provider.orElse( | ||
|
||
| table.properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER)) | ||
| if (schema.isEmpty && provider != Some(DDLUtils.HIVE_PROVIDER)) { | ||
| // This is a hack to preserve existing behavior. Before Spark 2.0, we do not | ||
| // set a default serde here (this was done in Hive), and so if the user provides | ||
| // an empty schema Hive would automatically populate the schema with a single | ||
|
|
||
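The comment in the hunk above notes that the provider must be looked up in two places, because `HiveExternalCatalog` sometimes writes back "raw" tables whose `provider` field is not populated and only the raw table property is present. A hedged sketch of just that lookup (the helper name is illustrative):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Sketch: prefer the typed provider field, and fall back to the raw table property
// ("spark.sql.sources.provider", i.e. HiveExternalCatalog.DATASOURCE_PROVIDER) when the
// table has not yet been translated into Spark's view of its metadata.
def resolveProvider(table: CatalogTable): Option[String] =
  table.provider.orElse(table.properties.get("spark.sql.sources.provider"))
```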
Hive_2_1_DDLSuite.scala (new file)
| @@ -0,0 +1,131 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.hive.execution | ||
|
|
||
| import scala.language.existentials | ||
|
|
||
| import org.apache.hadoop.conf.Configuration | ||
| import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} | ||
|
|
||
| import org.apache.spark.{SparkConf, SparkFunSuite} | ||
| import org.apache.spark.launcher.SparkLauncher | ||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.catalyst.catalog._ | ||
| import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} | ||
| import org.apache.spark.sql.hive.test.TestHiveSingleton | ||
| import org.apache.spark.sql.internal.StaticSQLConf._ | ||
| import org.apache.spark.sql.types._ | ||
| import org.apache.spark.tags.ExtendedHiveTest | ||
| import org.apache.spark.util.Utils | ||
|
|
||
| /** | ||
| * A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently | ||
| * from the built-in ones. | ||
| */ | ||
| @ExtendedHiveTest | ||
| class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach | ||
| with BeforeAndAfterAll { | ||
|
|
||
| // Create a custom HiveExternalCatalog instance with the desired configuration. We cannot | ||
| // use SparkSession here since there's already an active one managed by the TestHive object. | ||
| private var catalog = { | ||
| val warehouse = Utils.createTempDir() | ||
| val metastore = Utils.createTempDir() | ||
| metastore.delete() | ||
| val sparkConf = new SparkConf() | ||
| .set(SparkLauncher.SPARK_MASTER, "local") | ||
| .set(WAREHOUSE_PATH.key, warehouse.toURI().toString()) | ||
| .set(CATALOG_IMPLEMENTATION.key, "hive") | ||
| .set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1") | ||
| .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven") | ||
|
|
||
| val hadoopConf = new Configuration() | ||
| hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString()) | ||
| hadoopConf.set("javax.jdo.option.ConnectionURL", | ||
| s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true") | ||
| // These options are needed since the defaults in Hive 2.1 cause exceptions with an | ||
| // empty metastore db. | ||
| hadoopConf.set("datanucleus.schema.autoCreateAll", "true") | ||
| hadoopConf.set("hive.metastore.schema.verification", "false") | ||
|
|
||
| new HiveExternalCatalog(sparkConf, hadoopConf) | ||
| } | ||
|
|
||
| override def afterEach: Unit = { | ||
| catalog.listTables("default").foreach { t => | ||
| catalog.dropTable("default", t, true, false) | ||
| } | ||
| spark.sessionState.catalog.reset() | ||
| } | ||
|
|
||
| override def afterAll(): Unit = { | ||
| catalog = null | ||
| } | ||
|
|
||
| test("SPARK-21617: ALTER TABLE for non-compatible DataSource tables") { | ||
| testAlterTable( | ||
| "t1", | ||
| "CREATE TABLE t1 (c1 int) USING json", | ||
| StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))), | ||
| hiveCompatible = false) | ||
| } | ||
|
|
||
| test("SPARK-21617: ALTER TABLE for Hive-compatible DataSource tables") { | ||
| testAlterTable( | ||
| "t1", | ||
| "CREATE TABLE t1 (c1 int) USING parquet", | ||
| StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType)))) | ||
| } | ||
|
|
||
| test("SPARK-21617: ALTER TABLE for Hive tables") { | ||
| testAlterTable( | ||
| "t1", | ||
| "CREATE TABLE t1 (c1 int) STORED AS parquet", | ||
| StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType)))) | ||
| } | ||
|
|
||
| test("SPARK-21617: ALTER TABLE with incompatible schema on Hive-compatible table") { | ||
| val exception = intercept[AnalysisException] { | ||
| testAlterTable( | ||
| "t1", | ||
| "CREATE TABLE t1 (c1 string) USING parquet", | ||
| StructType(Array(StructField("c2", IntegerType)))) | ||
| } | ||
| assert(exception.getMessage().contains("types incompatible with the existing columns")) | ||
| } | ||
|
|
||
| private def testAlterTable( | ||
| tableName: String, | ||
| createTableStmt: String, | ||
| updatedSchema: StructType, | ||
| hiveCompatible: Boolean = true): Unit = { | ||
| spark.sql(createTableStmt) | ||
| val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", tableName) | ||
| catalog.createTable(oldTable, true) | ||
| catalog.alterTableSchema("default", tableName, updatedSchema) | ||
|
|
||
| val updatedTable = catalog.getTable("default", tableName) | ||
| assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames) | ||
|
|
||
| val rawTable = catalog.getRawTable("default", tableName) | ||
| val compatibility = rawTable.properties.get(HiveExternalCatalog.DATASOURCE_HIVE_COMPATIBLE) | ||
| .map(_.toBoolean).getOrElse(true) | ||
| assert(hiveCompatible === compatibility) | ||
|
||
| } | ||
|
|
||
| } | ||
Review comment: This would be a good idea if we had done it from the first version. But now, for backward compatibility, we still have to handle tables without this special flag on the read path, so I can't see the point of having the flag.