sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2356,18 +2356,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}.getMessage
assert(e.contains("Found duplicate column(s)"))
} else {
if (isUsingHiveMetastore) {
// hive catalog will still complains that c1 is duplicate column name because hive
// identifiers are case insensitive.
val e = intercept[AnalysisException] {
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
}.getMessage
assert(e.contains("HiveException"))
} else {
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
assert(spark.table("t1").schema
.equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
}
sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
assert(spark.table("t1").schema
.equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
Contributor:
.equals -> ==
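
Applied to the assertion in the added lines above, the suggested change would read roughly as follows (a sketch, not part of the diff; it assumes the surrounding test suite's imports for StructType, IntegerType and StringType):

```scala
// Same assertion, using == instead of .equals for the StructType comparison.
assert(spark.table("t1").schema ==
  new StructType().add("c1", IntegerType).add("C1", StringType))
```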

}
}
}
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -32,7 +32,8 @@ import org.apache.thrift.TException

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
@@ -43,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types.{DataType, StructType}

@@ -257,6 +258,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
}
}

/**
* Retrieve a configuration value for the current active session, if any.
*/
private def currentSessionConf[T](entry: ConfigEntry[T]): T = {
SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession).map { session =>
session.conf.get(entry)
}.getOrElse {
// If there's no active session, try to read from the SparkConf object instead. Normally
// there should be an active session, but unit tests invoke methods on the catalog directly,
// so that might not be true in some cases.
conf.get(entry)
}
}

private def createDataSourceTable(table: CatalogTable, ignoreIfExists: Boolean): Unit = {
// data source table always have a provider, it's guaranteed by `DDLUtils.isDatasourceTable`.
val provider = table.provider.get
@@ -288,6 +303,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
// bucket specification to empty. Note that partition columns are retained, so that we can
// call partition-related Hive API later.
def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "false")
Contributor:
This is a good idea if we had done it from the first version. But now, for backward compatibility, we have to handle the case without this special flag on the read path anyway, so I can't see the point of having this flag.

table.copy(
// Hive only allows directory paths as location URIs while Spark SQL data source tables
// also allow file paths. For non-hive-compatible format, we should not set location URI
@@ -297,11 +313,12 @@
properties = storagePropsWithLocation),
schema = table.partitionSchema,
bucketSpec = None,
properties = table.properties ++ tableProperties)
properties = table.properties ++ tableProperties ++ hiveCompatible)
}

// converts the table metadata to Hive compatible format, i.e. set the serde information.
def newHiveCompatibleMetastoreTable(serde: HiveSerDe): CatalogTable = {
val hiveCompatible = Map(DATASOURCE_HIVE_COMPATIBLE -> "true")
val location = if (table.tableType == EXTERNAL) {
// When we hit this branch, we are saving an external data source table with hive
// compatible format, which means the data source is file-based and must have a `path`.
@@ -320,7 +337,7 @@
serde = serde.serde,
properties = storagePropsWithLocation
),
properties = table.properties ++ tableProperties)
properties = table.properties ++ tableProperties ++ hiveCompatible)
}

val qualifiedTableName = table.identifier.quotedString
@@ -342,6 +359,12 @@
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
(None, message)

case _ if currentSessionConf(SQLConf.CASE_SENSITIVE) =>
Contributor:
I think we should look at the schema instead of looking at the config. It's possible that even when the case-sensitive config is on, the column names are all lowercased and the table is still Hive compatible.

My proposal: check schema.asLowerCased == schema; if it's false, then the table is not Hive compatible. We would need to add StructType.asLowerCased, though.
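
For illustration only, a minimal sketch of that proposed check; StructType has no asLowerCased method today, so a hand-rolled helper stands in for it here:

```scala
import java.util.Locale

import org.apache.spark.sql.types.StructType

// Sketch: lower-case the top-level field names and compare with the original.
// A real implementation would also need to recurse into nested struct fields.
private def asLowerCased(schema: StructType): StructType =
  StructType(schema.fields.map(f => f.copy(name = f.name.toLowerCase(Locale.ROOT))))

// The table can be stored in a Hive-compatible way only if this holds.
private def isLowerCasedSchema(schema: StructType): Boolean =
  asLowerCased(schema) == schema
```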

Contributor:
Actually, is this a useful change? In the read path we still need to handle the case where a Hive-compatible table has an inconsistent schema between the table properties and the metastore metadata.

Contributor Author:
Ok, I'll remove this change. The write-path change you propose isn't necessary, because if you have an "invalid" schema (the same column name with different case), the Hive metastore will complain and the table will be stored as non-Hive-compatible.

The problem this was trying to avoid is related to the changes in alterTableSchema: if you created a Hive-compatible table here and later tried to update it with an invalid schema, you'd end up with a frankentable, because the code in alterTableSchema was wrong.

But since this change is mainly about fixing alterTableSchema, you'll now get a proper error in that case instead of ending up with a potentially corrupted table.

val message =
s"Persisting case sensitive data source table $qualifiedTableName into " +
"Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. "
(None, message)

case Some(serde) =>
val message =
s"Persisting file based data source table $qualifiedTableName into " +
@@ -386,6 +409,12 @@
* can be used as table properties later.
*/
private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
tableMetaToTableProps(table, table.schema)
}

private def tableMetaToTableProps(
table: CatalogTable,
schema: StructType): mutable.Map[String, String] = {
val partitionColumns = table.partitionColumnNames
val bucketSpec = table.bucketSpec

@@ -394,7 +423,7 @@
// property. In this case, we split the JSON string and store each part as a separate table
// property.
val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
val schemaJsonString = table.schema.json
val schemaJsonString = schema.json
// Split the JSON string.
val parts = schemaJsonString.grouped(threshold).toSeq
properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
@@ -611,21 +640,39 @@
override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
requireTableExists(db, table)
val rawTable = getRawTable(db, table)
val withNewSchema = rawTable.copy(schema = schema)
verifyColumnNames(withNewSchema)
// Add table metadata such as table schema, partition columns, etc. to table properties.
val updatedTable = withNewSchema.copy(
properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
try {
client.alterTable(updatedTable)
} catch {
case NonFatal(e) =>
val warningMessage =
s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
"compatible way. Updating Hive metastore in Spark SQL specific format."
logWarning(warningMessage, e)
client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema)

// Detect whether this is a Hive-compatible table.
val provider = rawTable.properties.get(DATASOURCE_PROVIDER)
val isHiveCompatible = if (provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)) {
Member:
The whole check might not support all previous versions. We have changed these flags multiple times, so we might break support for table metadata created by earlier versions of Spark.

How about directly comparing the schemas and checking whether they are Hive compatible? cc @cloud-fan WDYT?

Contributor:
+1. Since we still need to handle the case without the special flag for old Spark versions, it makes more sense to just detect Hive compatibility by comparing the raw table schema with the table schema from the table properties.
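
A rough sketch of that detection inside HiveExternalCatalog, assuming the class's existing getSchemaFromTableProperties helper (the name looksHiveCompatible is made up for illustration):

```scala
// Sketch: a table written in Hive-compatible format stores its full schema in the
// metastore, so the raw metastore schema should match the schema reassembled from
// the Spark-specific table properties. Spark-specific tables only store the
// partition schema (or a placeholder), so the comparison fails for them.
// A real check would also have to tolerate case and nullability differences
// introduced by the metastore.
private def looksHiveCompatible(rawTable: CatalogTable): Boolean = {
  rawTable.schema == getSchemaFromTableProperties(rawTable)
}
```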

Contributor Author:
I think you mean the check below, in the case _ => branch, right?

I see that both compatible and non-compatible tables set that property, at least in 2.1, so let me see if there's an easy way to differentiate them without having to replicate all the original checks (which may be hard to do at this point).

Contributor Author:
I changed the check to use the serde instead. The new tests pass even without the explicit check for DATASOURCE_HIVE_COMPATIBLE when doing that, although I prefer keeping the explicit property for clarity.
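
For context, a serde-based check might look roughly like the sketch below; HiveSerDe.sourceToSerDe is the existing provider-to-serde mapping, while serdeMatchesProvider is an illustrative name, not code from this PR:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.internal.HiveSerDe

// Sketch: a data source table is Hive compatible only if the serde recorded in the
// metastore matches the serde Spark would pick for the table's provider; tables
// stored in the Spark-specific format end up with Hive's default serde instead.
private def serdeMatchesProvider(rawTable: CatalogTable, provider: String): Boolean = {
  HiveSerDe.sourceToSerDe(provider).exists(_.serde == rawTable.storage.serde)
}
```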

Contributor Author:
I also checked 2.0 and 1.6, and both seem to do the same thing (both set the provider, and both use a different serde for non-compatible tables), so the check should work for those versions too.

Member (@gatorsmile, Aug 8, 2017):
Could you add a test case for the cross-version compatibility checking? I am just afraid it might not work as expected.

Member:
We plan to submit a separate PR for verifying all the related cross-version issues. That needs to verify most DDL statements. You can ignore my previous comment. Thanks!

Contributor Author:
Too late now, already added the tests.

Member:
Could you create a separate utility function for isHiveCompatible in HiveExternalCatalog.scala?
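
A sketch of that refactoring, pulling the detection logic from the diff below into a helper named as the reviewer suggests (it reuses the constants already defined in HiveExternalCatalog):

```scala
// Sketch: extract the Hive-compatibility detection out of alterTableSchema.
private def isHiveCompatible(rawTable: CatalogTable): Boolean = {
  val provider = rawTable.properties.get(DATASOURCE_PROVIDER)
  if (provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)) {
    rawTable.properties.get(DATASOURCE_HIVE_COMPATIBLE) match {
      case Some(value) => value.toBoolean
      case _ =>
        // No explicit flag: the table may come from an older Spark version, so fall
        // back to the "path" storage property check used in the change below.
        rawTable.storage.properties.get("path").isDefined
    }
  } else {
    // All non data source tables are treated as regular Hive tables.
    true
  }
}
```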

rawTable.properties.get(DATASOURCE_HIVE_COMPATIBLE) match {
case Some(value) =>
value.toBoolean
case _ =>
// If the property is not set, the table may have been created by an old version
// of Spark. Those versions set a "path" property in the table's storage descriptor
// for non-Hive-compatible tables, so use that to detect compatibility.
rawTable.storage.properties.get("path").isDefined
}
} else {
// All non-DS tables are treated as regular Hive tables.
true
}

val updatedTable = if (isHiveCompatible) {
val _updated = rawTable.copy(properties = updatedProperties, schema = schema)
verifyColumnNames(_updated)
_updated
} else {
// If the table is not Hive-compatible, the schema of the table should not be overwritten with
// the updated schema. The previous value stored in the metastore should be preserved; that
// will be either the table's original partition schema, or a placeholder schema inserted by
// the Hive client wrapper if the partition schema was empty.
rawTable.copy(properties = updatedProperties)
}

client.alterTable(updatedTable)
}

override def alterTableStats(
@@ -1193,6 +1240,7 @@ object HiveExternalCatalog {
val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "partCol."
val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
val DATASOURCE_HIVE_COMPATIBLE = SPARK_SQL_PREFIX + "hive.compatibility"
Member:
Use DATASOURCE_PREFIX?


val STATISTICS_PREFIX = SPARK_SQL_PREFIX + "statistics."
val STATISTICS_TOTAL_SIZE = STATISTICS_PREFIX + "totalSize"
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveClientImpl._
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}
@@ -908,7 +909,13 @@
}
// after SPARK-19279, it is not allowed to create a hive table with an empty schema,
// so here we should not add a default col schema
Member (@viirya, Aug 9, 2017):
This comment looks like it needs to move accordingly?

if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) {
//
// Because HiveExternalCatalog sometimes writes back "raw" tables that have not been
// completely translated to Spark's view, the provider information needs to be looked
// up in two places.
val provider = table.provider.orElse(
Contributor Author:
This change would have fixed the second exception in the bug (about storing an empty schema), but the code was only ending up in that situation because of the other problems this PR is fixing. This change shouldn't be needed for the fix, but I included it for extra correctness.

Member:
What is the second exception? Could you explain more? If this is fixing a different bug, could you open a new JIRA and put it in the PR title?

Contributor Author:
If you look at the bug, there are two exceptions. One gets logged; the second is thrown and caused the test to fail in my 2.1-based branch.

The exception happened because alterTableSchema was writing back the result of getRawTable. That raw table does not have the provider set; instead, the provider is in the table's properties. This check looks at both places, so other code that uses getRawTable can properly pass it.

As I explained in a previous comment, this doesn't happen anymore for alterTableSchema because of the other changes. But there's still code in the catalog class that writes back tables fetched with getRawTable, so this feels safer.

table.properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER))
if (schema.isEmpty && provider != Some(DDLUtils.HIVE_PROVIDER)) {
// This is a hack to preserve existing behavior. Before Spark 2.0, we do not
// set a default serde here (this was done in Hive), and so if the user provides
// an empty schema Hive would automatically populate the schema with a single
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -22,22 +22,25 @@ import java.net.URI

import scala.language.existentials

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.internal.StaticSQLConf._
Member:
Revert it back?

import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.Utils

// TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite
Expand Down Expand Up @@ -1998,3 +2001,61 @@ class HiveDDLSuite
}
}
}

/**
* A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently
* from the built-in ones.
*/
@ExtendedHiveTest
class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach
Member:
Could we create a separate suite for this? HiveDDLSuite.scala is too big now.

with BeforeAndAfterAll {

// Create a custom HiveExternalCatalog instance with the desired configuration. We cannot
// use SparkSession here since there's already an active one managed by the TestHive object.
private var catalog = {
val warehouse = Utils.createTempDir()
val metastore = Utils.createTempDir()
metastore.delete()
val sparkConf = new SparkConf()
.set(SparkLauncher.SPARK_MASTER, "local")
.set(WAREHOUSE_PATH.key, warehouse.toURI().toString())
.set(CATALOG_IMPLEMENTATION.key, "hive")
.set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1")
.set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")

val hadoopConf = new Configuration()
hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString())
hadoopConf.set("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
// These options are needed since the defaults in Hive 2.1 cause exceptions with an
// empty metastore db.
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")

new HiveExternalCatalog(sparkConf, hadoopConf)
}

override def afterEach: Unit = {
catalog.listTables("default").foreach { t =>
catalog.dropTable("default", t, true, false)
}
spark.sessionState.catalog.reset()
}

override def afterAll(): Unit = {
catalog = null
}

test("SPARK-21617: ALTER TABLE..ADD COLUMNS for DataSource tables") {
spark.sql("CREATE TABLE t1 (c1 int) USING json")
val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", "t1")
catalog.createTable(oldTable, true)

val newSchema = StructType(oldTable.schema.fields ++ Array(StructField("c2", IntegerType)))
catalog.alterTableSchema("default", "t1", newSchema)

val updatedTable = catalog.getTable("default", "t1")
assert(updatedTable.schema.fieldNames === Array("c1", "c2"))
}

}