Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add a new rule to solve illegal references
  • Loading branch information
maropu committed May 12, 2016
commit b56da9f41ed3c62ec17bd79df6bb27728936026e
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class Analyzer(
TimeWindowing ::
HiveTypeCoercion.typeCoercionRules ++
extendedResolutionRules : _*),
Batch("Solve", Once,
SolveIllegalReferences),
Batch("Nondeterministic", Once,
PullOutNondeterministic),
Batch("UDF", Once,
Expand Down Expand Up @@ -1459,6 +1461,30 @@ class Analyzer(
}
}

/**
 * Rewrites attribute references in the condition of a [[Filter]] whose direct child is a
 * [[Join]], replacing each reference with the matching attribute taken from the join's own
 * output. This is because outer joins change nullability on columns, and a condition that still
 * refers to the columns of the join's base relations could cause wrong NULL propagation in
 * Optimizer.
 * See SPARK-13484 for the concrete query of this case.
 */
object SolveIllegalReferences extends Rule[LogicalPlan] {

  /**
   * Applies the rewrite to every `Filter(_, Join)` pair found in `plan`.
   *
   * @param plan the logical plan to rewrite
   * @return the plan with each matching filter rebuilt on top of the same join, its condition
   *         referring to the join's output attributes; all other operators are left as they are
   */
  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // NOTE(review): `q.transform` re-walks the subtree of every operator that
    // `resolveOperators` hands us, so each Filter/Join pair is presumably visited several
    // times. The rewrite is idempotent, so the result is unaffected; confirm how the two
    // traversals treat already-analyzed subtrees before collapsing them into one.
    case q: LogicalPlan =>
      q.transform {
        case Filter(filterCondition, j: Join) =>
          // `collect` keeps only the AttributeReference outputs. Any other kind of Attribute
          // in the join output is skipped instead of raising a scala.MatchError, so a plan
          // that is not fully resolved passes through this rule untouched.
          val joinOutput: Seq[(Attribute, Attribute)] = j.output.collect {
            case a: AttributeReference => (a, a)
          }
          // Presumably AttributeMap looks attributes up by expression id, which is what lets
          // a stale reference find the join's version of it -- TODO confirm.
          val joinOutputMap = AttributeMap(joinOutput)
          val newFilterCond = filterCondition.transform {
            // References that the join does not output are kept as they are.
            case a: AttributeReference => joinOutputMap.getOrElse(a, a)
          }
          Filter(newFilterCond, j)
      }
  }
}

/**
* Extracts [[WindowExpression]]s from the projectList of a [[Project]] operator and
* aggregateExpressions of an [[Aggregate]] operator and creates individual [[Window]]
Expand Down