Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix.
  • Loading branch information
gatorsmile committed Nov 4, 2016
commit 509327e303984906ce4c87e459d6323582963089
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class Analyzer(
case u : UnresolvedRelation =>
val substituted = cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
.map(_._2).map { relation =>
val withAlias = u.alias.map(SubqueryAlias(_, relation, None))
val withAlias = u.alias.map(SubqueryAlias(_, relation, None)())
withAlias.getOrElse(relation)
}
substituted.getOrElse(u)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,16 +553,16 @@ class SessionCatalog(
val relationAlias = alias.getOrElse(table)
if (db == globalTempViewManager.database) {
globalTempViewManager.get(table).map { viewDef =>
SubqueryAlias(relationAlias, viewDef, Some(name))
SubqueryAlias(relationAlias, viewDef, Some(name))(isGeneratedByTempTable = true)
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
val view = Option(metadata.tableType).collect {
case CatalogTableType.VIEW => name
}
SubqueryAlias(relationAlias, SimpleCatalogRelation(db, metadata), view)
SubqueryAlias(relationAlias, SimpleCatalogRelation(db, metadata), view)()
} else {
SubqueryAlias(relationAlias, tempTables(table), Option(name))
SubqueryAlias(relationAlias, tempTables(table), Option(name))(isGeneratedByTempTable = true)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ package object dsl {
orderSpec: Seq[SortOrder]): LogicalPlan =
Window(windowExpressions, partitionSpec, orderSpec, logicalPlan)

def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan, None)
def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan, None)()

def except(otherPlan: LogicalPlan): LogicalPlan = Except(logicalPlan, otherPlan)

Expand All @@ -371,7 +371,7 @@ package object dsl {

def as(alias: String): LogicalPlan = logicalPlan match {
case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
case plan => SubqueryAlias(alias, plan, None)
case plan => SubqueryAlias(alias, plan, None)()
}

def repartition(num: Integer): LogicalPlan =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
case Project(projList, _) =>
subqueryRoot = Project(projList ++ havingInputs, subqueryRoot)
case s @ SubqueryAlias(alias, _, None) =>
subqueryRoot = SubqueryAlias(alias, subqueryRoot, None)
subqueryRoot = SubqueryAlias(alias, subqueryRoot, None)()
case op => sys.error(s"Unexpected operator $op in corelated subquery")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
* This is only used for Common Table Expressions.
*/
override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = withOrigin(ctx) {
SubqueryAlias(ctx.name.getText, plan(ctx.query), None)
SubqueryAlias(ctx.name.getText, plan(ctx.query), None)()
}

/**
Expand Down Expand Up @@ -729,7 +729,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
* Create an alias (SubqueryAlias) for a LogicalPlan.
*/
private def aliasPlan(alias: ParserRuleContext, plan: LogicalPlan): LogicalPlan = {
SubqueryAlias(alias.getText, plan, None)
SubqueryAlias(alias.getText, plan, None)()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,13 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
case class SubqueryAlias(
alias: String,
child: LogicalPlan,
view: Option[TableIdentifier])
view: Option[TableIdentifier])(
val isGeneratedByTempTable: java.lang.Boolean = false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to do it without introducing this?

Copy link
Member Author

@gatorsmile gatorsmile Nov 4, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just updated the PR description. This might be the cleanest way. The reason is explained below.

When the Analyzer finds an unresolved temporary view, it replaces it with a SubqueryAlias wrapping the corresponding logical plan, which is stored in an in-memory HashMap. After the replacement, it is impossible to tell whether the SubqueryAlias was generated from a temporary view. Thus, to detect the use of a temporary view in a view definition, we added an extra flag, isGeneratedByTempTable, to SubqueryAlias. The flag is added to the curried arguments. With this extra flag, we can easily detect the use of a temporary view by traversing the logical plan.

Also cc @cloud-fan and @liancheng: do you have a better solution?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can do it from the unanalyzed plan, like what this PR did for temp functions.

extends UnaryNode {

override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))

override protected def otherCopyArgs: Seq[AnyRef] = isGeneratedByTempTable :: Nil
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ class AnalysisSuite extends AnalysisTest with ShouldMatchers {
val query =
Project(Seq($"x.key", $"y.key"),
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
Project(Seq($"y.key"), SubqueryAlias("y", input, None)),
Project(Seq($"x.key"), SubqueryAlias("x", input, None)()),
Project(Seq($"y.key"), SubqueryAlias("y", input, None)()),
Cross, None))

assertAnalysisSuccess(query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,24 +406,24 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.setCurrentDatabase("db2")
// If we explicitly specify the database, we'll look up the relation in that database
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None))
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)())
// Otherwise, we'll first look up a temporary table with the same name
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1"))))
== SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1")))())
// Then, if that does not exist, look up the relation in the current database
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None))
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None)())
}

test("lookup table relation with alias") {
val catalog = new SessionCatalog(newBasicCatalog())
val alias = "monster"
val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata), None)
val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata), None)()
val relationWithAlias =
SubqueryAlias(alias,
SimpleCatalogRelation("db2", tableMetadata), None)
SimpleCatalogRelation("db2", tableMetadata), None)()
assert(catalog.lookupRelation(
TableIdentifier("tbl1", Some("db2")), alias = None) == relation)
assert(catalog.lookupRelation(
Expand All @@ -435,7 +435,7 @@ class SessionCatalogSuite extends SparkFunSuite {
val tmpView = Range(1, 10, 2, 10)
catalog.createTempView("vw1", tmpView, overrideIfExists = false)
val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range"))
assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1"))))
assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1")))())
}

test("table exists") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,16 +320,16 @@ class ColumnPruningSuite extends PlanTest {
val query =
Project(Seq($"x.key", $"y.key"),
Join(
SubqueryAlias("x", input, None),
BroadcastHint(SubqueryAlias("y", input, None)), Inner, None)).analyze
SubqueryAlias("x", input, None)(),
BroadcastHint(SubqueryAlias("y", input, None)()), Inner, None)).analyze

val optimized = Optimize.execute(query)

val expected =
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
Project(Seq($"x.key"), SubqueryAlias("x", input, None)()),
BroadcastHint(
Project(Seq($"y.key"), SubqueryAlias("y", input, None))),
Project(Seq($"y.key"), SubqueryAlias("y", input, None)())),
Inner, None).analyze

comparePlans(optimized, expected)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper {

test("eliminate top level subquery") {
val input = LocalRelation('a.int, 'b.int)
val query = SubqueryAlias("a", input, None)
val query = SubqueryAlias("a", input, None)()
comparePlans(afterOptimization(query), input)
}

test("eliminate mid-tree subquery") {
val input = LocalRelation('a.int, 'b.int)
val query = Filter(TrueLiteral, SubqueryAlias("a", input, None))
val query = Filter(TrueLiteral, SubqueryAlias("a", input, None)())
comparePlans(
afterOptimization(query),
Filter(TrueLiteral, LocalRelation('a.int, 'b.int)))
Expand All @@ -61,7 +61,7 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper {
test("eliminate multiple subqueries") {
val input = LocalRelation('a.int, 'b.int)
val query = Filter(TrueLiteral,
SubqueryAlias("c", SubqueryAlias("b", SubqueryAlias("a", input, None), None), None))
SubqueryAlias("c", SubqueryAlias("b", SubqueryAlias("a", input, None)(), None)(), None)())
comparePlans(
afterOptimization(query),
Filter(TrueLiteral, LocalRelation('a.int, 'b.int)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,15 @@ class JoinOptimizationSuite extends PlanTest {
val query =
Project(Seq($"x.key", $"y.key"),
Join(
SubqueryAlias("x", input, None),
BroadcastHint(SubqueryAlias("y", input, None)), Cross, None)).analyze
SubqueryAlias("x", input, None)(),
BroadcastHint(SubqueryAlias("y", input, None)()), Cross, None)).analyze

val optimized = Optimize.execute(query)

val expected =
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None))),
Project(Seq($"x.key"), SubqueryAlias("x", input, None)()),
BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None)())),
Cross, None).analyze

comparePlans(optimized, expected)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class PlanParserSuite extends PlanTest {
def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = {
val ctes = namedPlans.map {
case (name, cte) =>
name -> SubqueryAlias(name, cte, None)
name -> SubqueryAlias(name, cte, None)()
}
With(plan, ctes)
}
Expand Down
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def as(alias: String): Dataset[T] = withTypedPlan {
SubqueryAlias(alias, logicalPlan, None)
SubqueryAlias(alias, logicalPlan, None)()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class SQLBuilder private (
val aliasedOutput = canonicalizedPlan.output.zip(outputNames).map {
case (attr, name) => Alias(attr.withQualifier(None), name)()
}
val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan, None))
val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan, None)())

try {
val replaced = finalPlan.transformAllExpressions {
Expand Down Expand Up @@ -563,7 +563,7 @@ class SQLBuilder private (
}

private def addSubquery(plan: LogicalPlan): SubqueryAlias = {
SubqueryAlias(newSubqueryName(), plan, None)
SubqueryAlias(newSubqueryName(), plan, None)()
}

private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ package org.apache.spark.sql.execution.command

import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.types.{MetadataBuilder, StructType}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias}
import org.apache.spark.sql.types.MetadataBuilder


/**
Expand Down Expand Up @@ -131,6 +130,18 @@ case class CreateViewCommand(
s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).")
}

// A permanent view is not allowed to reference temporary objects, for example,
// temporary views.
// TODO: Disallow creating permanent views based on temporary UDFs
if (!isTemporary) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also i'd move the check into a function, so it is more obvious what's going on with the main workflow.

analyzedPlan.collectFirst {
case s: SubqueryAlias if s.isGeneratedByTempTable =>
throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
s"referencing a temp view `${s.alias}`. " +
originalText.map(sql => s"""SQL: "$sql".""").getOrElse(""))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need to show the entire sql? it's basically what the user just typed in

}
}

val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
analyzedPlan
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
s"${u.tableIdentifier.database.get}")
}
val plan = LogicalRelation(dataSource.resolveRelation())
u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)).getOrElse(plan)
u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)()).getOrElse(plan)
} catch {
case e: ClassNotFoundException => u
case e: Exception =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,21 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log

if (DDLUtils.isDatasourceTable(table)) {
val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None)
val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable, None)()
// Then, if alias is specified, wrap the table with a Subquery using the alias.
// Otherwise, wrap the table with a Subquery using the table name.
alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
alias.map(a => SubqueryAlias(a, qualifiedTable, None)()).getOrElse(qualifiedTable)
} else if (table.tableType == CatalogTableType.VIEW) {
val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
SubqueryAlias(
alias.getOrElse(table.identifier.table),
sparkSession.sessionState.sqlParser.parsePlan(viewText),
Option(table.identifier))
Option(table.identifier))()
} else {
val qualifiedTable =
MetastoreRelation(
qualifiedTableName.database, qualifiedTableName.name)(table, sparkSession)
alias.map(a => SubqueryAlias(a, qualifiedTable, None)).getOrElse(qualifiedTable)
alias.map(a => SubqueryAlias(a, qualifiedTable, None)()).getOrElse(qualifiedTable)
}
}

Expand Down Expand Up @@ -312,7 +312,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// Read path
case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
val parquetRelation = convertToParquetRelation(relation)
SubqueryAlias(relation.tableName, parquetRelation, None)
SubqueryAlias(relation.tableName, parquetRelation, None)()
}
}
}
Expand Down Expand Up @@ -350,7 +350,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// Read path
case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
val orcRelation = convertToOrcRelation(relation)
SubqueryAlias(relation.tableName, orcRelation, None)
SubqueryAlias(relation.tableName, orcRelation, None)()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,18 @@ private[sql] class HiveSessionCatalog(
if (db == globalTempViewManager.database) {
val relationAlias = alias.getOrElse(table)
globalTempViewManager.get(table).map { viewDef =>
SubqueryAlias(relationAlias, viewDef, Some(name))
SubqueryAlias(relationAlias, viewDef, Some(name))(isGeneratedByTempTable = true)
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
val database = name.database.map(formatDatabaseName)
val newName = name.copy(database = database, table = table)
metastoreCatalog.lookupRelation(newName, alias)
} else {
val relation = tempTables(table)
val tableWithQualifiers = SubqueryAlias(table, relation, None)
val tableWithQualifiers = SubqueryAlias(table, relation, None)(isGeneratedByTempTable = true)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
// attributes are properly qualified with this alias.
alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)).getOrElse(tableWithQualifiers)
alias.map(a => SubqueryAlias(a, tableWithQualifiers, None)()).getOrElse(tableWithQualifiers)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,38 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
spark.sql(s"DROP TABLE IF EXISTS jt")
}

test("nested views (interleaved with temporary views)") {
withView("jtv1", "jtv2", "jtv3", "temp_jtv1", "temp_jtv2", "temp_jtv3") {
test("create a persistent view on a persistent view") {
withView("jtv1", "jtv2") {
sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3")
sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6")
checkAnswer(sql("select count(*) FROM jtv2"), Row(2))
}
}

test("create a temp view on a persistent view") {
withView("jtv1", "temp_jtv1") {
sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3")
sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jtv1 WHERE id < 6")
checkAnswer(sql("select count(*) FROM temp_jtv1"), Row(2))
}
}

// Checks temporary views
test("create a temp view on a temp view") {
withView("temp_jtv1", "temp_jtv2") {
sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3")
sql("CREATE TEMPORARY VIEW temp_jtv2 AS SELECT * FROM temp_jtv1 WHERE id < 6")
checkAnswer(sql("select count(*) FROM temp_jtv2"), Row(2))
}
}

// Checks interleaved temporary view and normal view
sql("CREATE TEMPORARY VIEW temp_jtv3 AS SELECT * FROM jt WHERE id > 3")
sql("CREATE VIEW jtv3 AS SELECT * FROM temp_jtv3 WHERE id < 6")
checkAnswer(sql("select count(*) FROM jtv3"), Row(2))
test("create a persistent view on a temp view") {
withView("jtv1", "temp_jtv1") {
sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3")
val e = intercept[AnalysisException] {
sql("CREATE VIEW jtv1 AS SELECT * FROM temp_jtv1 WHERE id < 6")
}.getMessage
assert(e.contains(
s"Not allowed to create a permanent view `jtv1` by referencing a temp view `temp_jtv1`"))
}
}

Expand Down