From daa17c13171519e0366a4bfd1eba721018a638f7 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 18 Oct 2019 00:42:34 +0200 Subject: [PATCH 1/8] initial checkin --- .../spark/sql/catalyst/parser/SqlBase.g4 | 4 +-- .../sql/catalyst/parser/AstBuilder.scala | 31 +++++++++++++++++++ .../catalyst/plans/logical/statements.scala | 10 ++++++ .../spark/sql/execution/SparkSqlParser.scala | 27 ---------------- 4 files changed, 43 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index a7824d50ca84..b6e5641da875 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -83,10 +83,10 @@ statement : query #statementDefault | ctes? dmlStatementNoWith #dmlStatement | USE NAMESPACE? multipartIdentifier #use - | CREATE database (IF NOT EXISTS)? db=errorCapturingIdentifier + | CREATE (DATABASE | NAMESPACE) (IF NOT EXISTS)? multipartIdentifier ((COMMENT comment=STRING) | locationSpec | - (WITH DBPROPERTIES tablePropertyList))* #createDatabase + (WITH DBPROPERTIES tablePropertyList))* #createNamespace | ALTER database db=errorCapturingIdentifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties | ALTER database db=errorCapturingIdentifier diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index d138ff401944..49f08647f3dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2299,6 +2299,37 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } } + /** + * Create a [[CreateDatabaseCommand]] command. + * + * For example: + * {{{ + * CREATE DATABASE [IF NOT EXISTS] database_name + * create_database_clauses; + * + * create_database_clauses (order insensitive): + * [COMMENT database_comment] + * [LOCATION path] + * [WITH DBPROPERTIES (key1=val1, key2=val2, ...)] + * }}} + */ + override def visitCreateNamespace(ctx: CreateNamespaceContext): LogicalPlan = withOrigin(ctx) { + checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx) + checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) + checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx) + + if (ctx.DATABASE != null && ctx.multipartIdentifier != null) { + throw new ParseException(s"FROM/IN operator is not allowed in SHOW DATABASES", ctx) + } + + CreateNamespaceStatement( + visitMultipartIdentifier(ctx.multipartIdentifier), + ctx.EXISTS != null, + Option(ctx.comment).map(string), + ctx.locationSpec.asScala.headOption.map(visitLocationSpec), + ctx.tablePropertyList.asScala.headOption.map(visitPropertyKeyValues).getOrElse(Map.empty)) + } + /** * Create a [[ShowNamespacesStatement]] command. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 757a8bfe219f..74310cb99a96 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -282,6 +282,16 @@ case class InsertIntoStatement( case class ShowTablesStatement(namespace: Option[Seq[String]], pattern: Option[String]) extends ParsedStatement +/** + * A CREATE NAMESPACE statement, as parsed from SQL. + */ +case class CreateNamespaceStatement( + namespace: Seq[String], + ifNotExists: Boolean, + comment: Option[String], + location: Option[String], + properties: Map[String, String]) extends ParsedStatement + /** * A SHOW NAMESPACES statement, as parsed from SQL. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 3dd392156d52..800d66c1f176 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -423,33 +423,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { "MSCK REPAIR TABLE") } - /** - * Create a [[CreateDatabaseCommand]] command. - * - * For example: - * {{{ - * CREATE DATABASE [IF NOT EXISTS] database_name - * create_database_clauses; - * - * create_database_clauses (order insensitive): - * [COMMENT database_comment] - * [LOCATION path] - * [WITH DBPROPERTIES (key1=val1, key2=val2, ...)] - * }}} - */ - override def visitCreateDatabase(ctx: CreateDatabaseContext): LogicalPlan = withOrigin(ctx) { - checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx) - checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) - checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx) - - CreateDatabaseCommand( - ctx.db.getText, - ctx.EXISTS != null, - ctx.locationSpec.asScala.headOption.map(visitLocationSpec), - Option(ctx.comment).map(string), - ctx.tablePropertyList.asScala.headOption.map(visitPropertyKeyValues).getOrElse(Map.empty)) - } - /** * Create an [[AlterDatabasePropertiesCommand]] command. * From 27066a5c00d028efb5afa3901b3225e84c5e6393 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 18 Oct 2019 17:02:58 +0200 Subject: [PATCH 2/8] Finishing up --- .../spark/sql/catalyst/parser/SqlBase.g4 | 2 +- .../catalyst/analysis/ResolveCatalogs.scala | 9 +++ .../sql/catalyst/parser/AstBuilder.scala | 14 ++--- .../catalyst/plans/logical/statements.scala | 2 +- .../catalyst/plans/logical/v2Commands.scala | 10 +++ .../sql/catalyst/parser/DDLParserSuite.scala | 57 +++++++++++++++++ .../catalyst/parser/ParserUtilsSuite.scala | 2 +- .../analysis/ResolveSessionCatalog.scala | 9 ++- .../datasources/v2/CreateNamespaceExec.scala | 61 +++++++++++++++++++ .../sql/connector/DataSourceV2SQLSuite.scala | 8 +++ .../execution/command/DDLParserSuite.scala | 58 ------------------ 11 files changed, 161 insertions(+), 71 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index b6e5641da875..6e3cae5bae2a 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -83,7 +83,7 @@ statement : query #statementDefault | ctes? dmlStatementNoWith #dmlStatement | USE NAMESPACE? multipartIdentifier #use - | CREATE (DATABASE | NAMESPACE) (IF NOT EXISTS)? multipartIdentifier + | CREATE (database | NAMESPACE) (IF NOT EXISTS)? multipartIdentifier ((COMMENT comment=STRING) | locationSpec | (WITH DBPROPERTIES tablePropertyList))* #createNamespace diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 568944678544..3926b9c72384 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -168,6 +168,15 @@ class ResolveCatalogs(val catalogManager: CatalogManager) s"Can not specify catalog `${catalog.name}` for view ${viewName.quoted} " + s"because view support in catalog has not been implemented yet") + case c @ CreateNamespaceStatement(NonSessionCatalog(catalog, nameParts), _, _, _, _) => + CreateNamespace( + catalog.asNamespaceCatalog, + nameParts, + c.ifNotExists, + c.comment, + c.locationSpec, + c.properties) + case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) => ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 49f08647f3dc..162344c89fec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2300,15 +2300,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Create a [[CreateDatabaseCommand]] command. + * Create a [[CreateNamespaceStatement]] command. * * For example: * {{{ - * CREATE DATABASE [IF NOT EXISTS] database_name - * create_database_clauses; + * CREATE NAMESPACE [IF NOT EXISTS] ns1.ns2.ns3 + * create_namespace_clauses; * - * create_database_clauses (order insensitive): - * [COMMENT database_comment] + * create_namespace_clauses (order insensitive): + * [COMMENT namespace_comment] * [LOCATION path] * [WITH DBPROPERTIES (key1=val1, key2=val2, ...)] * }}} @@ -2318,10 +2318,6 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx) - if (ctx.DATABASE != null && ctx.multipartIdentifier != null) { - throw new ParseException(s"FROM/IN operator is not allowed in SHOW DATABASES", ctx) - } - CreateNamespaceStatement( visitMultipartIdentifier(ctx.multipartIdentifier), ctx.EXISTS != null, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 74310cb99a96..6321f9fc0110 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -289,7 +289,7 @@ case class CreateNamespaceStatement( namespace: Seq[String], ifNotExists: Boolean, comment: Option[String], - location: Option[String], + locationSpec: Option[String], properties: Map[String, String]) extends ParsedStatement /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index f89dfb1ec47d..8ba8722b3fdf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -237,6 +237,16 @@ case class ReplaceTableAsSelect( } } +/** + * The logical plan of the CREATE NAMESPACE command that works for v2 catalogs. + */ +case class CreateNamespace( + catalog: SupportsNamespaces, + namespace: Seq[String], + ifNotExists: Boolean, + comment: Option[String], + locationSpec: Option[String], + properties: Map[String, String]) extends Command /** * The logical plan of the SHOW NAMESPACES command that works for v2 catalogs. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index d9e50ef09fdd..b66d05e13a91 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -845,6 +845,63 @@ class DDLParserSuite extends AnalysisTest { ShowTablesStatement(Some(Seq("tbl")), Some("*dog*"))) } + test("create namespace") { + val sql = + """ + |CREATE DATABASE IF NOT EXISTS database_name + |WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c') + |COMMENT 'database_comment' LOCATION '/home/user/db' + """.stripMargin + comparePlans( + parsePlan(sql), + CreateNamespaceStatement( + Seq("database_name"), + ifNotExists = true, + Some("database_comment"), + Some("/home/user/db"), + Map("a" -> "a", "b" -> "b", "c" -> "c"))) + } + + test("create namespace -- check duplicates") { + def createDatabase(duplicateClause: String): String = { + s""" + |CREATE DATABASE IF NOT EXISTS database_name + |$duplicateClause + |$duplicateClause + """.stripMargin + } + val sql1 = createDatabase("COMMENT 'database_comment'") + val sql2 = createDatabase("LOCATION '/home/user/db'") + val sql3 = createDatabase("WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") + + intercept(sql1, "Found duplicate clauses: COMMENT") + intercept(sql2, "Found duplicate clauses: LOCATION") + intercept(sql3, "Found duplicate clauses: WITH DBPROPERTIES") + } + + test("create namespace - property values must be set") { + assertUnsupported( + sql = "CREATE DATABASE my_db WITH DBPROPERTIES('key_without_value', 'key_with_value'='x')", + containsThesePhrases = Seq("key_without_value")) + } + + test("create namespace - support for other types in DBPROPERTIES") { + val sql = + """ + |CREATE DATABASE database_name + |LOCATION '/home/user/db' + |WITH DBPROPERTIES ('a'=1, 'b'=0.1, 'c'=TRUE) + """.stripMargin + comparePlans( + parsePlan(sql), + CreateNamespaceStatement( + Seq("database_name"), + ifNotExists = false, + None, + Some("/home/user/db"), + Map("a" -> "1", "b" -> "0.1", "c" -> "true"))) + } + test("show databases: basic") { comparePlans( parsePlan("SHOW DATABASES"), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala index 07f77ea889db..c6434f2bdd3e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala @@ -50,7 +50,7 @@ class ParserUtilsSuite extends SparkFunSuite { |WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c') """.stripMargin ) { parser => - parser.statement().asInstanceOf[CreateDatabaseContext] + parser.statement().asInstanceOf[CreateNamespaceContext] } val emptyContext = buildContext("") { parser => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index a96533dac97e..2ec343f15ed8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand} +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand} import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource} import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.internal.SQLConf @@ -255,6 +255,13 @@ class ResolveSessionCatalog( case DropViewStatement(SessionCatalog(catalog, viewName), ifExists) => DropTableCommand(viewName.asTableIdentifier, ifExists, isView = true, purge = false) + case c @ CreateNamespaceStatement(SessionCatalog(catalog, nameParts), _, _, _, _) => + if (nameParts.length != 1) { + throw new AnalysisException( + s"The database name is not valid: ${nameParts.quoted}") + } + CreateDatabaseCommand(nameParts.head, c.ifNotExists, c.locationSpec, c.comment, c.properties) + case ShowTablesStatement(Some(SessionCatalog(catalog, nameParts)), pattern) => if (nameParts.length != 1) { throw new AnalysisException( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala new file mode 100644 index 000000000000..cb5f9db6297e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException +import org.apache.spark.sql.catalyst.catalog.CatalogUtils +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.SupportsNamespaces + +/** + * Physical plan node for creating a namespace. + */ +case class CreateNamespaceExec( + catalog: SupportsNamespaces, + namespace: Seq[String], + ifNotExists: Boolean, + comment: Option[String], + locationSpec: Option[String], + private var properties: Map[String, String]) + extends V2CommandExec { + override protected def run(): Seq[InternalRow] = { + val ns = namespace.toArray + if (ifNotExists && catalog.namespaceExists(ns)) { + throw new NamespaceAlreadyExistsException(ns) + } + + // Add any additional properties. + if (comment.nonEmpty) { + properties += ("comment" -> comment.get) + } + if (locationSpec.nonEmpty) { + properties += ("locationSpec" -> CatalogUtils.stringToURI(locationSpec.get).toString) + } + + // Note that even if ifNotExists is set to false, createNamespace can still throw + // NamespaceAlreadyExistsException depending on the SupportsNamespaces implementation. + catalog.createNamespace(ns, properties.asJava) + + Seq.empty + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 3b42c2374f00..9112f56b5eee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -747,6 +747,14 @@ class DataSourceV2SQLSuite assert(expected === df.collect()) } + test("CreateNameSpace: session catalog") { + sql("CREATE NAMESPACE ns") + testShowNamespaces("SHOW NAMESPACES", Seq("default", "ns")) + + sql("CREATE NAMESPACE testcat.ns1.ns2") + testShowNamespaces("SHOW NAMESPACES testcat", Seq("ns1")) + } + test("ShowNamespaces: show root namespaces with default v2 catalog") { spark.conf.set("spark.sql.default.catalog", "testcat") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 303630d9d0cb..25c65b3ee5ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -74,46 +74,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { }.head } - test("create database") { - val sql = - """ - |CREATE DATABASE IF NOT EXISTS database_name - |WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c') - |COMMENT 'database_comment' LOCATION '/home/user/db' - """.stripMargin - val parsed = parser.parsePlan(sql) - val expected = CreateDatabaseCommand( - "database_name", - ifNotExists = true, - Some("/home/user/db"), - Some("database_comment"), - Map("a" -> "a", "b" -> "b", "c" -> "c")) - comparePlans(parsed, expected) - } - - test("create database -- check duplicates") { - def createDatabase(duplicateClause: String): String = { - s""" - |CREATE DATABASE IF NOT EXISTS database_name - |$duplicateClause - |$duplicateClause - """.stripMargin - } - val sql1 = createDatabase("COMMENT 'database_comment'") - val sql2 = createDatabase("LOCATION '/home/user/db'") - val sql3 = createDatabase("WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") - - intercept(sql1, "Found duplicate clauses: COMMENT") - intercept(sql2, "Found duplicate clauses: LOCATION") - intercept(sql3, "Found duplicate clauses: WITH DBPROPERTIES") - } - - test("create database - property values must be set") { - assertUnsupported( - sql = "CREATE DATABASE my_db WITH DBPROPERTIES('key_without_value', 'key_with_value'='x')", - containsThesePhrases = Seq("key_without_value")) - } - test("drop database") { val sql1 = "DROP DATABASE IF EXISTS database_name RESTRICT" val sql2 = "DROP DATABASE IF EXISTS database_name CASCADE" @@ -891,24 +851,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession { comparePlans(parsed3, expected3) } - test("support for other types in DBPROPERTIES") { - val sql = - """ - |CREATE DATABASE database_name - |LOCATION '/home/user/db' - |WITH DBPROPERTIES ('a'=1, 'b'=0.1, 'c'=TRUE) - """.stripMargin - val parsed = parser.parsePlan(sql) - val expected = CreateDatabaseCommand( - "database_name", - ifNotExists = false, - Some("/home/user/db"), - None, - Map("a" -> "1", "b" -> "0.1", "c" -> "true")) - - comparePlans(parsed, expected) - } - test("Test CTAS #1") { val s1 = """ From 08728d3bc2778b4ccd2a023bfa476b0fa61b4e83 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 18 Oct 2019 17:48:36 +0200 Subject: [PATCH 3/8] add more tests --- .../datasources/v2/CreateNamespaceExec.scala | 9 +++---- .../datasources/v2/DataSourceV2Strategy.scala | 5 +++- .../sql/connector/DataSourceV2SQLSuite.scala | 24 ++++++++++++++++--- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala index cb5f9db6297e..9fa633d17b3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala @@ -37,9 +37,8 @@ case class CreateNamespaceExec( private var properties: Map[String, String]) extends V2CommandExec { override protected def run(): Seq[InternalRow] = { - val ns = namespace.toArray - if (ifNotExists && catalog.namespaceExists(ns)) { - throw new NamespaceAlreadyExistsException(ns) + if (ifNotExists && catalog.namespaceExists(namespace.toArray)) { + return Seq.empty } // Add any additional properties. @@ -50,9 +49,7 @@ case class CreateNamespaceExec( properties += ("locationSpec" -> CatalogUtils.stringToURI(locationSpec.get).toString) } - // Note that even if ifNotExists is set to false, createNamespace can still throw - // NamespaceAlreadyExistsException depending on the SupportsNamespaces implementation. - catalog.createNamespace(ns, properties.asJava) + catalog.createNamespace(namespace.toArray, properties.asJava) Seq.empty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index c8d29520bcfc..d3cdfae451b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables} import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} @@ -289,6 +289,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case AlterTable(catalog, ident, _, changes) => AlterTableExec(catalog, ident, changes) :: Nil + case CreateNamespace(catalog, namespace, ifNotExists, comment, locationSpec, properties) => + CreateNamespaceExec(catalog, namespace, ifNotExists, comment, locationSpec, properties) :: Nil + case r: ShowNamespaces => ShowNamespacesExec(r.output, r.catalog, r.namespace, r.pattern) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 81bdbee1c16c..f3c4d34ef5c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.connector import scala.collection.JavaConverters._ import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.internal.SQLConf @@ -764,12 +764,30 @@ class DataSourceV2SQLSuite assert(expected === df.collect()) } - test("CreateNameSpace: session catalog") { + test("CreateNameSpace: basic tests") { + // Session catalog is used. sql("CREATE NAMESPACE ns") testShowNamespaces("SHOW NAMESPACES", Seq("default", "ns")) + // V2 non-session catalog is used. sql("CREATE NAMESPACE testcat.ns1.ns2") - testShowNamespaces("SHOW NAMESPACES testcat", Seq("ns1")) + testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1")) + testShowNamespaces("SHOW NAMESPACES IN testcat.ns1", Seq("ns1.ns2")) + + // TODO: Add tests for validating namespace metadata when DESCRIBE NAMESPACE is available. + } + + test("CreateNameSpace: test handling of 'IF NOT EXIST'") { + sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1") + + // The 'ns1' namespace already exists, so this should fail. + val exception = intercept[NamespaceAlreadyExistsException] { + sql("CREATE NAMESPACE testcat.ns1") + } + assert(exception.getMessage.contains("Namespace 'ns1' already exists")) + + // The following will be no-op since the namespace already exists. + sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1") } test("ShowNamespaces: show root namespaces with default v2 catalog") { From c1e66873f1bd98928cea774c9a9277a7392d96be Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sat, 19 Oct 2019 20:14:49 -0700 Subject: [PATCH 4/8] Address comments --- .../sql/catalyst/analysis/ResolveCatalogs.scala | 4 +--- .../spark/sql/catalyst/parser/AstBuilder.scala | 14 +++++++++++--- .../sql/catalyst/plans/logical/statements.scala | 7 +++++-- .../sql/catalyst/plans/logical/v2Commands.scala | 2 -- .../sql/catalyst/parser/DDLParserSuite.scala | 17 +++++++++++------ .../analysis/ResolveSessionCatalog.scala | 10 ++++++++-- .../datasources/v2/CreateNamespaceExec.scala | 10 ---------- .../datasources/v2/DataSourceV2Strategy.scala | 4 ++-- 8 files changed, 38 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 7e06e1b67b8c..6553b3d57d7f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -168,13 +168,11 @@ class ResolveCatalogs(val catalogManager: CatalogManager) s"Can not specify catalog `${catalog.name}` for view ${viewName.quoted} " + s"because view support in catalog has not been implemented yet") - case c @ CreateNamespaceStatement(NonSessionCatalog(catalog, nameParts), _, _, _, _) => + case c @ CreateNamespaceStatement(NonSessionCatalog(catalog, nameParts), _, _) => CreateNamespace( catalog.asNamespaceCatalog, nameParts, c.ifNotExists, - c.comment, - c.locationSpec, c.properties) case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 3c0eadaaf257..005e4915339f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2318,12 +2318,20 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx) + var properties = ctx.tablePropertyList.asScala.headOption + .map(visitPropertyKeyValues) + .getOrElse(Map.empty) + Option(ctx.comment).map(string).map { + properties += CreateNamespaceStatement.COMMENT_PROPERTY_KEY -> _ + } + ctx.locationSpec.asScala.headOption.map(visitLocationSpec).map { + properties += CreateNamespaceStatement.LOCATION_PROPERTY_KEY -> _ + } + CreateNamespaceStatement( visitMultipartIdentifier(ctx.multipartIdentifier), ctx.EXISTS != null, - Option(ctx.comment).map(string), - ctx.locationSpec.asScala.headOption.map(visitLocationSpec), - ctx.tablePropertyList.asScala.headOption.map(visitPropertyKeyValues).getOrElse(Map.empty)) + properties) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 8a4d412c1de6..624da936120b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -288,10 +288,13 @@ case class ShowTablesStatement(namespace: Option[Seq[String]], pattern: Option[S case class CreateNamespaceStatement( namespace: Seq[String], ifNotExists: Boolean, - comment: Option[String], - locationSpec: Option[String], properties: Map[String, String]) extends ParsedStatement +object CreateNamespaceStatement { + val COMMENT_PROPERTY_KEY: String = "comment" + val LOCATION_PROPERTY_KEY: String = "location" +} + /** * A SHOW NAMESPACES statement, as parsed from SQL. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 8ba8722b3fdf..1c3f2447305d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -244,8 +244,6 @@ case class CreateNamespace( catalog: SupportsNamespaces, namespace: Seq[String], ifNotExists: Boolean, - comment: Option[String], - locationSpec: Option[String], properties: Map[String, String]) extends Command /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index f862bb49f1fa..64ff82c998df 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -857,9 +857,12 @@ class DDLParserSuite extends AnalysisTest { CreateNamespaceStatement( Seq("database_name"), ifNotExists = true, - Some("database_comment"), - Some("/home/user/db"), - Map("a" -> "a", "b" -> "b", "c" -> "c"))) + Map( + "a" -> "a", + "b" -> "b", + "c" -> "c", + "comment" -> "database_comment", + "location" -> "/home/user/db"))) } test("create namespace -- check duplicates") { @@ -897,9 +900,11 @@ class DDLParserSuite extends AnalysisTest { CreateNamespaceStatement( Seq("database_name"), ifNotExists = false, - None, - Some("/home/user/db"), - Map("a" -> "1", "b" -> "0.1", "c" -> "true"))) + Map( + "a" -> "1", + "b" -> "0.1", + "c" -> "true", + "location" -> "/home/user/db"))) } test("show databases: basic") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 4cddf4d631cf..1a77aaf98dcb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -255,12 +255,18 @@ class ResolveSessionCatalog( case DropViewStatement(SessionCatalog(catalog, viewName), ifExists) => DropTableCommand(viewName.asTableIdentifier, ifExists, isView = true, purge = false) - case c @ CreateNamespaceStatement(SessionCatalog(catalog, nameParts), _, _, _, _) => + case c @ CreateNamespaceStatement(SessionCatalog(catalog, nameParts), _, _) => if (nameParts.length != 1) { throw new AnalysisException( s"The database name is not valid: ${nameParts.quoted}") } - CreateDatabaseCommand(nameParts.head, c.ifNotExists, c.locationSpec, c.comment, c.properties) + + val comment = c.properties.get(CreateNamespaceStatement.COMMENT_PROPERTY_KEY) + val location = c.properties.get(CreateNamespaceStatement.LOCATION_PROPERTY_KEY) + val newProperties = c.properties - + CreateNamespaceStatement.COMMENT_PROPERTY_KEY - + CreateNamespaceStatement.LOCATION_PROPERTY_KEY + CreateDatabaseCommand(nameParts.head, c.ifNotExists, location, comment, newProperties) case ShowTablesStatement(Some(SessionCatalog(catalog, nameParts)), pattern) => if (nameParts.length != 1) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala index 9fa633d17b3d..08cf9ace05ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala @@ -32,8 +32,6 @@ case class CreateNamespaceExec( catalog: SupportsNamespaces, namespace: Seq[String], ifNotExists: Boolean, - comment: Option[String], - locationSpec: Option[String], private var properties: Map[String, String]) extends V2CommandExec { override protected def run(): Seq[InternalRow] = { @@ -41,14 +39,6 @@ case class CreateNamespaceExec( return Seq.empty } - // Add any additional properties. - if (comment.nonEmpty) { - properties += ("comment" -> comment.get) - } - if (locationSpec.nonEmpty) { - properties += ("locationSpec" -> CatalogUtils.stringToURI(locationSpec.get).toString) - } - catalog.createNamespace(namespace.toArray, properties.asJava) Seq.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index d3cdfae451b5..49035c3cc3da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -289,8 +289,8 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case AlterTable(catalog, ident, _, changes) => AlterTableExec(catalog, ident, changes) :: Nil - case CreateNamespace(catalog, namespace, ifNotExists, comment, locationSpec, properties) => - CreateNamespaceExec(catalog, namespace, ifNotExists, comment, locationSpec, properties) :: Nil + case CreateNamespace(catalog, namespace, ifNotExists, properties) => + CreateNamespaceExec(catalog, namespace, ifNotExists, properties) :: Nil case r: ShowNamespaces => ShowNamespacesExec(r.output, r.catalog, r.namespace, r.pattern) :: Nil From 6f80ff4ce65d4fa5e64db9735232412bbd989a19 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sun, 20 Oct 2019 08:41:20 -0700 Subject: [PATCH 5/8] change ifNotExist logic. --- .../execution/datasources/v2/CreateNamespaceExec.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala index 08cf9ace05ab..255c5db5e3fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala @@ -35,12 +35,12 @@ case class CreateNamespaceExec( private var properties: Map[String, String]) extends V2CommandExec { override protected def run(): Seq[InternalRow] = { - if (ifNotExists && catalog.namespaceExists(namespace.toArray)) { - return Seq.empty + try { + catalog.createNamespace(namespace.toArray, properties.asJava) + } catch { + case _ : NamespaceAlreadyExistsException if ifNotExists => } - catalog.createNamespace(namespace.toArray, properties.asJava) - Seq.empty } From 970338ceb945e603dfe9632c82778a09c9ab5375 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sun, 20 Oct 2019 20:28:14 -0700 Subject: [PATCH 6/8] address comments --- .../sql/catalyst/plans/logical/statements.scala | 6 +++--- .../datasources/v2/CreateNamespaceExec.scala | 17 ++++++++++++----- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 8225c45d99fa..9883e90a3693 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -286,9 +286,9 @@ case class ShowTablesStatement(namespace: Option[Seq[String]], pattern: Option[S * A CREATE NAMESPACE statement, as parsed from SQL. */ case class CreateNamespaceStatement( - namespace: Seq[String], - ifNotExists: Boolean, - properties: Map[String, String]) extends ParsedStatement + namespace: Seq[String], + ifNotExists: Boolean, + properties: Map[String, String]) extends ParsedStatement object CreateNamespaceStatement { val COMMENT_PROPERTY_KEY: String = "comment" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala index 255c5db5e3fb..0f69f85dd837 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala @@ -21,7 +21,6 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException -import org.apache.spark.sql.catalyst.catalog.CatalogUtils import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.SupportsNamespaces @@ -35,10 +34,18 @@ case class CreateNamespaceExec( private var properties: Map[String, String]) extends V2CommandExec { override protected def run(): Seq[InternalRow] = { - try { - catalog.createNamespace(namespace.toArray, properties.asJava) - } catch { - case _ : NamespaceAlreadyExistsException if ifNotExists => + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + val ns = namespace.toArray + if (!catalog.namespaceExists(ns)) { + try { + catalog.createNamespace(ns, properties.asJava) + } catch { + case _: NamespaceAlreadyExistsException if ifNotExists => + logWarning(s"Namespace ${namespace.quoted} was created concurrently. Ignoring.") + } + } else if (!ifNotExists) { + throw new NamespaceAlreadyExistsException(ns) } Seq.empty From 132cd7fb1f95d70591dddc4ad03e05a2acb08dc8 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 22 Oct 2019 13:24:10 -0700 Subject: [PATCH 7/8] Address PR comments --- docs/sql-keywords.md | 1 + .../spark/sql/catalyst/parser/SqlBase.g4 | 5 +- .../sql/catalyst/parser/AstBuilder.scala | 7 +- .../catalyst/plans/logical/v2Commands.scala | 8 +- .../sql/catalyst/parser/DDLParserSuite.scala | 74 ++++++++++++------- 5 files changed, 63 insertions(+), 32 deletions(-) diff --git a/docs/sql-keywords.md b/docs/sql-keywords.md index 7a0e3efee8ff..b4f8d8be11c4 100644 --- a/docs/sql-keywords.md +++ b/docs/sql-keywords.md @@ -210,6 +210,7 @@ Below is a list of all the keywords in Spark SQL. PRECEDINGnon-reservednon-reservednon-reserved PRIMARYreservednon-reservedreserved PRINCIPALSnon-reservednon-reservednon-reserved + PROPERTIESnon-reservednon-reservednon-reserved PURGEnon-reservednon-reservednon-reserved QUERYnon-reservednon-reservednon-reserved RANGEnon-reservednon-reservedreserved diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 401bb9164849..5d245fcf9361 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -86,7 +86,7 @@ statement | CREATE (database | NAMESPACE) (IF NOT EXISTS)? multipartIdentifier ((COMMENT comment=STRING) | locationSpec | - (WITH DBPROPERTIES tablePropertyList))* #createNamespace + (WITH (DBPROPERTIES | PROPERTIES) tablePropertyList))* #createNamespace | ALTER database db=errorCapturingIdentifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties | ALTER database db=errorCapturingIdentifier @@ -1039,6 +1039,7 @@ ansiNonReserved | POSITION | PRECEDING | PRINCIPALS + | PROPERTIES | PURGE | QUERY | RANGE @@ -1299,6 +1300,7 @@ nonReserved | PRECEDING | PRIMARY | PRINCIPALS + | PROPERTIES | PURGE | QUERY | RANGE @@ -1564,6 +1566,7 @@ POSITION: 'POSITION'; PRECEDING: 'PRECEDING'; PRIMARY: 'PRIMARY'; PRINCIPALS: 'PRINCIPALS'; +PROPERTIES: 'PROPERTIES'; PURGE: 'PURGE'; QUERY: 'QUERY'; RANGE: 'RANGE'; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 53630d7d6246..46ec729c5f08 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2318,14 +2318,19 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * create_namespace_clauses (order insensitive): * [COMMENT namespace_comment] * [LOCATION path] - * [WITH DBPROPERTIES (key1=val1, key2=val2, ...)] + * [WITH PROPERTIES (key1=val1, key2=val2, ...)] * }}} */ override def visitCreateNamespace(ctx: CreateNamespaceContext): LogicalPlan = withOrigin(ctx) { checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx) checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx) + checkDuplicateClauses(ctx.PROPERTIES, "WITH PROPERTIES", ctx) checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx) + if (!ctx.PROPERTIES.isEmpty() && !ctx.DBPROPERTIES.isEmpty()) { + throw new ParseException(s"Either PROPERTIES or DBPROPERTIES is allowed.", ctx) + } + var properties = ctx.tablePropertyList.asScala.headOption .map(visitPropertyKeyValues) .getOrElse(Map.empty) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 1c3f2447305d..8f5731a4a7a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -241,10 +241,10 @@ case class ReplaceTableAsSelect( * The logical plan of the CREATE NAMESPACE command that works for v2 catalogs. */ case class CreateNamespace( - catalog: SupportsNamespaces, - namespace: Seq[String], - ifNotExists: Boolean, - properties: Map[String, String]) extends Command + catalog: SupportsNamespaces, + namespace: Seq[String], + ifNotExists: Boolean, + properties: Map[String, String]) extends Command /** * The logical plan of the SHOW NAMESPACES command that works for v2 catalogs. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 392150d974b6..dde0094df3f9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -845,60 +845,82 @@ class DDLParserSuite extends AnalysisTest { ShowTablesStatement(Some(Seq("tbl")), Some("*dog*"))) } - test("create namespace") { - val sql = - """ - |CREATE DATABASE IF NOT EXISTS database_name - |WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c') - |COMMENT 'database_comment' LOCATION '/home/user/db' - """.stripMargin + test("create namespace -- backward compatibility with DATABASE/DBPROPERTIES") { + val expected = CreateNamespaceStatement( + Seq("a", "b", "c"), + ifNotExists = true, + Map( + "a" -> "a", + "b" -> "b", + "c" -> "c", + "comment" -> "namespace_comment", + "location" -> "/home/user/db")) + comparePlans( - parsePlan(sql), - CreateNamespaceStatement( - Seq("database_name"), - ifNotExists = true, - Map( - "a" -> "a", - "b" -> "b", - "c" -> "c", - "comment" -> "database_comment", - "location" -> "/home/user/db"))) + parsePlan( + """ + |CREATE NAMESPACE IF NOT EXISTS a.b.c + |WITH PROPERTIES ('a'='a', 'b'='b', 'c'='c') + |COMMENT 'namespace_comment' LOCATION '/home/user/db' + """.stripMargin), + expected) + + comparePlans( + parsePlan( + """ + |CREATE DATABASE IF NOT EXISTS a.b.c + |WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c') + |COMMENT 'namespace_comment' LOCATION '/home/user/db' + """.stripMargin), + expected) } test("create namespace -- check duplicates") { def createDatabase(duplicateClause: String): String = { s""" - |CREATE DATABASE IF NOT EXISTS database_name + |CREATE NAMESPACE IF NOT EXISTS a.b.c |$duplicateClause |$duplicateClause """.stripMargin } - val sql1 = createDatabase("COMMENT 'database_comment'") + val sql1 = createDatabase("COMMENT 'namespace_comment'") val sql2 = createDatabase("LOCATION '/home/user/db'") - val sql3 = createDatabase("WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") + val sql3 = createDatabase("WITH PROPERTIES ('a'='a', 'b'='b', 'c'='c')") + val sql4 = createDatabase("WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") intercept(sql1, "Found duplicate clauses: COMMENT") intercept(sql2, "Found duplicate clauses: LOCATION") - intercept(sql3, "Found duplicate clauses: WITH DBPROPERTIES") + intercept(sql3, "Found duplicate clauses: WITH PROPERTIES") + intercept(sql4, "Found duplicate clauses: WITH DBPROPERTIES") } test("create namespace - property values must be set") { assertUnsupported( - sql = "CREATE DATABASE my_db WITH DBPROPERTIES('key_without_value', 'key_with_value'='x')", + sql = "CREATE NAMESPACE a.b.c WITH PROPERTIES('key_without_value', 'key_with_value'='x')", containsThesePhrases = Seq("key_without_value")) } - test("create namespace - support for other types in DBPROPERTIES") { + test("create namespace -- either PROPERTIES or DBPROPERTIES is allowed") { + val sql = + s""" + |CREATE NAMESPACE IF NOT EXISTS a.b.c + |WITH PROPERTIES ('a'='a', 'b'='b', 'c'='c') + |WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c') + """.stripMargin + intercept(sql, "Either PROPERTIES or DBPROPERTIES is allowed") + } + + test("create namespace - support for other types in PROPERTIES") { val sql = """ - |CREATE DATABASE database_name + |CREATE NAMESPACE a.b.c |LOCATION '/home/user/db' - |WITH DBPROPERTIES ('a'=1, 'b'=0.1, 'c'=TRUE) + |WITH PROPERTIES ('a'=1, 'b'=0.1, 'c'=TRUE) """.stripMargin comparePlans( parsePlan(sql), CreateNamespaceStatement( - Seq("database_name"), + Seq("a", "b", "c"), ifNotExists = false, Map( "a" -> "1", From 3fd447d6c96fe48d5ac37a828096b6cf8835d52c Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Tue, 22 Oct 2019 13:29:03 -0700 Subject: [PATCH 8/8] style change --- .../scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 46ec729c5f08..e7a4d3be804d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2327,7 +2327,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging checkDuplicateClauses(ctx.PROPERTIES, "WITH PROPERTIES", ctx) checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx) - if (!ctx.PROPERTIES.isEmpty() && !ctx.DBPROPERTIES.isEmpty()) { + if (!ctx.PROPERTIES.isEmpty && !ctx.DBPROPERTIES.isEmpty) { throw new ParseException(s"Either PROPERTIES or DBPROPERTIES is allowed.", ctx) }