Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
04959c2
refactor analyzer adding a new object
anchovYu Nov 23, 2022
6f44c85
lca code
anchovYu Nov 23, 2022
725e5ac
add tests, refine logic
anchovYu Nov 28, 2022
660e1d2
move lca rule to a new file
anchovYu Nov 28, 2022
fd06094
rename conf
anchovYu Nov 28, 2022
7d4f80f
test failure
anchovYu Nov 29, 2022
b9704d5
small fix
anchovYu Nov 29, 2022
777f13a
temp commit, still in implementation
anchovYu Nov 29, 2022
09480ea
a temporary solution, but still fail certain cases
anchovYu Nov 30, 2022
c972738
working solution, needs some refinement
anchovYu Dec 1, 2022
97ee293
Merge remote-tracking branch 'apache/master' into SPARK-27561-refactor
anchovYu Dec 1, 2022
5785943
make changes to accommodate the recent refactor
anchovYu Dec 2, 2022
757cffb
introduce leaf exp in Project as well
anchovYu Dec 5, 2022
29de892
handle a corner case
anchovYu Dec 5, 2022
72991c6
add more tests; add check rule
anchovYu Dec 6, 2022
d45fe31
uplift the necessity to resolve expression in second phase; add more …
anchovYu Dec 8, 2022
1f55f73
address comments to add tests for LCA off
anchovYu Dec 8, 2022
f753529
revert the refactor, split LCA into two rules
anchovYu Dec 9, 2022
b9f706f
better refactor
anchovYu Dec 9, 2022
94d5c9e
address comments
anchovYu Dec 9, 2022
d2e75fd
Merge branch 'SPARK-27561-refactor' into SPARK-27561-agg
anchovYu Dec 9, 2022
edde37c
basic version passing all tests
anchovYu Dec 9, 2022
fb7b18c
update the logic, add and refactor tests
anchovYu Dec 12, 2022
3698cff
update comments
anchovYu Dec 13, 2022
e700d6a
add a corner case comment
anchovYu Dec 13, 2022
8d20986
address comments
anchovYu Dec 13, 2022
d952aa7
Merge branch 'SPARK-27561-refactor' into SPARK-27561-agg
anchovYu Dec 13, 2022
44d5a3d
Merge remote-tracking branch 'apache/master' into SPARK-27561-agg
anchovYu Dec 13, 2022
ccebc1c
revert some changes
anchovYu Dec 13, 2022
5540b70
fix few todos
anchovYu Dec 13, 2022
338ba11
Merge remote-tracking branch 'apache/master' into SPARK-27561-agg
anchovYu Dec 16, 2022
136a930
fix the failing test
anchovYu Dec 16, 2022
5076ad2
fix the missing_aggregate issue, turn on conf to see failed tests
anchovYu Dec 19, 2022
2f2dee5
remove few todos
anchovYu Dec 19, 2022
3a5509a
better fix to maintain aggregate error: only lift up in certain cases
anchovYu Dec 20, 2022
a23debb
Merge remote-tracking branch 'apache/master' into SPARK-27561-agg
anchovYu Dec 20, 2022
b200da0
typo
anchovYu Dec 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make changes to accommodate the recent refactor
  • Loading branch information
anchovYu committed Dec 2, 2022
commit 5785943fbb53b525b4434b7566d4f466461ceb61
Original file line number Diff line number Diff line change
Expand Up @@ -1844,7 +1844,7 @@ class Analyzer(override val catalogManager: CatalogManager)
// Only Project and Aggregate can host star expressions.
case u @ (_: Project | _: Aggregate) =>
Try(s.expand(u.children.head, resolver)) match {
case Success(expanded) => expanded.map(wrapOuterReference)
case Success(expanded) => expanded.map(wrapOuterReference(_))
case Failure(_) => throw e
}
// Do not use the outer plan to resolve the star expression
Expand Down Expand Up @@ -2165,7 +2165,7 @@ class Analyzer(override val catalogManager: CatalogManager)
case u @ UnresolvedAttribute(nameParts) => withPosition(u) {
try {
AnalysisContext.get.outerPlan.get.resolveChildren(nameParts, resolver) match {
case Some(resolved) => wrapOuterReference(resolved)
case Some(resolved) => wrapOuterReference(resolved, Some(nameParts))
case None => u
}
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, NamedExpression, OuterReference}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
import org.apache.spark.sql.catalyst.trees.TreePattern.{OUTER_REFERENCE, UNRESOLVED_ATTRIBUTE}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
Expand All @@ -31,6 +31,14 @@ import org.apache.spark.sql.internal.SQLConf
* resolved by other rules
* - in Aggregate TODO.
*
* The name resolution priority:
* local table column > local lateral column alias > outer reference
*
* Because a lateral column alias has higher resolution priority than an outer reference, the
* rule also tries to resolve an [[OuterReference]] using lateral column aliases, in the same
* way as it does for an [[UnresolvedAttribute]]. If that succeeds, it strips the
* [[OuterReference]] and restores it back to an [[UnresolvedAttribute]].
*
* For Project, it rewrites by inserting a newly created Project plan between the original Project
* and its child, pushing the referenced lateral column aliases to this new Project, and updating
* the project list of the original Project.
Expand All @@ -51,39 +59,51 @@ object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
def resolver: Resolver = conf.resolver

private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperatorsUpWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE), ruleId) {
plan.resolveOperatorsUpWithPruning(
_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE), ruleId) {
case p @ Project(projectList, child) if p.childrenResolved
&& !Analyzer.containsStar(projectList)
&& projectList.exists(_.containsPattern(UNRESOLVED_ATTRIBUTE)) =>
&& projectList.exists(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) =>

var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
def insertIntoAliasMap(a: Alias, idx: Int): Unit = {
val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
aliasMap += (a.name -> (prevAliases :+ AliasEntry(a, idx)))
}
def lookUpLCA(e: Expression): Seq[AliasEntry] = {
var matchedLCA: Seq[AliasEntry] = Seq.empty[AliasEntry]
e.transformWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE)) {

val referencedAliases = collection.mutable.Set.empty[AliasEntry]
def resolveLCA(e: NamedExpression): NamedExpression = {
e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) {
case o: OuterReference
if aliasMap.contains(o.nameParts.map(_.head).getOrElse(o.name)) =>
val name = o.nameParts.map(_.head).getOrElse(o.name)
val aliases = aliasMap.get(name).get
aliases.size match {
case n if n > 1 =>
throw QueryCompilationErrors.ambiguousLateralColumnAlias(o.name, n)
case n if n == 1 && aliases.head.alias.resolved =>
// Only resolved alias can be the lateral column alias
referencedAliases += aliases.head
o.nameParts.map(UnresolvedAttribute(_)).getOrElse(UnresolvedAttribute(o.name))
case _ =>
o
}
case u: UnresolvedAttribute if aliasMap.contains(u.nameParts.head) &&
Analyzer.resolveExpressionByPlanChildren(u, p, resolver)
.isInstanceOf[UnresolvedAttribute] =>
val aliases = aliasMap.get(u.nameParts.head).get
aliases.size match {
case n if n > 1 =>
throw QueryCompilationErrors.ambiguousLateralColumnAlias(u.name, n)
case _ =>
val referencedAlias = aliases.head
case n if n == 1 && aliases.head.alias.resolved =>
// Only resolved alias can be the lateral column alias
if (referencedAlias.alias.resolved) {
matchedLCA :+= referencedAlias
}
referencedAliases += aliases.head
case _ =>
}
u
}
matchedLCA
}.asInstanceOf[NamedExpression]
}

val referencedAliases = projectList.zipWithIndex.flatMap {
val newProjectList = projectList.zipWithIndex.map {
case (a: Alias, idx) =>
// Add every alias to the aliasMap. Note that only a resolved alias can be an LCA and be
// pushed down; an unresolved alias is still added so the ambiguous name check sees it.
Expand All @@ -92,17 +112,17 @@ object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
// only 1 AS a is pushed down, even though 1 AS a, 'a + 1 AS b and 'b + 1 AS c are
// all added to the aliasMap. On the second round, when 'a + 1 AS b is resolved,
// it is pushed down.
val matchedLCA = lookUpLCA(a)
insertIntoAliasMap(a, idx)
matchedLCA
val lcaResolved = resolveLCA(a).asInstanceOf[Alias]
insertIntoAliasMap(lcaResolved, idx)
lcaResolved
case (e, _) =>
lookUpLCA(e)
}.toSet
resolveLCA(e)
}

if (referencedAliases.isEmpty) {
p
} else {
val outerProjectList = collection.mutable.Seq(projectList: _*)
val outerProjectList = collection.mutable.Seq(newProjectList: _*)
val innerProjectList =
collection.mutable.ArrayBuffer(child.output.map(_.asInstanceOf[NamedExpression]): _*)
referencedAliases.foreach { case AliasEntry(alias: Alias, idx) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,18 @@ case class OuterReference(e: NamedExpression)
override def qualifier: Seq[String] = e.qualifier
override def exprId: ExprId = e.exprId
override def toAttribute: Attribute = e.toAttribute
override def newInstance(): NamedExpression = OuterReference(e.newInstance())
override def newInstance(): NamedExpression =
OuterReference(e.newInstance()).setNameParts(nameParts)
final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)

// Optional original name parts of the UnresolvedAttribute from which this OuterReference was
// resolved; None when they were not recorded. Rule ResolveLateralColumnAlias reads this field
// to restore the OuterReference back to an UnresolvedAttribute.
// NOTE(review): this is a mutable member rather than a constructor parameter, so an instance
// built through the constructor starts at None; newInstance() re-applies it explicitly.
var nameParts: Option[Seq[String]] = None
// Stores `newNameParts` on this instance and returns the same instance so calls can be chained.
def setNameParts(newNameParts: Option[Seq[String]]): OuterReference = {
nameParts = newNameParts
this
}
}

object VirtualColumn {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ object SubExprUtils extends PredicateHelper {
/**
* Wrap attributes in the expression with [[OuterReference]]s.
*/
def wrapOuterReference[E <: Expression](e: E): E = {
e.transform { case a: Attribute => OuterReference(a) }.asInstanceOf[E]
def wrapOuterReference[E <: Expression](e: E, nameParts: Option[Seq[String]] = None): E = {
e.transform { case a: Attribute => OuterReference(a).setNameParts(nameParts) }.asInstanceOf[E]
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql
import org.scalactic.source.Position
import org.scalatest.Tag

import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

Expand Down Expand Up @@ -59,6 +60,17 @@ class LateralColumnAliasSuite extends QueryTest with SharedSparkSession {
}
}

/** Evaluates `f` while the implicit lateral column alias conf is set to "false". */
private def withLCAOff(f: => Unit): Unit =
  withSQLConf(SQLConf.LATERAL_COLUMN_ALIAS_IMPLICIT_ENABLED.key -> "false")(f)
/** Evaluates `f` while the implicit lateral column alias conf is set to "true". */
private def withLCAOn(f: => Unit): Unit =
  withSQLConf(SQLConf.LATERAL_COLUMN_ALIAS_IMPLICIT_ENABLED.key -> "true")(f)

test("Lateral alias in project") {
checkAnswer(sql(s"select dept as d, d + 1 as e from $testTable where name = 'amy'"),
Row(1, 2))
Expand Down Expand Up @@ -106,4 +118,123 @@ class LateralColumnAliasSuite extends QueryTest with SharedSparkSession {
Row(18000, 18000, 10000)
)
}

test("Duplicated lateral alias names - Project") {
  // Expects analysis of `sqlText` to fail with AMBIGUOUS_LATERAL_COLUMN_ALIAS, carrying
  // `expected` as the error parameters.
  def assertAmbiguousAlias(sqlText: String, expected: Map[String, String]): Unit = {
    val thrown = intercept[AnalysisException] {sql(sqlText)}
    checkError(
      exception = thrown,
      errorClass = "AMBIGUOUS_LATERAL_COLUMN_ALIAS",
      sqlState = "42000",
      parameters = expected
    )
  }

  // Duplicated alias names that are not referenced are fine.
  val acceptedQueries = Seq(
    (s"SELECT salary AS d, bonus AS d FROM $testTable WHERE name = 'jen'",
      Row(12000, 1200)),
    (s"SELECT salary AS d, d, 10000 AS d FROM $testTable WHERE name = 'jen'",
      Row(12000, 12000, 10000)),
    (s"SELECT salary * 1.5 AS d, d, 10000 AS d FROM $testTable WHERE name = 'jen'",
      Row(18000, 18000, 10000)),
    (s"SELECT salary + 1000 AS new_salary, new_salary * 1.0 AS new_salary " +
      s"FROM $testTable WHERE name = 'jen'",
      Row(13000, 13000.0))
  )
  acceptedQueries.foreach { case (query, expectedRow) =>
    checkAnswer(sql(query), expectedRow)
  }

  // Referencing a duplicated alias name raises an error.
  val twoCandidatesForD = Map("name" -> "`d`", "n" -> "2")
  val rejectedQueries = Seq(
    s"SELECT salary * 1.5 AS d, d, 10000 AS d, d + 1 FROM $testTable",
    s"SELECT 10000 AS d, d * 1.0, salary * 1.5 AS d, d FROM $testTable",
    s"SELECT salary AS d, d + 1 AS d, d + 1 AS d FROM $testTable",
    s"SELECT salary * 1.5 AS d, d, bonus * 1.5 AS d, d + d FROM $testTable"
  )
  rejectedQueries.foreach { query =>
    assertAmbiguousAlias(query, twoCandidatesForD)
  }

  // Aliases that reuse the name of the table column `salary`.
  val salaryQuery =
    s"""
       |SELECT salary * 1.5 AS salary, salary, 10000 AS salary, salary
       |FROM $testTable
       |WHERE name = 'jen'
       |""".stripMargin
  checkAnswer(sql(salaryQuery), Row(18000, 12000, 10000, 12000))
}

// Checks how a name inside a scalar subquery is resolved when it could be a lateral column
// alias, an outer reference, or both, with the implicit LCA conf switched off and on.
test("Lateral alias conflicts with OuterReference - Project") {
// `id` in `id + 1` matches both the lateral alias `1 AS id` and the outer `id` column of
// range(1, 7). The expectations differ by conf: rows 5 and 6 with LCA off, no rows with LCA on.
val query1 =
s"""
|SELECT *
|FROM range(1, 7)
|WHERE (
| SELECT id2
| FROM (SELECT 1 AS id, id + 1 AS id2)) > 5
|ORDER BY id
|""".stripMargin
withLCAOff { checkAnswer(sql(query1), Row(5) :: Row(6) :: Nil) }
withLCAOn { checkAnswer(sql(query1), Seq.empty) }

// `id1` can only be resolved as a lateral alias: with LCA off the query is expected to fail
// with UNRESOLVED_COLUMN.WITHOUT_SUGGESTION, with LCA on it is expected to return no rows.
val query2 =
s"""
|SELECT *
|FROM range(1, 7)
|WHERE (
| SELECT id2
| FROM (SELECT 1 AS id1, id1 + 1 AS id2)) > 5
|""".stripMargin
withLCAOff {
assert(intercept[AnalysisException] { sql(query2) }
.getErrorClass == "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION")
}
withLCAOn { checkAnswer(sql(query2), Seq.empty) }

// The qualified name `outer_table.id` should only be resolved as an outer reference, so the
// expected answer (rows 5 and 6) is the same whether LCA is off or on.
val query3 =
s"""
|SELECT *
|FROM range(1, 7) outer_table
|WHERE (
| SELECT id2
| FROM (SELECT 1 AS id, outer_table.id + 1 AS id2)) > 5
|""".stripMargin
withLCAOff { checkAnswer(sql(query3), Row(5) :: Row(6) :: Nil) }
withLCAOn { checkAnswer(sql(query3), Row(5) :: Row(6) :: Nil) }

// A slightly more complex subquery in which `id` of `id + 1` is first wrapped in an
// OuterReference. Tests that the LCA rule strips the OuterReference and resolves the name to
// the lateral alias instead.
val query4 =
s"""
|SELECT *
|FROM range(1, 7)
|WHERE (
| SELECT id2
| FROM (SELECT dept * 2.0 AS id, id + 1 AS id2 FROM $testTable)) > 5
|ORDER BY id
|""".stripMargin
withLCAOff { intercept[AnalysisException] { sql(query4) } } // NOTE(review): analysis unexpectedly fails with LCA off
withLCAOn {
val analyzedPlan = sql(query4).queryExecution.analyzed
// With LCA on, no OuterReference may remain anywhere in the analyzed plan.
assert(!analyzedPlan.containsPattern(OUTER_REFERENCE))
// TODO: executing the query (as opposed to only analyzing it) currently throws an
// exception, so the answer check below stays disabled.
// checkAnswer(sql(query4), Range(1, 7).map(Row(_)))
}
}
// TODO: LCA in subquery
}