[SPARK-25497][SQL] Limit operation within whole stage codegen should not consume all the inputs #22630
Conversation
The change here simply moves the inner loop after the `batchEnd` and metrics update, so that we get correct metrics when we stop earlier because of a limit.
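Illustratively, the reordered range loop can be sketched as follows. This is a hand-written approximation with hypothetical variable names, not the exact generated code; it only shows the shape of the control flow, where batch advancement and the per-batch metric update happen before the inner loop:

```java
// Sketch of the reordered range loop: the batch boundary and the metric
// update happen before the inner loop, so stopping early on a limit still
// leaves the per-batch metrics consistent.
public class RangeLoopSketch {
    // Returns {rowsConsumed, numOutputMetric} for a hypothetical
    // range(0, partitionEnd) under a limit, processed in batches.
    static long[] run(long partitionEnd, int batchSize, long limit) {
        long nextIndex = 0, batchEnd = 0, numOutput = 0;
        boolean limitReached = false;
        while (!limitReached) {
            if (nextIndex == batchEnd) {
                long nextBatch = Math.min(batchEnd + batchSize, partitionEnd);
                numOutput += nextBatch - batchEnd;  // metric updated per batch
                batchEnd = nextBatch;
            }
            if (nextIndex == batchEnd) break;       // partition exhausted
            // inner loop over the current batch
            while (nextIndex < batchEnd && !limitReached) {
                nextIndex++;                        // "consume" one row
                if (nextIndex >= limit) limitReached = true;
            }
        }
        return new long[] {nextIndex, numOutput};
    }

    public static void main(String[] args) {
        long[] r = run(100, 10, 25);
        System.out.println(r[0] + " rows consumed, metric says " + r[1]);
    }
}
```

Note the metric is still per-batch granular: with a limit of 25 and a batch size of 10, 25 rows are consumed but the metric reports 30, which matches the per-batch accuracy trade-off discussed in this thread.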
I changed the test to check whole-stage mode only. The metrics are different between whole-stage and normal mode, and the bug only existed in whole-stage mode.
I have not looked at this in detail. But what if there is a limit before Aggregate? We should not consume all input rows.
Let's say the query is range -> limit -> agg -> limit.
The agg does consume all of its input, which comes from the first limit. The range will have a stop check w.r.t. the first limit, not the second. If there is no limit before the agg, then the range will not have a stop check.
Note that this is sub-optimal for adjacent limits, but I think it's fine, as the optimizer will merge adjacent limits.
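The propagation rule described above can be modeled with a small sketch. This is a hypothetical illustration of which checks reach the range node for range -> limit1 -> agg -> limit2 (the operator and counter names are made up, not Spark's actual API):

```java
import java.util.ArrayList;
import java.util.List;

public class LimitCheckPropagation {
    // Hypothetical model of check propagation, walking the plan top-down:
    // each limit adds its counter check, and a blocking operator (the agg)
    // drops all checks collected so far, because the limit above it cannot
    // fire until the agg has consumed all of its input anyway.
    static List<String> checksSeenByRange() {
        List<String> checks = new ArrayList<>();
        checks.add("limit2_counter < 5");    // the limit above the agg
        checks.clear();                      // agg is blocking: reset
        checks.add("limit1_counter < 100");  // the limit below the agg
        return checks;                       // what the range loop condition sees
    }

    public static void main(String[] args) {
        System.out.println(checksSeenByRange());
    }
}
```

So the range's loop condition only tests the first limit's counter, exactly as described in the comment above.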
  def outputFromRegularHashMap: String = {
    s"""
-      |while ($iterTerm.next()) {
+      |while ($iterTerm.next()$keepProducingDataCond) {
Here I only add the stop check for the regular hash map. The fast hash map is small and entirely in memory, so it's OK to always output all of it.
Test build #96944 has finished for PR 22630 at commit
Test build #96947 has finished for PR 22630 at commit
Test build #96948 has finished for PR 22630 at commit
This is an interesting change. I like this idea.
   */
  def needStopCheck: Boolean = parent.needStopCheck

+ def conditionsOfKeepProducingData: Seq[String] = parent.conditionsOfKeepProducingData
Can we briefly describe here what these two methods do?
  if (parent.limitNotReachedChecks.isEmpty) {
    ""
  } else {
    parent.limitNotReachedChecks.mkString(" && ", " && ", "")
Here we are assuming that this will be AND-ed with an already existing condition. I don't see a case in which this may be used in a different context as of now, but what about just producing the conditions here and putting the initial `&&` outside of this? It may be easier to reuse. WDYT?
Then we would have a lot of places generating the initial `&&`. If we do have a different context in the future, we can use `limitNotReachedChecks` directly.
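For illustration, the leading separator in the join is what lets the result be appended directly to an existing loop condition. Here is a hypothetical sketch using `String.join` to mirror Scala's `mkString(" && ", " && ", "")` (the check strings are made-up examples):

```java
import java.util.List;

public class CondJoinSketch {
    // Emulates parent.limitNotReachedChecks.mkString(" && ", " && ", ""):
    // a leading " && " plus the checks joined by " && ", or "" when there
    // are no limits in the query.
    static String limitNotReachedCond(List<String> checks) {
        return checks.isEmpty() ? "" : " && " + String.join(" && ", checks);
    }

    public static void main(String[] args) {
        String cond = limitNotReachedCond(List.of("count1 < 10", "count2 < 20"));
        // Appended to an existing condition without a dangling operator:
        System.out.println("while (input.hasNext()" + cond + ")");
    }
}
```

This shows why the prefixed form is convenient: the caller emits `while ($input.hasNext()$cond)` unconditionally, and the empty-list case degenerates to the original loop.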
     |
     | if (shouldStop()) return;
     |}
     |$iterTerm.close();
This is an unrelated change, right? It changes nothing in the generated code, right? Just want to double-check I am not missing something. (What changes is that before we were not doing the cleanup in the limit-operator case, and now we do; I see this.)
Yes, it's unrelated and is a no-op. `outputFromRowBasedMap` and `outputFromVectorizedMap` put the resource closing at the end, and I want to be consistent here.
  // with partition start. `batchEnd` tracks the end index of the current batch, initialized
  // with `nextIndex`. In the outer loop, we first check if `nextIndex == batchEnd`. If it's true,
  // it means the current batch is fully consumed, and we will update `batchEnd` to process the
  // next batch. If `batchEnd` reaches partition end, exit the outer loop. finally we enter the
nit: capitalize "finally"
  }
  """, inlineToOuterClass = true)
- val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
+ ctx.addMutableState(CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false)
Why do we need `forceInline`?
Because the counter variable name is decided before we obtain the `CodegenContext`. If we don't inline here, we need a way to notify the upstream operators about the counter name, which is hard to do.
  }

  private def collectNodeWithinWholeStage[T <: SparkPlan : ClassTag](plan: SparkPlan): Seq[T] = {
    val stages = plan.collect {
collectFirst?
We also want to detect the case where there are 2 whole-stage codegen subtrees, and fail.
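That rationale can be shown with a tiny stand-in for the Scala `collect`. This is a hypothetical sketch that models plan nodes as strings rather than using Spark's real `SparkPlan` API:

```java
import java.util.List;
import java.util.stream.Collectors;

public class CollectStages {
    // Gather *all* whole-stage nodes, so the caller can assert there is
    // exactly one and fail loudly when there are two, a situation that
    // collectFirst would silently hide.
    static List<String> wholeStageNodes(List<String> planNodes) {
        return planNodes.stream()
                .filter(n -> n.startsWith("WholeStageCodegen"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> stages = wholeStageNodes(
            List.of("WholeStageCodegen(1)", "Exchange", "WholeStageCodegen(2)"));
        System.out.println(stages.size() + " whole-stage subtrees found");
    }
}
```

With `collectFirst` the second subtree would go unnoticed; collecting everything lets the test assert `stages.length == 1`.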
  val range = ctx.freshName("range")
  val shouldStop = if (parent.needStopCheck) {
-   s"if (shouldStop()) { $number = $value + ${step}L; return; }"
+   s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
In this case we are not very accurate in the metrics, right? I mean, we always say that we are returning a full batch, even though we may have consumed fewer rows than a batch.
What about updating the metrics before returning? Something like `$inputMetrics.incRecordsRead($localIdx - $localEnd);`?
You are right about the problem, but I'm not going to touch this part in this PR. Note that this PR focuses on limit in whole-stage codegen.
Personally I feel it's OK to make the metrics a little inaccurate for better performance; we can discuss it later in other PRs.
BTW I do have a local branch that fixes this problem, I just haven't had time to benchmark it yet. I'll send it out later and let's move the discussion there.
I am not sure why you need a benchmark for this (unless you did something different from what I suggested in the earlier comment). In that case it is a single metric update which happens only when stopping; it shouldn't introduce any significant overhead. Am I missing something? Anyway, let's move the discussion to the next PR then, thanks.
> Something like `$inputMetrics.incRecordsRead($localIdx - $localEnd);`?

`localIdx` is purely local to the loop; if we access it outside of the loop, we need to define `localIdx` outside of the loop as well. This may have some performance penalty. cc @kiszk
But `shouldStop` is called local to the loop, isn't it?
`shouldStop` is called locally, but the metrics update is not.
Anyway, the JVM JIT is mysterious and we need to be super careful when updating this kind of hot loop. That said, I'm not confident in any change to the hot loop without a benchmark.
OK, let's get back to this eventually; this is anyway not worse than before.
Sorry for the late comment. It would be good to discuss the details in another PR.
At first, I agree with the necessity of benchmarking. Here are my thoughts:
- I think that `localIdx` can be defined as a local variable outside of the loop. Or, how about storing `localIdx` to another local variable only if `parent.needStopCheck` is `true`?
- Since `shouldStop()` is simple and does no updates, we expect the JIT to apply inlining and some optimizations.
- If we want to call `incRecordsRead`, it would be good to exit the loop using `break` and then call `incRecordsRead`.
Test build #96983 has finished for PR 22630 at commit
Test build #96984 has finished for PR 22630 at commit
Test build #96986 has finished for PR 22630 at commit
Test build #96985 has finished for PR 22630 at commit
  def newLimitCountTerm(): String = {
    val id = curId.getAndIncrement()
    s"_limit_counter_$id"
  }
Can't we use freshName?
there is no CodegenContext here.
see MapObjects.apply as an existing example.
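The pattern being discussed (generating a unique name before any `CodegenContext` exists) can be sketched as follows. This is a hypothetical Java rendering of the Scala snippet above, not Spark's actual code:

```java
import java.util.concurrent.atomic.AtomicLong;

public class LimitCounterNames {
    // A static monotonically increasing id guarantees distinct counter names
    // across multiple limits in the same stage, without needing a
    // CodegenContext (freshName lives on the context, which is not yet
    // available when the name must be decided).
    private static final AtomicLong curId = new AtomicLong();

    static String newLimitCountTerm() {
        return "_limit_counter_" + curId.getAndIncrement();
    }

    public static void main(String[] args) {
        System.out.println(newLimitCountTerm());
        System.out.println(newLimitCountTerm()); // a different name each call
    }
}
```

This mirrors the `MapObjects.apply` precedent cited above: uniqueness comes from a global counter rather than from the per-context fresh-name machinery.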
  val inputRow = if (needsUnsafeRowConversion) null else row
  s"""
-    |while ($input.hasNext()) {
+    |while ($input.hasNext()$limitNotReachedCond) {
We can put `limitNotReachedCond` as the first condition, to avoid possibly buffering a row.
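The reasoning relies on `&&` short-circuiting: if the limit check comes first, `hasNext()` (which may buffer a row in a real iterator) is never invoked once the limit is reached. A hypothetical demonstration, with a fake `hasNext` that just counts its calls:

```java
// Demonstrates the ordering suggestion: with the limit check first,
// hasNext() is called exactly `limit` times and never after the limit
// is reached, so no extra row gets buffered.
public class ShortCircuitSketch {
    static int hasNextCalls = 0;
    static int produced = 0;

    // Stand-in for an iterator whose hasNext() buffers the next row.
    static boolean hasNext() { hasNextCalls++; return true; }

    static void consume(long limit) {
        hasNextCalls = 0;
        produced = 0;
        long count = 0;
        while (count < limit && hasNext()) { // limit check first: short-circuits
            count++;
            produced++;
        }
    }

    public static void main(String[] args) {
        consume(3);
        System.out.println(produced + " rows, " + hasNextCalls + " hasNext() calls");
    }
}
```

With the reversed order, `while (hasNext() && count < limit)`, the loop would call `hasNext()` one extra time and buffer a row that is then discarded.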
  // upstream operators. Here we override this method to return Nil, so that upstream operators will
  // not generate useless conditions (which are always evaluated to false) for the Limit operators
  // after Sort.
  override def limitNotReachedChecks: Seq[String] = Nil
It seems that all blocking operators will have this behavior. Shall we rather have a `blockingOperator` flag def, and make this a final function incorporating that logic?
It's only done in Sort and Aggregate currently. I don't want to over-design it until there are more use cases.
I am fine with doing it later, but I'd like to avoid having other places duplicate this logic in the future, in order to avoid possible mistakes.
   * limit-not-reached checks.
   */
  final def limitNotReachedCond: String = {
    if (parent.limitNotReachedChecks.isEmpty) {
Just one thought: since we propagate (correctly) the `limitNotReachedChecks` to all the children, shall we also enforce that we are calling this on a node which will not propagate the `limitNotReachedChecks` any further? We may use the blocking flag proposed in the other comment, maybe.
The reason I'd like to do this is to enforce that we are not introducing the same limit condition check more than once, in more than one operator, which would be useless and may cause a (small) perf issue. WDYT?
It's not very useful to enforce that. The consequence is so minor that I don't think it's worth the complexity. I want to have a simple and robust framework for the limit optimization first.
> I want to have a simple and robust framework

Yes, I 100% agree; that's why I'd like to detect early all the possible situations which we are not thinking of as possible, but may happen in corner cases we are not considering. What I am suggesting here is to enforce and fail for testing only, of course; in production we shouldn't do anything like that.
Test build #96996 has finished for PR 22630 at commit
Test build #96997 has finished for PR 22630 at commit
LGTM
LGTM apart from the minor comments, which we can also address later
Test build #97099 has finished for PR 22630 at commit
Test build #97101 has finished for PR 22630 at commit
retest this please.
mgaido91
left a comment
LGTM apart from one nit, thanks for your work on this @cloud-fan and @viirya
  final def limitNotReachedCond: String = {
    // InputAdapter is also a leaf node.
    val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
    assert(isLeafNode || this.isInstanceOf[BlockingOperatorWithCodegen],
nit: shall we do this only if Utils.isTesting and otherwise just emit a warning maybe?
ah good idea!
viirya
left a comment
LGTM
  val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
  assert(isLeafNode || this.isInstanceOf[BlockingOperatorWithCodegen],
    "only leaf nodes and blocking nodes need to call this method in its data producing loop.")
  if (isLeafNode || this.isInstanceOf[BlockingOperatorWithCodegen]) {
if (!isLeafNode && !this.isInstanceOf[BlockingOperatorWithCodegen])?
  // InputAdapter is also a leaf node.
  val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
  if (isLeafNode || this.isInstanceOf[BlockingOperatorWithCodegen]) {
    val errMsg = "only leaf nodes and blocking nodes need to call 'limitNotReachedCond' " +
nit: Only
  if (Utils.isTesting) {
    throw new IllegalStateException(errMsg)
  } else {
    logWarning(errMsg)
nit: shall we also mention to report to the community if seen?
Test build #97106 has finished for PR 22630 at commit
LGTM
Test build #97109 has finished for PR 22630 at commit
Test build #97111 has finished for PR 22630 at commit
    case w: WholeStageCodegenExec => w
  }
  assert(stages.length == 1, "The query plan should have one and only one whole-stage.")
  stages.head
nit: Do we need this line?
  val loopCondition = if (limitNotReachedChecks.isEmpty) {
    "true"
  } else {
    limitNotReachedChecks.mkString(" && ")
nit: I am a bit afraid of 64KB Java bytecode overflow from using mkString. On the other hand, I understand that this condition generation is performance sensitive.
This is whole-stage codegen. If bytecode overflow happens, we will fall back.
  if (parent.limitNotReachedChecks.isEmpty) {
    ""
  } else {
    parent.limitNotReachedChecks.mkString("", " && ", " &&")
nit: I am a bit afraid of 64KB Java bytecode overflow from using mkString. On the other hand, I understand that this condition generation is performance sensitive.
Test build #97136 has finished for PR 22630 at commit
LGTM
Thanks! merging to master |
What changes were proposed in this pull request?
This PR is inspired by #22524, but proposes a safer fix.
The current limit whole-stage codegen has 2 problems:
1. It's only applied to `InputAdapter`; many leaf nodes can't stop earlier w.r.t. limit.
2. It needs to override a method, which will break if we have more than one limit in the whole stage.

The first problem is easy to fix: just figure out which nodes can stop earlier w.r.t. limit, and update them. This PR updates `RangeExec`, `ColumnarBatchScan`, `SortExec`, and `HashAggregateExec`.
The second problem is hard to fix. This PR proposes to propagate the limit counter variable name upstream, so that the upstream leaf/blocking nodes can check the limit counter and quit the loop earlier.
For better performance, the implementation here follows `CodegenSupport.needStopCheck`, so that we codegen the check only if there is a limit in the query. For a columnar node like range, we check the limit counter per batch instead of per row, to keep the inner loop tight and fast.
Why is this safer?
1. The leaf/blocking nodes don't have to check the limit counter and stop earlier; it's only for performance. (This is the same as before.)
2. The blocking operators can stop propagating the limit counter name, because the counter of a limit after a blocking operator will never increase before the blocking operator consumes all the data from its upstream operators. So the upstream operators don't care about limits after blocking operators. This is also for performance only; it's OK if we forget to do it for some new blocking operators.
How was this patch tested?
a new test