Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix limit codegen.
  • Loading branch information
viirya committed Sep 24, 2018
commit a09e60f1e026504657f3de7669eb79cc0b4c2c8c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public abstract class BufferedRowIterator {

protected int partitionIndex = -1;

// This indicates whether the query execution should be stopped even the input rows are still
// available. This is used in limit operator. When it reaches the given number of rows to limit,
// this flag is set and the execution should be stopped.
protected boolean isStopEarly = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if there are 2 limits in the query?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a test for 2 limits.

When any of 2 limits sets isStopEarly, I think the execution should be stopped. Is there any case opposite to this?


public boolean hasNext() throws IOException {
if (currentRows.isEmpty()) {
processNext();
Expand Down Expand Up @@ -73,14 +78,21 @@ public void append(InternalRow row) {
currentRows.add(row);
}

/**
* Sets the flag of stopping the query execution early.
*/
public void setStopEarly(boolean value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have more documents about how to use it? For now I see 2 use cases:

  1. limit operator should call it with true when the limit is hit
  2. blocking operator(sort, agg, etc.) should call it with false to reset it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Let me add it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You also hint me that we should reset stop early flag in sort exec node too. I will add it and related test.

isStopEarly = value;
}

/**
* Returns whether this iterator should stop fetching next row from [[CodegenSupport#inputRDDs]].
*
* If it returns true, the caller should exit the loop that [[InputAdapter]] generates.
* This interface is mainly used to limit the number of input rows.
*/
public boolean stopEarly() {
return false;
return isStopEarly;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ case class HashAggregateExec(
val aggTime = metricTerm(ctx, "aggTime")
val beforeAgg = ctx.freshName("beforeAgg")
s"""
| while (!$initAgg && !stopEarly()) {
| while (!$initAgg) {
| $initAgg = true;
| long $beforeAgg = System.nanoTime();
| $doAggFuncName();
Expand Down Expand Up @@ -723,6 +723,9 @@ case class HashAggregateExec(
long $beforeAgg = System.nanoTime();
$doAggFuncName();
$aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);

// Reset stop early flag set by previous limit operator
setStopEarly(false);
}

// output the result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,12 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
}

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
val stopEarly =
ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false

ctx.addNewFunction("stopEarly", s"""
@Override
protected boolean stopEarly() {
return $stopEarly;
}
""", inlineToOuterClass = true)
val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
s"""
| if ($countTerm < $limit) {
| $countTerm += 1;
| if ($countTerm == $limit) {
| $stopEarly = true;
| setStopEarly(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we do this after consume?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't we call shouldStop inside consume? if it does, stopEarly will not be set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if ($countTerm == $limit) means this is the last record, and we should still consume it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. And I think shouldStop shouldn't be called inside consume.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually as I'm just looking at the query again, there should not be a stopEarly check inside consume that prevents us to consume the last record. Because the check should be at the outer while loop.

The cases having stopEarly check inside consume, is blocking operators like sort and aggregate, for them we need to reset the flag.

But for safety, I think I will also move this after consume.

| }
| ${consume(ctx, input)}
| }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
Seq(Row(1, 2, Seq("a", "b")), Row(3, 2, Seq("c", "c", "d"))))
}

test("SPARK-18004 limit + aggregates") {
test("SPARK-18528 limit + aggregates") {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This JIRA number is wrong.

val df = Seq(("a", 1), ("b", 2), ("c", 1), ("d", 5)).toDF("id", "value")
val limit2Df = df.limit(2)
checkAnswer(
Expand Down
25 changes: 25 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2865,6 +2865,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
// The second hash aggregate before local limit outputs 1 record.
assert(aggNumRecords == 101)

val aggNoGroupingDF = spark.range(0, 100, 1, 1)
.groupBy()
.count().limit(1).filter('count > 0)
aggNoGroupingDF.collect()
val aggNoGroupingNumRecords = aggNoGroupingDF.queryExecution.sparkPlan.collect {
case h: HashAggregateExec => h
}.map { hashNode =>
hashNode.metrics("numOutputRows").value
}.sum
assert(aggNoGroupingNumRecords == 2)

val filterDF = spark.range(0, 100, 1, 1).filter('id >= 0)
.selectExpr("id + 1 as id2").limit(1).filter('id > 50)
filterDF.collect()
Expand All @@ -2875,6 +2886,20 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}.head
// RangeNode and FilterNode both output 1 record.
assert(filterNumRecords == Tuple2(1, 1))

val twoLimitsDF = spark.range(0, 100, 1, 1)
.limit(1)
.filter('id >= 0)
.selectExpr("id + 1 as id2")
.limit(2)
.filter('id > 50)
twoLimitsDF.collect()
val twoLimitsDFNumRecords = twoLimitsDF.queryExecution.sparkPlan.collect {
case r: RangeExec => r
}.map { rangeNode =>
rangeNode.metrics("numOutputRows").value
}.head
assert(twoLimitsDFNumRecords == 1)
}
}

Expand Down