Skip to content

Commit 82285b5

Browse files
cloud-fanJackey Lee
authored andcommitted
[SPARK-26459][SQL] replace UpdateNullabilityInAttributeReferences with FixNullability
## What changes were proposed in this pull request? This is a followup of apache#18576 The newly added rule `UpdateNullabilityInAttributeReferences` does the same thing the `FixNullability` does, we only need to keep one of them. This PR removes `UpdateNullabilityInAttributeReferences`, and use `FixNullability` to replace it. Also rename it to `UpdateAttributeNullability` ## How was this patch tested? existing tests Closes apache#23390 from cloud-fan/nullable. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Takeshi Yamamuro <[email protected]>
1 parent 6129fb5 commit 82285b5

File tree

4 files changed

+65
-57
lines changed

4 files changed

+65
-57
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ class Analyzer(
197197
PullOutNondeterministic),
198198
Batch("UDF", Once,
199199
HandleNullInputsForUDF),
200-
Batch("FixNullability", Once,
201-
FixNullability),
200+
Batch("UpdateNullability", Once,
201+
UpdateAttributeNullability),
202202
Batch("Subquery", Once,
203203
UpdateOuterReferences),
204204
Batch("Cleanup", fixedPoint,
@@ -1821,40 +1821,6 @@ class Analyzer(
18211821
}
18221822
}
18231823

1824-
/**
1825-
* Fixes nullability of Attributes in a resolved LogicalPlan by using the nullability of
1826-
* corresponding Attributes of its children output Attributes. This step is needed because
1827-
* users can use a resolved AttributeReference in the Dataset API and outer joins
1828-
* can change the nullability of an AttribtueReference. Without the fix, a nullable column's
1829-
* nullable field can be actually set as non-nullable, which cause illegal optimization
1830-
* (e.g., NULL propagation) and wrong answers.
1831-
* See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
1832-
*/
1833-
object FixNullability extends Rule[LogicalPlan] {
1834-
1835-
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
1836-
case p if !p.resolved => p // Skip unresolved nodes.
1837-
case p: LogicalPlan if p.resolved =>
1838-
val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
1839-
case (exprId, attributes) =>
1840-
// If there are multiple Attributes having the same ExprId, we need to resolve
1841-
// the conflict of nullable field. We do not really expect this happen.
1842-
val nullable = attributes.exists(_.nullable)
1843-
attributes.map(attr => attr.withNullability(nullable))
1844-
}.toSeq
1845-
// At here, we create an AttributeMap that only compare the exprId for the lookup
1846-
// operation. So, we can find the corresponding input attribute's nullability.
1847-
val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr))
1848-
// For an Attribute used by the current LogicalPlan, if it is from its children,
1849-
// we fix the nullable field by using the nullability setting of the corresponding
1850-
// output Attribute from the children.
1851-
p.transformExpressions {
1852-
case attr: Attribute if attributeMap.contains(attr) =>
1853-
attr.withNullability(attributeMap(attr).nullable)
1854-
}
1855-
}
1856-
}
1857-
18581824
/**
18591825
* Extracts [[WindowExpression]]s from the projectList of a [[Project]] operator and
18601826
* aggregateExpressions of an [[Aggregate]] operator and creates individual [[Window]]
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import org.apache.spark.sql.catalyst.expressions.Attribute
21+
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
22+
import org.apache.spark.sql.catalyst.rules.Rule
23+
24+
/**
25+
* Updates nullability of Attributes in a resolved LogicalPlan by using the nullability of
26+
* corresponding Attributes of its children output Attributes. This step is needed because
27+
* users can use a resolved AttributeReference in the Dataset API and outer joins
28+
* can change the nullability of an AttribtueReference. Without this rule, a nullable column's
29+
* nullable field can be actually set as non-nullable, which cause illegal optimization
30+
* (e.g., NULL propagation) and wrong answers.
31+
* See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
32+
*
33+
* This rule should be executed again at the end of optimization phase, as optimizer may change
34+
* some expressions and their nullabilities as well. See SPARK-21351 for more details.
35+
*/
36+
object UpdateAttributeNullability extends Rule[LogicalPlan] {
37+
38+
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
39+
// Skip unresolved nodes.
40+
case p if !p.resolved => p
41+
// Skip leaf node, as it has no child and no need to update nullability.
42+
case p: LeafNode => p
43+
case p: LogicalPlan =>
44+
val nullabilities = p.children.flatMap(c => c.output).groupBy(_.exprId).map {
45+
// If there are multiple Attributes having the same ExprId, we need to resolve
46+
// the conflict of nullable field. We do not really expect this to happen.
47+
case (exprId, attributes) => exprId -> attributes.exists(_.nullable)
48+
}
49+
// For an Attribute used by the current LogicalPlan, if it is from its children,
50+
// we fix the nullable field by using the nullability setting of the corresponding
51+
// output Attribute from the children.
52+
p.transformExpressions {
53+
case attr: Attribute if nullabilities.contains(attr.exprId) =>
54+
attr.withNullability(nullabilities(attr.exprId))
55+
}
56+
}
57+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
179179
ColumnPruning,
180180
CollapseProject,
181181
RemoveNoopOperators) :+
182-
Batch("UpdateAttributeReferences", Once,
183-
UpdateNullabilityInAttributeReferences) :+
182+
Batch("UpdateNullability", Once, UpdateAttributeNullability) :+
184183
// This batch must be executed after the `RewriteSubquery` batch, which creates joins.
185184
Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)
186185
}
@@ -1647,18 +1646,3 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
16471646
}
16481647
}
16491648
}
1650-
1651-
/**
1652-
* Updates nullability in [[AttributeReference]]s if nullability is different between
1653-
* non-leaf plan's expressions and the children output.
1654-
*/
1655-
object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] {
1656-
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
1657-
case p if !p.isInstanceOf[LeafNode] =>
1658-
val nullabilityMap = AttributeMap(p.children.flatMap(_.output).map { x => x -> x.nullable })
1659-
p transformExpressions {
1660-
case ar: AttributeReference if nullabilityMap.contains(ar) =>
1661-
ar.withNullability(nullabilityMap(ar))
1662-
}
1663-
}
1664-
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala renamed to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.catalyst.optimizer
1919

20+
import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
2021
import org.apache.spark.sql.catalyst.dsl.expressions._
2122
import org.apache.spark.sql.catalyst.dsl.plans._
2223
import org.apache.spark.sql.catalyst.expressions.{CreateArray, GetArrayItem}
@@ -25,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
2526
import org.apache.spark.sql.catalyst.rules.RuleExecutor
2627

2728

28-
class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
29+
class UpdateAttributeNullabilityInOptimizerSuite extends PlanTest {
2930

3031
object Optimizer extends RuleExecutor[LogicalPlan] {
3132
val batches =
@@ -36,8 +37,8 @@ class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
3637
SimplifyConditionals,
3738
SimplifyBinaryComparison,
3839
SimplifyExtractValueOps) ::
39-
Batch("UpdateAttributeReferences", Once,
40-
UpdateNullabilityInAttributeReferences) :: Nil
40+
Batch("UpdateNullability", Once,
41+
UpdateAttributeNullability) :: Nil
4142
}
4243

4344
test("update nullability in AttributeReference") {
@@ -46,7 +47,7 @@ class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
4647
// nullable AttributeReference to `b`, because both array indexing and map lookup are
4748
// nullable expressions. After optimization, the same attribute is now non-nullable,
4849
// but the AttributeReference is not updated to reflect this. So, we need to update nullability
49-
// by the `UpdateNullabilityInAttributeReferences` rule.
50+
// by the `UpdateAttributeNullability` rule.
5051
val original = rel
5152
.select(GetArrayItem(CreateArray(Seq('a, 'a + 1L)), 0) as "b")
5253
.groupBy($"b")("1")

0 commit comments

Comments
 (0)