From ffc391bc8d4047499dcfac90a5881894b278ab3c Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 22 Aug 2019 23:04:59 +0800 Subject: [PATCH 1/8] create a public API for v2 session catalog --- .../sql/catalog/v2/CatalogExtension.java | 101 ++++++++++++++++++ .../spark/sql/catalog/v2/CatalogManager.scala | 13 ++- .../sql/catalyst/analysis/Analyzer.scala | 30 +++++- .../apache/spark/sql/internal/SQLConf.scala | 7 +- .../catalog/CatalogManagerSuite.scala | 7 +- .../datasources/v2/V2SessionCatalog.scala | 26 +++-- .../internal/BaseSessionStateBuilder.scala | 6 +- .../command/PlanResolutionSuite.scala | 8 +- ...SourceV2DataFrameSessionCatalogSuite.scala | 7 +- .../v2/utils/TestV2SessionCatalogBase.scala | 5 +- .../sql/hive/HiveSessionStateBuilder.scala | 2 +- 11 files changed, 172 insertions(+), 40 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java new file mode 100644 index 000000000000..a2a4bbeb6e5c --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import java.util.Map; + +import org.apache.spark.sql.catalog.v2.expressions.Transform; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * An API to extend the Spark built-in session catalog. Implementation can override some methods + * to apply custom logic. For example, they can override {@code createTable}, do something else + * before calling {@code super.createTable}. + */ +public abstract class CatalogExtension implements TableCatalog { + + private TableCatalog delegate; + + /** + * This will be called only once by Spark to pass in the Spark built-in session catalog. 
+ */ + public final void setDelegateCatalog(TableCatalog delegate) { + this.delegate = delegate; + } + + @Override + public String name() { + return delegate.name(); + } + + @Override + public final void initialize(String name, CaseInsensitiveStringMap options) {} + + @Override + public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException { + return delegate.listTables(namespace); + } + + @Override + public Table loadTable(Identifier ident) throws NoSuchTableException { + return delegate.loadTable(ident); + } + + @Override + public void invalidateTable(Identifier ident) { + delegate.invalidateTable(ident); + } + + @Override + public boolean tableExists(Identifier ident) { + return delegate.tableExists(ident); + } + + @Override + public Table createTable( + Identifier ident, + StructType schema, + Transform[] partitions, + Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { + return delegate.createTable(ident, schema, partitions, properties); + } + + @Override + public Table alterTable( + Identifier ident, + TableChange... changes) throws NoSuchTableException { + return delegate.alterTable(ident, changes); + } + + @Override + public boolean dropTable(Identifier ident) { + return delegate.dropTable(ident); + } + + @Override + public void renameTable( + Identifier oldIdent, + Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException { + delegate.renameTable(oldIdent, newIdent); + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala index d5a6a61f8257..baa946d23ae2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.SQLConf * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow * the caller to look up a catalog by name. 
*/ -class CatalogManager(conf: SQLConf) extends Logging { +class CatalogManager(conf: SQLConf, sessionCatalog: TableCatalog) extends Logging { private val catalogs = mutable.HashMap.empty[String, CatalogPlugin] @@ -47,9 +47,18 @@ class CatalogManager(conf: SQLConf) extends Logging { } } + private def loadV2SessionCatalog(): CatalogPlugin = { + Catalogs.load(CatalogManager.SESSION_CATALOG_NAME, conf) match { + case extension: CatalogExtension => + extension.setDelegateCatalog(sessionCatalog) + extension + case other => other + } + } + def v2SessionCatalog: Option[CatalogPlugin] = { try { - Some(catalog(CatalogManager.SESSION_CATALOG_NAME)) + Some(catalogs.getOrElseUpdate(CatalogManager.SESSION_CATALOG_NAME, loadV2SessionCatalog())) } catch { case NonFatal(e) => logError("Cannot load v2 session catalog", e) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e2be5090486b..99a40c305a1f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.analysis +import java.util import java.util.Locale import scala.collection.mutable @@ -25,7 +26,7 @@ import scala.util.Random import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalog.v2._ -import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform} +import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform, Transform} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.OuterScopes @@ -45,6 +46,7 @@ import org.apache.spark.sql.internal.SQLConf.{PartitionOverwriteMode, StoreAssig import org.apache.spark.sql.sources.v2.Table import org.apache.spark.sql.sources.v2.internal.V1Table import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]]. @@ -60,6 +62,22 @@ object SimpleAnalyzer extends Analyzer( }, new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) +object FakeV2SessionCatalog extends TableCatalog { + private def fail() = throw new UnsupportedOperationException + override def listTables(namespace: Array[String]): Array[Identifier] = fail() + override def loadTable(ident: Identifier): Table = fail() + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = fail() + override def alterTable(ident: Identifier, changes: TableChange*): Table = fail() + override def dropTable(ident: Identifier): Boolean = fail() + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = fail() + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = fail() + override def name(): String = fail() +} + /** * Provides a way to keep state during the analysis, this enables us to decouple the concerns * of analysis environment from the catalog. @@ -101,15 +119,21 @@ object AnalysisContext { */ class Analyzer( catalog: SessionCatalog, + v2SessionCatalog: TableCatalog, conf: SQLConf, maxIterations: Int) extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog { + // Only for tests. 
def this(catalog: SessionCatalog, conf: SQLConf) = { - this(catalog, conf, conf.optimizerMaxIterations) + this(catalog, FakeV2SessionCatalog, conf, conf.optimizerMaxIterations) + } + + def this(catalog: SessionCatalog, v2SessionCatalog: TableCatalog, conf: SQLConf) = { + this(catalog, v2SessionCatalog, conf, conf.optimizerMaxIterations) } - override val catalogManager: CatalogManager = new CatalogManager(conf) + override val catalogManager: CatalogManager = new CatalogManager(conf, v2SessionCatalog) def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { AnalysisHelper.markInAnalyzer { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 006bb99e5939..0620c56cc718 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1947,9 +1947,12 @@ object SQLConf { .createOptional val V2_SESSION_CATALOG = buildConf("spark.sql.catalog.session") - .doc("Name of the default v2 catalog, used when a catalog is not identified in queries") + .doc("A catalog implementation which will be used as the session catalog for Spark " + + "queries. The implementation must either extend `CatalogExtension`, or implement " + + "`CatalogPlugin`. When extending `CatalogExtension`, the default session catalog " + + "will be passed in so that the implementation can fallback some functions to it.") .stringConf - .createWithDefault("org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog") + .createOptional val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.looseUpcast") .doc("When true, the upcast will be loose and allows string to atomic types.") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala index f7f190136bfc..d2a2ba50ead4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala @@ -21,6 +21,7 @@ import java.util import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalog.v2.{CatalogManager, NamespaceChange, SupportsNamespaces} +import org.apache.spark.sql.catalyst.analysis.FakeV2SessionCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -28,7 +29,7 @@ class CatalogManagerSuite extends SparkFunSuite { test("CatalogManager should reflect the changes of default catalog") { val conf = new SQLConf - val catalogManager = new CatalogManager(conf) + val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog) assert(catalogManager.currentCatalog.isEmpty) assert(catalogManager.currentNamespace.sameElements(Array("default"))) @@ -42,7 +43,7 @@ class CatalogManagerSuite extends SparkFunSuite { test("CatalogManager should keep the current catalog once set") { val conf = new SQLConf - val catalogManager = new CatalogManager(conf) + val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog) assert(catalogManager.currentCatalog.isEmpty) conf.setConfString("spark.sql.catalog.dummy", classOf[DummyCatalog].getName) catalogManager.setCurrentCatalog("dummy") @@ -57,7 +58,7 @@ class CatalogManagerSuite extends SparkFunSuite { test("current namespace should be updated when switching current catalog") 
{ val conf = new SQLConf - val catalogManager = new CatalogManager(conf) + val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog) catalogManager.setCurrentNamespace(Array("abc")) assert(catalogManager.currentNamespace.sameElements(Array("abc"))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index ebfd7384930f..3e960048a1ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -24,15 +24,15 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalog.v2.{Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange} +import org.apache.spark.sql.catalog.v2.{CatalogManager, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange} import org.apache.spark.sql.catalog.v2.NamespaceChange.{RemoveProperty, SetProperty} -import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform} +import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform} import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog} import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.internal.SessionState +import org.apache.spark.sql.internal.{SessionState, SQLConf} import org.apache.spark.sql.sources.v2.Table import org.apache.spark.sql.sources.v2.internal.V1Table import org.apache.spark.sql.types.StructType @@ -41,25 +41,23 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A [[TableCatalog]] that translates calls to the v1 SessionCatalog. */ -class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with SupportsNamespaces { +class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) + extends TableCatalog with SupportsNamespaces { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ import V2SessionCatalog._ + def this(sessionState: SessionState) = this(sessionState.catalog, sessionState.conf) + def this() = { this(SparkSession.active.sessionState) } override val defaultNamespace: Array[String] = Array("default") - private lazy val catalog: SessionCatalog = sessionState.catalog - - private var _name: String = _ + override def name: String = CatalogManager.SESSION_CATALOG_NAME - override def name: String = _name - - override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { - this._name = name - } + // This class is instantiated by Spark, so `initialize` method will not be called. 
+ override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {} override def listTables(namespace: Array[String]): Array[Identifier] = { namespace match { @@ -92,7 +90,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with Sup properties: util.Map[String, String]): Table = { val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions) - val provider = properties.getOrDefault("provider", sessionState.conf.defaultDataSourceName) + val provider = properties.getOrDefault("provider", conf.defaultDataSourceName) val tableProperties = properties.asScala val location = Option(properties.get(LOCATION_TABLE_PROP)) val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap) @@ -108,7 +106,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with Sup partitionColumnNames = partitionColumns, bucketSpec = maybeBucketSpec, properties = tableProperties.toMap, - tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions, + tracksPartitionsInCatalog = conf.manageFilesourcePartitions, comment = Option(properties.get(COMMENT_TABLE_PROP))) try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 831c19bbe12c..6773f92a736f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ColumnarRule, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser} import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.v2.{V2StreamingScanSupportCheck, V2WriteSupportCheck} +import org.apache.spark.sql.execution.datasources.v2.{V2SessionCatalog, V2StreamingScanSupportCheck, V2WriteSupportCheck} import org.apache.spark.sql.streaming.StreamingQueryManager import org.apache.spark.sql.util.ExecutionListenerManager @@ -151,6 +151,8 @@ abstract class BaseSessionStateBuilder( catalog } + protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf) + /** * Interface exposed to the user for registering user-defined functions. * @@ -164,7 +166,7 @@ abstract class BaseSessionStateBuilder( * * Note: this depends on the `conf` and `catalog` fields. 
*/ - protected def analyzer: Analyzer = new Analyzer(catalog, conf) { + protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index bba1dc0f697a..5c3c185fa948 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -61,14 +61,12 @@ class PlanResolutionSuite extends AnalysisTest { invocation.getArgument[String](0) match { case "testcat" => testCat - case "session" => - v2SessionCatalog case name => throw new CatalogNotFoundException(s"No such catalog: $name") } }) when(manager.defaultCatalog).thenReturn(Some(testCat)) - when(manager.v2SessionCatalog).thenCallRealMethod() + when(manager.v2SessionCatalog).thenReturn(Some(v2SessionCatalog)) manager } @@ -78,14 +76,12 @@ class PlanResolutionSuite extends AnalysisTest { invocation.getArgument[String](0) match { case "testcat" => testCat - case "session" => - v2SessionCatalog case name => throw new CatalogNotFoundException(s"No such catalog: $name") } }) when(manager.defaultCatalog).thenReturn(None) - when(manager.v2SessionCatalog).thenCallRealMethod() + when(manager.v2SessionCatalog).thenReturn(Some(v2SessionCatalog)) manager } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala index 2ada934df877..b84317b50c65 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -21,15 +21,14 @@ import java.util import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode} +import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode} import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange} import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.InMemoryTable -import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog -import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG} +import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG import org.apache.spark.sql.sources.v2.utils.TestV2SessionCatalogBase import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType @@ -145,7 +144,7 @@ private[v2] trait SessionCatalogTest[T <: Table, Catalog <: TestV2SessionCatalog override def afterEach(): Unit = { super.afterEach() catalog("session").asInstanceOf[Catalog].clearTables() - spark.conf.set(V2_SESSION_CATALOG.key, classOf[V2SessionCatalog].getName) + spark.conf.unset(V2_SESSION_CATALOG.key) } protected def verifyTable(tableName: String, expected: DataFrame): Unit diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/utils/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/utils/TestV2SessionCatalogBase.scala index 28ce6a94b253..493352cd2e03 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/utils/TestV2SessionCatalogBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/utils/TestV2SessionCatalogBase.scala @@ -22,9 +22,8 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalog.v2.Identifier +import org.apache.spark.sql.catalog.v2.{CatalogExtension, Identifier} import org.apache.spark.sql.catalog.v2.expressions.Transform -import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.sources.v2.Table import org.apache.spark.sql.types.StructType @@ -33,7 +32,7 @@ import org.apache.spark.sql.types.StructType * for testing DDL as well as write operations (through df.write.saveAsTable, df.write.insertInto * and SQL). */ -private[v2] trait TestV2SessionCatalogBase[T <: Table] extends V2SessionCatalog { +private[v2] abstract class TestV2SessionCatalogBase[T <: Table] extends CatalogExtension { protected val tables: util.Map[Identifier, T] = new ConcurrentHashMap[Identifier, T]() diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index a143c6f77d55..ae6a28642a45 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -67,7 +67,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session /** * A logical query plan `Analyzer` with rules specific to Hive. 
*/ - override protected def analyzer: Analyzer = new Analyzer(catalog, conf) { + override protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = new ResolveHiveSerdeTable(session) +: new FindDataSourceTable(session) +: From efe6f9cd064bc1046840d6daaae30e4c31ac9cac Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 30 Aug 2019 16:52:03 +0800 Subject: [PATCH 2/8] address comments --- .../sql/catalog/v2/CatalogExtension.java | 83 ++------------ .../v2/DelegatingCatalogExtension.java | 102 ++++++++++++++++++ .../apache/spark/sql/internal/SQLConf.scala | 8 +- .../datasources/v2/V2SessionCatalog.scala | 4 +- .../v2/utils/TestV2SessionCatalogBase.scala | 4 +- 5 files changed, 118 insertions(+), 83 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java index a2a4bbeb6e5c..5580eba93674 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java @@ -17,85 +17,20 @@ package org.apache.spark.sql.catalog.v2; -import java.util.Map; - -import org.apache.spark.sql.catalog.v2.expressions.Transform; -import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; -import org.apache.spark.sql.sources.v2.Table; -import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** - * An API to extend the Spark built-in session catalog. Implementation can override some methods - * to apply custom logic. For example, they can override {@code createTable}, do something else - * before calling {@code super.createTable}. + * An API to extend the Spark built-in session catalog. Implementation can get the built-in session + * catalog from {@link #setDelegateCatalog(TableCatalog)}, implement catalog functions with + * some custom logic and call the built-in session catalog at the end. For example, they can + * implement {@code createTable}, do something else before calling {@code createTable} of the + * built-in session catalog. */ -public abstract class CatalogExtension implements TableCatalog { - - private TableCatalog delegate; +public interface CatalogExtension extends TableCatalog { /** - * This will be called only once by Spark to pass in the Spark built-in session catalog. + * This will be called only once by Spark to pass in the Spark built-in session catalog, after + * {@link #initialize(String, CaseInsensitiveStringMap)} is called. 
*/ - public final void setDelegateCatalog(TableCatalog delegate) { - this.delegate = delegate; - } - - @Override - public String name() { - return delegate.name(); - } - - @Override - public final void initialize(String name, CaseInsensitiveStringMap options) {} - - @Override - public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException { - return delegate.listTables(namespace); - } - - @Override - public Table loadTable(Identifier ident) throws NoSuchTableException { - return delegate.loadTable(ident); - } - - @Override - public void invalidateTable(Identifier ident) { - delegate.invalidateTable(ident); - } - - @Override - public boolean tableExists(Identifier ident) { - return delegate.tableExists(ident); - } - - @Override - public Table createTable( - Identifier ident, - StructType schema, - Transform[] partitions, - Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { - return delegate.createTable(ident, schema, partitions, properties); - } - - @Override - public Table alterTable( - Identifier ident, - TableChange... changes) throws NoSuchTableException { - return delegate.alterTable(ident, changes); - } - - @Override - public boolean dropTable(Identifier ident) { - return delegate.dropTable(ident); - } - - @Override - public void renameTable( - Identifier oldIdent, - Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException { - delegate.renameTable(oldIdent, newIdent); - } + void setDelegateCatalog(TableCatalog delegate); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java new file mode 100644 index 000000000000..53eaa72fe0e8 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import java.util.Map; + +import org.apache.spark.sql.catalog.v2.expressions.Transform; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * A simple implementation of {@link CatalogExtension}, which implements all the catalog functions + * by calling the built-in session catalog directly. This is created for convenience, so that users + * only need to override some methods where they want to apply custom logic. 
For example, they can + * override {@code createTable}, do something else before calling {@code super.createTable}. + */ +public abstract class DelegatingCatalogExtension implements CatalogExtension { + + private TableCatalog delegate; + + /** + * This will be called only once by Spark to pass in the Spark built-in session catalog. + */ + public final void setDelegateCatalog(TableCatalog delegate) { + this.delegate = delegate; + } + + @Override + public String name() { + return delegate.name(); + } + + @Override + public final void initialize(String name, CaseInsensitiveStringMap options) {} + + @Override + public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException { + return delegate.listTables(namespace); + } + + @Override + public Table loadTable(Identifier ident) throws NoSuchTableException { + return delegate.loadTable(ident); + } + + @Override + public void invalidateTable(Identifier ident) { + delegate.invalidateTable(ident); + } + + @Override + public boolean tableExists(Identifier ident) { + return delegate.tableExists(ident); + } + + @Override + public Table createTable( + Identifier ident, + StructType schema, + Transform[] partitions, + Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { + return delegate.createTable(ident, schema, partitions, properties); + } + + @Override + public Table alterTable( + Identifier ident, + TableChange... changes) throws NoSuchTableException { + return delegate.alterTable(ident, changes); + } + + @Override + public boolean dropTable(Identifier ident) { + return delegate.dropTable(ident); + } + + @Override + public void renameTable( + Identifier oldIdent, + Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException { + delegate.renameTable(oldIdent, newIdent); + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0620c56cc718..d357a178ece5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1947,10 +1947,10 @@ object SQLConf { .createOptional val V2_SESSION_CATALOG = buildConf("spark.sql.catalog.session") - .doc("A catalog implementation which will be used as the session catalog for Spark " + - "queries. The implementation must either extend `CatalogExtension`, or implement " + - "`CatalogPlugin`. When extending `CatalogExtension`, the default session catalog " + - "will be passed in so that the implementation can fallback some functions to it.") + .doc("A catalog implementation that will be used in place of the Spark built-in session " + + "catalog for v2 operations. 
The implementation may extend `CatalogExtension` to be " + + "passed the Spark built-in session catalog, so that it may delegate calls to the " + + "built-in session catalog.") .stringConf .createOptional diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 3e960048a1ad..bc4752e73fb5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -48,9 +48,7 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) def this(sessionState: SessionState) = this(sessionState.catalog, sessionState.conf) - def this() = { - this(SparkSession.active.sessionState) - } + def this() = this(SparkSession.active.sessionState) override val defaultNamespace: Array[String] = Array("default") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/utils/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/utils/TestV2SessionCatalogBase.scala index 493352cd2e03..b25eab154626 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/utils/TestV2SessionCatalogBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/utils/TestV2SessionCatalogBase.scala @@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalog.v2.{CatalogExtension, Identifier} +import org.apache.spark.sql.catalog.v2.{DelegatingCatalogExtension, Identifier} import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.sources.v2.Table import org.apache.spark.sql.types.StructType @@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType * for testing DDL as well as write operations (through df.write.saveAsTable, df.write.insertInto * and SQL). 
*/ -private[v2] abstract class TestV2SessionCatalogBase[T <: Table] extends CatalogExtension { +private[v2] abstract class TestV2SessionCatalogBase[T <: Table] extends DelegatingCatalogExtension { protected val tables: util.Map[Identifier, T] = new ConcurrentHashMap[Identifier, T]() From a19943f4b1df437f4725f968853b21962be126e0 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 30 Aug 2019 20:17:53 +0800 Subject: [PATCH 3/8] fix test --- .../org/apache/spark/sql/catalog/v2/CatalogManager.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala index baa946d23ae2..ff6abbf9af16 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala @@ -32,7 +32,13 @@ class CatalogManager(conf: SQLConf, sessionCatalog: TableCatalog) extends Loggin private val catalogs = mutable.HashMap.empty[String, CatalogPlugin] def catalog(name: String): CatalogPlugin = synchronized { - catalogs.getOrElseUpdate(name, Catalogs.load(name, conf)) + if (name.equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)) { + v2SessionCatalog.getOrElse { + throw new IllegalStateException("v2 session catalog not available") + } + } else { + catalogs.getOrElseUpdate(name, Catalogs.load(name, conf)) + } } def defaultCatalog: Option[CatalogPlugin] = { From 1d9f1553c92fd3ae8bf0a3dd817dfa1ad3f5ae17 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 5 Sep 2019 11:58:36 +0800 Subject: [PATCH 4/8] address comments --- .../v2/DelegatingCatalogExtension.java | 3 --- .../spark/sql/catalog/v2/CatalogManager.scala | 26 ++++++++++--------- .../spark/sql/catalog/v2/LookupCatalog.scala | 2 +- .../sql/catalyst/analysis/Analyzer.scala | 4 +-- .../apache/spark/sql/DataFrameWriter.scala | 14 +++++----- .../datasources/DataSourceResolution.scala | 19 ++++---------- .../datasources/v2/V2SessionCatalog.scala | 4 --- .../command/PlanResolutionSuite.scala | 4 +-- .../v2/V2SessionCatalogSuite.scala | 3 +-- .../DataSourceV2SQLSessionCatalogSuite.scala | 2 +- .../sql/sources/v2/DataSourceV2SQLSuite.scala | 7 +++-- 11 files changed, 35 insertions(+), 53 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java index 53eaa72fe0e8..82cce70d930b 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java @@ -37,9 +37,6 @@ public abstract class DelegatingCatalogExtension implements CatalogExtension { private TableCatalog delegate; - /** - * This will be called only once by Spark to pass in the Spark built-in session catalog. 
- */ public final void setDelegateCatalog(TableCatalog delegate) { this.delegate = delegate; } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala index ff6abbf9af16..f389eb8a56db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala @@ -27,15 +27,13 @@ import org.apache.spark.sql.internal.SQLConf * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow * the caller to look up a catalog by name. */ -class CatalogManager(conf: SQLConf, sessionCatalog: TableCatalog) extends Logging { +class CatalogManager(conf: SQLConf, defaultSessionCatalog: TableCatalog) extends Logging { private val catalogs = mutable.HashMap.empty[String, CatalogPlugin] def catalog(name: String): CatalogPlugin = synchronized { if (name.equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)) { - v2SessionCatalog.getOrElse { - throw new IllegalStateException("v2 session catalog not available") - } + v2SessionCatalog } else { catalogs.getOrElseUpdate(name, Catalogs.load(name, conf)) } @@ -56,19 +54,23 @@ class CatalogManager(conf: SQLConf, sessionCatalog: TableCatalog) extends Loggin private def loadV2SessionCatalog(): CatalogPlugin = { Catalogs.load(CatalogManager.SESSION_CATALOG_NAME, conf) match { case extension: CatalogExtension => - extension.setDelegateCatalog(sessionCatalog) + extension.setDelegateCatalog(defaultSessionCatalog) extension case other => other } } - def v2SessionCatalog: Option[CatalogPlugin] = { - try { - Some(catalogs.getOrElseUpdate(CatalogManager.SESSION_CATALOG_NAME, loadV2SessionCatalog())) - } catch { - case NonFatal(e) => - logError("Cannot load v2 session catalog", e) - None + // If the V2_SESSION_CATALOG config is specified, we try to instantiate the user-specified v2 + // session catalog. Otherwise, return the default session catalog. + def v2SessionCatalog: CatalogPlugin = { + if (conf.getConf(SQLConf.V2_SESSION_CATALOG).isDefined) { + try { + catalogs.getOrElseUpdate(CatalogManager.SESSION_CATALOG_NAME, loadV2SessionCatalog()) + } catch { + case NonFatal(_) => defaultSessionCatalog + } + } else { + defaultSessionCatalog } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala index 846810fc1806..2281402740a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala @@ -45,7 +45,7 @@ trait LookupCatalog extends Logging { * This happens when the source implementation extends the v2 TableProvider API and is not listed * in the fallback configuration, spark.sql.sources.write.useV1SourceList */ - def sessionCatalog: Option[CatalogPlugin] = catalogManager.v2SessionCatalog + def sessionCatalog: CatalogPlugin = catalogManager.v2SessionCatalog /** * Extract catalog plugin and remaining identifier names. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 99a40c305a1f..30d28216e694 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -978,7 +978,7 @@ class Analyzer( case scala.Right(tableOpt) => tableOpt.map { table => AlterTable( - sessionCatalog.get.asTableCatalog, // table being resolved means this exists + sessionCatalog.asTableCatalog, Identifier.of(tableName.init.toArray, tableName.last), DataSourceV2Relation.create(table), changes @@ -2809,7 +2809,7 @@ class Analyzer( case CatalogObjectIdentifier(Some(v2Catalog), ident) => scala.Left((v2Catalog, ident, loadTable(v2Catalog, ident))) case CatalogObjectIdentifier(None, ident) => - catalogManager.v2SessionCatalog.flatMap(loadTable(_, ident)) match { + loadTable(catalogManager.v2SessionCatalog, ident) match { case Some(_: V1Table) => scala.Right(None) case other => scala.Right(other) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 9e58cd8b7eb4..4a42f8838193 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -353,15 +353,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val session = df.sparkSession val canUseV2 = lookupV2Provider().isDefined - val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog + val sessionCatalog = session.sessionState.analyzer.sessionCatalog session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { case CatalogObjectIdentifier(Some(catalog), ident) => insertInto(catalog, ident) - case CatalogObjectIdentifier(None, ident) - if canUseV2 && sessionCatalogOpt.isDefined && ident.namespace().length <= 1 => - insertInto(sessionCatalogOpt.get, ident) + case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 => + insertInto(sessionCatalog, ident) case AsTableIdentifier(tableIdentifier) => insertInto(tableIdentifier) @@ -487,17 +486,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val session = df.sparkSession val canUseV2 = lookupV2Provider().isDefined - val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog + val sessionCatalog = session.sessionState.analyzer.sessionCatalog session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { case CatalogObjectIdentifier(Some(catalog), ident) => saveAsTable(catalog.asTableCatalog, ident, modeForDSV2) - case CatalogObjectIdentifier(None, ident) - if canUseV2 && sessionCatalogOpt.isDefined && ident.namespace().length <= 1 => + case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 => // We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility // for now. 
- saveAsTable(sessionCatalogOpt.get.asTableCatalog, ident, modeForDSV1) + saveAsTable(sessionCatalog.asTableCatalog, ident, modeForDSV1) case AsTableIdentifier(tableIdentifier) => saveAsTable(tableIdentifier) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index a37a2cf7f036..43bee695bd5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, SaveMode} -import org.apache.spark.sql.catalog.v2.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, TableCatalog} +import org.apache.spark.sql.catalog.v2.{CatalogManager, Identifier, LookupCatalog, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation} @@ -40,9 +40,6 @@ case class DataSourceResolution( import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ - def v2SessionCatalog: CatalogPlugin = sessionCatalog.getOrElse( - throw new AnalysisException("No v2 session catalog implementation is available")) - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case CreateTableStatement( AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties, @@ -64,7 +61,7 @@ case class DataSourceResolution( case _ => // the identifier had no catalog and no default catalog is set, but the source is v2. // use the v2 session catalog, which delegates to the global v1 session catalog - convertCreateTable(v2SessionCatalog.asTableCatalog, identifier, create) + convertCreateTable(sessionCatalog.asTableCatalog, identifier, create) } case CreateTableAsSelectStatement( @@ -87,7 +84,7 @@ case class DataSourceResolution( case _ => // the identifier had no catalog and no default catalog is set, but the source is v2. 
// use the v2 session catalog, which delegates to the global v1 session catalog - convertCTAS(v2SessionCatalog.asTableCatalog, identifier, create) + convertCTAS(sessionCatalog.asTableCatalog, identifier, create) } case DescribeColumnStatement( @@ -119,19 +116,13 @@ case class DataSourceResolution( case replace: ReplaceTableStatement => // the provider was not a v1 source, convert to a v2 plan val CatalogObjectIdentifier(maybeCatalog, identifier) = replace.tableName - val catalog = maybeCatalog.orElse(sessionCatalog) - .getOrElse(throw new AnalysisException( - s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) - .asTableCatalog + val catalog = maybeCatalog.getOrElse(sessionCatalog).asTableCatalog convertReplaceTable(catalog, identifier, replace) case rtas: ReplaceTableAsSelectStatement => // the provider was not a v1 source, convert to a v2 plan val CatalogObjectIdentifier(maybeCatalog, identifier) = rtas.tableName - val catalog = maybeCatalog.orElse(sessionCatalog) - .getOrElse(throw new AnalysisException( - s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) - .asTableCatalog + val catalog = maybeCatalog.getOrElse(sessionCatalog).asTableCatalog convertRTAS(catalog, identifier, rtas) case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index bc4752e73fb5..75320fb51db3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -46,10 +46,6 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ import V2SessionCatalog._ - def this(sessionState: SessionState) = this(sessionState.catalog, sessionState.conf) - - def this() = this(SparkSession.active.sessionState) - override val defaultNamespace: Array[String] = Array("default") override def name: String = CatalogManager.SESSION_CATALOG_NAME diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 5c3c185fa948..e2cee593af03 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -66,7 +66,7 @@ class PlanResolutionSuite extends AnalysisTest { } }) when(manager.defaultCatalog).thenReturn(Some(testCat)) - when(manager.v2SessionCatalog).thenReturn(Some(v2SessionCatalog)) + when(manager.v2SessionCatalog).thenReturn(v2SessionCatalog) manager } @@ -81,7 +81,7 @@ class PlanResolutionSuite extends AnalysisTest { } }) when(manager.defaultCatalog).thenReturn(None) - when(manager.v2SessionCatalog).thenReturn(Some(v2SessionCatalog)) + when(manager.v2SessionCatalog).thenReturn(v2SessionCatalog) manager } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 275bc339b3b5..8a1fa998b157 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -46,7 +46,7 @@ class V2SessionCatalogBaseSuite extends SparkFunSuite with SharedSparkSession wi val testIdent: Identifier = Identifier.of(testNs, "test_table") def newCatalog(): V2SessionCatalog = { - val newCatalog = new V2SessionCatalog(spark.sessionState) + val newCatalog = new V2SessionCatalog(spark.sessionState.catalog, spark.sessionState.conf) newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) newCatalog } @@ -58,7 +58,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { override protected def beforeAll(): Unit = { super.beforeAll() - // TODO: when there is a public API for v2 catalogs, use that instead val catalog = newCatalog() catalog.createNamespace(Array("db"), emptyProps) catalog.createNamespace(Array("db2"), emptyProps) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSessionCatalogSuite.scala index cfbafdb65c7c..8df65c14a8d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSessionCatalogSuite.scala @@ -46,7 +46,7 @@ class DataSourceV2SQLSessionCatalogSuite } override def getTableMetadata(tableName: String): Table = { - val v2Catalog = spark.sessionState.catalogManager.v2SessionCatalog.get + val v2Catalog = spark.sessionState.catalogManager.v2SessionCatalog val nameParts = spark.sessionState.sqlParser.parseMultipartIdentifier(tableName) v2Catalog.asInstanceOf[TableCatalog] .loadTable(Identifier.of(Array.empty, nameParts.last)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index b6e7bc5d1a4d..de5b4692b8ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -19,16 +19,14 @@ package org.apache.spark.sql.sources.v2 import scala.collection.JavaConverters._ -import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog} import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog, StagingInMemoryTableCatalog} -import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG import org.apache.spark.sql.sources.v2.internal.V1Table -import org.apache.spark.sql.types.{ArrayType, BooleanType, DoubleType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap class DataSourceV2SQLSuite @@ -512,7 +510,8 @@ class DataSourceV2SQLSuite } test("CreateTableAsSelect: v2 session catalog can load v1 source table") { - spark.conf.set(V2_SESSION_CATALOG.key, classOf[V2SessionCatalog].getName) + // unset this config to use the default v2 session catalog. 
+ spark.conf.unset(V2_SESSION_CATALOG.key) val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") df.createOrReplaceTempView("source") From 155c37f7f3a76a79cfbb8eaafb33007d81d798a0 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 5 Sep 2019 13:16:25 +0800 Subject: [PATCH 5/8] add Experimental --- .../java/org/apache/spark/sql/catalog/v2/CatalogExtension.java | 2 ++ .../apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java index 5580eba93674..2bf72c1da73d 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalog.v2; +import org.apache.spark.annotation.Experimental; import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** @@ -26,6 +27,7 @@ * implement {@code createTable}, do something else before calling {@code createTable} of the * built-in session catalog. */ +@Experimental public interface CatalogExtension extends TableCatalog { /** diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java index 82cce70d930b..2d3700a08806 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java @@ -19,6 +19,7 @@ import java.util.Map; +import org.apache.spark.annotation.Experimental; import org.apache.spark.sql.catalog.v2.expressions.Transform; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -33,6 +34,7 @@ * only need to override some methods where they want to apply custom logic. For example, they can * override {@code createTable}, do something else before calling {@code super.createTable}. 
*/ +@Experimental public abstract class DelegatingCatalogExtension implements CatalogExtension { private TableCatalog delegate; From f89f25a43da0ea0e57eeece22b311a33ff0c0850 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 5 Sep 2019 16:10:05 +0800 Subject: [PATCH 6/8] fix tests --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 30d28216e694..6580c46ae4c2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -65,7 +65,9 @@ object SimpleAnalyzer extends Analyzer( object FakeV2SessionCatalog extends TableCatalog { private def fail() = throw new UnsupportedOperationException override def listTables(namespace: Array[String]): Array[Identifier] = fail() - override def loadTable(ident: Identifier): Table = fail() + override def loadTable(ident: Identifier): Table = { + throw new NoSuchTableException(ident.toString) + } override def createTable( ident: Identifier, schema: StructType, From 494e031359da6e77b588e4f8e3a3073a9544be02 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 5 Sep 2019 20:39:27 +0800 Subject: [PATCH 7/8] fix test --- .../datasources/v2/V2SessionCatalogSuite.scala | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 8a1fa998b157..a309152b2614 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -81,16 +81,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { private val testIdentNew = Identifier.of(testNs, "test_table_new") - test("Catalogs can load the catalog") { - val catalog = newCatalog() - - val conf = new SQLConf - conf.setConfString("spark.sql.catalog.test", catalog.getClass.getName) - - val loaded = Catalogs.load("test", conf) - assert(loaded.getClass == catalog.getClass) - } - test("listTables") { val catalog = newCatalog() val ident1 = Identifier.of(Array("ns"), "test_table_1") From 05db8602d0ef8c98f0f4d70ef8a4d67bb3f73692 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 9 Sep 2019 12:38:24 +0800 Subject: [PATCH 8/8] add error log --- .../apache/spark/sql/catalog/v2/CatalogManager.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala index f389eb8a56db..5bba88dbe76d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala @@ -63,15 +63,16 @@ class CatalogManager(conf: SQLConf, defaultSessionCatalog: TableCatalog) extends // If the V2_SESSION_CATALOG config is specified, we try to instantiate the user-specified v2 // session catalog. Otherwise, return the default session catalog. 
   def v2SessionCatalog: CatalogPlugin = {
-    if (conf.getConf(SQLConf.V2_SESSION_CATALOG).isDefined) {
+    conf.getConf(SQLConf.V2_SESSION_CATALOG).map { customV2SessionCatalog =>
       try {
         catalogs.getOrElseUpdate(CatalogManager.SESSION_CATALOG_NAME, loadV2SessionCatalog())
       } catch {
-        case NonFatal(_) => defaultSessionCatalog
+        case NonFatal(_) =>
+          logError(
+            "Failed to instantiate the custom v2 session catalog: " + customV2SessionCatalog)
+          defaultSessionCatalog
       }
-    } else {
-      defaultSessionCatalog
-    }
+    }.getOrElse(defaultSessionCatalog)
   }
 
   private def getDefaultNamespace(c: CatalogPlugin) = c match {
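
Usage sketch (illustrative only, not part of the patches above): a third-party plugin could hook into the new public API roughly as follows. Only DelegatingCatalogExtension, setDelegateCatalog, createTable, and the spark.sql.catalog.session config come from this patch series; the class name and the custom logic are hypothetical.

import java.util

import org.apache.spark.sql.catalog.v2.{DelegatingCatalogExtension, Identifier}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.StructType

// Hypothetical extension that runs custom logic around table creation and
// delegates everything else to the built-in session catalog, which Spark
// injects via setDelegateCatalog after initialize is called.
class MyCustomSessionCatalog extends DelegatingCatalogExtension {
  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    // custom validation or bookkeeping would go here, then fall back to the
    // built-in session catalog
    super.createTable(ident, schema, partitions, properties)
  }
}

// Registered through the config introduced above, e.g.:
// spark.conf.set("spark.sql.catalog.session", classOf[MyCustomSessionCatalog].getName)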