From feb57c00508560a08100a9f78c1245cfa80474de Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 27 Dec 2018 22:23:05 +0800 Subject: [PATCH 1/4] reuse the FixNullability rule --- .../sql/catalyst/analysis/Analyzer.scala | 78 ++++++++++--------- .../sql/catalyst/optimizer/Optimizer.scala | 18 +---- ...> UpdateNullabilityInOptimizerSuite.scala} | 7 +- 3 files changed, 47 insertions(+), 56 deletions(-) rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/{UpdateNullabilityInAttributeReferencesSuite.scala => UpdateNullabilityInOptimizerSuite.scala} (92%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2aa0f2117364..d5054dc610e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -197,8 +197,8 @@ class Analyzer( PullOutNondeterministic), Batch("UDF", Once, HandleNullInputsForUDF), - Batch("FixNullability", Once, - FixNullability), + Batch("UpdateNullability", Once, + UpdateNullability), Batch("Subquery", Once, UpdateOuterReferences), Batch("Cleanup", fixedPoint, @@ -1821,40 +1821,6 @@ class Analyzer( } } - /** - * Fixes nullability of Attributes in a resolved LogicalPlan by using the nullability of - * corresponding Attributes of its children output Attributes. This step is needed because - * users can use a resolved AttributeReference in the Dataset API and outer joins - * can change the nullability of an AttribtueReference. Without the fix, a nullable column's - * nullable field can be actually set as non-nullable, which cause illegal optimization - * (e.g., NULL propagation) and wrong answers. - * See SPARK-13484 and SPARK-13801 for the concrete queries of this case. 
- */ - object FixNullability extends Rule[LogicalPlan] { - - def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { - case p if !p.resolved => p // Skip unresolved nodes. - case p: LogicalPlan if p.resolved => - val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap { - case (exprId, attributes) => - // If there are multiple Attributes having the same ExprId, we need to resolve - // the conflict of nullable field. We do not really expect this happen. - val nullable = attributes.exists(_.nullable) - attributes.map(attr => attr.withNullability(nullable)) - }.toSeq - // At here, we create an AttributeMap that only compare the exprId for the lookup - // operation. So, we can find the corresponding input attribute's nullability. - val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr)) - // For an Attribute used by the current LogicalPlan, if it is from its children, - // we fix the nullable field by using the nullability setting of the corresponding - // output Attribute from the children. - p.transformExpressions { - case attr: Attribute if attributeMap.contains(attr) => - attr.withNullability(attributeMap(attr).nullable) - } - } - } - /** * Extracts [[WindowExpression]]s from the projectList of a [[Project]] operator and * aggregateExpressions of an [[Aggregate]] operator and creates individual [[Window]] @@ -2848,3 +2814,43 @@ object UpdateOuterReferences extends Rule[LogicalPlan] { } } } + +/** + * Updates nullability of Attributes in a resolved LogicalPlan by using the nullability of + * corresponding Attributes of its children output Attributes. This step is needed because + * users can use a resolved AttributeReference in the Dataset API and outer joins + * can change the nullability of an AttribtueReference. Without this rule, a nullable column's + * nullable field can be actually set as non-nullable, which cause illegal optimization + * (e.g., NULL propagation) and wrong answers. 
+ * See SPARK-13484 and SPARK-13801 for the concrete queries of this case. + * + * This rule should be executed again at the end of optimization phase, as optimizer may change + * some expressions and their nullabilities as well. See SPARK-21351 for more details. + */ +object UpdateNullability extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { + // Skip unresolved nodes. + case p if !p.resolved => p + // Skip leaf node, as it has no child and no need to update nullability. + case p: LeafNode => p + case p: LogicalPlan => + val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap { + case (exprId, attributes) => + // If there are multiple Attributes having the same ExprId, we need to resolve + // the conflict of nullable field. We do not really expect this happen. + val nullable = attributes.exists(_.nullable) + attributes.map(attr => attr.withNullability(nullable)) + }.toSeq + // At here, we create an AttributeMap that only compare the exprId for the lookup + // operation. So, we can find the corresponding input attribute's nullability. + val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr)) + // For an Attribute used by the current LogicalPlan, if it is from its children, + // we fix the nullable field by using the nullability setting of the corresponding + // output Attribute from the children. 
+ p.transformExpressions { + case attr: Attribute if attributeMap.contains(attr) => + attr.withNullability(attributeMap(attr).nullable) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d51dc6663d43..65710f8baf03 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -179,8 +179,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) ColumnPruning, CollapseProject, RemoveNoopOperators) :+ - Batch("UpdateAttributeReferences", Once, - UpdateNullabilityInAttributeReferences) :+ + Batch("UpdateNullability", Once, UpdateNullability) :+ // This batch must be executed after the `RewriteSubquery` batch, which creates joins. Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers) } @@ -1647,18 +1646,3 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] { } } } - -/** - * Updates nullability in [[AttributeReference]]s if nullability is different between - * non-leaf plan's expressions and the children output. 
- */ -object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case p if !p.isInstanceOf[LeafNode] => - val nullabilityMap = AttributeMap(p.children.flatMap(_.output).map { x => x -> x.nullable }) - p transformExpressions { - case ar: AttributeReference if nullabilityMap.contains(ar) => - ar.withNullability(nullabilityMap(ar)) - } - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInOptimizerSuite.scala similarity index 92% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInOptimizerSuite.scala index 09b11f5aba2a..951f295a4c37 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInAttributeReferencesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInOptimizerSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer +import org.apache.spark.sql.catalyst.analysis.UpdateNullability import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.{CreateArray, GetArrayItem} @@ -25,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor -class UpdateNullabilityInAttributeReferencesSuite extends PlanTest { +class UpdateNullabilityInOptimizerSuite extends PlanTest { object Optimizer extends RuleExecutor[LogicalPlan] { val batches = @@ -36,8 +37,8 @@ class UpdateNullabilityInAttributeReferencesSuite extends PlanTest { SimplifyConditionals, 
SimplifyBinaryComparison, SimplifyExtractValueOps) :: - Batch("UpdateAttributeReferences", Once, - UpdateNullabilityInAttributeReferences) :: Nil + Batch("UpdateNullability", Once, + UpdateNullability) :: Nil } test("update nullability in AttributeReference") { From e328ba368243d61c056265a962a471a3b006a8ca Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 28 Dec 2018 12:22:34 +0800 Subject: [PATCH 2/4] address comments --- .../sql/catalyst/analysis/Analyzer.scala | 42 +------------ .../analysis/UpdateAttributeNullability.scala | 62 +++++++++++++++++++ .../sql/catalyst/optimizer/Optimizer.scala | 2 +- ...ttributeNullabilityInOptimizerSuite.scala} | 6 +- 4 files changed, 67 insertions(+), 45 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/{UpdateNullabilityInOptimizerSuite.scala => UpdateAttributeNullabilityInOptimizerSuite.scala} (92%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d5054dc610e3..a84bb7653c52 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -198,7 +198,7 @@ class Analyzer( Batch("UDF", Once, HandleNullInputsForUDF), Batch("UpdateNullability", Once, - UpdateNullability), + UpdateAttributeNullability), Batch("Subquery", Once, UpdateOuterReferences), Batch("Cleanup", fixedPoint, @@ -2814,43 +2814,3 @@ object UpdateOuterReferences extends Rule[LogicalPlan] { } } } - -/** - * Updates nullability of Attributes in a resolved LogicalPlan by using the nullability of - * corresponding Attributes of its children output Attributes. 
This step is needed because - * users can use a resolved AttributeReference in the Dataset API and outer joins - * can change the nullability of an AttribtueReference. Without this rule, a nullable column's - * nullable field can be actually set as non-nullable, which cause illegal optimization - * (e.g., NULL propagation) and wrong answers. - * See SPARK-13484 and SPARK-13801 for the concrete queries of this case. - * - * This rule should be executed again at the end of optimization phase, as optimizer may change - * some expressions and their nullabilities as well. See SPARK-21351 for more details. - */ -object UpdateNullability extends Rule[LogicalPlan] { - - def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { - // Skip unresolved nodes. - case p if !p.resolved => p - // Skip leaf node, as it has no child and no need to update nullability. - case p: LeafNode => p - case p: LogicalPlan => - val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap { - case (exprId, attributes) => - // If there are multiple Attributes having the same ExprId, we need to resolve - // the conflict of nullable field. We do not really expect this happen. - val nullable = attributes.exists(_.nullable) - attributes.map(attr => attr.withNullability(nullable)) - }.toSeq - // At here, we create an AttributeMap that only compare the exprId for the lookup - // operation. So, we can find the corresponding input attribute's nullability. - val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr)) - // For an Attribute used by the current LogicalPlan, if it is from its children, - // we fix the nullable field by using the nullability setting of the corresponding - // output Attribute from the children. 
- p.transformExpressions { - case attr: Attribute if attributeMap.contains(attr) => - attr.withNullability(attributeMap(attr).nullable) - } - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala new file mode 100644 index 000000000000..a53fd2be6520 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule + +/** + * Updates nullability of Attributes in a resolved LogicalPlan by using the nullability of + * corresponding Attributes of its children output Attributes. This step is needed because + * users can use a resolved AttributeReference in the Dataset API and outer joins + * can change the nullability of an AttributeReference. 
Without this rule, a nullable column's + * nullable field can be actually set as non-nullable, which causes illegal optimizations + * (e.g., NULL propagation) and wrong answers. + * See SPARK-13484 and SPARK-13801 for the concrete queries of this case. + * + * This rule should be executed again at the end of optimization phase, as optimizer may change + * some expressions and their nullabilities as well. See SPARK-21351 for more details. + */ +object UpdateAttributeNullability extends Rule[LogicalPlan] { + + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { + // Skip unresolved nodes. + case p if !p.resolved => p + // Skip leaf node, as it has no child and no need to update nullability. + case p: LeafNode => p + case p: LogicalPlan => + val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap { + case (exprId, attributes) => + // If there are multiple Attributes having the same ExprId, we need to resolve + // the conflict of nullable field. We do not really expect this happen. + val nullable = attributes.exists(_.nullable) + attributes.map(attr => attr.withNullability(nullable)) + }.toSeq + // At here, we create an AttributeMap that only compare the exprId for the lookup + // operation. So, we can find the corresponding input attribute's nullability. + val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr)) + // For an Attribute used by the current LogicalPlan, if it is from its children, + // we fix the nullable field by using the nullability setting of the corresponding + // output Attribute from the children. 
+ p.transformExpressions { + case attr: Attribute if attributeMap.contains(attr) => + attr.withNullability(attributeMap(attr).nullable) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 65710f8baf03..d92f7f860b1b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -179,7 +179,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) ColumnPruning, CollapseProject, RemoveNoopOperators) :+ - Batch("UpdateNullability", Once, UpdateNullability) :+ + Batch("UpdateNullability", Once, UpdateAttributeNullability) :+ // This batch must be executed after the `RewriteSubquery` batch, which creates joins. Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInOptimizerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala similarity index 92% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInOptimizerSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala index 951f295a4c37..07b83479dc11 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateNullabilityInOptimizerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.analysis.UpdateNullability +import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability import 
org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.{CreateArray, GetArrayItem} @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor -class UpdateNullabilityInOptimizerSuite extends PlanTest { +class UpdateAttributeNullabilityInOptimizerSuite extends PlanTest { object Optimizer extends RuleExecutor[LogicalPlan] { val batches = @@ -38,7 +38,7 @@ class UpdateNullabilityInOptimizerSuite extends PlanTest { SimplifyBinaryComparison, SimplifyExtractValueOps) :: Batch("UpdateNullability", Once, - UpdateNullability) :: Nil + UpdateAttributeNullability) :: Nil } test("update nullability in AttributeReference") { From 5930cd7bbdf653a040d4dd38557dff6adc70265e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 28 Dec 2018 12:23:54 +0800 Subject: [PATCH 3/4] fix comment --- .../optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala index 07b83479dc11..6d6f799b830f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UpdateAttributeNullabilityInOptimizerSuite.scala @@ -47,7 +47,7 @@ class UpdateAttributeNullabilityInOptimizerSuite extends PlanTest { // nullable AttributeReference to `b`, because both array indexing and map lookup are // nullable expressions. After optimization, the same attribute is now non-nullable, // but the AttributeReference is not updated to reflect this. 
So, we need to update nullability - // by the `UpdateNullabilityInAttributeReferences` rule. + // by the `UpdateAttributeNullability` rule. val original = rel .select(GetArrayItem(CreateArray(Seq('a, 'a + 1L)), 0) as "b") .groupBy($"b")("1") From bcb56670c4d35df880ff7b4dbe1d29d1359e7791 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 4 Jan 2019 21:11:25 +0800 Subject: [PATCH 4/4] address comment --- .../analysis/UpdateAttributeNullability.scala | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala index a53fd2be6520..8655decdcf27 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UpdateAttributeNullability.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule @@ -41,22 +41,17 @@ object UpdateAttributeNullability extends Rule[LogicalPlan] { // Skip leaf node, as it has no child and no need to update nullability. case p: LeafNode => p case p: LogicalPlan => - val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap { - case (exprId, attributes) => - // If there are multiple Attributes having the same ExprId, we need to resolve - // the conflict of nullable field. We do not really expect this happen. 
- val nullable = attributes.exists(_.nullable) - attributes.map(attr => attr.withNullability(nullable)) - }.toSeq - // At here, we create an AttributeMap that only compare the exprId for the lookup - // operation. So, we can find the corresponding input attribute's nullability. - val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr)) + val nullabilities = p.children.flatMap(c => c.output).groupBy(_.exprId).map { + // If there are multiple Attributes having the same ExprId, we need to resolve + // the conflict of nullable field. We do not really expect this to happen. + case (exprId, attributes) => exprId -> attributes.exists(_.nullable) + } // For an Attribute used by the current LogicalPlan, if it is from its children, // we fix the nullable field by using the nullability setting of the corresponding // output Attribute from the children. p.transformExpressions { - case attr: Attribute if attributeMap.contains(attr) => - attr.withNullability(attributeMap(attr).nullable) + case attr: Attribute if nullabilities.contains(attr.exprId) => + attr.withNullability(nullabilities(attr.exprId)) } } }