diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 878c08a79813..1839203e3b23 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -212,7 +212,7 @@ statement
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
         tableIdentifier partitionSpec?                                 #loadData
     | TRUNCATE TABLE tableIdentifier partitionSpec?                    #truncateTable
-    | MSCK REPAIR TABLE tableIdentifier                                #repairTable
+    | MSCK REPAIR TABLE multipartIdentifier                            #repairTable
     | op=(ADD | LIST) identifier .*?                                   #manageResource
     | SET ROLE .*?                                                     #failNativeCommand
     | SET .*?                                                          #setConfiguration
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 6a108901dd32..8da9ebf8d035 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2708,4 +2708,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       tableName, Option(visitIdentifierSeq(ctx.identifierSeq())), allColumns = false)
     }
   }
+
+  /**
+   * Create a [[RepairTableStatement]].
+   *
+   * For example:
+   * {{{
+   *   MSCK REPAIR TABLE multi_part_name
+   * }}}
+   */
+  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
+    RepairTableStatement(visitMultipartIdentifier(ctx.multipartIdentifier()))
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 208ecd0e99c8..72d5cbb7d904 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -311,3 +311,8 @@ case class AnalyzeColumnStatement(
   require(columnNames.isDefined ^ allColumns, "Parameter `columnNames` or `allColumns` are " +
     "mutually exclusive. Only one of them should be specified.")
 }
+
+/**
+ * A REPAIR TABLE statement, as parsed from SQL
+ */
+case class RepairTableStatement(tableName: Seq[String]) extends ParsedStatement
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index b42fec201352..0eaf74f65506 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -955,6 +955,12 @@ class DDLParserSuite extends AnalysisTest {
       "missing 'COLUMNS' at ''")
   }
 
+  test("MSCK REPAIR table") {
+    comparePlans(
+      parsePlan("MSCK REPAIR TABLE a.b.c"),
+      RepairTableStatement(Seq("a", "b", "c")))
+  }
+
   private case class TableSpec(
       name: Seq[String],
       schema: Option[StructType],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index a781e2fb258e..72f539f72008 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
@@ -266,22 +266,30 @@ class ResolveSessionCatalog(
       ShowTablesCommand(None, pattern)
 
     case AnalyzeTableStatement(tableName, partitionSpec, noScan) =>
-      val CatalogAndIdentifierParts(catalog, parts) = tableName
-      if (!isSessionCatalog(catalog)) {
-        throw new AnalysisException("ANALYZE TABLE is only supported with v1 tables.")
-      }
+      val v1TableName = parseV1Table(tableName, "ANALYZE TABLE")
       if (partitionSpec.isEmpty) {
-        AnalyzeTableCommand(parts.asTableIdentifier, noScan)
+        AnalyzeTableCommand(v1TableName.asTableIdentifier, noScan)
       } else {
-        AnalyzePartitionCommand(parts.asTableIdentifier, partitionSpec, noScan)
+        AnalyzePartitionCommand(v1TableName.asTableIdentifier, partitionSpec, noScan)
       }
 
     case AnalyzeColumnStatement(tableName, columnNames, allColumns) =>
-      val CatalogAndIdentifierParts(catalog, parts) = tableName
-      if (!isSessionCatalog(catalog)) {
-        throw new AnalysisException("ANALYZE TABLE is only supported with v1 tables.")
-      }
-      AnalyzeColumnCommand(parts.asTableIdentifier, columnNames, allColumns)
+      val v1TableName = parseV1Table(tableName, "ANALYZE TABLE")
+      AnalyzeColumnCommand(v1TableName.asTableIdentifier, columnNames, allColumns)
+
+    case RepairTableStatement(tableName) =>
+      val v1TableName = parseV1Table(tableName, "MSCK REPAIR TABLE")
+      AlterTableRecoverPartitionsCommand(
+        v1TableName.asTableIdentifier,
+        "MSCK REPAIR TABLE")
+  }
+
+  private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = {
+    val CatalogAndIdentifierParts(catalog, parts) = tableName
+    if (!isSessionCatalog(catalog)) {
+      throw new AnalysisException(s"$sql is only supported with v1 tables.")
+    }
+    parts
   }
 
   private def buildCatalogTable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index df63787fa508..3e7a54877cae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -360,20 +360,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
   }
 
-  /**
-   * Create a [[AlterTableRecoverPartitionsCommand]] command.
-   *
-   * For example:
-   * {{{
-   *   MSCK REPAIR TABLE tablename
-   * }}}
-   */
-  override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
-    AlterTableRecoverPartitionsCommand(
-      visitTableIdentifier(ctx.tableIdentifier),
-      "MSCK REPAIR TABLE")
-  }
-
   /**
    * Create a [[CreateDatabaseCommand]] command.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 1d2bc9d4c8ee..d253e6078ddc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1198,6 +1198,18 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("MSCK REPAIR TABLE") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+
+      val e = intercept[AnalysisException] {
+        sql(s"MSCK REPAIR TABLE $t")
+      }
+      assert(e.message.contains("MSCK REPAIR TABLE is only supported with v1 tables"))
+    }
+  }
+
   private def assertAnalysisError(sqlStatement: String, expectedError: String): Unit = {
     val errMsg = intercept[AnalysisException] {
       sql(sqlStatement)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 303630d9d0cb..5a5899cbccc5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -1444,15 +1444,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
     intercept(sql2, "Found duplicate clauses: TBLPROPERTIES")
   }
 
-  test("MSCK REPAIR table") {
-    val sql = "MSCK REPAIR TABLE tab1"
-    val parsed = parser.parsePlan(sql)
-    val expected = AlterTableRecoverPartitionsCommand(
-      TableIdentifier("tab1", None),
-      "MSCK REPAIR TABLE")
-    comparePlans(parsed, expected)
-  }
-
  test("create table like") {
    val v1 = "CREATE TABLE table1 LIKE table2"
    val (target, source, location, exists) = parser.parsePlan(v1).collect {
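
A minimal usage sketch of the behavior this patch produces (illustrative only, not part of the diff). It assumes a SparkSession named `spark` whose config registers a v2 catalog under the name `testcat`, mirroring the DataSourceV2SQLSuite setup above; the catalog wiring and table names are hypothetical:

  // Assumed setup: `testcat` is registered as a v2 catalog, e.g.
  //   spark.conf.set("spark.sql.catalog.testcat", "<a TableCatalog implementation>")

  // Session-catalog (v1) tables keep the old behavior: the parser now produces
  // RepairTableStatement(Seq("db", "tbl")), which ResolveSessionCatalog converts
  // to the pre-existing AlterTableRecoverPartitionsCommand.
  spark.sql("MSCK REPAIR TABLE db.tbl")

  // A multi-part name that resolves to a non-session (v2) catalog now fails
  // during analysis instead of being misparsed as a single tableIdentifier:
  //   org.apache.spark.sql.AnalysisException:
  //   MSCK REPAIR TABLE is only supported with v1 tables.
  spark.sql("MSCK REPAIR TABLE testcat.ns1.ns2.tbl")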