1 change: 1 addition & 0 deletions docs/sql-keywords.md
@@ -210,6 +210,7 @@ Below is a list of all the keywords in Spark SQL.
<tr><td>PRECEDING</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>PRIMARY</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>PRINCIPALS</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>PROPERTIES</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>PURGE</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>QUERY</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>RANGE</td><td>non-reserved</td><td>non-reserved</td><td>reserved</td></tr>
SqlBase.g4
@@ -83,10 +83,10 @@ statement
: query #statementDefault
| ctes? dmlStatementNoWith #dmlStatement
| USE NAMESPACE? multipartIdentifier #use
| CREATE database (IF NOT EXISTS)? db=errorCapturingIdentifier
| CREATE (database | NAMESPACE) (IF NOT EXISTS)? multipartIdentifier
((COMMENT comment=STRING) |
locationSpec |
(WITH DBPROPERTIES tablePropertyList))* #createDatabase
(WITH (DBPROPERTIES | PROPERTIES) tablePropertyList))* #createNamespace
| ALTER database db=errorCapturingIdentifier
SET DBPROPERTIES tablePropertyList #setDatabaseProperties
| ALTER database db=errorCapturingIdentifier
@@ -1039,6 +1039,7 @@ ansiNonReserved
| POSITION
| PRECEDING
| PRINCIPALS
| PROPERTIES
| PURGE
| QUERY
| RANGE
@@ -1299,6 +1300,7 @@ nonReserved
| PRECEDING
| PRIMARY
| PRINCIPALS
| PROPERTIES
| PURGE
| QUERY
| RANGE
@@ -1564,6 +1566,7 @@ POSITION: 'POSITION';
PRECEDING: 'PRECEDING';
PRIMARY: 'PRIMARY';
PRINCIPALS: 'PRINCIPALS';
PROPERTIES: 'PROPERTIES';
PURGE: 'PURGE';
QUERY: 'QUERY';
RANGE: 'RANGE';
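For reference, the grammar change above folds CREATE DATABASE and CREATE NAMESPACE into a single #createNamespace alternative that accepts either WITH DBPROPERTIES or WITH PROPERTIES. A minimal sketch of both spellings, assuming a Spark build that includes this change and a local session; the namespace names and property values are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

object CreateNamespaceSyntaxSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("create-namespace-syntax")
      .getOrCreate()

    // New generalized form: NAMESPACE with WITH PROPERTIES.
    spark.sql(
      "CREATE NAMESPACE IF NOT EXISTS demo_ns " +
        "COMMENT 'demo namespace' WITH PROPERTIES ('owner'='alice')")

    // Legacy form, still parsed by the same #createNamespace alternative.
    spark.sql(
      "CREATE DATABASE IF NOT EXISTS demo_db " +
        "WITH DBPROPERTIES ('owner'='alice')")

    spark.sql("SHOW DATABASES").show()
    spark.stop()
  }
}
```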
ResolveCatalogs.scala
@@ -168,6 +168,13 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
s"Can not specify catalog `${catalog.name}` for view ${viewName.quoted} " +
s"because view support in catalog has not been implemented yet")

case c @ CreateNamespaceStatement(NonSessionCatalog(catalog, nameParts), _, _) =>
CreateNamespace(
catalog.asNamespaceCatalog,
nameParts,
c.ifNotExists,
c.properties)

case ShowNamespacesStatement(Some(CatalogAndNamespace(catalog, namespace)), pattern) =>
ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern)

AstBuilder.scala
@@ -2307,6 +2307,46 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
}

/**
* Create a [[CreateNamespaceStatement]] command.
*
* For example:
* {{{
* CREATE NAMESPACE [IF NOT EXISTS] ns1.ns2.ns3
* create_namespace_clauses;
*
* create_namespace_clauses (order insensitive):
* [COMMENT namespace_comment]
* [LOCATION path]
* [WITH PROPERTIES (key1=val1, key2=val2, ...)]
* }}}
*/
override def visitCreateNamespace(ctx: CreateNamespaceContext): LogicalPlan = withOrigin(ctx) {
checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
checkDuplicateClauses(ctx.PROPERTIES, "WITH PROPERTIES", ctx)
checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx)

if (!ctx.PROPERTIES.isEmpty && !ctx.DBPROPERTIES.isEmpty) {
throw new ParseException(s"Either PROPERTIES or DBPROPERTIES is allowed.", ctx)
}

var properties = ctx.tablePropertyList.asScala.headOption
.map(visitPropertyKeyValues)
.getOrElse(Map.empty)
Option(ctx.comment).map(string).map {
properties += CreateNamespaceStatement.COMMENT_PROPERTY_KEY -> _
}
ctx.locationSpec.asScala.headOption.map(visitLocationSpec).map {
properties += CreateNamespaceStatement.LOCATION_PROPERTY_KEY -> _
}

CreateNamespaceStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
ctx.EXISTS != null,
properties)
}

/**
* Create a [[ShowNamespacesStatement]] command.
*/
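A standalone sketch of the property handling in visitCreateNamespace above, not the builder code itself: an optional COMMENT and LOCATION are folded into the same map that WITH (DB)PROPERTIES populates, under the reserved keys defined on CreateNamespaceStatement. The values below are illustrative and the snippet is runnable in a plain Scala REPL:

```scala
// Mirrors the folding done in visitCreateNamespace, in isolation.
var properties: Map[String, String] = Map("a" -> "a")    // from tablePropertyList
val comment: Option[String] = Some("namespace_comment")  // from the COMMENT clause
val location: Option[String] = Some("/home/user/db")     // from the LOCATION clause

comment.foreach(c => properties += "comment" -> c)
location.foreach(l => properties += "location" -> l)

assert(properties == Map(
  "a" -> "a",
  "comment" -> "namespace_comment",
  "location" -> "/home/user/db"))
```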
statements.scala
@@ -282,6 +282,19 @@ case class InsertIntoStatement(
case class ShowTablesStatement(namespace: Option[Seq[String]], pattern: Option[String])
extends ParsedStatement

/**
* A CREATE NAMESPACE statement, as parsed from SQL.
*/
case class CreateNamespaceStatement(
namespace: Seq[String],
ifNotExists: Boolean,
properties: Map[String, String]) extends ParsedStatement

object CreateNamespaceStatement {
val COMMENT_PROPERTY_KEY: String = "comment"
val LOCATION_PROPERTY_KEY: String = "location"
}

/**
* A SHOW NAMESPACES statement, as parsed from SQL.
*/
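Carrying COMMENT and LOCATION inside `properties`, rather than as separate fields, keeps the parsed statement close to the generic key/value shape of the v2 namespace API. A sketch of the value the parser is expected to produce, assuming the class lives in org.apache.spark.sql.catalyst.plans.logical; the SQL and values are illustrative:

```scala
import org.apache.spark.sql.catalyst.plans.logical.CreateNamespaceStatement

// Expected parse result for:
//   CREATE NAMESPACE IF NOT EXISTS a.b.c
//   COMMENT 'namespace_comment' LOCATION '/home/user/db'
//   WITH PROPERTIES ('k'='v')
val stmt = CreateNamespaceStatement(
  namespace = Seq("a", "b", "c"),
  ifNotExists = true,
  properties = Map(
    "k" -> "v",
    CreateNamespaceStatement.COMMENT_PROPERTY_KEY -> "namespace_comment",
    CreateNamespaceStatement.LOCATION_PROPERTY_KEY -> "/home/user/db"))
```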
v2Commands.scala
@@ -237,6 +237,14 @@ case class ReplaceTableAsSelect(
}
}

/**
* The logical plan of the CREATE NAMESPACE command that works for v2 catalogs.
*/
case class CreateNamespace(
catalog: SupportsNamespaces,
namespace: Seq[String],
ifNotExists: Boolean,
properties: Map[String, String]) extends Command

/**
* The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
DDLParserSuite.scala
@@ -845,6 +845,90 @@ class DDLParserSuite extends AnalysisTest {
ShowTablesStatement(Some(Seq("tbl")), Some("*dog*")))
}

test("create namespace -- backward compatibility with DATABASE/DBPROPERTIES") {
val expected = CreateNamespaceStatement(
Seq("a", "b", "c"),
ifNotExists = true,
Map(
"a" -> "a",
"b" -> "b",
"c" -> "c",
"comment" -> "namespace_comment",
"location" -> "/home/user/db"))

comparePlans(
parsePlan(
"""
|CREATE NAMESPACE IF NOT EXISTS a.b.c
|WITH PROPERTIES ('a'='a', 'b'='b', 'c'='c')
|COMMENT 'namespace_comment' LOCATION '/home/user/db'
""".stripMargin),
expected)

comparePlans(
parsePlan(
"""
|CREATE DATABASE IF NOT EXISTS a.b.c
|WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
|COMMENT 'namespace_comment' LOCATION '/home/user/db'
""".stripMargin),
expected)
}

test("create namespace -- check duplicates") {
def createDatabase(duplicateClause: String): String = {
s"""
|CREATE NAMESPACE IF NOT EXISTS a.b.c
|$duplicateClause
|$duplicateClause
""".stripMargin
}
val sql1 = createDatabase("COMMENT 'namespace_comment'")
val sql2 = createDatabase("LOCATION '/home/user/db'")
val sql3 = createDatabase("WITH PROPERTIES ('a'='a', 'b'='b', 'c'='c')")
val sql4 = createDatabase("WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")

intercept(sql1, "Found duplicate clauses: COMMENT")
intercept(sql2, "Found duplicate clauses: LOCATION")
intercept(sql3, "Found duplicate clauses: WITH PROPERTIES")
intercept(sql4, "Found duplicate clauses: WITH DBPROPERTIES")
}

test("create namespace - property values must be set") {
assertUnsupported(
sql = "CREATE NAMESPACE a.b.c WITH PROPERTIES('key_without_value', 'key_with_value'='x')",
containsThesePhrases = Seq("key_without_value"))
}

test("create namespace -- either PROPERTIES or DBPROPERTIES is allowed") {
val sql =
s"""
|CREATE NAMESPACE IF NOT EXISTS a.b.c
|WITH PROPERTIES ('a'='a', 'b'='b', 'c'='c')
|WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
""".stripMargin
intercept(sql, "Either PROPERTIES or DBPROPERTIES is allowed")
}

test("create namespace - support for other types in PROPERTIES") {
val sql =
"""
|CREATE NAMESPACE a.b.c
|LOCATION '/home/user/db'
|WITH PROPERTIES ('a'=1, 'b'=0.1, 'c'=TRUE)
""".stripMargin
comparePlans(
parsePlan(sql),
CreateNamespaceStatement(
Seq("a", "b", "c"),
ifNotExists = false,
Map(
"a" -> "1",
"b" -> "0.1",
"c" -> "true",
"location" -> "/home/user/db")))
}

test("show databases: basic") {
comparePlans(
parsePlan("SHOW DATABASES"),
ParserUtilsSuite.scala
@@ -50,7 +50,7 @@ class ParserUtilsSuite extends SparkFunSuite {
|WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')
""".stripMargin
) { parser =>
parser.statement().asInstanceOf[CreateDatabaseContext]
parser.statement().asInstanceOf[CreateNamespaceContext]
}

val emptyContext = buildContext("") { parser =>
ResolveSessionCatalog.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand}
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableRecoverPartitionsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, AnalyzeColumnCommand, AnalyzePartitionCommand, AnalyzeTableCommand, CreateDatabaseCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowPartitionsCommand, ShowTablesCommand, TruncateTableCommand}
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.internal.SQLConf
@@ -255,6 +255,19 @@ class ResolveSessionCatalog(
case DropViewStatement(SessionCatalog(catalog, viewName), ifExists) =>
DropTableCommand(viewName.asTableIdentifier, ifExists, isView = true, purge = false)

case c @ CreateNamespaceStatement(SessionCatalog(catalog, nameParts), _, _) =>
if (nameParts.length != 1) {
throw new AnalysisException(
s"The database name is not valid: ${nameParts.quoted}")
}

val comment = c.properties.get(CreateNamespaceStatement.COMMENT_PROPERTY_KEY)
val location = c.properties.get(CreateNamespaceStatement.LOCATION_PROPERTY_KEY)
val newProperties = c.properties -
CreateNamespaceStatement.COMMENT_PROPERTY_KEY -
CreateNamespaceStatement.LOCATION_PROPERTY_KEY
CreateDatabaseCommand(nameParts.head, c.ifNotExists, location, comment, newProperties)

case ShowTablesStatement(Some(SessionCatalog(catalog, nameParts)), pattern) =>
if (nameParts.length != 1) {
throw new AnalysisException(
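A small sketch of the split performed above, in isolation: for the session catalog, the reserved comment and location entries are pulled out and the remainder of the map is passed to CreateDatabaseCommand as plain database properties. The values are illustrative and the snippet runs in a plain Scala REPL:

```scala
// Only illustrates the Map manipulation, not the resolution rule itself.
val props = Map(
  "comment" -> "namespace_comment",
  "location" -> "/home/user/db",
  "owner" -> "alice")

val comment = props.get("comment")      // Some("namespace_comment")
val location = props.get("location")    // Some("/home/user/db")
val remaining = props - "comment" - "location"

assert(remaining == Map("owner" -> "alice"))
```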
SparkSqlAstBuilder.scala
@@ -329,33 +329,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
)
}

/**
* Create a [[CreateDatabaseCommand]] command.
*
* For example:
* {{{
* CREATE DATABASE [IF NOT EXISTS] database_name
* create_database_clauses;
*
* create_database_clauses (order insensitive):
* [COMMENT database_comment]
* [LOCATION path]
* [WITH DBPROPERTIES (key1=val1, key2=val2, ...)]
* }}}
*/
override def visitCreateDatabase(ctx: CreateDatabaseContext): LogicalPlan = withOrigin(ctx) {
checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)
checkDuplicateClauses(ctx.DBPROPERTIES, "WITH DBPROPERTIES", ctx)

CreateDatabaseCommand(
ctx.db.getText,
ctx.EXISTS != null,
ctx.locationSpec.asScala.headOption.map(visitLocationSpec),
Option(ctx.comment).map(string),
ctx.tablePropertyList.asScala.headOption.map(visitPropertyKeyValues).getOrElse(Map.empty))
}

/**
* Create an [[AlterDatabasePropertiesCommand]] command.
*
CreateNamespaceExec.scala (new file)
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.SupportsNamespaces

/**
* Physical plan node for creating a namespace.
*/
case class CreateNamespaceExec(
catalog: SupportsNamespaces,
namespace: Seq[String],
ifNotExists: Boolean,
private var properties: Map[String, String])
extends V2CommandExec {
override protected def run(): Seq[InternalRow] = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val ns = namespace.toArray
if (!catalog.namespaceExists(ns)) {
try {
catalog.createNamespace(ns, properties.asJava)
} catch {
case _: NamespaceAlreadyExistsException if ifNotExists =>
logWarning(s"Namespace ${namespace.quoted} was created concurrently. Ignoring.")
}
} else if (!ifNotExists) {
throw new NamespaceAlreadyExistsException(ns)
}

Seq.empty
}

override def output: Seq[Attribute] = Seq.empty
}
DataSourceV2Strategy.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables}
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
@@ -289,6 +289,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case AlterTable(catalog, ident, _, changes) =>
AlterTableExec(catalog, ident, changes) :: Nil

case CreateNamespace(catalog, namespace, ifNotExists, properties) =>
CreateNamespaceExec(catalog, namespace, ifNotExists, properties) :: Nil

case r: ShowNamespaces =>
ShowNamespacesExec(r.output, r.catalog, r.namespace, r.pattern) :: Nil

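End to end, a CREATE NAMESPACE against a non-session catalog is parsed into CreateNamespaceStatement, resolved to CreateNamespace by ResolveCatalogs, and planned as CreateNamespaceExec by the strategy case above. A hedged sketch of exercising that path: the catalog name and the plugin class com.example.DemoNamespaceCatalog are hypothetical stand-ins for any registered catalog that implements SupportsNamespaces.

```scala
import org.apache.spark.sql.SparkSession

object V2CreateNamespaceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("v2-create-namespace")
      // Hypothetical plugin class; substitute a real SupportsNamespaces implementation.
      .config("spark.sql.catalog.demo_cat", "com.example.DemoNamespaceCatalog")
      .getOrCreate()

    // Routed through CreateNamespaceExec because demo_cat is not the session catalog.
    spark.sql(
      "CREATE NAMESPACE IF NOT EXISTS demo_cat.ns1.ns2 " +
        "WITH PROPERTIES ('owner'='alice')")

    spark.stop()
  }
}
```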