Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ case class AlterTableSetPropertiesCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
DDLUtils.verifyAlterTableType(catalog, tableName, isView)
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView)
// This overrides old properties
val newTable = table.copy(properties = table.properties ++ properties)
catalog.alterTable(newTable)
Expand All @@ -264,8 +264,8 @@ case class AlterTableUnsetPropertiesCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
DDLUtils.verifyAlterTableType(catalog, tableName, isView)
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView)
if (!ifExists) {
propKeys.foreach { k =>
if (!table.properties.contains(k)) {
Expand Down Expand Up @@ -305,6 +305,7 @@ case class AlterTableSerDePropertiesCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
// For datasource tables, disallow setting serde or specifying partition
if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
Expand Down Expand Up @@ -354,6 +355,7 @@ case class AlterTableAddPartitionCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"ALTER TABLE ADD PARTITION is not allowed for tables defined using the datasource API")
Expand Down Expand Up @@ -383,7 +385,14 @@ case class AlterTableRenamePartitionCommand(
extends RunnableCommand {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the behaviour before?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We got the following errors:

Partition not found in table 'tab1' database 'default':
ds -> 2008-04-08
hr -> 11;
org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException: Partition not found in table 'tab1' database 'default':
ds -> 2008-04-08
hr -> 11;
    at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$renamePartitions$1$$anonfun$apply$mcV$sp$5$$anonfun$16.apply(HiveClientImpl.scala:502)
    at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$renamePartitions$1$$anonfun$apply$mcV$sp$5$$anonfun$16.apply(HiveClientImpl.scala:502)

Basically, Hive does not recognize the partitioning info of data source tables.

override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.sessionState.catalog.renamePartitions(
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can just say data source table, to be consistent with other places.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do it. Thanks!

}
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
catalog.renamePartitions(
tableName, Seq(oldPartition), Seq(newPartition))
Seq.empty[Row]
}
Expand Down Expand Up @@ -414,6 +423,7 @@ case class AlterTableDropPartitionCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
"ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
Expand Down Expand Up @@ -475,6 +485,7 @@ case class AlterTableRecoverPartitionsCommand(
s"Operation not allowed: $cmd on temporary tables: $tableName")
}
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
if (DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException(
s"Operation not allowed: $cmd on datasource tables: $tableName")
Expand Down Expand Up @@ -650,6 +661,7 @@ case class AlterTableSetLocationCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
partitionSpec match {
case Some(spec) =>
// Partition spec is specified, so we set the location only for this partition
Expand Down Expand Up @@ -688,19 +700,26 @@ object DDLUtils {
/**
* If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view,
* issue an exception [[AnalysisException]].
*
* Note: temporary views can be altered by both ALTER VIEW and ALTER TABLE commands,
* since temporary views can be also created by CREATE TEMPORARY TABLE. In the future,
* when we decided to drop the support, we should disallow users to alter temporary views
* by ALTER TABLE.
*/
def verifyAlterTableType(
catalog: SessionCatalog,
tableIdentifier: TableIdentifier,
tableMetadata: CatalogTable,
isView: Boolean): Unit = {
catalog.getTableMetadataOption(tableIdentifier).map(_.tableType match {
case CatalogTableType.VIEW if !isView =>
throw new AnalysisException(
"Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
case o if o != CatalogTableType.VIEW && isView =>
throw new AnalysisException(
s"Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead")
case _ =>
})
if (!catalog.isTemporaryTable(tableMetadata.identifier)) {
tableMetadata.tableType match {
case CatalogTableType.VIEW if !isView =>
throw new AnalysisException(
"Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
case o if o != CatalogTableType.VIEW && isView =>
throw new AnalysisException(
s"Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead")
case _ =>
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ case class AlterTableRenameCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
DDLUtils.verifyAlterTableType(catalog, oldName, isView)
val table = catalog.getTableMetadata(oldName)
DDLUtils.verifyAlterTableType(catalog, table, isView)
// If this is a temp view, just rename the view.
// Otherwise, if this is a real table, we also need to uncache and invalidate the table.
val isTemporary = catalog.isTemporaryTable(oldName)
Expand All @@ -177,7 +178,6 @@ case class AlterTableRenameCommand(
}
}
// For datasource tables, we also need to update the "path" serde property
val table = catalog.getTableMetadata(oldName)
if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
val newPath = catalog.defaultTablePath(newTblName)
val newTable = table.withNewStorage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,18 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(spark.table("teachers").collect().toSeq == df.collect().toSeq)
}

test("rename temporary table") {
withTempView("tab1", "tab2") {
spark.range(10).createOrReplaceTempView("tab1")
sql("ALTER TABLE tab1 RENAME TO tab2")
checkAnswer(spark.table("tab2"), spark.range(10).toDF())
intercept[NoSuchTableException] { spark.table("tab1") }
sql("ALTER VIEW tab2 RENAME TO tab1")
checkAnswer(spark.table("tab1"), spark.range(10).toDF())
intercept[NoSuchTableException] { spark.table("tab2") }
}
}

test("rename temporary table - destination table already exists") {
withTempView("tab1", "tab2") {
sql(
Expand Down Expand Up @@ -871,25 +883,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
test("alter table: rename partition") {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val part1 = Map("a" -> "1", "b" -> "q")
val part2 = Map("a" -> "2", "b" -> "c")
val part3 = Map("a" -> "3", "b" -> "p")
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
createTablePartition(catalog, part1, tableIdent)
createTablePartition(catalog, part2, tableIdent)
createTablePartition(catalog, part3, tableIdent)
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2, part3))
createPartitionedTable(tableIdent, isDatasourceTable = false)
sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='200', b='c')")
sql("ALTER TABLE dbx.tab1 PARTITION (a='2', b='c') RENAME TO PARTITION (a='20', b='c')")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok then it's fine...

assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "200", "b" -> "c"), part3))
Set(Map("a" -> "100", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
// rename without explicitly specifying database
catalog.setCurrentDatabase("dbx")
sql("ALTER TABLE tab1 PARTITION (a='100', b='p') RENAME TO PARTITION (a='10', b='p')")
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "200", "b" -> "c"), part3))
Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
// table to alter does not exist
intercept[NoSuchTableException] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why change the test order? I think it's natural to test non-existing table first, then non-existing partition.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let me change it back.

sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
Expand All @@ -900,6 +903,38 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}

test("alter table: rename partition (datasource table)") {
createPartitionedTable(TableIdentifier("tab1", Some("dbx")), isDatasourceTable = true)
val e = intercept[AnalysisException] {
sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
}.getMessage
assert(e.contains(
"ALTER TABLE RENAME PARTITION is not allowed for tables defined using the datasource API"))
// table to alter does not exist
intercept[NoSuchTableException] {
sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
}
}

private def createPartitionedTable(
tableIdent: TableIdentifier,
isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val part1 = Map("a" -> "1", "b" -> "q")
val part2 = Map("a" -> "2", "b" -> "c")
val part3 = Map("a" -> "3", "b" -> "p")
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
createTablePartition(catalog, part1, tableIdent)
createTablePartition(catalog, part2, tableIdent)
createTablePartition(catalog, part3, tableIdent)
assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
Set(part1, part2, part3))
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
}

test("show tables") {
withTempView("show1a", "show2b") {
sql(
Expand Down Expand Up @@ -1246,7 +1281,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
sql("ALTER TABLE does_not_exist PARTITION (a=1, b=2) SET SERDEPROPERTIES ('x' = 'y')")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change this test? If we wanna test the PARTITION code path, we should add a new test

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to fix a bug in testSetSerdePartition. This function is to test ALTER TABLE PARTITION Set SERDEPROPERTIES. It sounds like the previous coder just copied and pasted from another function.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,16 @@ class HiveDDLSuite
}
}

private def assertErrorForAlterTableOnView(sqlText: String): Unit = {
val message = intercept[AnalysisException](sql(sqlText)).getMessage
assert(message.contains("Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead"))
}

private def assertErrorForAlterViewOnTable(sqlText: String): Unit = {
val message = intercept[AnalysisException](sql(sqlText)).getMessage
assert(message.contains("Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead"))
}

test("alter views and alter table - misuse") {
val tabName = "tab1"
withTable(tabName) {
Expand All @@ -317,45 +327,42 @@ class HiveDDLSuite

assert(catalog.tableExists(TableIdentifier(tabName)))
assert(catalog.tableExists(TableIdentifier(oldViewName)))
assert(!catalog.tableExists(TableIdentifier(newViewName)))

var message = intercept[AnalysisException] {
sql(s"ALTER VIEW $tabName RENAME TO $newViewName")
}.getMessage
assert(message.contains(
"Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead"))
assertErrorForAlterViewOnTable(s"ALTER VIEW $tabName RENAME TO $newViewName")

message = intercept[AnalysisException] {
sql(s"ALTER VIEW $tabName SET TBLPROPERTIES ('p' = 'an')")
}.getMessage
assert(message.contains(
"Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead"))
assertErrorForAlterTableOnView(s"ALTER TABLE $oldViewName RENAME TO $newViewName")

message = intercept[AnalysisException] {
sql(s"ALTER VIEW $tabName UNSET TBLPROPERTIES ('p')")
}.getMessage
assert(message.contains(
"Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead"))
assertErrorForAlterViewOnTable(s"ALTER VIEW $tabName SET TBLPROPERTIES ('p' = 'an')")

message = intercept[AnalysisException] {
sql(s"ALTER TABLE $oldViewName RENAME TO $newViewName")
}.getMessage
assert(message.contains(
"Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead"))
assertErrorForAlterTableOnView(s"ALTER TABLE $oldViewName SET TBLPROPERTIES ('p' = 'an')")

message = intercept[AnalysisException] {
sql(s"ALTER TABLE $oldViewName SET TBLPROPERTIES ('p' = 'an')")
}.getMessage
assert(message.contains(
"Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead"))
assertErrorForAlterViewOnTable(s"ALTER VIEW $tabName UNSET TBLPROPERTIES ('p')")

message = intercept[AnalysisException] {
sql(s"ALTER TABLE $oldViewName UNSET TBLPROPERTIES ('p')")
}.getMessage
assert(message.contains(
"Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead"))
assertErrorForAlterTableOnView(s"ALTER TABLE $oldViewName UNSET TBLPROPERTIES ('p')")

assertErrorForAlterTableOnView(s"ALTER TABLE $oldViewName SET LOCATION '/path/to/home'")

assertErrorForAlterTableOnView(s"ALTER TABLE $oldViewName SET SERDE 'whatever'")

assertErrorForAlterTableOnView(s"ALTER TABLE $oldViewName SET SERDEPROPERTIES ('x' = 'y')")

assertErrorForAlterTableOnView(
s"ALTER TABLE $oldViewName PARTITION (a=1, b=2) SET SERDEPROPERTIES ('x' = 'y')")

assertErrorForAlterTableOnView(
s"ALTER TABLE $oldViewName ADD IF NOT EXISTS PARTITION (a='4', b='8')")

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to test isAlterView = true for ADD PARTITION and DROP PARTITION. Let me add them now...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uh, both are not supported. nvm. I think we already cover all the cases. Code is ready to review. Thanks!

assertErrorForAlterTableOnView(s"ALTER TABLE $oldViewName DROP IF EXISTS PARTITION (a='2')")

assertErrorForAlterTableOnView(s"ALTER TABLE $oldViewName RECOVER PARTITIONS")

assertErrorForAlterTableOnView(
s"ALTER TABLE $oldViewName PARTITION (a='1') RENAME TO PARTITION (a='100')")

assert(catalog.tableExists(TableIdentifier(tabName)))
assert(catalog.tableExists(TableIdentifier(oldViewName)))
assert(!catalog.tableExists(TableIdentifier(newViewName)))
}
}
}
Expand Down