Closed
Changes from 1 commit
Commits
34 commits
b988651
[SPARK-16804][SQL] Correlated subqueries containing LIMIT return inco…
nsyca Jul 29, 2016
069ed8f
[SPARK-16804][SQL] Correlated subqueries containing LIMIT return inco…
nsyca Jul 29, 2016
edca333
New positive test cases
nsyca Jul 30, 2016
64184fd
Fix unit test case failure
nsyca Aug 1, 2016
29f82b0
blocking TABLESAMPLE
nsyca Aug 5, 2016
ac43ab4
Fixing code styling
nsyca Aug 5, 2016
631d396
Correcting Scala test style
nsyca Aug 7, 2016
7eb9b2d
One (last) attempt to correct the Scala style tests
nsyca Aug 8, 2016
1387cf5
Merge remote-tracking branch 'upstream/master'
nsyca Aug 12, 2016
6d9bade
Merge remote-tracking branch 'upstream/master'
nsyca Nov 4, 2016
9a1f80b
Merge remote-tracking branch 'upstream/master'
nsyca Nov 4, 2016
3fe9429
Merge remote-tracking branch 'upstream/master'
nsyca Nov 5, 2016
0757b81
Merge remote-tracking branch 'upstream/master'
nsyca Nov 11, 2016
35b77f0
Merge remote-tracking branch 'upstream/master'
nsyca Nov 12, 2016
c63b8c6
Merge remote-tracking branch 'upstream/master'
nsyca Nov 14, 2016
f3351d5
Merge remote-tracking branch 'upstream/master'
nsyca Nov 18, 2016
9fc5c33
Merge remote-tracking branch 'upstream/master'
nsyca Nov 18, 2016
402e1d9
Merge remote-tracking branch 'upstream/master'
nsyca Nov 22, 2016
b117281
Merge remote-tracking branch 'upstream/master'
nsyca Nov 23, 2016
3023399
Merge remote-tracking branch 'upstream/master'
nsyca Nov 24, 2016
4b692f0
Merge remote-tracking branch 'upstream/master'
nsyca Nov 25, 2016
0d64512
working code #1
nsyca Nov 28, 2016
c8aadb5
Merge remote-tracking branch 'upstream/master'
nsyca Nov 28, 2016
3f184ea
Merge branch 'master' into spark18455.0
nsyca Nov 28, 2016
23e357c
Make the code more concise
nsyca Nov 28, 2016
d60f0de
Fix stylecheck failure
nsyca Nov 28, 2016
2181647
Merge remote-tracking branch 'upstream/master'
nsyca Nov 28, 2016
599f54b
Merge branch 'master' into spark18455.0
nsyca Nov 28, 2016
ca9e1a8
Cosmetic code changes
nsyca Nov 28, 2016
3f4c62a
Address review comment #1
nsyca Nov 30, 2016
c8588de
Merge remote-tracking branch 'upstream/master'
nsyca Nov 30, 2016
05fd7a3
Merge branch 'master' into spark18455.0
nsyca Nov 30, 2016
1d32958
Remove the extra space
nsyca Nov 30, 2016
0c9d0b5
Move LeftSemi to be the same group as LeftOuter
nsyca Dec 1, 2016
Make the code more concise
nsyca committed Nov 28, 2016
commit 23e357ca3fe77cdad62c25ac2a2d4a5fad0ccb94
@@ -1011,9 +1011,9 @@ class Analyzer(
val predicateMap = scala.collection.mutable.Map.empty[LogicalPlan, Seq[Expression]]

// Make sure a plan's subtree does not contain outer references
def failOnOuterReferenceInSubTree(p: LogicalPlan, msg: String): Unit = {
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
if (p.collect(predicateMap).nonEmpty) {
Contributor:
Let's change this line into p.collectFirst(predicateMap).nonEmpty, which is a little more efficient.

Contributor (author):
Agreed. I will make the change in the next PR.
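As an aside, the efficiency difference is that collect materializes every match in the subtree while collectFirst can stop at the first one, and the emptiness check only needs the latter. A minimal sketch in plain Scala (not Catalyst's TreeNode API, though its collect/collectFirst behave the same way; the predicateMap contents here are made up):

import scala.collection.mutable

object CollectVsCollectFirst {
  def main(args: Array[String]): Unit = {
    // A mutable.Map is a PartialFunction, so it can be passed to collect/collectFirst
    // directly, just like predicateMap is in the Analyzer code above.
    val predicateMap = mutable.Map("Filter" -> Seq("t2.c1 = t1.c1"))
    val planNodes = Seq("Project", "Filter", "Join", "Filter")

    // collect visits every node and builds the full result sequence ...
    val allMatches = planNodes.collect(predicateMap)
    // ... whereas collectFirst stops as soon as it finds one match.
    val firstMatch = planNodes.collectFirst(predicateMap)

    // Both support the same "does the subtree contain an outer reference?" check.
    assert(allMatches.nonEmpty == firstMatch.nonEmpty)
    println(s"collect: $allMatches")      // List(List(t2.c1 = t1.c1), List(t2.c1 = t1.c1))
    println(s"collectFirst: $firstMatch") // Some(List(t2.c1 = t1.c1))
  }
}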

failAnalysis(s"Accessing outer query column is not allowed in $msg: $p")
failAnalysis(s"Accessing outer query column is not allowed in:\n$p")
}
}

@@ -1027,7 +1027,7 @@ class Analyzer(
if (p.expressions.exists(containsOuter)) {
failAnalysis(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
s"clauses: $p")
s"clauses:\n$p")
}
}

@@ -1078,13 +1078,13 @@
val transformed = BooleanSimplification(sub) transformUp {

// Whitelist operators allowed in a correlated subquery
// There are 3 categories:
// 1. Operators that are allowed anywhere in a correlated subquery
// and, by definition, they cannot host outer references.
// There are 4 categories:
// 1. Operators that are allowed anywhere in a correlated subquery, and,
// by definition of the operators, they cannot host outer references.
// 2. Operators that are allowed anywhere in a correlated subquery
// so long as they do not host outer references.
// 3. Operators that need special treatment. These operators are
// Project, Filter, Join, Aggregate, Window(?).
// 3. Operators that need special handlings. These operators are
// Project, Filter, Join, Aggregate, and Generate.
//
// Any operators that are not in the above list are allowed
// in a correlated subquery only if they are not on a correlation path.
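For readers following the categories above: an operator outside the whitelist may appear below a correlation point, but not between an outer reference and the subquery root. A small, illustrative sketch against a local SparkSession (not part of the diff; the t1/t2 views are made up, and the rejection reflects the analysis checks this PR is shaping):

import org.apache.spark.sql.{AnalysisException, SparkSession}

object CorrelationPathExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("correlation-path").getOrCreate()
    import spark.implicits._

    Seq((1, "a"), (2, "b")).toDF("c1", "c2").createOrReplaceTempView("t1")
    Seq((1, 10), (2, 20)).toDF("c1", "c3").createOrReplaceTempView("t2")

    // Allowed: the correlated predicate sits directly in the subquery's WHERE clause.
    spark.sql("select * from t1 where exists (select 1 from t2 where t2.c1 = t1.c1)").show()

    // Should be rejected: LIMIT sits between the correlated predicate and the subquery
    // root, i.e. it is on the correlation path (the last category above).
    try {
      spark.sql("select * from t1 where exists (select 1 from t2 where t2.c1 = t1.c1 limit 1)")
    } catch {
      case e: AnalysisException => println(e.getMessage)
    }
    spark.stop()
  }
}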
@@ -1122,8 +1122,8 @@
n

// Category 3:
// Filter is the ONLY operator allowed to host correlated expressions.
// Filter can be anywhere in a correlated subquery.
// Filter is one of the two operators allowed to host correlated expressions.
// The other operator is Join. Filter can be anywhere in a correlated subquery.
case f @ Filter(cond, child) =>
// Find all predicates with an outer reference.
val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
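To make the Filter handling concrete: the condition is broken into its conjuncts and the conjuncts are partitioned into correlated and local predicates. A self-contained toy version (plain Scala stand-ins, not Catalyst's Expression classes):

object FilterSplitToy {
  sealed trait Expr
  case class And(left: Expr, right: Expr) extends Expr
  case class Pred(text: String, refersOuter: Boolean) extends Expr

  // Mirrors the shape of splitConjunctivePredicates: flatten nested ANDs into conjuncts.
  def splitConjunctivePredicates(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjunctivePredicates(l) ++ splitConjunctivePredicates(r)
    case other     => Seq(other)
  }

  // Stand-in for containsOuter: does the expression reference the outer query?
  def containsOuter(e: Expr): Boolean = e match {
    case Pred(_, outer) => outer
    case And(l, r)      => containsOuter(l) || containsOuter(r)
  }

  def main(args: Array[String]): Unit = {
    // WHERE t2.c1 = t1.c1 AND t2.c2 > 0   (t1 belongs to the outer query)
    val cond = And(Pred("t2.c1 = t1.c1", refersOuter = true), Pred("t2.c2 > 0", refersOuter = false))
    val (correlated, local) = splitConjunctivePredicates(cond).partition(containsOuter)
    println(s"correlated: $correlated")  // the predicate to be pulled up and recorded
    println(s"local: $local")            // stays in the Filter
  }
}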
@@ -1173,92 +1173,56 @@
a
}

// Join cannot host any correlated expressions.
// Inner join, like Project, can be anywhere.
case j @ Join(_, _, jt, _) if jt.isInstanceOf[InnerLike] =>
failOnOuterReference(j)
j
// Right outer join's left operand cannot be on a correlation path.
case j @ Join(left, _, RightOuter, _) =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(left, "a RIGHT OUTER JOIN")
j
// Likewise, Left outer join's right operand cannot be on a correlation path.
case j @ Join(_, right, LeftOuter, _) =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN")
j
// LeftSemi, LeftAnti and ExistenceJoin are special cases of LeftOuter.
// ExistenceJoin cannot be used externally in both SQL and DataFrame
// so it should not show up here in Analysis phase.
case j @ Join(_, right, LeftSemi, _) =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN")
j
case j @ Join(_, right, LeftAnti, _) =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(right, "a LEFT (OUTER) JOIN")
j
// Any other join types not explicitly listed above,
// including Full outer join, are treated as Category 4.
case j @ Join(_, _, _, _) =>
failOnOuterReferenceInSubTree(j, "other JOIN")
// Join can host correlated expressions.
case j @ Join(left, right, joinType, _) =>
joinType match {
// Inner join, like Filter, can be anywhere.
// LeftSemi is a special case of Inner join which returns
// only the first matched row to the right table.
case _: InnerLike | LeftSemi =>
Contributor:
We cannot pull out a predicate which is defined in the right side of a LeftSemi join. A LeftSemi join should be treated like a LeftOuter join.

Contributor (author):
You were doing too many code reviews yesterday/today. ;-) In your PR for SPARK-18597, you placed LeftSemi in the same category as InnerJoin.

The Left in LeftSemi does not mean it's a left outer join. It means a "semi" match from the "left" table of an inner join, returning only the first match of the right table (and ignoring subsequent matches). This is why I do not like the terminology. It's confusing. I prefer to call it a (left) early-out (inner) join, or simply early-out join.

Contributor:
I have to admit that I have been reviewing a lot of PRs. However, I am quite sure that you cannot define a correlated predicate in the plan on the right hand side of a LEFT SEMI/early-out join, because we only output the columns of the plan on the left hand side. For example:

select *
from   tbl_a
where  exists (select 1
               from tbl_b
               left semi join( select id
                               from tbl_c
                               where tbl_c.id = tbl_a.id) c
                on c.id = tbl_b.id)

In this example we could not move the correlated predicate tbl_c.id = tbl_a.id, because the Left Semi join does not output c.id. BTW: in this case it would actually be OK to convert the Left Semi join into an Inner join.

Contributor (author):
The reason I write this lengthy response here is to convince you that we should leave LeftSemi in the same group as InnerJoin. Please bear with me.

The example you gave here demonstrates a limitation of the subquery support in Spark today. We should plan to be able to handle this case of deep correlation in the future:

select  *
from    t
where   exists (select 1
                from   t2
                where  t2.c1=t1.c1
                and    exists (select 1
                               from   t3
                               where  t3.c2=t1.c1))

And if we do, then we will need to allow LeftSemi to output the columns from the right table.

One way to imagine a use case of LeftSemi is a lookup join where the join predicate forms an N:1 relationship, just like between a foreign key and its primary key. The join is effectively a LeftSemi in which we are guaranteed to need only the first matched row before moving on to the next row of the left table (just like a hash join that probes the first match and stops early instead of seeking the next match in the hash chain). From a run-time viewpoint, a LeftSemi is (almost, more on this later) better than a regular InnerJoin in that it does not need to probe for further matched rows regardless of the chosen join method: nested-loop join, sort-merge join, or hash join. A LeftSemi, however, dictates which tables can be the left and the right. As the name implies, the left table of a LeftSemi needs to be the N-side of the N:1 join. So in the case where the N-side is the smaller table, it could be better to do a regular inner join with the 1-side as the left table and then perform a compensation on top of the join to remove the duplicate matched rows. Having said that, we could also implement a RightSemi join in the runtime layer so that we can pick any of LeftSemi, RightSemi, or InnerJoin with a compensation, based on the cost.

Until we wade into the CBO, the whole planning business is an interesting area, isn't it?
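A small, self-contained sketch of the "early-out" reading described above (plain Scala collections standing in for the runtime; rows and keys are made up):

object EarlyOutJoinToy {
  case class R(key: Int, payload: String)

  // LeftSemi / "early-out": keep each left row that has at least one match,
  // stopping at the first match on the right side (exists short-circuits).
  def leftSemi(left: Seq[R], right: Seq[R]): Seq[R] =
    left.filter(l => right.exists(_.key == l.key))

  // A regular inner join enumerates every match instead.
  def innerJoin(left: Seq[R], right: Seq[R]): Seq[(R, R)] =
    for (l <- left; r <- right if l.key == r.key) yield (l, r)

  def main(args: Array[String]): Unit = {
    val left  = Seq(R(1, "a"), R(2, "b"))
    val right = Seq(R(1, "x"), R(1, "x2"))   // duplicate matches on the right
    println(leftSemi(left, right))            // List(R(1,a)) -- one row per matching left row
    println(innerJoin(left, right))           // List((R(1,a),R(1,x)), (R(1,a),R(1,x2)))
    // Getting the semi-join result out of the inner join would need a "compensation"
    // (dedup on the left row), which is the trade-off discussed in the comment above.
  }
}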

Contributor:
It makes sense to extract predicates from the right hand side of a LeftSemi join. My problem with this is far more practical. A Join with a LeftSemi join type does not output any right hand side attributes (see: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L273), so the plan breaks as soon as you extract a correlated predicate from the right hand side of the join and try to rewrite the tree using that predicate. That is all.

It would - fortunately - be quite simple to support this: just rewrite every LeftSemi join with underlying predicates into an Inner join. I am not entirely sure if we should support this; technically the query is incorrect. Let's defer this to a follow-up PR.

The example you give is different. I do think we should support that. Please note that the example you give will not have any left semi joins during analysis; the left semi joins are introduced during optimization, which makes it relatively straightforward to detect such a nested case.

Contributor (author):
Got your point now. As it is today, pulling up the correlated predicate from the right operand of a LeftSemi will break.
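The practical issue is visible from the join's output schema alone; a toy model of it (plain Scala, mirroring but not using Catalyst's Join.output rule linked above):

object SemiJoinOutputToy {
  sealed trait JoinType
  case object Inner    extends JoinType
  case object LeftSemi extends JoinType

  case class Relation(name: String, columns: Seq[String])

  // Left-semi (and left-anti) joins expose only the left side's columns, which is why
  // a correlated predicate pulled up from the right side cannot be resolved above the join.
  def joinOutput(left: Relation, right: Relation, jt: JoinType): Seq[String] = jt match {
    case LeftSemi => left.columns
    case Inner    => left.columns ++ right.columns
  }

  def main(args: Array[String]): Unit = {
    val tblB = Relation("tbl_b", Seq("tbl_b.id"))
    val c    = Relation("c", Seq("c.id"))   // the aliased subquery over tbl_c in the example above

    // The pulled-up predicate references c.id, which a LeftSemi join does not output ...
    assert(!joinOutput(tblB, c, LeftSemi).contains("c.id"))
    // ... but an Inner join would, which is the "rewrite into Inner join" idea above.
    assert(joinOutput(tblB, c, Inner).contains("c.id"))
    println("semi-join output: " + joinOutput(tblB, c, LeftSemi))
  }
}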

failOnOuterReference(j)

// Left outer join's right operand cannot be on a correlation path.
// LeftAnti and ExistenceJoin are special cases of LeftOuter.
// Note that ExistenceJoin cannot be expressed externally in both SQL and DataFrame
// so it should not show up here in Analysis phase. This is just a safety net.
case LeftOuter | LeftAnti | ExistenceJoin(_) =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(right)

// Likewise, Right outer join's left operand cannot be on a correlation path.
case RightOuter =>
failOnOuterReference(j)
failOnOuterReferenceInSubTree(left)

// Any other join types not explicitly listed above,
// including Full outer join, are treated as Category 4.
case _ =>
failOnOuterReferenceInSubTree(j)
}
j

// ??
// Window cannot host any correlated expressions
// Window cannot be on or above any non-equality correlated expressions
case w : Window =>
failOnOuterReference(w)
failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, w)
w

// ??
case n @ Generate(generator, join, _, _, _, _) =>
// generator which is a derived class of Expression must not host
// any outer references
if (containsOuter(generator)) {
failOnOuterReference(n)
} else if (join) {
// LATERAL VIEW [OUTER]
failOnOuterReference(n)
} else {
failOnOuterReferenceInSubTree(n, "")
}
n

/*
// These operators are permitted under a correlation point
// They are not permitted between a correlation point and their outer reference.
case u: Union =>
failOnOuterReferenceInSubTree(u, "a UNION")
u
case s: SetOperation =>
failOnOuterReferenceInSubTree(s.right, "an INTERSECT/EXCEPT")
s
case e: Expand =>
failOnOuterReferenceInSubTree(e, "an EXPAND")
e
case l : LocalLimit =>
failOnOuterReferenceInSubTree(l, "a LIMIT")
l
// Since LIMIT <n> is represented as GlobalLimit(<n>, (LocalLimit (<n>, child))
// and we are walking bottom up, we will fail on LocalLimit before
// reaching GlobalLimit.
// The code below is just a safety net.
case g : GlobalLimit =>
failOnOuterReferenceInSubTree(g, "a LIMIT")
g
case s : Sample =>
failOnOuterReferenceInSubTree(s, "a TABLESAMPLE")
s
*/
// Generator with join=true, i.e., expressed with
// LATERAL VIEW [OUTER], similar to inner join,
// allows to have correlation under it
// but must not host any outer references.
case n @ Generate(generator, join, _, _, _, _) if (join) =>
if (containsOuter(generator)) {
Contributor:
I am not sure what is going on here. Why check all the expressions in the operator only when the generator contains an outer reference? Generate only has one expression, the generator, so I think you can safely call failOnOuterReference(p) directly.

Contributor (author) @nsyca, Nov 29, 2016:
Right. I was thinking of

if (containsOuter(generator)) {
  failAnalysis( ... )
}

but the code clearly does not reflect my thought. I will make the change.
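For what it's worth, the equivalence the review points at can be stated in a few lines; a toy stand-in (not Catalyst) where the generator is the node's only expression:

object GenerateCheckToy {
  case class Expr(refersOuter: Boolean)
  case class ToyGenerate(generator: Expr) {
    def expressions: Seq[Expr] = Seq(generator)   // the generator is the only expression
  }

  def containsOuter(e: Expr): Boolean = e.refersOuter
  // What failOnOuterReference effectively tests: does any expression of the node refer outward?
  def hostsOuterReference(n: ToyGenerate): Boolean = n.expressions.exists(containsOuter)

  def main(args: Array[String]): Unit = {
    for (outer <- Seq(true, false)) {
      val n = ToyGenerate(Expr(refersOuter = outer))
      // The guarded form `if (containsOuter(generator)) check-all-expressions` and the direct
      // check coincide for every Generate node, hence the suggested simplification.
      assert(hostsOuterReference(n) == containsOuter(n.generator))
    }
    println("guarded and direct checks agree")
  }
}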

failOnOuterReference(n)
}
n

case n @ Generate(_, _, _, _, _, _) =>
// Generator with join=false is treated as Category 4.
failOnOuterReferenceInSubTree(n)
n

// Category 4: Any other operators not in the above 3 categories
// cannot be on a correlation path, that is they are allowed
// cannot be on a correlation path, that is they are allowed only
// under a correlation point but they and their descendant operators
// are not allowed to have any correlated expressions.
case p =>
failOnOuterReferenceInSubTree(p, "")
failOnOuterReferenceInSubTree(p)
p
}
(transformed, predicateMap.values.flatten.toSeq)
@@ -542,7 +542,7 @@ class AnalysisErrorSuite extends AnalysisTest {
Filter(EqualTo(OuterReference(a), b), LocalRelation(b)))
),
LocalRelation(a))
assertAnalysisError(plan4, "Accessing outer query column is not allowed in a LIMIT" :: Nil)
assertAnalysisError(plan4, "Accessing outer query column is not allowed in" :: Nil)

val plan5 = Filter(
Exists(
@@ -551,6 +551,6 @@ class AnalysisErrorSuite extends AnalysisTest {
),
LocalRelation(a))
assertAnalysisError(plan5,
"Accessing outer query column is not allowed in a TABLESAMPLE" :: Nil)
"Accessing outer query column is not allowed in" :: Nil)
}
}
18 changes: 18 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -789,4 +789,22 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
}
}
}

// Generate operator
test("Correlated subqueries in LATERAL VIEW") {
withTempView("t1", "t2") {
Seq((1, 1), (2, 0)).toDF("c1", "c2").createOrReplaceTempView("t1")
Seq[(Int, Array[Int])]((1, Array(1, 2)), (2, Array(-1, -3)))
.toDF("c1", "arr_c2").createTempView("t2")
checkAnswer(
sql(
"""
| select c2
| from t1
| where exists (select *
| from t2 lateral view explode(arr_c2) q as c2
where t1.c1 = t2.c1)""".stripMargin),
Row(1) :: Row(0) :: Nil)
}
}
}