Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ case class SortExec(
global: Boolean,
child: SparkPlan,
testSpillFrequency: Int = 0)
extends UnaryExecNode with CodegenSupport {
extends UnaryExecNode with BlockingOperatorWithCodegen {

override def output: Seq[Attribute] = child.output

Expand Down Expand Up @@ -124,21 +124,6 @@ case class SortExec(
// Name of sorter variable used in codegen.
private var sorterVariable: String = _

// The result rows come from the sort buffer, so this operator doesn't need to copy its result
// even if its child does.
override def needCopyResult: Boolean = false

// Sort operator always consumes all the input rows before outputting any result, so we don't need
// a stop check before sorting.
override def needStopCheck: Boolean = false

// Sort is a blocking operator. It needs to consume all the inputs before producing any output.
// This means, Limit operator after Sort will never reach its limit during the execution of Sort's
// upstream operators. Here we override this method to return Nil, so that upstream operators will
// not generate useless conditions (which are always evaluated to false) for the Limit operators
// after Sort.
override def limitNotReachedChecks: Seq[String] = Nil

override protected def doProduce(ctx: CodegenContext): String = {
val needToSort =
ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "needToSort", v => s"$v = true;")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,10 @@ trait CodegenSupport extends SparkPlan {
* limit-not-reached checks.
*/
final def limitNotReachedCond: String = {
// InputAdapter is also a leaf node.
val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
assert(isLeafNode || this.isInstanceOf[BlockingOperatorWithCodegen],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: shall we do this only if Utils.isTesting and otherwise just emit a warning maybe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah good idea!

"only leaf nodes and blocking nodes need to call this method in its data producing loop.")
if (parent.limitNotReachedChecks.isEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one thought: since we propagate (correctly) the limitNotReachedChecks to all the children, shall we also enforce that we are calling this on a node which will not propagate the limitNotReachedChecks anymore? We may use the blocking flag proposed in the other comment maybe.

The reason I'd like to do this is to enforce that we are not introducing the same limit condition check more than once, in more than one operator, which would be useless and may cause (small) perf issue. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not very useful to enforce that. The consequence is so minor and I don't think it's worth the complexity. I want to have a simple and robust framework for the limit optimization first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to have a simple and robust framework

yes, I 100%, that's why I'd like to early detect all the possible situations which we are not thinking as possible but may happen in corner cases we are not considering. What I am suggesting here is to enforce and fail that for testing only of course, in production we shouldn't do anything similar.

""
} else {
Expand All @@ -368,6 +372,29 @@ trait CodegenSupport extends SparkPlan {
}
}

/**
* A special kind of operators which support whole stage codegen. Blocking means these operators
* will consume all the inputs first, before producing output. Typical blocking operators are
* sort and aggregate.
*/
trait BlockingOperatorWithCodegen extends CodegenSupport {

// Blocking operators usually have some kind of buffer to keep the data before producing them, so
// then don't to copy its result even if its child does.
override def needCopyResult: Boolean = false

// Blocking operators always consume all the input first, so its upstream operators don't need a
// stop check.
override def needStopCheck: Boolean = false

// Blocking operators need to consume all the inputs before producing any output. This means,
// Limit operator after this blocking operator will never reach its limit during the execution of
// this blocking operator's upstream operators. Here we override this method to return Nil, so
// that upstream operators will not generate useless conditions (which are always evaluated to
// false) for the Limit operators after this blocking operator.
override def limitNotReachedChecks: Seq[String] = Nil
}


/**
* InputAdapter is used to hide a SparkPlan from a subtree that supports codegen.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ case class HashAggregateExec(
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
extends UnaryExecNode with CodegenSupport {
extends UnaryExecNode with BlockingOperatorWithCodegen {

private[this] val aggregateBufferAttributes = {
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
Expand Down Expand Up @@ -151,21 +151,6 @@ case class HashAggregateExec(
child.asInstanceOf[CodegenSupport].inputRDDs()
}

// The result rows come from the aggregate buffer, or a single row(no grouping keys), so this
// operator doesn't need to copy its result even if its child does.
override def needCopyResult: Boolean = false

// Aggregate operator always consumes all the input rows before outputting any result, so we
// don't need a stop check before aggregating.
override def needStopCheck: Boolean = false

// Aggregate is a blocking operator. It needs to consume all the inputs before producing any
// output. This means, Limit operator after Aggregate will never reach its limit during the
// execution of Aggregate's upstream operators. Here we override this method to return Nil, so
// that upstream operators will not generate useless conditions (which are always evaluated to
// true) for the Limit operators after Aggregate.
override def limitNotReachedChecks: Seq[String] = Nil

protected override def doProduce(ctx: CodegenContext): String = {
if (groupingExpressions.isEmpty) {
doProduceWithoutKeys(ctx)
Expand Down