Skip to content
Closed
Prev Previous commit
Next Next commit
Detected foldable expression in limit
  • Loading branch information
gatorsmile committed Jul 5, 2016
commit 5b36fbcd1b0417c0e5796299ffb6b538d322fed6
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ trait CheckAnalysis extends PredicateHelper {
}

private def checkLimitClause(limitExpr: Expression): Unit = {
if (!limitExpr.foldable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it must be foldable, can we just use int as limit instead of expression?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you saying we should change the SQL parser? You know, if so, we also need to change the TABLESAMPLE n ROWS.

FYI, even Hive does not support foldable expressions.

hive> select * from t1 limit 1 + 1;
FAILED: ParseException line 1:25 missing EOF at '+' near '1'
hive> select * from t1 limit (1+1);
FAILED: ParseException line 1:23 extraneous input '(' expecting Number near '1'

I found that Impala supports it.
http://www.cloudera.com/documentation/archive/impala/2-x/2-1-x/topics/impala_limit.html

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm saying maybe we can declare Limit as case class GlobalLimit(limit: Int, child: LogicalPlan), but it's not a small change, we could think about it later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uh, I see. Thank you!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this may be more readable:

limitExpr match {
  case e if !e.foldable => fail(...)
  case e if e.dataType != IntegerType => fail("the limit expression must be int type, but got ${e.dataType.simpleString}")
  case e if e.eval() < 0 => fail(...)
  case e =>
}

failAnalysis(
"The argument to the LIMIT clause must evaluate to a constant value. " +
s"Limit:${limitExpr.sql}")
}
val numRows = limitExpr.eval().asInstanceOf[Int]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the limit expression guaranteed to be literal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. Users can input an expression here. For example,

assertEqual(s"$sql limit cast(9 / 4 as int)", plan.limit(Cast(Literal(9) / 4, IntegerType)))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, but it's still foldable. Is it possible it's non-foldable?

Copy link
Member Author

@gatorsmile gatorsmile Jul 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not support non-foldable limit clauses.

object SpecialLimits extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ReturnAnswer(rootPlan) => rootPlan match {
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
case logical.Limit(
IntegerLiteral(limit),
logical.Project(projectList, logical.Sort(order, true, child))) =>
execution.TakeOrderedAndProjectExec(
limit, order, Some(projectList), planLater(child)) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
execution.CollectLimitExec(limit, planLater(child)) :: Nil
case other => planLater(other) :: Nil
}
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
case logical.Limit(
IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
execution.TakeOrderedAndProjectExec(
limit, order, Some(projectList), planLater(child)) :: Nil
case _ => Nil
}
}

case logical.LocalLimit(IntegerLiteral(limit), child) =>
execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
execution.GlobalLimitExec(limit, planLater(child)) :: Nil

But,,, we do not issue an exception if users do it. Thus, the error we got is strange:

assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
   +- LocalLimit (_nondeterministic#202 > 0.2)
      +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
         +- LogicalRDD [key#11, value#12]

java.lang.AssertionError: assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
   +- LocalLimit (_nondeterministic#202 > 0.2)
      +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
         +- LogicalRDD [key#11, value#12]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me do it in this PR. Thank you for your review! : )

if (numRows < 0) {
failAnalysis(
Expand Down
38 changes: 23 additions & 15 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -592,19 +592,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("negative in LIMIT or TABLESAMPLE") {
val expected = "number_rows in limit clause must be equal to or greater than 0. number_rows:-1"
var e = intercept[AnalysisException] {
sql("SELECT * FROM testData TABLESAMPLE (-1 rows)")
}.getMessage
assert(e.contains(expected))

e = intercept[AnalysisException] {
sql("SELECT * FROM testData LIMIT -1")
}.getMessage
assert(e.contains(expected))
}

test("aggregates with nulls") {
checkAnswer(
sql("SELECT SKEWNESS(a), KURTOSIS(a), MIN(a), MAX(a)," +
Expand Down Expand Up @@ -673,18 +660,39 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

test("limit") {
checkAnswer(
sql("SELECT * FROM testData LIMIT 10"),
sql("SELECT * FROM testData LIMIT 9 + 1"),
testData.take(10).toSeq)

checkAnswer(
sql("SELECT * FROM arrayData LIMIT 1"),
sql("SELECT * FROM arrayData LIMIT CAST(1 AS Integer)"),
arrayData.collect().take(1).map(Row.fromTuple).toSeq)

checkAnswer(
sql("SELECT * FROM mapData LIMIT 1"),
mapData.collect().take(1).map(Row.fromTuple).toSeq)
}

test("non-foldable expressions in LIMIT") {
val e = intercept[AnalysisException] {
sql("SELECT * FROM testData LIMIT key > 3")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what will happen if the type is wrong? e.g. LIMIT true

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good question! : ) Now, the exception we issued is not good:

java.lang.Boolean cannot be cast to java.lang.Integer
java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)

Let me fix it and throw a more reasonable exception:

number_rows in limit clause cannot be cast to integer:true;

}.getMessage
assert(e.contains("The argument to the LIMIT clause must evaluate to a constant value. " +
"Limit:(testdata.`key` > 3)"))
}

test("negative in LIMIT or TABLESAMPLE") {
val expected = "number_rows in limit clause must be equal to or greater than 0. number_rows:-1"
var e = intercept[AnalysisException] {
sql("SELECT * FROM testData TABLESAMPLE (-1 rows)")
}.getMessage
assert(e.contains(expected))

e = intercept[AnalysisException] {
sql("SELECT * FROM testData LIMIT -1")
}.getMessage
assert(e.contains(expected))
}

test("CTE feature") {
checkAnswer(
sql("with q1 as (select * from testData limit 10) select * from q1"),
Expand Down