@@ -46,6 +46,20 @@ trait CheckAnalysis extends PredicateHelper {
}).length > 1
}

  private def checkLimitClause(limitExpr: Expression): Unit = {
    if (!limitExpr.foldable) {
Contributor:

If it must be foldable, can we just use an int as the limit instead of an expression?

Member Author:

Are you saying we should change the SQL parser? If so, we would also need to change TABLESAMPLE n ROWS.

FYI, even Hive does not support foldable expressions in its LIMIT clause:

hive> select * from t1 limit 1 + 1;
FAILED: ParseException line 1:25 missing EOF at '+' near '1'
hive> select * from t1 limit (1+1);
FAILED: ParseException line 1:23 extraneous input '(' expecting Number near '1'

I found that Impala supports it.
http://www.cloudera.com/documentation/archive/impala/2-x/2-1-x/topics/impala_limit.html

Contributor:

I'm saying maybe we can declare Limit as case class GlobalLimit(limit: Int, child: LogicalPlan). It's not a small change, though, so we could think about it later.
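
In code, the suggestion would look roughly like this (a hypothetical sketch, not code from this PR; the output override is copied from the shape of the existing limit operators, and the usual catalyst types LogicalPlan, UnaryNode, and Attribute are assumed to be in scope):

// Hypothetical alternative: carry a plain Int instead of an Expression, so a
// non-constant limit can never be constructed and no foldability check is
// needed at analysis time. The parser (or an analyzer rule) would have to
// fold the expression down to an Int first.
case class GlobalLimit(limit: Int, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}
case class LocalLimit(limit: Int, child: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
}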

Member Author:

uh, I see. Thank you!

Contributor:

This may be more readable:

limitExpr match {
  case e if !e.foldable => fail(...)
  case e if e.dataType != IntegerType =>
    fail(s"the limit expression must be int type, but got ${e.dataType.simpleString}")
  case e if e.eval().asInstanceOf[Int] < 0 => fail(...)
  case e => // OK
}

      failAnalysis(
        "The argument to the LIMIT clause must evaluate to a constant value. " +
          s"Limit:${limitExpr.sql}")
    }
    val numRows = limitExpr.eval().asInstanceOf[Int]
Contributor:

Is the limit expression guaranteed to be a literal?

Member Author:

Nope. Users can input an expression here. For example,

assertEqual(s"$sql limit cast(9 / 4 as int)", plan.limit(Cast(Literal(9) / 4, IntegerType)))

Contributor:

Ah, but that's still foldable. Is it possible for it to be non-foldable?

Member Author (@gatorsmile, Jul 5, 2016):

We do not support non-foldable limit clauses; the planner strategies only match integer literals:

object SpecialLimits extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case logical.ReturnAnswer(rootPlan) => rootPlan match {
      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
        execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
      case logical.Limit(
          IntegerLiteral(limit),
          logical.Project(projectList, logical.Sort(order, true, child))) =>
        execution.TakeOrderedAndProjectExec(
          limit, order, Some(projectList), planLater(child)) :: Nil
      case logical.Limit(IntegerLiteral(limit), child) =>
        execution.CollectLimitExec(limit, planLater(child)) :: Nil
      case other => planLater(other) :: Nil
    }
    case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
      execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
    case logical.Limit(
        IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
      execution.TakeOrderedAndProjectExec(
        limit, order, Some(projectList), planLater(child)) :: Nil
    case _ => Nil
  }
}

case logical.LocalLimit(IntegerLiteral(limit), child) =>
  execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
  execution.GlobalLimitExec(limit, planLater(child)) :: Nil

But we do not issue an analysis exception when users write one, so the error we get is strange:

java.lang.AssertionError: assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
   +- LocalLimit (_nondeterministic#202 > 0.2)
      +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
         +- LogicalRDD [key#11, value#12]

Member Author:

Let me do it in this PR. Thank you for your review! : )

    if (numRows < 0) {
      failAnalysis(
        s"number_rows in limit clause must be equal to or greater than 0. " +
          s"number_rows:$numRows")
    }
  }
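
For readability, here is the new check reassembled from the hunks above into one piece (indentation approximate; the dataType guard discussed further down is not yet part of it):

private def checkLimitClause(limitExpr: Expression): Unit = {
  // Reject anything that cannot be evaluated to a constant at analysis time.
  if (!limitExpr.foldable) {
    failAnalysis(
      "The argument to the LIMIT clause must evaluate to a constant value. " +
        s"Limit:${limitExpr.sql}")
  }
  // Then evaluate the constant and reject negative row counts.
  val numRows = limitExpr.eval().asInstanceOf[Int]
  if (numRows < 0) {
    failAnalysis(
      s"number_rows in limit clause must be equal to or greater than 0. " +
        s"number_rows:$numRows")
  }
}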

  def checkAnalysis(plan: LogicalPlan): Unit = {
    // We transform up and order the rules so as to catch the first possible failure instead
    // of the result of cascading resolution failures.
@@ -251,6 +265,10 @@ trait CheckAnalysis extends PredicateHelper {
s"but one table has '${firstError.output.length}' columns and another table has " +
s"'${s.children.head.output.length}' columns")

case GlobalLimit(limitExpr, _) => checkLimitClause(limitExpr)

case LocalLimit(limitExpr, _) => checkLimitClause(limitExpr)

case p if p.expressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) =>
p match {
case _: Filter | _: Aggregate | _: Project => // Ok
@@ -660,7 +660,13 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
  }
  override lazy val statistics: Statistics = {
    val limit = limitExpr.eval().asInstanceOf[Int]
-   val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
+   val sizeInBytes = if (limit == 0) {
+     // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
+     // (product of children).
+     1
+   } else {
+     (limit: Long) * output.map(a => a.dataType.defaultSize).sum
+   }
    child.statistics.copy(sizeInBytes = sizeInBytes)
  }
}
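
For context on the "product of children" comment: LogicalPlan's default statistics multiply the children's size estimates, so a single zero collapses every ancestor's estimate to zero and could trick the broadcast-join threshold. A paraphrased sketch of that default (Spark 2.0-era; treat the exact wording as approximate, not a verbatim copy):

// Paraphrased default from LogicalPlan: a node with children estimates its
// size as the product of the children's estimates -- a deliberately
// conservative upper bound for joins. If any child reports 0, the whole
// product collapses to 0, which is why the limit-0 case above clamps to 1.
def statistics: Statistics = {
  if (children.isEmpty) {
    throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
  }
  Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
}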
@@ -675,7 +681,13 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
  }
  override lazy val statistics: Statistics = {
    val limit = limitExpr.eval().asInstanceOf[Int]
-   val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
+   val sizeInBytes = if (limit == 0) {
+     // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
+     // (product of children).
+     1
+   } else {
+     (limit: Long) * output.map(a => a.dataType.defaultSize).sum
+   }
    child.statistics.copy(sizeInBytes = sizeInBytes)
  }
}
@@ -352,6 +352,12 @@ class AnalysisErrorSuite extends AnalysisTest {
"Generators are not supported outside the SELECT clause, but got: Sort" :: Nil
)

errorTest(
"num_rows in limit clause must be equal to or greater than 0",
listRelation.limit(-1),
"number_rows in limit clause must be equal to or greater than 0. number_rows:-1" :: Nil
)

errorTest(
"more than one generators in SELECT",
listRelation.select(Explode('list), Explode('list)),
25 changes: 23 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -660,18 +660,39 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

test("limit") {
checkAnswer(
sql("SELECT * FROM testData LIMIT 10"),
sql("SELECT * FROM testData LIMIT 9 + 1"),
testData.take(10).toSeq)

checkAnswer(
sql("SELECT * FROM arrayData LIMIT 1"),
sql("SELECT * FROM arrayData LIMIT CAST(1 AS Integer)"),
arrayData.collect().take(1).map(Row.fromTuple).toSeq)

checkAnswer(
sql("SELECT * FROM mapData LIMIT 1"),
mapData.collect().take(1).map(Row.fromTuple).toSeq)
}

test("non-foldable expressions in LIMIT") {
val e = intercept[AnalysisException] {
sql("SELECT * FROM testData LIMIT key > 3")
Contributor:

What will happen if the type is wrong, e.g. LIMIT true?

Member Author:

A good question! : ) Right now, the exception we issue is not good:

java.lang.Boolean cannot be cast to java.lang.Integer
java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)

Let me fix it and throw a more reasonable exception:

number_rows in limit clause cannot be cast to integer:true;

    }.getMessage
    assert(e.contains("The argument to the LIMIT clause must evaluate to a constant value. " +
      "Limit:(testdata.`key` > 3)"))
  }
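
Following up on the LIMIT true discussion above, a guard along these lines inside checkLimitClause, placed before the eval(), would turn that ClassCastException into a proper AnalysisException (a hypothetical sketch built from the promised error message, not necessarily the committed fix):

// Hypothetical type guard: reject non-integer limit expressions before the
// asInstanceOf[Int] cast ever runs. IntegerType comes from
// org.apache.spark.sql.types.
if (limitExpr.dataType != IntegerType) {
  failAnalysis(
    s"number_rows in limit clause cannot be cast to integer:${limitExpr.sql}")
}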

test("negative in LIMIT or TABLESAMPLE") {
val expected = "number_rows in limit clause must be equal to or greater than 0. number_rows:-1"
var e = intercept[AnalysisException] {
sql("SELECT * FROM testData TABLESAMPLE (-1 rows)")
}.getMessage
assert(e.contains(expected))

e = intercept[AnalysisException] {
sql("SELECT * FROM testData LIMIT -1")
}.getMessage
assert(e.contains(expected))
}

test("CTE feature") {
checkAnswer(
sql("with q1 as (select * from testData limit 10) select * from q1"),
44 changes: 44 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
@@ -17,10 +17,12 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Join, LocalLimit}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

class StatisticsSuite extends QueryTest with SharedSQLContext {
  import testImplicits._

test("SPARK-15392: DataFrame created from RDD should not be broadcasted") {
val rdd = sparkContext.range(1, 100).map(i => Row(i, i))
@@ -31,4 +33,46 @@ class StatisticsSuite extends QueryTest with SharedSQLContext {
spark.sessionState.conf.autoBroadcastJoinThreshold)
}

test("estimates the size of limit") {
withTempTable("test") {
Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
.createOrReplaceTempView("test")
Seq((0, 1), (1, 24), (2, 48)).foreach { case (limit, expected) =>
val df = sql(s"""SELECT * FROM test limit $limit""")

val sizesGlobalLimit = df.queryExecution.analyzed.collect { case g: GlobalLimit =>
g.statistics.sizeInBytes
}
assert(sizesGlobalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}")
assert(sizesGlobalLimit.head === BigInt(expected),
s"expected exact size 24 for table 'test', got: ${sizesGlobalLimit.head}")
Contributor:

Why do we hardcode 24 here?

Member Author:

: ) Forgot to change it.
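
For reference, the expected values line up with per-row default sizes: assuming defaultSize is 20 bytes for StringType and 4 for IntegerType in this Spark version, one (k, v) row is estimated at 20 + 4 = 24 bytes, so limit 1 gives 24 and limit 2 gives 48, while limit 0 is clamped to 1 byte by the fix above.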


        val sizesLocalLimit = df.queryExecution.analyzed.collect { case l: LocalLimit =>
          l.statistics.sizeInBytes
        }
        assert(sizesLocalLimit.size === 1, s"Size wrong for:\n ${df.queryExecution}")
        assert(sizesLocalLimit.head === BigInt(expected),
          s"expected exact size 24 for table 'test', got: ${sizesLocalLimit.head}")
      }
    }
  }

test("estimates the size of a limit 0 on outer join") {
withTempTable("test") {
Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
.createOrReplaceTempView("test")
val df1 = spark.table("test")
val df2 = spark.table("test").limit(0)
val df = df1.join(df2, Seq("k"), "left")

val sizes = df.queryExecution.analyzed.collect { case g: Join =>
g.statistics.sizeInBytes
}

assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
Contributor:

How about "number of Join nodes is wrong"?

Member Author:

Sure

      assert(sizes.head === BigInt(96),
        s"expected exact size 96 for table 'test', got: ${sizes.head}")
    }
  }
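
Again assuming 24 bytes per row, df1 is estimated at 4 * 24 = 96 bytes and df2 (LIMIT 0) at the clamped 1 byte, so the Join's product-based default yields 96 * 1 = 96; without the clamp, the product would have been 96 * 0 = 0.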

}