
Commit dfd7cde

heyihong authored and dongjoon-hyun committed
[SPARK-45842][SQL] Refactor Catalog Function APIs to use analyzer
### What changes were proposed in this pull request?

- Refactor the Catalog Function APIs to go through the analyzer.

### Why are the changes needed?

- Less duplicated logic. We should not invoke catalog APIs directly, but go through the analyzer.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43720 from heyihong/SPARK-45842.

Authored-by: Yihong He <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 4df4fec commit dfd7cde
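
For orientation, these are the public `Catalog` entry points whose internal resolution path changes in this commit; the API surface itself is untouched. A minimal sketch, assuming a local `SparkSession` and a hypothetical function name `my_db.my_fn`:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: this PR does not change the public API, just how these calls
// resolve function identifiers internally (through the analyzer rather than
// direct catalog calls). `my_db.my_fn` is a hypothetical function name.
val spark = SparkSession.builder().master("local[1]").getOrCreate()

spark.catalog.functionExists("my_db.my_fn")    // one- or two-part name
spark.catalog.functionExists("my_db", "my_fn") // database + name overload
spark.catalog.getFunction("my_db.my_fn")       // throws AnalysisException if not found
```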

File tree

1 file changed: +35, -24 lines


sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 35 additions & 24 deletions
@@ -22,14 +22,14 @@ import scala.util.control.NonFatal
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
-import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionList, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.ShowTablesCommand

@@ -284,6 +284,33 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     CatalogImpl.makeDataset(functions.result(), sparkSession)
   }

+  private def toFunctionIdent(functionName: String): Seq[String] = {
+    val parsed = parseIdent(functionName)
+    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
+    // the Hive Metastore first.
+    if (parsed.length <= 2 &&
+      !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) &&
+      sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) {
+      qualifyV1Ident(parsed)
+    } else {
+      parsed
+    }
+  }
+
+  private def functionExists(ident: Seq[String]): Boolean = {
+    val plan =
+      UnresolvedFunctionName(ident, CatalogImpl.FUNCTION_EXISTS_COMMAND_NAME, false, None)
+    try {
+      sparkSession.sessionState.executePlan(plan).analyzed match {
+        case _: ResolvedPersistentFunc => true
+        case _: ResolvedNonPersistentFunc => true
+        case _ => false
+      }
+    } catch {
+      case e: AnalysisException if e.getErrorClass == "UNRESOLVED_ROUTINE" => false
+    }
+  }
+
   private def makeFunction(ident: Seq[String]): Function = {
     val plan = UnresolvedFunctionName(ident, "Catalog.makeFunction", false, None)
     sparkSession.sessionState.executePlan(plan).analyzed match {

@@ -465,17 +492,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * function. This throws an `AnalysisException` when no `Function` can be found.
    */
   override def getFunction(functionName: String): Function = {
-    val parsed = parseIdent(functionName)
-    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
-    // the Hive Metastore first.
-    val nameParts = if (parsed.length <= 2 &&
-      !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) &&
-      sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) {
-      qualifyV1Ident(parsed)
-    } else {
-      parsed
-    }
-    makeFunction(nameParts)
+    makeFunction(toFunctionIdent(functionName))
   }

   /**
@@ -540,23 +557,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * or a function.
    */
   override def functionExists(functionName: String): Boolean = {
-    val parsed = parseIdent(functionName)
-    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
-    // the Hive Metastore first. This also checks if it's a built-in/temp function.
-    (parsed.length <= 2 && sessionCatalog.functionExists(parsed.asFunctionIdentifier)) || {
-      val plan = UnresolvedIdentifier(parsed)
-      sparkSession.sessionState.executePlan(plan).analyzed match {
-        case ResolvedIdentifier(catalog: FunctionCatalog, ident) => catalog.functionExists(ident)
-        case _ => false
-      }
-    }
+    functionExists(toFunctionIdent(functionName))
   }

   /**
    * Checks if the function with the specified name exists in the specified database.
    */
   override def functionExists(dbName: String, functionName: String): Boolean = {
-    sessionCatalog.functionExists(FunctionIdentifier(functionName, Option(dbName)))
+    // For backward compatibility (Spark 3.3 and prior), here we always look up the function from
+    // the Hive Metastore.
+    functionExists(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, functionName))
   }

   /**
@@ -942,4 +952,5 @@ private[sql] object CatalogImpl {
     new Dataset[T](queryExecution, enc)
   }

+  private val FUNCTION_EXISTS_COMMAND_NAME = "Catalog.functionExists"
 }
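
For readers tracing the new code path: the consolidated private `functionExists(ident)` resolves an `UnresolvedFunctionName` through the analyzer, treats both persistent and non-persistent (built-in or temporary) resolutions as `true`, and swallows the `UNRESOLVED_ROUTINE` error as `false`. A rough behavioral sketch of what the refactor preserves, assuming a local session; `plus_one` and `no_such_fn` are hypothetical names:

```scala
// Behavioral sketch; assumes a local SparkSession, not part of this PR's tests.
val spark = org.apache.spark.sql.SparkSession.builder()
  .master("local[1]").getOrCreate()

spark.catalog.functionExists("abs")          // built-in -> ResolvedNonPersistentFunc -> true
spark.catalog.functionExists("no_such_fn")   // UNRESOLVED_ROUTINE is caught -> false

spark.udf.register("plus_one", (i: Int) => i + 1)
spark.catalog.functionExists("plus_one")     // temp function -> true
```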
