Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address comments
  • Loading branch information
anchovYu committed Dec 9, 2022
commit 94d5c9ee7c095b40ea5fe676fa50bc7acc5fe885
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)])

override def contains(k: Attribute): Boolean = get(k).isDefined

override def + [B1 >: A](kv: (Attribute, B1)): Map[Attribute, B1] = baseMap.values.toMap + kv
override def + [B1 >: A](kv: (Attribute, B1)): AttributeMap[B1] =
AttributeMap(baseMap.values.toMap + kv)

override def iterator: Iterator[(Attribute, A)] = baseMap.valuesIterator

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)])

override def contains(k: Attribute): Boolean = get(k).isDefined

override def + [B1 >: A](kv: (Attribute, B1)): AttributeMap[B1] =
AttributeMap(baseMap.values.toMap + kv)

override def updated[B1 >: A](key: Attribute, value: B1): Map[Attribute, B1] =
baseMap.values.toMap + (key -> value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1787,11 +1787,9 @@ class Analyzer(override val catalogManager: CatalogManager)
*/
private def resolveByLateralAlias(
nameParts: Seq[String], lateralAlias: Alias): Option[LateralColumnAliasReference] = {
// TODO question: everytime it resolves the extract field it generates a new exprId.
// Does it matter?
val resolvedAttr = resolveExpressionByPlanOutput(
expr = UnresolvedAttribute(nameParts),
plan = Project(Seq(lateralAlias), OneRowRelation()),
plan = LocalRelation(Seq(lateralAlias.toAttribute)),
throws = false
).asInstanceOf[NamedExpression]
if (resolvedAttr.resolved) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, LateralColumnAliasReference, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, LateralColumnAliasReference, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE}
import org.apache.spark.sql.catalyst.trees.TreePattern.LATERAL_COLUMN_ALIAS_REFERENCE
import org.apache.spark.sql.internal.SQLConf

/**
Expand Down Expand Up @@ -76,12 +76,12 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), ruleId) {
case p @ Project(projectList, child) if p.resolved
&& projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
var aliasMap = Map[Attribute, AliasEntry]()
var aliasMap = AttributeMap.empty[AliasEntry]
val referencedAliases = collection.mutable.Set.empty[AliasEntry]
def unwrapLCAReference(e: NamedExpression): NamedExpression = {
e.transformWithPruning(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
case lcaRef: LateralColumnAliasReference if aliasMap.contains(lcaRef.a) =>
val aliasEntry = aliasMap(lcaRef.a)
val aliasEntry = aliasMap.get(lcaRef.a).get
// If there is no chaining of lateral column alias reference, push down the alias
// and unwrap the LateralColumnAliasReference to the NamedExpression inside
// If there is chaining, don't resolve and save to future rounds
Expand Down