@@ -22,14 +22,14 @@ import scala.util.control.NonFatal

import org.apache.spark.sql._
import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
-import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionList, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command.ShowTablesCommand
@@ -284,6 +284,33 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
CatalogImpl.makeDataset(functions.result(), sparkSession)
}

+  private def toFunctionIdent(functionName: String): Seq[String] = {
+    val parsed = parseIdent(functionName)
+    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
+    // the Hive Metastore first.
+    if (parsed.length <= 2 &&
+      !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) &&
+      sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) {
+      qualifyV1Ident(parsed)
+    } else {
+      parsed
+    }
+  }
+
+  private def functionExists(ident: Seq[String]): Boolean = {
+    val plan =
+      UnresolvedFunctionName(ident, CatalogImpl.FUNCTION_EXISTS_COMMAND_NAME, false, None)
+    try {
+      sparkSession.sessionState.executePlan(plan).analyzed match {
+        case _: ResolvedPersistentFunc => true
+        case _: ResolvedNonPersistentFunc => true
+        case _ => false
+      }
+    } catch {
+      case e: AnalysisException if e.getErrorClass == "UNRESOLVED_ROUTINE" => false
+    }
+  }

private def makeFunction(ident: Seq[String]): Function = {
val plan = UnresolvedFunctionName(ident, "Catalog.makeFunction", false, None)
sparkSession.sessionState.executePlan(plan).analyzed match {
@@ -465,17 +492,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* function. This throws an `AnalysisException` when no `Function` can be found.
*/
override def getFunction(functionName: String): Function = {
-    val parsed = parseIdent(functionName)
-    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
-    // the Hive Metastore first.
-    val nameParts = if (parsed.length <= 2 &&
-      !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) &&
-      sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) {
-      qualifyV1Ident(parsed)
-    } else {
-      parsed
-    }
-    makeFunction(nameParts)
+    makeFunction(toFunctionIdent(functionName))
}

/**
@@ -540,23 +557,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* or a function.
*/
override def functionExists(functionName: String): Boolean = {
-    val parsed = parseIdent(functionName)
-    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
-    // the Hive Metastore first. This also checks if it's a built-in/temp function.
-    (parsed.length <= 2 && sessionCatalog.functionExists(parsed.asFunctionIdentifier)) || {
-      val plan = UnresolvedIdentifier(parsed)
-      sparkSession.sessionState.executePlan(plan).analyzed match {
-        case ResolvedIdentifier(catalog: FunctionCatalog, ident) => catalog.functionExists(ident)
-        case _ => false
-      }
-    }
+    functionExists(toFunctionIdent(functionName))
}

/**
* Checks if the function with the specified name exists in the specified database.
*/
override def functionExists(dbName: String, functionName: String): Boolean = {
-    sessionCatalog.functionExists(FunctionIdentifier(functionName, Option(dbName)))
+    // For backward compatibility (Spark 3.3 and prior), here we always look up the function from
+    // the Hive Metastore.
+    functionExists(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, functionName))
}

/**
@@ -942,4 +952,5 @@ private[sql] object CatalogImpl {
new Dataset[T](queryExecution, enc)
}

+  private val FUNCTION_EXISTS_COMMAND_NAME = "Catalog.functionExists"
}
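
For reviewers: a minimal sketch of how the refactored resolution behaves through the public Catalog API, assuming a Hive-enabled session. The names `my_db` and `my_upper` are hypothetical; `spark_catalog` is the value of `CatalogManager.SESSION_CATALOG_NAME`. This is an illustration, not part of the PR.

import org.apache.spark.sql.SparkSession

// A sketch only: the session setup and function names are assumptions.
val spark = SparkSession.builder()
  .appName("catalog-function-exists-sketch")
  .enableHiveSupport()
  .getOrCreate()

// 1- and 2-part names go through toFunctionIdent: temp/built-in functions are
// checked first, then the name is qualified into the Hive Metastore for
// backward compatibility with Spark 3.3 and prior.
spark.catalog.functionExists("my_db.my_upper")

// The (dbName, functionName) overload now always resolves against the session
// catalog, i.e. it is equivalent to the fully qualified 3-part name.
spark.catalog.functionExists("my_db", "my_upper")
spark.catalog.functionExists("spark_catalog.my_db.my_upper")

// Missing functions no longer surface as exceptions here: an UNRESOLVED_ROUTINE
// analysis error is caught and translated to false, whereas getFunction on the
// same name still throws an AnalysisException.
spark.catalog.functionExists("my_db.no_such_fn") // false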