Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ statement
| LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
tableIdentifier partitionSpec? #loadData
| TRUNCATE TABLE tableIdentifier partitionSpec? #truncateTable
| MSCK REPAIR TABLE tableIdentifier #repairTable
| MSCK REPAIR TABLE multipartIdentifier #repairTable
| op=(ADD | LIST) identifier .*? #manageResource
| SET ROLE .*? #failNativeCommand
| SET .*? #setConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2708,4 +2708,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
tableName, Option(visitIdentifierSeq(ctx.identifierSeq())), allColumns = false)
}
}

/**
* Create a [[RepairTableStatement]].
*
* For example:
* {{{
* MSCK REPAIR TABLE multi_part_name
* }}}
*/
override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
RepairTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,8 @@ case class AnalyzeColumnStatement(
require(columnNames.isDefined ^ allColumns, "Parameter `columnNames` or `allColumns` are " +
"mutually exclusive. Only one of them should be specified.")
}

/**
* A REPAIR TABLE statement, as parsed from SQL
*/
case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,12 @@ class DDLParserSuite extends AnalysisTest {
"missing 'COLUMNS' at '<EOF>'")
}

test("MSCK REPAIR table") {
comparePlans(
parsePlan("MSCK REPAIR TABLE a.b.c"),
RepairTableStatement(Seq("a", "b", "c")))
}

private case class TableSpec(
name: Seq[String],
schema: Option[StructType],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -266,22 +266,30 @@ class ResolveSessionCatalog(
ShowTablesCommand(None, pattern)

case AnalyzeTableStatement(tableName, partitionSpec, noScan) =>
val CatalogAndIdentifierParts(catalog, parts) = tableName
if (!isSessionCatalog(catalog)) {
throw new AnalysisException("ANALYZE TABLE is only supported with v1 tables.")
}
val v1TableName = parseV1Table(tableName, "ANALYZE TABLE")
if (partitionSpec.isEmpty) {
AnalyzeTableCommand(parts.asTableIdentifier, noScan)
AnalyzeTableCommand(v1TableName.asTableIdentifier, noScan)
} else {
AnalyzePartitionCommand(parts.asTableIdentifier, partitionSpec, noScan)
AnalyzePartitionCommand(v1TableName.asTableIdentifier, partitionSpec, noScan)
}

case AnalyzeColumnStatement(tableName, columnNames, allColumns) =>
val CatalogAndIdentifierParts(catalog, parts) = tableName
if (!isSessionCatalog(catalog)) {
throw new AnalysisException("ANALYZE TABLE is only supported with v1 tables.")
}
AnalyzeColumnCommand(parts.asTableIdentifier, columnNames, allColumns)
val v1TableName = parseV1Table(tableName, "ANALYZE TABLE")
AnalyzeColumnCommand(v1TableName.asTableIdentifier, columnNames, allColumns)

case RepairTableStatement(tableName) =>
val v1TableName = parseV1Table(tableName, "MSCK REPAIR TABLE")
AlterTableRecoverPartitionsCommand(
v1TableName.asTableIdentifier,
"MSCK REPAIR TABLE")
}

private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
val CatalogAndIdentifierParts(catalog, parts) = tableName
if (!isSessionCatalog(catalog)) {
throw new AnalysisException(s"$sql is only supported with v1 tables.")
}
parts
}

private def buildCatalogTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,20 +360,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}

/**
* Create a [[AlterTableRecoverPartitionsCommand]] command.
*
* For example:
* {{{
* MSCK REPAIR TABLE tablename
* }}}
*/
override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
AlterTableRecoverPartitionsCommand(
visitTableIdentifier(ctx.tableIdentifier),
"MSCK REPAIR TABLE")
}

/**
* Create a [[CreateDatabaseCommand]] command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,18 @@ class DataSourceV2SQLSuite
}
}

test("MSCK REPAIR TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")

val e = intercept[AnalysisException] {
sql(s"MSCK REPAIR TABLE $t")
}
assert(e.message.contains("MSCK REPAIR TABLE is only supported with v1 tables"))
}
}

private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
val errMsg = intercept[AnalysisException] {
sql(sqlStatement)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1444,15 +1444,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
intercept(sql2, "Found duplicate clauses: TBLPROPERTIES")
}

test("MSCK REPAIR table") {
val sql = "MSCK REPAIR TABLE tab1"
val parsed = parser.parsePlan(sql)
val expected = AlterTableRecoverPartitionsCommand(
TableIdentifier("tab1", None),
"MSCK REPAIR TABLE")
comparePlans(parsed, expected)
}

test("create table like") {
val v1 = "CREATE TABLE table1 LIKE table2"
val (target, source, location, exists) = parser.parsePlan(v1).collect {
Expand Down