Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
}
}

private def dedupJoin(joinPlan: Join): Join = joinPlan match {
private def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match {
// SPARK-21835: It is possibly that the two sides of the join have conflicting attributes,
// the produced join then becomes unresolved and break structural integrity. We should
// de-duplicate conflicting attributes. We don't use transformation here because we only
// care about the most top join converted from correlated predicate subquery.
case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti), joinCond) =>
case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) =>
val duplicates = right.outputSet.intersect(left.outputSet)
if (duplicates.nonEmpty) {
val aliasMap = AttributeMap(duplicates.map { dup =>
Expand Down Expand Up @@ -145,13 +145,16 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
e transformUp {
case Exists(sub, conditions, _) =>
val exists = AttributeReference("exists", BooleanType, nullable = false)()
newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
// Deduplicate conflicting attributes if any.
newPlan = dedupJoin(
Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
exists
case In(value, Seq(ListQuery(sub, conditions, _, _))) =>
val exists = AttributeReference("exists", BooleanType, nullable = false)()
val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
newPlan = Join(newPlan, sub, ExistenceJoin(exists), newConditions)
// Deduplicate conflicting attributes if any.
newPlan = dedupJoin(Join(newPlan, sub, ExistenceJoin(exists), newConditions))
exists
}
}
Expand Down
12 changes: 12 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -938,4 +938,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 3") {
val sqlText =
"""
|SELECT * FROM l, r WHERE l.a = r.c + 1 AND
|(EXISTS (SELECT * FROM r) OR l.a = r.c)
""".stripMargin
val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan
val join = optimizedPlan.collectFirst { case j: Join => j }.get
assert(join.duplicateResolved)
assert(optimizedPlan.resolved)
}
}