@@ -2356,18 +2356,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
         }.getMessage
         assert(e.contains("Found duplicate column(s)"))
       } else {
-        if (isUsingHiveMetastore) {
-          // hive catalog will still complains that c1 is duplicate column name because hive
-          // identifiers are case insensitive.
-          val e = intercept[AnalysisException] {
-            sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-          }.getMessage
-          assert(e.contains("HiveException"))
-        } else {
-          sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-          assert(spark.table("t1").schema
-            .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
-        }
+        sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+        assert(spark.table("t1").schema
+          .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
       }
     }
   }
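
For context, here is a minimal stand-alone sketch of the scenario the rewritten branch above exercises. It assumes a Hive-backed SparkSession named spark with case-sensitive analysis enabled; the table and column names are illustrative, not taken from the PR.

  // With the fix, the case-preserving schema is kept in table properties, so a column that
  // differs from an existing one only by case no longer trips Hive's case-insensitive
  // duplicate-column check.
  spark.conf.set("spark.sql.caseSensitive", "true")
  spark.sql("CREATE TABLE t1 (c1 INT) USING parquet")
  spark.sql("ALTER TABLE t1 ADD COLUMNS (C1 STRING)")
  spark.table("t1").printSchema()  // expected: c1 (int) and C1 (string)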
@@ -616,15 +616,24 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // Add table metadata such as table schema, partition columns, etc. to table properties.
     val updatedTable = withNewSchema.copy(
       properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))

+    // If it's a data source table, make sure the original schema is left unchanged; the
+    // actual schema is recorded as a table property.
+    val tableToStore = if (DDLUtils.isDatasourceTable(updatedTable)) {
+      updatedTable.copy(schema = rawTable.schema)
Member commented:

We do support ALTER TABLE ADD COLUMN, which relies on alterTableSchema. Data source tables can be read by Hive when possible. Thus, I think we should not leave the schema unchanged.
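
A hedged sketch of the direction this comment points at (this is not the PR's change): pin the old metastore-level schema only for data source tables stored in the Spark-specific layout, so Hive-compatible ones still expose columns added through alterTableSchema to Hive readers. storedHiveCompatibly is a hypothetical predicate; one possible definition is sketched further down in this thread.

  // Hypothetical variant of the tableToStore block in the diff above; storedHiveCompatibly
  // is an assumed helper, not existing Spark code.
  val tableToStore = if (DDLUtils.isDatasourceTable(updatedTable) &&
      !storedHiveCompatibly(rawTable)) {
    // Spark-specific layout: the real schema lives in table properties, keep the raw one here.
    updatedTable.copy(schema = rawTable.schema)
  } else {
    // Hive serde tables and Hive-compatible data source tables: let Hive see the new columns.
    updatedTable
  }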

Member commented:

I just checked the JIRA description. This sounds like a bug we need to resolve, and it will be a little complex to fix: we need to follow what we did for create table. cc @xwu0226 Please help @vanzin address this issue.

Contributor Author commented:

Hmm, I see that this will break DS tables created with newHiveCompatibleMetastoreTable instead of newSparkSQLSpecificMetastoreTable.

For the former, the only thing I can see that could be used to identify the case is the presence of serde properties in the table metadata. That could replace the DDLUtils.isDatasourceTable(updatedTable) check to see whether the schema needs to be updated.

For the latter case, I see that newSparkSQLSpecificMetastoreTable stores the partition schema as the table's schema (which sort of explains the weird exception handling I saw). So this code is only correct if the partition schema cannot change. Where is the partition schema for a DS table defined? Is that under control of the user (or the data source implementation)? Because if it can change you can run into pretty much the same issue.
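
For illustration, one possible reading of the serde-based check mentioned above (a sketch only; the helper name is made up, and whether the presence of a serde reliably identifies the Hive-compatible layout is exactly the open question in this thread):

  // Hypothetical helper, not existing Spark code: treat a data source table as stored in a
  // Hive-compatible layout when a serde was recorded for it, which is what
  // newHiveCompatibleMetastoreTable does when it creates such tables.
  def storedHiveCompatibly(table: CatalogTable): Boolean =
    DDLUtils.isDatasourceTable(table) && table.storage.serde.isDefined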

+    } else {
+      updatedTable
+    }
+
     try {
-      client.alterTable(updatedTable)
+      client.alterTable(tableToStore)
     } catch {
       case NonFatal(e) =>
         val warningMessage =
           s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " +
             "compatible way. Updating Hive metastore in Spark SQL specific format."
         logWarning(warningMessage, e)
-        client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
+        client.alterTable(updatedTable.copy(schema = tableToStore.partitionSchema))
Contributor Author commented:

This is the exception handling code I mentioned in the bug report which seems very suspicious. I had half a desire to just remove it, but maybe someone can explain to me why this code makes sense.

Member commented:

I think this part is directly related to the logic which converts the table metadata to Spark SQL specific format:

  def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
    table.copy(
      // Hive only allows directory paths as location URIs while Spark SQL data source tables
      // also allow file paths. For non-hive-compatible format, we should not set location URI
      // to avoid hive metastore to throw exception.
      storage = table.storage.copy(
        locationUri = None,
        properties = storagePropsWithLocation),
      schema = table.partitionSchema,
      bucketSpec = None,
      properties = table.properties ++ tableProperties)
  }

     }
   }

@@ -49,6 +49,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.client.HiveClientImpl._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -413,7 +414,10 @@ private[hive] class HiveClientImpl(
         unsupportedFeatures += "partitioned view"
       }

-      val properties = Option(h.getParameters).map(_.asScala.toMap).orNull
+      val properties = Option(h.getParameters).map(_.asScala.toMap).getOrElse(Map())
+
+      val provider = properties.get(HiveExternalCatalog.DATASOURCE_PROVIDER)
+        .orElse(Some(DDLUtils.HIVE_PROVIDER))
Member commented:

Previously we didn't store the provider for Hive serde tables. Some existing logic that decides whether a table retrieved from the metastore is a data source table may be broken by this change.

Member @viirya commented on Aug 3, 2017:

Oh, never mind. Looks like we access the key DATASOURCE_PROVIDER in table.properties for that purpose, so this should be safe. Anyway, we actually set provider on the CatalogTable later, when restoring the table read from the metastore, so maybe this change is redundant.

Another concern is that we previously didn't restore provider for a view (please refer to the linked code). With this change, we will set provider to HIVE_PROVIDER for views too.

Contributor Author @vanzin commented on Aug 3, 2017:

> Maybe this is redundant.

This was definitely not redundant in my testing. The metadata loaded from the metastore in HiveExternalCatalog.alterTableSchema did not have the provider set when I debugged this. In fact, the test I wrote fails if I remove this code (or comment out the line that sets "provider" a few lines below).

Perhaps some other part of the code sets it in a different code path, but this would make that part of the code redundant, not the other way around.

Contributor Author commented:

The restoring you mention is done in HiveExternalCatalog.restoreTableMetadata. Let me see if I can use that instead of making this change.
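
To make the two detection paths discussed in this thread concrete, here is a small hedged sketch. The helper is illustrative only; "spark.sql.sources.provider" is the value of the HiveExternalCatalog.DATASOURCE_PROVIDER constant used in the diff above.

  import org.apache.spark.sql.catalyst.catalog.CatalogTable
  import org.apache.spark.sql.execution.command.DDLUtils

  // Illustrative helper, not Spark code: a table freshly read from the Hive metastore can be
  // recognized as a data source table either through the spark.sql.sources.provider entry in
  // table.properties (what the pre-existing logic keys off), or through CatalogTable.provider
  // once it has been filled in, whether by this change or by restoreTableMetadata.
  def looksLikeDatasourceTable(raw: CatalogTable): Boolean =
    raw.properties.contains("spark.sql.sources.provider") ||
      raw.provider.exists(_ != DDLUtils.HIVE_PROVIDER)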


       // Hive-generated Statistics are also recorded in ignoredProperties
       val ignoredProperties = scala.collection.mutable.Map.empty[String, String]
@@ -468,6 +472,7 @@ private[hive] class HiveClientImpl(
             throw new AnalysisException("Hive index table is not supported.")
         },
         schema = schema,
+        provider = provider,
         partitionColumnNames = partCols.map(_.name),
         // If the table is written by Spark, we will put bucketing information in table properties,
         // and will always overwrite the bucket spec in hive metastore by the bucketing information
@@ -23,19 +23,20 @@ import java.net.URI
 import scala.language.existentials

 import org.apache.hadoop.fs.Path
-import org.scalatest.BeforeAndAfterEach
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

-import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
-import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -1998,3 +1999,46 @@ class HiveDDLSuite
     }
   }
 }
+
+/**
+ * A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently
+ * from the built-in ones.
+ */
+class HiveDDLSuite_2_1 extends SparkFunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
+
+  private val spark = {
+    val warehouse = Utils.createTempDir()
+    val metastore = Utils.createTempDir()
+    metastore.delete()
+    SparkSession.builder()
+      .config(SparkLauncher.SPARK_MASTER, "local")
+      .config(WAREHOUSE_PATH.key, warehouse.toURI().toString())
+      .config(CATALOG_IMPLEMENTATION.key, "hive")
+      .config(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1")
+      .config(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
+      .config("spark.hadoop.javax.jdo.option.ConnectionURL",
+        s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
+      // These options are needed since the defaults in Hive 2.1 cause exceptions with an
+      // empty metastore db.
+      .config("spark.hadoop.datanucleus.schema.autoCreateAll", "true")
+      .config("spark.hadoop.hive.metastore.schema.verification", "false")
+      .getOrCreate()
+  }
+
+  override def afterEach: Unit = {
+    spark.sessionState.catalog.reset()
+  }
+
+  override def afterAll(): Unit = {
+    spark.close()
+  }
+
+  test("SPARK-21617: ALTER TABLE..ADD COLUMNS for DataSource tables") {
+    spark.sql("CREATE TABLE t1 (c1 int) USING json")
+    spark.sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
+
+    val df = spark.table("t1")
+    assert(df.schema.fieldNames === Array("c1", "c2"))
+  }
+
+}