Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
52ca902
alter_add_col: initial changes
xwu0226 Nov 21, 2016
f498fa6
add testcases
xwu0226 Dec 1, 2016
522443e
negative testcases
xwu0226 Dec 1, 2016
1af2654
remove non-support testcase
xwu0226 Dec 5, 2016
ec57ee9
fix testcase
xwu0226 Dec 5, 2016
ec74849
update testcases
xwu0226 Dec 7, 2016
8fca889
update testcases
xwu0226 Dec 7, 2016
4a17529
update testcases
xwu0226 Jan 13, 2017
9699128
comments for command caseclass
xwu0226 Jan 20, 2017
9860e5c
udate comments based on review
xwu0226 Jan 21, 2017
dfff364
SPARK-19261: update to support datasource table and add new testcases
xwu0226 Feb 3, 2017
9f23254
remove workaournd for parquet issue since parquet-1.8.2 is now supported
xwu0226 Feb 4, 2017
180092f
SPARK-19261: using white list for datasource table types that support…
xwu0226 Feb 7, 2017
5a8aa80
fix code style
xwu0226 Feb 7, 2017
d3860e6
fix coding style
xwu0226 Feb 7, 2017
55577aa
update upon review
xwu0226 Feb 24, 2017
6fa913a
refactor code from alterTable function
xwu0226 Feb 25, 2017
7231efe
rebase and resolve conflict
xwu0226 Mar 6, 2017
e4e9ecf
resolve conflicts
xwu0226 Mar 9, 2017
75e7441
using ExternalCatalog.alterTableSchema
xwu0226 Mar 14, 2017
9847030
add InMemoryCatalog testcases
xwu0226 Mar 15, 2017
1a383bb
revert change in HiveExernalCatalog.scala
xwu0226 Mar 15, 2017
f994ce9
update upon review
xwu0226 Mar 16, 2017
5bf7360
add checking for duplicate column names
xwu0226 Mar 16, 2017
599c45e
add case sensativity for duplicate name checking and new testcases
xwu0226 Mar 16, 2017
b3edfea
typo
xwu0226 Mar 16, 2017
7d8a515
resolve conflicts and modify testcases
xwu0226 Mar 17, 2017
e895278
update testcases
xwu0226 Mar 17, 2017
e171ac4
move checkduplicate and schema arrangement to SessionCatalog.alterTab…
xwu0226 Mar 17, 2017
4391edd
change SessionCatalog.alterTableAddColumn back to alterTableSchema
xwu0226 Mar 18, 2017
a3fef12
update upon review comments
xwu0226 Mar 18, 2017
1eb7cd3
some minor updates upon review comments
xwu0226 Mar 19, 2017
04ce8f4
update based on review
xwu0226 Mar 21, 2017
7d8437d
update on minor comments
xwu0226 Mar 21, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add checking for duplicate column names
  • Loading branch information
xwu0226 committed Mar 19, 2017
commit 5bf7360834e257a2e0083a5f92f24da73416780d
Original file line number Diff line number Diff line change
Expand Up @@ -454,11 +454,11 @@ abstract class SessionCatalogSuite extends PlanTest {
test("alter table add columns") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add a negative test case for dropping columns, although we do not support it now.

val externalCatalog = newBasicCatalog()
val sessionCatalog = new SessionCatalog(externalCatalog)
sessionCatalog.createTable(newTable("alter_add", "default"), ignoreIfExists = false)
val oldTab = externalCatalog.getTable("default", "alter_add")
sessionCatalog.alterTableSchema(TableIdentifier("alter_add", Some("default")),
sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
val oldTab = externalCatalog.getTable("default", "t1")
sessionCatalog.alterTableSchema(TableIdentifier("t1", Some("default")),
oldTab.schema.add("c3", IntegerType))
val newTab = externalCatalog.getTable("default", "alter_add")
val newTab = externalCatalog.getTable("default", "t1")
assert(newTab.schema.equals(oldTab.schema.add("c3", IntegerType)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,28 +192,37 @@ case class AlterTableAddColumnsCommand(
val catalog = sparkSession.sessionState.catalog
val catalogTable = verifyAlterTableAddColumn(catalog, table)

// If an exception is thrown here we can just assume the table is uncached;
// this can happen with Hive tables when the underlying catalog is in-memory.
val wasCached = Try(sparkSession.catalog.isCached(table.unquotedString)).getOrElse(false)
if (wasCached) {
try {
sparkSession.catalog.uncacheTable(table.unquotedString)
} catch {
case NonFatal(e) => log.warn(e.toString, e)
}
try {
sparkSession.catalog.uncacheTable(table.unquotedString)
} catch {
case NonFatal(e) =>
log.warn(s"Exception when attempting to uncache table ${table.unquotedString}", e)
}

// Invalidate the table last, otherwise uncaching the table would load the logical plan
// back into the hive metastore cache
catalog.refreshTable(table)
val partitionFields = catalogTable.schema.takeRight(catalogTable.partitionColumnNames.length)
val dataSchema = catalogTable.schema
.take(catalogTable.schema.length - catalogTable.partitionColumnNames.length)
val newSchemaFields = catalogTable.schema
.take(catalogTable.schema.length - catalogTable.partitionColumnNames.length) ++
columns ++ partitionFields
checkDuplication(newSchemaFields.map(_.name))
catalog.alterTableSchema(table, newSchema =
catalogTable.schema.copy(fields = (dataSchema ++ columns ++ partitionFields).toArray))
catalogTable.schema.copy(fields = newSchemaFields.toArray))

Seq.empty[Row]
}

private def checkDuplication(colNames: Seq[String]): Unit = {
if (colNames.distinct.length != colNames.length) {
val duplicateColumns = colNames.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => x
}
throw new AnalysisException(
s"Found duplicate column(s): ${duplicateColumns.mkString(", ")}")
}
}

/**
* ALTER TABLE ADD COLUMNS command does not support temporary view/table,
* view, or datasource table with text, orc formats or external provider.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2180,22 +2180,22 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {

Seq("parquet", "json", "csv").foreach { provider =>
test(s"alter datasource table add columns - $provider") {
withTable("alter_add_ds") {
sql(s"CREATE TABLE alter_add_ds (c1 int) USING $provider")
sql("INSERT INTO alter_add_ds VALUES (1)")
sql("ALTER TABLE alter_add_ds ADD COLUMNS (c2 int)")
withTable("t1") {
sql(s"CREATE TABLE t1 (c1 int) USING $provider")
sql("INSERT INTO t1 VALUES (1)")
sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
checkAnswer(
sql("SELECT * FROM alter_add_ds"),
sql("SELECT * FROM t1"),
Seq(Row(1, null))
)
checkAnswer(
sql("SELECT * FROM alter_add_ds WHERE c2 is null"),
sql("SELECT * FROM t1 WHERE c2 is null"),
Seq(Row(1, null))
)

sql("INSERT INTO alter_add_ds VALUES (3, 2)")
sql("INSERT INTO t1 VALUES (3, 2)")
checkAnswer(
sql("SELECT * FROM alter_add_ds WHERE c2 = 2"),
sql("SELECT * FROM t1 WHERE c2 = 2"),
Seq(Row(3, 2))
)
}
Expand All @@ -2204,36 +2204,36 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {

Seq("parquet", "json", "csv").foreach { provider =>
test(s"alter datasource table add columns - partitioned - $provider") {
withTable("alter_add_ds") {
sql(s"CREATE TABLE alter_add_ds (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
sql("INSERT INTO alter_add_ds PARTITION(c2 = 2) VALUES (1)")
sql("ALTER TABLE alter_add_ds ADD COLUMNS (c3 int)")
withTable("t1") {
sql(s"CREATE TABLE t1 (c1 int, c2 int) USING $provider PARTITIONED BY (c2)")
sql("INSERT INTO t1 PARTITION(c2 = 2) VALUES (1)")
sql("ALTER TABLE t1 ADD COLUMNS (c3 int)")
checkAnswer(
sql("SELECT * FROM alter_add_ds"),
sql("SELECT * FROM t1"),
Seq(Row(1, null, 2))
)
checkAnswer(
sql("SELECT * FROM alter_add_ds WHERE c3 is null"),
sql("SELECT * FROM t1 WHERE c3 is null"),
Seq(Row(1, null, 2))
)
sql("INSERT INTO alter_add_ds PARTITION(c2 =1) VALUES (2, 3)")
sql("INSERT INTO t1 PARTITION(c2 =1) VALUES (2, 3)")
checkAnswer(
sql("SELECT * FROM alter_add_ds WHERE c3 = 3"),
sql("SELECT * FROM t1 WHERE c3 = 3"),
Seq(Row(2, 3, 1))
)
checkAnswer(
sql("SELECT * FROM alter_add_ds WHERE c2 = 1"),
sql("SELECT * FROM t1 WHERE c2 = 1"),
Seq(Row(2, 3, 1))
)
}
}
}

test("alter datasource table add columns - text format not supported") {
withTable("alter_add_ds_text") {
sql(s"CREATE TABLE alter_add_ds_text (c1 int) USING text")
withTable("t1") {
sql(s"CREATE TABLE t1 (c1 int) USING text")
val e = intercept[AnalysisException] {
sql("ALTER TABLE alter_add_ds_text ADD COLUMNS (c2 int)")
sql("ALTER TABLE t1 ADD COLUMNS (c2 int)")
}.getMessage
assert(e.contains("does not support ALTER ADD COLUMNS"))
}
Expand All @@ -2258,4 +2258,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
assert(e.message.contains("is a VIEW, which does not support ALTER ADD COLUMNS"))
}
}

test("alter table add columns with existing column name") {
withTable("t1") {
sql(s"CREATE TABLE t1 (c1 int) USING PARQUET")
val e = intercept[AnalysisException] {
sql("ALTER TABLE t1 ADD COLUMNS (c1 string)")
}.getMessage
assert(e.contains("Found duplicate column(s)"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class JDBCSuite extends SparkFunSuite
conn.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate()
conn.prepareStatement(
"insert into test.people values ('joe ''foo'' \"bar\"', 3)").executeUpdate()
conn.commit()

sql(
s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
import testImplicits._
val hiveFormats = Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO")

override def afterEach(): Unit = {
try {
Expand Down Expand Up @@ -1861,55 +1862,55 @@ class HiveDDLSuite
}
}

Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO").foreach { tableType =>
hiveFormats.foreach { tableType =>
test(s"alter hive serde table add columns -- partitioned - $tableType") {
withTable("alter_add_partitioned") {
withTable("tab") {
sql(
s"""
|CREATE TABLE alter_add_partitioned (c1 int, c2 int)
|CREATE TABLE tab (c1 int, c2 int)
|PARTITIONED BY (c3 int) STORED AS $tableType
""".stripMargin)

sql("INSERT INTO alter_add_partitioned PARTITION (c3=1) VALUES (1, 2)")
sql("ALTER TABLE alter_add_partitioned ADD COLUMNS (c4 int)")
sql("INSERT INTO tab PARTITION (c3=1) VALUES (1, 2)")
sql("ALTER TABLE tab ADD COLUMNS (c4 int)")
checkAnswer(
sql("SELECT * FROM alter_add_partitioned WHERE c3 = 1"),
sql("SELECT * FROM tab WHERE c3 = 1"),
Seq(Row(1, 2, null, 1))
)
assert(sql("SELECT * FROM alter_add_partitioned").schema
assert(sql("SELECT * FROM tab").schema
.contains(StructField("c4", IntegerType)))
sql("INSERT INTO alter_add_partitioned PARTITION (c3=2) VALUES (2, 3, 4)")
sql("INSERT INTO tab PARTITION (c3=2) VALUES (2, 3, 4)")
checkAnswer(
sql("SELECT * FROM alter_add_partitioned"),
sql("SELECT * FROM tab"),
Seq(Row(1, 2, null, 1), Row(2, 3, 4, 2))
)
checkAnswer(
sql("SELECT * FROM alter_add_partitioned WHERE c3 = 2 AND c4 IS NOT NULL"),
sql("SELECT * FROM tab WHERE c3 = 2 AND c4 IS NOT NULL"),
Seq(Row(2, 3, 4, 2))
)
}
}
}

Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO").foreach { tableType =>
hiveFormats.foreach { tableType =>
test(s"alter hive serde table add columns -- with predicate - $tableType ") {
withTable("alter_add_predicate") {
sql(s"CREATE TABLE alter_add_predicate (c1 int, c2 int) STORED AS $tableType")
sql("INSERT INTO alter_add_predicate VALUES (1, 2)")
sql("ALTER TABLE alter_add_predicate ADD COLUMNS (c4 int)")
withTable("tab") {
sql(s"CREATE TABLE tab (c1 int, c2 int) STORED AS $tableType")
sql("INSERT INTO tab VALUES (1, 2)")
sql("ALTER TABLE tab ADD COLUMNS (c4 int)")
checkAnswer(
sql("SELECT * FROM alter_add_predicate WHERE c4 IS NULL"),
sql("SELECT * FROM tab WHERE c4 IS NULL"),
Seq(Row(1, 2, null))
)
assert(sql("SELECT * FROM alter_add_predicate").schema
assert(sql("SELECT * FROM tab").schema
.contains(StructField("c4", IntegerType)))
sql("INSERT INTO alter_add_predicate VALUES (2, 3, 4)")
sql("INSERT INTO tab VALUES (2, 3, 4)")
checkAnswer(
sql("SELECT * FROM alter_add_predicate WHERE c4 = 4 "),
sql("SELECT * FROM tab WHERE c4 = 4 "),
Seq(Row(2, 3, 4))
)
checkAnswer(
sql("SELECT * FROM alter_add_predicate"),
sql("SELECT * FROM tab"),
Seq(Row(1, 2, null), Row(2, 3, 4))
)
}
Expand All @@ -1919,13 +1920,23 @@ class HiveDDLSuite
Seq("orc", "ORC", "org.apache.spark.sql.hive.orc",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. will do.

"org.apache.spark.sql.hive.orc.DefaultSource").foreach { source =>
test(s"alter datasource table add columns - $source format not supported") {
withTable("alter_add_ds_text") {
sql(s"CREATE TABLE alter_add_ds_text (c1 int) USING $source")
withTable("tab") {
sql(s"CREATE TABLE tab (c1 int) USING $source")
val e = intercept[AnalysisException] {
sql("ALTER TABLE alter_add_ds_text ADD COLUMNS (c2 int)")
sql("ALTER TABLE tab ADD COLUMNS (c2 int)")
}.getMessage
assert(e.contains("does not support ALTER ADD COLUMNS"))
}
}
}

test("alter table add columns with existing partition column name") {
withTable("tab") {
sql("CREATE TABLE tab (c1 int) PARTITIONED BY (c2 int) STORED AS PARQUET")
val e = intercept[AnalysisException] {
sql("ALTER TABLE tab ADD COLUMNS (c2 string)")
}.getMessage
assert(e.contains("Found duplicate column(s)"))
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You still can combine it with the one in InMemoryCatalogedDDLSuite by using isUsingHiveMetastore

}