address comment
cloud-fan committed Oct 5, 2018
commit 51ce7be89de2a942508a939c20658b80ece9fe56

@@ -136,7 +136,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
|if ($batch == null) {
| $nextBatchFuncName();
|}
-|while ($batch != null$keepProducingDataCond) {
+|while ($batch != null$limitNotReachedCond) {
| int $numRows = $batch.numRows();
| int $localEnd = $numRows - $idx;
| for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {

@@ -166,7 +166,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
}
val inputRow = if (needsUnsafeRowConversion) null else row
s"""
-|while ($input.hasNext()$keepProducingDataCond) {
+|while ($input.hasNext()$limitNotReachedCond) {
Member: We can put limitNotReachedCond as the first condition, to avoid possibly buffering a row.
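
For illustration, a runnable sketch (toy code, not from this patch) of the point above: if the generated condition is input.hasNext() && count < limit, hasNext() can buffer one extra row from upstream even after the limit is reached, whereas count < limit && input.hasNext() short-circuits before touching the iterator.

// Hedged sketch: an iterator whose hasNext() prefetches a row, standing in for
// sources that must fetch ahead to know whether more input exists.
object ConditionOrderDemo {
  class PrefetchingIterator extends Iterator[Int] {
    var fetched = 0                                  // rows pulled from the source
    private var buffered: Option[Int] = None
    def hasNext: Boolean = {
      if (buffered.isEmpty) { buffered = Some(fetched); fetched += 1 }
      true                                           // infinite toy source
    }
    def next(): Int = { val v = buffered.get; buffered = None; v }
  }

  def consume(limitCheckFirst: Boolean): Int = {
    val input = new PrefetchingIterator
    val limit = 3
    var count = 0
    while (if (limitCheckFirst) count < limit && input.hasNext
           else input.hasNext && count < limit) {
      input.next(); count += 1
    }
    input.fetched
  }

  def main(args: Array[String]): Unit = {
    println(s"hasNext() first:   fetched ${consume(false)} rows") // 4: one row buffered in vain
    println(s"limit check first: fetched ${consume(true)} rows")  // 3
  }
}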

| InternalRow $row = (InternalRow) $input.next();
| $numOutputRows.add(1);
| ${consume(ctx, outputVars, inputRow).trim}

@@ -132,11 +132,12 @@ case class SortExec(
// a stop check before sorting.
override def needStopCheck: Boolean = false

-// Sort is a blocking operator. It needs to consume all the inputs before producing any
-// output. This means, Limit after Sort has no effect to Sort's upstream operators.
-// Here we override this method to return Nil, so that upstream operators will not generate
-// unnecessary conditions (which is always evaluated to false) for the Limit after Sort.
-override def conditionsOfKeepProducingData: Seq[String] = Nil
+// Sort is a blocking operator. It needs to consume all the inputs before producing any output.
+// This means, a Limit operator after Sort will never reach its limit during the execution of
+// Sort's upstream operators. Here we override this method to return Nil, so that upstream
+// operators will not generate useless conditions (which always evaluate to true) for the Limit
+// operators after Sort.
+override def limitNotReachedChecks: Seq[String] = Nil
Contributor: It seems that all blocking operators will have this behavior. Shall we instead have a blockingOperator flag and make this a final method incorporating that logic?

Contributor (author): It's currently only done in Sort and Aggregate. I don't want to overdesign it until there are more use cases.

Contributor: I'm fine with doing it later, but I'd like to avoid having other places that duplicate this logic in the future, to prevent possible mistakes.
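
For reference, a minimal sketch of the flag-based design discussed above (hypothetical API, not part of this PR; isBlockingOperator and ownLimitChecks are invented names). One wrinkle it surfaces: a final method must still let Limit contribute its own check, so the flag alone is not quite enough.

// Hypothetical sketch of the reviewer's suggestion: a blocking-operator flag plus
// a final method, so blocking operators cannot mis-implement the propagation cutoff.
trait CodegenSupportSketch {
  def parent: CodegenSupportSketch
  // True for operators that consume all input before producing any output,
  // e.g. Sort and Aggregate.
  def isBlockingOperator: Boolean = false
  // Checks contributed by this node itself, e.g. a Limit's "count < limit".
  def ownLimitChecks: Seq[String] = Nil
  final def limitNotReachedChecks: Seq[String] =
    if (isBlockingOperator) Nil
    else ownLimitChecks ++ parent.limitNotReachedChecks
}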


override protected def doProduce(ctx: CodegenContext): String = {
val needToSort =

@@ -178,7 +179,7 @@
| $needToSort = false;
| }
|
-| while ($sortedIterator.hasNext()$keepProducingDataCond) {
+| while ($sortedIterator.hasNext()$limitNotReachedCond) {
| UnsafeRow $outputRow = (UnsafeRow)$sortedIterator.next();
| ${consume(ctx, null, outputRow)}
| if (shouldStop()) return;

@@ -346,13 +346,24 @@ trait CodegenSupport extends SparkPlan {
*/
def needStopCheck: Boolean = parent.needStopCheck

-def conditionsOfKeepProducingData: Seq[String] = parent.conditionsOfKeepProducingData
+/**
+ * A sequence of checks which evaluate to true if the downstream Limit operators have not yet
+ * received enough records to reach the limit. If the current node is a data-producing node, it
+ * can leverage this information to stop producing data and complete the data flow earlier.
+ * Common data-producing nodes are leaf nodes like Range and Scan, and blocking nodes like Sort
+ * and Aggregate. These checks should be put in the condition of the data-producing loop.
+ */
+def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks

-final protected def keepProducingDataCond: String = {
-if (parent.conditionsOfKeepProducingData.isEmpty) {
+/**
+ * A helper method to generate the data-producing loop condition according to the
+ * limit-not-reached checks.
+ */
+final def limitNotReachedCond: String = {
+if (parent.limitNotReachedChecks.isEmpty) {
Contributor: Just one thought: since we propagate the limitNotReachedChecks (correctly) to all the children, shall we also enforce that we only call this on a node which will not propagate the limitNotReachedChecks any further? Maybe we could use the blocking flag proposed in the other comment.

The reason I'd like to do this is to enforce that we don't introduce the same limit condition check more than once, in more than one operator, which would be useless and may cause a (small) performance issue. WDYT?

Contributor (author): It's not very useful to enforce that. The consequence is so minor that I don't think it's worth the complexity. I want to have a simple and robust framework for the limit optimization first.

Contributor:

> I want to have a simple and robust framework

Yes, 100% agreed; that's exactly why I'd like to detect early all the situations we assume are impossible but that may still happen in corner cases we haven't considered. What I am suggesting is to enforce this and fail for testing only, of course; in production we shouldn't do anything similar.
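
A sketch of the test-only enforcement being proposed (hypothetical; neither this method nor its call site exists in the PR) could look like the following, where the testing guard mirrors what org.apache.spark.util.Utils.isTesting checks:

// Hypothetical, test-only guard: fail fast if the same limit-not-reached check
// would be emitted more than once into a single generated loop condition.
def assertNoDuplicateChecks(checks: Seq[String]): Unit = {
  val isTesting = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
  if (isTesting) {
    assert(checks.distinct.length == checks.length,
      s"duplicate limit-not-reached checks: ${checks.mkString(", ")}")
  }
}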

""
} else {
-parent.conditionsOfKeepProducingData.mkString(" && ", " && ", "")
+parent.limitNotReachedChecks.mkString(" && ", " && ", "")
Contributor: Here we are assuming that this is always going to be AND-ed onto an already existing condition. I don't see a case where it would be used in a different context as of now, but what about just producing the conditions here and putting the initial && outside of this? It might be easier to reuse. WDYT?

Contributor (author): Then we would have a lot of places generating the initial &&. If we do have a different context in the future, we can use limitNotReachedChecks directly.

}
}
}
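
To make the composition concrete, a small runnable sketch (count_0 and count_1 are stand-ins for generated counter names, not real output of this code):

object LimitCondDemo {
  // Mirrors limitNotReachedCond above: empty when there is no downstream limit,
  // otherwise " && check_1 && check_2 ..." ready to append to a loop test.
  def limitNotReachedCond(checks: Seq[String]): String =
    if (checks.isEmpty) "" else checks.mkString(" && ", " && ", "")

  def main(args: Array[String]): Unit = {
    println(s"while (input.hasNext()${limitNotReachedCond(Nil)})")
    // -> while (input.hasNext())
    println(s"while (input.hasNext()${limitNotReachedCond(Seq("count_0 < 100"))})")
    // -> while (input.hasNext() && count_0 < 100)
    println(s"while (input.hasNext()${limitNotReachedCond(Seq("count_0 < 100", "count_1 < 10"))})")
    // -> while (input.hasNext() && count_0 < 100 && count_1 < 10)
  }
}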

@@ -391,7 +402,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport
forceInline = true)
val row = ctx.freshName("row")
s"""
-| while ($input.hasNext()$keepProducingDataCond) {
+| while ($input.hasNext()$limitNotReachedCond) {
| InternalRow $row = (InternalRow) $input.next();
| ${consume(ctx, null, row).trim}
| if (shouldStop()) return;

@@ -687,7 +698,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)

override def needStopCheck: Boolean = true

-override def conditionsOfKeepProducingData: Seq[String] = Nil
+override def limitNotReachedChecks: Seq[String] = Nil

override protected def otherCopyArgs: Seq[AnyRef] = Seq(codegenStageId.asInstanceOf[Integer])
}

@@ -160,10 +160,11 @@ case class HashAggregateExec(
override def needStopCheck: Boolean = false

// Aggregate is a blocking operator. It needs to consume all the inputs before producing any
-// output. This means, Limit after Aggregate has no effect to Aggregate's upstream operators.
-// Here we override this method to return Nil, so that upstream operators will not generate
-// unnecessary conditions (which is always evaluated to false) for the Limit after Aggregate.
-override def conditionsOfKeepProducingData: Seq[String] = Nil
+// output. This means, a Limit operator after Aggregate will never reach its limit during the
+// execution of Aggregate's upstream operators. Here we override this method to return Nil, so
+// that upstream operators will not generate useless conditions (which always evaluate to true)
+// for the Limit operators after Aggregate.
+override def limitNotReachedChecks: Seq[String] = Nil

protected override def doProduce(ctx: CodegenContext): String = {
if (groupingExpressions.isEmpty) {

@@ -711,7 +712,7 @@

def outputFromRegularHashMap: String = {
s"""
-|while ($iterTerm.next()$keepProducingDataCond) {
+|while ($iterTerm.next()$limitNotReachedCond) {
| UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
| UnsafeRow $bufferTerm = (UnsafeRow) $iterTerm.getValue();
| $outputFunc($keyTerm, $bufferTerm);

@@ -490,7 +490,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
| $initRangeFuncName(partitionIndex);
| }
|
-| while (true$keepProducingDataCond) {
+| while (true$limitNotReachedCond) {
| if ($nextIndex == $batchEnd) {
| long $nextBatchTodo;
| if ($numElementsTodo > ${batchSize}L) {

@@ -623,7 +623,7 @@ case class SortMergeJoinExec(
}

s"""
-|while (findNextInnerJoinRows($leftInput, $rightInput)$keepProducingDataCond) {
+|while (findNextInnerJoinRows($leftInput, $rightInput)) {
| ${leftVarDecl.mkString("\n")}
| ${beforeLoop.trim}
| scala.collection.Iterator<UnsafeRow> $iterator = $matches.generateIterator();

@@ -77,8 +77,8 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {

private lazy val countTerm = BaseLimitExec.newLimitCountTerm()

-override lazy val conditionsOfKeepProducingData: Seq[String] = {
-s"$countTerm < $limit" +: super.conditionsOfKeepProducingData
+override lazy val limitNotReachedChecks: Seq[String] = {
+s"$countTerm < $limit" +: super.limitNotReachedChecks
}

protected override def doProduce(ctx: CodegenContext): String = {
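
Finally, a toy end-to-end model (hypothetical, heavily simplified from the PR's classes) of how a Limit's check reaches a producing leaf, and how a blocking Sort cuts it off:

object LimitChainDemo {
  trait Op {
    def parent: Op
    def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
    final def limitNotReachedCond: String =
      if (parent.limitNotReachedChecks.isEmpty) ""
      else parent.limitNotReachedChecks.mkString(" && ", " && ", "")
  }
  // Pipeline root: nothing above it can be limited.
  case object Root extends Op {
    def parent: Op = this
    override def limitNotReachedChecks: Seq[String] = Nil
  }
  // Limit prepends its own check, as BaseLimitExec does above.
  case class Limit(parent: Op, countTerm: String, limit: Int) extends Op {
    override def limitNotReachedChecks: Seq[String] =
      s"$countTerm < $limit" +: super.limitNotReachedChecks
  }
  // Sort is blocking: operators upstream of it see no checks, as in SortExec.
  case class Sort(parent: Op) extends Op {
    override def limitNotReachedChecks: Seq[String] = Nil
  }
  case class Scan(parent: Op) extends Op

  def main(args: Array[String]): Unit = {
    val direct = Scan(Limit(Root, "count_0", 10))
    println(s"while (input.hasNext()${direct.limitNotReachedCond})")
    // -> while (input.hasNext() && count_0 < 10)

    val blocked = Scan(Sort(Limit(Root, "count_0", 10)))
    println(s"while (input.hasNext()${blocked.limitNotReachedCond})")
    // -> while (input.hasNext())   (Sort consumes all input before any output)
  }
}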