Skip to content
Prev Previous commit
Next Next commit
Finishing up
  • Loading branch information
imback82 committed Oct 18, 2019
commit 27066a5c00d028efb5afa3901b3225e84c5e6393
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ statement
: query #statementDefault
| ctes? dmlStatementNoWith #dmlStatement
| USE NAMESPACE? multipartIdentifier #use
| CREATE (DATABASE | NAMESPACE) (IF NOT EXISTS)? multipartIdentifier
| CREATE (database | NAMESPACE) (IF NOT EXISTS)? multipartIdentifier
((COMMENT comment=STRING) |
locationSpec |
(WITH DBPROPERTIES tablePropertyList))* #createNamespace
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I missed this. Shall we use PROPERTIES instead of DBPROPERTIES? We can write DBPROPERTIES | PROPERTIES for backward compatibility

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for catching this. updated.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,15 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
s"Can not specify catalog `${catalog.name}` for view ${viewName.quoted} " +
s"because view support in catalog has not been implemented yet")

case c @ CreateNamespaceStatement(NonSessionCatalog(catalog, nameParts), _, _, _, _) =>
CreateNamespace(
catalog.asNamespaceCatalog,
nameParts,
c.ifNotExists,
c.comment,
c.locationSpec,
c.properties)

case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) =>
ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2300,15 +2300,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}

/**
* Create a [[CreateDatabaseCommand]] command.
* Create a [[CreateNamespaceStatement]] command.
*
* For example:
* {{{
* CREATE DATABASE [IF NOT EXISTS] database_name
* create_database_clauses;
* CREATE NAMESPACE [IF NOT EXISTS] ns1.ns2.ns3
* create_namespace_clauses;
*
* create_database_clauses (order insensitive):
* [COMMENT database_comment]
* create_namespace_clauses (order insensitive):
* [COMMENT namespace_comment]
* [LOCATION path]
* [WITH DBPROPERTIES (key1=val1, key2=val2, ...)]
* }}}
Expand All @@ -2318,10 +2318,6 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx)

if (ctx.DATABASE != null && ctx.multipartIdentifier != null) {
throw new ParseException(s"FROM/IN operator is not allowed in SHOW DATABASES", ctx)
}

CreateNamespaceStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.EXISTS != null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ case class CreateNamespaceStatement(
namespace: Seq[String],
ifNotExists: Boolean,
comment: Option[String],
location: Option[String],
locationSpec: Option[String],
properties: Map[String, String]) extends ParsedStatement
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan, what is the behavior of IF NOT EXISTS with DBPROPERTIES? If the namespace exists, are properties updated?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as table/database: if the table/database already exists, the command is a no-op.


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@ case class ReplaceTableAsSelect(
}
}

/**
* The logical plan of the CREATE NAMESPACE command that works for v2 catalogs.
*/
case class CreateNamespace(
catalog: SupportsNamespaces,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 4-space indentation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

namespace: Seq[String],
ifNotExists: Boolean,
comment: Option[String],
locationSpec: Option[String],
properties: Map[String, String]) extends Command

/**
* The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,63 @@ class DDLParserSuite extends AnalysisTest {
ShowTablesStatement(Some(Seq("tbl")), Some("*dog*")))
}

test("create namespace") {
val sql =
"""
|CREATE DATABASE IF NOT EXISTS database_name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use a.b.c instead of database_name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

|WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
|COMMENT 'database_comment' LOCATION '/home/user/db'
""".stripMargin
comparePlans(
parsePlan(sql),
CreateNamespaceStatement(
Seq("database_name"),
ifNotExists = true,
Some("database_comment"),
Some("/home/user/db"),
Map("a" -> "a", "b" -> "b", "c" -> "c")))
}

// Builds CREATE DATABASE statements in which a single clause (COMMENT, LOCATION, or
// WITH DBPROPERTIES) appears twice, and passes each one to `intercept` together with the
// "Found duplicate clauses: ..." message expected for that clause.
test("create namespace -- check duplicates") {
// Returns a CREATE DATABASE IF NOT EXISTS statement that repeats `duplicateClause` twice.
def createDatabase(duplicateClause: String): String = {
s"""
|CREATE DATABASE IF NOT EXISTS database_name
|$duplicateClause
|$duplicateClause
""".stripMargin
}
// One statement per clause kind that may appear in CREATE DATABASE.
val sql1 = createDatabase("COMMENT 'database_comment'")
val sql2 = createDatabase("LOCATION '/home/user/db'")
val sql3 = createDatabase("WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")

// The expected message names the clause that was duplicated.
intercept(sql1, "Found duplicate clauses: COMMENT")
intercept(sql2, "Found duplicate clauses: LOCATION")
intercept(sql3, "Found duplicate clauses: WITH DBPROPERTIES")
}

// A DBPROPERTIES entry written as a bare key (no `= value`) is handed to `assertUnsupported`,
// which is given the offending key as the phrase to look for.
test("create namespace - property values must be set") {
  val statement =
    "CREATE DATABASE my_db WITH DBPROPERTIES('key_without_value', 'key_with_value'='x')"
  val expectedPhrases = Seq("key_without_value")

  assertUnsupported(sql = statement, containsThesePhrases = expectedPhrases)
}

// Parses a CREATE DATABASE statement whose DBPROPERTIES values are an integer, a decimal and
// a boolean literal, and compares the parsed plan with a CreateNamespaceStatement that
// carries those values as the strings "1", "0.1" and "true".
test("create namespace - support for other types in DBPROPERTIES") {
val sql =
"""
|CREATE DATABASE database_name
|LOCATION '/home/user/db'
|WITH DBPROPERTIES ('a'=1, 'b'=0.1, 'c'=TRUE)
""".stripMargin
comparePlans(
parsePlan(sql),
CreateNamespaceStatement(
Seq("database_name"),
ifNotExists = false,
// The statement has no COMMENT clause.
None,
// Value of the LOCATION clause.
Some("/home/user/db"),
// Non-string literals are expected in their string form.
Map("a" -> "1", "b" -> "0.1", "c" -> "true")))
}

test("show databases: basic") {
comparePlans(
parsePlan("SHOW DATABASES"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class ParserUtilsSuite extends SparkFunSuite {
|WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
""".stripMargin
) { parser =>
parser.statement().asInstanceOf[CreateDatabaseContext]
parser.statement().asInstanceOf[CreateNamespaceContext]
}

val emptyContext = buildContext("") { parser =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -255,6 +255,13 @@ class ResolveSessionCatalog(
case DropViewStatement(SessionCatalog(catalog, viewName), ifExists) =>
DropTableCommand(viewName.asTableIdentifier, ifExists, isView = true, purge = false)

case c @ CreateNamespaceStatement(SessionCatalog(catalog, nameParts), _, _, _, _) =>
if (nameParts.length != 1) {
throw new AnalysisException(
s"The database name is not valid: ${nameParts.quoted}")
}
CreateDatabaseCommand(nameParts.head, c.ifNotExists, c.locationSpec, c.comment, c.properties)

case ShowTablesStatement(Some(SessionCatalog(catalog, nameParts)), pattern) =>
if (nameParts.length != 1) {
throw new AnalysisException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.SupportsNamespaces

/**
 * Physical plan node for creating a namespace.
 *
 * When the namespace already exists, the command is a no-op if `ifNotExists` is set and
 * fails with [[NamespaceAlreadyExistsException]] otherwise. When it does not exist, it is
 * created through `catalog.createNamespace` with `properties`, extended with a "comment"
 * entry (if `comment` is defined) and a "locationSpec" entry (if `locationSpec` is defined).
 *
 * @param catalog the catalog in which the namespace is created
 * @param namespace the multi-part name of the namespace
 * @param ifNotExists whether IF NOT EXISTS was specified
 * @param comment optional comment, stored under the "comment" property key
 * @param locationSpec optional location, converted with `CatalogUtils.stringToURI` and stored
 *                     under the "locationSpec" property key
 * @param properties properties supplied for the namespace
 */
case class CreateNamespaceExec(
    catalog: SupportsNamespaces,
    namespace: Seq[String],
    ifNotExists: Boolean,
    comment: Option[String],
    locationSpec: Option[String],
    properties: Map[String, String])
  extends V2CommandExec {

  override protected def run(): Seq[InternalRow] = {
    val ns = namespace.toArray

    if (catalog.namespaceExists(ns)) {
      // With IF NOT EXISTS an existing namespace is silently left untouched; without it,
      // creating a namespace that already exists is an error.
      if (!ifNotExists) {
        throw new NamespaceAlreadyExistsException(ns)
      }
    } else {
      // Build the full property map locally instead of mutating a constructor field, so that
      // executing the node does not change its equality/hash code and re-running it is safe.
      // The entries are appended after the user-supplied properties, so they take precedence
      // over user entries with the same key.
      val commentEntry = comment.map(text => "comment" -> text).toSeq
      val locationEntry = locationSpec
        .map(location => "locationSpec" -> CatalogUtils.stringToURI(location).toString)
        .toSeq
      val allProperties = properties ++ commentEntry ++ locationEntry

      // Note that createNamespace can still throw NamespaceAlreadyExistsException (for
      // example if the namespace is created between the check above and this call),
      // depending on the SupportsNamespaces implementation.
      catalog.createNamespace(ns, allProperties.asJava)
    }

    Seq.empty
  }

  override def output: Seq[Attribute] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,14 @@ class DataSourceV2SQLSuite
assert(expected === df.collect())
}

// Issues CREATE NAMESPACE with an unqualified name and expects SHOW NAMESPACES to report it
// next to `default`; then issues CREATE NAMESPACE for the nested name `ns1.ns2` under
// `testcat` and expects SHOW NAMESPACES testcat to report `ns1`.
// NOTE(review): the test name mentions only the session catalog, but the second half targets
// `testcat` (presumably a v2 test catalog) — confirm the name is intentional.
test("CreateNameSpace: session catalog") {
sql("CREATE NAMESPACE ns")
testShowNamespaces("SHOW NAMESPACES", Seq("default", "ns"))

sql("CREATE NAMESPACE testcat.ns1.ns2")
testShowNamespaces("SHOW NAMESPACES testcat", Seq("ns1"))
}

test("ShowNamespaces: show root namespaces with default v2 catalog") {
spark.conf.set("spark.sql.default.catalog", "testcat")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,46 +74,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
}.head
}

test("create database") {
val sql =
"""
|CREATE DATABASE IF NOT EXISTS database_name
|WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
|COMMENT 'database_comment' LOCATION '/home/user/db'
""".stripMargin
val parsed = parser.parsePlan(sql)
val expected = CreateDatabaseCommand(
"database_name",
ifNotExists = true,
Some("/home/user/db"),
Some("database_comment"),
Map("a" -> "a", "b" -> "b", "c" -> "c"))
comparePlans(parsed, expected)
}

test("create database -- check duplicates") {
def createDatabase(duplicateClause: String): String = {
s"""
|CREATE DATABASE IF NOT EXISTS database_name
|$duplicateClause
|$duplicateClause
""".stripMargin
}
val sql1 = createDatabase("COMMENT 'database_comment'")
val sql2 = createDatabase("LOCATION '/home/user/db'")
val sql3 = createDatabase("WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")

intercept(sql1, "Found duplicate clauses: COMMENT")
intercept(sql2, "Found duplicate clauses: LOCATION")
intercept(sql3, "Found duplicate clauses: WITH DBPROPERTIES")
}

test("create database - property values must be set") {
assertUnsupported(
sql = "CREATE DATABASE my_db WITH DBPROPERTIES('key_without_value', 'key_with_value'='x')",
containsThesePhrases = Seq("key_without_value"))
}

test("drop database") {
val sql1 = "DROP DATABASE IF EXISTS database_name RESTRICT"
val sql2 = "DROP DATABASE IF EXISTS database_name CASCADE"
Expand Down Expand Up @@ -891,24 +851,6 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
comparePlans(parsed3, expected3)
}

test("support for other types in DBPROPERTIES") {
val sql =
"""
|CREATE DATABASE database_name
|LOCATION '/home/user/db'
|WITH DBPROPERTIES ('a'=1, 'b'=0.1, 'c'=TRUE)
""".stripMargin
val parsed = parser.parsePlan(sql)
val expected = CreateDatabaseCommand(
"database_name",
ifNotExists = false,
Some("/home/user/db"),
None,
Map("a" -> "1", "b" -> "0.1", "c" -> "true"))

comparePlans(parsed, expected)
}

test("Test CTAS #1") {
val s1 =
"""
Expand Down