diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java new file mode 100644 index 000000000000..2bf72c1da73d --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogExtension.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.  See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.  You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * An API to extend the Spark built-in session catalog. Implementations can get the built-in session + * catalog from {@link #setDelegateCatalog(TableCatalog)}, implement catalog functions with + * some custom logic, and call the built-in session catalog at the end. For example, they can + * implement {@code createTable} to do some extra work before calling {@code createTable} of the + * built-in session catalog. + */ +@Experimental +public interface CatalogExtension extends TableCatalog { + + /** + * This will be called only once by Spark to pass in the Spark built-in session catalog, after + * {@link #initialize(String, CaseInsensitiveStringMap)} is called. + */ + void setDelegateCatalog(TableCatalog delegate); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java new file mode 100644 index 000000000000..2d3700a08806 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/DelegatingCatalogExtension.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.  See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.  You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.catalog.v2; + +import java.util.Map; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.catalog.v2.expressions.Transform; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.sources.v2.Table; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * A simple implementation of {@link CatalogExtension}, which implements all the catalog functions + * by calling the built-in session catalog directly. This is created for convenience, so that users + * only need to override some methods where they want to apply custom logic. For example, they can + * override {@code createTable} to do some extra work before calling {@code super.createTable}. + */ +@Experimental +public abstract class DelegatingCatalogExtension implements CatalogExtension { + + private TableCatalog delegate; + + public final void setDelegateCatalog(TableCatalog delegate) { + this.delegate = delegate; + } + + @Override + public String name() { + return delegate.name(); + } + + @Override + public final void initialize(String name, CaseInsensitiveStringMap options) {} + + @Override + public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException { + return delegate.listTables(namespace); + } + + @Override + public Table loadTable(Identifier ident) throws NoSuchTableException { + return delegate.loadTable(ident); + } + + @Override + public void invalidateTable(Identifier ident) { + delegate.invalidateTable(ident); + } + + @Override + public boolean tableExists(Identifier ident) { + return delegate.tableExists(ident); + } + + @Override + public Table createTable( + Identifier ident, + StructType schema, + Transform[] partitions, + Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException { + return delegate.createTable(ident, schema, partitions, properties); + } + + @Override + public Table alterTable( + Identifier ident, + TableChange... changes) throws NoSuchTableException { + return delegate.alterTable(ident, changes); + } + + @Override + public boolean dropTable(Identifier ident) { + return delegate.dropTable(ident); + } + + @Override + public void renameTable( + Identifier oldIdent, + Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException { + delegate.renameTable(oldIdent, newIdent); + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala index d5a6a61f8257..5bba88dbe76d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala @@ -27,12 +27,16 @@ import org.apache.spark.sql.internal.SQLConf * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allows * the caller to look up a catalog by name.
*/ -class CatalogManager(conf: SQLConf) extends Logging { +class CatalogManager(conf: SQLConf, defaultSessionCatalog: TableCatalog) extends Logging { private val catalogs = mutable.HashMap.empty[String, CatalogPlugin] def catalog(name: String): CatalogPlugin = synchronized { - catalogs.getOrElseUpdate(name, Catalogs.load(name, conf)) + if (name.equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)) { + v2SessionCatalog + } else { + catalogs.getOrElseUpdate(name, Catalogs.load(name, conf)) + } } def defaultCatalog: Option[CatalogPlugin] = { @@ -47,16 +51,30 @@ class CatalogManager(conf: SQLConf) extends Logging { } } - def v2SessionCatalog: Option[CatalogPlugin] = { - try { - Some(catalog(CatalogManager.SESSION_CATALOG_NAME)) - } catch { - case NonFatal(e) => - logError("Cannot load v2 session catalog", e) - None + private def loadV2SessionCatalog(): CatalogPlugin = { + Catalogs.load(CatalogManager.SESSION_CATALOG_NAME, conf) match { + case extension: CatalogExtension => + extension.setDelegateCatalog(defaultSessionCatalog) + extension + case other => other } } + // If the V2_SESSION_CATALOG config is specified, we try to instantiate the user-specified v2 + // session catalog. Otherwise, return the default session catalog. + def v2SessionCatalog: CatalogPlugin = { + conf.getConf(SQLConf.V2_SESSION_CATALOG).map { customV2SessionCatalog => + try { + catalogs.getOrElseUpdate(CatalogManager.SESSION_CATALOG_NAME, loadV2SessionCatalog()) + } catch { + case NonFatal(e) => + logError( + "Failed to instantiate the custom v2 session catalog: " + customV2SessionCatalog, e) + defaultSessionCatalog + } + }.getOrElse(defaultSessionCatalog) + } + private def getDefaultNamespace(c: CatalogPlugin) = c match { case c: SupportsNamespaces => c.defaultNamespace() case _ => Array.empty[String] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala index 846810fc1806..2281402740a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala @@ -45,7 +45,7 @@ trait LookupCatalog extends Logging { * This happens when the source implementation extends the v2 TableProvider API and is not listed * in the fallback configuration, spark.sql.sources.write.useV1SourceList */ - def sessionCatalog: Option[CatalogPlugin] = catalogManager.v2SessionCatalog + def sessionCatalog: CatalogPlugin = catalogManager.v2SessionCatalog /** * Extract catalog plugin and remaining identifier names.
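For context, the sketch below shows what a user-provided session catalog built on the interfaces added above could look like. It is a hypothetical example, not part of the patch: the class name and the "createdBy" property are illustrative, but the signatures follow the DelegatingCatalogExtension introduced in this change. The extension stamps an extra table property and then delegates to the built-in session catalog that CatalogManager injects through setDelegateCatalog.

import java.util

import org.apache.spark.sql.catalog.v2.{DelegatingCatalogExtension, Identifier}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.StructType

// Hypothetical extension: tag every table created through the session catalog, then let
// the built-in session catalog (the delegate) do the actual work.
class AuditingSessionCatalog extends DelegatingCatalogExtension {
  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    val tagged = new util.HashMap[String, String](properties)
    tagged.put("createdBy", "AuditingSessionCatalog") // illustrative property only
    super.createTable(ident, schema, partitions, tagged)
  }
}

Such a class would be registered through the now-optional config, e.g. spark.conf.set("spark.sql.catalog.session", classOf[AuditingSessionCatalog].getName); CatalogManager.v2SessionCatalog then loads it once and hands it the built-in catalog as the delegate.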
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 6f17256f8163..5d130c5af213 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.analysis +import java.util import java.util.Locale import scala.collection.mutable @@ -25,7 +26,7 @@ import scala.util.Random import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalog.v2._ -import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform} +import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform, Transform} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.OuterScopes @@ -45,6 +46,7 @@ import org.apache.spark.sql.internal.SQLConf.{PartitionOverwriteMode, StoreAssig import org.apache.spark.sql.sources.v2.Table import org.apache.spark.sql.sources.v2.internal.V1Table import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]]. @@ -60,6 +62,24 @@ object SimpleAnalyzer extends Analyzer( }, new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) +object FakeV2SessionCatalog extends TableCatalog { + private def fail() = throw new UnsupportedOperationException + override def listTables(namespace: Array[String]): Array[Identifier] = fail() + override def loadTable(ident: Identifier): Table = { + throw new NoSuchTableException(ident.toString) + } + override def createTable( + ident: Identifier, + schema: StructType, + partitions: Array[Transform], + properties: util.Map[String, String]): Table = fail() + override def alterTable(ident: Identifier, changes: TableChange*): Table = fail() + override def dropTable(ident: Identifier): Boolean = fail() + override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = fail() + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = fail() + override def name(): String = fail() +} + /** * Provides a way to keep state during the analysis, this enables us to decouple the concerns * of analysis environment from the catalog. @@ -101,15 +121,21 @@ object AnalysisContext { */ class Analyzer( catalog: SessionCatalog, + v2SessionCatalog: TableCatalog, conf: SQLConf, maxIterations: Int) extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog { + // Only for tests. 
def this(catalog: SessionCatalog, conf: SQLConf) = { - this(catalog, conf, conf.optimizerMaxIterations) + this(catalog, FakeV2SessionCatalog, conf, conf.optimizerMaxIterations) + } + + def this(catalog: SessionCatalog, v2SessionCatalog: TableCatalog, conf: SQLConf) = { + this(catalog, v2SessionCatalog, conf, conf.optimizerMaxIterations) } - override val catalogManager: CatalogManager = new CatalogManager(conf) + override val catalogManager: CatalogManager = new CatalogManager(conf, v2SessionCatalog) def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { AnalysisHelper.markInAnalyzer { @@ -954,7 +980,7 @@ class Analyzer( case scala.Right(tableOpt) => tableOpt.map { table => AlterTable( - sessionCatalog.get.asTableCatalog, // table being resolved means this exists + sessionCatalog.asTableCatalog, Identifier.of(tableName.init.toArray, tableName.last), DataSourceV2Relation.create(table), changes @@ -2837,7 +2863,7 @@ class Analyzer( case CatalogObjectIdentifier(Some(v2Catalog), ident) => scala.Left((v2Catalog, ident, loadTable(v2Catalog, ident))) case CatalogObjectIdentifier(None, ident) => - catalogManager.v2SessionCatalog.flatMap(loadTable(_, ident)) match { + loadTable(catalogManager.v2SessionCatalog, ident) match { case Some(_: V1Table) => scala.Right(None) case other => scala.Right(other) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 0e7391063ed1..f366c656073d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1956,9 +1956,12 @@ object SQLConf { .createOptional val V2_SESSION_CATALOG = buildConf("spark.sql.catalog.session") - .doc("Name of the default v2 catalog, used when a catalog is not identified in queries") + .doc("A catalog implementation that will be used in place of the Spark built-in session " + + "catalog for v2 operations. 
The implementation may extend `CatalogExtension` to be " + + "passed the Spark built-in session catalog, so that it may delegate calls to the " + + "built-in session catalog.") .stringConf - .createWithDefault("org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog") + .createOptional val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.looseUpcast") .doc("When true, the upcast will be loose and allows string to atomic types.") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala index f7f190136bfc..d2a2ba50ead4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogManagerSuite.scala @@ -21,6 +21,7 @@ import java.util import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalog.v2.{CatalogManager, NamespaceChange, SupportsNamespaces} +import org.apache.spark.sql.catalyst.analysis.FakeV2SessionCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -28,7 +29,7 @@ class CatalogManagerSuite extends SparkFunSuite { test("CatalogManager should reflect the changes of default catalog") { val conf = new SQLConf - val catalogManager = new CatalogManager(conf) + val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog) assert(catalogManager.currentCatalog.isEmpty) assert(catalogManager.currentNamespace.sameElements(Array("default"))) @@ -42,7 +43,7 @@ class CatalogManagerSuite extends SparkFunSuite { test("CatalogManager should keep the current catalog once set") { val conf = new SQLConf - val catalogManager = new CatalogManager(conf) + val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog) assert(catalogManager.currentCatalog.isEmpty) conf.setConfString("spark.sql.catalog.dummy", classOf[DummyCatalog].getName) catalogManager.setCurrentCatalog("dummy") @@ -57,7 +58,7 @@ class CatalogManagerSuite extends SparkFunSuite { test("current namespace should be updated when switching current catalog") { val conf = new SQLConf - val catalogManager = new CatalogManager(conf) + val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog) catalogManager.setCurrentNamespace(Array("abc")) assert(catalogManager.currentNamespace.sameElements(Array("abc"))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index c782e5012d8d..04a611024eb2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -354,15 +354,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val session = df.sparkSession val canUseV2 = lookupV2Provider().isDefined - val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog + val sessionCatalog = session.sessionState.analyzer.sessionCatalog session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { case CatalogObjectIdentifier(Some(catalog), ident) => insertInto(catalog, ident) - case CatalogObjectIdentifier(None, ident) - if canUseV2 && sessionCatalogOpt.isDefined && ident.namespace().length <= 1 => - insertInto(sessionCatalogOpt.get, ident) + case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 => + insertInto(sessionCatalog, ident) case 
AsTableIdentifier(tableIdentifier) => insertInto(tableIdentifier) @@ -488,17 +487,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val session = df.sparkSession val canUseV2 = lookupV2Provider().isDefined - val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog + val sessionCatalog = session.sessionState.analyzer.sessionCatalog session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match { case CatalogObjectIdentifier(Some(catalog), ident) => saveAsTable(catalog.asTableCatalog, ident, modeForDSV2) - case CatalogObjectIdentifier(None, ident) - if canUseV2 && sessionCatalogOpt.isDefined && ident.namespace().length <= 1 => + case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 => // We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility // for now. - saveAsTable(sessionCatalogOpt.get.asTableCatalog, ident, modeForDSV1) + saveAsTable(sessionCatalog.asTableCatalog, ident, modeForDSV1) case AsTableIdentifier(tableIdentifier) => saveAsTable(tableIdentifier) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index a37a2cf7f036..43bee695bd5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, SaveMode} -import org.apache.spark.sql.catalog.v2.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, TableCatalog} +import org.apache.spark.sql.catalog.v2.{CatalogManager, Identifier, LookupCatalog, TableCatalog} import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation} @@ -40,9 +40,6 @@ case class DataSourceResolution( import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ - def v2SessionCatalog: CatalogPlugin = sessionCatalog.getOrElse( - throw new AnalysisException("No v2 session catalog implementation is available")) - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case CreateTableStatement( AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties, @@ -64,7 +61,7 @@ case class DataSourceResolution( case _ => // the identifier had no catalog and no default catalog is set, but the source is v2. // use the v2 session catalog, which delegates to the global v1 session catalog - convertCreateTable(v2SessionCatalog.asTableCatalog, identifier, create) + convertCreateTable(sessionCatalog.asTableCatalog, identifier, create) } case CreateTableAsSelectStatement( @@ -87,7 +84,7 @@ case class DataSourceResolution( case _ => // the identifier had no catalog and no default catalog is set, but the source is v2. 
// use the v2 session catalog, which delegates to the global v1 session catalog - convertCTAS(v2SessionCatalog.asTableCatalog, identifier, create) + convertCTAS(sessionCatalog.asTableCatalog, identifier, create) } case DescribeColumnStatement( @@ -119,19 +116,13 @@ case class DataSourceResolution( case replace: ReplaceTableStatement => // the provider was not a v1 source, convert to a v2 plan val CatalogObjectIdentifier(maybeCatalog, identifier) = replace.tableName - val catalog = maybeCatalog.orElse(sessionCatalog) - .getOrElse(throw new AnalysisException( - s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) - .asTableCatalog + val catalog = maybeCatalog.getOrElse(sessionCatalog).asTableCatalog convertReplaceTable(catalog, identifier, replace) case rtas: ReplaceTableAsSelectStatement => // the provider was not a v1 source, convert to a v2 plan val CatalogObjectIdentifier(maybeCatalog, identifier) = rtas.tableName - val catalog = maybeCatalog.orElse(sessionCatalog) - .getOrElse(throw new AnalysisException( - s"No catalog specified for table ${identifier.quoted} and no default catalog is set")) - .asTableCatalog + val catalog = maybeCatalog.getOrElse(sessionCatalog).asTableCatalog convertRTAS(catalog, identifier, rtas) case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index ebfd7384930f..75320fb51db3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -24,15 +24,15 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalog.v2.{Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange} +import org.apache.spark.sql.catalog.v2.{CatalogManager, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange} import org.apache.spark.sql.catalog.v2.NamespaceChange.{RemoveProperty, SetProperty} -import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform} +import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform} import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog} import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.internal.SessionState +import org.apache.spark.sql.internal.{SessionState, SQLConf} import org.apache.spark.sql.sources.v2.Table import org.apache.spark.sql.sources.v2.internal.V1Table import org.apache.spark.sql.types.StructType @@ -41,25 +41,17 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A [[TableCatalog]] that translates calls to the v1 SessionCatalog. 
*/ -class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with SupportsNamespaces { +class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) + extends TableCatalog with SupportsNamespaces { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ import V2SessionCatalog._ - def this() = { - this(SparkSession.active.sessionState) - } - override val defaultNamespace: Array[String] = Array("default") - private lazy val catalog: SessionCatalog = sessionState.catalog - - private var _name: String = _ + override def name: String = CatalogManager.SESSION_CATALOG_NAME - override def name: String = _name - - override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = { - this._name = name - } + // This class is instantiated by Spark, so `initialize` method will not be called. + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {} override def listTables(namespace: Array[String]): Array[Identifier] = { namespace match { @@ -92,7 +84,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with Sup properties: util.Map[String, String]): Table = { val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions) - val provider = properties.getOrDefault("provider", sessionState.conf.defaultDataSourceName) + val provider = properties.getOrDefault("provider", conf.defaultDataSourceName) val tableProperties = properties.asScala val location = Option(properties.get(LOCATION_TABLE_PROP)) val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap) @@ -108,7 +100,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with Sup partitionColumnNames = partitionColumns, bucketSpec = maybeBucketSpec, properties = tableProperties.toMap, - tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions, + tracksPartitionsInCatalog = conf.manageFilesourcePartitions, comment = Option(properties.get(COMMENT_TABLE_PROP))) try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index db4885aa01ba..20b0143f098c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ColumnarRule, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser} import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck +import org.apache.spark.sql.execution.datasources.v2.{TableCapabilityCheck, V2SessionCatalog} import org.apache.spark.sql.streaming.StreamingQueryManager import org.apache.spark.sql.util.ExecutionListenerManager @@ -151,6 +151,8 @@ abstract class BaseSessionStateBuilder( catalog } + protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf) + /** * Interface exposed to the user for registering user-defined functions. * @@ -164,7 +166,7 @@ abstract class BaseSessionStateBuilder( * * Note: this depends on the `conf` and `catalog` fields. 
*/ - protected def analyzer: Analyzer = new Analyzer(catalog, conf) { + protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index bba1dc0f697a..e2cee593af03 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -61,14 +61,12 @@ class PlanResolutionSuite extends AnalysisTest { invocation.getArgument[String](0) match { case "testcat" => testCat - case "session" => - v2SessionCatalog case name => throw new CatalogNotFoundException(s"No such catalog: $name") } }) when(manager.defaultCatalog).thenReturn(Some(testCat)) - when(manager.v2SessionCatalog).thenCallRealMethod() + when(manager.v2SessionCatalog).thenReturn(v2SessionCatalog) manager } @@ -78,14 +76,12 @@ class PlanResolutionSuite extends AnalysisTest { invocation.getArgument[String](0) match { case "testcat" => testCat - case "session" => - v2SessionCatalog case name => throw new CatalogNotFoundException(s"No such catalog: $name") } }) when(manager.defaultCatalog).thenReturn(None) - when(manager.v2SessionCatalog).thenCallRealMethod() + when(manager.v2SessionCatalog).thenReturn(v2SessionCatalog) manager } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 275bc339b3b5..a309152b2614 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -46,7 +46,7 @@ class V2SessionCatalogBaseSuite extends SparkFunSuite with SharedSparkSession wi val testIdent: Identifier = Identifier.of(testNs, "test_table") def newCatalog(): V2SessionCatalog = { - val newCatalog = new V2SessionCatalog(spark.sessionState) + val newCatalog = new V2SessionCatalog(spark.sessionState.catalog, spark.sessionState.conf) newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) newCatalog } @@ -58,7 +58,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { override protected def beforeAll(): Unit = { super.beforeAll() - // TODO: when there is a public API for v2 catalogs, use that instead val catalog = newCatalog() catalog.createNamespace(Array("db"), emptyProps) catalog.createNamespace(Array("db2"), emptyProps) @@ -82,16 +81,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { private val testIdentNew = Identifier.of(testNs, "test_table_new") - test("Catalogs can load the catalog") { - val catalog = newCatalog() - - val conf = new SQLConf - conf.setConfString("spark.sql.catalog.test", catalog.getClass.getName) - - val loaded = Catalogs.load("test", conf) - assert(loaded.getClass == catalog.getClass) - } - test("listTables") { val catalog = newCatalog() val ident1 = Identifier.of(Array("ns"), "test_table_1") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala index fee696250163..629e825ec68d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSessionCatalogSuite.scala @@ -21,15 +21,14 @@ import java.util import org.scalatest.BeforeAndAfter -import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode} +import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode} import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog, TableChange} import org.apache.spark.sql.catalog.v2.expressions.Transform import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.InMemoryTable -import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog -import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG} +import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG import org.apache.spark.sql.sources.v2.utils.TestV2SessionCatalogBase import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType @@ -89,7 +88,7 @@ class DataSourceV2DataFrameSessionCatalogSuite val t1 = "prop_table" withTable(t1) { spark.range(20).write.format(v2Format).option("path", "abc").saveAsTable(t1) - val cat = spark.sessionState.catalogManager.v2SessionCatalog.get.asInstanceOf[TableCatalog] + val cat = spark.sessionState.catalogManager.v2SessionCatalog.asInstanceOf[TableCatalog] val tableInfo = cat.loadTable(Identifier.of(Array.empty, t1)) assert(tableInfo.properties().get("location") === "abc") assert(tableInfo.properties().get("provider") === v2Format) @@ -156,7 +155,7 @@ private[v2] trait SessionCatalogTest[T <: Table, Catalog <: TestV2SessionCatalog override def afterEach(): Unit = { super.afterEach() catalog("session").asInstanceOf[Catalog].clearTables() - spark.conf.set(V2_SESSION_CATALOG.key, classOf[V2SessionCatalog].getName) + spark.conf.unset(V2_SESSION_CATALOG.key) } protected def verifyTable(tableName: String, expected: DataFrame): Unit diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSessionCatalogSuite.scala index cfbafdb65c7c..8df65c14a8d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSessionCatalogSuite.scala @@ -46,7 +46,7 @@ class DataSourceV2SQLSessionCatalogSuite } override def getTableMetadata(tableName: String): Table = { - val v2Catalog = spark.sessionState.catalogManager.v2SessionCatalog.get + val v2Catalog = spark.sessionState.catalogManager.v2SessionCatalog val nameParts = spark.sessionState.sqlParser.parseMultipartIdentifier(tableName) v2Catalog.asInstanceOf[TableCatalog] .loadTable(Identifier.of(Array.empty, nameParts.last)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala index b6e7bc5d1a4d..de5b4692b8ba 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala @@ -19,16 +19,14 @@ package org.apache.spark.sql.sources.v2 import scala.collection.JavaConverters._ -import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog} import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog, StagingInMemoryTableCatalog} -import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG import org.apache.spark.sql.sources.v2.internal.V1Table -import org.apache.spark.sql.types.{ArrayType, BooleanType, DoubleType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap class DataSourceV2SQLSuite @@ -512,7 +510,8 @@ class DataSourceV2SQLSuite } test("CreateTableAsSelect: v2 session catalog can load v1 source table") { - spark.conf.set(V2_SESSION_CATALOG.key, classOf[V2SessionCatalog].getName) + // unset this config to use the default v2 session catalog. + spark.conf.unset(V2_SESSION_CATALOG.key) val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") df.createOrReplaceTempView("source") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/utils/TestV2SessionCatalogBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/utils/TestV2SessionCatalogBase.scala index 28ce6a94b253..b25eab154626 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/utils/TestV2SessionCatalogBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/utils/TestV2SessionCatalogBase.scala @@ -22,9 +22,8 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalog.v2.Identifier +import org.apache.spark.sql.catalog.v2.{DelegatingCatalogExtension, Identifier} import org.apache.spark.sql.catalog.v2.expressions.Transform -import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.sources.v2.Table import org.apache.spark.sql.types.StructType @@ -33,7 +32,7 @@ import org.apache.spark.sql.types.StructType * for testing DDL as well as write operations (through df.write.saveAsTable, df.write.insertInto * and SQL). 
*/ -private[v2] trait TestV2SessionCatalogBase[T <: Table] extends V2SessionCatalog { +private[v2] abstract class TestV2SessionCatalogBase[T <: Table] extends DelegatingCatalogExtension { protected val tables: util.Map[Identifier, T] = new ConcurrentHashMap[Identifier, T]() diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 188aedc3640b..3fc7908e9c4c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -67,7 +67,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session /** * A logical query plan `Analyzer` with rules specific to Hive. */ - override protected def analyzer: Analyzer = new Analyzer(catalog, conf) { + override protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = new ResolveHiveSerdeTable(session) +: new FindDataSourceTable(session) +:
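As a usage note (not part of the diff), here is a minimal sketch of the resulting behavior, assuming a running SparkSession named spark and an illustrative table name. With spark.sql.catalog.session left unset, CatalogManager.v2SessionCatalog now returns the built-in V2SessionCatalog directly instead of an Option, so callers that previously needed .get or .isDefined can use it unconditionally.

import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}

// The session catalog is always available now; "session" is CatalogManager.SESSION_CATALOG_NAME.
val sessionCatalog = spark.sessionState.catalogManager.v2SessionCatalog
assert(sessionCatalog.name == "session")

// Tables created through the v1 path remain visible through the v2 session catalog,
// because it delegates to the v1 SessionCatalog underneath.
spark.range(3).write.saveAsTable("demo_tbl") // illustrative table name
val tableCatalog = sessionCatalog.asInstanceOf[TableCatalog]
assert(tableCatalog.tableExists(Identifier.of(Array("default"), "demo_tbl")))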