Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class Analyzer(
case u : UnresolvedRelation =>
val substituted = cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
.map(_._2).map { relation =>
val withAlias = u.alias.map(SubqueryAlias(_, relation))
val withAlias = u.alias.map(SubqueryAlias(_, relation, None))
withAlias.getOrElse(relation)
}
substituted.getOrElse(u)
Expand Down Expand Up @@ -2058,7 +2058,7 @@ class Analyzer(
*/
object EliminateSubqueryAliases extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case SubqueryAlias(_, child) => child
case SubqueryAlias(_, child, _) => child
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ trait CheckAnalysis extends PredicateHelper {

// Skip projects and subquery aliases added by the Analyzer and the SQLBuilder.
def cleanQuery(p: LogicalPlan): LogicalPlan = p match {
case SubqueryAlias(_, child) => cleanQuery(child)
case Project(_, child) => cleanQuery(child)
case s: SubqueryAlias => cleanQuery(s.child)
case p: Project => cleanQuery(p.child)
case child => child
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,27 +411,29 @@ class SessionCatalog(
}

/**
* Return a [[LogicalPlan]] that represents the given table.
* Return a [[LogicalPlan]] that represents the given table or view.
*
* If a database is specified in `name`, this will return the table from that database.
* If no database is specified, this will first attempt to return a temporary table with
* the same name, then, if that does not exist, return the table from the current database.
* If a database is specified in `name`, this will return the table/view from that database.
* If no database is specified, this will first attempt to return a temporary table/view with
* the same name, then, if that does not exist, return the table/view from the current database.
*
* If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
* track the name of the view.
*/
def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
synchronized {
val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
val relation =
if (name.database.isDefined || !tempTables.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
SimpleCatalogRelation(db, metadata)
} else {
tempTables(table)
val relationAlias = alias.getOrElse(table)
if (name.database.isDefined || !tempTables.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
val view = Option(metadata.tableType).collect {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are too conservative here, CatalogTable.tableType should never be null, and we use this assumption in a lot of places.

case CatalogTableType.VIEW => name
}
val qualifiedTable = SubqueryAlias(table, relation)
// If an alias was specified by the lookup, wrap the plan in a subquery so that
// attributes are properly qualified with this alias.
alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
SubqueryAlias(relationAlias, SimpleCatalogRelation(db, metadata), view)
} else {
SubqueryAlias(relationAlias, tempTables(table), Option(name))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ package object dsl {
orderSpec: Seq[SortOrder]): LogicalPlan =
Window(windowExpressions, partitionSpec, orderSpec, logicalPlan)

def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan)
def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan, None)

def except(otherPlan: LogicalPlan): LogicalPlan = Except(logicalPlan, otherPlan)

Expand All @@ -367,7 +367,7 @@ package object dsl {

def as(alias: String): LogicalPlan = logicalPlan match {
case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))
case plan => SubqueryAlias(alias, plan)
case plan => SubqueryAlias(alias, plan, None)
}

def repartition(num: Integer): LogicalPlan =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ case class ScalarSubquery(
override def dataType: DataType = query.schema.fields.head.dataType
override def foldable: Boolean = false
override def nullable: Boolean = true
override def plan: LogicalPlan = SubqueryAlias(toString, query)
override def plan: LogicalPlan = SubqueryAlias(toString, query, None)
override def withNewPlan(plan: LogicalPlan): ScalarSubquery = copy(query = plan)
override def toString: String = s"scalar-subquery#${exprId.id} $conditionString"
}
Expand Down Expand Up @@ -100,7 +100,7 @@ case class PredicateSubquery(
override lazy val resolved = childrenResolved && query.resolved
override lazy val references: AttributeSet = super.references -- query.outputSet
override def nullable: Boolean = nullAware
override def plan: LogicalPlan = SubqueryAlias(toString, query)
override def plan: LogicalPlan = SubqueryAlias(toString, query, None)
override def withNewPlan(plan: LogicalPlan): PredicateSubquery = copy(query = plan)
override def semanticEquals(o: Expression): Boolean = o match {
case p: PredicateSubquery =>
Expand Down Expand Up @@ -153,7 +153,7 @@ case class ListQuery(query: LogicalPlan, exprId: ExprId = NamedExpression.newExp
override def dataType: DataType = ArrayType(NullType)
override def nullable: Boolean = false
override def withNewPlan(plan: LogicalPlan): ListQuery = copy(query = plan)
override def plan: LogicalPlan = SubqueryAlias(toString, query)
override def plan: LogicalPlan = SubqueryAlias(toString, query, None)
override def toString: String = s"list#${exprId.id}"
}

Expand All @@ -174,6 +174,6 @@ case class Exists(query: LogicalPlan, exprId: ExprId = NamedExpression.newExprId
override def children: Seq[Expression] = Seq.empty
override def nullable: Boolean = false
override def withNewPlan(plan: LogicalPlan): Exists = copy(query = plan)
override def plan: LogicalPlan = SubqueryAlias(toString, query)
override def plan: LogicalPlan = SubqueryAlias(toString, query, None)
override def toString: String = s"exists#${exprId.id}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -1862,7 +1862,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
// and Project operators, followed by an optional Filter, followed by an
// Aggregate. Traverse the operators recursively.
def evalPlan(lp : LogicalPlan) : Map[ExprId, Option[Any]] = lp match {
case SubqueryAlias(_, child) => evalPlan(child)
case SubqueryAlias(_, child, _) => evalPlan(child)
case Filter(condition, child) =>
val bindings = evalPlan(child)
if (bindings.isEmpty) bindings
Expand Down Expand Up @@ -1920,7 +1920,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
topPart += p
bottomPart = child

case s @ SubqueryAlias(_, child) =>
case s @ SubqueryAlias(_, child, _) =>
topPart += s
bottomPart = child

Expand Down Expand Up @@ -1991,8 +1991,8 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
topPart.reverse.foreach {
case Project(projList, _) =>
subqueryRoot = Project(projList ++ havingInputs, subqueryRoot)
case s @ SubqueryAlias(alias, _) =>
subqueryRoot = SubqueryAlias(alias, subqueryRoot)
case s @ SubqueryAlias(alias, _, None) =>
subqueryRoot = SubqueryAlias(alias, subqueryRoot, None)
case op => sys.error(s"Unexpected operator $op in corelated subquery")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
* This is only used for Common Table Expressions.
*/
override def visitNamedQuery(ctx: NamedQueryContext): SubqueryAlias = withOrigin(ctx) {
SubqueryAlias(ctx.name.getText, plan(ctx.queryNoWith))
SubqueryAlias(ctx.name.getText, plan(ctx.queryNoWith), None)
}

/**
Expand Down Expand Up @@ -723,7 +723,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
* Create an alias (SubqueryAlias) for a LogicalPlan.
*/
private def aliasPlan(alias: ParserRuleContext, plan: LogicalPlan): LogicalPlan = {
SubqueryAlias(alias.getText, plan)
SubqueryAlias(alias.getText, plan, None)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
Expand Down Expand Up @@ -693,7 +694,11 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
}
}

case class SubqueryAlias(alias: String, child: LogicalPlan) extends UnaryNode {
case class SubqueryAlias(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should document that, if the view is not None, the child will be a logical plan for a view.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm shall we just use a isView flag? Will the view have different name than the alias?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will follow-up on the documentation.

The alias can be different from the view identifier. The identifier also contains the name of the database. We would lose information if we would try to put in the alias.

alias: String,
child: LogicalPlan,
view: Option[TableIdentifier])
extends UnaryNode {

override def output: Seq[Attribute] = child.output.map(_.withQualifier(Some(alias)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ class AnalysisSuite extends AnalysisTest {
val query =
Project(Seq($"x.key", $"y.key"),
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input)),
Project(Seq($"y.key"), SubqueryAlias("y", input)),
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
Project(Seq($"y.key"), SubqueryAlias("y", input, None)),
Inner, None))

assertAnalysisSuccess(query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,31 +395,38 @@ class SessionCatalogSuite extends SparkFunSuite {
sessionCatalog.setCurrentDatabase("db2")
// If we explicitly specify the database, we'll look up the relation in that database
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1)))
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None))
// Otherwise, we'll first look up a temporary table with the same name
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", tempTable1))
== SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1"))))
// Then, if that does not exist, look up the relation in the current database
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1)))
== SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1), None))
}

test("lookup table relation with alias") {
val catalog = new SessionCatalog(newBasicCatalog())
val alias = "monster"
val tableMetadata = catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata))
val relation = SubqueryAlias("tbl1", SimpleCatalogRelation("db2", tableMetadata), None)
val relationWithAlias =
SubqueryAlias(alias,
SubqueryAlias("tbl1",
SimpleCatalogRelation("db2", tableMetadata)))
SimpleCatalogRelation("db2", tableMetadata), None)
assert(catalog.lookupRelation(
TableIdentifier("tbl1", Some("db2")), alias = None) == relation)
assert(catalog.lookupRelation(
TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias)
}

test("lookup view with view name in alias") {
val catalog = new SessionCatalog(newBasicCatalog())
val tmpView = Range(1, 10, 2, 10)
catalog.createTempView("vw1", tmpView, overrideIfExists = false)
val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range"))
assert(plan == SubqueryAlias("range", tmpView, Option(TableIdentifier("vw1"))))
}

test("table exists") {
val catalog = new SessionCatalog(newBasicCatalog())
assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2"))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,16 +320,16 @@ class ColumnPruningSuite extends PlanTest {
val query =
Project(Seq($"x.key", $"y.key"),
Join(
SubqueryAlias("x", input),
BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
SubqueryAlias("x", input, None),
BroadcastHint(SubqueryAlias("y", input, None)), Inner, None)).analyze

val optimized = Optimize.execute(query)

val expected =
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input)),
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
BroadcastHint(
Project(Seq($"y.key"), SubqueryAlias("y", input))),
Project(Seq($"y.key"), SubqueryAlias("y", input, None))),
Inner, None).analyze

comparePlans(optimized, expected)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper {

test("eliminate top level subquery") {
val input = LocalRelation('a.int, 'b.int)
val query = SubqueryAlias("a", input)
val query = SubqueryAlias("a", input, None)
comparePlans(afterOptimization(query), input)
}

test("eliminate mid-tree subquery") {
val input = LocalRelation('a.int, 'b.int)
val query = Filter(TrueLiteral, SubqueryAlias("a", input))
val query = Filter(TrueLiteral, SubqueryAlias("a", input, None))
comparePlans(
afterOptimization(query),
Filter(TrueLiteral, LocalRelation('a.int, 'b.int)))
Expand All @@ -61,7 +61,7 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper {
test("eliminate multiple subqueries") {
val input = LocalRelation('a.int, 'b.int)
val query = Filter(TrueLiteral,
SubqueryAlias("c", SubqueryAlias("b", SubqueryAlias("a", input))))
SubqueryAlias("c", SubqueryAlias("b", SubqueryAlias("a", input, None), None), None))
comparePlans(
afterOptimization(query),
Filter(TrueLiteral, LocalRelation('a.int, 'b.int)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ class JoinOptimizationSuite extends PlanTest {
val query =
Project(Seq($"x.key", $"y.key"),
Join(
SubqueryAlias("x", input),
BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
SubqueryAlias("x", input, None),
BroadcastHint(SubqueryAlias("y", input, None)), Inner, None)).analyze

val optimized = Optimize.execute(query)

val expected =
Join(
Project(Seq($"x.key"), SubqueryAlias("x", input)),
BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input))),
Project(Seq($"x.key"), SubqueryAlias("x", input, None)),
BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input, None))),
Inner, None).analyze

comparePlans(optimized, expected)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class PlanParserSuite extends PlanTest {
def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = {
val ctes = namedPlans.map {
case (name, cte) =>
name -> SubqueryAlias(name, cte)
name -> SubqueryAlias(name, cte, None)
}
With(plan, ctes)
}
Expand Down
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def as(alias: String): Dataset[T] = withTypedPlan {
SubqueryAlias(alias, logicalPlan)
SubqueryAlias(alias, logicalPlan, None)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class SQLBuilder private (
val aliasedOutput = canonicalizedPlan.output.zip(outputNames).map {
case (attr, name) => Alias(attr.withQualifier(None), name)()
}
val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan))
val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan, None))

try {
val replaced = finalPlan.transformAllExpressions {
Expand Down Expand Up @@ -440,7 +440,7 @@ class SQLBuilder private (

object RemoveSubqueriesAboveSQLTable extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case SubqueryAlias(_, t @ ExtractSQLTable(_)) => t
case SubqueryAlias(_, t @ ExtractSQLTable(_), _) => t
}
}

Expand Down Expand Up @@ -557,7 +557,7 @@ class SQLBuilder private (
}

private def addSubquery(plan: LogicalPlan): SubqueryAlias = {
SubqueryAlias(newSubqueryName(), plan)
SubqueryAlias(newSubqueryName(), plan, None)
}

private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
s"${u.tableIdentifier.database.get}")
}
val plan = LogicalRelation(dataSource.resolveRelation())
u.alias.map(a => SubqueryAlias(u.alias.get, plan)).getOrElse(plan)
u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)).getOrElse(plan)
} catch {
case e: ClassNotFoundException => u
case e: Exception =>
Expand Down
Loading