Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,8 @@ statement
partitionSpec+ #addTablePartition
| ALTER TABLE multipartIdentifier
from=partitionSpec RENAME TO to=partitionSpec #renameTablePartition
| ALTER TABLE tableIdentifier
| ALTER (TABLE | VIEW) multipartIdentifier
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions
| ALTER VIEW tableIdentifier
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions
| ALTER TABLE multipartIdentifier
(partitionSpec)? SET locationSpec #setTableLocation
| ALTER TABLE multipartIdentifier RECOVER PARTITIONS #recoverPartitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2967,4 +2967,30 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
visitNonOptionalPartitionSpec(ctx.from),
visitNonOptionalPartitionSpec(ctx.to))
}

/**
* Create an [[AlterTableDropPartitionStatement]]
*
* For example:
* {{{
* ALTER TABLE multi_part_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
* [PURGE];
* ALTER VIEW view DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...];
* }}}
*
* ALTER VIEW ... DROP PARTITION ... is not supported because the concept of partitioning
* is associated with physical tables
*/
override def visitDropTablePartitions(
ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
if (ctx.VIEW != null) {
operationNotAllowed("ALTER VIEW ... DROP PARTITION", ctx)
}
AlterTableDropPartitionStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
ifExists = ctx.EXISTS != null,
purge = ctx.PURGE != null,
retainData = false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,16 @@ case class AlterTableRenamePartitionStatement(
from: TablePartitionSpec,
to: TablePartitionSpec) extends ParsedStatement

/**
* ALTER TABLE ... DROP PARTITION command, as parsed from SQL
*/
case class AlterTableDropPartitionStatement(
tableName: Seq[String],
specs: Seq[TablePartitionSpec],
ifExists: Boolean,
purge: Boolean,
retainData: Boolean) extends ParsedStatement

/**
* ALTER VIEW ... SET TBLPROPERTIES command, as parsed from SQL.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,56 @@ class DDLParserSuite extends AnalysisTest {
comparePlans(parsed2, expected2)
}

// ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
// ALTER VIEW table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
test("alter table: drop partition") {
val sql1_table =
"""
|ALTER TABLE table_name DROP IF EXISTS PARTITION
|(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
""".stripMargin
val sql2_table =
"""
|ALTER TABLE table_name DROP PARTITION
|(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
""".stripMargin
val sql1_view = sql1_table.replace("TABLE", "VIEW")
val sql2_view = sql2_table.replace("TABLE", "VIEW")

val parsed1_table = parsePlan(sql1_table)
val parsed2_table = parsePlan(sql2_table)
val parsed1_purge = parsePlan(sql1_table + " PURGE")

assertUnsupported(sql1_view)
assertUnsupported(sql2_view)

val expected1_table = AlterTableDropPartitionStatement(
Seq("table_name"),
Seq(
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2009-09-09", "country" -> "uk")),
ifExists = true,
purge = false,
retainData = false)
val expected2_table = expected1_table.copy(ifExists = false)
val expected1_purge = expected1_table.copy(purge = true)

comparePlans(parsed1_table, expected1_table)
comparePlans(parsed2_table, expected2_table)
comparePlans(parsed1_purge, expected1_purge)

val sql3_table = "ALTER TABLE a.b.c DROP IF EXISTS PARTITION (ds='2017-06-10')"
val expected3_table = AlterTableDropPartitionStatement(
Seq("a", "b", "c"),
Seq(Map("ds" -> "2017-06-10")),
ifExists = true,
purge = false,
retainData = false)

val parsed3_table = parsePlan(sql3_table)
comparePlans(parsed3_table, expected3_table)
}

private case class TableSpec(
name: Seq[String],
schema: Option[StructType],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,15 @@ class ResolveSessionCatalog(
v1TableName.asTableIdentifier,
from,
to)

case AlterTableDropPartitionStatement(tableName, specs, ifExists, purge, retainData) =>
val v1TableName = parseV1Table(tableName, "ALTER TABLE DROP PARTITION")
AlterTableDropPartitionCommand(
v1TableName.asTableIdentifier,
specs,
ifExists,
purge,
retainData)
}

private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,31 +461,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
ctx.EXISTS != null)
}

/**
* Create an [[AlterTableDropPartitionCommand]] command
*
* For example:
* {{{
* ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
* ALTER VIEW view DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...];
* }}}
*
* ALTER VIEW ... DROP PARTITION ... is not supported because the concept of partitioning
* is associated with physical tables
*/
override def visitDropTablePartitions(
ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
if (ctx.VIEW != null) {
operationNotAllowed("ALTER VIEW ... DROP PARTITION", ctx)
}
AlterTableDropPartitionCommand(
visitTableIdentifier(ctx.tableIdentifier),
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
ifExists = ctx.EXISTS != null,
purge = ctx.PURGE != null,
retainData = false)
}

/**
* Create a [[AlterTableChangeColumnCommand]] command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,17 @@ class DataSourceV2SQLSuite
}
}

test("ALTER TABLE DROP PARTITIONS") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $t DROP PARTITION (id=1)")
}
assert(e.message.contains("ALTER TABLE DROP PARTITION is only supported with v1 tables"))
}
}

private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,45 +566,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
""".stripMargin)
}

// ALTER TABLE table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
// ALTER VIEW table_name DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...]
test("alter table/view: drop partitions") {
val sql1_table =
"""
|ALTER TABLE table_name DROP IF EXISTS PARTITION
|(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
""".stripMargin
val sql2_table =
"""
|ALTER TABLE table_name DROP PARTITION
|(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk')
""".stripMargin
val sql1_view = sql1_table.replace("TABLE", "VIEW")
val sql2_view = sql2_table.replace("TABLE", "VIEW")

val parsed1_table = parser.parsePlan(sql1_table)
val parsed2_table = parser.parsePlan(sql2_table)
val parsed1_purge = parser.parsePlan(sql1_table + " PURGE")
assertUnsupported(sql1_view)
assertUnsupported(sql2_view)

val tableIdent = TableIdentifier("table_name", None)
val expected1_table = AlterTableDropPartitionCommand(
tableIdent,
Seq(
Map("dt" -> "2008-08-08", "country" -> "us"),
Map("dt" -> "2009-09-09", "country" -> "uk")),
ifExists = true,
purge = false,
retainData = false)
val expected2_table = expected1_table.copy(ifExists = false)
val expected1_purge = expected1_table.copy(purge = true)

comparePlans(parsed1_table, expected1_table)
comparePlans(parsed2_table, expected2_table)
comparePlans(parsed1_purge, expected1_purge)
}

test("alter table: archive partition (not supported)") {
assertUnsupported("ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')")
}
Expand Down