Address comments.
viirya committed Sep 25, 2018
commit 2f4d356872438b609a55f177ead1ee00ea441350
@@ -79,7 +79,12 @@ public void append(InternalRow row) {
}

/**
* Sets the flag of stopping the query execution early.
* Sets the flag of stopping the query execution early under whole-stage codegen.
*
* This has two use cases:
* 1. Limit operators should call it with true when the given limit number is reached.
* 2. Blocking operators (sort, aggregate, etc.) should call it with false to reset it after consuming
* all records from upstream.
*/
public void setStopEarly(boolean value) {
Contributor:

Can we have more documentation about how to use it? For now I see two use cases:

  1. A limit operator should call it with true when the limit is hit.
  2. A blocking operator (sort, agg, etc.) should call it with false to reset it.

Member Author:

Ok. Let me add it.

Member Author:

You also hinted that we should reset the stop-early flag in the sort exec node too. I will add it and a related test.

isStopEarly = value;
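A minimal sketch of the two use cases listed in the doc comment above, based on the query shape used in the test suite change below (illustrative only, not part of this patch): the limit sets the stop-early flag once it has produced 10 rows, and the per-partition sort that consumes those rows must reset the flag before emitting its own sorted output, otherwise the stage's outer loop would exit immediately.

```scala
// Hypothetical query, for illustration: a limit feeding a blocking sort
// within the same whole-stage-codegen stage.
val df = spark.range(0, 100, 1, 1)
  .limit(10)                    // use case 1: sets the stop-early flag at row 10
  .sortWithinPartitions("id")   // use case 2: resets the flag after consuming its input
df.collect()
```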
@@ -170,9 +170,12 @@ case class SortExec(
| $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
| $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
| $needToSort = false;
|
| // Reset stop early flag set by previous limit operator
| setStopEarly(false);
| }
|
| while ($sortedIterator.hasNext()) {
| while ($sortedIterator.hasNext() && !stopEarly()) {
| UnsafeRow $outputRow = (UnsafeRow)$sortedIterator.next();
| ${consume(ctx, null, outputRow)}
| if (shouldStop()) return;
@@ -471,8 +471,6 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
| int $localEnd = (int)($range / ${step}L);
| for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
| long $value = ((long)$localIdx * ${step}L) + $number;
| $numOutput.add(1);
| $inputMetrics.incRecordsRead(1);
| ${consume(ctx, Seq(ev))}
| if (stopEarly()) {
| break;
@@ -493,6 +491,9 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
| $numElementsTodo = 0;
| if ($nextBatchTodo == 0) break;
| }
| $numOutput.add($nextBatchTodo);
| $inputMetrics.incRecordsRead($nextBatchTodo);
|
| $batchEnd += $nextBatchTodo * ${step}L;
| }
""".stripMargin
@@ -75,10 +75,11 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
s"""
| if ($countTerm < $limit) {
| $countTerm += 1;
| ${consume(ctx, input)}
|
| if ($countTerm == $limit) {
| setStopEarly(true);
Contributor:

Shall we do this after consume?

Member Author:

Won't we call shouldStop inside consume? If it does, stopEarly will not be set.

Contributor:

if ($countTerm == $limit) means this is the last record, and we should still consume it?

Member Author:

Oh, I see. And I think shouldStop shouldn't be called inside consume.

Member Author:

Actually, looking at the query again, there should not be a stopEarly check inside consume that prevents us from consuming the last record, because the check should be in the outer while loop.

The cases that do have a stopEarly check inside consume are blocking operators like sort and aggregate; for those we need to reset the flag.

But for safety, I think I will still move this after consume.

| }
| ${consume(ctx, input)}
| }
""".stripMargin
}
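To summarize the thread above: the stopEarly() guard sits on the producing operator's outer loop (as in the SortExec and RangeExec hunks earlier in this commit), so calling setStopEarly(true) after consume() cannot skip the last record. A simplified sketch of that loop shape, written in the same codegen-template style as the operators above (illustrative, not the actual generated source):

```scala
// Simplified, hypothetical template of the generated outer loop.
val loopShape =
  """
    | while (inputIterator.hasNext() && !stopEarly()) {
    |   InternalRow row = (InternalRow) inputIterator.next();
    |   // consume(): downstream operators process the row; the limit operator
    |   // increments its counter and, once the limit is reached, calls
    |   // setStopEarly(true) only after the row has been passed along.
    |   if (shouldStop()) return;
    | }
  """.stripMargin
```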
43 changes: 32 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.{AccumulatorSuite, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.{aggregate, FilterExec, RangeExec}
import org.apache.spark.sql.execution.{aggregate, FilterExec, LocalLimitExec, RangeExec}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
@@ -2876,30 +2876,51 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}.sum
assert(aggNoGroupingNumRecords == 2)

// Set `TOP_K_SORT_FALLBACK_THRESHOLD` to a low value because we don't want sort + limit
// to be planned as a `TakeOrderedAndProject` node.
withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
val sortDF = spark.range(0, 100, 1, 1)
.filter('id >= 0)
.limit(10)
.sortWithinPartitions("id")
// Use a non-deterministic expr to prevent the filter from being pushed down.
.selectExpr("rand() + id as id2")
.filter('id2 >= 0)
.limit(5)
.selectExpr("1 + id2 as id3")
sortDF.collect()
val sortNumRecords = sortDF.queryExecution.sparkPlan.collect {
case l @ LocalLimitExec(_, f: FilterExec) => f
}.map { filterNode =>
filterNode.metrics("numOutputRows").value
}
assert(sortNumRecords.sorted === Seq(5, 10))
}

val filterDF = spark.range(0, 100, 1, 1).filter('id >= 0)
.selectExpr("id + 1 as id2").limit(1).filter('id > 50)
filterDF.collect()
val filterNumRecords = filterDF.queryExecution.sparkPlan.collect {
case f @ FilterExec(_, r: RangeExec) => (f, r)
}.map { case (filterNode, rangeNode) =>
(filterNode.metrics("numOutputRows").value, rangeNode.metrics("numOutputRows").value)
case f @ FilterExec(_, r: RangeExec) => f
}.map { case filterNode =>
filterNode.metrics("numOutputRows").value
}.head
// RangeNode and FilterNode both output 1 record.
assert(filterNumRecords == Tuple2(1, 1))
assert(filterNumRecords == 1)

val twoLimitsDF = spark.range(0, 100, 1, 1)
.limit(1)
.filter('id >= 0)
.limit(1)
.selectExpr("id + 1 as id2")
.limit(2)
.filter('id > 50)
.filter('id2 >= 0)
twoLimitsDF.collect()
val twoLimitsDFNumRecords = twoLimitsDF.queryExecution.sparkPlan.collect {
case r: RangeExec => r
}.map { rangeNode =>
rangeNode.metrics("numOutputRows").value
case f @ FilterExec(_, _: RangeExec) => f
}.map { filterNode =>
filterNode.metrics("numOutputRows").value
}.head
assert(twoLimitsDFNumRecords == 1)
checkAnswer(twoLimitsDF, Row(1) :: Nil)
}
}
