From f09003e76d0cc9d4b4785845ba446f3f86ef90cc Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Mon, 27 May 2024 08:48:15 +0200 Subject: [PATCH 1/2] SPARK-48428: Fix IllegalStateException in NestedColumnAliasing #35170 (SPARK-37855) and #32301 (SPARK-35194) introduced conditions for ExtractValues that currently cannot be handled. These conditions are applied after `collectRootReferenceAndExtractValue` has run and simply remove these candidates. This is problematic since the removed expressions might have contained an `AttributeReference` that must still be taken into account to avoid an incorrect rewrite. This fixes this family of bugs by moving the conditions into the function `collectRootReferenceAndExtractValue`. --- .../optimizer/NestedColumnAliasing.scala | 16 +++++++------- .../optimizer/NestedColumnAliasingSuite.scala | 21 +++++++++++++++++++ 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index ca3c14177e6b..3b1b353bd3bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -218,6 +218,11 @@ object NestedColumnAliasing { case _ => false } + private def canTransform(ev: Expression): Boolean = { + // we can not alias the attr from lambda variable whose expr id is not available + !ev.exists(_.isInstanceOf[NamedLambdaVariable]) && ev.references.size == 1 + } + /** * Returns two types of expressions: * - Root references that are individually accessed @@ -226,11 +231,11 @@ object NestedColumnAliasing { */ private def collectRootReferenceAndExtractValue(e: Expression): Seq[Expression] = e match { case _: AttributeReference => Seq(e) - case GetStructField(_: ExtractValue | _: AttributeReference, _, _) => Seq(e) + case GetStructField(_: 
ExtractValue | _: AttributeReference, _, _) if canTransform(e) => Seq(e) case GetArrayStructFields(_: MapValues | _: MapKeys | _: ExtractValue | - _: AttributeReference, _, _, _, _) => Seq(e) + _: AttributeReference, _, _, _, _) if canTransform(e) => Seq(e) case es if es.children.nonEmpty => es.children.flatMap(collectRootReferenceAndExtractValue) case _ => Seq.empty } @@ -249,13 +254,8 @@ object NestedColumnAliasing { val otherRootReferences = new mutable.ArrayBuffer[AttributeReference]() exprList.foreach { e => extractor(e).foreach { - // we can not alias the attr from lambda variable whose expr id is not available - case ev: ExtractValue if !ev.exists(_.isInstanceOf[NamedLambdaVariable]) => - if (ev.references.size == 1) { - nestedFieldReferences.append(ev) - } + case ev: ExtractValue => nestedFieldReferences.append(ev) case ar: AttributeReference => otherRootReferences.append(ar) - case _ => // ignore } } val exclusiveAttrSet = AttributeSet(exclusiveAttrs ++ otherRootReferences) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala index bd0cc6216f7a..38cd25cf491a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala @@ -863,6 +863,27 @@ class NestedColumnAliasingSuite extends SchemaPruningTest { // The plan is expected to be unchanged. 
comparePlans(plan, RemoveNoopOperators.apply(optimized.get)) } + + test("SPARK-48428: Do not pushdown when attr is used in expression with mutliple references") { + val query = contact + .limit(5) + .select( + GetStructField(GetStructField(CreateStruct(Seq($"id", $"employer")), 1), 0), + $"employer.id") + .analyze + + val optimized = Optimize.execute(query) + + val expected = contact + .select($"id", $"employer") + .limit(5) + .select( + GetStructField(GetStructField(CreateStruct(Seq($"id", $"employer")), 1), 0), + $"employer.id") + .analyze + + comparePlans(optimized, expected) + } } object NestedColumnAliasingSuite { From 57b2701abeee5d0004f51e98fd915868988e8558 Mon Sep 17 00:00:00 2001 From: Emil Ejbyfeldt Date: Mon, 27 May 2024 10:40:19 +0200 Subject: [PATCH 2/2] Better name --- .../spark/sql/catalyst/optimizer/NestedColumnAliasing.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 3b1b353bd3bf..8de2663a9809 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -218,7 +218,7 @@ object NestedColumnAliasing { case _ => false } - private def canTransform(ev: Expression): Boolean = { + private def canAlias(ev: Expression): Boolean = { // we can not alias the attr from lambda variable whose expr id is not available !ev.exists(_.isInstanceOf[NamedLambdaVariable]) && ev.references.size == 1 } @@ -231,11 +231,11 @@ object NestedColumnAliasing { */ private def collectRootReferenceAndExtractValue(e: Expression): Seq[Expression] = e match { case _: AttributeReference => Seq(e) - case GetStructField(_: ExtractValue | _: AttributeReference, _, _) if canTransform(e) => Seq(e) + case 
GetStructField(_: ExtractValue | _: AttributeReference, _, _) if canAlias(e) => Seq(e) case GetArrayStructFields(_: MapValues | _: MapKeys | _: ExtractValue | - _: AttributeReference, _, _, _, _) if canTransform(e) => Seq(e) + _: AttributeReference, _, _, _, _) if canAlias(e) => Seq(e) case es if es.children.nonEmpty => es.children.flatMap(collectRootReferenceAndExtractValue) case _ => Seq.empty }