diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index ca3c14177e6b..8de2663a9809 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -218,6 +218,11 @@ object NestedColumnAliasing { case _ => false } + private def canAlias(ev: Expression): Boolean = { + // we can not alias the attr from lambda variable whose expr id is not available + !ev.exists(_.isInstanceOf[NamedLambdaVariable]) && ev.references.size == 1 + } + /** * Returns two types of expressions: * - Root references that are individually accessed @@ -226,11 +231,11 @@ object NestedColumnAliasing { */ private def collectRootReferenceAndExtractValue(e: Expression): Seq[Expression] = e match { case _: AttributeReference => Seq(e) - case GetStructField(_: ExtractValue | _: AttributeReference, _, _) => Seq(e) + case GetStructField(_: ExtractValue | _: AttributeReference, _, _) if canAlias(e) => Seq(e) case GetArrayStructFields(_: MapValues | _: MapKeys | _: ExtractValue | - _: AttributeReference, _, _, _, _) => Seq(e) + _: AttributeReference, _, _, _, _) if canAlias(e) => Seq(e) case es if es.children.nonEmpty => es.children.flatMap(collectRootReferenceAndExtractValue) case _ => Seq.empty } @@ -249,13 +254,8 @@ object NestedColumnAliasing { val otherRootReferences = new mutable.ArrayBuffer[AttributeReference]() exprList.foreach { e => extractor(e).foreach { - // we can not alias the attr from lambda variable whose expr id is not available - case ev: ExtractValue if !ev.exists(_.isInstanceOf[NamedLambdaVariable]) => - if (ev.references.size == 1) { - nestedFieldReferences.append(ev) - } + case ev: ExtractValue => nestedFieldReferences.append(ev) case ar: AttributeReference => otherRootReferences.append(ar) - case _ => // ignore } } val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala index bd0cc6216f7a..38cd25cf491a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala @@ -863,6 +863,27 @@ class NestedColumnAliasingSuite extends SchemaPruningTest { // The plan is expected to be unchanged. comparePlans(plan, RemoveNoopOperators.apply(optimized.get)) } + + test("SPARK-48428: Do not pushdown when attr is used in expression with mutliple references") { + val query = contact + .limit(5) + .select( + GetStructField(GetStructField(CreateStruct(Seq($"id", $"employer")), 1), 0), + $"employer.id") + .analyze + + val optimized = Optimize.execute(query) + + val expected = contact + .select($"id", $"employer") + .limit(5) + .select( + GetStructField(GetStructField(CreateStruct(Seq($"id", $"employer")), 1), 0), + $"employer.id") + .analyze + + comparePlans(optimized, expected) + } } object NestedColumnAliasingSuite {