Skip to content

Commit 98e1a4c

Browse files
imback82cloud-fan
authored andcommitted
[SPARK-28319][SQL] Implement SHOW TABLES for Data Source V2 Tables
## What changes were proposed in this pull request? Implements the SHOW TABLES logical and physical plans for data source v2 tables. ## How was this patch tested? Added unit tests to `DataSourceV2SQLSuite`. Closes #25247 from imback82/dsv2_show_tables. Lead-authored-by: terryk <yuminkim@gmail.com> Co-authored-by: Terry Kim <yuminkim@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 9976b87 commit 98e1a4c

File tree

12 files changed

+347
-27
lines changed

12 files changed

+347
-27
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ statement
183183
| DROP TEMPORARY? FUNCTION (IF EXISTS)? qualifiedName #dropFunction
184184
| EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
185185
statement #explain
186-
| SHOW TABLES ((FROM | IN) db=errorCapturingIdentifier)?
186+
| SHOW TABLES ((FROM | IN) multipartIdentifier)?
187187
(LIKE? pattern=STRING)? #showTables
188188
| SHOW TABLE EXTENDED ((FROM | IN) db=errorCapturingIdentifier)?
189189
LIKE pattern=STRING partitionSpec? #showTable

sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,24 @@ trait LookupCatalog extends Logging {
8181
}
8282
}
8383

84+
type CatalogNamespace = (Option[CatalogPlugin], Seq[String])
85+
86+
/**
 * Extract catalog and namespace from a multi-part identifier with the default catalog if needed.
 * Catalog name takes precedence over namespaces.
 *
 * The first name part is looked up through `catalogManager.catalog`. If that lookup succeeds,
 * the resulting catalog is returned together with the remaining parts as the namespace. If it
 * throws [[CatalogNotFoundException]], `defaultCatalog` (which may be `None`) is returned and
 * all of the parts are treated as the namespace.
 *
 * The extractor always succeeds (it returns `Some`), so it can be used in irrefutable patterns
 * such as `val CatalogNamespace(catalog, ns) = parts`.
 */
object CatalogNamespace {
  def unapply(parts: Seq[String]): Some[CatalogNamespace] = parts match {
    case Seq(catalogName, tail @ _*) =>
      try {
        Some((Some(catalogManager.catalog(catalogName)), tail))
      } catch {
        case _: CatalogNotFoundException =>
          Some((defaultCatalog, parts))
      }

    case _ =>
      // An empty identifier has no catalog part to look up. Fall back to the default catalog
      // with an empty namespace rather than failing with a MatchError, which would break the
      // "always matches" contract promised by the Some[...] return type.
      Some((defaultCatalog, parts))
  }
}
101+
84102
/**
85103
* Extract legacy table identifier from a multi-part identifier.
86104
*

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
3838
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
3939
import org.apache.spark.sql.catalyst.plans._
4040
import org.apache.spark.sql.catalyst.plans.logical._
41-
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
41+
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowTablesStatement}
4242
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
4343
import org.apache.spark.sql.internal.SQLConf
4444
import org.apache.spark.sql.types._
@@ -2449,6 +2449,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
24492449
ctx.EXISTS != null)
24502450
}
24512451

2452+
/**
 * Build a [[ShowTablesStatement]] from a parsed `SHOW TABLES` command.
 *
 * Both the multi-part identifier and the pattern are optional in the parse context; a missing
 * (null) element is converted to `None`.
 */
override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = withOrigin(ctx) {
  val namespaceParts = Option(ctx.multipartIdentifier).map(visitMultipartIdentifier)
  val likePattern = Option(ctx.pattern).map(string)
  ShowTablesStatement(namespaceParts, likePattern)
}
2460+
24522461
/**
24532462
* Parse new column info from ADD COLUMN into a QualifiedColType.
24542463
*/

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,17 @@ case class AlterTable(
616616
}
617617
}
618618

619+
/**
 * The logical plan of the SHOW TABLES command that works for v2 catalogs.
 *
 * The output has two non-nullable string columns: `namespace` and `tableName`.
 */
case class ShowTables(
    catalog: TableCatalog,
    namespace: Seq[String],
    pattern: Option[String]) extends Command {
  override val output: Seq[Attribute] = {
    val namespaceAttr = AttributeReference("namespace", StringType, nullable = false)()
    val tableNameAttr = AttributeReference("tableName", StringType, nullable = false)()
    Seq(namespaceAttr, tableNameAttr)
  }
}
619630

620631
/**
621632
* Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.plans.logical.sql
19+
20+
/**
 * A SHOW TABLES statement, as parsed from SQL.
 *
 * @param namespace the multi-part identifier, if one was given in the statement
 * @param pattern the pattern string, if one was given in the statement
 */
case class ShowTablesStatement(
    namespace: Option[Seq[String]],
    pattern: Option[String]) extends ParsedStatement

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransf
2424
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedRelation, UnresolvedStar}
2525
import org.apache.spark.sql.catalyst.catalog.BucketSpec
2626
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
27-
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
27+
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowTablesStatement}
2828
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
2929
import org.apache.spark.unsafe.types.UTF8String
3030

@@ -764,6 +764,21 @@ class DDLParserSuite extends AnalysisTest {
764764
assert(exc.getMessage.contains("INSERT INTO ... IF NOT EXISTS"))
765765
}
766766

767+
test("show tables") {
  // Each entry pairs a SQL string with the statement the parser is expected to produce.
  val fullName = Seq("testcat", "ns1", "ns2", "tbl")
  val expectations: Seq[(String, ShowTablesStatement)] = Seq(
    "SHOW TABLES" -> ShowTablesStatement(None, None),
    "SHOW TABLES FROM testcat.ns1.ns2.tbl" -> ShowTablesStatement(Some(fullName), None),
    "SHOW TABLES IN testcat.ns1.ns2.tbl" -> ShowTablesStatement(Some(fullName), None),
    "SHOW TABLES IN tbl LIKE '*dog*'" -> ShowTablesStatement(Some(Seq("tbl")), Some("*dog*")))

  expectations.foreach { case (sqlText, expected) =>
    comparePlans(parsePlan(sqlText), expected)
  }
}
781+
767782
private case class TableSpec(
768783
name: Seq[String],
769784
schema: Option[StructType],

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -145,21 +145,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
145145
SetDatabaseCommand(ctx.db.getText)
146146
}
147147

148-
/**
149-
* Create a [[ShowTablesCommand]] logical plan.
150-
* Example SQL :
151-
* {{{
152-
* SHOW TABLES [(IN|FROM) database_name] [[LIKE] 'identifier_with_wildcards'];
153-
* }}}
154-
*/
155-
override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = withOrigin(ctx) {
156-
ShowTablesCommand(
157-
Option(ctx.db).map(_.getText),
158-
Option(ctx.pattern).map(string),
159-
isExtended = false,
160-
partitionSpec = None)
161-
}
162-
163148
/**
164149
* Create a [[ShowTablesCommand]] logical plan.
165150
* Example SQL :

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ import org.apache.spark.sql.catalog.v2.expressions.Transform
2727
import org.apache.spark.sql.catalyst.TableIdentifier
2828
import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation}
2929
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
30-
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SubqueryAlias}
31-
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
30+
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, ShowTables, SubqueryAlias}
31+
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowTablesStatement}
3232
import org.apache.spark.sql.catalyst.rules.Rule
33-
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand}
33+
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
3434
import org.apache.spark.sql.internal.SQLConf
3535
import org.apache.spark.sql.sources.v2.TableProvider
3636
import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
@@ -180,6 +180,29 @@ case class DataSourceResolution(
180180
val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
181181
DeleteFromTable(aliased, delete.condition)
182182

183+
case ShowTablesStatement(None, pattern) =>
184+
defaultCatalog match {
185+
case Some(catalog) =>
186+
ShowTables(
187+
catalog.asTableCatalog,
188+
catalogManager.currentNamespace,
189+
pattern)
190+
case None =>
191+
ShowTablesCommand(None, pattern)
192+
}
193+
194+
case ShowTablesStatement(Some(namespace), pattern) =>
195+
val CatalogNamespace(maybeCatalog, ns) = namespace
196+
maybeCatalog match {
197+
case Some(catalog) =>
198+
ShowTables(catalog.asTableCatalog, ns, pattern)
199+
case None =>
200+
if (namespace.length != 1) {
201+
throw new AnalysisException(
202+
s"The database name is not valid: ${namespace.quoted}")
203+
}
204+
ShowTablesCommand(Some(namespace.quoted), pattern)
205+
}
183206
}
184207

185208
object V1WriteProvider {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.sql.{AnalysisException, Strategy}
2626
import org.apache.spark.sql.catalog.v2.StagingTableCatalog
2727
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
2828
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
29-
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect}
29+
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, ShowTables}
3030
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
3131
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
3232
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -269,6 +269,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
269269
case AlterTable(catalog, ident, _, changes) =>
270270
AlterTableExec(catalog, ident, changes) :: Nil
271271

272+
case r : ShowTables =>
273+
ShowTablesExec(r.output, r.catalog, r.namespace, r.pattern) :: Nil
274+
272275
case _ => Nil
273276
}
274277
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.datasources.v2
19+
20+
import scala.collection.mutable.ArrayBuffer
21+
22+
import org.apache.spark.rdd.RDD
23+
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.NamespaceHelper
24+
import org.apache.spark.sql.catalog.v2.TableCatalog
25+
import org.apache.spark.sql.catalyst.InternalRow
26+
import org.apache.spark.sql.catalyst.encoders.RowEncoder
27+
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
28+
import org.apache.spark.sql.catalyst.util.StringUtils
29+
import org.apache.spark.sql.execution.LeafExecNode
30+
31+
/**
 * Physical plan node for showing tables.
 *
 * Lists the tables that `catalog` reports for `namespace`, keeps those whose name passes the
 * optional `pattern`, and emits one row per table holding the quoted namespace and the table
 * name.
 *
 * @param output the output attributes of this node
 * @param catalog the catalog whose tables are listed
 * @param namespace the namespace passed to `catalog.listTables`
 * @param pattern optional filter applied to table names through `StringUtils.filterPattern`;
 *                when absent, every table is returned
 */
case class ShowTablesExec(
    output: Seq[Attribute],
    catalog: TableCatalog,
    namespace: Seq[String],
    pattern: Option[String])
  extends LeafExecNode {
  override protected def doExecute(): RDD[InternalRow] = {
    val rows = new ArrayBuffer[InternalRow]()
    val encoder = RowEncoder(schema).resolveAndBind()

    val tables = catalog.listTables(namespace.toArray)
    // foreach rather than map: the loop exists only to append to `rows`, so there is no
    // reason to build (and immediately discard) a result array.
    tables.foreach { table =>
      // No pattern means every table matches; otherwise the name must survive the filter.
      if (pattern.forall(StringUtils.filterPattern(Seq(table.name()), _).nonEmpty)) {
        rows += encoder
          .toRow(
            new GenericRowWithSchema(
              Array(table.namespace().quoted, table.name()),
              schema))
          .copy() // copy each encoded row before buffering it
      }
    }

    // Hand the buffered rows to Spark as an RDD with a single slice.
    sparkContext.parallelize(rows, 1)
  }
}

0 commit comments

Comments
 (0)