Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ trait CheckAnalysis extends PredicateHelper {
}).length > 1
}

private def checkLimitClause(limitExpr: Expression): Unit = {
val numRows = limitExpr.eval().asInstanceOf[Int]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the limit expression guaranteed to be a literal?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. Users can input an expression here. For example,

assertEqual(s"$sql limit cast(9 / 4 as int)", plan.limit(Cast(Literal(9) / 4, IntegerType)))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, but it's still foldable. Is it possible it's non-foldable?

Copy link
Member Author

@gatorsmile gatorsmile Jul 5, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not support non-foldable limit clauses.

// Planning strategy for Limit nodes that can be turned into dedicated physical operators.
// Every Limit pattern below goes through the IntegerLiteral extractor, so a limit expression
// that the extractor does not match (presumably anything that is not an integer literal by
// planning time -- confirm against IntegerLiteral) is not planned by this strategy.
object SpecialLimits extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
// The plan is wrapped in ReturnAnswer: inspect the plan underneath it.
case logical.ReturnAnswer(rootPlan) => rootPlan match {
// Limit directly on top of a Sort whose second argument is `true`
// (presumably the "global sort" flag -- confirm): planned as a single TakeOrderedAndProjectExec.
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
// Same shape with a Project between the Limit and the Sort: the project list is handed
// to TakeOrderedAndProjectExec instead of being planned as a separate node.
case logical.Limit(
IntegerLiteral(limit),
logical.Project(projectList, logical.Sort(order, true, child))) =>
execution.TakeOrderedAndProjectExec(
limit, order, Some(projectList), planLater(child)) :: Nil
// Any other literal Limit under ReturnAnswer is planned as CollectLimitExec.
case logical.Limit(IntegerLiteral(limit), child) =>
execution.CollectLimitExec(limit, planLater(child)) :: Nil
// Not a recognised Limit shape: defer planning of the wrapped plan.
case other => planLater(other) :: Nil
}
// Outside ReturnAnswer only the two Limit-over-Sort shapes are handled here;
// note there is no CollectLimitExec case in this branch.
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
execution.TakeOrderedAndProjectExec(limit, order, None, planLater(child)) :: Nil
case logical.Limit(
IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
execution.TakeOrderedAndProjectExec(
limit, order, Some(projectList), planLater(child)) :: Nil
// Everything else yields no plan from this strategy.
case _ => Nil
}
}

case logical.LocalLimit(IntegerLiteral(limit), child) =>
execution.LocalLimitExec(limit, planLater(child)) :: Nil
case logical.GlobalLimit(IntegerLiteral(limit), child) =>
execution.GlobalLimitExec(limit, planLater(child)) :: Nil

However, we do not raise an exception when users supply a non-foldable limit expression, so the error they currently get is confusing:

assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
   +- LocalLimit (_nondeterministic#202 > 0.2)
      +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
         +- LogicalRDD [key#11, value#12]

java.lang.AssertionError: assertion failed: No plan for GlobalLimit (_nondeterministic#203 > 0.2)
+- Project [key#11, value#12, rand(-1441968339187861415) AS _nondeterministic#203]
   +- LocalLimit (_nondeterministic#202 > 0.2)
      +- Project [key#11, value#12, rand(-1308350387169017676) AS _nondeterministic#202]
         +- LogicalRDD [key#11, value#12]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me do it in this PR. Thank you for your review! : )

if (numRows < 0) {
failAnalysis(
s"number_rows in limit clause must be equal to or greater than 0. " +
s"number_rows:$numRows")
}
}

def checkAnalysis(plan: LogicalPlan): Unit = {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
Expand Down Expand Up @@ -251,6 +260,10 @@ trait CheckAnalysis extends PredicateHelper {
s"but one table has '${firstError.output.length}' columns and another table has " +
s"'${s.children.head.output.length}' columns")

case GlobalLimit(limitExpr, _) => checkLimitClause(limitExpr)

case LocalLimit(limitExpr, _) => checkLimitClause(limitExpr)

case p if p.expressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) =>
p match {
case _: Filter | _: Aggregate | _: Project => // Ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,12 @@ case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryN
}
override lazy val statistics: Statistics = {
val limit = limitExpr.eval().asInstanceOf[Int]
val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
var sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
if (sizeInBytes == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a special case for limit. How about we make it more clear? e.g.

val sizeInBytes = if (limit == 0) {
  1
} else {
  ...
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Let me fix it.

// sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
// (product of children).
sizeInBytes = 1
}
child.statistics.copy(sizeInBytes = sizeInBytes)
}
}
Expand All @@ -675,7 +680,12 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNo
}
override lazy val statistics: Statistics = {
val limit = limitExpr.eval().asInstanceOf[Int]
val sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
var sizeInBytes = (limit: Long) * output.map(a => a.dataType.defaultSize).sum
if (sizeInBytes == 0) {
// sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
// (product of children).
sizeInBytes = 1
}
child.statistics.copy(sizeInBytes = sizeInBytes)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@ class AnalysisErrorSuite extends AnalysisTest {
"Generators are not supported outside the SELECT clause, but got: Sort" :: Nil
)

// Negative limit: analysing listRelation.limit(-1) is expected to produce the
// "number_rows ... must be equal to or greater than 0" message, including the offending value.
// NOTE(review): the test name says "num_rows" while the expected message says "number_rows";
// presumably errorTest checks that the analysis error contains each listed string -- confirm.
errorTest(
"num_rows in limit clause must be equal to or greater than 0",
listRelation.limit(-1),
"number_rows in limit clause must be equal to or greater than 0. number_rows:-1" :: Nil
)

errorTest(
"more than one generators in SELECT",
listRelation.select(Explode('list), Explode('list)),
Expand Down
13 changes: 13 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,19 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

// Both TABLESAMPLE (n rows) and LIMIT n with a negative n must be rejected with an
// AnalysisException carrying the same limit-clause message.
test("negative in LIMIT or TABLESAMPLE") {
  val expected = "number_rows in limit clause must be equal to or greater than 0. number_rows:-1"
  val negativeRowQueries = Seq(
    "SELECT * FROM testData TABLESAMPLE (-1 rows)",
    "SELECT * FROM testData LIMIT -1")

  // The queries are checked in the order listed; the first failing one aborts the test.
  negativeRowQueries.foreach { query =>
    val message = intercept[AnalysisException] {
      sql(query)
    }.getMessage
    assert(message.contains(expected))
  }
}

test("aggregates with nulls") {
checkAnswer(
sql("SELECT SKEWNESS(a), KURTOSIS(a), MIN(a), MAX(a)," +
Expand Down
44 changes: 44 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Join, LocalLimit}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

class StatisticsSuite extends QueryTest with SharedSQLContext {
import testImplicits._

test("SPARK-15392: DataFrame created from RDD should not be broadcasted") {
val rdd = sparkContext.range(1, 100).map(i => Row(i, i))
Expand All @@ -31,4 +33,46 @@ class StatisticsSuite extends QueryTest with SharedSQLContext {
spark.sessionState.conf.autoBroadcastJoinThreshold)
}

// Verifies the sizeInBytes estimate reported by the GlobalLimit and LocalLimit nodes of
// `SELECT * FROM test LIMIT n` for several values of n.
test("estimates the size of limit") {
  withTempTable("test") {
    Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
      .createOrReplaceTempView("test")
    // (limit, expected sizeInBytes). LIMIT 0 is expected to report 1 rather than 0;
    // the other entries grow by 24 per row for this (k, v) schema.
    Seq((0, 1), (1, 24), (2, 48)).foreach { case (limit, expected) =>
      val df = sql(s"""SELECT * FROM test limit $limit""")

      val sizesGlobalLimit = df.queryExecution.analyzed.collect { case g: GlobalLimit =>
        g.statistics.sizeInBytes
      }
      assert(sizesGlobalLimit.size === 1,
        s"number of GlobalLimit nodes is wrong:\n ${df.queryExecution}")
      // Report the expectation of the current iteration, not a fixed value.
      assert(sizesGlobalLimit(0) === BigInt(expected),
        s"expected exact size $expected for table 'test', got: ${sizesGlobalLimit(0)}")

      val sizesLocalLimit = df.queryExecution.analyzed.collect { case l: LocalLimit =>
        l.statistics.sizeInBytes
      }
      assert(sizesLocalLimit.size === 1,
        s"number of LocalLimit nodes is wrong:\n ${df.queryExecution}")
      assert(sizesLocalLimit(0) === BigInt(expected),
        s"expected exact size $expected for table 'test', got: ${sizesLocalLimit(0)}")
    }
  }
}

test("estimates the size of a limit 0 on outer join") {
withTempTable("test") {
Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("k", "v")
.createOrReplaceTempView("test")
val df1 = spark.table("test")
val df2 = spark.table("test").limit(0)
val df = df1.join(df2, Seq("k"), "left")

val sizes = df.queryExecution.analyzed.collect { case g: Join =>
g.statistics.sizeInBytes
}

assert(sizes.size === 1, s"Size wrong for:\n ${df.queryExecution}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about changing the message to "number of Join nodes is wrong"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

assert(sizes(0).equals(BigInt(96)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you use equals? Would === not work here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just following what the other test cases did. Sure, we can change all of them. Let me do it. Thanks!

s"expected exact size 96 for table 'test', got: ${sizes(0)}")
}
}

}