address comment
cloud-fan committed Jan 10, 2019
commit bcb56670c4d35df880ff7b4dbe1d29d1359e7791
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

Member
It still looks weird to me to call an analyzer rule in the optimizer. Our codegen implementation depends on the correctness of nullability fields. I am wondering which rule could break it. Join reordering?

I think our existing test cases might already cover such a case. Could you throw an exception if this rule changes the nullability in the optimizer stage? I want to know the exact case that requires running this rule in the optimizer.
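
As an illustration of the check suggested here, a hypothetical debugging wrapper (not part of this patch; the name FailOnNullabilityChange is invented): since UpdateAttributeNullability changes nothing but nullability, any difference between its input and output plan means nullability changed, so the check can simply compare the two.

import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical debugging-only wrapper: fail loudly if running the rule in the
// optimizer stage actually changes any nullability, so the triggering query
// can be identified.
object FailOnNullabilityChange extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    val updated = UpdateAttributeNullability(plan)
    // The wrapped rule only ever rewrites nullability, so plan inequality
    // implies a nullability change somewhere in the tree.
    if (updated != plan) {
      throw new IllegalStateException(
        s"Nullability was changed in the optimizer stage:\n$plan\n=>\n$updated")
    }
    updated
  }
}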

Contributor Author
In the future, if we introduce a fixed-size array type, then CreateArray can return a fixed-size array, and GetArrayItem can define its nullability more precisely when the input is a fixed-size array.
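
A rough sketch of that idea, purely hypothetical: Spark has no fixed-size array type, so the statically known size is passed in here as a plain Option[Int], and both FixedSizeArrayNullability and getArrayItemNullable are invented names.

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.types.ArrayType

object FixedSizeArrayNullability {
  // Hypothetical helper: a tighter nullability for GetArrayItem when the
  // array size is statically known (e.g. from a future fixed-size array type).
  def getArrayItemNullable(
      array: Expression,
      ordinal: Expression,
      knownSize: Option[Int]): Boolean = {
    val containsNull = array.dataType match {
      case ArrayType(_, cn) => cn
      case _ => true
    }
    (ordinal, knownSize) match {
      // A literal index that is provably in bounds can only yield null if the
      // array itself or its elements can be null.
      case (Literal(index: Int, _), Some(size)) if index >= 0 && index < size =>
        array.nullable || containsNull
      // Otherwise stay conservative: an out-of-bounds access produces null.
      case _ => true
    }
  }
}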

Contributor Author
@maropu do you have more use cases? If this is the only one, maybe we can simply remove this optimization, since its use case is rare, and optimize it in a better way in the future.

Member
Yes, we need to understand which cases are improved and then update the nullability in the right place.

Member
@cloud-fan Removing it from the optimizer looks OK to me, but I recall the rule seems to be related to the existing tests? See: #18576 (comment)

Contributor Author
How about we accept this patch and think about removing this optimization later?

Member
Yeah, that sounds good to me. Thanks!

Member
When removing it in a follow-up PR, could you reopen the JIRA, too? https://issues.apache.org/jira/browse/SPARK-21351

Contributor Author
Sure, feel free to merge this PR if you think it's ready to go. Thanks!


-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule

@@ -41,22 +41,17 @@ object UpdateAttributeNullability extends Rule[LogicalPlan] {
     // Skip leaf node, as it has no child and no need to update nullability.
     case p: LeafNode => p
     case p: LogicalPlan =>
-      val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
-        case (exprId, attributes) =>
-          // If there are multiple Attributes having the same ExprId, we need to resolve
-          // the conflict of nullable field. We do not really expect this happen.
-          val nullable = attributes.exists(_.nullable)
-          attributes.map(attr => attr.withNullability(nullable))
-      }.toSeq
-      // At here, we create an AttributeMap that only compare the exprId for the lookup
-      // operation. So, we can find the corresponding input attribute's nullability.
-      val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr))
+      val nullabilities = p.children.flatMap(c => c.output).groupBy(_.exprId).map {
+        // If there are multiple Attributes having the same ExprId, we need to resolve
+        // the conflict of nullable field. We do not really expect this to happen.
+        case (exprId, attributes) => exprId -> attributes.exists(_.nullable)
+      }
       // For an Attribute used by the current LogicalPlan, if it is from its children,
       // we fix the nullable field by using the nullability setting of the corresponding
       // output Attribute from the children.
       p.transformExpressions {
-        case attr: Attribute if attributeMap.contains(attr) =>
-          attr.withNullability(attributeMap(attr).nullable)
+        case attr: Attribute if nullabilities.contains(attr.exprId) =>
+          attr.withNullability(nullabilities(attr.exprId))
       }
   }
 }
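
For illustration only (not part of the patch), a tiny standalone sketch of the conflict resolution above, assuming spark-catalyst is on the classpath: when two occurrences of the same ExprId disagree on nullability, the rule keeps the more permissive (nullable) setting. The object name NullabilityConflictExample is invented here.

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

object NullabilityConflictExample extends App {
  // Two attributes sharing one ExprId but disagreeing on nullability,
  // simulating conflicting child outputs.
  val a = AttributeReference("a", IntegerType, nullable = false)()
  val aNullable = a.withNullability(true)

  // Same grouping logic as the rule: the attribute is treated as nullable
  // if any occurrence is nullable.
  val nullabilities = Seq(a, aNullable).groupBy(_.exprId).map {
    case (exprId, attrs) => exprId -> attrs.exists(_.nullable)
  }
  println(nullabilities) // the single ExprId maps to true
}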