Skip to content

Commit be675a0

Browse files
linhongliu-db authored and cloud-fan committed
[SPARK-34490][SQL] Analysis should fail if the view refers a dropped table
### What changes were proposed in this pull request?

When resolving a view, we use the captured view name in `AnalysisContext` to distinguish whether a relation name is a view or a table. But if the resolution failed, other rules (e.g. `ResolveTables`) will try to resolve the relation again, but without `AnalysisContext`. So, in this case, the resolution may be incorrect. For example, if the view refers to a dropped table while a temp view with the same name exists, the dropped table will be resolved as a view rather than raising an unresolved exception.

### Why are the changes needed?

Bugfix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Newly added test cases.

Closes apache#31606 from linhongliu-db/fix-temp-view-master.

Lead-authored-by: Linhong Liu <linhong.liu@databricks.com>
Co-authored-by: Linhong Liu <67896261+linhongliu-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 612d523 commit be675a0

File tree

3 files changed

+53
-15
lines changed

3 files changed

+53
-15
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -871,24 +871,24 @@ class Analyzer(override val catalogManager: CatalogManager)
871871
object ResolveTempViews extends Rule[LogicalPlan] {
872872
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
873873
case u @ UnresolvedRelation(ident, _, isStreaming) =>
874-
lookupTempView(ident, isStreaming).getOrElse(u)
874+
lookupTempView(ident, isStreaming, performCheck = true).getOrElse(u)
875875
case i @ InsertIntoStatement(UnresolvedRelation(ident, _, false), _, _, _, _, _) =>
876-
lookupTempView(ident)
876+
lookupTempView(ident, performCheck = true)
877877
.map(view => i.copy(table = view))
878878
.getOrElse(i)
879879
case c @ CacheTable(UnresolvedRelation(ident, _, false), _, _, _) =>
880-
lookupTempView(ident)
880+
lookupTempView(ident, performCheck = true)
881881
.map(view => c.copy(table = view))
882882
.getOrElse(c)
883883
case c @ UncacheTable(UnresolvedRelation(ident, _, false), _, _) =>
884-
lookupTempView(ident)
884+
lookupTempView(ident, performCheck = true)
885885
.map(view => c.copy(table = view, isTempView = true))
886886
.getOrElse(c)
887887
// TODO (SPARK-27484): handle streaming write commands when we have them.
888888
case write: V2WriteCommand =>
889889
write.table match {
890890
case UnresolvedRelation(ident, _, false) =>
891-
lookupTempView(ident).map(EliminateSubqueryAliases(_)).map {
891+
lookupTempView(ident, performCheck = true).map(EliminateSubqueryAliases(_)).map {
892892
case r: DataSourceV2Relation => write.withNewTable(r)
893893
case _ => throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(ident.quoted)
894894
}.getOrElse(write)
@@ -921,7 +921,9 @@ class Analyzer(override val catalogManager: CatalogManager)
921921
}
922922

923923
def lookupTempView(
924-
identifier: Seq[String], isStreaming: Boolean = false): Option[LogicalPlan] = {
924+
identifier: Seq[String],
925+
isStreaming: Boolean = false,
926+
performCheck: Boolean = false): Option[LogicalPlan] = {
925927
// Permanent View can't refer to temp views, no need to lookup at all.
926928
if (isResolvingView && !referredTempViewNames.contains(identifier)) return None
927929

@@ -934,7 +936,7 @@ class Analyzer(override val catalogManager: CatalogManager)
934936
if (isStreaming && tmpView.nonEmpty && !tmpView.get.isStreaming) {
935937
throw QueryCompilationErrors.readNonStreamingTempViewError(identifier.quoted)
936938
}
937-
tmpView.map(ResolveRelations.resolveViews)
939+
tmpView.map(ResolveRelations.resolveViews(_, performCheck))
938940
}
939941
}
940942

@@ -1098,7 +1100,7 @@ class Analyzer(override val catalogManager: CatalogManager)
10981100
// look at `AnalysisContext.catalogAndNamespace` when resolving relations with single-part name.
10991101
// If `AnalysisContext.catalogAndNamespace` is non-empty, analyzer will expand single-part names
11001102
// with it, instead of current catalog and namespace.
1101-
def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
1103+
def resolveViews(plan: LogicalPlan, performCheck: Boolean = false): LogicalPlan = plan match {
11021104
// The view's child should be a logical plan parsed from the `desc.viewText`, the variable
11031105
// `viewText` should be defined, or else we throw an error on the generation of the View
11041106
// operator.
@@ -1115,9 +1117,18 @@ class Analyzer(override val catalogManager: CatalogManager)
11151117
executeSameContext(child)
11161118
}
11171119
}
1120+
// Fail the analysis eagerly because outside AnalysisContext, the unresolved operators
1121+
// inside a view maybe resolved incorrectly.
1122+
// But for commands like `DropViewCommand`, resolving view is unnecessary even though
1123+
// there is unresolved node. So use the `performCheck` flag to skip the analysis check
1124+
// for these commands.
1125+
// TODO(SPARK-34504): avoid unnecessary view resolving and remove the `performCheck` flag
1126+
if (performCheck) {
1127+
checkAnalysis(newChild)
1128+
}
11181129
view.copy(child = newChild)
11191130
case p @ SubqueryAlias(_, view: View) =>
1120-
p.copy(child = resolveViews(view))
1131+
p.copy(child = resolveViews(view, performCheck))
11211132
case _ => plan
11221133
}
11231134

@@ -1137,14 +1148,14 @@ class Analyzer(override val catalogManager: CatalogManager)
11371148

11381149
case c @ CacheTable(u @ UnresolvedRelation(_, _, false), _, _, _) =>
11391150
lookupRelation(u.multipartIdentifier, u.options, false)
1140-
.map(resolveViews)
1151+
.map(resolveViews(_, performCheck = true))
11411152
.map(EliminateSubqueryAliases(_))
11421153
.map(relation => c.copy(table = relation))
11431154
.getOrElse(c)
11441155

11451156
case c @ UncacheTable(u @ UnresolvedRelation(_, _, false), _, _) =>
11461157
lookupRelation(u.multipartIdentifier, u.options, false)
1147-
.map(resolveViews)
1158+
.map(resolveViews(_, performCheck = true))
11481159
.map(EliminateSubqueryAliases(_))
11491160
.map(relation => c.copy(table = relation))
11501161
.getOrElse(c)
@@ -1170,7 +1181,7 @@ class Analyzer(override val catalogManager: CatalogManager)
11701181

11711182
case u: UnresolvedRelation =>
11721183
lookupRelation(u.multipartIdentifier, u.options, u.isStreaming)
1173-
.map(resolveViews).getOrElse(u)
1184+
.map(resolveViews(_, performCheck = true)).getOrElse(u)
11741185

11751186
case u @ UnresolvedTable(identifier, cmd, relationTypeMismatchHint) =>
11761187
lookupTableOrView(identifier).map {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis
1919

2020
import java.io.File
2121

22+
import scala.collection.JavaConverters._
23+
2224
import org.mockito.ArgumentMatchers.any
2325
import org.mockito.Mockito._
2426
import org.mockito.invocation.InvocationOnMock
@@ -27,8 +29,8 @@ import org.scalatest.matchers.must.Matchers
2729
import org.apache.spark.sql.catalyst.TableIdentifier
2830
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog}
2931
import org.apache.spark.sql.catalyst.dsl.plans._
30-
import org.apache.spark.sql.connector.InMemoryTableCatalog
31-
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, V1Table}
32+
import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog}
33+
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table}
3234
import org.apache.spark.sql.types._
3335

3436
class TableLookupCacheSuite extends AnalysisTest with Matchers {
@@ -46,7 +48,12 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers {
4648
ignoreIfExists = false)
4749
val v2Catalog = new InMemoryTableCatalog {
4850
override def loadTable(ident: Identifier): Table = {
49-
V1Table(externalCatalog.getTable("default", ident.name))
51+
val catalogTable = externalCatalog.getTable("default", ident.name)
52+
new InMemoryTable(
53+
catalogTable.identifier.table,
54+
catalogTable.schema,
55+
Array.empty,
56+
Map.empty[String, String].asJava)
5057
}
5158
override def name: String = CatalogManager.SESSION_CATALOG_NAME
5259
}

sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,26 @@ abstract class SQLViewTestSuite extends QueryTest with SQLTestUtils {
258258
checkViewOutput(viewName, Seq(Row(2)))
259259
}
260260
}
261+
262+
test("SPARK-34490 - query should fail if the view refers a dropped table") {
263+
withTable("t") {
264+
Seq(2, 3, 1).toDF("c1").write.format("parquet").saveAsTable("t")
265+
val viewName = createView("testView", "SELECT * FROM t")
266+
withView(viewName) {
267+
// Always create a temp view in this case, not use `createView` on purpose
268+
sql("CREATE TEMP VIEW t AS SELECT 1 AS c1")
269+
withTempView("t") {
270+
checkViewOutput(viewName, Seq(Row(2), Row(3), Row(1)))
271+
// Manually drop table `t` to see if the query will fail
272+
sql("DROP TABLE IF EXISTS default.t")
273+
val e = intercept[AnalysisException] {
274+
sql(s"SELECT * FROM $viewName").collect()
275+
}.getMessage
276+
assert(e.contains("Table or view not found: t"))
277+
}
278+
}
279+
}
280+
}
261281
}
262282

263283
class LocalTempViewTestSuite extends SQLViewTestSuite with SharedSparkSession {

0 commit comments

Comments (0)