@@ -2356,18 +2356,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}.getMessage
assert(e.contains("Found duplicate column(s)"))
} else {
if (isUsingHiveMetastore) {
// hive catalog will still complains that c1 is duplicate column name because hive
// identifiers are case insensitive.
val e = intercept[AnalysisException] {
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
}.getMessage
assert(e.contains("HiveException"))
} else {
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
assert(spark.table("t1").schema
.equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
}
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
assert(spark.table("t1").schema ==
new StructType().add("c1", IntegerType).add("C1", StringType))
}
}
}
@@ -32,7 +32,8 @@ import org.apache.thrift.TException

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
@@ -43,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types.{DataType, StructType}

@@ -114,7 +115,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* should interpret these special data source properties and restore the original table metadata
* before returning it.
*/
private def getRawTable(db: String, table: String): CatalogTable = withClient {
private[hive] def getRawTable(db: String, table: String): CatalogTable = withClient {
client.getTable(db, table)
}

@@ -257,6 +258,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}

/**
* Retrieve a configuration value for the current active session, if any.
*/
private def currentSessionConf[T](entry: ConfigEntry[T]): T = {
SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession).map { session =>
session.conf.get(entry)
}.getOrElse {
// If there's no active session, try to read from the SparkConf object instead. Normally
// there should be an active session, but unit tests invoke methods on the catalog directly,
// so that might not be true in some cases.
conf.get(entry)
}
}

private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
// data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
val provider = table.provider.get
@@ -288,6 +303,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// bucket specification to empty. Note that partition columns are retained, so that we can
// call partition-related Hive API later.
def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "false")
Contributor:
This is a good idea if we do it from the first version. But now, for backward compatibility, we have to handle the case without this special flag on the read path anyway, so I can't see the point of having this flag.

table.copy(
// Hive only allows directory paths as location URIs while Spark SQL data source tables
// also allow file paths. For non-hive-compatible format, we should not set location URI
@@ -297,11 +313,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
properties = storagePropsWithLocation),
schema = table.partitionSchema,
bucketSpec = None,
properties = table.properties ++ tableProperties)
properties = table.properties ++ tableProperties ++ hiveCompatible)
}

// converts the table metadata to Hive compatible format, i.e. set the serde information.
def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = {
val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "true")
val location = if (table.tableType == EXTERNAL) {
// When we hit this branch, we are saving an external data source table with hive
// compatible format, which means the data source is file-based and must have a `path`.
@@ -320,7 +337,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
serde = serde.serde,
properties = storagePropsWithLocation
),
properties = table.properties ++ tableProperties)
properties = table.properties ++ tableProperties ++ hiveCompatible)
}

val qualifiedTableName = table.identifier.quotedString
@@ -342,6 +359,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
(None, message)

case _ if currentSessionConf(SQLConf.CASE_SENSITIVE) =>
Contributor:
I think we should look at the schema instead of looking at the config. It's possible that, even when the case-sensitive config is on, the column names are all lowercased and the table is still Hive compatible.

My proposal: check schema.asLowerCased == schema; if it's false, then it's not Hive compatible. We would need to add StructType.asLowerCased, though. (A sketch of the idea follows.)
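A minimal sketch of what the proposed check might look like (neither asLowerCased nor isLowerCaseOnly exists in Spark; both names are hypothetical illustrations of the idea above):

    import org.apache.spark.sql.types.StructType

    // Hypothetical helper: lower-case every top-level field name. A real implementation
    // would also need to recurse into nested StructTypes.
    def asLowerCased(schema: StructType): StructType =
      StructType(schema.fields.map(f => f.copy(name = f.name.toLowerCase)))

    // The proposed check: if lower-casing changes nothing, all column names are already
    // lower case, so case sensitivity alone cannot make the table Hive-incompatible.
    // e.g. new StructType().add("c1", "int").add("C2", "string") would fail this check.
    def isLowerCaseOnly(schema: StructType): Boolean = asLowerCased(schema) == schema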

Contributor:
Actually, is this a useful change? In the read path we still need to handle the case where a Hive-compatible table has an inconsistent schema between the table properties and the metastore metadata.

Contributor Author:
Ok, I'll remove this change. The write-path change you propose isn't necessary because if you have an "invalid" schema (same column name with different case), the Hive metastore will complain and the table will be stored as non-Hive-compatible.

The problem this was trying to avoid is related to the changes in alterTableSchema: if you create a Hive-compatible table here and later try to update it with an invalid schema, you'd end up with a frankentable because the code in alterTableSchema was wrong.

But since this change is mainly about fixing alterTableSchema, you'll now get a proper error in that case instead of ending up with a potentially corrupted table.

val message =
s"Persisting case sensitive data source table $qualifiedTableName into " +
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
(None, message)

case Some(serde) =>
val message =
s"Persisting file based data source table $qualifiedTableName into " +
@@ -386,6 +409,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
* can be used as table properties later.
*/
private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
tableMetaToTableProps(table, table.schema)
}

private def tableMetaToTableProps(
table: CatalogTable,
schema: StructType): mutable.Map[String, String] = {
val partitionColumns = table.partitionColumnNames
val bucketSpec = table.bucketSpec

@@ -394,7 +423,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// property. In this case, we split the JSON string and store each part as a separate table
// property.
val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
val schemaJsonString = table.schema.json
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
@@ -611,21 +640,22 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
requireTableExists(db, table)
val rawTable = getRawTable(db, table)
val withNewSchema = rawTable.copy(schema = schema)
verifyColumnNames(withNewSchema)
// Add table metadata such as table schema, partition columns, etc. to table properties.
val updatedTable = withNewSchema.copy(
properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
try {
client.alterTable(updatedTable)
} catch {
case NonFatal(e) =>
val warningMessage =
s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
"compatible way. Updating Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema)

val updatedTable = if (isHiveCompatible(rawTable)) {
val _updated = rawTable.copy(properties = updatedProperties, schema = schema)
verifyColumnNames(_updated)
_updated
} else {
// If the table is not Hive-compatible, the schema of the table should not be overwritten with
// the updated schema. The previous value stored in the metastore should be preserved; that
// will be either the table's original partition schema, or a placeholder schema inserted by
// the Hive client wrapper if the partition schema was empty.
rawTable.copy(properties = updatedProperties)
}

client.alterTable(updatedTable)
}

override def alterTableStats(
@@ -1175,6 +1205,27 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
client.listFunctions(db, pattern)
}

/** Detect whether a table is stored with Hive-compatible metadata. */
private def isHiveCompatible(table: CatalogTable): Boolean = {
Contributor:
I think an easy way to check Hive compatibility is to compare the schema from the table properties with the metastore metadata: if they are equal, ignoring case and nullability, the table is Hive compatible.

Contributor Author:
There's already code to check compatibility in createDataSourceTable. The problem is that the logic there can only really be applied in the context of that method. Later on you cannot detect compatibility the same way, and just checking the schema is not enough.

The previous code was prone to leaving corrupted tables (in old versions of Hive) or just plain not working (in newer versions of Hive). Having a consistent way of checking for Hive compatibility is key to the fix, because alterTableSchema has to behave differently in each case.

Member:
Why does the following logic not work? Could you share an example we can try?

      val schemaFromTableProps = getSchemaFromTableProperties(table)
      val partColumnNames = getPartitionColumnsFromTableProperties(table)
      val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)
      val isHiveCompatible  = DataType.equalsIgnoreCaseAndNullability(reorderedSchema, table.schema)

Contributor Author:
Let me turn the question around: why is it bad to have the compatibility bit be a table property?

You cannot make a non-compatible table compatible after it's been created. At the very least, there is zero code that would achieve that today. And the key to this fix is that the alterTableSchema method needs to know whether the table is compatible or not in a reliable way.

Contributor Author:
And, also, if you look at createDataSourceTable, it does more checks than your example to decide whether a table is Hive-compatible or not. And the ultimate check is whether the Hive metastore accepts the table (an exception means Spark tries to create the table again, as non-compatible).

So, again, you cannot detect compatibility by just checking the schema.

Member:
why is it bad to have the compatibility bit be a table property?

This compatibility bit is only processable/parsable by Spark SQL (v2.3+, if we add it). If another Spark SQL engine (e.g., v2.2) shares the same metastore, it can make a schema change by altering the table properties (i.e., the behavior before this PR). That would break the assumption we make here, and the value of this new compatibility conf/flag would become invalid.

So far, the safest way to check compatibility is to compare the schemas. If you think that is not enough, we can add the same checks we do for CREATE TABLE.

Contributor Author:
So far, the safest way to check the compatibility is to compare the schema.

That is not enough, as I've tried to explain several times. alterTableSchema is broken in older Spark versions and can end up creating corrupt tables (where the schema does not match the storage descriptor, for example). So you need a reliable way of detecting compatibility, and the schema is not it.

The closest we have currently is the storage descriptor (the fallback case in my code).

Let me think about what happens when an older Spark version touches tables with the new property set. Even in older Spark versions, alterTableSchema will maintain that information, but maybe other code paths won't.

Contributor Author:
A quick look shows that all places that do a metastore alterTable use getRawTable to get the metadata, meaning they would preserve the new property even in old Spark versions.

Contributor Author:
The value of this new compatibility conf/flag becomes invalid.

Also, that's not true. While it's true that old Spark versions can still corrupt these tables, this property is supposed to be a reliable way to detect compatibility going forward, so that if there are more cases similar to this, they can be handled without having to make guesses about whether the table is compatible or not.

So, in my view, as long as old Spark versions don't get rid of the property when altering tables, and it seems they don't, it's beneficial to have this explicit compatibility flag.

val provider = table.provider.orElse(table.properties.get(DATASOURCE_PROVIDER))
if (provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)) {
table.properties.get(DATASOURCE_HIVE_COMPATIBLE) match {
case Some(value) =>
value.toBoolean
case _ =>
// If the property is not set, the table may have been created by an old version
// of Spark. Detect Hive compatibility by comparing the table's serde with the
// serde for the table's data source. If they match, the table is Hive-compatible.
// If they don't, they're not, because of some other table property that made it
// not initially Hive-compatible.
HiveSerDe.sourceToSerDe(provider.get) == table.storage.serde
Member (@viirya, Aug 9, 2017):
There is a change above that treats case-sensitive DS tables as Hive-incompatible. When a table of that kind does not have the new DATASOURCE_HIVE_COMPATIBLE property, should we treat it as Hive compatible or incompatible? It looks like for now we treat it as compatible?

Contributor Author:
Case-sensitive tables are weird. Case sensitivity is a session configuration, but IMO that config should affect compatibility: even if you create a table that is Hive compatible initially, you could modify it later so that it's not Hive compatible anymore. It seems the 1.2 Hive libraries would allow the broken metadata, while the 2.1 libraries complain about it.

So yes, currently when case-sensitivity is enabled you still create tables that may be Hive-compatible, and this change forces those tables to not be Hive-compatible.

As for existing tables, there's no way to know, because that data is not present anywhere in the table's metadata. (It's not after my change either, so basically you can read that table with a case-insensitive session and who knows what might happen.)

I'm ok with reverting this part since it's all a little hazy, but just wanted to point out that it's a kinda weird part of the code.

Contributor Author:
Hey all, could I get a thumbs up / down on the case-sensitiveness-handling part of this change?

}
} else {
// All non-DS tables are treated as regular Hive tables.
true
}
}

}

object HiveExternalCatalog {
@@ -1193,6 +1244,7 @@ object HiveExternalCatalog {
val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
val DATASOURCE_HIVE_COMPATIBLE = DATASOURCE_PREFIX + "hive.compatibility"

val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics."
val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
@@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveClientImpl._
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -908,7 +909,13 @@ private[hive] object HiveClientImpl {
}
// after SPARK-19279, it is not allowed to create a hive table with an empty schema,
// so here we should not add a default col schema
Member (@viirya, Aug 9, 2017):
It looks like this comment needs to move accordingly?

if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) {
//
// Because HiveExternalCatalog sometimes writes back "raw" tables that have not been
// completely translated to Spark's view, the provider information needs to be looked
// up in two places.
val provider = table.provider.orElse(
Contributor Author:
This change would have fixed the second exception in the bug (about storing an empty schema); but the code was just ending up in that situation because of the other problems this PR is fixing. This change shouldn't be needed for the fix, but I included it for further correctness.

Member:
What is the second exception? Could you explain more? If this is fixing a different bug, could you open a new JIRA and put it in the PR title?

Contributor Author:
If you look at the bug, there are two exceptions. One gets logged, the second is thrown and caused the test to fail in my 2.1-based branch.

The exception happened because alterTableSchema is writing back the result of getRawTable. That raw table does not have the provider set; instead, it's in the table's properties. This check looks at both places, so that other code that uses getRawTable can properly pass this check.

As I explained in a previous comment, this doesn't happen anymore for alterTableSchema because of the other changes. But there's still code in the catalog class that writes back tables fetched with getRawTable, so this feels safer.

table.properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER))
if (schema.isEmpty && provider != Some(DDLUtils.HIVE_PROVIDER)) {
// This is a hack to preserve existing behavior. Before Spark 2.0, we do not
// set a default serde here (this was done in Hive), and so if the user provides
// an empty schema Hive would automatically populate the schema with a single
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.execution

import scala.language.existentials

import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types._
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.Utils

/**
* A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently
* from the built-in ones.
*/
@ExtendedHiveTest
class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach
with BeforeAndAfterAll {

// Create a custom HiveExternalCatalog instance with the desired configuration. We cannot
// use SparkSession here since there's already an active one managed by the TestHive object.
private var catalog = {
val warehouse = Utils.createTempDir()
val metastore = Utils.createTempDir()
metastore.delete()
val sparkConf = new SparkConf()
.set(SparkLauncher.SPARK_MASTER, "local")
.set(WAREHOUSE_PATH.key, warehouse.toURI().toString())
.set(CATALOG_IMPLEMENTATION.key, "hive")
.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1")
.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")

val hadoopConf = new Configuration()
hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString())
hadoopConf.set("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
// These options are needed since the defaults in Hive 2.1 cause exceptions with an
// empty metastore db.
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")

new HiveExternalCatalog(sparkConf, hadoopConf)
}

override def afterEach(): Unit = {
catalog.listTables("default").foreach { t =>
catalog.dropTable("default", t, true, false)
}
spark.sessionState.catalog.reset()
}

override def afterAll(): Unit = {
catalog = null
}

test("SPARK-21617: ALTER TABLE for non-compatible DataSource tables") {
testAlterTable(
"t1",
"CREATE TABLE t1 (c1 int) USING json",
StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))),
hiveCompatible = false)
}

test("SPARK-21617: ALTER TABLE for Hive-compatible DataSource tables") {
testAlterTable(
"t1",
"CREATE TABLE t1 (c1 int) USING parquet",
StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))))
}

test("SPARK-21617: ALTER TABLE for Hive tables") {
testAlterTable(
"t1",
"CREATE TABLE t1 (c1 int) STORED AS parquet",
StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))))
}

test("SPARK-21617: ALTER TABLE with incompatible schema on Hive-compatible table") {
val exception = intercept[AnalysisException] {
testAlterTable(
"t1",
"CREATE TABLE t1 (c1 string) USING parquet",
StructType(Array(StructField("c2", IntegerType))))
}
assert(exception.getMessage().contains("types incompatible with the existing columns"))
}

private def testAlterTable(
tableName: String,
createTableStmt: String,
updatedSchema: StructType,
hiveCompatible: Boolean = true): Unit = {
spark.sql(createTableStmt)
val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", tableName)
catalog.createTable(oldTable, true)
catalog.alterTableSchema("default", tableName, updatedSchema)

val updatedTable = catalog.getTable("default", tableName)
assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames)

val rawTable = catalog.getRawTable("default", tableName)
val compatibility = rawTable.properties.get(HiveExternalCatalog.DATASOURCE_HIVE_COMPATIBLE)
.map(_.toBoolean).getOrElse(true)
assert(hiveCompatible === compatibility)
Member:
We also need to test whether Hive can still read the altered table schema by using

spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive
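A rough sketch of the kind of check being suggested (the table name and the column assertion are hypothetical; as the reply below notes, wiring this into this particular suite is not straightforward because the altered table lives in a separate catalog):

    // Ask Hive itself, through the session's Hive client, to describe the altered table.
    // If the metastore metadata were corrupted, the DESCRIBE call or the column lookup
    // below would fail.
    val hiveClient = spark.sharedState.externalCatalog
      .asInstanceOf[HiveExternalCatalog].client
    val describeOutput = hiveClient.runSqlHive("DESCRIBE default.t1")  // hypothetical table
    assert(describeOutput.exists(_.contains("c2")))                    // the added column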

Contributor Author:
That's not easy to do here. The catalog being updated is not the same as the one the Spark session is using. You can potentially run queries against the 2.1 catalog in the test, but how do you insert data into the table? (You could run a Hive query for that too, but then what's the point?)

I'd argue this kind of test should be done in HiveDDLSuite if it isn't already; and if testing against multiple Hive versions is desired, that suite needs to be reworked so it can run against them. But TestHiveSingleton makes that really hard currently, and fixing it is way beyond the scope of this change.

Member:
My only comment here is to ensure the altered table is still readable by Hive.

Contributor Author:
I understand, but it's really hard to write that kind of test without a serious rewrite of the tests in the hive module, so that you can have multiple SparkSession instances.

Right now, I think the best we can achieve is "the metastore has accepted the table so the metadata looks ok", and assume that the tests performed elsewhere (e.g. HiveDDLSuite), where a proper SparkSession exists, are enough to make sure Hive can read the data.

Member (@gatorsmile, Aug 10, 2017):
I checked the test case coverage. We do not have such a check. Could you add it in the following test cases?
https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala#L1865-L1923

I think this PR is also trying to make tables readable by Hive after Spark adds columns.

Contributor Author:
I think this PR is also trying to make tables readable by Hive after Spark adds columns.

No, that should already have been the case before this change. This PR is just to make the existing feature work with Hive 2.1.

I really would like to avoid turning this PR into "let's fix all the Hive tests to make sure they make sense". If you'd like I can open a bug to track that, but that is not what this change is about and I'd like to keep it focused.

Member:
OK, we can do it in a separate PR.

}

}