Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address comments.
  • Loading branch information
viirya committed Sep 6, 2017
commit c1325fb9b1f8501b1a31b61e9b39bf1213b021f7
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,31 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
}
}

def dedupJoin(plan: LogicalPlan): LogicalPlan = {
plan transform {
case j @ Join(left, right, joinType, joinCond) =>
val duplicates = right.outputSet.intersect(left.outputSet)
if (duplicates.nonEmpty) {
val aliasMap = AttributeMap(duplicates.map { dup =>
dup -> Alias(dup, dup.toString)()
}.toSeq)
val aliasedExpressions = right.output.map { ref =>
aliasMap.getOrElse(ref, ref)
}
val newRight = Project(aliasedExpressions, right)
val newJoinCond = joinCond.map { condExpr =>
condExpr transform {
case a: Attribute => aliasMap.getOrElse(a, a).toAttribute
}
private def dedupJoin(joinPlan: Join): Join = joinPlan match {
// SPARK-21835: It is possibly that the two sides of the join have conflicting attributes,
// the produced join then becomes unresolved and break structural integrity. We should
// de-duplicate conflicting attributes. We don't use transformation here because we only
// care about the most top join converted from correlated predicate subquery.
case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti), joinCond) =>
val duplicates = right.outputSet.intersect(left.outputSet)
if (duplicates.nonEmpty) {
val aliasMap = AttributeMap(duplicates.map { dup =>
dup -> Alias(dup, dup.toString)()
}.toSeq)
val aliasedExpressions = right.output.map { ref =>
aliasMap.getOrElse(ref, ref)
}
val newRight = Project(aliasedExpressions, right)
val newJoinCond = joinCond.map { condExpr =>
condExpr transform {
case a: Attribute => aliasMap.getOrElse(a, a).toAttribute
}
Join(left, newRight, joinType, newJoinCond)
} else {
j
}
}
Join(left, newRight, joinType, newJoinCond)
} else {
j
}
case _ => joinPlan
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Expand All @@ -85,17 +88,20 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
}

// Filter the plan by applying left semi and left anti joins.
val rewritten = withSubquery.foldLeft(newFilter) {
withSubquery.foldLeft(newFilter) {
case (p, Exists(sub, conditions, _)) =>
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
Join(outerPlan, sub, LeftSemi, joinCond)
// Deduplicate conflicting attributes if any.
dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond))
case (p, Not(Exists(sub, conditions, _))) =>
val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p)
Join(outerPlan, sub, LeftAnti, joinCond)
// Deduplicate conflicting attributes if any.
dedupJoin(Join(outerPlan, sub, LeftAnti, joinCond))
case (p, In(value, Seq(ListQuery(sub, conditions, _, _)))) =>
val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
val (joinCond, outerPlan) = rewriteExistentialExpr(inConditions ++ conditions, p)
Join(outerPlan, sub, LeftSemi, joinCond)
// Deduplicate conflicting attributes if any.
dedupJoin(Join(outerPlan, sub, LeftSemi, joinCond))
case (p, Not(In(value, Seq(ListQuery(sub, conditions, _, _))))) =>
// This is a NULL-aware (left) anti join (NAAJ) e.g. col NOT IN expr
// Construct the condition. A NULL in one of the conditions is regarded as a positive
Expand All @@ -117,12 +123,12 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
// will have the final conditions in the LEFT ANTI as
// (A.A1 = B.B1 OR ISNULL(A.A1 = B.B1)) AND (B.B2 = A.A2)
val pairs = (joinConds.map(c => Or(c, IsNull(c))) ++ conditions).reduceLeft(And)
Join(outerPlan, sub, LeftAnti, Option(pairs))
// Deduplicate conflicting attributes if any.
dedupJoin(Join(outerPlan, sub, LeftAnti, Option(pairs)))
case (p, predicate) =>
val (newCond, inputPlan) = rewriteExistentialExpr(Seq(predicate), p)
Project(p.output, Filter(newCond.get, inputPlan))
}
dedupJoin(rewritten)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,6 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
|WHERE
|NOT EXISTS (SELECT * FROM t1)
""".stripMargin
val ds = sql(sqlText)
val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
val join = optimizedPlan.collect {
case j: Join => j
Expand Down